Skip to content

Commit

Permalink
Merge pull request #13 from danielvdende/dvde/add-config-driven-example
Browse files Browse the repository at this point in the history
Add config driven example
  • Loading branch information
danielvdende authored Jul 22, 2022
2 parents c408bee + 6a7c2aa commit b1aa702
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 1 deletion.
9 changes: 8 additions & 1 deletion adfpy/activities/execution.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any, List, Optional

from azure.mgmt.datafactory.models import ( # type: ignore
ActivityDependency,
AzureBlobStorageReadSettings,
Expand Down Expand Up @@ -73,14 +75,19 @@ def to_adf(self) -> DeleteActivity:


class AdfDatabricksSparkPythonActivity(AdfActivity):
def __init__(self,
             name: str,
             python_file: str,
             parameters: Optional[List[Any]] = None,
             pipeline: Optional[AdfPipeline] = None):
    """Create an activity that runs a Python file on Databricks.

    Args:
        name: Display name of the activity in the Data Factory pipeline.
        python_file: Path of the Python script to execute (e.g. a DBFS
            or workspace path understood by Databricks).
        parameters: Optional command-line style parameters forwarded to
            the script. Defaults to None (no parameters).
        pipeline: Optional pipeline this activity is registered with;
            forwarded to the AdfActivity base initializer.
    """
    # PEP 484: a default of None requires an explicit Optional[...]
    # annotation (the old `pipeline: AdfPipeline = None` was an
    # implicit-Optional, rejected by strict type checkers).
    super().__init__(name, pipeline)
    self.python_file = python_file
    self.parameters = parameters

def to_adf(self) -> DatabricksSparkPythonActivity:
return DatabricksSparkPythonActivity(
name=self.name,
python_file=self.python_file,
parameters=self.parameters,
depends_on=[
ActivityDependency(activity=dep_name, dependency_conditions=dep_conditions)
for dep_name, dep_conditions in self.depends_on.items()
Expand Down
4 changes: 4 additions & 0 deletions adfpy/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ def add_dependency(self, activity_name: str, dependency_conditions: List[str] =
dependency_conditions = ["Succeeded"]
self.depends_on[activity_name] = dependency_conditions

def add_dependencies(self, activities: Dict[str, List[str]]):
    """Register several upstream dependencies in one call.

    Args:
        activities: Mapping from upstream activity name to the list of
            dependency conditions (e.g. ["Succeeded"]) under which this
            activity should run; each entry is delegated to
            add_dependency.
    """
    for upstream_name, conditions in activities.items():
        self.add_dependency(upstream_name, conditions)

def __rshift__(self, other: Union["AdfActivity", List["AdfActivity"]]):
"""Implements Activity >> Activity"""
if isinstance(other, List):
Expand Down
42 changes: 42 additions & 0 deletions examples/config_driven_example/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import pathlib
import yaml
from azure.mgmt.datafactory.models import BlobSink, OracleSource

from adfpy.activities.execution import AdfCopyActivity, AdfDatabricksSparkPythonActivity
from adfpy.pipeline import AdfPipeline


def load_yaml(path):
    """Load and parse a YAML file.

    Args:
        path: Path of the YAML file to read (str or os.PathLike).

    Returns:
        The parsed document (typically a dict), or None for an empty file.
    """
    # Explicit encoding avoids the platform-dependent default; safe_load
    # (rather than yaml.load) refuses arbitrary object construction.
    with open(path, "r", encoding="utf-8") as file:
        return yaml.safe_load(file)


# Load the pipeline definition from the YAML file that sits next to this
# script, so the example works regardless of the current working directory.
config = load_yaml(f"{pathlib.Path(__file__).parent}/pipeline_config.yml")
dataFlowSettings = config["dataFlowSettings"]

# Copy step: reads from the configured Oracle source dataset and writes
# into the blob-storage landing dataset named in the config.
extract = AdfCopyActivity(
    name="Extract data",
    input_dataset_name=dataFlowSettings["source"]["dataset"],
    output_dataset_name=dataFlowSettings["landing"]["dataset"],
    source_type=OracleSource,
    sink_type=BlobSink,
)

# CLI-style argument list handed to the Databricks ingestion script.
# NOTE(review): the "--data_schema" value is a parsed YAML structure
# (list of table mappings), not a string — presumably the script expects
# that; verify against my_ingestion_script.py.
ingestion_parameters = [
    "--source_path",
    dataFlowSettings['landing']['path'],
    "--data_schema",
    config["dataDefinitions"]['tables'],
    "--target_path",
    dataFlowSettings['ingested']['path']
]

ingest = AdfDatabricksSparkPythonActivity(name="Ingest Data",
                                          python_file="my_ingestion_script.py",
                                          parameters=ingestion_parameters)

# ">>" declares a dependency: ingest depends on extract (AdfActivity.__rshift__).
extract >> ingest

# Pipeline name and trigger schedule are also driven by the config file.
pipeline = AdfPipeline(name=config["meta"]["name"],
                       activities=[extract, ingest],
                       schedule=config["meta"]["trigger"]["schedule"])
38 changes: 38 additions & 0 deletions examples/config_driven_example/pipeline_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
meta:
  name: my_data
  trigger:
    schedule: "@daily"
dataFlowSettings:
  source:
    dataset: source
  landing:
    dataset: landing
    path: "dbfs:/landing"
  ingested:
    dataset: ingested
    path: "dbfs:/datalake"
dataDefinitions:
  tables:
    - table: my_table
      schema: my_schema
      database: my_database
      target:
        uniqueIdColumns:
          - id
        database: target_database
        table: target_table
      columns:
        - source:
            name: id
            type: VARCHAR2(3)
          target:
            name: id
            type: string
            nullable: false
        - source:
            name: value
            type: VARCHAR2(4)
          target:
            name: value
            type: string
            nullable: true

0 comments on commit b1aa702

Please sign in to comment.