diff options
Diffstat (limited to 'cloud_mdir_sync/maildir.py')
-rw-r--r-- | cloud_mdir_sync/maildir.py | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/cloud_mdir_sync/maildir.py b/cloud_mdir_sync/maildir.py new file mode 100644 index 0000000..fdc8a90 --- /dev/null +++ b/cloud_mdir_sync/maildir.py @@ -0,0 +1,206 @@ +# SPDX-License-Identifier: GPL-2.0+ +import logging +import os +import pickle +import re +import time + +import pyinotify + +from . import config, mailbox, messages, util + + +def unfold_header(s): + # Hrm, I wonder if this is the right way to normalize a header? + return re.sub(r"\n[ \t]+", " ", s) + + +class MailDirMailbox(mailbox.Mailbox): + """Local MailDir mail directory""" + storage_kind = "maildir" + cfg: config.Config + + def __init__(self, directory): + super().__init__() + self.dfn = os.path.expanduser(directory) + for sub in ["tmp", "cur", "new"]: + os.makedirs(os.path.join(self.dfn, sub), mode=0o700, exist_ok=True) + + async def setup_mbox(self, cfg: config.Config): + self.cfg = cfg + cfg.watch_manager.add_watch( + path=[ + os.path.join(self.dfn, "cur"), + os.path.join(self.dfn, "new") + ], + proc_fun=self._dir_changed, + mask=(pyinotify.IN_ATTRIB | pyinotify.IN_MOVED_FROM + | pyinotify.IN_MOVED_TO + | pyinotify.IN_CREATE | pyinotify.IN_DELETE + | pyinotify.IN_ONLYDIR), + quiet=False) + + def _dir_changed(self, notifier): + self.need_update = True + self.changed_event.set() + + def _msg_to_flags(self, msg: messages.Message): + """Return the desired maildir flags from a message""" + # See https://cr.yp.to/proto/maildir.html + res = set() + if msg.flags & messages.Message.FLAG_REPLIED: + res.add("R") + if msg.flags & messages.Message.FLAG_READ: + res.add("S") + if msg.flags & messages.Message.FLAG_FLAGGED: + res.add("F") + return res + + def _decode_msg_filename(self, fn): + """Return the base maildir filename, message flags, and maildir flag + letters""" + fn = os.path.basename(fn) + if ":2," not in fn: + return (fn, set(), 0) + fn, _, flags = fn.partition(":2,") + flags = set(flags) + mflags = 0 + if "R" in flags: + mflags |= messages.Message.FLAG_REPLIED + if "S" in flags: + mflags |= messages.Message.FLAG_READ + if "F" in flags: + mflags |= messages.Message.FLAG_FLAGGED + assert ":2," not in fn + return (fn, flags, mflags) + + def _load_message(self, msgdb: messages.MessageDB, fn, ffn): + sid, _, mflags = self._decode_msg_filename(fn) + msg = messages.Message(mailbox=self, storage_id=sid) + msg.flags = mflags + msgdb.msg_from_file(msg, ffn) + return msg + + def _update_message_dir(self, res, msgdb: messages.MessageDB, dfn): + for fn in os.listdir(dfn): + if fn.startswith("."): + continue + msg = self._load_message(msgdb, fn, os.path.join(dfn, fn)) + res[msg.content_hash] = msg + + @util.log_progress(lambda self: f"Updating Message List for {self.dfn}", + lambda self: f", {len(self.messages)} msgs", + level=logging.DEBUG) + @mailbox.update_on_failure + async def update_message_list(self, msgdb: messages.MessageDB): + """Read the message list from the maildir and compute the content hashes""" + res: messages.CHMsgDict_Type = {} + st = {} + for sd in ["cur", "new"]: + st[sd] = os.stat(os.path.join(self.dfn, sd)) + for sd in ["cur", "new"]: + self._update_message_dir(res, msgdb, os.path.join(self.dfn, sd)) + for sd in ["cur", "new"]: + fn = os.path.join(self.dfn, sd) + # Retry if the dirs changed while trying to read them + if os.stat(fn).st_mtime != st[sd].st_mtime: + raise IOError(f"Maildir {fn} changed during listing") + + self.messages = res + self.need_update = False + if self.cfg.trace_file is not None: + pickle.dump(["update_message_list", self.dfn, self.messages], + self.cfg.trace_file) + + def _new_maildir_id(self, msg: messages.Message): + """Return a unique maildir filename for the given message""" + tm = time.clock_gettime(time.CLOCK_REALTIME) + base = f"{int(tm)}.M{int((tm%1)*1000*1000)}-{msg.content_hash}" + flags = self._msg_to_flags(msg) + if flags: + fn = os.path.join(self.dfn, "cur", + base + ":2," + "".join(sorted(flags))) + else: + fn = os.path.join(self.dfn, "new", base) + return base, fn + + def _store_msg(self, msgdb: messages.MessageDB, + cloudmsg: messages.Message): + """Apply a delta from the cloud: New message from cloud""" + sid, fn = self._new_maildir_id(cloudmsg) + msg = messages.Message(mailbox=self, + storage_id=sid, + email_id=cloudmsg.email_id) + msg.flags = cloudmsg.flags + msg.content_hash = cloudmsg.content_hash + assert msg.content_hash is not None + msg.fn = fn + + msgdb.write_content(cloudmsg.content_hash, msg.fn) + + # It isn't clear if we need to do this, but make the local timestamps + # match when the message would have been received if the local MTA + # delivered it. + if cloudmsg.received_time is not None: + os.utime(fn, (time.time(), cloudmsg.received_time.timestamp())) + self.messages[msg.content_hash] = msg + + def _set_flags(self, mymsg: messages.Message, cloudmsg: messages.Message): + """Apply a delta from the cloud: Same message in cloud, synchronize flags""" + if mymsg.flags == cloudmsg.flags: + return + + cloud_flags = self._msg_to_flags(cloudmsg) + + base, mflags, _ = self._decode_msg_filename(mymsg.fn) + nflags = (mflags - set(("R", "S", "F"))) | cloud_flags + if mflags == nflags: + return + if nflags: + nfn = os.path.join(self.dfn, "cur", + base + ":2," + "".join(sorted(nflags))) + else: + nfn = os.path.join(self.dfn, "new", base) + os.rename(mymsg.fn, nfn) + mymsg.fn = nfn + mymsg.flags = cloudmsg.flags + + def _remove_msg(self, mymsg: messages.Message): + """Apply a delta from the cloud: Message deleted in cloud""" + assert mymsg.content_hash is not None + os.unlink(mymsg.fn) + del self.messages[mymsg.content_hash] + + @util.log_progress( + lambda self: f"Applying cloud changes for {self.dfn}", lambda self: + f", {self.last_force_new} added, {self.last_force_rm} removed, {self.last_force_kept} same" + ) + @mailbox.update_on_failure + def force_content(self, msgdb: messages.MessageDB, msgs: messages.CHMsgDict_Type): + """Force this mailbox to contain the message list msgs (from cloud), including + all the flags and state""" + self.last_force_kept = 0 + self.last_force_new = 0 + self.last_force_rm = 0 + + have = set(self.messages.keys()) + want = set(msgs.keys()) + + for content_hash in want.intersection(have): + self.last_force_kept += 1 + self._set_flags(self.messages[content_hash], msgs[content_hash]) + + for content_hash in want - have: + self.last_force_new += 1 + self._store_msg(msgdb, msgs[content_hash]) + + for content_hash in have - want: + self.last_force_rm += 1 + self._remove_msg(self.messages[content_hash]) + + if self.cfg.trace_file is not None: + pickle.dump(["force_content", self.dfn, self.messages, msgs], + self.cfg.trace_file) + + async def merge_content(self, msgs): + raise RuntimeError("Cannot merge local changes into a local mailbox") |