Skip to content

Commit

Permalink
Merge pull request #63 from civitaspo/develop
Browse files Browse the repository at this point in the history
v0.2.3
  • Loading branch information
civitaspo authored Jul 19, 2019
2 parents 6ac81c0 + b920f48 commit fa82f07
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 11 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
0.2.3 (2019-07-19)
==================
* [New Feature] Add `athena.drop_table_multi>` operator
* [Enhancement] Expose the real error message and state when the query execution fails.

0.2.2 (2019-07-19)
==================

Expand Down
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.2.2
- pro.civitaspo:digdag-operator-athena:0.2.3
athena:
auth_method: profile

Expand Down Expand Up @@ -228,7 +228,7 @@ Nothing
## Configuration for `athena.drop_table>` operator

- **database**: The name of the database. (string, required)
- **table**: The name of the partitioned table. (string, required)
- **table**: The name of the table. (string, required)
- **with_location**: Drop the table and also remove its objects on S3. (boolean, default: `false`)
- **ignore_if_not_exist**: Ignore if the table does not exist. (boolean, default: `true`)
- **catalog_id**: glue data catalog id if you use a catalog different from account/region default catalog. (string, optional)
Expand All @@ -237,6 +237,18 @@ Nothing

Nothing

## Configuration for `athena.drop_table_multi>` operator

- **database**: The name of the database. (string, required)
- **regexp**: The regular expression for table names to be dropped. (string, required)
- **with_location**: Drop the tables and also remove their objects on S3. (boolean, default: `false`)
- **limit**: Max number of tables that can be dropped. (integer, optional)
- **catalog_id**: glue data catalog id if you use a catalog different from account/region default catalog. (string, optional)

### Output Parameters

Nothing

# Development

## Run an Example
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.2.2'
version = '0.2.3'

def digdagVersion = '0.9.37'
def awsSdkVersion = "1.11.587"
Expand Down
27 changes: 25 additions & 2 deletions example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.2.2
- pro.civitaspo:digdag-operator-athena:0.2.3
athena:
auth_method: profile
value: 5
Expand All @@ -22,7 +22,7 @@ _export:
athena.ctas>: template.sql
database: ${database}
table: hoge
location: ${output}
location: ${output}/

+step5:
echo>: ${athena}
Expand Down Expand Up @@ -90,3 +90,26 @@ _export:
b: "9"
c: "10"
save_mode: overwrite

+step14:
loop>: 10
_parallel: true
_do:
athena.ctas>: template.sql
database: ${database}
table: hoge_${i}
location: ${output}/hoge_${i}/
save_mode: overwrite

+step15:
athena.drop_table_multi>:
database: ${database}
regexp: 'hoge_\d+'
with_location: true
limit: 3

+step16:
athena.drop_table_multi>:
database: ${database}
regexp: 'hoge_\d+'
with_location: true
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import pro.civitaspo.digdag.plugin.athena.apas.{AthenaApasOperator, AthenaDiffSc
import pro.civitaspo.digdag.plugin.athena.ctas.AthenaCtasOperator
import pro.civitaspo.digdag.plugin.athena.drop_partition.AthenaDropPartitionOperator
import pro.civitaspo.digdag.plugin.athena.drop_table.AthenaDropTableOperator
import pro.civitaspo.digdag.plugin.athena.drop_table_multi.AthenaDropTableMultiOperator
import pro.civitaspo.digdag.plugin.athena.preview.AthenaPreviewOperator
import pro.civitaspo.digdag.plugin.athena.query.AthenaQueryOperator

Expand All @@ -36,7 +37,8 @@ object AthenaPlugin
operatorFactory("athena.ctas", classOf[AthenaCtasOperator]),
operatorFactory("athena.query", classOf[AthenaQueryOperator]),
operatorFactory("athena.preview", classOf[AthenaPreviewOperator]),
operatorFactory("athena.drop_table", classOf[AthenaDropTableOperator])
operatorFactory("athena.drop_table", classOf[AthenaDropTableOperator]),
operatorFactory("athena.drop_table_multi", classOf[AthenaDropTableMultiOperator])
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,20 @@ case class Athena(aws: Aws)
outputLocation = outputLocation,
requestToken = requestToken)

waitQueryExecution(executionId = executionId,
successStates = successStates,
failureStates = failureStates,
timeout = timeout)
val t = Try {
waitQueryExecution(executionId = executionId,
successStates = successStates,
failureStates = failureStates,
timeout = timeout)
}
t match {
case Success(_) => logger.info(s"Success to execute the query: $executionId")
case Failure(exception) =>
logger.error(exception.getMessage, exception)
val qe = getQueryExecution(executionId = executionId)
throw new IllegalStateException(s"Failed the query execution: ${qe.withQuery(null).toString}", exception)
}


getQueryExecution(executionId = executionId)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package pro.civitaspo.digdag.plugin.athena.aws.glue.catalog


import com.amazonaws.services.glue.model.{DeleteTableRequest, GetTableRequest, Table}
import com.amazonaws.services.glue.model.{DeleteTableRequest, GetTableRequest, GetTablesRequest, Table}
import pro.civitaspo.digdag.plugin.athena.aws.glue.Glue

import scala.jdk.CollectionConverters._
import scala.util.Try


Expand Down Expand Up @@ -45,4 +46,31 @@ case class TableCatalog(glue: Glue)
glue.withGlue(_.deleteTable(req))
}

/**
 * Lists the tables of a database in the Glue data catalog, following pagination.
 *
 * @param catalogIdOption Catalog id to set on the request; when `None` the request carries no catalog id.
 * @param database        Name of the database whose tables are listed.
 * @param expression      Optional expression forwarded to `GetTablesRequest#setExpression` to filter table names.
 * @param limit           Optional cap on the number of returned tables. It is also forwarded as the
 *                        request's max results and is enforced against the accumulated total across pages.
 * @return The collected tables, truncated to `limit` when one is given.
 */
def list(catalogIdOption: Option[String],
         database: String,
         expression: Option[String] = None,
         limit: Option[Int] = None): Seq[Table] =
{
    val req = new GetTablesRequest()
    catalogIdOption.foreach(req.setCatalogId)
    req.setDatabaseName(database)
    expression.foreach(req.setExpression)
    // NOTE(review): the Glue API may bound the accepted MaxResults value; confirm that a large `limit` is accepted.
    limit.foreach(l => req.setMaxResults(l))

    // Tail-recursive page walker: `acc` carries every table fetched so far, so the cap is checked
    // against the running total instead of against a single page. Nothing here guarantees that a
    // page is full, hence a per-page check alone could let the result grow past `limit`.
    @scala.annotation.tailrec
    def fetchPages(nextToken: Option[String], acc: Vector[Table]): Seq[Table] =
    {
        nextToken.foreach(req.setNextToken)
        val results = glue.withGlue(_.getTables(req))
        // Treat a missing table list as an empty page rather than failing on null.
        val page: Vector[Table] = Option(results.getTableList).fold(Vector.empty[Table])(_.asScala.toVector)
        val fetched: Vector[Table] = acc ++ page

        limit match {
            case Some(max) if fetched.length >= max => fetched.take(max)
            case _ =>
                Option(results.getNextToken) match {
                    case Some(token) => fetchPages(Some(token), fetched)
                    case None => fetched
                }
        }
    }

    fetchPages(nextToken = None, acc = Vector.empty)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package pro.civitaspo.digdag.plugin.athena.drop_table_multi


import io.digdag.client.config.Config
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}
import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator


/**
 * Operator behind `athena.drop_table_multi>`: drops every table of a database that the Glue
 * table listing returns for the configured expression, optionally removing the objects under
 * each table's storage location first.
 *
 * Parameters read from `params`:
 *  - `database` (required): database whose tables are listed.
 *  - `regexp` (required): expression passed to the table listing.
 *  - `limit` (optional): max number of tables to drop; no cap when absent.
 *  - `with_location` (default `false`): also remove the objects under each table location.
 *  - `catalog_id` (optional): catalog id passed to the Glue calls.
 */
class AthenaDropTableMultiOperator(operatorName: String,
                                   context: OperatorContext,
                                   systemConfig: Config,
                                   templateEngine: TemplateEngine)
    extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine)
{
    val database: String = params.get("database", classOf[String])
    // `regexp` is documented as required, so it is read with the same accessor as `database`.
    // Reading it as optional would turn a missing value into "no filter", and the operator
    // would then drop every table the listing returns for the database.
    val regexp: String = params.get("regexp", classOf[String])
    // Read as a boxed Integer on purpose: with `classOf[Int]`, the null from `orNull()` is
    // unboxed to 0, so an absent `limit` would become `Some(0)` instead of `None`.
    val limit: Option[Int] = Option(params.getOptional("limit", classOf[java.lang.Integer]).orNull()).map(_.intValue)
    val withLocation: Boolean = params.get("with_location", classOf[Boolean], false)
    val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull())

    /**
     * Lists the matching tables and, for each one, optionally removes the objects under its
     * location and then deletes the table from the catalog.
     *
     * @return An empty task result.
     */
    override def runTask(): TaskResult =
    {
        logger.info(s"Drop tables matched by the expression: /$regexp/ in $database")
        aws.glue.table.list(catalogId, database, Option(regexp), limit).foreach { t =>
            if (withLocation) {
                // A table may carry no storage descriptor or no location; guard both lookups
                // and normalize the location so that it always ends with "/".
                val locationOption: Option[String] = for {
                    sd <- Option(t.getStorageDescriptor)
                    l <- Option(sd.getLocation)
                } yield {
                    if (l.endsWith("/")) l
                    else l + "/"
                }

                locationOption match {
                    case Some(location) =>
                        if (aws.s3.hasObjects(location)) {
                            logger.info(s"Delete objects because the location $location has objects.")
                            aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
                        }
                    case None =>
                        logger.info(s"Skip deleting objects because the table '$database.${t.getName}' has no location.")
                }
            }
            logger.info(s"Drop the table '$database.${t.getName}'")
            aws.glue.table.delete(catalogId, database, t.getName)
        }
        TaskResult.empty(cf)
    }

}

0 comments on commit fa82f07

Please sign in to comment.