aboutsummaryrefslogtreecommitdiffstats
path: root/cloud_mdir_sync/office365.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloud_mdir_sync/office365.py')
-rw-r--r--cloud_mdir_sync/office365.py637
1 files changed, 637 insertions, 0 deletions
diff --git a/cloud_mdir_sync/office365.py b/cloud_mdir_sync/office365.py
new file mode 100644
index 0000000..0afaa3f
--- /dev/null
+++ b/cloud_mdir_sync/office365.py
@@ -0,0 +1,637 @@
+# SPDX-License-Identifier: GPL-2.0+
+import asyncio
+import datetime
+import functools
+import logging
+import os
+import pickle
+import secrets
+import webbrowser
+from typing import Any, Dict, Union
+
+import aiohttp
+import requests
+
+from . import config, mailbox, messages, util
+
+
+def _retry_protect(func):
+ # Graph can return various error codes, see:
+ # https://docs.microsoft.com/en-us/graph/errors
+ @functools.wraps(func)
+ async def async_wrapper(self, *args, **kwargs):
+ while True:
+ while (self.graph_token is None or self.owa_token is None):
+ await self.authenticate()
+
+ try:
+ return await func(self, *args, **kwargs)
+ except aiohttp.ClientResponseError as e:
+ self.cfg.logger.debug(
+ f"Got HTTP Error {e.code} in {func} for {e.request_info.url!r}"
+ )
+ if (e.code == 401 or # Unauthorized
+ e.code == 403): # Forbidden
+ self.graph_token = None
+ self.owa_token = None
+ await self.authenticate()
+ continue
+ if (e.code == 503 or # Service Unavilable
+ e.code == 509 or # Bandwidth Limit Exceeded
+ e.code == 429 or # Too Many Requests
+ e.code == 504 or # Gateway Timeout
+ e.code == 200): # Success, but error JSON
+ self.cfg.logger.error(f"Graph returns {e}, delaying")
+ await asyncio.sleep(10)
+ continue
+ if (e.code == 400 or # Bad Request
+ e.code == 405 or # Method Not Allowed
+ e.code == 406 or # Not Acceptable
+ e.code == 411 or # Length Required
+ e.code == 413 or # Request Entity Too Large
+ e.code == 415 or # Unsupported Media Type
+ e.code == 422 or # Unprocessable Entity
+ e.code == 501): # Not implemented
+ self.cfg.logger.exception(f"Graph call failed {e.body!r}")
+ raise RuntimeError(f"Graph call failed {e!r}")
+
+ # Other errors we retry after resetting the mailbox
+ raise
+ except (asyncio.TimeoutError,
+ aiohttp.client_exceptions.ClientError):
+ self.cfg.logger.debug(f"Got non-HTTP Error in {func}")
+ await asyncio.sleep(10)
+ continue
+
+ return async_wrapper
+
+
+class GraphAPI(object):
+ """An OAUTH2 authenticated session to the Microsoft Graph API"""
+ graph_scopes = [
+ "https://graph.microsoft.com/User.Read",
+ "https://graph.microsoft.com/Mail.ReadWrite"
+ ]
+ graph_token = None
+ owa_scopes = ["https://outlook.office.com/mail.read"]
+ owa_token = None
+ authenticator = None
+
+ def __init__(self, cfg, domain_id, user, tenant):
+ import msal
+ self.msl_cache = msal.SerializableTokenCache()
+ auth = cfg.msgdb.get_authenticator(domain_id)
+ if auth is not None:
+ self.msl_cache.deserialize(auth)
+
+ self.domain_id = domain_id
+ self.cfg = cfg
+ self.user = user
+ self.web_app = cfg.web_app
+
+ if self.user is not None:
+ self.name = f"{self.user}//{tenant}"
+ else:
+ self.name = f"//{tenant}"
+
+ connector = aiohttp.connector.TCPConnector(limit=20, limit_per_host=5)
+ self.session = aiohttp.ClientSession(connector=connector,
+ raise_for_status=False)
+ self.headers = {}
+ self.owa_headers = {}
+
+ # Use the new format much more immutable ids, this will work better
+ # with our caching scheme. See
+ # https://docs.microsoft.com/en-us/graph/outlook-immutable-id
+ self.headers["Prefer"] = 'IdType="ImmutableId"'
+
+ # FIXME: tennant/authority
+ self.msal = msal.PublicClientApplication(
+ client_id="122f4826-adf9-465d-8e84-e9d00bc9f234",
+ authority=f"https://login.microsoftonline.com/{tenant}",
+ token_cache=self.msl_cache)
+
+ def _cached_authenticate(self):
+ accounts = self.msal.get_accounts(self.user)
+ if len(accounts) != 1:
+ return False
+
+ try:
+ if self.graph_token is None:
+ self.graph_token = self.msal.acquire_token_silent(
+ scopes=self.graph_scopes, account=accounts[0])
+ if self.graph_token is None or "access_token" not in self.graph_token:
+ self.graph_token = None
+ return False
+
+ if self.owa_token is None:
+ self.owa_token = self.msal.acquire_token_silent(
+ scopes=self.owa_scopes, account=accounts[0])
+ if self.owa_token is None or "access_token" not in self.owa_token:
+ self.owa_token = None
+ return False
+ except requests.RequestException as e:
+ self.cfg.logger.error(f"msal failed on request {e}")
+ self.graph_token = None
+ self.owa_token = None
+ return False
+
+ self.headers["Authorization"] = self.graph_token[
+ "token_type"] + " " + self.graph_token["access_token"]
+ self.owa_headers["Authorization"] = self.owa_token[
+ "token_type"] + " " + self.owa_token["access_token"]
+ self.cfg.msgdb.set_authenticator(self.domain_id,
+ self.msl_cache.serialize())
+ return True
+
+ @util.log_progress(lambda self: f"Azure AD Authentication for {self.name}")
+ async def _do_authenticate(self):
+ while not self._cached_authenticate():
+ self.graph_token = None
+ self.owa_token = None
+
+ redirect_url = self.web_app.url + "oauth2/msal"
+ state = hex(id(self)) + secrets.token_urlsafe(8)
+ url = self.msal.get_authorization_request_url(
+ scopes=self.graph_scopes + self.owa_scopes,
+ state=state,
+ login_hint=self.user,
+ redirect_uri=redirect_url)
+
+ print(
+ f"Goto {self.cfg.web_app.url} in a web browser to authenticate"
+ )
+ webbrowser.open(url)
+ q = await self.cfg.web_app.auth_redir(url, state)
+ code = q["code"]
+
+ try:
+ self.graph_token = self.msal.acquire_token_by_authorization_code(
+ code=code,
+ scopes=self.graph_scopes,
+ redirect_uri=redirect_url)
+ except requests.RequestException as e:
+ self.cfg.logger.error(f"msal failed on request {e}")
+ await asyncio.sleep(10)
+
+ async def authenticate(self):
+ """Obtain OAUTH bearer tokens for MS services. For users this has to be done
+ interactively via the browser. A cache is used for tokens that have
+ not expired and they can be refreshed non-interactively into active
+ tokens within some limited time period."""
+ # Ensure we only ever have one authentication open at once. Other
+ # threads will all block here on the single authenticator.
+ if self.authenticator is None:
+ self.authenticator = asyncio.create_task(self._do_authenticate())
+ auth = self.authenticator
+ await auth
+ if self.authenticator is auth:
+ self.authenticator = None
+
+ async def _check_op(self, op):
+ if op.status >= 200 and op.status <= 299:
+ return
+ e = aiohttp.ClientResponseError(op.request_info,
+ op.history,
+ code=op.status,
+ message=op.reason,
+ headers=op.headers)
+ try:
+ e.body = await op.json()
+ except:
+ pass
+ raise e
+
+ async def _check_json(self, op):
+ """Check an operation for errors and convert errors to exceptions. Graph can
+ return an HTTP failure code, or (rarely) a JSON error message and a 200 success."""
+ await self._check_op(op)
+
+ res = await op.json()
+ if "error" in res:
+ e = aiohttp.ClientResponseError(op.request_info,
+ op.history,
+ code=op.status,
+ message=op.reason,
+ headers=op.headers)
+ e.body = res
+ raise e
+ return res
+
+ @_retry_protect
+ async def get_to_file(self, outf, ver, path, params=None, dos2unix=False):
+ """Copy the response of a GET operation into outf"""
+ async with self.session.get(f"https://graph.microsoft.com/{ver}{path}",
+ headers=self.headers,
+ params=params) as op:
+ await self._check_op(op)
+ carry = b""
+ async for data in op.content.iter_any():
+ if dos2unix:
+ if carry:
+ data = carry + data
+ data = data.replace(b"\r\n", b"\n")
+ if data[-1] == b'\r':
+ carry = data[-1:len(data)]
+ data = data[:-1]
+ else:
+ carry = b""
+ outf.write(data)
+ if dos2unix and carry:
+ outf.write(carry)
+
+ @_retry_protect
+ async def get_json(self, ver, path, params=None):
+ """Return the JSON dictionary from the GET operation"""
+ async with self.session.get(f"https://graph.microsoft.com/{ver}{path}",
+ headers=self.headers,
+ params=params) as op:
+ return await self._check_json(op)
+
+ @_retry_protect
+ async def post_json(self, ver, path, body, params=None):
+ """Return the JSON dictionary from the POST operation"""
+ async with self.session.post(
+ f"https://graph.microsoft.com/{ver}{path}",
+ headers=self.headers,
+ json=body,
+ params=params) as op:
+ return await self._check_json(op)
+
+ @_retry_protect
+ async def patch_json(self, ver, path, body, params=None):
+ """Return the JSON dictionary from the PATCH operation"""
+ async with self.session.patch(
+ f"https://graph.microsoft.com/{ver}{path}",
+ headers=self.headers,
+ json=body,
+ params=params) as op:
+ return await self._check_json(op)
+
+ @_retry_protect
+ async def delete(self, ver, path):
+ """Issue a delete. For Messages delete doesn't put it in the Deleted Items
+ folder, it is just deleted."""
+ async with self.session.delete(
+ f"https://graph.microsoft.com/{ver}{path}",
+ headers=self.headers) as op:
+ await self._check_op(op)
+ async for _ in op.content.iter_any():
+ pass
+
+ async def get_json_paged(self, ver, path, params=None):
+ """Return an iterator that iterates over every JSON element in a paged
+ result"""
+ # See https://docs.microsoft.com/en-us/graph/paging
+ resp = await self.get_json(ver, path, params)
+ while True:
+ for I in resp["value"]:
+ yield I
+ uri = resp.get("@odata.nextLink")
+ if uri is None:
+ break
+ async with self.session.get(uri, headers=self.headers) as op:
+ resp = await self._check_json(op)
+
+ @_retry_protect
+ async def owa_subscribe(self, resource, changetype):
+ """Graph does not support streaming subscriptions, so we use the OWA interface
+ instead. See
+
+ https://docs.microsoft.com/en-us/previous-versions/office/office-365-api/api/beta/notify-streaming-rest-operations"""
+ body = {
+ "@odata.type": "#Microsoft.OutlookServices.StreamingSubscription",
+ "Resource": resource,
+ "ChangeType": changetype
+ }
+
+ async with self.session.post(
+ f"https://outlook.office.com/api/beta/me/subscriptions",
+ headers=self.owa_headers,
+ json=body) as op:
+ return await self._check_json(op)
+
+ async def owa_get_notifications(self, subscription_id):
+ """Return the notifications as an async iterator"""
+ body = {
+ "ConnectionTimeoutInMinutes": 2,
+ "KeepAliveNotificationIntervalInSeconds": 10,
+ "SubscriptionIds": [subscription_id]
+ }
+ timeout = aiohttp.ClientTimeout(sock_read=20)
+ # FIXME: fine tune timeouts https://docs.aiohttp.org/en/stable/client_quickstart.html#timeouts
+ # FIXME: retry protect for this
+ async with self.session.post(
+ f"https://outlook.office.com/api/beta/Me/GetNotifications",
+ headers=self.owa_headers,
+ json=body,
+ timeout=timeout) as op:
+ await self._check_op(op)
+
+ # There seems to be no relation to http chunks and json fragments,
+ # other than the last chunk before sleeping terminates all the
+ # jsons. I guess this is supposed to be parsed using a fancy
+ # parser. FIXME: We do need to parse this to exclude the keep alives
+ first = True
+ buf = b""
+ async for data, chunk_end in op.content.iter_chunks():
+ buf += data
+ if not chunk_end:
+ continue
+
+ # Last, but probably not reliably so
+ if buf == b']}':
+ return
+
+ if not first:
+ yield buf
+ else:
+ first = False
+ buf = b""
+
+ async def close(self):
+ await self.session.close()
+
+
+class O365Mailbox(mailbox.Mailbox):
+ """Cloud Office365 mailbox using the Microsoft Graph RESET API for data access"""
+ storage_kind = "o365_v0"
+ loop: asyncio.AbstractEventLoop
+ timer = None
+ use_owa_subscribe = True
+ cfg: config.Config
+ graph: GraphAPI
+
+ def __init__(self, mailbox, user=None, tenant="common"):
+ super().__init__()
+ self.mailbox = mailbox
+ self.tenant = tenant
+ self.user = user
+
+ async def setup_mbox(self, cfg):
+ """Setup access to the authenticated API domain for this endpoint"""
+ self.cfg = cfg
+ self.loop = cfg.loop
+ did = f"o365-{self.user}-{self.tenant}"
+ self.graph = cfg.domains.get(did)
+ if self.graph is None:
+ self.graph = GraphAPI(cfg, did, self.user, self.tenant)
+ cfg.domains[did] = self.graph
+
+ self.name = f"{self.graph.name}:{self.mailbox}"
+
+ json = await self.graph.get_json(
+ "v1.0",
+ f"/me/mailFolders",
+ params={"$filter": f"displayName eq '{self.mailbox}'"})
+ if len(json["value"]) != 1:
+ raise ValueError(f"Invalid mailbox name {self.mailbox!r}")
+ self.json = json["value"][0]
+
+ self.mailbox_id = self.json["id"]
+ if self.use_owa_subscribe:
+ asyncio.create_task(self._monitor_changes())
+
+ @mailbox.update_on_failure
+ async def _fetch_message(self, msg, msgdb):
+ with util.log_progress_ctx(logging.DEBUG,
+ f"Downloading {msg.email_id}",
+ lambda msg: f" {util.sizeof_fmt(msg.size)}",
+ msg), msgdb.get_temp() as F:
+ # For some reason this returns a message with dos line
+ # endings. Really weird.
+ await self.graph.get_to_file(
+ F,
+ "v1.0",
+ f"/me/messages/{msg.storage_id}/$value",
+ dos2unix=True)
+ msg.size = F.tell()
+ msg.content_hash = msgdb.store_hashed_msg(msg, F)
+
+ def _json_to_flags(self, jmsg):
+ """This is was remarkably difficult to find out, and seems completely
+ undocumented."""
+ flags = 0
+ # First class properties are easy
+ if bool(jmsg["isRead"]):
+ flags |= messages.Message.FLAG_READ
+ if jmsg["flag"]["flagStatus"] == "flagged":
+ flags |= messages.Message.FLAG_FLAGGED
+
+ # 'Replied' is not a concept in MAPI, at least not a consistent concept.
+ for prop in jmsg.get("singleValueExtendedProperties", []):
+ if prop["id"] == "Integer 0x1080":
+ # Closely matches OWA and the Outlook App
+ # PidTagIconIndex
+ # https://docs.microsoft.com/en-us/openspecs/exchange_server_protocols/ms-oxprops/eeca3a02-14e7-419b-8918-986275a2fac0
+ val = int(prop["value"])
+ if (val == 0x105 or # Replied mail
+ val == 0x106): # Forwarded mail
+ flags |= messages.Message.FLAG_REPLIED
+ elif prop["id"] == "Integer 0x1081":
+ # Sort of matches OWA and the Outlook App
+ # PidTagLastVerbExecuted
+ # https://docs.microsoft.com/en-us/openspecs/exchange_server_protocols/ms-oxprops/4ec55eac-14b3-4dfa-adf3-340c0dcccd44
+ val = int(prop["value"])
+ if (val == 102 or # NOTEIVERB_REPLYTOSENDER
+ val == 103 or # NOTEIVERB_REPLYTOALL
+ val == 104): # NOTEIVERB_FORWARD
+ flags |= messages.Message.FLAG_REPLIED
+ elif prop["id"] == "Integer 0xe17":
+ # This is what IMAP uses but we can't set it
+ # PidTagMessageStatus
+ # https://docs.microsoft.com/en-us/openspecs/exchange_server_protocols/ms-oxprops/5d00fe2b-9548-4953-97ba-89b1aa0ba5ac
+ if int(prop["value"]) & 0x200: # MSGSTATUS_ANSWERED
+ flags |= messages.Message.FLAG_REPLIED
+ else:
+ util.pj(prop)
+ return flags
+
+ @util.log_progress(lambda self: f"Updating Message List for {self.name}",
+ lambda self: f", {len(self.messages)} msgs")
+ @mailbox.update_on_failure
+ async def update_message_list(self, msgdb):
+ """Retrieve the list of all messages and store all the message content in the
+ content_hash message database"""
+ todo = []
+ msgs = []
+
+ async for jmsg in self.graph.get_json_paged(
+ "v1.0",
+ f"/me/mailFolders/{self.mailbox_id}/messages",
+ params=
+ {
+ "$select":
+ "internetMessageId,isRead,Flag,receivedDateTime,singleValueExtendedProperties",
+ "$expand":
+ "SingleValueExtendedProperties($filter=(id eq 'Integer 0xe17') or"
+ " (id eq 'Integer 0x1080'))",
+ }):
+ msg = messages.Message(mailbox=self,
+ storage_id=jmsg["id"],
+ email_id=jmsg["internetMessageId"])
+ msg.received_time = datetime.datetime.strptime(
+ jmsg["receivedDateTime"], '%Y-%m-%dT%H:%M:%SZ')
+ msg.flags = self._json_to_flags(jmsg)
+
+ if not msgdb.have_content(msg):
+ todo.append(
+ asyncio.create_task(self._fetch_message(msg, msgdb)))
+
+ msgs.append(msg)
+ await asyncio.gather(*todo)
+
+ res = {}
+ for msg in msgs:
+ # Something went wrong?
+ if msg.content_hash is not None:
+ res[msg.content_hash] = msg
+ self.messages = res
+ self.need_update = False
+ if not self.use_owa_subscribe:
+ if self.timer:
+ self.timer.cancel()
+ self.timer = None
+ self.timer = self.loop.call_later(60, self._timer)
+ if self.cfg.trace_file is not None:
+ pickle.dump(["0365_update_message_list", self.name, self.messages],
+ self.cfg.trace_file)
+
+ async def _monitor_changes(self):
+ """Keep a persistent PUT that returns data when there are changes."""
+ r = None
+ while True:
+ if r is None:
+ r = await self.graph.owa_subscribe(
+ f"https://outlook.office.com/api/beta/me/mailfolders('{self.mailbox_id}')/Messages",
+ "Created,Updated,Deleted")
+ try:
+ # This should use a single notification channel per graph,
+ # however until we can parse the incremental json it can't be
+ # done.
+ async for data in self.graph.owa_get_notifications(r["Id"]):
+ # hacky hacky
+ if (data ==
+ b'{"@odata.type":"#Microsoft.OutlookServices.KeepAliveNotification","Status":"Ok"}'
+ or data ==
+ b',{"@odata.type":"#Microsoft.OutlookServices.KeepAliveNotification","Status":"Ok"}'
+ ):
+ continue
+ self.need_update = True
+ self.changed_event.set()
+ except (asyncio.TimeoutError,
+ aiohttp.client_exceptions.ClientError):
+ r = None
+ continue
+
+ def _timer(self):
+ self.need_update = True
+ self.changed_event.set()
+
+ def force_content(self, msgdb, msgs):
+ raise RuntimeError("Cannot move messages into the Cloud")
+
+ @util.log_progress(lambda self: f"Uploading local changes for {self.name}",
+ lambda self: f", {self.last_merge_len} changes ")
+ @mailbox.update_on_failure
+ async def merge_content(self, msgs):
+ # There is a batching API for this kind of stuff as well:
+ # https://docs.microsoft.com/en-us/graph/json-batching
+ self.last_merge_len = 0
+ todo = []
+ if self.cfg.trace_file is not None:
+ pickle.dump(["merge_content", self.name, self.messages, msgs],
+ self.cfg.trace_file)
+ for ch, mpair in msgs.items():
+ # lmsg is the message in the local mailbox
+ # cmsg is the current cloud message in this class
+ # old_cmsg is the original cloud message from the last sync
+ lmsg, old_cmsg = mpair
+ cmsg = self.messages.get(ch)
+
+ # Cloud message was deleted, cloud takes priority
+ if cmsg is None:
+ continue
+ if lmsg is None:
+ # Debugging that the message really is to be deleted
+ assert os.stat(os.path.join(self.cfg.msgdb.hashes_dir,
+ ch)).st_nlink == 1
+ # Delete cloud message
+ todo.append(
+ self.graph.post_json(
+ "v1.0",
+ f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}/move",
+ body={"destinationId": "deleteditems"}))
+ # FIXME: This should be after the operation completes?
+ del self.messages[ch]
+ continue
+
+ if (lmsg.flags == old_cmsg.flags or lmsg.flags == cmsg.flags):
+ continue
+
+ cloud_flags = cmsg.flags ^ old_cmsg.flags
+ flag_mask = messages.Message.ALL_FLAGS ^ cloud_flags
+ nflags = (lmsg.flags & flag_mask) | (cmsg.flags & cloud_flags)
+ modified_flags = nflags ^ cmsg.flags
+
+ # FIXME: https://docs.microsoft.com/en-us/graph/best-practices-concept#getting-minimal-responses
+ # FIXME: Does the ID change?
+ patch: Dict[str, Any] = {}
+ if modified_flags & messages.Message.FLAG_READ:
+ patch["isRead"] = bool(nflags & messages.Message.FLAG_READ)
+ if modified_flags & messages.Message.FLAG_FLAGGED:
+ patch["flag"] = {
+ "flagStatus":
+ "flagged" if nflags
+ & messages.Message.FLAG_FLAGGED else "notFlagged"
+ }
+ if modified_flags & messages.Message.FLAG_REPLIED:
+ # This can only be described as an undocumented disaster.
+ # Different clients set different things. The Icon shows up in
+ # OWS and the Mobile app. The MessageStatus shows up in
+ # IMAP. IMAP sets the MessageStatus but otherwise does not
+ # interact with the other two. We can't seem to set
+ # MessageStatus over REST because it needs RopSetMessageStatus.
+ if nflags & messages.Message.FLAG_REPLIED:
+ now = datetime.datetime.utcnow().strftime(
+ "%Y-%m-%dT%H:%M:%SZ")
+ patch["singleValueExtendedProperties"] = [
+ # PidTagLastVerbExecuted
+ {
+ "id": "Integer 0x1081",
+ "value": "103"
+ },
+ # PidTagLastVerbExecutionTime
+ {
+ "id": "SystemTime 0x1082",
+ "value": now
+ },
+ # PidTagIconIndex
+ {
+ "id": "Integer 0x1080",
+ "value": "261"
+ },
+ ]
+ else:
+ # Rarely does anything undo a replied flag, but it is
+ # useful for testing.
+ patch["singleValueExtendedProperties"] = [
+ {
+ "id":
+ "Integer 0x1080", # PidTagIconIndex
+ "value":
+ "256" if nflags
+ & messages.Message.FLAG_READ else "-1"
+ },
+ ]
+
+ if patch:
+ todo.append(
+ self.graph.patch_json(
+ "v1.0",
+ f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}",
+ body=patch))
+ cmsg.flags = nflags
+
+ await asyncio.gather(*todo)
+ self.last_merge_len = len(todo)