Merge pull request #9 from danielvdende/dvde/add-type-checking

Add missing type hints and enable type check in CI

danielvdende authored Jun 7, 2022
2 parents 75295a3 + b6f6983 commit 9b7f449
Showing 10 changed files with 137 additions and 44 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/lint_test.yml
@@ -43,3 +43,23 @@ jobs:
- name: Run tests
run: |
poetry run pytest --cov=adfpy tests
type-check:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ 3.8, 3.9, 3.10.0 ]
steps:
- name: Check out repository code
uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
curl -sSL https://install.python-poetry.org | python3 -
poetry install
- name: Run type-check
run: |
poetry run mypy adfpy
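
The job above runs mypy over the adfpy package on each matrix Python version, so annotation errors now fail CI. As a rough sketch of what that catches (hypothetical code, not from this repository):

    # Hypothetical function, for illustration only: mypy rejects the
    # mismatched return annotation and the new CI job fails.
    def describe(count: int) -> int:
        return f"{count} pipelines"  # error: incompatible return value type (got "str", expected "int")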
19 changes: 10 additions & 9 deletions adfpy/activities/control.py
@@ -1,6 +1,6 @@
from typing import List

from azure.mgmt.datafactory.models import (
from azure.mgmt.datafactory.models import ( # type: ignore
ActivityDependency,
ExecutePipelineActivity,
Expression,
@@ -19,7 +19,7 @@ def __init__(self, name: str, pipeline_name: str, pipeline=None):
super(AdfExecutePipelineActivity, self).__init__(name, pipeline)
self.pipeline_name = pipeline_name

def to_adf(self):
def to_adf(self) -> ExecutePipelineActivity:
return ExecutePipelineActivity(
name=self.name,
pipeline=PipelineReference(reference_name=self.pipeline_name),
@@ -44,7 +44,7 @@ def __init__(
self.if_false_activities = if_false_activities
self.if_true_activities = if_true_activities

def to_adf(self):
def to_adf(self) -> IfConditionActivity:
return IfConditionActivity(
name=self.name,
expression=Expression(value=self.expression),
@@ -60,20 +60,21 @@ def to_adf(self):
class AdfForEachActivity(AdfActivity):
def __init__(
self,
name,
name: str,
items: str,
activities: List[AdfActivity],
pipeline: AdfPipeline = None,
):
super(AdfForEachActivity, self).__init__(name, pipeline)
self.items = items # TODO: this now has to be an ADF expression. Probably want to revisit this
self.activities = activities
# TODO: This is a workaround that means activities within a foreachactivity may only have a single linear line
# of dependency

# WARNING This is a workaround that means activities within a foreachactivity may only have a single linear
# line of dependency
for i in range(1, len(self.activities)):
self.activities[i].add_dependency(self.activities[i - 1].name)

def to_adf(self):
def to_adf(self) -> ForEachActivity:
return ForEachActivity(
name=self.name,
items=Expression(value=self.items),
@@ -86,11 +87,11 @@ def to_adf(self):


class AdfSetVariableActivity(AdfActivity):
def __init__(self, name, value, pipeline: AdfPipeline = None):
def __init__(self, name: str, value: str, pipeline: AdfPipeline = None):
super(AdfSetVariableActivity, self).__init__(name, pipeline)
self.value = value

def to_adf(self):
def to_adf(self) -> SetVariableActivity:
return SetVariableActivity(
name=self.name,
value=self.value,
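
With these signatures annotated, calls into the control-flow activities are now checkable. A minimal usage sketch based on the signatures above (the name and value are illustrative, and the pipeline argument is left at its default):

    from adfpy.activities.control import AdfSetVariableActivity

    # Illustrative name/value; to_adf() is now annotated to return SetVariableActivity.
    activity = AdfSetVariableActivity(name="set_env", value="production")
    adf_activity = activity.to_adf()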
33 changes: 22 additions & 11 deletions adfpy/activities/execution.py
@@ -1,6 +1,8 @@
from azure.mgmt.datafactory.models import (
from azure.mgmt.datafactory.models import ( # type: ignore
ActivityDependency,
AzureBlobStorageReadSettings,
CopySource,
CopySink,
CopyActivity,
DatabricksSparkPythonActivity,
DatasetReference,
@@ -11,17 +13,26 @@
)

from adfpy.activity import AdfActivity
from adfpy.pipeline import AdfPipeline


class AdfCopyActivity(AdfActivity):
def __init__(self, name, input_dataset_name, output_dataset_name, source_type, sink_type, pipeline=None):
def __init__(
self,
name: str,
input_dataset_name: str,
output_dataset_name: str,
source_type: CopySource,
sink_type: CopySink,
pipeline: AdfPipeline = None
):
super(AdfCopyActivity, self).__init__(name, pipeline)
self.input_dataset = DatasetReference(reference_name=input_dataset_name)
self.output_dataset = DatasetReference(reference_name=output_dataset_name)
self.source_type = source_type
self.sink_type = sink_type

def to_adf(self):
def to_adf(self) -> CopyActivity:
return CopyActivity(
name=self.name,
inputs=[self.input_dataset],
@@ -37,7 +48,7 @@ def to_adf(self):

class AdfDeleteActivity(AdfActivity):
def __init__(
self, name: str, dataset_name: str, recursive: bool = False, wildcard: str = None, pipeline=None
self, name: str, dataset_name: str, recursive: bool = False, wildcard: str = None, pipeline: AdfPipeline = None
):
super(AdfDeleteActivity, self).__init__(name, pipeline)
# TODO: this is an arbitrary subselection of options. We should bring this in line with what the SDK exposes.
@@ -47,7 +58,7 @@ def __init__(
self.recursive = recursive
self.wildcard = wildcard

def to_adf(self):
def to_adf(self) -> DeleteActivity:
return DeleteActivity(
name=self.name,
dataset=DatasetReference(reference_name=self.dataset_name),
@@ -62,11 +73,11 @@ def to_adf(self):


class AdfDatabricksSparkPythonActivity(AdfActivity):
def __init__(self, name, python_file, pipeline=None):
def __init__(self, name: str, python_file: str, pipeline: AdfPipeline = None):
super(AdfDatabricksSparkPythonActivity, self).__init__(name, pipeline)
self.python_file = python_file

def to_adf(self):
def to_adf(self) -> DatabricksSparkPythonActivity:
return DatabricksSparkPythonActivity(
name=self.name,
python_file=self.python_file,
@@ -78,12 +89,12 @@ def to_adf(self):


class AdfLookupActivity(AdfActivity):
def __init__(self, name: str, dataset: str, source, pipeline=None):
def __init__(self, name: str, dataset: str, source: CopySource, pipeline: AdfPipeline = None):
super(AdfLookupActivity, self).__init__(name, pipeline)
self.dataset = DatasetReference(reference_name=dataset)
self.source = source

def to_adf(self):
def to_adf(self) -> LookupActivity:
return LookupActivity(
name=self.name,
dataset=self.dataset,
@@ -96,12 +107,12 @@ def to_adf(self):


class AdfSqlServerStoredProcedureActivity(AdfActivity):
def __init__(self, name: str, stored_procedure_name: str, linked_service: str, pipeline=None):
def __init__(self, name: str, stored_procedure_name: str, linked_service: str, pipeline: AdfPipeline = None):
super(AdfSqlServerStoredProcedureActivity, self).__init__(name, pipeline)
self.stored_procedure_name = stored_procedure_name
self.linked_service = LinkedServiceReference(reference_name=linked_service)

def to_adf(self):
def to_adf(self) -> SqlServerStoredProcedureActivity:
return SqlServerStoredProcedureActivity(
name=self.name,
stored_procedure_name=self.stored_procedure_name,
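
The CopySource/CopySink hints on AdfCopyActivity make the expected SDK types explicit. A hedged sketch, assuming the azure-mgmt-datafactory BlobSource/BlobSink models as concrete source and sink types (the dataset names are made up):

    from azure.mgmt.datafactory.models import BlobSink, BlobSource  # type: ignore

    from adfpy.activities.execution import AdfCopyActivity

    copy = AdfCopyActivity(
        name="copy_raw_to_staging",  # illustrative
        input_dataset_name="raw_dataset",  # assumed dataset names, not from this diff
        output_dataset_name="staging_dataset",
        source_type=BlobSource(),  # a CopySource subclass
        sink_type=BlobSink(),  # a CopySink subclass
    )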
10 changes: 6 additions & 4 deletions adfpy/activity.py
@@ -3,8 +3,10 @@


class AdfActivity:
# TODO: figure out if there's a better way here. We make the type hint a string to avoid a circular import
def __init__(self, name, pipeline: "AdfPipeline" = None): # noqa: F821
depends_on: Dict

# String as type hint to avoid a circular import
def __init__(self, name: str, pipeline: "AdfPipeline" = None): # type: ignore # noqa: F821
self.name = name
self.depends_on = dict()
if pipeline:
@@ -37,12 +39,12 @@ def __lshift__(self, other: "AdfActivity"):
self.add_dependency(other.name)
return other

def __rrshift__(self, other):
def __rrshift__(self, other: "AdfActivity"):
"""Called for Activity >> [Activity] because list don't have __rshift__ operators."""
self.__lshift__(other)
return self

def __rlshift__(self, other):
def __rlshift__(self, other: "AdfActivity"):
"""Called for Activity >> [Activity] because list don't have __lshift__ operators."""
self.__rshift__(other)
return self
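
The __rrshift__/__rlshift__ overloads annotated above exist so that lists compose with activities via >> and <<. A short sketch (the activity class and names are illustrative, and the dependency direction is inferred from __lshift__ above):

    from adfpy.activities.execution import AdfDatabricksSparkPythonActivity

    extract = AdfDatabricksSparkPythonActivity(name="extract", python_file="extract.py")
    load = AdfDatabricksSparkPythonActivity(name="load", python_file="load.py")

    # Presumably records that "load" depends on "extract", mirroring
    # load.add_dependency("extract").
    extract >> load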
12 changes: 8 additions & 4 deletions adfpy/deploy.py
@@ -7,8 +7,9 @@
from typing import Set, List

from azure.identity import ClientSecretCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient # type: ignore

from adfpy.error import PipelineModuleParseException
from adfpy.pipeline import AdfPipeline


@@ -60,9 +61,12 @@ def load_pipelines_from_file(file_path: Path) -> Set[AdfPipeline]:
"main",
file_path,
)
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
return set(var for var in vars(module).values() if isinstance(var, AdfPipeline))
if mod_spec:
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module) # type: ignore
return set(var for var in vars(module).values() if isinstance(var, AdfPipeline))
else:
raise PipelineModuleParseException(f"Could not parse module spec from path {file_path}")


def load_pipelines_from_path(path: Path, pipelines: Set[AdfPipeline] = None) -> Set[AdfPipeline]:
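
importlib.util.spec_from_file_location is typed as returning an Optional spec, so the added guard is what satisfies mypy here; a None spec now surfaces as a PipelineModuleParseException instead of an AttributeError. A usage sketch (the path is made up):

    from pathlib import Path

    from adfpy.deploy import load_pipelines_from_file
    from adfpy.error import PipelineModuleParseException

    try:
        pipelines = load_pipelines_from_file(Path("pipelines/my_pipeline.py"))  # illustrative path
    except PipelineModuleParseException as exc:
        print(f"Could not load pipeline module: {exc}")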
4 changes: 4 additions & 0 deletions adfpy/error.py
@@ -11,3 +11,7 @@ class NotSupportedError(AdfPyException):

class InvalidCronExpressionError(AdfPyException):
pass


class PipelineModuleParseException(AdfPyException):
pass
2 changes: 1 addition & 1 deletion adfpy/pipeline.py
@@ -1,7 +1,7 @@
from datetime import datetime, timezone
from typing import List

from azure.mgmt.datafactory.models import PipelineResource
from azure.mgmt.datafactory.models import PipelineResource # type: ignore

from adfpy.activity import AdfActivity
from adfpy.trigger import AdfScheduleTrigger
17 changes: 6 additions & 11 deletions adfpy/trigger.py
@@ -2,7 +2,7 @@
from datetime import datetime
from typing import List, Union

from azure.mgmt.datafactory.models import (
from azure.mgmt.datafactory.models import ( # type: ignore
ScheduleTrigger,
ScheduleTriggerRecurrence,
PipelineReference,
@@ -88,7 +88,7 @@ def to_adf(self):
)
return tr_properties

def convert_preset_expression_to_adf(self, schedule):
def convert_preset_expression_to_adf(self, schedule: str) -> ScheduleTriggerRecurrence:
if schedule not in self.preset_expressions_mapping:
raise ValueError(f"Expression {schedule} is not in the predefined expressions mapping")
mapped_frequency = self.preset_expressions_mapping[schedule]
@@ -103,7 +103,7 @@ def convert_preset_expression_to_adf(self, schedule):
start_time=self.start_time,
time_zone=self.time_zone)

def _convert_cron_to_adf(self):
def _convert_cron_to_adf(self) -> ScheduleTriggerRecurrence:
"""
Basic recurrence options: minutes/hours
Advanced recurrence options: days/weeks
@@ -113,23 +113,18 @@ def _convert_cron_to_adf(self):
Convert a cron-like string to a set of objects understood by adf
@hourly
@daily
@weekly
@monthly
@yearly
Returns:
"""
if self.schedule in self.preset_expressions_mapping:
return self.convert_preset_expression_to_adf(self.schedule)
else:
cron_components = self.schedule.split(" ")
if len(cron_components) != 5:
raw_cron_components = self.schedule.split(" ")
if len(raw_cron_components) != 5:
raise InvalidCronExpressionError(f"The provided cron expression: {self.schedule} has the wrong number "
f"of components. There should be 5")
cron_components = AdfCronExpression(*cron_components)
cron_components = AdfCronExpression(*raw_cron_components)

if cron_components.day_of_week == "*":
if cron_components.day_of_month == "*":
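
The rename to raw_cron_components separates the unvalidated 5-field split from the AdfCronExpression it is packed into. A sketch of that flow (the field order is the standard cron layout, inferred from the day_of_month/day_of_week usage above, so treat it as an assumption):

    # Assumed field order: minute hour day_of_month month day_of_week.
    raw_cron_components = "0 6 * * 1".split(" ")
    if len(raw_cron_components) != 5:
        raise InvalidCronExpressionError("expected 5 cron components")
    cron_components = AdfCronExpression(*raw_cron_components)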