Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[airflow] Extend rule to check class attributes, methods, arguments (AIR302) #15083

Merged
merged 12 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from airflow.plugins_manager import AirflowPlugin


class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
# --- Invalid extensions start
operators = [PluginOperator]
sensors = [PluginSensorOperator]
hooks = [PluginHook]
executors = [PluginExecutor]
# --- Invalid extensions end
macros = [plugin_macro]
flask_blueprints = [bp]
appbuilder_views = [v_appbuilder_package]
appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]
global_operator_extra_links = [
AirflowLink(),
GithubLink(),
]
operator_extra_links = [
GoogleLink(),
AirflowLink2(),
CustomOpLink(),
CustomBaseIndexOpLink(1),
]
timetables = [CustomCronDataIntervalTimetable]
listeners = [empty_listener, ClassBasedListener()]
ti_deps = [CustomTestTriggerRule()]
priority_weight_strategies = [CustomPriorityWeightStrategy]
34 changes: 24 additions & 10 deletions crates/ruff_linter/resources/test/fixtures/airflow/AIR302_args.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from airflow import DAG, dag
from airflow.timetables.simple import NullTimetable

from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.standard.operators import trigger_dagrun
from datetime import timedelta

from airflow import DAG, dag
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.providers.standard.operators import datetime

from airflow.sensors.weekday import DayOfWeekSensor, BranchDayOfWeekOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
from airflow.providers.google.cloud.log.gcs_task_handler import GCSTaskHandler
from airflow.providers.standard.operators import datetime, trigger_dagrun
from airflow.providers.standard.sensors import weekday
from airflow.sensors.weekday import BranchDayOfWeekOperator, DayOfWeekSensor
from airflow.timetables.simple import NullTimetable

DAG(dag_id="class_schedule", schedule="@hourly")

Expand Down Expand Up @@ -54,10 +57,12 @@ def decorator_deprecated_operator_args():
)

branch_dt_op = datetime.BranchDateTimeOperator(
task_id="branch_dt_op", use_task_execution_day=True
task_id="branch_dt_op", use_task_execution_day=True, task_concurrency=5
)
branch_dt_op2 = BranchDateTimeOperator(
task_id="branch_dt_op2", use_task_execution_day=True
task_id="branch_dt_op2",
use_task_execution_day=True,
sla=timedelta(seconds=10),
)

dof_task_sensor = weekday.DayOfWeekSensor(
Expand All @@ -76,3 +81,12 @@ def decorator_deprecated_operator_args():
branch_dt_op >> branch_dt_op2
dof_task_sensor >> dof_task_sensor2
bdow_op >> bdow_op2


# deprecated filename_template arugment in FileTaskHandler
S3TaskHandler(filename_template="/tmp/test")
HdfsTaskHandler(filename_template="/tmp/test")
ElasticsearchTaskHandler(filename_template="/tmp/test")
GCSTaskHandler(filename_template="/tmp/test")

FabAuthManager(None)
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from airflow.datasets.manager import DatasetManager
from airflow.lineage.hook import DatasetLineageInfo, HookLineageCollector
from airflow.providers.amazon.auth_manager.aws_auth_manager import AwsAuthManager
from airflow.providers.apache.beam.hooks import BeamHook, NotAir302HookError
from airflow.providers.google.cloud.secrets.secret_manager import (
CloudSecretManagerBackend,
)
from airflow.providers.hashicorp.secrets.vault import NotAir302SecretError, VaultBackend
from airflow.providers_manager import ProvidersManager
from airflow.secrets.base_secrets import BaseSecretsBackend

dm = DatasetManager()
dm.register_dataset_change()
dm.create_datasets()
dm.notify_dataset_created()
dm.notify_dataset_changed()
dm.notify_dataset_alias_created()

hlc = HookLineageCollector()
hlc.create_dataset()
hlc.add_input_dataset()
hlc.add_output_dataset()
hlc.collected_datasets()

aam = AwsAuthManager()
aam.is_authorized_dataset()

pm = ProvidersManager()
pm.initialize_providers_asset_uri_resources()
pm.dataset_factories

base_secret_backend = BaseSecretsBackend()
base_secret_backend.get_conn_uri()
base_secret_backend.get_connections()

csm_backend = CloudSecretManagerBackend()
csm_backend.get_conn_uri()
csm_backend.get_connections()

vault_backend = VaultBackend()
vault_backend.get_conn_uri()
vault_backend.get_connections()

not_an_error = NotAir302SecretError()
not_an_error.get_conn_uri()

beam_hook = BeamHook()
beam_hook.get_conn_uri()

not_an_error = NotAir302HookError()
not_an_error.get_conn_uri()

provider_manager = ProvidersManager()
provider_manager.dataset_factories
provider_manager.dataset_uri_handlers
provider_manager.dataset_to_openlineage_converters

dl_info = DatasetLineageInfo()
dl_info.dataset
51 changes: 20 additions & 31 deletions crates/ruff_linter/resources/test/fixtures/airflow/AIR302_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
dataset_manager,
resolve_dataset_manager,
)
from airflow.hooks.base_hook import BaseHook
from airflow.lineage.hook import DatasetLineageInfo
from airflow.listeners.spec.dataset import on_dataset_changed, on_dataset_created
from airflow.metrics.validators import AllowListValidator, BlockListValidator
Expand Down Expand Up @@ -96,6 +97,10 @@
PY36, PY37, PY38, PY39, PY310, PY311, PY312
DatasetFromRoot

dataset_from_root = DatasetFromRoot()
dataset_from_root.iter_datasets()
dataset_from_root.iter_dataset_aliases()

# airflow.api_connexion.security
requires_access, requires_access_dataset

Expand All @@ -119,9 +124,24 @@
expand_alias_to_datasets
Metadata

dataset_to_test_method_call = Dataset()
dataset_to_test_method_call.iter_datasets()
dataset_to_test_method_call.iter_dataset_aliases()

alias_to_test_method_call = DatasetAlias()
alias_to_test_method_call.iter_datasets()
alias_to_test_method_call.iter_dataset_aliases()

any_to_test_method_call = DatasetAny()
any_to_test_method_call.iter_datasets()
any_to_test_method_call.iter_dataset_aliases()

# airflow.datasets.manager
DatasetManager, dataset_manager, resolve_dataset_manager

# airflow.hooks
BaseHook()

# airflow.lineage.hook
DatasetLineageInfo

Expand Down Expand Up @@ -253,34 +273,3 @@

# airflow.www.utils
get_sensitive_variables_fields, should_hide_value_for_key

from airflow.datasets.manager import DatasetManager

dm = DatasetManager()
dm.register_dataset_change()
dm.create_datasets()
dm.notify_dataset_created()
dm.notify_dataset_changed()
dm.notify_dataset_alias_created()


from airflow.lineage.hook import HookLineageCollector

hlc = HookLineageCollector()
hlc.create_dataset()
hlc.add_input_dataset()
hlc.add_output_dataset()
hlc.collected_datasets()


from airflow.providers.amazon.auth_manager.aws_auth_manager import AwsAuthManager

aam = AwsAuthManager()
aam.is_authorized_dataset()


from airflow.providers_manager import ProvidersManager

pm = ProvidersManager()
pm.initialize_providers_asset_uri_resources()
pm.dataset_factories
3 changes: 3 additions & 0 deletions crates/ruff_linter/src/checkers/ast/analyze/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ pub(crate) fn expression(expr: &Expr, checker: &mut Checker) {
);
}
}
if checker.enabled(Rule::Airflow3Removal) {
airflow::rules::removed_in_3(checker, expr);
}
if checker.enabled(Rule::MixedCaseVariableInGlobalScope) {
if matches!(checker.semantic.current_scope().kind, ScopeKind::Module) {
pep8_naming::rules::mixed_case_variable_in_global_scope(
Expand Down
2 changes: 2 additions & 0 deletions crates/ruff_linter/src/rules/airflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ mod tests {
#[test_case(Rule::AirflowDagNoScheduleArgument, Path::new("AIR301.py"))]
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_args.py"))]
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_names.py"))]
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_class_attribute.py"))]
#[test_case(Rule::Airflow3Removal, Path::new("AIR302_airflow_plugin.py"))]
#[test_case(Rule::Airflow3MovedToProvider, Path::new("AIR303.py"))]
fn rules(rule_code: Rule, path: &Path) -> Result<()> {
let snapshot = format!("{}_{}", rule_code.noqa_code(), path.to_string_lossy());
Expand Down
Loading
Loading