-
-
Notifications
You must be signed in to change notification settings - Fork 102
Gathering slack messages from conversations #1513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Dishant1804
wants to merge
28
commits into
OWASP:main
Choose a base branch
from
Dishant1804:message_model
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
28 commits
Select commit
Hold shift + click to select a range
96305ff
message model and message fetching
Dishant1804 ddfeb01
checks and lints
Dishant1804 bdd2e59
Merge remote-tracking branch 'upstream/main' into message_model
Dishant1804 18fce4e
tests and command fix
Dishant1804 86b09a2
Merge remote-tracking branch 'upstream/main' into message_model
Dishant1804 c731b5f
spellin fixed
Dishant1804 fed1d2d
Merge branch 'main' into message_model
Dishant1804 243c2d7
Merge branch 'main' into message_model
Dishant1804 d06a386
Merge branch 'main' into message_model
Dishant1804 1c4a5ff
Merge branch 'main' into message_model
Dishant1804 4ab44a3
message model with updated fetch
Dishant1804 7a0e239
Merge remote-tracking branch 'upstream/main' into message_model
Dishant1804 ebc9009
changes and code rabbit suggestions
Dishant1804 6b2554c
Merge remote-tracking branch 'upstream/main' into message_model
Dishant1804 9c737e4
Merge branch 'main' into message_model
Dishant1804 0d67c1f
Merge branch 'main' into message_model
Dishant1804 2182427
Merge remote-tracking branch 'upstream/main' into message_model
Dishant1804 9ba5e9f
Update code
arkid15r 0f9dd70
suggestions implemented
Dishant1804 02db4ff
Merge remote-tracking branch 'upstream/main' into message_model
Dishant1804 a530ae5
chunk model along with embeddings
Dishant1804 93e1a5b
Merge remote-tracking branch 'upstream/main' into message_model
Dishant1804 46076df
pre commit
Dishant1804 4cb8d97
cspell checks
Dishant1804 532b104
code rabbit suggestions
Dishant1804 fdf7936
Merge branch 'main' into message_model
Dishant1804 2bed7e0
removed files related to chunking
Dishant1804 78f6200
Merge remote-tracking branch 'upstream/main' into message_model
Dishant1804 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
312 changes: 312 additions & 0 deletions
312
backend/apps/slack/management/commands/slack_sync_messages.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,312 @@ | ||
"""A command to populate Slack messages data for all conversations.""" | ||
|
||
import logging | ||
import time | ||
|
||
from django.core.management.base import BaseCommand | ||
from slack_sdk import WebClient | ||
from slack_sdk.errors import SlackApiError | ||
|
||
from apps.slack.models import Conversation, Member, Message, Workspace | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class Command(BaseCommand): | ||
help = "Populate messages for all Slack conversations" | ||
|
||
def add_arguments(self, parser): | ||
"""Define command line arguments.""" | ||
parser.add_argument( | ||
"--batch-size", | ||
type=int, | ||
default=200, | ||
help="Number of messages to retrieve per request", | ||
) | ||
parser.add_argument( | ||
"--delay", | ||
type=float, | ||
default=0.5, | ||
help="Delay between API requests in seconds", | ||
) | ||
parser.add_argument( | ||
"--channel-id", | ||
type=str, | ||
help="Specific channel ID to fetch messages from", | ||
) | ||
|
||
def handle(self, *args, **options): | ||
batch_size = options["batch_size"] | ||
channel_id = options["channel_id"] | ||
delay = options["delay"] | ||
include_threads = True | ||
|
||
workspaces = Workspace.objects.all() | ||
if not workspaces.exists(): | ||
self.stdout.write(self.style.WARNING("No workspaces found in the database")) | ||
return | ||
|
||
for workspace in workspaces: | ||
self.stdout.write(f"\nProcessing workspace: {workspace.name}") | ||
|
||
if not (bot_token := workspace.bot_token): | ||
self.stdout.write(self.style.ERROR(f"No bot token found for {workspace}")) | ||
continue | ||
|
||
client = WebClient(token=bot_token) | ||
|
||
conversations = ( | ||
Conversation.objects.filter(slack_channel_id=channel_id) | ||
if channel_id | ||
else Conversation.objects.filter(workspace=workspace) | ||
) | ||
|
||
for conversation in conversations: | ||
self._fetch_messages_for_conversation( | ||
batch_size=batch_size, | ||
client=client, | ||
conversation=conversation, | ||
delay=delay, | ||
include_threads=include_threads, | ||
) | ||
|
||
self.stdout.write(self.style.SUCCESS("\nFinished processing all workspaces")) | ||
|
||
def _fetch_messages_for_conversation( | ||
self, | ||
client: WebClient, | ||
conversation: Conversation, | ||
batch_size: int, | ||
delay: float, | ||
*, | ||
include_threads: bool, | ||
): | ||
"""Fetch messages for a single conversation from its beginning.""" | ||
self.stdout.write(f"\nProcessing channel: {conversation.name}") | ||
|
||
try: | ||
parent_messages = self._fetch_parent_messages( | ||
client=client, conversation=conversation, batch_size=batch_size, delay=delay | ||
) | ||
|
||
if include_threads: | ||
self._fetch_thread_replies( | ||
client=client, | ||
conversation=conversation, | ||
parent_messages=parent_messages, | ||
delay=delay, | ||
) | ||
|
||
self.stdout.write( | ||
self.style.SUCCESS(f"Finished processing messages from {conversation.name}") | ||
) | ||
|
||
except SlackApiError as e: | ||
self.stdout.write( | ||
self.style.ERROR( | ||
f"Failed to fetch messages for {conversation.name}: {e.response['error']}" | ||
) | ||
) | ||
|
||
def _fetch_parent_messages( | ||
self, client: WebClient, conversation: Conversation, batch_size: int, delay: float | ||
) -> list[Message]: | ||
"""Fetch all parent messages (non-thread) for a conversation.""" | ||
cursor = None | ||
has_more = True | ||
batch_messages = [] | ||
all_threaded_parents = [] | ||
|
||
last_message = ( | ||
Message.objects.filter(conversation=conversation).order_by("-timestamp").first() | ||
) | ||
oldest = last_message.timestamp.timestamp() if last_message else None | ||
|
||
while has_more: | ||
try: | ||
response = client.conversations_history( | ||
channel=conversation.slack_channel_id, | ||
cursor=cursor, | ||
limit=batch_size, | ||
oldest=oldest, | ||
) | ||
self._handle_slack_response(response, "conversations_history") | ||
|
||
for message_data in response.get("messages", []): | ||
if message_data.get("thread_ts") and message_data.get( | ||
"ts" | ||
) != message_data.get("thread_ts"): | ||
continue | ||
|
||
message = self._create_message_from_data( | ||
client=client, | ||
message_data=message_data, | ||
conversation=conversation, | ||
is_thread_reply=False, | ||
parent_message=None, | ||
) | ||
|
||
if message: | ||
batch_messages.append(message) | ||
if message.is_thread_parent: | ||
all_threaded_parents.append(message) | ||
|
||
if batch_messages: | ||
Message.bulk_save(batch_messages) | ||
batch_messages = [] | ||
|
||
cursor = response.get("response_metadata", {}).get("next_cursor") | ||
has_more = bool(cursor) | ||
|
||
if delay and has_more: | ||
time.sleep(delay) | ||
|
||
except SlackApiError as e: | ||
self.stdout.write( | ||
self.style.ERROR(f"Error fetching messages: {e.response['error']}") | ||
) | ||
break | ||
|
||
return all_threaded_parents | ||
|
||
def _fetch_thread_replies( | ||
self, | ||
client: WebClient, | ||
conversation: Conversation, | ||
parent_messages: list[Message], | ||
delay: float, | ||
): | ||
"""Fetch all thread replies for parent messages.""" | ||
if not parent_messages: | ||
return | ||
|
||
replies_to_save = [] | ||
|
||
for parent_message in parent_messages: | ||
try: | ||
latest_reply = ( | ||
Message.objects.filter( | ||
conversation=conversation, | ||
parent_message=parent_message, | ||
) | ||
.order_by("-timestamp") | ||
.first() | ||
) | ||
oldest_ts = latest_reply.timestamp.timestamp() if latest_reply else None | ||
|
||
cursor = None | ||
has_more = True | ||
thread_reply_count = 0 | ||
|
||
while has_more: | ||
params = { | ||
"channel": conversation.slack_channel_id, | ||
"ts": parent_message.slack_message_id, | ||
"cursor": cursor, | ||
"limit": 100, | ||
"inclusive": True, | ||
} | ||
if oldest_ts: | ||
params["oldest"] = str(oldest_ts) | ||
|
||
response = client.conversations_replies(**params) | ||
self._handle_slack_response(response, "conversations_replies") | ||
|
||
messages_in_response = response.get("messages", []) | ||
if not messages_in_response: | ||
break | ||
|
||
for reply_data in messages_in_response[1:]: | ||
reply = self._create_message_from_data( | ||
client=client, | ||
message_data=reply_data, | ||
conversation=conversation, | ||
is_thread_reply=True, | ||
parent_message=parent_message, | ||
) | ||
if reply: | ||
replies_to_save.append(reply) | ||
thread_reply_count += 1 | ||
|
||
cursor = response.get("response_metadata", {}).get("next_cursor") | ||
has_more = bool(cursor) | ||
|
||
if delay and has_more: | ||
time.sleep(delay) | ||
|
||
except SlackApiError: | ||
self.stdout.write(self.style.ERROR("Failed to fetch thread replies for message")) | ||
|
||
if replies_to_save: | ||
batch_size = 1000 | ||
for i in range(0, len(replies_to_save), batch_size): | ||
batch = replies_to_save[i : i + batch_size] | ||
Message.bulk_save(batch) | ||
|
||
def _create_message_from_data( | ||
Dishant1804 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self, | ||
client: WebClient, | ||
Dishant1804 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
message_data: dict, | ||
conversation: Conversation, | ||
*, | ||
is_thread_reply: bool, | ||
parent_message: Message | None = None, | ||
) -> Message | None: | ||
"""Create Message instance using from_slack pattern.""" | ||
try: | ||
if message_data.get("subtype") in ["channel_join", "channel_leave", "bot_message"]: | ||
return None | ||
if not any( | ||
[ | ||
message_data.get("text"), | ||
message_data.get("attachments"), | ||
message_data.get("files"), | ||
message_data.get("blocks"), | ||
] | ||
): | ||
return None | ||
|
||
slack_user_id = message_data.get("user") or message_data.get("bot_id") | ||
if not slack_user_id: | ||
return None | ||
|
||
try: | ||
author = Member.objects.get( | ||
slack_user_id=slack_user_id, workspace=conversation.workspace | ||
) | ||
except Member.DoesNotExist: | ||
Dishant1804 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try: | ||
user_info = client.users_info(user=slack_user_id) | ||
self._handle_slack_response(user_info, "users_info") | ||
|
||
user_data = user_info["user"] | ||
author = Member.update_data(user_data, conversation.workspace, save=True) | ||
|
||
self.stdout.write(self.style.SUCCESS(f"Created new member: {slack_user_id}")) | ||
except SlackApiError as e: | ||
self.stdout.write( | ||
self.style.WARNING( | ||
f"Failed to fetch user data for {slack_user_id}: {e.response['error']}" | ||
) | ||
) | ||
return None | ||
|
||
return Message.update_data( | ||
data=message_data, | ||
conversation=conversation, | ||
author=author, | ||
is_thread_reply=is_thread_reply, | ||
parent_message=parent_message, | ||
save=False, | ||
) | ||
|
||
except Exception: | ||
logger.exception("Error creating message from data") | ||
return None | ||
Dishant1804 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def _handle_slack_response(self, response, api_method): | ||
"""Handle Slack API response and raise exception if needed.""" | ||
if not response["ok"]: | ||
error_message = f"{api_method} API call failed" | ||
logger.error(error_message) | ||
self.stdout.write(self.style.ERROR(error_message)) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.