From b9fccfd53902389d01dc8ea5acb375aeca95e55e Mon Sep 17 00:00:00 2001
From: Jason Gunthorpe
Date: Sat, 30 May 2020 19:26:49 -0300
Subject: Manage concurrency to avoid running out of file descriptors

AIO does not seem to strictly pipeline work, so it is possible for a
very large number of message download tasks to be scheduled. Each one
that gets scheduled opens a file descriptor, and so we can run out of
them fairly fast.

Strictly limit the number of open files per mailbox during message
downloading.

Signed-off-by: Jason Gunthorpe
---
 cloud_mdir_sync/gmail.py     | 34 ++++++++++++++++++----------------
 cloud_mdir_sync/office365.py | 28 +++++++++++++++-------------
 2 files changed, 33 insertions(+), 29 deletions(-)

diff --git a/cloud_mdir_sync/gmail.py b/cloud_mdir_sync/gmail.py
index b6d1490..6e73168 100644
--- a/cloud_mdir_sync/gmail.py
+++ b/cloud_mdir_sync/gmail.py
@@ -374,6 +374,7 @@ class GMailMailbox(mailbox.Mailbox):
         self.label_name = label
         self.gmail = gmail
         self.gmail_messages = {}
+        self.max_fetches = asyncio.Semaphore(10)
 
     async def setup_mbox(self):
         """Setup access to the authenticated API domain for this endpoint"""
@@ -400,22 +401,23 @@ class GMailMailbox(mailbox.Mailbox):
     async def _fetch_message(self, msg: GMailMessage):
         msgdb = self.msgdb
         msg.size = 0
-        with util.log_progress_ctx(logging.DEBUG,
-                                   f"Downloading {msg.storage_id}",
-                                   lambda msg: f" {util.sizeof_fmt(msg.size)}",
-                                   msg), msgdb.get_temp() as F:
-            jmsg = await self.gmail.get_json(
-                "v1",
-                f"/users/me/messages/{msg.storage_id}",
-                params={
-                    "format": "raw",
-                })
-            data = base64.urlsafe_b64decode(jmsg["raw"])
-            data = data.replace(b"\r\n", b"\n")
-            F.write(data)
-            msg.size = F.tell()
-            msg.update_from_json(jmsg)
-            msg.content_hash = msgdb.store_hashed_msg(msg, F)
+        async with self.max_fetches:
+            with util.log_progress_ctx(
+                    logging.DEBUG, f"Downloading {msg.storage_id}",
+                    lambda msg: f" {util.sizeof_fmt(msg.size)}",
+                    msg), msgdb.get_temp() as F:
+                jmsg = await self.gmail.get_json(
+                    "v1",
+                    f"/users/me/messages/{msg.storage_id}",
+                    params={
+                        "format": "raw",
+                    })
+                data = base64.urlsafe_b64decode(jmsg["raw"])
+                data = data.replace(b"\r\n", b"\n")
+                F.write(data)
+                msg.size = F.tell()
+                msg.update_from_json(jmsg)
+                msg.content_hash = msgdb.store_hashed_msg(msg, F)
         return jmsg["historyId"]
 
     async def _fetch_all_messages(self):
diff --git a/cloud_mdir_sync/office365.py b/cloud_mdir_sync/office365.py
index 4945724..6eec42e 100644
--- a/cloud_mdir_sync/office365.py
+++ b/cloud_mdir_sync/office365.py
@@ -390,6 +390,7 @@ class O365Mailbox(mailbox.Mailbox):
         super().__init__(cfg)
         self.mailbox = mailbox
         self.graph = graph
+        self.max_fetches = asyncio.Semaphore(10)
 
     async def setup_mbox(self):
         """Setup access to the authenticated API domain for this endpoint"""
@@ -413,19 +414,20 @@ class O365Mailbox(mailbox.Mailbox):
     async def _fetch_message(self, msg: messages.Message):
         msgdb = self.msgdb
         msg.size = 0
-        with util.log_progress_ctx(logging.DEBUG,
-                                   f"Downloading {msg.email_id}",
-                                   lambda msg: f" {util.sizeof_fmt(msg.size)}",
-                                   msg), msgdb.get_temp() as F:
-            # For some reason this returns a message with dos line
-            # endings. Really weird.
-            await self.graph.get_to_file(
-                F,
-                "v1.0",
-                f"/me/messages/{msg.storage_id}/$value",
-                dos2unix=True)
-            msg.size = F.tell()
-            msg.content_hash = msgdb.store_hashed_msg(msg, F)
+        async with self.max_fetches:
+            with util.log_progress_ctx(
+                    logging.DEBUG, f"Downloading {msg.email_id}",
+                    lambda msg: f" {util.sizeof_fmt(msg.size)}",
+                    msg), msgdb.get_temp() as F:
+                # For some reason this returns a message with dos line
+                # endings. Really weird.
+                await self.graph.get_to_file(
+                    F,
+                    "v1.0",
+                    f"/me/messages/{msg.storage_id}/$value",
+                    dos2unix=True)
+                msg.size = F.tell()
+                msg.content_hash = msgdb.store_hashed_msg(msg, F)
 
     def _json_to_flags(self, jmsg):
         """This is was remarkably difficult to find out, and seems completely
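The fix generalizes beyond these two mailboxes: asyncio.gather() (or anything
that creates tasks eagerly) starts every download coroutine up front, so the
only thing bounding how many of them hold an open file descriptor at once is a
shared semaphore taken before the file is opened. Below is a minimal, runnable
sketch of that pattern; the names MAX_OPEN_FILES, fake_download, and fetch_one
are illustrative and not part of cloud_mdir_sync.

    import asyncio
    import tempfile

    MAX_OPEN_FILES = 10  # the same per-mailbox bound the patch hardcodes

    async def fake_download(msg_id: int) -> bytes:
        # Stand-in for the GMail/Graph HTTP round trip.
        await asyncio.sleep(0.01)
        return b"message %d\n" % msg_id

    async def fetch_one(sem: asyncio.Semaphore, msg_id: int) -> int:
        # Take the semaphore *before* opening the temp file; gather() has
        # already scheduled all 1000 tasks, so without this gate each of
        # them could reach TemporaryFile() and hold a descriptor open.
        async with sem:
            with tempfile.TemporaryFile() as f:
                f.write(await fake_download(msg_id))
                return f.tell()

    async def main():
        sem = asyncio.Semaphore(MAX_OPEN_FILES)
        sizes = await asyncio.gather(
            *(fetch_one(sem, i) for i in range(1000)))
        print(f"fetched {len(sizes)} messages with at most "
              f"{MAX_OPEN_FILES} files open at any time")

    asyncio.run(main())

Because the patch hangs the semaphore off each mailbox object rather than
making it global, the bound is per mailbox: two configured mailboxes
downloading at once can still hold up to 20 descriptors between them, which
stays comfortably under the common default soft limit of 1024 open files.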