Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[airflow] Extend rule to check class attributes, methods, arguments (AIR302) #15083

Merged
merged 12 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from airflow.plugins_manager import AirflowPlugin


class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
# --- Invalid extensions start
operators = [PluginOperator]
sensors = [PluginSensorOperator]
hooks = [PluginHook]
executors = [PluginExecutor]
# --- Invalid extensions end
macros = [plugin_macro]
flask_blueprints = [bp]
appbuilder_views = [v_appbuilder_package]
appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]
global_operator_extra_links = [
AirflowLink(),
GithubLink(),
]
operator_extra_links = [
GoogleLink(),
AirflowLink2(),
CustomOpLink(),
CustomBaseIndexOpLink(1),
]
timetables = [CustomCronDataIntervalTimetable]
listeners = [empty_listener, ClassBasedListener()]
ti_deps = [CustomTestTriggerRule()]
priority_weight_strategies = [CustomPriorityWeightStrategy]
34 changes: 24 additions & 10 deletions crates/ruff_linter/resources/test/fixtures/airflow/AIR302_args.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from airflow import DAG, dag
from airflow.timetables.simple import NullTimetable

from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.operators import trigger_dagrun
from datetime import timedelta

from airflow import DAG, dag
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.providers.standard.operators import datetime

from airflow.sensors.weekday import DayOfWeekSensor, BranchDayOfWeekOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
from airflow.providers.google.cloud.log.gcs_task_handler import GCSTaskHandler
from airflow.providers.standard.operators import datetime, trigger_dagrun
from airflow.providers.standard.sensors import weekday
from airflow.sensors.weekday import BranchDayOfWeekOperator, DayOfWeekSensor
from airflow.timetables.simple import NullTimetable

DAG(dag_id="class_schedule", schedule="@hourly")

Expand Down Expand Up @@ -54,10 +57,12 @@ def decorator_deprecated_operator_args():
)

branch_dt_op = datetime.BranchDateTimeOperator(
task_id="branch_dt_op", use_task_execution_day=True
task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5
)
branch_dt_op2 = BranchDateTimeOperator(
task_id="branch_dt_op2", use_task_execution_day=True
task_id="branch_dt_op2",
use_task_execution_day=True,
sla=timedelta(seconds=10),
)

dof_task_sensor = weekday.DayOfWeekSensor(
Expand All @@ -76,3 +81,12 @@ def decorator_deprecated_operator_args():
branch_dt_op >> branch_dt_op2
dof_task_sensor >> dof_task_sensor2
bdow_op >> bdow_op2


# deprecated filename_template arugment in FileTaskHandler
S3TaskHandler(filename_template="/tmp/test")
HdfsTaskHandler(filename_template="/tmp/test")
ElasticsearchTaskHandler(filename_template="/tmp/test")
GCSTaskHandler(filename_template="/tmp/test")

FabAuthManager(None)
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from airflow.datasets.manager import DatasetManager
from airflow.lineage.hook import DatasetLineageInfo, HookLineageCollector
from airflow.providers.amazon.auth_manager.aws_auth_manager import AwsAuthManager
from airflow.providers.apache.beam.hooks import BeamHook, NotAir302HookError
from airflow.providers.google.cloud.secrets.secret_manager import (
CloudSecretManagerBackend,
)
from airflow.providers.hashicorp.secrets.vault import NotAir302SecretError, VaultBackend
from airflow.providers_manager import ProvidersManager
from airflow.secrets.base_secrets import BaseSecretsBackend

dm = DatasetManager()
dm.register_dataset_change()
dm.create_datasets()
dm.notify_dataset_created()
dm.notify_dataset_changed()
dm.notify_dataset_alias_created()

hlc = HookLineageCollector()
hlc.create_dataset()
hlc.add_input_dataset()
hlc.add_output_dataset()
hlc.collected_datasets()

aam = AwsAuthManager()
aam.is_authorized_dataset()

pm = ProvidersManager()
pm.initialize_providers_asset_uri_resources()
pm.dataset_factories

base_secret_backend = BaseSecretsBackend()
base_secret_backend.get_conn_uri()
base_secret_backend.get_connections()

csm_backend = CloudSecretManagerBackend()
csm_backend.get_conn_uri()
csm_backend.get_connections()

vault_backend = VaultBackend()
vault_backend.get_conn_uri()
vault_backend.get_connections()

not_an_error = NotAir302SecretError()
not_an_error.get_conn_uri()

beam_hook = BeamHook()
beam_hook.get_conn_uri()

not_an_error = NotAir302HookError()
not_an_error.get_conn_uri()

provider_manager = ProvidersManager()
provider_manager.dataset_factories
provider_manager.dataset_uri_handlers
provider_manager.dataset_to_openlineage_converters

dl_info = DatasetLineageInfo()
dl_info.dataset
138 changes: 74 additions & 64 deletions crates/ruff_linter/resources/test/fixtures/airflow/AIR302_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,29 @@
DatasetAny,
expand_alias_to_datasets,
)
from airflow.datasets.metadata import Metadata
from airflow.datasets.manager import (
DatasetManager,
dataset_manager,
resolve_dataset_manager,
)
from airflow.datasets.metadata import Metadata
from airflow.hooks.base_hook import BaseHook
from airflow.lineage.hook import DatasetLineageInfo
from airflow.listeners.spec.dataset import on_dataset_changed, on_dataset_created
from airflow.metrics.validators import AllowListValidator, BlockListValidator
from airflow.operators import dummy_operator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.branch_operator import BaseBranchOperator
from airflow.operators.dagrun_operator import TriggerDagRunLink, TriggerDagRunOperator
from airflow.operators.dummy import DummyOperator, EmptyOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (
BranchPythonOperator,
PythonOperator,
PythonVirtualenvOperator,
ShortCircuitOperator,
)
from airflow.operators.subdag import SubDagOperator
from airflow.providers.amazon.auth_manager.avp.entities import AvpEntities
from airflow.providers.amazon.aws.datasets import s3
Expand Down Expand Up @@ -85,7 +94,7 @@
scale_time_units,
)
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory, mkdirs
from airflow.utils.file import mkdirs
from airflow.utils.helpers import chain, cross_downstream
from airflow.utils.state import SHUTDOWN, terminating_states
from airflow.utils.trigger_rule import TriggerRule
Expand All @@ -94,61 +103,93 @@

# airflow root
PY36, PY37, PY38, PY39, PY310, PY311, PY312
DatasetFromRoot
DatasetFromRoot()

dataset_from_root = DatasetFromRoot()
dataset_from_root.iter_datasets()
dataset_from_root.iter_dataset_aliases()

# airflow.api_connexion.security
requires_access, requires_access_dataset

# airflow.auth.managers
is_authorized_dataset
DatasetDetails
DatasetDetails()

# airflow.configuration
get, getboolean, getfloat, getint, has_option, remove_option, as_dict, set


# airflow.contrib.*
AWSAthenaHook
AWSAthenaHook()

# airflow.datasets
Dataset
DatasetAlias
DatasetAliasEvent
DatasetAll
DatasetAny
Dataset()
DatasetAlias()
DatasetAliasEvent()
DatasetAll()
DatasetAny()
expand_alias_to_datasets
Metadata
Metadata()

dataset_to_test_method_call = Dataset()
dataset_to_test_method_call.iter_datasets()
dataset_to_test_method_call.iter_dataset_aliases()

alias_to_test_method_call = DatasetAlias()
alias_to_test_method_call.iter_datasets()
alias_to_test_method_call.iter_dataset_aliases()

any_to_test_method_call = DatasetAny()
any_to_test_method_call.iter_datasets()
any_to_test_method_call.iter_dataset_aliases()

# airflow.datasets.manager
DatasetManager, dataset_manager, resolve_dataset_manager
DatasetManager(), dataset_manager, resolve_dataset_manager

# airflow.hooks
BaseHook()

# airflow.lineage.hook
DatasetLineageInfo
DatasetLineageInfo()

# airflow.listeners.spec.dataset
on_dataset_changed, on_dataset_created

# airflow.metrics.validators
AllowListValidator, BlockListValidator
AllowListValidator(), BlockListValidator()

# airflow.operators.dummy_operator
dummy_operator.EmptyOperator
dummy_operator.DummyOperator
dummy_operator.EmptyOperator()
dummy_operator.DummyOperator()

# airflow.operators.bash_operator
BashOperator
BashOperator()

# airflow.operators.branch_operator
BaseBranchOperator
BaseBranchOperator()

# airflow.operators.dagrun_operator
TriggerDagRunLink()
TriggerDagRunOperator()

# airflow.operators.dummy
EmptyOperator, DummyOperator
EmptyOperator(), DummyOperator()

# airflow.operators.email_operator
EmailOperator
EmailOperator()

# airflow.operators.latest_only_operator
LatestOnlyOperator()

# airflow.operators.python_operator
BranchPythonOperator()
PythonOperator()
PythonVirtualenvOperator()
ShortCircuitOperator()

# airflow.operators.subdag.*
SubDagOperator
SubDagOperator()

# airflow.providers.amazon
AvpEntities.DATASET
Expand All @@ -175,7 +216,7 @@
mysql.sanitize_uri

# airflow.providers.openlineage
DatasetInfo, translate_airflow_dataset
DatasetInfo(), translate_airflow_dataset

# airflow.providers.postgres
postgres.sanitize_uri
Expand All @@ -190,28 +231,28 @@
RESOURCE_DATASET

# airflow.sensors.base_sensor_operator
BaseSensorOperator
BaseSensorOperator()

# airflow.sensors.date_time_sensor
DateTimeSensor
DateTimeSensor()

# airflow.sensors.external_task
ExternalTaskSensorLinkFromExternalTask
ExternalTaskSensorLinkFromExternalTask()

# airflow.sensors.external_task_sensor
ExternalTaskMarker
ExternalTaskSensor
ExternalTaskSensorLinkFromExternalTaskSensor
ExternalTaskMarker()
ExternalTaskSensor()
ExternalTaskSensorLinkFromExternalTaskSensor()

# airflow.sensors.time_delta_sensor
TimeDeltaSensor
TimeDeltaSensor()

# airflow.timetables
DatasetOrTimeSchedule
DatasetTriggeredTimetable
DatasetOrTimeSchedule()
DatasetTriggeredTimetable()

# airflow.triggers.external_task
TaskStateTrigger
TaskStateTrigger()

# airflow.utils.date
dates.date_range
Expand All @@ -235,7 +276,7 @@
apply_defaults

# airflow.utils.file
TemporaryDirectory, mkdirs
TemporaryDirector(), mkdirs

# airflow.utils.helpers
chain, cross_downstream
Expand All @@ -253,34 +294,3 @@

# airflow.www.utils
get_sensitive_variables_fields, should_hide_value_for_key

from airflow.datasets.manager import DatasetManager

dm = DatasetManager()
dm.register_dataset_change()
dm.create_datasets()
dm.notify_dataset_created()
dm.notify_dataset_changed()
dm.notify_dataset_alias_created()


from airflow.lineage.hook import HookLineageCollector

hlc = HookLineageCollector()
hlc.create_dataset()
hlc.add_input_dataset()
hlc.add_output_dataset()
hlc.collected_datasets()


from airflow.providers.amazon.auth_manager.aws_auth_manager import AwsAuthManager

aam = AwsAuthManager()
aam.is_authorized_dataset()


from airflow.providers_manager import ProvidersManager

pm = ProvidersManager()
pm.initialize_providers_asset_uri_resources()
pm.dataset_factories
3 changes: 3 additions & 0 deletions crates/ruff_linter/src/checkers/ast/analyze/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ pub(crate) fn expression(expr: &Expr, checker: &mut Checker) {
);
}
}
if checker.enabled(Rule::Airflow3Removal) {
airflow::rules::removed_in_3(checker, expr);
}
if checker.enabled(Rule::MixedCaseVariableInGlobalScope) {
if matches!(checker.semantic.current_scope().kind, ScopeKind::Module) {
pep8_naming::rules::mixed_case_variable_in_global_scope(
Expand Down
Loading
Loading