diff options
author | Jason Gunthorpe <jgg@mellanox.com> | 2020-06-12 16:36:38 -0300 |
---|---|---|
committer | Jason Gunthorpe <jgg@nvidia.com> | 2020-06-22 20:24:18 -0300 |
commit | c87f4221dc92c18a9d193ddeae60218f83282b96 (patch) | |
tree | 35dd099392530840097611cea4b41d07ecef49bc | |
parent | e241f83acb481c08881050d8b07717be7784bcc3 (diff) | |
download | cloud_mdir_sync-c87f4221dc92c18a9d193ddeae60218f83282b96.tar.gz cloud_mdir_sync-c87f4221dc92c18a9d193ddeae60218f83282b96.tar.bz2 cloud_mdir_sync-c87f4221dc92c18a9d193ddeae60218f83282b96.zip |
O365: Use batching during modification
For whatever reason, this week the concurrency limit in Graph went way down
for modify; this even impacts the batches.
It turns out graph is crazy. You can put up to 20 requests in a batch, but
during execution they can fail with throttling and need retry.
Experiments suggest 3 is the right number to avoid throttling on modify,
for some reason. Documentation seems wrong, and probably something broke
recently.
Implement batching, break each batch up into chunks of at most 3 requests,
and run the chunks sequentially. Use batching only for modify operations. GET can still use a
concurrency limit of 5.
Signed-off-by: Jason Gunthorpe <jgg@mellanox.com>
-rw-r--r-- | cloud_mdir_sync/office365.py | 120 |
1 files changed, 97 insertions, 23 deletions
diff --git a/cloud_mdir_sync/office365.py b/cloud_mdir_sync/office365.py index 9c8ee62..7d6fd86 100644 --- a/cloud_mdir_sync/office365.py +++ b/cloud_mdir_sync/office365.py @@ -15,6 +15,11 @@ import requests from . import config, mailbox, messages, oauth, util from .util import asyncio_complete +MAX_CONCURRENT_OPERATIONS = 5 +# Graph is completely crazy, it can only accept 20 requests in a batch, +# and more than some concurrent modify requests seems to hit 429 retry. +# So run modifies 3 at a time sequentially. Bleck. +MAX_BATCH_SIZE = 3 def _retry_protect(func): # Graph can return various error codes, see: @@ -107,7 +112,9 @@ class GraphAPI(oauth.Account): if auth is not None: self.msl_cache.deserialize(auth) - connector = aiohttp.connector.TCPConnector(limit=20, limit_per_host=5) + connector = aiohttp.connector.TCPConnector( + limit=MAX_CONCURRENT_OPERATIONS, + limit_per_host=MAX_CONCURRENT_OPERATIONS) self.session = aiohttp.ClientSession(connector=connector, raise_for_status=False) @@ -268,6 +275,22 @@ class GraphAPI(oauth.Account): params=params) as op: return await self._check_json(op) + def batch_post_json(self, batch, ver, path, body): + """Like post_json but appends the action to a batch. Note the ver of + all actions in the batch must be the same""" + assert(ver == "v1.0") + requests = batch["requests"] + req = { + "id": f"{len(requests)}", + "method": "POST", + "url": path, + "body": body, + "headers": { + "Content-Type": "application/json" + }, + } + requests.append(req) + @_retry_protect async def patch_json(self, ver, path, body, params=None): """Return the JSON dictionary from the PATCH operation""" @@ -278,6 +301,22 @@ class GraphAPI(oauth.Account): params=params) as op: return await self._check_json(op) + def batch_patch_json(self, batch, ver, path, body): + """Like patch_json but appends the action to a batch. 
Note the ver of + all actions in the batch must be the same""" + assert(ver == "v1.0") + requests = batch["requests"] + req = { + "id": f"{len(requests)}", + "method": "PATCH", + "url": path, + "body": body, + "headers": { + "Content-Type": "application/json" + }, + } + requests.append(req) + @_retry_protect async def delete(self, ver, path): """Issue a delete. For Messages delete doesn't put it in the Deleted Items @@ -303,6 +342,40 @@ class GraphAPI(oauth.Account): async with self.session.get(uri, headers=self.headers) as op: resp = await self._check_json(op) + async def _execute_batch(self, batch): + resp = await self.post_json("v1.0", "/$batch", batch) + to_retry = set() + for rep in resp["responses"]: + status = int(rep["status"]) + if status < 200 or status >= 300 or "error" in rep["body"]: + to_retry.add(rep["id"]) + self.cfg.logger.debug(f"Batched request failed, retrying: {rep}") + if not to_retry: + return + + # Otherwise issue the request natively and let the normal + # mechanisms sort it out. 
+ for req in batch["requests"]: + if req["id"] not in to_retry: + continue + to_retry.remove(req["id"]) + + if req["method"] == "POST": + await self.post_json("v1.0", req["url"], req["body"]) + elif req["method"] == "PATCH": + await self.patch_json("v1.0", req["url"], req["body"]) + else: + raise ValueError(f"Incorrect batch {req}") + assert not to_retry + + async def execute_batch(self, batch): + """Execute a batch sequence created by batch_* functions""" + # See https://docs.microsoft.com/en-us/graph/json-batching + all_requests = batch["requests"] + while all_requests: + await self._execute_batch({"requests": all_requests[:MAX_BATCH_SIZE]}) + del all_requests[:MAX_BATCH_SIZE] + @_retry_protect async def owa_subscribe(self, resource, changetype): """Graph does not support streaming subscriptions, so we use the OWA interface @@ -561,7 +634,7 @@ class O365Mailbox(mailbox.Mailbox): def force_content(self, msgs): raise RuntimeError("Cannot move messages into the Cloud") - def _update_msg_flags(self, cmsg: messages.Message, + def _update_msg_flags(self, batch, cmsg: messages.Message, old_cmsg_flags: int, lmsg: messages.Message): lflags = lmsg.flags & (messages.Message.ALL_FLAGS ^ messages.Message.FLAG_DELETED) @@ -625,7 +698,8 @@ class O365Mailbox(mailbox.Mailbox): if not patch: return None cmsg.flags = nflags - return self.graph.patch_json( + self.graph.batch_patch_json( + batch, "v1.0", f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}", body=patch) @@ -634,11 +708,12 @@ class O365Mailbox(mailbox.Mailbox): lambda self: f", {self.last_merge_len} changes ") @mailbox.update_on_failure async def merge_content(self, msgs: messages.CHMsgMappingDict_Type): - # There is a batching API for this kind of stuff as well: - # https://docs.microsoft.com/en-us/graph/json-batching + # Note that the mutation operations return a full copy of the message, + # which is wasteful and we don't need. Couldn't find a way to prevent + # that. 
self.last_merge_len = 0 - todo_flags = [] - todo_del = [] + todo_flags = {"requests": []} + todo_del = {"requests": []} if self.cfg.trace_file is not None: pickle.dump(["merge_content", self.name, self.messages, msgs], self.cfg.trace_file) @@ -652,9 +727,7 @@ class O365Mailbox(mailbox.Mailbox): # Update flags if cmsg is not None and old_cmsg is not None and lmsg is not None: - patch = self._update_msg_flags(cmsg, old_cmsg.flags, lmsg) - if patch: - todo_flags.append(patch) + self._update_msg_flags(todo_flags, cmsg, old_cmsg.flags, lmsg) # Debugging that the message really is to be deleted if cmsg is not None and lmsg is None: @@ -664,19 +737,20 @@ class O365Mailbox(mailbox.Mailbox): if cmsg is not None and (lmsg is None or lmsg.flags & messages.Message.FLAG_DELETED): # Delete cloud message - todo_del.append( - self.graph.post_json( - "v1.0", - f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}/move", - body={ - "destinationId": - "deleteditems" - if self.delete_action == "delete" else "archive" - })) + self.graph.batch_post_json( + todo_del, + "v1.0", + f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}/move", + body={ + "destinationId": + "deleteditems" + if self.delete_action == "delete" else "archive" + }) del self.messages[ch] - await asyncio_complete(*todo_flags) + ops = len(todo_flags["requests"]) + len(todo_del["requests"]) + await asyncio_complete(self.graph.execute_batch(todo_flags)) # Delete must be temporally after move as move will change the mailbox - # id. - await asyncio_complete(*todo_del) - self.last_merge_len = len(todo_flags) + len(todo_del) + # id. FIXME: We could do this with batch ordering + await asyncio_complete(self.graph.execute_batch(todo_del)) + self.last_merge_len = ops |