856 changes: 856 additions & 0 deletions dev/scalastyle-config.xml

Large diffs are not rendered by default.
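The new config itself is collapsed here, so the following is only a hedged sketch of what dev/scalastyle-config.xml plausibly contains, modeled on the Apache Spark scalastyle configuration that files like this are commonly derived from. The checker classes named are real scalastyle checkers, but the exact rule selection and severity levels are assumptions; the two custom IDs are inferred from the `scalastyle:off` comments that appear later in this diff.

```xml
<!-- Sketch only: rule choice and levels are assumed, not taken from this PR. -->
<scalastyle commentFilter="enabled">
  <name>Auron Scalastyle configuration (sketch)</name>

  <!-- Enforce explicit result types on public members (seen throughout this PR). -->
  <check level="error" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"/>

  <!-- Custom regex rule; the id matches the 'scalastyle:off classforname' comment below. -->
  <check customId="classforname" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters>
      <parameter name="regex">Class\.forName</parameter>
    </parameters>
    <customMessage>Avoid Class.forName; suppress per call site when reflection is unavoidable.</customMessage>
  </check>

  <!-- Custom regex rule; the id matches the 'scalastyle:off executioncontextglobal' comment below. -->
  <check customId="executioncontextglobal" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters>
      <parameter name="regex">ExecutionContext\.Implicits\.global</parameter>
    </parameters>
    <customMessage>Do not use the global ExecutionContext unless the shared pool is intended.</customMessage>
  </check>
</scalastyle>
```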

27 changes: 27 additions & 0 deletions pom.xml
@@ -60,6 +60,7 @@
<maven.version>3.9.12</maven.version>
<maven.plugin.scala.version>4.9.2</maven.plugin.scala.version>
<maven.plugin.scalatest.version>2.2.0</maven.plugin.scalatest.version>
+    <maven.plugin.scalastyle.version>1.0.0</maven.plugin.scalastyle.version>
<maven.plugin.scalatest.exclude.tags />
<maven.plugin.scalatest.include.tags />
<maven.plugin.scalatest.debug.enabled>false</maven.plugin.scalatest.debug.enabled>
@@ -466,6 +467,32 @@
</executions>
</plugin>

+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+        <version>${maven.plugin.scalastyle.version}</version>
+        <configuration>
+          <verbose>false</verbose>
+          <failOnViolation>true</failOnViolation>
+          <includeTestSourceDirectory>false</includeTestSourceDirectory>
+          <failOnWarning>false</failOnWarning>
+          <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+          <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+          <configLocation>${maven.multiModuleProjectDirectory}/dev/scalastyle-config.xml</configLocation>
+          <outputFile>${basedir}/target/scalastyle-output.xml</outputFile>
+          <inputEncoding>${project.build.sourceEncoding}</inputEncoding>
+          <outputEncoding>${project.reporting.outputEncoding}</outputEncoding>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>check</goal>
+            </goals>
+            <phase>validate</phase>
+          </execution>
+        </executions>
+      </plugin>
+
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
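Design note: binding the `check` goal to `validate` makes scalastyle run at the very start of every module build, and `failOnViolation` turns any style error into a build failure before compilation even begins; per-module results land in `target/scalastyle-output.xml`. Assuming the plugin's standard goal prefix, the check can also be run on demand with `mvn scalastyle:check`.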
@@ -104,7 +104,8 @@ case class NativeParquetInsertIntoHiveTableExec(
}

@sparkver("3.2 / 3.3")
-  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = {
+  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration)
+      : org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker = {
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.datasources.BasicWriteTaskStatsTracker
@@ -138,7 +139,8 @@
}

@sparkver("3.1")
-  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = {
+  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration)
+      : org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker = {
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.datasources.BasicWriteTaskStats
@@ -187,7 +189,8 @@
}

@sparkver("3.0")
-  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = {
+  override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration)
+      : org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker = {
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.datasources.BasicWriteTaskStats
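The recurring Scala change in this PR, here and in the files below, is spelling out result types that were previously inferred; this is what scalastyle's `PublicMethodsHaveTypeChecker` enforces (assumed to be among the enabled rules, since every touched override gains an explicit type). A minimal sketch of the rule with a hypothetical `Tracker` type:

```scala
// Hypothetical types; only the explicit-result-type rule is being illustrated.
trait Tracker

object ExplicitResultTypeDemo {
  // def tracker = new Tracker {}         // inferred result type: flagged
  def tracker: Tracker = new Tracker {}   // explicit result type: compliant
}
```

Making the type explicit pins down the public API, so a change in a method body can no longer silently change what callers see.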
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.auron.plan

import scala.collection.mutable
-import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

import org.apache.spark._
@@ -78,6 +77,9 @@ case class NativeShuffleExchangeExec(

// 'mapOutputStatisticsFuture' is only needed when enable AQE.
@transient override lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = {
+    // scalastyle:off executioncontextglobal
+    import scala.concurrent.ExecutionContext.Implicits.global
+    // scalastyle:on executioncontextglobal
if (inputRDD.getNumPartitions == 0) {
Future.successful(null)
} else {
@@ -173,7 +175,7 @@ case class NativeShuffleExchangeExec(
outputPartitioning != SinglePartition

@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
-  override def shuffleOrigin = {
+  override def shuffleOrigin: org.apache.spark.sql.execution.exchange.ShuffleOrigin = {
import org.apache.spark.sql.execution.exchange.ShuffleOrigin;
_shuffleOrigin.get.asInstanceOf[ShuffleOrigin]
}
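The NativeShuffleExchangeExec hunks above show the suppression idiom end to end: the file-wide import of the global `ExecutionContext` is removed, and the import is re-scoped inside the one `lazy val` that deliberately uses the global pool, bracketed by `scalastyle:off`/`scalastyle:on` so the (assumed) `executioncontextglobal` check keeps policing the rest of the file. A stripped-down, self-contained sketch:

```scala
import scala.concurrent.Future

object ScopedGlobalEcSketch {
  def statsFuture(numPartitions: Int): Future[Int] = {
    // Disable the check only for this intentional use of the global pool,
    // then re-enable it; the import's scope is limited to this method body.
    // scalastyle:off executioncontextglobal
    import scala.concurrent.ExecutionContext.Implicits.global
    // scalastyle:on executioncontextglobal
    if (numPartitions == 0) Future.successful(0)
    else Future(numPartitions * 2)
  }
}
```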
@@ -62,7 +62,8 @@ case class NativeBroadcastJoinExec(
}

@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
-  override def requiredChildDistribution = {
+  override def requiredChildDistribution
+      : List[org.apache.spark.sql.catalyst.plans.physical.Distribution] = {
import org.apache.spark.sql.catalyst.plans.physical.BroadcastDistribution
import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
@@ -83,7 +84,7 @@
override def supportCodegen: Boolean = false

@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
-  override def inputRDDs() = {
+  override def inputRDDs(): Nothing = {
throw new NotImplementedError("NativeBroadcastJoin dose not support codegen")
}

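Annotating `inputRDDs()` with `Nothing` is the natural explicit type for a method whose only behavior is to throw: `Nothing` is Scala's bottom type and conforms to any declared return type, so the override still satisfies the parent's signature while documenting that codegen input is unsupported here.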
@@ -190,7 +190,7 @@ object AuronArrowColumnVector {

private class NullAccessor(vector: NullVector)
extends AuronArrowColumnVector.ArrowVectorAccessor(vector) {
-    override def isNullAt(rowId: Int) = true
+    override def isNullAt(rowId: Int): Boolean = true
}

private class BooleanAccessor(vector: BitVector)
@@ -215,7 +215,7 @@

private class UInt4Accessor(vector: UInt4Vector)
extends AuronArrowColumnVector.ArrowVectorAccessor(vector) {
-    final override def getInt(rowId: Int) = vector.get(rowId)
+    final override def getInt(rowId: Int): Int = vector.get(rowId)
}

private class UInt8Accessor(vector: UInt8Vector)
@@ -260,14 +260,15 @@
extends AuronArrowColumnVector.ArrowVectorAccessor(vector) {
final private val stringResult = new NullableVarCharHolder

-    final override def getUTF8String(rowId: Int) = {
+    final override def getUTF8String(rowId: Int): UTF8String = {
vector.get(rowId, stringResult)
if (stringResult.isSet == 0) null
-      else
+      else {
UTF8String.fromAddress(
null,
stringResult.buffer.memoryAddress + stringResult.start,
stringResult.end - stringResult.start)
+      }
}
}

@@ -33,5 +33,5 @@ class AuronColumnarMap(

override def valueArray: ArrayData = new AuronColumnarArray(values, offset, length)

-  override def copy = new ArrayBasedMapData(keyArray.copy, valueArray.copy)
+  override def copy: ArrayBasedMapData = new ArrayBasedMapData(keyArray.copy, valueArray.copy)
}
@@ -90,8 +90,10 @@ object AuronBlockStoreShuffleReaderBase extends Logging {
}

private def unwrapInputStream(in: InputStream): InputStream = {
+    // scalastyle:off classforname
val bufferReleasingInputStreamCls =
Class.forName("org.apache.spark.storage.BufferReleasingInputStream")
+    // scalastyle:on classforname
if (in.getClass != bufferReleasingInputStreamCls) {
return in
}
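Here the (assumed) `classforname` rule is suppressed rather than followed: `BufferReleasingInputStream` is package-private to `org.apache.spark.storage`, so a reflective `Class.forName` lookup is the practical way to reference it from outside that package, and the `off`/`on` pair confines the exemption to exactly these lines.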
@@ -250,8 +250,9 @@ class AuronUniffleShuffleReader[K, C](
var readBytes = 0
while (readBytes < len) {
while (byteBuffer == null || !byteBuffer.hasRemaining()) {
-          if (!this.toNextBuffer)
+          if (!this.toNextBuffer) {
return if (readBytes > 0) readBytes else -1
+          }
}
val bytesToRead = Math.min(byteBuffer.remaining(), len - readBytes)
byteBuffer.get(arrayBytes, off + readBytes, bytesToRead)
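The final hunks add braces around multi-line `if`/`else` bodies, the shape a brace rule such as scalastyle's `IfBraceChecker` (assumed enabled here) requires. A small sketch of the before/after, mirroring the early-return loop above:

```scala
object BraceRuleSketch {
  // Before (flagged): a multi-line if with an unbraced body.
  //   if (!hasNext)
  //     return -1
  // After (compliant): the body is braced, so the extent of the branch is explicit.
  def firstPositive(xs: Array[Int]): Int = {
    var i = 0
    while (i < xs.length) {
      if (xs(i) > 0) {
        return xs(i)
      }
      i += 1
    }
    -1
  }
}
```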