 cloud_mdir_sync/office365.py | 120 ++++++++++++++++++++++++++++++++++--------
 1 file changed, 97 insertions(+), 23 deletions(-)
diff --git a/cloud_mdir_sync/office365.py b/cloud_mdir_sync/office365.py
index 9c8ee62..7d6fd86 100644
--- a/cloud_mdir_sync/office365.py
+++ b/cloud_mdir_sync/office365.py
@@ -15,6 +15,11 @@ import requests
 from . import config, mailbox, messages, oauth, util
 from .util import asyncio_complete
 
+MAX_CONCURRENT_OPERATIONS = 5
+# Graph is completely crazy, it can only accept 20 requests in a batch,
+# and more than some concurrent modify requests seems to hit 429 retry.
+# So run modifies 3 at a time sequentially. Bleck.
+MAX_BATCH_SIZE = 3
 
 def _retry_protect(func):
     # Graph can return various error codes, see:
@@ -107,7 +112,9 @@ class GraphAPI(oauth.Account):
         if auth is not None:
             self.msl_cache.deserialize(auth)
 
-        connector = aiohttp.connector.TCPConnector(limit=20, limit_per_host=5)
+        connector = aiohttp.connector.TCPConnector(
+            limit=MAX_CONCURRENT_OPERATIONS,
+            limit_per_host=MAX_CONCURRENT_OPERATIONS)
         self.session = aiohttp.ClientSession(connector=connector,
                                              raise_for_status=False)
 
@@ -268,6 +275,22 @@ class GraphAPI(oauth.Account):
                                      params=params) as op:
             return await self._check_json(op)
 
+    def batch_post_json(self, batch, ver, path, body):
+        """Like post_json but appends the action to a batch. Note the ver of
+        all actions in the batch must be the same"""
+        assert(ver == "v1.0")
+        requests = batch["requests"]
+        req = {
+            "id": f"{len(requests)}",
+            "method": "POST",
+            "url": path,
+            "body": body,
+            "headers": {
+                "Content-Type": "application/json"
+            },
+        }
+        requests.append(req)
+
     @_retry_protect
     async def patch_json(self, ver, path, body, params=None):
         """Return the JSON dictionary from the PATCH operation"""
@@ -278,6 +301,22 @@ class GraphAPI(oauth.Account):
                                      params=params) as op:
             return await self._check_json(op)
 
+    def batch_patch_json(self, batch, ver, path, body):
+        """Like patch_json but appends the action to a batch. Note the ver of
+        all actions in the batch must be the same"""
+        assert(ver == "v1.0")
+        requests = batch["requests"]
+        req = {
+            "id": f"{len(requests)}",
+            "method": "PATCH",
+            "url": path,
+            "body": body,
+            "headers": {
+                "Content-Type": "application/json"
+            },
+        }
+        requests.append(req)
+
     @_retry_protect
     async def delete(self, ver, path):
         """Issue a delete. For Messages delete doesn't put it in the Deleted Items
@@ -303,6 +342,40 @@ class GraphAPI(oauth.Account):
         async with self.session.get(uri, headers=self.headers) as op:
             resp = await self._check_json(op)
 
+    async def _execute_batch(self, batch):
+        resp = await self.post_json("v1.0", "/$batch", batch)
+        to_retry = set()
+        for rep in resp["responses"]:
+            status = int(rep["status"])
+            if status < 200 or status >= 300 or "error" in rep["body"]:
+                to_retry.add(rep["id"])
+                self.cfg.logger.debug(f"Batched request failed, retrying: {rep}")
+        if not to_retry:
+            return
+
+        # Otherwise issue the request natively and let the normal
+        # mechanisms sort it out.
+        for req in batch["requests"]:
+            if req["id"] not in to_retry:
+                continue
+            to_retry.remove(req["id"])
+
+            if req["method"] == "POST":
+                await self.post_json("v1.0", req["url"], req["body"])
+            elif req["method"] == "PATCH":
+                await self.patch_json("v1.0", req["url"], req["body"])
+            else:
+                raise ValueError(f"Incorrect batch {req}")
+        assert not to_retry
+
+    async def execute_batch(self, batch):
+        """Execute a batch sequence created by batch_* functions"""
+        # See https://docs.microsoft.com/en-us/graph/json-batching
+        all_requests = batch["requests"]
+        while all_requests:
+            await self._execute_batch({"requests": all_requests[:MAX_BATCH_SIZE]})
+            del all_requests[:MAX_BATCH_SIZE]
+
     @_retry_protect
     async def owa_subscribe(self, resource, changetype):
         """Graph does not support streaming subscriptions, so we use the OWA interface
@@ -561,7 +634,7 @@ class O365Mailbox(mailbox.Mailbox):
     def force_content(self, msgs):
         raise RuntimeError("Cannot move messages into the Cloud")
 
-    def _update_msg_flags(self, cmsg: messages.Message,
+    def _update_msg_flags(self, batch, cmsg: messages.Message,
                           old_cmsg_flags: int, lmsg: messages.Message):
         lflags = lmsg.flags & (messages.Message.ALL_FLAGS
                                ^ messages.Message.FLAG_DELETED)
@@ -625,7 +698,8 @@ class O365Mailbox(mailbox.Mailbox):
         if not patch:
             return None
         cmsg.flags = nflags
-        return self.graph.patch_json(
+        self.graph.batch_patch_json(
+            batch,
             "v1.0",
             f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}",
             body=patch)
@@ -634,11 +708,12 @@ class O365Mailbox(mailbox.Mailbox):
                        lambda self: f", {self.last_merge_len} changes ")
     @mailbox.update_on_failure
     async def merge_content(self, msgs: messages.CHMsgMappingDict_Type):
-        # There is a batching API for this kind of stuff as well:
-        # https://docs.microsoft.com/en-us/graph/json-batching
+        # Note that the mutation operations return a full copy of the message,
+        # which is wasteful and we don't need. Couldn't find a way to prevent
+        # that.
         self.last_merge_len = 0
-        todo_flags = []
-        todo_del = []
+        todo_flags = {"requests": []}
+        todo_del = {"requests": []}
         if self.cfg.trace_file is not None:
             pickle.dump(["merge_content", self.name, self.messages, msgs],
                         self.cfg.trace_file)
@@ -652,9 +727,7 @@ class O365Mailbox(mailbox.Mailbox):
 
             # Update flags
             if cmsg is not None and old_cmsg is not None and lmsg is not None:
-                patch = self._update_msg_flags(cmsg, old_cmsg.flags, lmsg)
-                if patch:
-                    todo_flags.append(patch)
+                self._update_msg_flags(todo_flags, cmsg, old_cmsg.flags, lmsg)
 
             # Debugging that the message really is to be deleted
             if cmsg is not None and lmsg is None:
@@ -664,19 +737,20 @@ class O365Mailbox(mailbox.Mailbox):
             if cmsg is not None and (lmsg is None
                                      or lmsg.flags & messages.Message.FLAG_DELETED):
                 # Delete cloud message
-                todo_del.append(
-                    self.graph.post_json(
-                        "v1.0",
-                        f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}/move",
-                        body={
-                            "destinationId":
-                            "deleteditems"
-                            if self.delete_action == "delete" else "archive"
-                        }))
+                self.graph.batch_post_json(
+                    todo_del,
+                    "v1.0",
+                    f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}/move",
+                    body={
+                        "destinationId":
+                        "deleteditems"
+                        if self.delete_action == "delete" else "archive"
+                    })
                 del self.messages[ch]
 
-        await asyncio_complete(*todo_flags)
+        ops = len(todo_flags["requests"]) + len(todo_del["requests"])
+        await asyncio_complete(self.graph.execute_batch(todo_flags))
         # Delete must be temporally after move as move will change the mailbox
-        # id.
-        await asyncio_complete(*todo_del)
-        self.last_merge_len = len(todo_flags) + len(todo_del)
+        # id. FIXME: We could do this with batch ordering
+        await asyncio_complete(self.graph.execute_batch(todo_del))
+        self.last_merge_len = ops
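
Note on the wire format: the dict that batch_post_json()/batch_patch_json() build up is the JSON batching envelope described at https://docs.microsoft.com/en-us/graph/json-batching, which execute_batch() POSTs to /$batch in chunks of MAX_BATCH_SIZE. A minimal sketch of one round trip, assuming a placeholder message id ("AAMk0") and folder that are not from this patch:

    # One envelope as _execute_batch() sends it; ids are assigned by
    # position so failed sub-requests can be matched up and retried.
    batch = {
        "requests": [{
            "id": "0",
            "method": "PATCH",
            "url": "/me/mailFolders/inbox/messages/AAMk0",
            "body": {"isRead": True},
            "headers": {"Content-Type": "application/json"},
        }]
    }
    # Graph replies with one entry per id; _execute_batch() re-issues any
    # entry whose status is not 2xx or whose body contains "error":
    # {"responses": [{"id": "0", "status": 200, "body": {...}}]}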
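Taken together, the intended call pattern is: queue mutations into a batch dict, then run the whole thing through execute_batch(), which is exactly what merge_content() now does with todo_flags and todo_del. A hypothetical usage sketch (graph is a GraphAPI instance and msg_paths holds message URLs; both are assumptions for illustration):

    async def mark_all_read(graph, msg_paths):
        batch = {"requests": []}
        for path in msg_paths:
            # Queues a PATCH in the envelope; nothing is sent yet.
            graph.batch_patch_json(batch, "v1.0", path, body={"isRead": True})
        # Sends the queued PATCHes MAX_BATCH_SIZE at a time; sub-requests
        # that fail inside a batch are retried as plain requests.
        await graph.execute_batch(batch)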
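On the new FIXME: the Graph batching docs describe a dependsOn list of request ids for sequencing requests within a single envelope, so the move-before-delete ordering could in principle live in one batch rather than two sequential execute_batch() calls. A sketch of what that might look like, as a hypothetical helper that is not part of this patch:

    def sequence_batch(batch):
        # Chain each request on its predecessor so Graph executes the
        # envelope strictly in order instead of concurrently.
        requests = batch["requests"]
        for prev, req in zip(requests, requests[1:]):
            req["dependsOn"] = [prev["id"]]
        return batch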