aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJason Gunthorpe <jgg@mellanox.com>2020-06-12 16:36:38 -0300
committerJason Gunthorpe <jgg@nvidia.com>2020-06-22 20:24:18 -0300
commitc87f4221dc92c18a9d193ddeae60218f83282b96 (patch)
tree35dd099392530840097611cea4b41d07ecef49bc
parente241f83acb481c08881050d8b07717be7784bcc3 (diff)
downloadcloud_mdir_sync-c87f4221dc92c18a9d193ddeae60218f83282b96.tar.gz
cloud_mdir_sync-c87f4221dc92c18a9d193ddeae60218f83282b96.tar.bz2
cloud_mdir_sync-c87f4221dc92c18a9d193ddeae60218f83282b96.zip
O365: Use batching during modification
For whatever reason this week the concurrency limit in Graph went way down for modify, this event impacts the batches. It turns out graph is crazy. You can put up to 20 requests in a batch, but during execution they can fail with throttling and need retry. Experiments suggest 3 is the right number to avoid throttling on modify, for some reason. Documentation seems wrong, and probably something broke recently. Implement batching, run the batch break up sequentially, and limit the batches to 3. Use batching only for modify operations. GET can still use a concurrency limit of 5. Signed-off-by: Jason Gunthorpe <jgg@mellanox.com>
-rw-r--r--cloud_mdir_sync/office365.py120
1 files changed, 97 insertions, 23 deletions
diff --git a/cloud_mdir_sync/office365.py b/cloud_mdir_sync/office365.py
index 9c8ee62..7d6fd86 100644
--- a/cloud_mdir_sync/office365.py
+++ b/cloud_mdir_sync/office365.py
@@ -15,6 +15,11 @@ import requests
from . import config, mailbox, messages, oauth, util
from .util import asyncio_complete
+MAX_CONCURRENT_OPERATIONS = 5
+# Graph is completely crazy, it can only accept 20 requests in a batch,
+# and more than some concurrent modify requests seems to hit 429 retry.
+# So run modifies 3 at a time sequentially. Bleck.
+MAX_BATCH_SIZE = 3
def _retry_protect(func):
# Graph can return various error codes, see:
@@ -107,7 +112,9 @@ class GraphAPI(oauth.Account):
if auth is not None:
self.msl_cache.deserialize(auth)
- connector = aiohttp.connector.TCPConnector(limit=20, limit_per_host=5)
+ connector = aiohttp.connector.TCPConnector(
+ limit=MAX_CONCURRENT_OPERATIONS,
+ limit_per_host=MAX_CONCURRENT_OPERATIONS)
self.session = aiohttp.ClientSession(connector=connector,
raise_for_status=False)
@@ -268,6 +275,22 @@ class GraphAPI(oauth.Account):
params=params) as op:
return await self._check_json(op)
+ def batch_post_json(self, batch, ver, path, body):
+ """Like post_json but appends the action to a batch. Note the ver of
+ all actions in the batch must be the same"""
+ assert(ver == "v1.0")
+ requests = batch["requests"]
+ req = {
+ "id": f"{len(requests)}",
+ "method": "POST",
+ "url": path,
+ "body": body,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ }
+ requests.append(req)
+
@_retry_protect
async def patch_json(self, ver, path, body, params=None):
"""Return the JSON dictionary from the PATCH operation"""
@@ -278,6 +301,22 @@ class GraphAPI(oauth.Account):
params=params) as op:
return await self._check_json(op)
+ def batch_patch_json(self, batch, ver, path, body):
+ """Like patch_json but appends the action to a batch. Note the ver of
+ all actions in the batch must be the same"""
+ assert(ver == "v1.0")
+ requests = batch["requests"]
+ req = {
+ "id": f"{len(requests)}",
+ "method": "PATCH",
+ "url": path,
+ "body": body,
+ "headers": {
+ "Content-Type": "application/json"
+ },
+ }
+ requests.append(req)
+
@_retry_protect
async def delete(self, ver, path):
"""Issue a delete. For Messages delete doesn't put it in the Deleted Items
@@ -303,6 +342,40 @@ class GraphAPI(oauth.Account):
async with self.session.get(uri, headers=self.headers) as op:
resp = await self._check_json(op)
+ async def _execute_batch(self, batch):
+ resp = await self.post_json("v1.0", "/$batch", batch)
+ to_retry = set()
+ for rep in resp["responses"]:
+ status = int(rep["status"])
+ if status < 200 or status >= 300 or "error" in rep["body"]:
+ to_retry.add(rep["id"])
+ self.cfg.logger.debug(f"Batched request failed, retrying: {rep}")
+ if not to_retry:
+ return
+
+ # Otherwise issue the request natively and let the normal
+ # mechanisms sort it out.
+ for req in batch["requests"]:
+ if req["id"] not in to_retry:
+ continue
+ to_retry.remove(req["id"])
+
+ if req["method"] == "POST":
+ await self.post_json("v1.0", req["url"], req["body"])
+ elif req["method"] == "PATCH":
+ await self.patch_json("v1.0", req["url"], req["body"])
+ else:
+ raise ValueError(f"Incorrect batch {req}")
+ assert not to_retry
+
+ async def execute_batch(self, batch):
+ """Execute a batch sequence created by batch_* functions"""
+ # See https://docs.microsoft.com/en-us/graph/json-batching
+ all_requests = batch["requests"]
+ while all_requests:
+ await self._execute_batch({"requests": all_requests[:MAX_BATCH_SIZE]})
+ del all_requests[:MAX_BATCH_SIZE]
+
@_retry_protect
async def owa_subscribe(self, resource, changetype):
"""Graph does not support streaming subscriptions, so we use the OWA interface
@@ -561,7 +634,7 @@ class O365Mailbox(mailbox.Mailbox):
def force_content(self, msgs):
raise RuntimeError("Cannot move messages into the Cloud")
- def _update_msg_flags(self, cmsg: messages.Message,
+ def _update_msg_flags(self, batch, cmsg: messages.Message,
old_cmsg_flags: int, lmsg: messages.Message):
lflags = lmsg.flags & (messages.Message.ALL_FLAGS
^ messages.Message.FLAG_DELETED)
@@ -625,7 +698,8 @@ class O365Mailbox(mailbox.Mailbox):
if not patch:
return None
cmsg.flags = nflags
- return self.graph.patch_json(
+ self.graph.batch_patch_json(
+ batch,
"v1.0",
f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}",
body=patch)
@@ -634,11 +708,12 @@ class O365Mailbox(mailbox.Mailbox):
lambda self: f", {self.last_merge_len} changes ")
@mailbox.update_on_failure
async def merge_content(self, msgs: messages.CHMsgMappingDict_Type):
- # There is a batching API for this kind of stuff as well:
- # https://docs.microsoft.com/en-us/graph/json-batching
+ # Note that the mutation operations return a full copy of the message,
+ # which is wasteful and we don't need. Couldn't find a way to prevent
+ # that.
self.last_merge_len = 0
- todo_flags = []
- todo_del = []
+ todo_flags = {"requests": []}
+ todo_del = {"requests": []}
if self.cfg.trace_file is not None:
pickle.dump(["merge_content", self.name, self.messages, msgs],
self.cfg.trace_file)
@@ -652,9 +727,7 @@ class O365Mailbox(mailbox.Mailbox):
# Update flags
if cmsg is not None and old_cmsg is not None and lmsg is not None:
- patch = self._update_msg_flags(cmsg, old_cmsg.flags, lmsg)
- if patch:
- todo_flags.append(patch)
+ self._update_msg_flags(todo_flags, cmsg, old_cmsg.flags, lmsg)
# Debugging that the message really is to be deleted
if cmsg is not None and lmsg is None:
@@ -664,19 +737,20 @@ class O365Mailbox(mailbox.Mailbox):
if cmsg is not None and (lmsg is None or lmsg.flags
& messages.Message.FLAG_DELETED):
# Delete cloud message
- todo_del.append(
- self.graph.post_json(
- "v1.0",
- f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}/move",
- body={
- "destinationId":
- "deleteditems"
- if self.delete_action == "delete" else "archive"
- }))
+ self.graph.batch_post_json(
+ todo_del,
+ "v1.0",
+ f"/me/mailFolders/{self.mailbox}/messages/{cmsg.storage_id}/move",
+ body={
+ "destinationId":
+ "deleteditems"
+ if self.delete_action == "delete" else "archive"
+ })
del self.messages[ch]
- await asyncio_complete(*todo_flags)
+ ops = len(todo_flags["requests"]) + len(todo_del["requests"])
+ await asyncio_complete(self.graph.execute_batch(todo_flags))
# Delete must be temporally after move as move will change the mailbox
- # id.
- await asyncio_complete(*todo_del)
- self.last_merge_len = len(todo_flags) + len(todo_del)
+ # id. FIXME: We could do this with batch ordering
+ await asyncio_complete(self.graph.execute_batch(todo_del))
+ self.last_merge_len = ops