Diffstat (limited to 'cloud_mdir_sync/office365.py')
-rw-r--r--  cloud_mdir_sync/office365.py  120
1 file changed, 97 insertions(+), 23 deletions(-)
diff --git a/cloud_mdir_sync/office365.py b/cloud_mdir_sync/office365.py
index 9c8ee62..7d6fd86 100644
--- a/cloud_mdir_sync/office365.py
+++ b/cloud_mdir_sync/office365.py
@@ -15,6 +15,11 @@ import requests
 from . import config, mailbox, messages, oauth, util
 from .util import asyncio_complete
 
+MAX_CONCURRENT_OPERATIONS = 5
+# Graph is completely crazy: it only accepts up to 20 requests per
+# batch, and more than a few concurrent modify requests seem to hit 429
+# throttling. So run modifies 3 at a time, sequentially. Bleck.
+MAX_BATCH_SIZE = 3
 
 def _retry_protect(func):
     # Graph can return various error codes, see:
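
The _retry_protect decorator referenced above (its body lies outside this hunk) is what re-issues failed Graph calls. A minimal sketch of that general pattern, assuming aiohttp-style exceptions and honoring the Retry-After header on 429 responses (hypothetical, not the code in this commit):

    import asyncio
    import functools

    import aiohttp

    def retry_protect_sketch(func):
        # Hypothetical stand-in for _retry_protect: retry on HTTP 429,
        # sleeping for the server-suggested interval between attempts.
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            while True:
                try:
                    return await func(self, *args, **kwargs)
                except aiohttp.ClientResponseError as e:
                    if e.status != 429:
                        raise
                    # Use the server's Retry-After hint when present
                    delay = 10.0
                    if e.headers and "Retry-After" in e.headers:
                        delay = float(e.headers["Retry-After"])
                    await asyncio.sleep(delay)
        return wrapper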
@@ -107,7 +112,9 @@ class GraphAPI(oauth.Account):
         if auth is not None:
             self.msl_cache.deserialize(auth)
 
-        connector = aiohttp.connector.TCPConnector(limit=20, limit_per_host=5)
+        connector = aiohttp.connector.TCPConnector(
+            limit=MAX_CONCURRENT_OPERATIONS,
+            limit_per_host=MAX_CONCURRENT_OPERATIONS)
         self.session = aiohttp.ClientSession(connector=connector,
                                              raise_for_status=False)
@@ -268,6 +275,22 @@ class GraphAPI(oauth.Account):
                                     params=params) as op:
             return await self._check_json(op)
 
+    def batch_post_json(self, batch, ver, path, body):
+        """Like post_json, but appends the action to a batch. Note that the
+        ver of all actions in the batch must be the same."""
+        assert ver == "v1.0"
+        requests = batch["requests"]
+        req = {
+            "id": f"{len(requests)}",
+            "method": "POST",
+            "url": path,
+            "body": body,
+            "headers": {
+                "Content-Type": "application/json"
+            },
+        }
+        requests.append(req)
+
     @_retry_protect
     async def patch_json(self, ver, path, body, params=None):
         """Return the JSON dictionary from the PATCH operation"""
@@ -278,6 +301,22 @@ class GraphAPI(oauth.Account):
                                       params=params) as op:
             return await self._check_json(op)
 
+    def batch_patch_json(self, batch, ver, path, body):
+        """Like patch_json, but appends the action to a batch. Note that the
+        ver of all actions in the batch must be the same."""
+        assert ver == "v1.0"
+        requests = batch["requests"]
+        req = {
+            "id": f"{len(requests)}",
+            "method": "PATCH",
+            "url": path,
+            "body": body,
+            "headers": {
+                "Content-Type": "application/json"
+            },
+        }
+        requests.append(req)
+
     @_retry_protect
     async def delete(self, ver, path):
         """Issue a delete. For Messages delete doesn't put it in the Deleted Items
@@ -303,6 +342,40 @@ class GraphAPI(oauth.Account):
         async with self.session.get(uri, headers=self.headers) as op:
             resp = await self._check_json(op)
 
+    async def _execute_batch(self, batch):
+        resp = await self.post_json("v1.0", "/$batch", batch)
+        to_retry = set()
+        for rep in resp["responses"]:
+            status = int(rep["status"])
+            if status < 200 or status >= 300 or "error" in rep["body"]:
+                to_retry.add(rep["id"])
+                self.cfg.logger.debug(f"Batched request failed, retrying: {rep}")
+        if not to_retry:
+            return
+
+        # Otherwise issue the request natively and let the normal
+        # mechanisms sort it out.
+        for req in batch["requests"]:
+            if req["id"] not in to_retry:
+                continue
+            to_retry.remove(req["id"])
+
+            if req["method"] == "POST":
+                await self.post_json("v1.0", req["url"], req["body"])
+            elif req["method"] == "PATCH":
+                await self.patch_json("v1.0", req["url"], req["body"])
+            else:
+                raise ValueError(f"Incorrect batch {req}")
+        assert not to_retry
+
+    async def execute_batch(self, batch):
+        """Execute a batch sequence created by the batch_* functions"""
+        # See https://docs.microsoft.com/en-us/graph/json-batching
+        all_requests = batch["requests"]
+        while all_requests:
+            await self._execute_batch({"requests": all_requests[:MAX_BATCH_SIZE]})
+            del all_requests[:MAX_BATCH_SIZE]
+
     @_retry_protect
     async def owa_subscribe(self, resource, changetype):
         """Graph does not support streaming subscriptions, so we use the OWA interface
@@ -561,7 +634,7 @@ class O365Mailbox(mailbox.Mailbox):
     def force_content(self, msgs):
         raise RuntimeError("Cannot move messages into the Cloud")
 
-    def _update_msg_flags(self, cmsg: messages.Message,
+    def _update_msg_flags(self, batch, cmsg: messages.Message,
                           old_cmsg_flags: int, lmsg: messages.Message):
         lflags = lmsg.flags & (messages.Message.ALL_FLAGS
                                ^ messages.Message.FLAG_DELETED)
@@ -625,7 +698,8 @@ class O365Mailbox(mailbox.Mailbox):
         if not patch:
             return None
         cmsg.flags = nflags
-        return self.graph.patch_json(
+        self.graph.batch_patch_json(
+            batch,
             "v1.0",
             f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}",
             body=patch)
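
As an illustration of the body this enqueues: a patch marking a message read and flagged might look like the following, using plausible keys from the Graph message resource (illustrative only; the real set is computed by _update_msg_flags, which is mostly elided from this hunk):

    patch = {"isRead": True, "flag": {"flagStatus": "flagged"}}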
@@ -634,11 +708,12 @@ class O365Mailbox(mailbox.Mailbox):
                  lambda self: f", {self.last_merge_len} changes ")
     @mailbox.update_on_failure
     async def merge_content(self, msgs: messages.CHMsgMappingDict_Type):
-        # There is a batching API for this kind of stuff as well:
-        # https://docs.microsoft.com/en-us/graph/json-batching
+        # Note that the mutation operations return a full copy of the
+        # message, which is wasteful and which we don't need. Couldn't
+        # find a way to prevent that.
         self.last_merge_len = 0
-        todo_flags = []
-        todo_del = []
+        todo_flags = {"requests": []}
+        todo_del = {"requests": []}
         if self.cfg.trace_file is not None:
             pickle.dump(["merge_content", self.name, self.messages, msgs],
                         self.cfg.trace_file)
@@ -652,9 +727,7 @@ class O365Mailbox(mailbox.Mailbox):
 
             # Update flags
             if cmsg is not None and old_cmsg is not None and lmsg is not None:
-                patch = self._update_msg_flags(cmsg, old_cmsg.flags, lmsg)
-                if patch:
-                    todo_flags.append(patch)
+                self._update_msg_flags(todo_flags, cmsg, old_cmsg.flags, lmsg)
 
             # Debugging that the message really is to be deleted
             if cmsg is not None and lmsg is None:
@@ -664,19 +737,20 @@ class O365Mailbox(mailbox.Mailbox):
             if cmsg is not None and (lmsg is None or lmsg.flags
                                      & messages.Message.FLAG_DELETED):
                 # Delete cloud message
-                todo_del.append(
-                    self.graph.post_json(
-                        "v1.0",
-                        f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}/move",
-                        body={
-                            "destinationId":
-                            "deleteditems"
-                            if self.delete_action == "delete" else "archive"
-                        }))
+                self.graph.batch_post_json(
+                    todo_del,
+                    "v1.0",
+                    f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}/move",
+                    body={
+                        "destinationId":
+                        "deleteditems"
+                        if self.delete_action == "delete" else "archive"
+                    })
                 del self.messages[ch]
 
-        await asyncio_complete(*todo_flags)
+        ops = len(todo_flags["requests"]) + len(todo_del["requests"])
+        await asyncio_complete(self.graph.execute_batch(todo_flags))
         # Delete must be temporally after move as move will change the mailbox
-        # id.
-        await asyncio_complete(*todo_del)
-        self.last_merge_len = len(todo_flags) + len(todo_del)
+        # id. FIXME: We could do this with batch ordering.
+        await asyncio_complete(self.graph.execute_batch(todo_del))
+        self.last_merge_len = ops
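
On that FIXME: Graph's JSON batching supports explicit ordering through a dependsOn list on each sub-request, so the flag patches and moves could in principle ride in a single batch. A sketch of what that would add to a sub-request (ids illustrative):

    # {"id": "1",
    #  "method": "POST",
    #  "url": "/me/mailFolders/inbox/messages/AAMk123/move",
    #  "body": {"destinationId": "archive"},
    #  "dependsOn": ["0"]}  # only runs after sub-request "0" succeeds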