aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Jason Gunthorpe <jgg@mellanox.com> 2020-05-30 19:26:49 -0300
committer Jason Gunthorpe <jgg@mellanox.com> 2020-05-30 19:26:49 -0300
commit b9fccfd53902389d01dc8ea5acb375aeca95e55e (patch)
tree 9033f8767bbb5bbaa7cfa94a1e6e465607a651a6
parent 8f7c714265c7644c818a93fbc7928fc6b4d1c30e (diff)
download cloud_mdir_sync-b9fccfd53902389d01dc8ea5acb375aeca95e55e.tar.gz
cloud_mdir_sync-b9fccfd53902389d01dc8ea5acb375aeca95e55e.tar.bz2
cloud_mdir_sync-b9fccfd53902389d01dc8ea5acb375aeca95e55e.zip
Manage concurrency to avoid running out of file descriptors
AIO does not seem to strictly pipeline work, so it is possible for a very large number of message download tasks to be scheduled. Each one that gets scheduled opens a file descriptor, and so we can run out of them fairly fast.

Strictly limit the number of open files per mailbox during message downloading.

Signed-off-by: Jason Gunthorpe <jgg@mellanox.com>
-rw-r--r-- cloud_mdir_sync/gmail.py 34
-rw-r--r-- cloud_mdir_sync/office365.py 28
2 files changed, 33 insertions, 29 deletions
diff --git a/cloud_mdir_sync/gmail.py b/cloud_mdir_sync/gmail.py
index b6d1490..6e73168 100644
--- a/cloud_mdir_sync/gmail.py
+++ b/cloud_mdir_sync/gmail.py
@@ -374,6 +374,7 @@ class GMailMailbox(mailbox.Mailbox):
self.label_name = label
self.gmail = gmail
self.gmail_messages = {}
+ self.max_fetches = asyncio.Semaphore(10)
async def setup_mbox(self):
"""Setup access to the authenticated API domain for this endpoint"""
@@ -400,22 +401,23 @@ class GMailMailbox(mailbox.Mailbox):
async def _fetch_message(self, msg: GMailMessage):
msgdb = self.msgdb
msg.size = 0
- with util.log_progress_ctx(logging.DEBUG,
- f"Downloading {msg.storage_id}",
- lambda msg: f" {util.sizeof_fmt(msg.size)}",
- msg), msgdb.get_temp() as F:
- jmsg = await self.gmail.get_json(
- "v1",
- f"/users/me/messages/{msg.storage_id}",
- params={
- "format": "raw",
- })
- data = base64.urlsafe_b64decode(jmsg["raw"])
- data = data.replace(b"\r\n", b"\n")
- F.write(data)
- msg.size = F.tell()
- msg.update_from_json(jmsg)
- msg.content_hash = msgdb.store_hashed_msg(msg, F)
+ async with self.max_fetches:
+ with util.log_progress_ctx(
+ logging.DEBUG, f"Downloading {msg.storage_id}",
+ lambda msg: f" {util.sizeof_fmt(msg.size)}",
+ msg), msgdb.get_temp() as F:
+ jmsg = await self.gmail.get_json(
+ "v1",
+ f"/users/me/messages/{msg.storage_id}",
+ params={
+ "format": "raw",
+ })
+ data = base64.urlsafe_b64decode(jmsg["raw"])
+ data = data.replace(b"\r\n", b"\n")
+ F.write(data)
+ msg.size = F.tell()
+ msg.update_from_json(jmsg)
+ msg.content_hash = msgdb.store_hashed_msg(msg, F)
return jmsg["historyId"]
async def _fetch_all_messages(self):
diff --git a/cloud_mdir_sync/office365.py b/cloud_mdir_sync/office365.py
index 4945724..6eec42e 100644
--- a/cloud_mdir_sync/office365.py
+++ b/cloud_mdir_sync/office365.py
@@ -390,6 +390,7 @@ class O365Mailbox(mailbox.Mailbox):
super().__init__(cfg)
self.mailbox = mailbox
self.graph = graph
+ self.max_fetches = asyncio.Semaphore(10)
async def setup_mbox(self):
"""Setup access to the authenticated API domain for this endpoint"""
@@ -413,19 +414,20 @@ class O365Mailbox(mailbox.Mailbox):
async def _fetch_message(self, msg: messages.Message):
msgdb = self.msgdb
msg.size = 0
- with util.log_progress_ctx(logging.DEBUG,
- f"Downloading {msg.email_id}",
- lambda msg: f" {util.sizeof_fmt(msg.size)}",
- msg), msgdb.get_temp() as F:
- # For some reason this returns a message with dos line
- # endings. Really weird.
- await self.graph.get_to_file(
- F,
- "v1.0",
- f"/me/messages/{msg.storage_id}/$value",
- dos2unix=True)
- msg.size = F.tell()
- msg.content_hash = msgdb.store_hashed_msg(msg, F)
+ async with self.max_fetches:
+ with util.log_progress_ctx(
+ logging.DEBUG, f"Downloading {msg.email_id}",
+ lambda msg: f" {util.sizeof_fmt(msg.size)}",
+ msg), msgdb.get_temp() as F:
+ # For some reason this returns a message with dos line
+ # endings. Really weird.
+ await self.graph.get_to_file(
+ F,
+ "v1.0",
+ f"/me/messages/{msg.storage_id}/$value",
+ dos2unix=True)
+ msg.size = F.tell()
+ msg.content_hash = msgdb.store_hashed_msg(msg, F)
def _json_to_flags(self, jmsg):
"""This is was remarkably difficult to find out, and seems completely