Skip to content

Commit

Permalink
Merge pull request #79 from civitaspo/develop
Browse files Browse the repository at this point in the history
v0.3.2
  • Loading branch information
civitaspo authored Aug 6, 2019
2 parents eb918f9 + 9119cc1 commit ee61b71
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 9 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
0.3.2 (2019-08-06)
==================

* [Fix] `aws.glue.list` bug: `limit` does not work correctly.
* [New feature] Add `athena.each_database>` operator.

0.3.1 (2019-08-05)
==================

* [Fix -- `athena.ctas>`] When using `save_mode: overwrite`, delete the specified table and location, not the table location that the data catalog has.
* [New featuere -- `athena.drop_table_multi>`] `protect` option.
* [New feature -- `athena.drop_table_multi>`] `protect` option.

0.3.0 (2019-07-30)
==================
Expand Down
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.3.1
- pro.civitaspo:digdag-operator-athena:0.3.2
athena:
auth_method: profile

Expand Down Expand Up @@ -282,6 +282,17 @@ Nothing
- **athena.last_partition_exists.table_exists**: `true` if the table exists, or `false` (boolean)
- **athena.last_partition_exists.location_exists**: `true` if the table location exists, or `false`. `null` unless the **with_location** option is set to `true`. (boolean)

## Configuration for `athena.each_database>` operator

- **catalog_id**: Glue data catalog id if you use a catalog different from account/region default catalog. (string, optional)
- **parallel_slice**: The slice number of parallelism execution for **_do** subtasks. (integer, default: `1`)
- **_do**: The definition of subtasks with exported database information. (config, required)
- exported parameters are below.
- **athena.each_database.export.name**: database name (string)
- **athena.each_database.export.created_at**: epoch millis of creation time (integer)
- **athena.each_database.export.description**: database description (string)
    - **athena.each_database.export.parameters**: database parameters that are defined in glue data catalog (config)


# Development

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.3.1'
version = '0.3.2'

def digdagVersion = '0.9.37'
def awsSdkVersion = "1.11.587"
Expand Down
11 changes: 10 additions & 1 deletion example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.3.1
- pro.civitaspo:digdag-operator-athena:0.3.2
athena:
auth_method: profile
value: 5
Expand Down Expand Up @@ -193,3 +193,12 @@ _export:
c: "10"
+step4:
echo>: ${athena.last_partition_exists}

+step21:
athena.each_database>:
parallel_slice: 8
_do:
+echo1:
echo>: ${athena.each_database}
+echo2:
echo>: ${athena.each_database.name}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import pro.civitaspo.digdag.plugin.athena.ctas.AthenaCtasOperator
import pro.civitaspo.digdag.plugin.athena.drop_partition.AthenaDropPartitionOperator
import pro.civitaspo.digdag.plugin.athena.drop_table.AthenaDropTableOperator
import pro.civitaspo.digdag.plugin.athena.drop_table_multi.AthenaDropTableMultiOperator
import pro.civitaspo.digdag.plugin.athena.each_database.AthenaEachDatabaseOperator
import pro.civitaspo.digdag.plugin.athena.partition_exists.AthenaPartitionExistsOperator
import pro.civitaspo.digdag.plugin.athena.preview.AthenaPreviewOperator
import pro.civitaspo.digdag.plugin.athena.query.AthenaQueryOperator
Expand Down Expand Up @@ -42,7 +43,8 @@ object AthenaPlugin
operatorFactory("athena.drop_table", classOf[AthenaDropTableOperator]),
operatorFactory("athena.drop_table_multi", classOf[AthenaDropTableMultiOperator]),
operatorFactory("athena.partition_exists?", classOf[AthenaPartitionExistsOperator]),
operatorFactory("athena.table_exists?", classOf[AthenaTableExistsOperator])
operatorFactory("athena.table_exists?", classOf[AthenaTableExistsOperator]),
operatorFactory("athena.each_database", classOf[AthenaEachDatabaseOperator])
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package pro.civitaspo.digdag.plugin.athena.aws.glue.catalog


import com.amazonaws.services.glue.model.{Database, GetDatabaseRequest}
import com.amazonaws.services.glue.model.{Database, GetDatabaseRequest, GetDatabasesRequest}
import pro.civitaspo.digdag.plugin.athena.aws.glue.Glue

import scala.jdk.CollectionConverters._
import scala.util.Try


Expand All @@ -25,4 +26,29 @@ case class DatabaseCatalog(glue: Glue)
Try(describe(catalogIdOption, database)).isSuccess
}

/** Lists the databases in the Glue data catalog, transparently following
  * `NextToken` pagination.
  *
  * @param catalogIdOption catalog id to query; `None` uses the account/region default catalog.
  * @param limit maximum number of databases to return; `None` returns all of them.
  * @return the accumulated databases, truncated to `limit` when one is given.
  */
def list(catalogIdOption: Option[String],
         limit: Option[Int] = None): Seq[Database] =
{
    val req = new GetDatabasesRequest()
    catalogIdOption.foreach(req.setCatalogId)
    // MaxResults only caps the page size; the overall cap is enforced below.
    limit.foreach(l => req.setMaxResults(l))

    // NOTE: `req` is mutated across iterations (setNextToken); this is safe
    // because the traversal is strictly sequential.
    @scala.annotation.tailrec
    def recursiveGetDatabases(nextToken: Option[String] = None,
                              lastDatabases: Seq[Database] = Seq()): Seq[Database] =
    {
        nextToken.foreach(req.setNextToken)
        val results = glue.withGlue(_.getDatabases(req))
        val databases = lastDatabases ++ results.getDatabaseList.asScala.toSeq
        limit match {
            // Early exit once enough rows are accumulated. The original used a
            // non-local `return` from inside a `foreach` lambda, which is
            // implemented by throwing NonLocalReturnControl (deprecated in
            // Scala 3 and swallowed by any broad catch); a tail-position
            // `match` keeps the method tail-recursive and exception-free.
            case Some(l) if databases.length >= l => databases.take(l)
            case _ =>
                Option(results.getNextToken) match {
                    case Some(nt) => recursiveGetDatabases(nextToken = Option(nt), lastDatabases = databases)
                    case None     => databases
                }
        }
    }

    recursiveGetDatabases()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,17 @@ case class TableCatalog(glue: Glue)
expression.foreach(req.setExpression)
limit.foreach(l => req.setMaxResults(l))

def recursiveGetTables(nextToken: Option[String] = None): Seq[Table] =
def recursiveGetTables(nextToken: Option[String] = None,
lastTables: Seq[Table] = Seq()): Seq[Table] =
{
nextToken.foreach(req.setNextToken)
val results = glue.withGlue(_.getTables(req))
val tables = results.getTableList.asScala.toSeq
val tables = lastTables ++ results.getTableList.asScala.toSeq
limit.foreach { i =>
if (tables.length >= i) return tables.slice(0, i)
}
Option(results.getNextToken) match {
case Some(nt) => tables ++ recursiveGetTables(nextToken = Option(nt))
case Some(nt) => recursiveGetTables(nextToken = Option(nt), lastTables = tables)
case None => tables
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package pro.civitaspo.digdag.plugin.athena.each_database


import com.amazonaws.services.glue.model.Database
import io.digdag.client.config.Config
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}
import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator

import scala.jdk.CollectionConverters._
import scala.util.chaining._


/** Operator `athena.each_database>`: runs the `_do` subtask definition once per
  * database found in the Glue data catalog, exporting each database's metadata
  * under `athena.each_database.export.*`.
  */
class AthenaEachDatabaseOperator(operatorName: String,
                                 context: OperatorContext,
                                 systemConfig: Config,
                                 templateEngine: TemplateEngine)
    extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine)
{
    // Optional catalog id; when absent the account/region default catalog is used.
    protected val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull())
    // How many per-database subtasks run in parallel within one slice group.
    protected val parallelSlice: Int = params.get("parallel_slice", classOf[Int], 1)
    // Subtask definition that is instantiated once per database.
    protected val doConfig: Config = params.getNested("_do")

    override def runTask(): TaskResult =
    {
        // Build one wrapper config per database: a task named after the
        // database, containing the user-supplied `_do` body plus the exported
        // database metadata.
        val perDatabaseConfigs: Seq[Config] = aws.glue.database.list(catalogId).map { database =>
            val wrapper = cf.create()
            val taskDef = wrapper.getNestedOrSetEmpty(s"+${database.getName}")
            taskDef.setAll(doConfig)
            taskDef.setNested("_export", convertDatabaseToExport(database))
            wrapper
        }

        val resultBuilder = TaskResult.defaultBuilder(cf)
        resultBuilder.subtaskConfig(generateParallelTasks(doConfigs = perDatabaseConfigs))
        resultBuilder.build()
    }

    /** Groups the per-database configs into sequential slice groups of at most
      * `parallelSlice` tasks; the tasks inside each group run in parallel.
      * Always emits at least one (possibly empty) `+p0` group.
      */
    protected def generateParallelTasks(doConfigs: Seq[Config]): Config =
    {
        @scala.annotation.tailrec
        def buildSlices(pending: Seq[Config],
                        acc: Config,
                        sliceIdx: Int): Config =
        {
            val (chunk, remaining) = pending.splitAt(parallelSlice)
            val wrapper = cf.create()
            val group = wrapper.getNestedOrSetEmpty(s"+p$sliceIdx")
            group.set("_parallel", true)
            chunk.foreach(group.setAll)
            acc.setAll(wrapper)
            if (remaining.isEmpty) acc
            else buildSlices(remaining, acc, sliceIdx + 1)
        }

        buildSlices(doConfigs, cf.create(), 0)
    }

    /** Converts one Glue database into the `_export` config exposed to the
      * subtasks, under `athena.each_database.export`. Optional fields
      * (create time, description, parameters) are only set when present.
      */
    protected def convertDatabaseToExport(database: Database): Config =
    {
        val ret = cf.create()
        val export = ret
            .getNestedOrSetEmpty("athena")
            .getNestedOrSetEmpty("each_database")
            .getNestedOrSetEmpty("export")

        export.set("name", database.getName)
        Option(database.getCreateTime).foreach { createTime =>
            export.set("created_at", createTime.toInstant.toEpochMilli)
        }
        Option(database.getDescription).foreach { description =>
            export.set("description", description)
        }
        // "parameters" node is only created when at least one entry exists,
        // matching the lazy getNestedOrSetEmpty-in-loop of the original.
        Option(database.getParameters).foreach { javaParams =>
            javaParams.asScala.foreach { entry =>
                export.getNestedOrSetEmpty("parameters").set(entry._1, entry._2)
            }
        }
        ret
    }
}

0 comments on commit ee61b71

Please sign in to comment.