aboutsummaryrefslogtreecommitdiffstats
path: root/cloud_mdir_sync/oauth.py
blob: 163dcba2051b0a53db8202a3748aedf8c267e144 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# SPDX-License-Identifier: GPL-2.0+
import asyncio
import os

import aiohttp
import aiohttp.web


class WebServer(object):
    """A small web server is used to manage oauth requests. The user should point a browser
    window at localhost. The program will generate redirects for the browser to point at
    OAUTH servers when interactive authentication is required."""
    url = "http://127.0.0.1:8080/"
    runner = None

    def __init__(self):
        self.auth_redirs = {}
        self.web_app = aiohttp.web.Application()
        self.web_app.router.add_get("/", self._start)
        self.web_app.router.add_get("/oauth2/msal", self._oauth2_redirect)
        self.web_app.router.add_get("/oauth2/gmail", self._oauth2_redirect)

    async def go(self):
        self.runner = aiohttp.web.AppRunner(self.web_app)
        await self.runner.setup()
        site = aiohttp.web.TCPSite(self.runner, '127.0.0.1', 8080)
        await site.start()

    async def close(self):
        if self.runner:
            await self.runner.cleanup()

    async def auth_redir(self, url: str, state: str, redir_url: str):
        """Call as part of an OAUTH flow to hand the URL off to interactive browser
        based authentication.  The flow will resume when the OAUTH server
        redirects back to the localhost server.  The final query paremeters
        will be returned by this function"""
        queue = asyncio.Queue()
        self.auth_redirs[state] = (url, queue, redir_url)
        return await queue.get()

    def _start(self, request: aiohttp.web.Request):
        """Feed redirects to the web browser until all authing is done.  FIXME: Some
        fancy java script should be used to fetch new interactive auth
        requests"""
        for I in self.auth_redirs.values():
            raise aiohttp.web.HTTPFound(I[0])
        return aiohttp.web.Response(text="Authentication done")

    def _oauth2_redirect(self, request: aiohttp.web.Request):
        """Use for the Azure AD authentication response redirection"""
        state = request.query.get("state", None)
        if state is None:
            raise aiohttp.web.HTTPBadRequest(text="No state parameter")
        try:
            _, queue, redir_url = self.auth_redirs[state]
            # RFC8252 8.10
            if redir_url != self.url[:-1] + request.path:
                raise aiohttp.web.HTTPBadRequest(
                    text="Invalid redirection path")
            del self.auth_redirs[state]
            queue.put_nowait(request.query)
        except KeyError:
            pass

        for I in self.auth_redirs.values():
            raise aiohttp.web.HTTPFound(I[0])
        raise aiohttp.web.HTTPFound(self.url)