diff --git a/dev/scalastyle-config.xml b/dev/scalastyle-config.xml new file mode 100644 index 000000000..f5b517a02 --- /dev/null +++ b/dev/scalastyle-config.xml @@ -0,0 +1,856 @@ + + + + + Scalastyle standard configuration + + + + + + + + + + + + + + + + + + + + + + + + true + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ARROW, EQUALS, ELSE, TRY, CATCH, FINALLY, LARROW, RARROW + + + + + + ARROW, EQUALS, COMMA, COLON, IF, ELSE, DO, WHILE, FOR, MATCH, TRY, CATCH, FINALLY, LARROW, RARROW + + + + + + + + + ^AnyFunSuite[A-Za-z]*$ + Tests must extend org.apache.spark.SparkFunSuite instead. + + + + + ^println$ + + + + + s".*\$\{MDC\( + + + + + spark(.sqlContext)?.sparkContext.hadoopConfiguration + + + + + @VisibleForTesting + + + + + Runtime\.getRuntime\.addShutdownHook + + + + + mutable\.SynchronizedBuffer + + + + + Class\.forName + + + + + Await\.result + + + + + Await\.ready + + + + + new.*ParVector + + + + + (\.toUpperCase|\.toLowerCase)(?!(\(|\(Locale.ROOT\))) + + + + + throw new \w+Error\( + + + + + + JavaConversions + Instead of importing implicits in scala.collection.JavaConversions._, import + scala.jdk.CollectionConverters._ and use .asScala / .asJava methods + + + + + JavaConverters + Instead of importing implicits in scala.collection.JavaConverters._, import + scala.jdk.CollectionConverters._ and use .asScala / .asJava methods + + + + \bFiles\.createParentDirs\b + Use createParentDirs of SparkFileUtils or Utils instead. + + + + \bFiles\.equal\b + Use contentEquals of SparkFileUtils or Utils instead. + + + + \bFiles\.toByteArray\b + Use java.nio.file.Files.readAllBytes instead. + + + + \bFiles\.asByteSource\b + Use java.nio.file.Files.newInputStream instead. + + + + \bFileUtils\.getTempDirectory\b + Use System.getProperty instead. + + + + \bFileUtils\.readLines\b + Use Files.readAllLines instead. + + + + \bFiles\.readLines\b + Use Files.readAllLines instead. + + + + \bFileUtils\.readFileToString\b + Use Files.readString instead. + + + + \bFiles\.asCharSource\b + Use Files.readString instead. + + + + \bFileUtils\.write\b + Use Files.writeString instead. + + + + \bFiles\.asCharSink\b + Use Files.writeString instead. + + + + \bFileUtils\.writeLines\b + Use Files.write instead. + + + + \bFileUtils\.cleanDirectory\b + Use cleanDirectory of JavaUtils/SparkFileUtils/Utils + + + + \bFileUtils\.deleteDirectory\b + Use deleteRecursively of JavaUtils/SparkFileUtils/Utils + + + + \bFileUtils\.forceDelete\b + Use deleteRecursively of JavaUtils/SparkFileUtils/Utils + + + + \bFileUtils\.forceDeleteOnExit\b + Use forceDeleteOnExit of JavaUtils/SparkFileUtils/Utils instead. + + + + \bFileUtils\.deleteQuietly\b + Use deleteQuietly of JavaUtils/SparkFileUtils/Utils + + + + \bFileUtils\.readFileToByteArray\b + Use java.nio.file.Files.readAllBytes + + + + \bFileUtils\.sizeOf(Directory)?\b + Use sizeOf of JavaUtils or Utils instead. + + + + \bFileUtils\.moveFile\b + Use copyFile of JavaUtils/SparkFileUtils/Utils instead. + + + + \bFileUtils\.copyURLToFile\b + Use copyURLToFile of JavaUtils instead. + + + + \bFileUtils\.copyFile\b + Use copyFile of SparkFileUtils or Utils instead. + + + + \bFileUtils\.copyFileToDirectory\b + Use copyFileToDirectory of SparkFileUtils or Utils instead. + + + + \bFileUtils\.copyDirectory\b + Use copyDirectory of JavaUtils/SparkFileUtils/Utils instead. + + + + \bFileUtils\.moveDirectory\b + Use copyDirectory of SparkFileUtils or Utils instead. 
+ + + + \bFileUtils\.contentEquals\b + Use contentEquals of SparkFileUtils or Utils instead. + + + + org\.apache\.commons\.lang\. + Use Commons Lang 3 classes (package org.apache.commons.lang3.*) instead + of Commons Lang 2 (package org.apache.commons.lang.*) + + + + \bFileUtils\.getFile\b + Use getFile of SparkFileUtil or Utils instead. + + + + \bFileUtils\.touch\b + Use touch of SparkFileUtil or Utils instead. + + + + \bFiles\.touch\b + Use touch of SparkFileUtil or Utils instead. + + + + \bFileUtils\.writeStringToFile\b + Use java.nio.file.Files.writeString instead. + + + + \bFileUtils\.listFiles\b + Use listFiles of SparkFileUtil or Utils instead. + + + + org\.apache\.commons\.codec\.binary\.Base64\b + Use java.util.Base64 instead + + + + org\.apache\.commons\.lang3\..*JavaVersion + Use JEP 223 API (java.lang.Runtime.Version) instead of + Commons Lang 3 JavaVersion (org.apache.commons.lang3.JavaVersion) + + + + org\.apache\.commons\.lang3\.tuple + Use org.apache.spark.util.Pair instead + + + + org\.apache\.commons\.lang3\.builder\.ToStringBuilder + Use String concatenation instead + + + + \bStringUtils\.(left|right)Pad\b + Use (left|right)Pad of SparkStringUtils or Utils instead + + + + \bStringUtils\.split\b + Use Utils.stringToSeq instead + + + + \bStringUtils\.is(Not)?(Blank|Empty)\b + Use Utils.is(Not)?(Blank|Empty) instead + + + + \bExceptionUtils\.getRootCause\b + Use getRootCause of SparkErrorUtils or Utils instead + + + + \bExceptionUtils\.getStackTrace\b + Use stackTraceToString of JavaUtils/SparkFileUtils/Utils instead. + + + + org\.apache\.commons\.lang3\.Strings\b + Use Java String methods instead + + + + \bStringUtils\.strip\b + Use Utils.strip method instead + + + + \bHex\.encodeHexString\b + Use java.util.HexFormat instead + + + + org\.apache\.commons\.io\.FileUtils\b + Use Java API or Spark's JavaUtils/SparkSystemUtils/Utils instead + + + + org\.apache\.commons\.lang3\.StringUtils\b + Use Java String or Spark's Utils/JavaUtils methods instead + + + + org\.apache\.commons\.lang3\.SystemUtils\b + Use SparkSystemUtils or Utils instead + + + + org\.apache\.commons\.text\.StringSubstitutor\b + Use org.apache.spark.StringSubstitutor instead + + + + \bStringUtils\.abbreviate\b + Use Utils.abbreviate method instead + + + + \bStringUtils\.substring\b + Use Java String.substring instead. + + + + \bUriBuilder\.fromUri\b + Use Utils.getUriBuilder instead. + + + + scala\.concurrent\.ExecutionContext\.Implicits\.global + User queries can use global thread pool, causing starvation and eventual OOM. + Thus, Spark-internal APIs should not use this thread pool + + + + \bFileSystem\.get\([a-zA-Z_$][a-zA-Z_$0-9]*\) + + + + + extractOpt + Use jsonOption(x).map(.extract[T]) instead of .extractOpt[T], as the latter + is slower. + + + + + java,scala,3rdParty,auron + javax?\..* + scala\..* + (?!(javax?\.|scala\.|org\.apache\.auron\.)).* + org\.apache\.auron\..* + + + + + + COMMA + + + + + + \)\{ + + + + + (?m)^(\s*)/[*][*].*$(\r|)\n^\1 [*] + Use Javadoc style indentation for multiline comments + + + + case[^\n>]*=>\s*\{ + Omit braces in case clauses. + + + + new (java\.lang\.)?(Byte|Integer|Long|Short)\( + Use static factory 'valueOf' or 'parseXXX' instead of the deprecated constructors. + + + + + + + + + + Please use scala.collection instead. + + + + + Please use Apache Log4j 2 instead. 
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 800> + + + + + 30 + + + + + 10 + + + + + 50 + + + + + + + + + + + -1,0,1,2,3 + + + + \bbyteCountToDisplaySize\b + Use Utils.bytesToString instead of byteCountToDisplaySize for consistency. + + + + new Path\(new URI\( + + + + + new URL\( + Use URI.toURL or URL.of instead of URL constructors. + + + + buildConf\("spark.databricks. + Use Apache Spark config namespace. + + + + com\.google\.common\.base\.Strings\b + Use Java built-in methods or SparkStringUtils instead + + + + org\.apache\.hadoop\.io\.IOUtils\b + Use org.apache.spark.util.Utils instead. + + + + Charset\.defaultCharset + Use StandardCharsets.UTF_8 instead. + + + + \bIOUtils\.toByteArray\b + Use Java readAllBytes instead. + + + + \bIOUtils\.closeQuietly\b + Use closeQuietly of SparkErrorUtils or Utils instead. + + + + \bIOUtils\.copy\b + Use Java transferTo instead. + + + + \bIOUtils\.toString\b + Use toString of SparkStreamUtils or Utils instead. + + + + \bCharStreams\.toString\b + Use toString of SparkStreamUtils or Utils instead. + + + + \bIOUtils\.write\b + Use Java `write` instead. + + + + \bByteStreams\.read\b + Use Java readNBytes instead. + + + + \bByteStreams\.copy\b + Use Java transferTo instead. + + + + \bByteStreams\.skipFully\b + Use Java `skipNBytes` instead. + + + + \bByteStreams\.readFully\b + Use readFully of JavaUtils/SparkStreamUtils/Utils instead. + + + + \bByteStreams\.nullOutputStream\b + Use OutputStream.nullOutputStream instead. + + + + \bImmutableMap\.copyOf\b + Use Map.copyOf instead. + + + + \bImmutableSet\.of\b + Use java.util.Set.of instead. + + + + \bCollections\.emptySet\b + Use java.util.Set.of() instead. + + + + org\.apache\.commons\.collections4\.MapUtils\b + Use org.apache.spark.util.collection.Utils instead. + + + + com\.google\.common\.io\.Files\b + Use Java API or Spark's JavaUtils/SparkFileUtils/Utils instead. + + + + com\.google\.common\.base\.Objects\b + Use Java APIs (like java.util.Objects) instead. + + + + com\.google\.common\.base\.Joiner\b + Use Java APIs (like String.join/StringJoiner) instead. + + + + com\.google\.common\.io\.BaseEncoding\b + Use Java APIs (like java.util.Base64) instead. + + + + \bThrowables\.getStackTraceAsString\b + Use stackTraceToString of JavaUtils/SparkFileUtils/Utils instead. + + + + \bThrowables\.getRootCause\b + Use getRootCause of SparkErrorUtils or Utils instead + + + + \bPreconditions\.checkNotNull\b + Use requireNonNull of java.util.Objects instead. + + + + \bInts\.checkedCast\b + Use JavaUtils.checkedCast instead. + + + + \bstartsWith\("k8s\b + Use SparkMasterRegex.isK8s instead. 
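Note (not part of the patch): most of the checks above are regex bans on legacy, locale-sensitive, or Commons/Guava APIs, and each message names the preferred replacement. As a rough illustration of the style the rules push toward — the object and method names below are invented for the example — a small Scala sketch:

    import java.nio.file.{Files, Paths}
    import java.util.{Base64, Locale}
    import scala.jdk.CollectionConverters._

    object StyleRuleExamples {
      // "toUpperCase" without a locale is flagged; pass Locale.ROOT explicitly.
      def normalize(s: String): String = s.toUpperCase(Locale.ROOT)

      // Commons-codec Base64 is banned; java.util.Base64 is the suggested replacement.
      def encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)

      // FileUtils.readFileToString is banned; Files.readString is the suggested replacement.
      def readAll(path: String): String = Files.readString(Paths.get(path))

      // JavaConversions/JavaConverters imports are banned; use scala.jdk.CollectionConverters.
      def toScala(list: java.util.List[String]): Seq[String] = list.asScala.toSeq
    }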
+
+
diff --git a/pom.xml b/pom.xml
index 63ebf1873..b02819c59 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,6 +60,7 @@
     3.9.12
     4.9.2
     2.2.0
+    1.0.0
     false
@@ -466,6 +467,32 @@
+
+        org.scalastyle
+        scalastyle-maven-plugin
+        ${maven.plugin.scalastyle.version}
+
+          false
+          true
+          false
+          false
+          ${basedir}/src/main/scala
+          ${basedir}/src/test/scala
+          ${maven.multiModuleProjectDirectory}/dev/scalastyle-config.xml
+          ${basedir}/target/scalastyle-output.xml
+          ${project.build.sourceEncoding}
+          ${project.reporting.outputEncoding}
+
+
+
+
+            check
+
+          validate
+
+
+
+
       org.codehaus.mojo
       flatten-maven-plugin
diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala
index 4d559b32f..e19c86e50 100644
--- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala
@@ -104,7 +104,8 @@
   }
 
   @sparkver("3.2 / 3.3")
-  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = {
+  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration)
+      : org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker = {
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
     import org.apache.spark.sql.execution.datasources.BasicWriteTaskStatsTracker
@@ -138,7 +139,8 @@
   }
 
   @sparkver("3.1")
-  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = {
+  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration)
+      : org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker = {
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
     import org.apache.spark.sql.execution.datasources.BasicWriteTaskStats
@@ -187,7 +189,8 @@
   }
 
   @sparkver("3.0")
-  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = {
+  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration)
+      : org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker = {
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
     import org.apache.spark.sql.execution.datasources.BasicWriteTaskStats
diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala
index 37952b205..9d688ad1c 100644
--- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.auron.plan
 
 import scala.collection.mutable
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 
 import org.apache.spark._
@@ -78,6 +77,9 @@ case class NativeShuffleExchangeExec(
 
   // 'mapOutputStatisticsFuture' is only needed when enable AQE.
   @transient override lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = {
+    // scalastyle:off executioncontextglobal
+    import scala.concurrent.ExecutionContext.Implicits.global
+    // scalastyle:on executioncontextglobal
     if (inputRDD.getNumPartitions == 0) {
       Future.successful(null)
     } else {
@@ -173,7 +175,7 @@ case class NativeShuffleExchangeExec(
     outputPartitioning != SinglePartition
 
   @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
-  override def shuffleOrigin = {
+  override def shuffleOrigin: org.apache.spark.sql.execution.exchange.ShuffleOrigin = {
     import org.apache.spark.sql.execution.exchange.ShuffleOrigin;
     _shuffleOrigin.get.asInstanceOf[ShuffleOrigin]
   }
diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala
index fd51bec3d..2f04829c6 100644
--- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala
@@ -62,7 +62,8 @@ case class NativeBroadcastJoinExec(
   }
 
   @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
-  override def requiredChildDistribution = {
+  override def requiredChildDistribution
+      : List[org.apache.spark.sql.catalyst.plans.physical.Distribution] = {
     import org.apache.spark.sql.catalyst.plans.physical.BroadcastDistribution
     import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
     import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
@@ -83,7 +84,7 @@ case class NativeBroadcastJoinExec(
   override def supportCodegen: Boolean = false
 
   @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
-  override def inputRDDs() = {
+  override def inputRDDs(): Nothing = {
     throw new NotImplementedError("NativeBroadcastJoin dose not support codegen")
   }
 
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronArrowColumnVector.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronArrowColumnVector.scala
index ca5c12cbd..d8be57e61 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronArrowColumnVector.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronArrowColumnVector.scala
@@ -190,7 +190,7 @@ object AuronArrowColumnVector {
 
   private class NullAccessor(vector: NullVector)
     extends AuronArrowColumnVector.ArrowVectorAccessor(vector) {
-    override def isNullAt(rowId: Int) = true
+    override def isNullAt(rowId: Int): Boolean = true
   }
 
   private class BooleanAccessor(vector: BitVector)
@@ -215,7 +215,7 @@ object AuronArrowColumnVector {
 
   private class UInt4Accessor(vector: UInt4Vector)
     extends AuronArrowColumnVector.ArrowVectorAccessor(vector) {
-    final override def getInt(rowId: Int) = vector.get(rowId)
+    final override def getInt(rowId: Int): Int = vector.get(rowId)
   }
 
   private class UInt8Accessor(vector: UInt8Vector)
@@ -260,14 +260,15 @@ object AuronArrowColumnVector {
     extends AuronArrowColumnVector.ArrowVectorAccessor(vector) {
 
     final private val stringResult = new NullableVarCharHolder
 
-    final override def getUTF8String(rowId: Int) = {
+    final override def getUTF8String(rowId: Int): UTF8String = {
       vector.get(rowId, stringResult)
       if (stringResult.isSet == 0) null
-      else
+      else {
         UTF8String.fromAddress(
           null,
           stringResult.buffer.memoryAddress + stringResult.start,
           stringResult.end - stringResult.start)
+      }
     }
   }
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarMap.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarMap.scala
index 2fed6bf11..fc506abd2 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarMap.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarMap.scala
@@ -33,5 +33,5 @@
 
   override def valueArray: ArrayData = new AuronColumnarArray(values, offset, length)
 
-  override def copy = new ArrayBasedMapData(keyArray.copy, valueArray.copy)
+  override def copy: ArrayBasedMapData = new ArrayBasedMapData(keyArray.copy, valueArray.copy)
 }
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReaderBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReaderBase.scala
index b64ad0459..477c1b5e3 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReaderBase.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReaderBase.scala
@@ -90,8 +90,10 @@ object AuronBlockStoreShuffleReaderBase extends Logging {
   }
 
   private def unwrapInputStream(in: InputStream): InputStream = {
+    // scalastyle:off classforname
     val bufferReleasingInputStreamCls =
       Class.forName("org.apache.spark.storage.BufferReleasingInputStream")
+    // scalastyle:on classforname
     if (in.getClass != bufferReleasingInputStreamCls) {
       return in
     }
diff --git a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala
index 9a786b902..18745f5c7 100644
--- a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala
+++ b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala
@@ -250,8 +250,9 @@ class AuronUniffleShuffleReader[K, C](
       var readBytes = 0
       while (readBytes < len) {
         while (byteBuffer == null || !byteBuffer.hasRemaining()) {
-          if (!this.toNextBuffer)
+          if (!this.toNextBuffer) {
             return if (readBytes > 0) readBytes else -1
+          }
         }
         val bytesToRead = Math.min(byteBuffer.remaining(), len - readBytes)
         byteBuffer.get(arrayBytes, off + readBytes, bytesToRead)
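Note (not part of the patch): the Scala changes above fall into three groups — explicit result types on overridden members, braces added where the style rules require them, and scoped scalastyle:off/on markers around the two call sites that legitimately need a banned API (Class.forName in AuronBlockStoreShuffleReaderBase and the global ExecutionContext in mapOutputStatisticsFuture). The new pom.xml execution appears to bind the plugin's check goal to an early build phase, so violations fail a normal Maven build rather than surfacing later. A minimal sketch of the suppression idiom, with invented helper names:

    import scala.concurrent.Future

    object SuppressionSketch {
      // When a banned API is genuinely needed, only that span is exempted
      // from the named rule.
      def loadClass(name: String): Class[_] = {
        // scalastyle:off classforname
        Class.forName(name)
        // scalastyle:on classforname
      }

      // Same idea for the global execution context: the import is scoped to
      // the method body and wrapped in executioncontextglobal markers.
      def runAsync(work: () => Int): Future[Int] = {
        // scalastyle:off executioncontextglobal
        import scala.concurrent.ExecutionContext.Implicits.global
        // scalastyle:on executioncontextglobal
        Future(work())
      }
    }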