Skip to content

Commit

Permalink
Retrieve messages using thread API
Browse files Browse the repository at this point in the history
  • Loading branch information
rasa committed Apr 19, 2022
1 parent 80f27b6 commit 1c75395
Showing 1 changed file with 131 additions and 20 deletions.
151 changes: 131 additions & 20 deletions simplegmail/gmail.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,7 @@ def send_message(
)

try:
if self._mode == self.MESSAGE_MODE:
api = self.service.users().messages()
else:
api = self.service.users().threads()
res = api.send(userId='me', body=msg).execute()
res = self.service.users().messages().send(userId='me', body=msg).execute()
return self._build_message_from_ref(user_id, res, 'reference')

except HttpError as error:
Expand Down Expand Up @@ -527,21 +523,34 @@ def get_messages(
includeSpamTrash=include_spam_trash
).execute()

message_refs = []
if 'messages' in response: # ensure request was successful
message_refs.extend(response['messages'])
if self._mode == self.MESSAGE_MODE:
message_refs = []
if 'messages' in response: # ensure request was successful
message_refs.extend(response['messages'])
else:
thread_refs = []
if 'threads' in response: # ensure request was successful
thread_refs.extend(response['threads'])

while 'nextPageToken' in response:
page_token = response['nextPageToken']
response = self.service.users().messages().list(
response = api.list(
userId=user_id,
q=query,
labelIds=labels_ids,
includeSpamTrash=include_spam_trash,
pageToken=page_token
).execute()

message_refs.extend(response['messages'])
if self._mode == self.MESSAGE_MODE:
if 'messages' in response: # ensure request was successful
message_refs.extend(response['messages'])
else:
if 'threads' in response: # ensure request was successful
thread_refs.extend(response['threads'])

if self._mode == self.THREAD_MODE:
message_refs = self._get_message_refs_from_thread_refs(user_id, thread_refs)

return self._get_messages_from_refs(user_id, message_refs,
attachments)
Expand Down Expand Up @@ -583,6 +592,7 @@ def list_labels(self, user_id: str = 'me') -> List[Label]:
labels = [Label(name=x['name'], id=x['id']) for x in res['labels']]
return labels


def _get_messages_from_refs(
self,
user_id: str,
Expand Down Expand Up @@ -686,11 +696,7 @@ def _build_message_from_ref(

try:
# Get message JSON
if self._mode == self.MESSAGE_MODE:
api = self.service.users().messages()
else:
api = self.service.users().threads()
message = api.get(
message = self.service.users().messages().get(
userId=user_id, id=message_ref['id']
).execute()

Expand Down Expand Up @@ -812,11 +818,7 @@ def _evaluate_message_payload(
if 'data' in payload['body']:
data = payload['body']['data']
else:
if self._mode == self.MESSAGE_MODE:
api = self.service.users().messages()
else:
api = self.service.users().threads()
res = api.attachments().get(
res = self.service.users().messages().attachments().get(
userId=user_id, messageId=msg_id, id=att_id
).execute()
data = res['data']
Expand Down Expand Up @@ -847,6 +849,115 @@ def _evaluate_message_payload(

return []

def _get_message_refs_from_thread_refs(
    self,
    user_id: str,
    thread_refs: List[dict],
    parallel: bool = True
) -> List[dict]:
    """
    Retrieves a list of message references from a list of thread references.

    Args:
        user_id: The account the messages belong to.
        thread_refs: A list of thread references.
        parallel: Whether to expand the thread references using several
            worker threads. Default true. When false, the references are
            expanded one at a time on the calling thread.

    Returns:
        A flat list of message references (whatever
        _build_message_refs_from_thread_ref returns for each thread),
        in the same order as thread_refs.

    Raises:
        googleapiclient.errors.HttpError: There was an error executing the
            HTTP request. In parallel mode the first error recorded by a
            worker (in worker order) is re-raised on the calling thread.

    """

    if not thread_refs:
        return []

    if not parallel:
        message_refs = []
        for ref in thread_refs:
            message_refs.extend(
                self._build_message_refs_from_thread_ref(user_id, ref)
            )
        return message_refs

    max_num_threads = 12  # empirically chosen, prevents throttling
    target_msgs_per_thread = 10  # empirically chosen
    num_threads = min(
        math.ceil(len(thread_refs) / target_msgs_per_thread),
        max_num_threads
    )
    batch_size = math.ceil(len(thread_refs) / num_threads)

    # One result slot and one error slot per worker. Slots start out as
    # empty lists (not None) so a worker that fails early can never break
    # the final flattening step.
    message_lists = [[] for _ in range(num_threads)]
    errors = [None] * num_threads

    def thread_download_batch(thread_num):
        try:
            # Each worker builds its own client from the shared credentials.
            # NOTE(review): presumably because self.service must not be
            # used from several threads at once - confirm.
            gmail = Gmail(_creds=self.creds)

            start = thread_num * batch_size
            end = min(len(thread_refs), (thread_num + 1) * batch_size)
            for ref in thread_refs[start:end]:
                message_lists[thread_num].extend(
                    gmail._build_message_refs_from_thread_ref(user_id, ref)
                )
        except Exception as error:
            # An exception raised inside a Thread never reaches the caller
            # on its own; record it so it can be re-raised after join().
            errors[thread_num] = error

    threads = [
        threading.Thread(target=thread_download_batch, args=(i,))
        for i in range(num_threads)
    ]

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    # Surface worker failures instead of silently returning partial data.
    for error in errors:
        if error is not None:
            raise error

    # Linear-time flatten; sum(lists, []) copies the accumulator each step.
    return [ref for batch in message_lists for ref in batch]

def _build_message_refs_from_thread_ref(
    self,
    user_id: str,
    thread_ref: dict,
) -> List[dict]:
    """
    Creates a list of message references from a thread reference.

    Args:
        user_id: The username of the account the thread belongs to.
        thread_ref: A thread reference returned from the Gmail API; its
            'id' entry is used to fetch the thread.

    Returns:
        A list of dicts, one per message in the thread, each with the
        message's 'id' and the 'threadId' taken from thread_ref. Empty if
        the fetched thread carries no 'messages' entry.

    Raises:
        googleapiclient.errors.HttpError: There was an error executing the
            HTTP request.

    """

    thread_id = thread_ref['id']

    # Get thread JSON. Any HttpError raised by execute() is deliberately
    # left to propagate to the caller unchanged.
    thread = self.service.users().threads().get(
        userId=user_id, id=thread_id
    ).execute()

    return [
        {"id": message['id'], "threadId": thread_id}
        for message in thread.get('messages', [])
    ]

def _create_message(
self,
sender: str,
Expand Down

0 comments on commit 1c75395

Please sign in to comment.