diff --git a/application_sdk/activities/__init__.py b/application_sdk/activities/__init__.py index 223a592d9..61c8c44aa 100644 --- a/application_sdk/activities/__init__.py +++ b/application_sdk/activities/__init__.py @@ -15,6 +15,7 @@ import os from abc import ABC +from datetime import datetime, timedelta from typing import Any, Dict, Generic, Optional, TypeVar from pydantic import BaseModel @@ -62,6 +63,7 @@ class ActivitiesState(BaseModel, Generic[HandlerType]): model_config = {"arbitrary_types_allowed": True} handler: Optional[HandlerType] = None workflow_args: Optional[Dict[str, Any]] = None + last_updated_timestamp: Optional[datetime] = None ActivitiesStateType = TypeVar("ActivitiesStateType", bound=ActivitiesState) @@ -142,6 +144,15 @@ async def _get_state(self, workflow_args: Dict[str, Any]) -> ActivitiesStateType workflow_id = get_workflow_id() if workflow_id not in self._state: await self._set_state(workflow_args) + + if workflow_id in self._state: + current_timestamp = datetime.now() + # if difference of current_timestamp and last_updated_timestamp is greater than 15 minutes, then again _set_state + last_updated = self._state[workflow_id].last_updated_timestamp + if last_updated and current_timestamp - last_updated > timedelta( + minutes=15 + ): + await self._set_state(workflow_args) return self._state[workflow_id] except OrchestratorError as e: logger.error( diff --git a/application_sdk/activities/metadata_extraction/sql.py b/application_sdk/activities/metadata_extraction/sql.py index dda62d193..ae0d0a3a1 100644 --- a/application_sdk/activities/metadata_extraction/sql.py +++ b/application_sdk/activities/metadata_extraction/sql.py @@ -1,4 +1,5 @@ import os +from datetime import datetime from typing import ( TYPE_CHECKING, Any, @@ -68,6 +69,7 @@ class BaseSQLMetadataExtractionActivitiesState(ActivitiesState): sql_client: Optional[BaseSQLClient] = None handler: Optional[BaseSQLHandler] = None transformer: Optional[TransformerInterface] = None + last_updated_timestamp: Optional[datetime] = None class BaseSQLMetadataExtractionActivities(ActivitiesInterface): @@ -177,6 +179,7 @@ async def _set_state(self, workflow_args: Dict[str, Any]): self._state[workflow_id].sql_client = sql_client handler = self.handler_class(sql_client) self._state[workflow_id].handler = handler + self._state[workflow_id].last_updated_timestamp = datetime.now() # Create transformer with required parameters from ApplicationConstants transformer_params = {