diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index ce04872..958930c 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -68,10 +68,10 @@ jobs:
           key: ${{ runner.os }}-gradle-wrapper-${{ hashFiles('**/gradle/wrapper/gradle-wrapper.properties') }}
 
       - name: Style Check
-        run: ./gradlew scalastyleMainCheck -PscalaVersion=${{ matrix.scalaVersion }} -PsparkVersion=${{ matrix.sparkVersion }} -PscalaCompt=${{ matrix.scalaCompt }}
+        run: ./gradlew :spark:scalastyleMainCheck -PscalaVersion=${{ matrix.scalaVersion }} -PsparkVersion=${{ matrix.sparkVersion }} -PscalaCompt=${{ matrix.scalaCompt }}
 
       - name: Test
-        run: ./gradlew test -PscalaVersion=${{ matrix.scalaVersion }} -PsparkVersion=${{ matrix.sparkVersion }} -PscalaCompt=${{ matrix.scalaCompt }}
+        run: ./gradlew test -x :flink:test -PscalaVersion=${{ matrix.scalaVersion }} -PsparkVersion=${{ matrix.sparkVersion }} -PscalaCompt=${{ matrix.scalaCompt }}
 
 #        run: ./gradlew test aggregateScoverage -PscalaVersion=${{ matrix.scalaVersion }} -PsparkVersion=${{ matrix.sparkVersion }} -PscalaCompt=${{ matrix.scalaCompt }}
 #      - uses: codecov/codecov-action@v2
diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/datasource/ConsoleDataSource.scala b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/datasource/ConsoleDataSource.scala
index 1a4896b..0fbb6a4 100644
--- a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/datasource/ConsoleDataSource.scala
+++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/datasource/ConsoleDataSource.scala
@@ -15,6 +15,6 @@ class ConsoleDataSource extends Sink[DataFrame] {
 
     println(df.explain())
     println("console output:")
-    df.fetch(10000).execute().print()
+    df.execute().print()
   }
 }
diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/quality/FlinkQualityCheck.scala b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/quality/FlinkQualityCheck.scala
index e1a27d9..0bb0357 100644
--- a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/quality/FlinkQualityCheck.scala
+++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/quality/FlinkQualityCheck.scala
@@ -2,7 +2,7 @@ package com.github.sharpdata.sharpetl.flink.quality
 
 import com.github.sharpdata.sharpetl.core.annotation.Annotations.Stable
 import com.github.sharpdata.sharpetl.core.quality.QualityCheck._
-import com.github.sharpdata.sharpetl.core.quality.{DataQualityCheckResult, DataQualityConfig, ErrorType, QualityCheck, QualityCheckRule}
+import com.github.sharpdata.sharpetl.core.quality._
 import com.github.sharpdata.sharpetl.core.repository.QualityCheckAccessor
 import com.github.sharpdata.sharpetl.core.util.{ETLLogger, StringUtil}
 import com.github.sharpdata.sharpetl.flink.extra.driver.FlinkJdbcStatement.fixedResult
@@ -13,7 +13,6 @@ import org.apache.flink.table.api.{TableEnvironment, ValidationException}
 import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
 
 import java.util
-import java.util.List
 import scala.jdk.CollectionConverters.asScalaIteratorConverter
 
 @Stable(since = "1.0.0")
@@ -29,12 +28,14 @@ class FlinkQualityCheck(val tEnv: TableEnvironment,
     ETLLogger.info(s"execution sql:\n $sql")
     tEnv.sqlQuery(sql).execute().collect().asScala
       .map(it => DataQualityCheckResult(
+        // scalastyle:off
         it.getField(0).toString, // column
         it.getField(1).toString, // dataCheckType
         it.getField(2).toString, // ids
         it.getField(3).toString.split(DELIMITER).head, // errorType
         it.getField(4).toString.toInt, // warnCount
         it.getField(5).toString.toInt) // errorCount
+        // scalastyle:on
       )
       .filterNot(it => it.warnCount < 1 && it.errorCount < 1)
       .toSeq
diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala
index acf2fb1..e1a6c95 100644
--- a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala
+++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala
@@ -59,7 +59,7 @@ object ETLFlinkSession {
 //    }
 //  }
 
-  def createCatalogIfNeed(etlDatabaseType: String, session: TableEnvironment) = {
+  def createCatalogIfNeed(etlDatabaseType: String, session: TableEnvironment): Unit = {
     if (etlDatabaseType == FLINK_SHARP_ETL) {
       val catalogName = ETLConfig.getProperty("flyway.catalog")
       val catalog = session.getCatalog(catalogName)