Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thread mode #66

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 147 additions & 8 deletions simplegmail/gmail.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class Gmail(object):
# https://developers.google.com/gmail/api/quickstart/python
# Make sure the client secret file is in the root directory of your app.

MESSAGE_MODE = 'messages'
THREAD_MODE = 'threads'

def __init__(
self,
client_secret_file: str = 'client_secret.json',
Expand All @@ -66,6 +69,7 @@ def __init__(
) -> None:
self.client_secret_file = client_secret_file
self.creds_file = creds_file
self._mode = self.MESSAGE_MODE

try:
# The file gmail_token.json stores the user's access and refresh
Expand Down Expand Up @@ -156,8 +160,7 @@ def send_message(
)

try:
req = self.service.users().messages().send(userId='me', body=msg)
res = req.execute()
res = self.service.users().messages().send(userId='me', body=msg).execute()
return self._build_message_from_ref(user_id, res, 'reference')

except HttpError as error:
Expand Down Expand Up @@ -509,28 +512,45 @@ def get_messages(
]

try:
response = self.service.users().messages().list(
if self._mode == self.MESSAGE_MODE:
api = self.service.users().messages()
else:
api = self.service.users().threads()
response = api.list(
userId=user_id,
q=query,
labelIds=labels_ids,
includeSpamTrash=include_spam_trash
).execute()

message_refs = []
if 'messages' in response: # ensure request was successful
message_refs.extend(response['messages'])
if self._mode == self.MESSAGE_MODE:
message_refs = []
if 'messages' in response: # ensure request was successful
message_refs.extend(response['messages'])
else:
thread_refs = []
if 'threads' in response: # ensure request was successful
thread_refs.extend(response['threads'])

while 'nextPageToken' in response:
page_token = response['nextPageToken']
response = self.service.users().messages().list(
response = api.list(
userId=user_id,
q=query,
labelIds=labels_ids,
includeSpamTrash=include_spam_trash,
pageToken=page_token
).execute()

message_refs.extend(response['messages'])
if self._mode == self.MESSAGE_MODE:
if 'messages' in response: # ensure request was successful
message_refs.extend(response['messages'])
else:
if 'threads' in response: # ensure request was successful
thread_refs.extend(response['threads'])

if self._mode == self.THREAD_MODE:
message_refs = self._get_message_refs_from_thread_refs(user_id, thread_refs)

return self._get_messages_from_refs(user_id, message_refs,
attachments)
Expand Down Expand Up @@ -572,6 +592,7 @@ def list_labels(self, user_id: str = 'me') -> List[Label]:
labels = [Label(name=x['name'], id=x['id']) for x in res['labels']]
return labels


def _get_messages_from_refs(
self,
user_id: str,
Expand Down Expand Up @@ -828,6 +849,115 @@ def _evaluate_message_payload(

return []

def _get_message_refs_from_thread_refs(
self,
user_id: str,
thread_refs: List[dict],
parallel: bool = True
) -> List[Message]:
"""
Retrieves a list of message references from a list of thread references.

Args:
user_id: The account the messages belong to.
thread_refs: A list of thread references.
parallel: Whether to retrieve messages in parallel. Default true.
Currently parallelization is always on, since there is no
reason to do otherwise.


Returns:
A list of Message objects.

Raises:
googleapiclient.errors.HttpError: There was an error executing the
HTTP request.

"""

if not thread_refs:
return []

if not parallel:
message_refs = []
for ref in thread_refs:
message_refs.extend(self._build_message_refs_from_thread_ref(user_id, ref))
return message_refs

max_num_threads = 12 # empirically chosen, prevents throttling
target_msgs_per_thread = 10 # empirically chosen
num_threads = min(
math.ceil(len(thread_refs) / target_msgs_per_thread),
max_num_threads
)
batch_size = math.ceil(len(thread_refs) / num_threads)
message_lists = [None] * num_threads

def thread_download_batch(thread_num):
gmail = Gmail(_creds=self.creds)

start = thread_num * batch_size
end = min(len(thread_refs), (thread_num + 1) * batch_size)
message_lists[thread_num] = []
for i in range(start, end):
message_lists[thread_num].extend(gmail._build_message_refs_from_thread_ref(
user_id, thread_refs[i]
))
threads = [
threading.Thread(target=thread_download_batch, args=(i,))
for i in range(num_threads)
]

for t in threads:
t.start()

for t in threads:
t.join()

return sum(message_lists, [])

def _build_message_refs_from_thread_ref(
self,
user_id: str,
thread_ref: dict,
) -> Message:
"""
Creates a list of messages from a thread reference.

Args:
user_id: The username of the account the message belongs to.
thread_ref: A thread references returned from the Gmail
API.

Returns:
A list of dicts containing message IDs and thread IDs.

Raises:
googleapiclient.errors.HttpError: There was an error executing the
HTTP request.

"""

try:
# Get thread JSON
thread = self.service.users().threads().get(
userId=user_id, id=thread_ref['id']
).execute()

except HttpError as error:
# Pass along the error
raise error

else:
messages = []
for message in thread['messages']:
h = {
"id": message['id'],
"threadId": thread_ref['id']
}
messages.append(h)
return messages

def _create_message(
self,
sender: str,
Expand Down Expand Up @@ -988,3 +1118,12 @@ def _get_alias_info(

res = req.execute()
return res

@property
def mode(self):
return self._mode

@mode.setter
def mode(self, value: str):
if value.lower() in [self.MESSAGE_MODE, self.THREAD_MODE]:
self._mode = value.lower()