diff --git a/flink/build.gradle b/flink/build.gradle index 5626c58..249eac7 100644 --- a/flink/build.gradle +++ b/flink/build.gradle @@ -60,7 +60,7 @@ dependencies { implementation "org.apache.flink:flink-clients:${flinkVersion}" implementation "org.apache.flink:flink-connector-files:${flinkVersion}" implementation "org.apache.flink:flink-table-planner_${scalaVersion}:${flinkVersion}" - implementation "org.apache.flink:flink-table-api-java-uber:${flinkVersion}" + implementation "org.apache.flink:flink-table-api-java:${flinkVersion}" // -------------------------------------------------------------- // Dependencies that should be part of the shadow jar, e.g. @@ -254,7 +254,6 @@ shadowJar { relocate "com.google", "com.github.sharpdata.sharpetl.google" relocate "org.apache.commons.net", "com.github.sharpdata.sharpetl.commons.net" relocate "com.zaxxer.hikari", "com.github.sharpdata.sharpetl.hikari" - //from '../hadoop' archiveFileName = "sharp-etl-flink-standalone-${flinkVersion}_${scalaVersion}-${version}.jar" mergeServiceFiles { // https://github.com/flyway/flyway/issues/3482#issuecomment-1493367875 diff --git a/flink/src/main/resources/application.properties b/flink/src/main/resources/application.properties index 01b2b7e..28f98cb 100644 --- a/flink/src/main/resources/application.properties +++ b/flink/src/main/resources/application.properties @@ -4,9 +4,13 @@ etl.default.jobTime.column=job_time flyway.url=jdbc:flink_sharp_etl://localhost/sharp_etl flyway.catalog=paimon flyway.database=sharp_etl -flyway.warehouse=file:///Users/izhangzhihao/Downloads/sharp-etl/paimon-warehouse flyway.driver=com.github.sharpdata.sharpetl.flink.extra.driver.FlinkJdbcDriver +flyway.warehouse=oss://sh-flink/warehouse +flyway.endpoint=oss-cn-shanghai-internal.aliyuncs.com +flyway.ak=AKAKAKAKAKAKA +flyway.sk=SKSKSKSKSKSKSKSKSK flink.default.__table.exec.sort.non-temporal.enabled__=true flink.default.execution.runtime-mode=batch -flink.default.sql-client.execution.result-mode=tableau \ No newline at end of file +flink.default.sql-client.execution.result-mode=tableau +flink.default.table.dml-sync=true \ No newline at end of file diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/Entrypoint.scala b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/Entrypoint.scala index 4bdeca4..92d5071 100644 --- a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/Entrypoint.scala +++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/Entrypoint.scala @@ -8,6 +8,7 @@ object Entrypoint { val errorHandler: CommandLine.IExecutionExceptionHandler = new CommandLine.IExecutionExceptionHandler() { def handleExecutionException(ex: Exception, commandLine: CommandLine, parseResult: CommandLine.ParseResult): Int = { + println("Failed to execute job, exiting with error: " + ex.getMessage) ex.printStackTrace() commandLine.getCommandSpec.exitCodeOnExecutionException } @@ -18,6 +19,7 @@ object Entrypoint { args: _* ) if (!succeed(code)) { + println("Failed to execute job, exiting with code " + code) System.exit(code) } } diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/cli/Command.scala b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/cli/Command.scala index 7e06093..eedd69c 100644 --- a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/cli/Command.scala +++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/cli/Command.scala @@ -21,9 +21,9 @@ class SingleFlinkJobCommand extends SingleJobCommand { ETLConfig.setPropertyPath(propertyPath, env) val etlDatabaseType = JDBCUtil.dbType val interpreter = getFlinkInterpreter(local, wfName, releaseResource, etlDatabaseType, readQualityCheckRules()) - migrate() //JavaVersionChecker.checkJavaVersion() try { + migrate() val wfInterpretingResult: WfEvalResult = LogDrivenInterpreter( WorkflowReader.readWorkflow(wfName), interpreter, @@ -32,6 +32,10 @@ class SingleFlinkJobCommand extends SingleJobCommand { ).eval() new NotificationUtil(jobLogAccessor).notify(Seq(wfInterpretingResult)) throwFirstException(Seq(wfInterpretingResult)) + } catch { + case e: Exception => + ETLLogger.error("Failed to execute job", e) + throw e } finally { interpreter.close() } diff --git a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala index e1a6c95..e6d02bb 100644 --- a/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala +++ b/flink/src/main/scala/com/github/sharpdata/sharpetl/flink/util/ETLFlinkSession.scala @@ -66,7 +66,16 @@ object ETLFlinkSession { if (!catalog.isPresent) { if (local) { ETLLogger.info(s"catalog $catalogName not found, create it") - session.executeSql(s"CREATE CATALOG $catalogName WITH ('type' = 'paimon', 'warehouse' = '${ETLConfig.getProperty("flyway.warehouse")}')") + session.executeSql( + s""" + |CREATE CATALOG $catalogName + |WITH ( + | 'type' = 'paimon', + | 'warehouse' = '${ETLConfig.getProperty("flyway.warehouse")}', + | 'fs.oss.endpoint' = '${ETLConfig.getProperty("flyway.endpoint")}', + | 'fs.oss.accessKeyId' = '${ETLConfig.getProperty("flyway.ak")}', + | 'fs.oss.accessKeySecret' = '${ETLConfig.getProperty("flyway.sk")}' + |)""".stripMargin) ETLFlinkSession.batchEnv.useCatalog(catalogName) session.executeSql(s"CREATE DATABASE IF NOT EXISTS ${ETLConfig.getProperty("flyway.database")}") } else {