Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions application_sdk/activities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import os
from abc import ABC
from datetime import datetime, timedelta
from typing import Any, Dict, Generic, Optional, TypeVar

from pydantic import BaseModel
Expand Down Expand Up @@ -62,6 +63,7 @@ class ActivitiesState(BaseModel, Generic[HandlerType]):
model_config = {"arbitrary_types_allowed": True}
handler: Optional[HandlerType] = None
workflow_args: Optional[Dict[str, Any]] = None
last_updated_timestamp: Optional[datetime] = None


ActivitiesStateType = TypeVar("ActivitiesStateType", bound=ActivitiesState)
Expand Down Expand Up @@ -142,6 +144,15 @@ async def _get_state(self, workflow_args: Dict[str, Any]) -> ActivitiesStateType
workflow_id = get_workflow_id()
if workflow_id not in self._state:
await self._set_state(workflow_args)

if workflow_id in self._state:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please put this under an else block

current_timestamp = datetime.now()
# if difference of current_timestamp and last_updated_timestamp is greater than 15 minutes, then again _set_state
last_updated = self._state[workflow_id].last_updated_timestamp
if last_updated and current_timestamp - last_updated > timedelta(
minutes=15
):
await self._set_state(workflow_args)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Bug

The ActivitiesInterface._set_state method doesn't set the last_updated_timestamp field. This prevents the 15-minute state refresh logic in _get_state from ever triggering for activities using the base class, leading to inconsistent refresh behavior across different activity implementations.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abhishekagrawal-atlan please take a loo

return self._state[workflow_id]
except OrchestratorError as e:
logger.error(
Expand Down
3 changes: 3 additions & 0 deletions application_sdk/activities/metadata_extraction/sql.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from datetime import datetime
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -68,6 +69,7 @@ class BaseSQLMetadataExtractionActivitiesState(ActivitiesState):
sql_client: Optional[BaseSQLClient] = None
handler: Optional[BaseSQLHandler] = None
transformer: Optional[TransformerInterface] = None
last_updated_timestamp: Optional[datetime] = None


class BaseSQLMetadataExtractionActivities(ActivitiesInterface):
Expand Down Expand Up @@ -177,6 +179,7 @@ async def _set_state(self, workflow_args: Dict[str, Any]):
self._state[workflow_id].sql_client = sql_client
handler = self.handler_class(sql_client)
self._state[workflow_id].handler = handler
self._state[workflow_id].last_updated_timestamp = datetime.now()

# Create transformer with required parameters from ApplicationConstants
transformer_params = {
Expand Down
Loading