aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJason Gunthorpe <jgg@mellanox.com>2020-02-05 12:41:32 -0400
committerJason Gunthorpe <jgg@mellanox.com>2020-02-07 11:25:26 -0400
commitf578f75464791e7bfbf6b2544fd8651fe8b6f974 (patch)
treed46639c402d826874db20e095e30fe77cb1ed055
parentbf72ac2c33516a951b7b156c10c4e753c781bfc2 (diff)
downloadcloud_mdir_sync-f578f75464791e7bfbf6b2544fd8651fe8b6f974.tar.gz
cloud_mdir_sync-f578f75464791e7bfbf6b2544fd8651fe8b6f974.tar.bz2
cloud_mdir_sync-f578f75464791e7bfbf6b2544fd8651fe8b6f974.zip
Do not pass msgdb all over the place
The mailboxes can only be linked to a single msgdb, always take it from the mailbox cfg. Signed-off-by: Jason Gunthorpe <jgg@mellanox.com>
-rw-r--r--cloud_mdir_sync/gmail.py31
-rw-r--r--cloud_mdir_sync/mailbox.py7
-rw-r--r--cloud_mdir_sync/maildir.py21
-rw-r--r--cloud_mdir_sync/main.py4
-rw-r--r--cloud_mdir_sync/office365.py14
5 files changed, 38 insertions, 39 deletions
diff --git a/cloud_mdir_sync/gmail.py b/cloud_mdir_sync/gmail.py
index 8492223..6bd71a1 100644
--- a/cloud_mdir_sync/gmail.py
+++ b/cloud_mdir_sync/gmail.py
@@ -301,10 +301,10 @@ class GMailMessage(messages.Message):
# GMail does not return the email_id, but it does have a stable REST
# ID, so if we have the REST ID in the database then we can compute
# the email_id
- self.content_hash = mailbox.cfg.msgdb.content_hashes_cloud.get(
+ self.content_hash = mailbox.msgdb.content_hashes_cloud.get(
self.cid())
if self.content_hash:
- self.email_id = mailbox.cfg.msgdb.content_msgid[self.content_hash]
+ self.email_id = mailbox.msgdb.content_msgid[self.content_hash]
self.gmail_labels = gmail_labels
if self.gmail_labels:
self._labels_to_flags()
@@ -385,8 +385,8 @@ class GMailMailbox(mailbox.Mailbox):
msg.update_from_json(jmsg)
return jmsg["historyId"]
- async def _fetch_message(self, msg: GMailMessage,
- msgdb: messages.MessageDB):
+ async def _fetch_message(self, msg: GMailMessage):
+ msgdb = self.msgdb
with util.log_progress_ctx(logging.DEBUG,
f"Downloading {msg.storage_id}",
lambda msg: f" {util.sizeof_fmt(msg.size)}",
@@ -405,7 +405,7 @@ class GMailMailbox(mailbox.Mailbox):
msg.content_hash = msgdb.store_hashed_msg(msg, F)
return jmsg["historyId"]
- async def _fetch_all_messages(self, msgdb: messages.MessageDB):
+ async def _fetch_all_messages(self):
"""Perform a full synchronization of the mailbox"""
start_history_id = None
todo = []
@@ -416,9 +416,9 @@ class GMailMailbox(mailbox.Mailbox):
key="messages",
params={"labelIds": self.label}):
msg = GMailMessage(mailbox=self, gmail_id=jmsg["id"])
- if not msgdb.have_content(msg):
+ if not self.msgdb.have_content(msg):
todo.append(
- asyncio.create_task(self._fetch_message(msg, msgdb)))
+ asyncio.create_task(self._fetch_message(msg)))
else:
todo.append(asyncio.create_task(self._fetch_metadata(msg)))
msgs.append(msg)
@@ -429,8 +429,6 @@ class GMailMailbox(mailbox.Mailbox):
return (msgs, start_history_id)
async def _fetch_delta_messages(self, old_msgs: List[GMailMessage],
- start_history_id: Optional[str],
- msgdb: messages.MessageDB):
start_history_id: Optional[str]):
# Mailbox is empty
if start_history_id is None:
@@ -495,9 +493,9 @@ class GMailMailbox(mailbox.Mailbox):
msg = GMailMessage(mailbox=self,
gmail_id=gmail_id,
gmail_labels=gmail_labels)
- if not msgdb.have_content(msg):
+ if not self.msgdb.have_content(msg):
todo.append(
- asyncio.create_task(self._fetch_message(msg, msgdb)))
+ asyncio.create_task(self._fetch_message(msg)))
else:
todo.append(asyncio.create_task(self._fetch_metadata(msg)))
else:
@@ -505,7 +503,7 @@ class GMailMailbox(mailbox.Mailbox):
gmail_id=gmail_id,
gmail_labels=gmail_labels)
msg.received_time = omsg.received_time
- assert msgdb.have_content(msg)
+ assert self.msgdb.have_content(msg)
msgs.append(msg)
await asyncio.gather(*todo)
return (msgs, next_history_id)
@@ -513,19 +511,18 @@ class GMailMailbox(mailbox.Mailbox):
@util.log_progress(lambda self: f"Updating Message List for {self.name}",
lambda self: f", {len(self.messages)} msgs")
@mailbox.update_on_failure
- async def update_message_list(self, msgdb: messages.MessageDB):
+ async def update_message_list(self):
"""Retrieve the list of all messages and store all the message content
in the content_hash message database"""
if self.history_delta is None or self.history_delta[1] is None:
# For whatever reason, there is usually more history than is
# suggested by the history_id from the messages.list, so always
# drain it out.
- self.history_delta = await self._fetch_all_messages(msgdb)
+ self.history_delta = await self._fetch_all_messages()
self.history_delta = await self._fetch_delta_messages(
start_history_id=self.history_delta[1],
- old_msgs=self.history_delta[0],
- msgdb=msgdb)
+ old_msgs=self.history_delta[0])
self.messages = {
msg.content_hash: msg
@@ -541,7 +538,7 @@ class GMailMailbox(mailbox.Mailbox):
self.need_update = True
self.changed_event.set()
- def force_content(self, msgdb, msgs):
+ def force_content(self, msgs):
raise RuntimeError("Cannot move messages into the Cloud")
def _update_msg_flags(self, cmsg: messages.Message, old_cmsg_flags: int,
diff --git a/cloud_mdir_sync/mailbox.py b/cloud_mdir_sync/mailbox.py
index 10b0b64..6f9fea8 100644
--- a/cloud_mdir_sync/mailbox.py
+++ b/cloud_mdir_sync/mailbox.py
@@ -52,14 +52,17 @@ class Mailbox(object):
pass
@abstractmethod
- def force_content(self, msgdb: "MessageDB",
- msgs: "CHMsgDict_Type") -> None:
+ def force_content(self, msgs: "CHMsgDict_Type") -> None:
pass
@abstractmethod
async def merge_content(self, msgs: "CHMsgMappingDict_Type") -> None:
pass
+ @property
+ def msgdb(self) -> "MessageDB":
+ return self.cfg.msgdb
+
def same_messages(self,
mdict: "CHMsgMappingDict_Type",
tuple_form=False) -> bool:
diff --git a/cloud_mdir_sync/maildir.py b/cloud_mdir_sync/maildir.py
index ffb9d77..c7f1be5 100644
--- a/cloud_mdir_sync/maildir.py
+++ b/cloud_mdir_sync/maildir.py
@@ -81,32 +81,32 @@ class MailDirMailbox(mailbox.Mailbox):
assert ":2," not in fn
return (fn, flags, mflags)
- def _load_message(self, msgdb: messages.MessageDB, fn, ffn):
+ def _load_message(self, fn, ffn):
sid, _, mflags = self._decode_msg_filename(fn)
msg = messages.Message(mailbox=self, storage_id=sid)
msg.flags = mflags
- msgdb.msg_from_file(msg, ffn)
+ self.msgdb.msg_from_file(msg, ffn)
return msg
- def _update_message_dir(self, res, msgdb: messages.MessageDB, dfn):
+ def _update_message_dir(self, res, dfn):
for fn in os.listdir(dfn):
if fn.startswith("."):
continue
- msg = self._load_message(msgdb, fn, os.path.join(dfn, fn))
+ msg = self._load_message(fn, os.path.join(dfn, fn))
res[msg.content_hash] = msg
@util.log_progress(lambda self: f"Updating Message List for {self.dfn}",
lambda self: f", {len(self.messages)} msgs",
level=logging.DEBUG)
@mailbox.update_on_failure
- async def update_message_list(self, msgdb: messages.MessageDB):
+ async def update_message_list(self):
"""Read the message list from the maildir and compute the content hashes"""
res: messages.CHMsgDict_Type = {}
st = {}
for sd in ["cur", "new"]:
st[sd] = os.stat(os.path.join(self.dfn, sd))
for sd in ["cur", "new"]:
- self._update_message_dir(res, msgdb, os.path.join(self.dfn, sd))
+ self._update_message_dir(res, os.path.join(self.dfn, sd))
for sd in ["cur", "new"]:
fn = os.path.join(self.dfn, sd)
# Retry if the dirs changed while trying to read them
@@ -131,8 +131,7 @@ class MailDirMailbox(mailbox.Mailbox):
fn = os.path.join(self.dfn, "new", base)
return base, fn
- def _store_msg(self, msgdb: messages.MessageDB,
- cloudmsg: messages.Message):
+ def _store_msg(self, cloudmsg: messages.Message):
"""Apply a delta from the cloud: New message from cloud"""
sid, fn = self._new_maildir_id(cloudmsg)
msg = messages.Message(mailbox=self,
@@ -143,7 +142,7 @@ class MailDirMailbox(mailbox.Mailbox):
assert msg.content_hash is not None
msg.fn = fn
- msgdb.write_content(cloudmsg.content_hash, msg.fn)
+ self.msgdb.write_content(cloudmsg.content_hash, msg.fn)
# It isn't clear if we need to do this, but make the local timestamps
# match when the message would have been received if the local MTA
@@ -192,7 +191,7 @@ class MailDirMailbox(mailbox.Mailbox):
f", {self.last_force_new} added, {self.last_force_rm} removed, {self.last_force_kept} same"
)
@mailbox.update_on_failure
- def force_content(self, msgdb: messages.MessageDB, msgs: messages.CHMsgDict_Type):
+ def force_content(self, msgs: messages.CHMsgDict_Type):
"""Force this mailbox to contain the message list msgs (from cloud), including
all the flags and state"""
self.last_force_kept = 0
@@ -208,7 +207,7 @@ class MailDirMailbox(mailbox.Mailbox):
for content_hash in want - have:
self.last_force_new += 1
- self._store_msg(msgdb, msgs[content_hash])
+ self._store_msg(msgs[content_hash])
for content_hash in have - want:
self.last_force_rm += 1
diff --git a/cloud_mdir_sync/main.py b/cloud_mdir_sync/main.py
index ab39676..af179cb 100644
--- a/cloud_mdir_sync/main.py
+++ b/cloud_mdir_sync/main.py
@@ -28,7 +28,7 @@ def force_local_to_cloud(cfg: config.Config, msgs: messages.MBoxDict_Type):
local changes."""
for mbox, msgdict in msgs.items():
if not mbox.same_messages(msgdict):
- mbox.force_content(cfg.msgdb, msgdict)
+ mbox.force_content(msgdict)
return msgs
@@ -65,7 +65,7 @@ async def synchronize_mail(cfg: config.Config):
msgs = None
while True:
try:
- await asyncio.gather(*(mbox.update_message_list(cfg.msgdb)
+ await asyncio.gather(*(mbox.update_message_list()
for mbox in cfg.all_mboxes()
if mbox.need_update))
diff --git a/cloud_mdir_sync/office365.py b/cloud_mdir_sync/office365.py
index 7b6e7c8..f96ef29 100644
--- a/cloud_mdir_sync/office365.py
+++ b/cloud_mdir_sync/office365.py
@@ -401,8 +401,8 @@ class O365Mailbox(mailbox.Mailbox):
asyncio.create_task(self._monitor_changes())
@mailbox.update_on_failure
- async def _fetch_message(self, msg: messages.Message,
- msgdb: messages.MessageDB):
+ async def _fetch_message(self, msg: messages.Message):
+ msgdb = self.msgdb
with util.log_progress_ctx(logging.DEBUG,
f"Downloading {msg.email_id}",
lambda msg: f" {util.sizeof_fmt(msg.size)}",
@@ -459,7 +459,7 @@ class O365Mailbox(mailbox.Mailbox):
@util.log_progress(lambda self: f"Updating Message List for {self.name}",
lambda self: f", {len(self.messages)} msgs")
@mailbox.update_on_failure
- async def update_message_list(self, msgdb: messages.MessageDB):
+ async def update_message_list(self):
"""Retrieve the list of all messages and store all the message content in the
content_hash message database"""
todo = []
@@ -483,9 +483,9 @@ class O365Mailbox(mailbox.Mailbox):
jmsg["receivedDateTime"], '%Y-%m-%dT%H:%M:%SZ')
msg.flags = self._json_to_flags(jmsg)
- if not msgdb.have_content(msg):
+ if not self.msgdb.have_content(msg):
todo.append(
- asyncio.create_task(self._fetch_message(msg, msgdb)))
+ asyncio.create_task(self._fetch_message(msg)))
msgs.append(msg)
await asyncio.gather(*todo)
@@ -539,7 +539,7 @@ class O365Mailbox(mailbox.Mailbox):
self.need_update = True
self.changed_event.set()
- def force_content(self, msgdb, msgs):
+ def force_content(self, msgs):
raise RuntimeError("Cannot move messages into the Cloud")
def _update_msg_flags(self, cmsg: messages.Message,
@@ -639,7 +639,7 @@ class O365Mailbox(mailbox.Mailbox):
# Debugging that the message really is to be deleted
if cmsg is not None and lmsg is None:
- assert os.stat(os.path.join(self.cfg.msgdb.hashes_dir,
+ assert os.stat(os.path.join(self.msgdb.hashes_dir,
ch)).st_nlink == 1
if cmsg is not None and (lmsg is None or lmsg.flags