diff --git a/simplegmail/gmail.py b/simplegmail/gmail.py
index 43c1d4b..e850cd5 100644
--- a/simplegmail/gmail.py
+++ b/simplegmail/gmail.py
@@ -160,11 +160,7 @@ def send_message(
         )
 
         try:
-            if self._mode == self.MESSAGE_MODE:
-                api = self.service.users().messages()
-            else:
-                api = self.service.users().threads()
-            res = api.send(userId='me', body=msg).execute()
+            res = self.service.users().messages().send(userId='me', body=msg).execute()
             return self._build_message_from_ref(user_id, res, 'reference')
 
         except HttpError as error:
@@ -527,13 +523,18 @@ def get_messages(
                 includeSpamTrash=include_spam_trash
             ).execute()
 
-            message_refs = []
-            if 'messages' in response:  # ensure request was successful
-                message_refs.extend(response['messages'])
+            if self._mode == self.MESSAGE_MODE:
+                message_refs = []
+                if 'messages' in response:  # ensure request was successful
+                    message_refs.extend(response['messages'])
+            else:
+                thread_refs = []
+                if 'threads' in response:  # ensure request was successful
+                    thread_refs.extend(response['threads'])
 
             while 'nextPageToken' in response:
                 page_token = response['nextPageToken']
-                response = self.service.users().messages().list(
+                response = api.list(
                     userId=user_id,
                     q=query,
                     labelIds=labels_ids,
@@ -541,7 +542,15 @@ def get_messages(
                     pageToken=page_token
                 ).execute()
 
-                message_refs.extend(response['messages'])
+                if self._mode == self.MESSAGE_MODE:
+                    if 'messages' in response:  # ensure request was successful
+                        message_refs.extend(response['messages'])
+                else:
+                    if 'threads' in response:  # ensure request was successful
+                        thread_refs.extend(response['threads'])
+
+            if self._mode == self.THREAD_MODE:
+                message_refs = self._get_message_refs_from_thread_refs(user_id, thread_refs)
 
             return self._get_messages_from_refs(user_id, message_refs, attachments)
 
@@ -583,6 +592,7 @@ def list_labels(self, user_id: str = 'me') -> List[Label]:
             labels = [Label(name=x['name'], id=x['id']) for x in res['labels']]
             return labels
 
+
     def _get_messages_from_refs(
         self,
         user_id: str,
@@ -686,11 +696,7 @@ def _build_message_from_ref(
 
         try:
             # Get message JSON
-            if self._mode == self.MESSAGE_MODE:
-                api = self.service.users().messages()
-            else:
-                api = self.service.users().threads()
-            message = api.get(
+            message = self.service.users().messages().get(
                 userId=user_id, id=message_ref['id']
             ).execute()
 
@@ -812,11 +818,7 @@ def _evaluate_message_payload(
                 if 'data' in payload['body']:
                     data = payload['body']['data']
                 else:
-                    if self._mode == self.MESSAGE_MODE:
-                        api = self.service.users().messages()
-                    else:
-                        api = self.service.users().threads()
-                    res = api.attachments().get(
+                    res = self.service.users().messages().attachments().get(
                         userId=user_id, messageId=msg_id, id=att_id
                     ).execute()
                     data = res['data']
@@ -847,6 +849,123 @@ def _evaluate_message_payload(
 
         return []
 
+    def _get_message_refs_from_thread_refs(
+        self,
+        user_id: str,
+        thread_refs: List[dict],
+        parallel: bool = True
+    ) -> List[dict]:
+        """
+        Retrieves a list of message references from a list of thread
+        references.
+
+        Args:
+            user_id: The account the messages belong to.
+            thread_refs: A list of thread references.
+            parallel: Whether to retrieve the threads in parallel. Default
+                true.
+
+        Returns:
+            A list of message references (dicts containing a message ID and
+            a thread ID), in the same order as the given thread references.
+
+        Raises:
+            googleapiclient.errors.HttpError: There was an error executing the
+                HTTP request.
+
+        """
+
+        if not thread_refs:
+            return []
+
+        if not parallel:
+            message_refs = []
+            for ref in thread_refs:
+                message_refs.extend(
+                    self._build_message_refs_from_thread_ref(user_id, ref)
+                )
+            return message_refs
+
+        max_num_threads = 12  # empirically chosen, prevents throttling
+        target_refs_per_thread = 10  # empirically chosen
+        num_threads = min(
+            math.ceil(len(thread_refs) / target_refs_per_thread),
+            max_num_threads
+        )
+        batch_size = math.ceil(len(thread_refs) / num_threads)
+        message_lists = [[] for _ in range(num_threads)]
+        errors = [None] * num_threads
+
+        def thread_download_batch(thread_num):
+            try:
+                # Each worker builds its own client rather than sharing
+                # self.service across threads.
+                gmail = Gmail(_creds=self.creds)
+
+                start = thread_num * batch_size
+                end = min(len(thread_refs), (thread_num + 1) * batch_size)
+                for i in range(start, end):
+                    message_lists[thread_num].extend(
+                        gmail._build_message_refs_from_thread_ref(
+                            user_id, thread_refs[i]
+                        )
+                    )
+            except Exception as error:
+                # An exception raised inside a Thread is not propagated to
+                # the caller of join(); record it so it can be re-raised.
+                errors[thread_num] = error
+
+        threads = [
+            threading.Thread(target=thread_download_batch, args=(i,))
+            for i in range(num_threads)
+        ]
+
+        for t in threads:
+            t.start()
+
+        for t in threads:
+            t.join()
+
+        for error in errors:
+            if error is not None:
+                raise error
+
+        # Flatten in batch order (linear time, unlike sum(lists, [])).
+        return [ref for batch in message_lists for ref in batch]
+
+    def _build_message_refs_from_thread_ref(
+        self,
+        user_id: str,
+        thread_ref: dict,
+    ) -> List[dict]:
+        """
+        Creates a list of message references from a thread reference.
+
+        Args:
+            user_id: The username of the account the thread belongs to.
+            thread_ref: A thread reference returned from the Gmail API.
+
+        Returns:
+            A list of dicts containing message IDs and thread IDs.
+
+        Raises:
+            googleapiclient.errors.HttpError: There was an error executing the
+                HTTP request.
+
+        """
+
+        # Only message IDs are needed, so request the minimal thread format.
+        # An HttpError raised here propagates to the caller unchanged.
+        thread = self.service.users().threads().get(
+            userId=user_id, id=thread_ref['id'], format='minimal'
+        ).execute()
+
+        # A thread response without a 'messages' key yields no references.
+        return [
+            {"id": message['id'], "threadId": thread_ref['id']}
+            for message in thread.get('messages', [])
+        ]
+
     def _create_message(
         self,
         sender: str,