Skip to content

Commit

Permalink
add airrflow dag trigger to deactivate search job
Browse files Browse the repository at this point in the history
  • Loading branch information
hanars committed Mar 3, 2025
1 parent 36a50fa commit b28e3ff
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 28 deletions.
15 changes: 13 additions & 2 deletions seqr/management/tests/deactivate_project_search_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@

from django.core.exceptions import ObjectDoesNotExist
from django.core.management import call_command
from django.test import TestCase
from django.core.management.base import CommandError
from seqr.models import Sample
from seqr.views.utils.test_utils import AirflowTestCase

PROJECT_GUID = 'R0001_1kg'
VARIANT_ID = '21-3343353-GAGA-G'


class DeactivateProjectSearchTest(TestCase):
class DeactivateProjectSearchTest(AirflowTestCase):
fixtures = ['users', '1kg_project']

DAG_NAME = 'DELETE_PROJECTS'
DAG_VARIABLES = {
'projects_to_run': [PROJECT_GUID],
'dataset_type': 'SNV_INDEL',
'reference_genome': 'GRCh37',
}

@mock.patch('seqr.management.commands.deactivate_project_search.input')
@mock.patch('seqr.management.commands.deactivate_project_search.logger')
def test_command(self, mock_logger, mock_input):
Expand All @@ -35,7 +42,11 @@ def test_command(self, mock_logger, mock_input):

active_samples = Sample.objects.filter(individual__family__project__guid=PROJECT_GUID, is_active=True)
self.assertEqual(active_samples.count(), 0)
self.assert_airflow_calls(self.DAG_VARIABLES, 5)

# Re-running has no effect
call_command('deactivate_project_search', PROJECT_GUID)
mock_logger.info.assert_called_with('Deactivated 0 samples')

def _add_update_check_dag_responses(self):
return self._add_check_dag_variable_responses(self.DAG_VARIABLES)
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,16 @@ def set_up_one_dag(self, **kwargs):
dataset_type = kwargs.pop('dataset_type', 'MITO')
super().set_up_one_dag(dataset_type=dataset_type, **kwargs)

def _get_dag_variables(self, dataset_type):
def _get_dag_variables(self, dataset_type, **kwargs):
return {
'projects_to_run': [self.PROJECT_GUID],
'family_guids': ['F000002_2'],
'reference_genome': 'GRCh37',
'dataset_type': dataset_type
}

def _add_update_check_dag_responses(self, status=200, **kwargs):
# get variables
responses.add(responses.GET, f'{self.MOCK_AIRFLOW_URL}/api/v1/variables/{self.DAG_NAME}', json={
'key': self.DAG_NAME,
'value': '{}'
}, status=status)
# get variables again if the response of the previous request didn't include the updated variables
responses.add(responses.GET, f'{self.MOCK_AIRFLOW_URL}/api/v1/variables/{self.DAG_NAME}', json={
'key': self.DAG_NAME,
'value': json.dumps(self._get_dag_variables(**kwargs))
}, status=status)
def _add_update_check_dag_responses(self, **kwargs):
return self._add_check_dag_variable_responses(self._get_dag_variables(**kwargs), **kwargs)

def assert_airflow_delete_families_calls(self):
self._assert_call_counts(13)
Expand Down
18 changes: 4 additions & 14 deletions seqr/views/apis/data_manager_api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2118,20 +2118,10 @@ def _test_validate_dataset_type(self, url):
self._set_file_not_found()

def _add_update_check_dag_responses(self, variables=None, **kwargs):
if not variables:
super()._add_update_check_dag_responses(**kwargs)
return

# get variables
responses.add(responses.GET, f'{self.MOCK_AIRFLOW_URL}/api/v1/variables/{self.DAG_NAME}', json={
'key': self.DAG_NAME,
'value': '{}'
})
# get variables again if the response of the previous request didn't include the updated variables
responses.add(responses.GET, f'{self.MOCK_AIRFLOW_URL}/api/v1/variables/{self.DAG_NAME}', json={
'key': self.DAG_NAME,
'value': json.dumps(variables)
})
if variables:
return self._add_check_dag_variable_responses(variables)

return super()._add_update_check_dag_responses(**kwargs)

def _assert_update_check_airflow_calls(self, call_count, offset, update_check_path):
if self.DAG_NAME != 'LOADING_PIPELINE':
Expand Down
12 changes: 12 additions & 0 deletions seqr/views/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,18 @@ def _add_dag_tasks_response(self, projects):
'tasks': tasks, 'total_entries': len(tasks),
})

def _add_check_dag_variable_responses(self, variables, status=200, **kwargs):
# get variables
responses.add(responses.GET, f'{self.MOCK_AIRFLOW_URL}/api/v1/variables/{self.DAG_NAME}', json={
'key': self.DAG_NAME,
'value': '{}'
}, status=status)
# get variables again if the response of the previous request didn't include the updated variables
responses.add(responses.GET, f'{self.MOCK_AIRFLOW_URL}/api/v1/variables/{self.DAG_NAME}', json={
'key': self.DAG_NAME,
'value': json.dumps(variables)
}, status=status)

def set_dag_trigger_error_response(self, status=200):
responses.replace(responses.GET, f'{self._dag_url}/dagRuns?execution_date_gte=2022-04-24T04:17:10Z', status=status, json={'dag_runs': [{
'conf': {},
Expand Down

0 comments on commit b28e3ff

Please sign in to comment.