aboutsummaryrefslogtreecommitdiffstats
path: root/cloud_mdir_sync/maildir.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloud_mdir_sync/maildir.py')
-rw-r--r--cloud_mdir_sync/maildir.py206
1 files changed, 206 insertions, 0 deletions
diff --git a/cloud_mdir_sync/maildir.py b/cloud_mdir_sync/maildir.py
new file mode 100644
index 0000000..fdc8a90
--- /dev/null
+++ b/cloud_mdir_sync/maildir.py
@@ -0,0 +1,206 @@
+# SPDX-License-Identifier: GPL-2.0+
+import logging
+import os
+import pickle
+import re
+import time
+
+import pyinotify
+
+from . import config, mailbox, messages, util
+
+
+def unfold_header(s):
+ # Hrm, I wonder if this is the right way to normalize a header?
+ return re.sub(r"\n[ \t]+", " ", s)
+
+
+class MailDirMailbox(mailbox.Mailbox):
+ """Local MailDir mail directory"""
+ storage_kind = "maildir"
+ cfg: config.Config
+
+ def __init__(self, directory):
+ super().__init__()
+ self.dfn = os.path.expanduser(directory)
+ for sub in ["tmp", "cur", "new"]:
+ os.makedirs(os.path.join(self.dfn, sub), mode=0o700, exist_ok=True)
+
+ async def setup_mbox(self, cfg: config.Config):
+ self.cfg = cfg
+ cfg.watch_manager.add_watch(
+ path=[
+ os.path.join(self.dfn, "cur"),
+ os.path.join(self.dfn, "new")
+ ],
+ proc_fun=self._dir_changed,
+ mask=(pyinotify.IN_ATTRIB | pyinotify.IN_MOVED_FROM
+ | pyinotify.IN_MOVED_TO
+ | pyinotify.IN_CREATE | pyinotify.IN_DELETE
+ | pyinotify.IN_ONLYDIR),
+ quiet=False)
+
+ def _dir_changed(self, notifier):
+ self.need_update = True
+ self.changed_event.set()
+
+ def _msg_to_flags(self, msg: messages.Message):
+ """Return the desired maildir flags from a message"""
+ # See https://cr.yp.to/proto/maildir.html
+ res = set()
+ if msg.flags & messages.Message.FLAG_REPLIED:
+ res.add("R")
+ if msg.flags & messages.Message.FLAG_READ:
+ res.add("S")
+ if msg.flags & messages.Message.FLAG_FLAGGED:
+ res.add("F")
+ return res
+
+ def _decode_msg_filename(self, fn):
+ """Return the base maildir filename, message flags, and maildir flag
+ letters"""
+ fn = os.path.basename(fn)
+ if ":2," not in fn:
+ return (fn, set(), 0)
+ fn, _, flags = fn.partition(":2,")
+ flags = set(flags)
+ mflags = 0
+ if "R" in flags:
+ mflags |= messages.Message.FLAG_REPLIED
+ if "S" in flags:
+ mflags |= messages.Message.FLAG_READ
+ if "F" in flags:
+ mflags |= messages.Message.FLAG_FLAGGED
+ assert ":2," not in fn
+ return (fn, flags, mflags)
+
+ def _load_message(self, msgdb: messages.MessageDB, fn, ffn):
+ sid, _, mflags = self._decode_msg_filename(fn)
+ msg = messages.Message(mailbox=self, storage_id=sid)
+ msg.flags = mflags
+ msgdb.msg_from_file(msg, ffn)
+ return msg
+
+ def _update_message_dir(self, res, msgdb: messages.MessageDB, dfn):
+ for fn in os.listdir(dfn):
+ if fn.startswith("."):
+ continue
+ msg = self._load_message(msgdb, fn, os.path.join(dfn, fn))
+ res[msg.content_hash] = msg
+
+ @util.log_progress(lambda self: f"Updating Message List for {self.dfn}",
+ lambda self: f", {len(self.messages)} msgs",
+ level=logging.DEBUG)
+ @mailbox.update_on_failure
+ async def update_message_list(self, msgdb: messages.MessageDB):
+ """Read the message list from the maildir and compute the content hashes"""
+ res: messages.CHMsgDict_Type = {}
+ st = {}
+ for sd in ["cur", "new"]:
+ st[sd] = os.stat(os.path.join(self.dfn, sd))
+ for sd in ["cur", "new"]:
+ self._update_message_dir(res, msgdb, os.path.join(self.dfn, sd))
+ for sd in ["cur", "new"]:
+ fn = os.path.join(self.dfn, sd)
+ # Retry if the dirs changed while trying to read them
+ if os.stat(fn).st_mtime != st[sd].st_mtime:
+ raise IOError(f"Maildir {fn} changed during listing")
+
+ self.messages = res
+ self.need_update = False
+ if self.cfg.trace_file is not None:
+ pickle.dump(["update_message_list", self.dfn, self.messages],
+ self.cfg.trace_file)
+
+ def _new_maildir_id(self, msg: messages.Message):
+ """Return a unique maildir filename for the given message"""
+ tm = time.clock_gettime(time.CLOCK_REALTIME)
+ base = f"{int(tm)}.M{int((tm%1)*1000*1000)}-{msg.content_hash}"
+ flags = self._msg_to_flags(msg)
+ if flags:
+ fn = os.path.join(self.dfn, "cur",
+ base + ":2," + "".join(sorted(flags)))
+ else:
+ fn = os.path.join(self.dfn, "new", base)
+ return base, fn
+
+ def _store_msg(self, msgdb: messages.MessageDB,
+ cloudmsg: messages.Message):
+ """Apply a delta from the cloud: New message from cloud"""
+ sid, fn = self._new_maildir_id(cloudmsg)
+ msg = messages.Message(mailbox=self,
+ storage_id=sid,
+ email_id=cloudmsg.email_id)
+ msg.flags = cloudmsg.flags
+ msg.content_hash = cloudmsg.content_hash
+ assert msg.content_hash is not None
+ msg.fn = fn
+
+ msgdb.write_content(cloudmsg.content_hash, msg.fn)
+
+ # It isn't clear if we need to do this, but make the local timestamps
+ # match when the message would have been received if the local MTA
+ # delivered it.
+ if cloudmsg.received_time is not None:
+ os.utime(fn, (time.time(), cloudmsg.received_time.timestamp()))
+ self.messages[msg.content_hash] = msg
+
+ def _set_flags(self, mymsg: messages.Message, cloudmsg: messages.Message):
+ """Apply a delta from the cloud: Same message in cloud, synchronize flags"""
+ if mymsg.flags == cloudmsg.flags:
+ return
+
+ cloud_flags = self._msg_to_flags(cloudmsg)
+
+ base, mflags, _ = self._decode_msg_filename(mymsg.fn)
+ nflags = (mflags - set(("R", "S", "F"))) | cloud_flags
+ if mflags == nflags:
+ return
+ if nflags:
+ nfn = os.path.join(self.dfn, "cur",
+ base + ":2," + "".join(sorted(nflags)))
+ else:
+ nfn = os.path.join(self.dfn, "new", base)
+ os.rename(mymsg.fn, nfn)
+ mymsg.fn = nfn
+ mymsg.flags = cloudmsg.flags
+
+ def _remove_msg(self, mymsg: messages.Message):
+ """Apply a delta from the cloud: Message deleted in cloud"""
+ assert mymsg.content_hash is not None
+ os.unlink(mymsg.fn)
+ del self.messages[mymsg.content_hash]
+
+ @util.log_progress(
+ lambda self: f"Applying cloud changes for {self.dfn}", lambda self:
+ f", {self.last_force_new} added, {self.last_force_rm} removed, {self.last_force_kept} same"
+ )
+ @mailbox.update_on_failure
+ def force_content(self, msgdb: messages.MessageDB, msgs: messages.CHMsgDict_Type):
+ """Force this mailbox to contain the message list msgs (from cloud), including
+ all the flags and state"""
+ self.last_force_kept = 0
+ self.last_force_new = 0
+ self.last_force_rm = 0
+
+ have = set(self.messages.keys())
+ want = set(msgs.keys())
+
+ for content_hash in want.intersection(have):
+ self.last_force_kept += 1
+ self._set_flags(self.messages[content_hash], msgs[content_hash])
+
+ for content_hash in want - have:
+ self.last_force_new += 1
+ self._store_msg(msgdb, msgs[content_hash])
+
+ for content_hash in have - want:
+ self.last_force_rm += 1
+ self._remove_msg(self.messages[content_hash])
+
+ if self.cfg.trace_file is not None:
+ pickle.dump(["force_content", self.dfn, self.messages, msgs],
+ self.cfg.trace_file)
+
+ async def merge_content(self, msgs):
+ raise RuntimeError("Cannot merge local changes into a local mailbox")