From f578f75464791e7bfbf6b2544fd8651fe8b6f974 Mon Sep 17 00:00:00 2001 From: Jason Gunthorpe Date: Wed, 5 Feb 2020 12:41:32 -0400 Subject: Do not pass msgdb all over the place The mailboxes can only be linked to a single msgdb, always take it from the mailbox cfg. Signed-off-by: Jason Gunthorpe --- cloud_mdir_sync/gmail.py | 31 ++++++++++++++----------------- cloud_mdir_sync/mailbox.py | 7 +++++-- cloud_mdir_sync/maildir.py | 21 ++++++++++----------- cloud_mdir_sync/main.py | 4 ++-- cloud_mdir_sync/office365.py | 14 +++++++------- 5 files changed, 38 insertions(+), 39 deletions(-) diff --git a/cloud_mdir_sync/gmail.py b/cloud_mdir_sync/gmail.py index 8492223..6bd71a1 100644 --- a/cloud_mdir_sync/gmail.py +++ b/cloud_mdir_sync/gmail.py @@ -301,10 +301,10 @@ class GMailMessage(messages.Message): # GMail does not return the email_id, but it does have a stable REST # ID, so if we have the REST ID in the database then we can compute # the email_id - self.content_hash = mailbox.cfg.msgdb.content_hashes_cloud.get( + self.content_hash = mailbox.msgdb.content_hashes_cloud.get( self.cid()) if self.content_hash: - self.email_id = mailbox.cfg.msgdb.content_msgid[self.content_hash] + self.email_id = mailbox.msgdb.content_msgid[self.content_hash] self.gmail_labels = gmail_labels if self.gmail_labels: self._labels_to_flags() @@ -385,8 +385,8 @@ class GMailMailbox(mailbox.Mailbox): msg.update_from_json(jmsg) return jmsg["historyId"] - async def _fetch_message(self, msg: GMailMessage, - msgdb: messages.MessageDB): + async def _fetch_message(self, msg: GMailMessage): + msgdb = self.msgdb with util.log_progress_ctx(logging.DEBUG, f"Downloading {msg.storage_id}", lambda msg: f" {util.sizeof_fmt(msg.size)}", @@ -405,7 +405,7 @@ class GMailMailbox(mailbox.Mailbox): msg.content_hash = msgdb.store_hashed_msg(msg, F) return jmsg["historyId"] - async def _fetch_all_messages(self, msgdb: messages.MessageDB): + async def _fetch_all_messages(self): """Perform a full synchronization of the mailbox""" start_history_id = None todo = [] @@ -416,9 +416,9 @@ class GMailMailbox(mailbox.Mailbox): key="messages", params={"labelIds": self.label}): msg = GMailMessage(mailbox=self, gmail_id=jmsg["id"]) - if not msgdb.have_content(msg): + if not self.msgdb.have_content(msg): todo.append( - asyncio.create_task(self._fetch_message(msg, msgdb))) + asyncio.create_task(self._fetch_message(msg))) else: todo.append(asyncio.create_task(self._fetch_metadata(msg))) msgs.append(msg) @@ -429,8 +429,6 @@ class GMailMailbox(mailbox.Mailbox): return (msgs, start_history_id) async def _fetch_delta_messages(self, old_msgs: List[GMailMessage], - start_history_id: Optional[str], - msgdb: messages.MessageDB): start_history_id: Optional[str]): # Mailbox is empty if start_history_id is None: @@ -495,9 +493,9 @@ class GMailMailbox(mailbox.Mailbox): msg = GMailMessage(mailbox=self, gmail_id=gmail_id, gmail_labels=gmail_labels) - if not msgdb.have_content(msg): + if not self.msgdb.have_content(msg): todo.append( - asyncio.create_task(self._fetch_message(msg, msgdb))) + asyncio.create_task(self._fetch_message(msg))) else: todo.append(asyncio.create_task(self._fetch_metadata(msg))) else: @@ -505,7 +503,7 @@ class GMailMailbox(mailbox.Mailbox): gmail_id=gmail_id, gmail_labels=gmail_labels) msg.received_time = omsg.received_time - assert msgdb.have_content(msg) + assert self.msgdb.have_content(msg) msgs.append(msg) await asyncio.gather(*todo) return (msgs, next_history_id) @@ -513,19 +511,18 @@ class GMailMailbox(mailbox.Mailbox): @util.log_progress(lambda self: f"Updating Message List for {self.name}", lambda self: f", {len(self.messages)} msgs") @mailbox.update_on_failure - async def update_message_list(self, msgdb: messages.MessageDB): + async def update_message_list(self): """Retrieve the list of all messages and store all the message content in the content_hash message database""" if self.history_delta is None or self.history_delta[1] is None: # For whatever reason, there is usually more history than is # suggested by the history_id from the messages.list, so always # drain it out. - self.history_delta = await self._fetch_all_messages(msgdb) + self.history_delta = await self._fetch_all_messages() self.history_delta = await self._fetch_delta_messages( start_history_id=self.history_delta[1], - old_msgs=self.history_delta[0], - msgdb=msgdb) + old_msgs=self.history_delta[0]) self.messages = { msg.content_hash: msg @@ -541,7 +538,7 @@ class GMailMailbox(mailbox.Mailbox): self.need_update = True self.changed_event.set() - def force_content(self, msgdb, msgs): + def force_content(self, msgs): raise RuntimeError("Cannot move messages into the Cloud") def _update_msg_flags(self, cmsg: messages.Message, old_cmsg_flags: int, diff --git a/cloud_mdir_sync/mailbox.py b/cloud_mdir_sync/mailbox.py index 10b0b64..6f9fea8 100644 --- a/cloud_mdir_sync/mailbox.py +++ b/cloud_mdir_sync/mailbox.py @@ -52,14 +52,17 @@ class Mailbox(object): pass @abstractmethod - def force_content(self, msgdb: "MessageDB", - msgs: "CHMsgDict_Type") -> None: + def force_content(self, msgs: "CHMsgDict_Type") -> None: pass @abstractmethod async def merge_content(self, msgs: "CHMsgMappingDict_Type") -> None: pass + @property + def msgdb(self) -> "MessageDB": + return self.cfg.msgdb + def same_messages(self, mdict: "CHMsgMappingDict_Type", tuple_form=False) -> bool: diff --git a/cloud_mdir_sync/maildir.py b/cloud_mdir_sync/maildir.py index ffb9d77..c7f1be5 100644 --- a/cloud_mdir_sync/maildir.py +++ b/cloud_mdir_sync/maildir.py @@ -81,32 +81,32 @@ class MailDirMailbox(mailbox.Mailbox): assert ":2," not in fn return (fn, flags, mflags) - def _load_message(self, msgdb: messages.MessageDB, fn, ffn): + def _load_message(self, fn, ffn): sid, _, mflags = self._decode_msg_filename(fn) msg = messages.Message(mailbox=self, storage_id=sid) msg.flags = mflags - msgdb.msg_from_file(msg, ffn) + self.msgdb.msg_from_file(msg, ffn) return msg - def _update_message_dir(self, res, msgdb: messages.MessageDB, dfn): + def _update_message_dir(self, res, dfn): for fn in os.listdir(dfn): if fn.startswith("."): continue - msg = self._load_message(msgdb, fn, os.path.join(dfn, fn)) + msg = self._load_message(fn, os.path.join(dfn, fn)) res[msg.content_hash] = msg @util.log_progress(lambda self: f"Updating Message List for {self.dfn}", lambda self: f", {len(self.messages)} msgs", level=logging.DEBUG) @mailbox.update_on_failure - async def update_message_list(self, msgdb: messages.MessageDB): + async def update_message_list(self): """Read the message list from the maildir and compute the content hashes""" res: messages.CHMsgDict_Type = {} st = {} for sd in ["cur", "new"]: st[sd] = os.stat(os.path.join(self.dfn, sd)) for sd in ["cur", "new"]: - self._update_message_dir(res, msgdb, os.path.join(self.dfn, sd)) + self._update_message_dir(res, os.path.join(self.dfn, sd)) for sd in ["cur", "new"]: fn = os.path.join(self.dfn, sd) # Retry if the dirs changed while trying to read them @@ -131,8 +131,7 @@ class MailDirMailbox(mailbox.Mailbox): fn = os.path.join(self.dfn, "new", base) return base, fn - def _store_msg(self, msgdb: messages.MessageDB, - cloudmsg: messages.Message): + def _store_msg(self, cloudmsg: messages.Message): """Apply a delta from the cloud: New message from cloud""" sid, fn = self._new_maildir_id(cloudmsg) msg = messages.Message(mailbox=self, @@ -143,7 +142,7 @@ class MailDirMailbox(mailbox.Mailbox): assert msg.content_hash is not None msg.fn = fn - msgdb.write_content(cloudmsg.content_hash, msg.fn) + self.msgdb.write_content(cloudmsg.content_hash, msg.fn) # It isn't clear if we need to do this, but make the local timestamps # match when the message would have been received if the local MTA @@ -192,7 +191,7 @@ class MailDirMailbox(mailbox.Mailbox): f", {self.last_force_new} added, {self.last_force_rm} removed, {self.last_force_kept} same" ) @mailbox.update_on_failure - def force_content(self, msgdb: messages.MessageDB, msgs: messages.CHMsgDict_Type): + def force_content(self, msgs: messages.CHMsgDict_Type): """Force this mailbox to contain the message list msgs (from cloud), including all the flags and state""" self.last_force_kept = 0 @@ -208,7 +207,7 @@ class MailDirMailbox(mailbox.Mailbox): for content_hash in want - have: self.last_force_new += 1 - self._store_msg(msgdb, msgs[content_hash]) + self._store_msg(msgs[content_hash]) for content_hash in have - want: self.last_force_rm += 1 diff --git a/cloud_mdir_sync/main.py b/cloud_mdir_sync/main.py index ab39676..af179cb 100644 --- a/cloud_mdir_sync/main.py +++ b/cloud_mdir_sync/main.py @@ -28,7 +28,7 @@ def force_local_to_cloud(cfg: config.Config, msgs: messages.MBoxDict_Type): local changes.""" for mbox, msgdict in msgs.items(): if not mbox.same_messages(msgdict): - mbox.force_content(cfg.msgdb, msgdict) + mbox.force_content(msgdict) return msgs @@ -65,7 +65,7 @@ async def synchronize_mail(cfg: config.Config): msgs = None while True: try: - await asyncio.gather(*(mbox.update_message_list(cfg.msgdb) + await asyncio.gather(*(mbox.update_message_list() for mbox in cfg.all_mboxes() if mbox.need_update)) diff --git a/cloud_mdir_sync/office365.py b/cloud_mdir_sync/office365.py index 7b6e7c8..f96ef29 100644 --- a/cloud_mdir_sync/office365.py +++ b/cloud_mdir_sync/office365.py @@ -401,8 +401,8 @@ class O365Mailbox(mailbox.Mailbox): asyncio.create_task(self._monitor_changes()) @mailbox.update_on_failure - async def _fetch_message(self, msg: messages.Message, - msgdb: messages.MessageDB): + async def _fetch_message(self, msg: messages.Message): + msgdb = self.msgdb with util.log_progress_ctx(logging.DEBUG, f"Downloading {msg.email_id}", lambda msg: f" {util.sizeof_fmt(msg.size)}", @@ -459,7 +459,7 @@ class O365Mailbox(mailbox.Mailbox): @util.log_progress(lambda self: f"Updating Message List for {self.name}", lambda self: f", {len(self.messages)} msgs") @mailbox.update_on_failure - async def update_message_list(self, msgdb: messages.MessageDB): + async def update_message_list(self): """Retrieve the list of all messages and store all the message content in the content_hash message database""" todo = [] @@ -483,9 +483,9 @@ class O365Mailbox(mailbox.Mailbox): jmsg["receivedDateTime"], '%Y-%m-%dT%H:%M:%SZ') msg.flags = self._json_to_flags(jmsg) - if not msgdb.have_content(msg): + if not self.msgdb.have_content(msg): todo.append( - asyncio.create_task(self._fetch_message(msg, msgdb))) + asyncio.create_task(self._fetch_message(msg))) msgs.append(msg) await asyncio.gather(*todo) @@ -539,7 +539,7 @@ class O365Mailbox(mailbox.Mailbox): self.need_update = True self.changed_event.set() - def force_content(self, msgdb, msgs): + def force_content(self, msgs): raise RuntimeError("Cannot move messages into the Cloud") def _update_msg_flags(self, cmsg: messages.Message, @@ -639,7 +639,7 @@ class O365Mailbox(mailbox.Mailbox): # Debugging that the message really is to be deleted if cmsg is not None and lmsg is None: - assert os.stat(os.path.join(self.cfg.msgdb.hashes_dir, + assert os.stat(os.path.join(self.msgdb.hashes_dir, ch)).st_nlink == 1 if cmsg is not None and (lmsg is None or lmsg.flags -- cgit v1.2.3