aboutsummaryrefslogtreecommitdiffstats
path: root/cloud_mdir_sync
diff options
context:
space:
mode:
Diffstat (limited to 'cloud_mdir_sync')
-rw-r--r--cloud_mdir_sync/cms_oauth_main.py50
-rw-r--r--cloud_mdir_sync/config.py13
-rw-r--r--cloud_mdir_sync/credsrv.py15
-rw-r--r--cloud_mdir_sync/gmail.py6
-rw-r--r--cloud_mdir_sync/oauth.py7
-rw-r--r--cloud_mdir_sync/office365.py10
6 files changed, 70 insertions, 31 deletions
diff --git a/cloud_mdir_sync/cms_oauth_main.py b/cloud_mdir_sync/cms_oauth_main.py
index c7c6699..514e411 100644
--- a/cloud_mdir_sync/cms_oauth_main.py
+++ b/cloud_mdir_sync/cms_oauth_main.py
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: GPL-2.0+
import argparse
import base64
+import contextlib
import re
import socket
@@ -9,6 +10,11 @@ def get_xoauth2_token(args):
"""Return the xoauth2 string. This is something like
'user=foo^Aauth=Bearer bar^A^A'
"""
+ if args.test_smtp:
+ args.proto = "SMTP"
+ elif args.test_imap:
+ args.proto = "IMAP"
+
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(args.cms_sock)
sock.sendall(f"{args.proto} {args.user}".encode())
@@ -20,14 +26,27 @@ def get_xoauth2_token(args):
def test_smtp(args, xoauth2_token):
- """Initiate a testing SMTP connection to verify """
+ """Initiate a testing SMTP connection to verify the token and server
+ work"""
import smtplib
- conn = smtplib.SMTP(args.test_smtp, 587)
- conn.set_debuglevel(True)
- conn.ehlo()
- conn.starttls()
- conn.ehlo()
- conn.auth("xoauth2", lambda x: xoauth2_token, initial_response_ok=False)
+ with contextlib.closing(smtplib.SMTP(args.test_smtp, 587)) as conn:
+ conn.set_debuglevel(True)
+ conn.ehlo()
+ conn.starttls()
+ conn.ehlo()
+ conn.auth("xoauth2",
+ lambda x: xoauth2_token,
+ initial_response_ok=False)
+
+
+def test_imap(args, xoauth2_token):
+ """Initiate a testing IMAP connection to verify the token and server
+ work"""
+ import imaplib
+ with contextlib.closing(imaplib.IMAP4_SSL(args.test_imap)) as conn:
+ conn.debug = 4
+ conn.authenticate('XOAUTH2', lambda x: xoauth2_token.encode())
+ conn.select('INBOX')
def main():
@@ -35,7 +54,7 @@ def main():
parser.add_argument(
"--proto",
default="SMTP",
- choices={"SMTP"},
+ choices={"SMTP", "IMAP"},
help="""Select the protocol to get a token for. The protocol will
automatically select the correct OAUTH scope.""")
parser.add_argument(
@@ -57,19 +76,30 @@ def main():
xoauth2 is used if the caller will provide the base64 conversion.
token returns the bare access_token""")
- parser.add_argument(
+ tests = parser.add_mutually_exclusive_group()
+ tests.add_argument(
"--test-smtp",
metavar="SMTP_SERVER",
help=
"""If specified attempt to connect and authenticate to the given SMTP
- sever. This can be used to test that the authentication method works
+ server. This can be used to test that the authentication method works
properly on the server. Typical servers would be smtp.office365.com
and smtp.gmail.com.""")
+ tests.add_argument(
+ "--test-imap",
+ metavar="IMAP_SERVER",
+ help=
+ """If specified attempt to connect and authenticate to the given IMAP
+ server. This can be used to test that the authentication method works
+ properly on the server. Typical servers would be outlook.office365.com
+ and imap.gmail.com.""")
args = parser.parse_args()
xoauth2_token = get_xoauth2_token(args)
if args.test_smtp:
return test_smtp(args, xoauth2_token)
+ if args.test_imap:
+ return test_imap(args, xoauth2_token)
if args.output == "xoauth2-b64":
print(base64.b64encode(xoauth2_token.encode()).decode())
diff --git a/cloud_mdir_sync/config.py b/cloud_mdir_sync/config.py
index 79ab4a9..019e757 100644
--- a/cloud_mdir_sync/config.py
+++ b/cloud_mdir_sync/config.py
@@ -116,9 +116,18 @@ class Config(object):
self.local_mboxes.append(MailDirMailbox(self, directory))
return self.local_mboxes[-1]
- def CredentialServer(self, path: str, accounts: List, umask=0o600):
+ def CredentialServer(self,
+ path: str,
+ accounts: List,
+ umask=0o600,
+ protocols=["SMTP"]):
+ """Serve XOAUTH2 bearer tokens over a unix domain socket. The client
+ writes the user to obtain a token for and the server responds with the
+ token. protocols can be IMAP or SMTP. The cms-oauth program interacts
+ with this server."""
from .credsrv import CredentialServer
- self.async_tasks.append(CredentialServer(self, path, accounts, umask))
+ self.async_tasks.append(
+ CredentialServer(self, path, accounts, umask, protocols))
return self.async_tasks[-1]
def _direct_message(self, msg):
diff --git a/cloud_mdir_sync/credsrv.py b/cloud_mdir_sync/credsrv.py
index 97357bc..45a032f 100644
--- a/cloud_mdir_sync/credsrv.py
+++ b/cloud_mdir_sync/credsrv.py
@@ -11,18 +11,15 @@ from . import config, oauth
class CredentialServer(object):
"""Serve XOAUTH2 bearer tokens over a unix domain socket. The client
writes the user to obtain a token for and the server responds with the
- token"""
- def __init__(self,
- cfg: config.Config,
- path: str,
- accounts: List[oauth.Account],
- umask=0o600):
+ token. protocols can be IMAP or SMTP"""
+ def __init__(self, cfg: config.Config, path: str,
+ accounts: List[oauth.Account], umask, protocols):
self.cfg = cfg
self.path = os.path.abspath(os.path.expanduser(path))
self.umask = umask
self.accounts = {}
for I in accounts:
- I.oauth_smtp = True
+ I.protocols.update(protocols)
self.accounts[I.user] = I
async def go(self):
@@ -55,10 +52,10 @@ class CredentialServer(object):
f"Credential request {proto!r} {opts} {user!r}")
account = self.accounts.get(user)
- if account is None:
+ if account is None or proto not in account.protocols:
return
- xoauth2 = await account.get_xoauth2_bytes("SMTP")
+ xoauth2 = await account.get_xoauth2_bytes(proto)
if xoauth2 is None:
return
writer.write(xoauth2)
diff --git a/cloud_mdir_sync/gmail.py b/cloud_mdir_sync/gmail.py
index dc70361..20e7502 100644
--- a/cloud_mdir_sync/gmail.py
+++ b/cloud_mdir_sync/gmail.py
@@ -89,7 +89,7 @@ class GmailAPI(oauth.Account):
self.scopes = [
"https://www.googleapis.com/auth/gmail.modify",
]
- if self.oauth_smtp:
+ if "SMTP" in self.protocols or "IMAP" in self.protocols:
self.scopes.append("https://mail.google.com/")
self.redirect_url = cfg.web_app.url + "oauth2/gmail"
@@ -263,13 +263,13 @@ class GmailAPI(oauth.Account):
async def close(self):
await self.session.close()
- async def get_xoauth2_bytes(self, proto: str) -> bytes:
+ async def get_xoauth2_bytes(self, proto: str) -> Optional[bytes]:
"""Return the xoauth2 byte string for the given protocol to login to
this account."""
while self.api_token is None:
await self.authenticate()
- if proto == "SMTP":
+ if proto == "SMTP" or proto == "IMAP":
res = 'user=%s\1auth=%s %s\1\1' % (self.user,
self.api_token["token_type"],
self.api_token["access_token"])
diff --git a/cloud_mdir_sync/oauth.py b/cloud_mdir_sync/oauth.py
index 716e7c0..385e0e8 100644
--- a/cloud_mdir_sync/oauth.py
+++ b/cloud_mdir_sync/oauth.py
@@ -19,16 +19,15 @@ if TYPE_CHECKING:
class Account(object):
"""An OAUTH2 account"""
- oauth_smtp = False
def __init__(self, cfg: "config.Config", user: str):
self.cfg = cfg
self.user = user
+ self.protocols = set()
@abstractmethod
- async def get_xoauth2_bytes(self, proto: str) -> bytes:
- pass
-
+ async def get_xoauth2_bytes(self, proto: str) -> Optional[bytes]:
+ return None
class WebServer(object):
"""A small web server is used to manage oauth requests. The user should point a browser
diff --git a/cloud_mdir_sync/office365.py b/cloud_mdir_sync/office365.py
index f66882f..6fa6145 100644
--- a/cloud_mdir_sync/office365.py
+++ b/cloud_mdir_sync/office365.py
@@ -120,10 +120,14 @@ class GraphAPI(oauth.Account):
self.session = aiohttp.ClientSession(connector=connector,
raise_for_status=False)
- if self.oauth_smtp:
+ if "SMTP" in self.protocols:
self.owa_scopes = self.owa_scopes + [
"https://outlook.office.com/SMTP.Send"
]
+ if "IMAP" in self.protocols:
+ self.owa_scopes = self.owa_scopes + [
+ "https://outlook.office.com/IMAP.AccessAsUser.All"
+ ]
self.redirect_url = self.cfg.web_app.url + "oauth2/msal"
self.oauth = oauth.OAuth2Session(
@@ -452,13 +456,13 @@ class GraphAPI(oauth.Account):
async def close(self):
await self.session.close()
- async def get_xoauth2_bytes(self, proto: str) -> bytes:
+ async def get_xoauth2_bytes(self, proto: str) -> Optional[bytes]:
"""Return the xoauth2 byte string for the given protocol to login to
this account."""
while self.owa_token is None:
await self.authenticate()
- if proto == "SMTP":
+ if proto == "SMTP" or proto == "IMAP":
res = 'user=%s\1auth=%s %s\1\1' % (self.user,
self.owa_token["token_type"],
self.owa_token["access_token"])