aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJason Gunthorpe <jgg@mellanox.com>2020-02-07 11:52:43 -0400
committerJason Gunthorpe <jgg@mellanox.com>2020-02-07 12:11:03 -0400
commitd1877aaf5a791204dd7222f50fc33b5f802c7e94 (patch)
tree8c6db52a7656bc99da9e196913de95953771fbc5
parentf44a42f276e01514a1e4bf3028639aaea58138c6 (diff)
downloadcloud_mdir_sync-d1877aaf5a791204dd7222f50fc33b5f802c7e94.tar.gz
cloud_mdir_sync-d1877aaf5a791204dd7222f50fc33b5f802c7e94.tar.bz2
cloud_mdir_sync-d1877aaf5a791204dd7222f50fc33b5f802c7e94.zip
Don't leave asyncio tasks running unexpectedly
All cases where gather is called intend that the tasks will complete successfully or all cancel at the first error. Add a little wrapper to achieve this. Signed-off-by: Jason Gunthorpe <jgg@mellanox.com>
-rw-r--r--cloud_mdir_sync/gmail.py5
-rw-r--r--cloud_mdir_sync/main.py17
-rw-r--r--cloud_mdir_sync/office365.py7
-rw-r--r--cloud_mdir_sync/util.py13
4 files changed, 29 insertions, 13 deletions
diff --git a/cloud_mdir_sync/gmail.py b/cloud_mdir_sync/gmail.py
index 49468d4..949798b 100644
--- a/cloud_mdir_sync/gmail.py
+++ b/cloud_mdir_sync/gmail.py
@@ -15,6 +15,7 @@ import oauthlib
import requests_oauthlib
from . import config, mailbox, messages, util
+from .util import asyncio_complete
class NativePublicApplicationClient(oauthlib.oauth2.WebApplicationClient):
@@ -424,7 +425,7 @@ class GMailMailbox(mailbox.Mailbox):
msgs.append(msg)
if todo:
start_history_id = await todo[0]
- await asyncio.gather(*todo)
+ await asyncio_complete(*todo)
return (msgs, start_history_id)
@@ -505,7 +506,7 @@ class GMailMailbox(mailbox.Mailbox):
msg.received_time = omsg.received_time
assert self.msgdb.have_content(msg)
msgs.append(msg)
- await asyncio.gather(*todo)
+ await asyncio_complete(*todo)
return (msgs, next_history_id)
@util.log_progress(lambda self: f"Updating Message List for {self.name}",
diff --git a/cloud_mdir_sync/main.py b/cloud_mdir_sync/main.py
index af179cb..d810956 100644
--- a/cloud_mdir_sync/main.py
+++ b/cloud_mdir_sync/main.py
@@ -9,6 +9,7 @@ import aiohttp
import pyinotify
from . import config, mailbox, messages, oauth
+from .util import asyncio_complete
def route_cloud_messages(cfg: config.Config) -> messages.MBoxDict_Type:
@@ -48,7 +49,7 @@ async def update_cloud_from_local(cfg: config.Config,
if lmsg is None and offline_mode:
continue
msgs_by_cloud[cloud_msg.mailbox][ch] = (lmsg, cloud_msg)
- await asyncio.gather(*(
+ await asyncio_complete(*(
mbox.merge_content(msgdict) for mbox, msgdict in msgs_by_cloud.items()
if not mbox.same_messages(msgdict, tuple_form=True)))
@@ -59,15 +60,15 @@ async def synchronize_mail(cfg: config.Config):
try:
await cfg.web_app.go()
- await asyncio.gather(*(mbox.setup_mbox()
- for mbox in cfg.all_mboxes()))
+ await asyncio_complete(*(mbox.setup_mbox()
+ for mbox in cfg.all_mboxes()))
msgs = None
while True:
try:
- await asyncio.gather(*(mbox.update_message_list()
- for mbox in cfg.all_mboxes()
- if mbox.need_update))
+ await asyncio_complete(*(mbox.update_message_list()
+ for mbox in cfg.all_mboxes()
+ if mbox.need_update))
nmsgs = route_cloud_messages(cfg)
if msgs is not None:
@@ -92,8 +93,8 @@ async def synchronize_mail(cfg: config.Config):
cfg.msgdb.cleanup_msgs(msgs)
cfg.logger.debug("Changed event, looping")
finally:
- await asyncio.gather(*(domain.close()
- for domain in cfg.domains.values()))
+ await asyncio_complete(*(domain.close()
+ for domain in cfg.domains.values()))
cfg.domains = {}
await cfg.web_app.close()
diff --git a/cloud_mdir_sync/office365.py b/cloud_mdir_sync/office365.py
index f96ef29..9cfaf0d 100644
--- a/cloud_mdir_sync/office365.py
+++ b/cloud_mdir_sync/office365.py
@@ -13,6 +13,7 @@ import aiohttp
import requests
from . import config, mailbox, messages, util
+from .util import asyncio_complete
def _retry_protect(func):
@@ -488,7 +489,7 @@ class O365Mailbox(mailbox.Mailbox):
asyncio.create_task(self._fetch_message(msg)))
msgs.append(msg)
- await asyncio.gather(*todo)
+ await asyncio_complete(*todo)
res = {}
for msg in msgs:
@@ -652,8 +653,8 @@ class O365Mailbox(mailbox.Mailbox):
body={"destinationId": "deleteditems"}))
del self.messages[ch]
- await asyncio.gather(*todo_flags)
+ await asyncio_complete(*todo_flags)
# Delete must be temporally after move as move will change the mailbox
# id.
- await asyncio.gather(*todo_del)
+ await asyncio_complete(*todo_del)
self.last_merge_len = len(todo_flags) + len(todo_del)
diff --git a/cloud_mdir_sync/util.py b/cloud_mdir_sync/util.py
index 7799356..df8ce59 100644
--- a/cloud_mdir_sync/util.py
+++ b/cloud_mdir_sync/util.py
@@ -1,4 +1,5 @@
# SPDX-License-Identifier: GPL-2.0+
+import asyncio
import contextlib
import functools
import inspect
@@ -68,3 +69,15 @@ def sizeof_fmt(num, suffix='B'):
def pj(json_dict):
print(json.dumps(json_dict, indent=4, sort_keys=True))
+
+
+async def asyncio_complete(*awo_list):
+ """This is like asyncio.gather but it always ensures that the list of
+ awaitable objects is completed upon return. For instance if an exception
+ is thrown then all the awaitables are canceled"""
+ g = asyncio.gather(*awo_list)
+ try:
+ await g
+ finally:
+ g.cancel()
+ await asyncio.gather(*awo_list, return_exceptions=True)