diff --git a/CHANGELOG.md b/CHANGELOG.md index 854f54d..bf97c27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +0.2.3 (2019-07-19) +================== +* [New Feature] Add `athena.drop_table_multi>` operator +* [Enhancement] Expose the real error message and state when the query execution fails. + 0.2.2 (2019-07-19) ================== diff --git a/README.md b/README.md index 7f14b6b..d70354e 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ _export: repositories: - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.2.2 + - pro.civitaspo:digdag-operator-athena:0.2.3 athena: auth_method: profile @@ -228,7 +228,7 @@ Nothing ## Configuration for `athena.drop_table>` operator - **database**: The name of the database. (string, required) -- **table**: The name of the partitioned table. (string, required) +- **table**: The name of the table. (string, required) - **with_location**: Drop the partition with removing objects on S3 (boolean, default: `false`) - **ignore_if_not_exist**: Ignore if the partition does not exist. (boolean, default: `true`) - **catalog_id**: glue data catalog id if you use a catalog different from account/region default catalog. (string, optional) @@ -237,6 +237,18 @@ Nothing Nothing +## Configuration for `athena.drop_table_multi>` operator + +- **database**: The name of the database. (string, required) +- **regexp**: The regular expression for table names to be dropped. (string, required) +- **with_location**: Drop the tables and also remove their objects on S3 (boolean, default: `false`) +- **limit**: Max number of tables that can be dropped. (integer, optional) +- **catalog_id**: glue data catalog id if you use a catalog different from account/region default catalog. 
(string, optional) + +### Output Parameters + +Nothing + # Development ## Run an Example diff --git a/build.gradle b/build.gradle index 3c39e98..19f9344 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ plugins { } group = 'pro.civitaspo' -version = '0.2.2' +version = '0.2.3' def digdagVersion = '0.9.37' def awsSdkVersion = "1.11.587" diff --git a/example/example.dig b/example/example.dig index 496e0f6..df3adf1 100644 --- a/example/example.dig +++ b/example/example.dig @@ -4,7 +4,7 @@ _export: - file://${repos} # - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.2.2 + - pro.civitaspo:digdag-operator-athena:0.2.3 athena: auth_method: profile value: 5 @@ -22,7 +22,7 @@ _export: athena.ctas>: template.sql database: ${database} table: hoge - location: ${output} + location: ${output}/ +step5: echo>: ${athena} @@ -90,3 +90,26 @@ _export: b: "9" c: "10" save_mode: overwrite + ++step14: + loop>: 10 + _parallel: true + _do: + athena.ctas>: template.sql + database: ${database} + table: hoge_${i} + location: ${output}/hoge_${i}/ + save_mode: overwrite + ++step15: + athena.drop_table_multi>: + database: ${database} + regexp: 'hoge_\d+' + with_location: true + limit: 3 + ++step16: + athena.drop_table_multi>: + database: ${database} + regexp: 'hoge_\d+' + with_location: true diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala index e67422c..0da9bcd 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala @@ -12,6 +12,7 @@ import pro.civitaspo.digdag.plugin.athena.apas.{AthenaApasOperator, AthenaDiffSc import pro.civitaspo.digdag.plugin.athena.ctas.AthenaCtasOperator import pro.civitaspo.digdag.plugin.athena.drop_partition.AthenaDropPartitionOperator import pro.civitaspo.digdag.plugin.athena.drop_table.AthenaDropTableOperator +import 
pro.civitaspo.digdag.plugin.athena.drop_table_multi.AthenaDropTableMultiOperator import pro.civitaspo.digdag.plugin.athena.preview.AthenaPreviewOperator import pro.civitaspo.digdag.plugin.athena.query.AthenaQueryOperator @@ -36,7 +37,8 @@ object AthenaPlugin operatorFactory("athena.ctas", classOf[AthenaCtasOperator]), operatorFactory("athena.query", classOf[AthenaQueryOperator]), operatorFactory("athena.preview", classOf[AthenaPreviewOperator]), - operatorFactory("athena.drop_table", classOf[AthenaDropTableOperator]) + operatorFactory("athena.drop_table", classOf[AthenaDropTableOperator]), + operatorFactory("athena.drop_table_multi", classOf[AthenaDropTableMultiOperator]) ) } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/athena/Athena.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/athena/Athena.scala index bb8b532..e3d7a4c 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/athena/Athena.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/athena/Athena.scala @@ -97,10 +97,20 @@ case class Athena(aws: Aws) outputLocation = outputLocation, requestToken = requestToken) - waitQueryExecution(executionId = executionId, - successStates = successStates, - failureStates = failureStates, - timeout = timeout) + val t = Try { + waitQueryExecution(executionId = executionId, + successStates = successStates, + failureStates = failureStates, + timeout = timeout) + } + t match { + case Success(_) => logger.info(s"Success to execute the query: $executionId") + case Failure(exception) => + logger.error(exception.getMessage, exception) + val qe = getQueryExecution(executionId = executionId) + throw new IllegalStateException(s"Failed the query execution: ${qe.withQuery(null).toString}", exception) + } + getQueryExecution(executionId = executionId) } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/glue/catalog/TableCatalog.scala 
/**
 * Lists tables of a database in the Glue Data Catalog, following pagination.
 *
 * @param catalogIdOption catalog id set on the request; left unset when `None`.
 * @param database name of the database whose tables are listed.
 * @param expression optional value passed to `GetTablesRequest.setExpression` to filter tables.
 * @param limit optional cap on the total number of tables returned across all pages.
 *              A non-positive limit yields an empty result without calling Glue.
 * @return the collected tables, never more than `limit` when it is given.
 */
def list(catalogIdOption: Option[String],
         database: String,
         expression: Option[String] = None,
         limit: Option[Int] = None): Seq[Table] =
{
    val req = new GetTablesRequest()
    catalogIdOption.foreach(req.setCatalogId)
    req.setDatabaseName(database)
    expression.foreach(req.setExpression)

    // NOTE(review): Glue's GetTables is documented to accept MaxResults in 1..100;
    // the page size is clamped so that a larger `limit` is served by several pages.
    // Confirm the bound against the Glue API reference.
    val maxPageSize: Int = 100

    // Tail-recursive so that a database with many pages cannot overflow the stack.
    @scala.annotation.tailrec
    def fetch(acc: Vector[Table], nextToken: Option[String]): Seq[Table] =
    {
        // The limit applies to the accumulated total, not to each page separately.
        val remaining: Option[Int] = limit.map(_ - acc.length)
        if (remaining.exists(_ <= 0)) acc
        else {
            remaining.foreach(r => req.setMaxResults(math.min(r, maxPageSize)))
            nextToken.foreach(req.setNextToken)

            val results = glue.withGlue(_.getTables(req))
            // Guard against the SDK returning no list object at all for an empty page.
            val page: Vector[Table] = Option(results.getTableList).map(_.asScala.toVector).getOrElse(Vector.empty)
            val merged: Vector[Table] = acc ++ page
            // Trim defensively in case a page holds more entries than requested.
            val collected: Vector[Table] = limit.fold(merged)(l => merged.take(l))

            Option(results.getNextToken) match {
                case Some(token) => fetch(collected, Some(token))
                case None => collected
            }
        }
    }

    fetch(Vector.empty, None)
}
/**
 * Operator behind `athena.drop_table_multi>`: drops every table of `database`
 * that the Glue table listing returns for `regexp`, optionally removing the
 * objects under each table's storage location first.
 *
 * Parameters read from `params`:
 *  - `database` (required): database whose tables are dropped.
 *  - `regexp` (required): expression handed to the table listing as its filter.
 *  - `limit` (optional): maximum number of tables to drop.
 *  - `with_location` (default `false`): also delete objects under the table location.
 *  - `catalog_id` (optional): catalog id passed to the Glue calls.
 */
class AthenaDropTableMultiOperator(operatorName: String,
                                   context: OperatorContext,
                                   systemConfig: Config,
                                   templateEngine: TemplateEngine)
    extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine)
{
    val database: String = params.get("database", classOf[String])
    // Read as a required parameter: a missing `regexp` must fail the task instead of
    // turning into "no filter", which would select every table in the database.
    val regexp: String = params.get("regexp", classOf[String])
    // Read as a boxed Integer: unboxing a null `Int` yields 0, which would make an
    // absent limit look like `Some(0)` rather than `None`.
    val limit: Option[Int] = Option(params.getOptional("limit", classOf[Integer]).orNull()).map(_.intValue())
    val withLocation: Boolean = params.get("with_location", classOf[Boolean], false)
    val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull())

    /**
     * Drops the listed tables one by one and returns an empty task result.
     */
    override def runTask(): TaskResult =
    {
        logger.info(s"Drop tables matched by the expression: /$regexp/ in $database")
        aws.glue.table.list(catalogId, database, Option(regexp), limit).foreach { t =>
            if (withLocation) {
                // A table may carry no storage descriptor or location; skip the S3
                // cleanup for it instead of failing with a NullPointerException.
                val locationOption: Option[String] = for {
                    sd <- Option(t.getStorageDescriptor)
                    l <- Option(sd.getLocation).filter(_.nonEmpty)
                } yield if (l.endsWith("/")) l else l + "/"

                locationOption match {
                    case Some(location) =>
                        if (aws.s3.hasObjects(location)) {
                            logger.info(s"Delete objects because the location $location has objects.")
                            aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
                        }
                    case None =>
                        logger.warn(s"Skip deleting objects because the table '$database.${t.getName}' has no location.")
                }
            }
            logger.info(s"Drop the table '$database.${t.getName}'")
            aws.glue.table.delete(catalogId, database, t.getName)
        }
        TaskResult.empty(cf)
    }

}