diff options
| -rw-r--r-- | mitmproxy/flow_export.py | 123 | ||||
| -rw-r--r-- | test/mitmproxy/test_flow_export.py | 189 | 
2 files changed, 308 insertions, 4 deletions
| diff --git a/mitmproxy/flow_export.py b/mitmproxy/flow_export.py index 6333de57..a1e07953 100644 --- a/mitmproxy/flow_export.py +++ b/mitmproxy/flow_export.py @@ -1,10 +1,11 @@  import json -import urllib  from textwrap import dedent  import netlib.http  from netlib.utils import parse_content_type +import re +from six.moves.urllib.parse import urlparse, quote, quote_plus  def curl_command(flow):      data = "curl " @@ -38,7 +39,7 @@ def python_code(flow):          print(response.text)      """).strip() -    components = map(lambda x: urllib.quote(x, safe=""), flow.request.path_components) +    components = map(lambda x: quote(x, safe=""), flow.request.path_components)      url = flow.request.scheme + "://" + flow.request.host + "/" + "/".join(components)      args = "" @@ -93,3 +94,121 @@ def is_json(headers, content):              except ValueError:                  return False      return False + + +def locust_code(flow): +    code = dedent(""" +        from locust import HttpLocust, TaskSet, task + +        class UserBehavior(TaskSet): +            def on_start(self): +                ''' on_start is called when a Locust start before any task is scheduled ''' +                self.flow() + +            @task() +            def flow(self): +                url = '{url}' +                {headers}{params}{data} +                self.response = self.client.request( +                    method='{method}', +                    url=url,{args} +                ) + +        class WebsiteUser(HttpLocust): +            task_set = UserBehavior +            min_wait=1000 +            max_wait=3000 + +    """).strip() + +    components = map(lambda x: quote(x, safe=""), flow.request.path_components) +    url = flow.request.scheme + "://" + flow.request.host + "/" + "/".join(components) + +    args = "" +    headers = "" +    if flow.request.headers: +        lines = ["            '%s': '%s',\n" % (k, v) for k, v in flow.request.headers.fields if k.lower() not in ["host", "cookie"]] +        headers += "\n        headers = {\n%s        }\n" % "".join(lines) +        args += "\n            headers=headers," + +    params = "" +    if flow.request.query: +        lines = ["            '%s': '%s',\n" % (k, v) for k, v in flow.request.query] +        params = "\n        params = {\n%s        }\n" % "".join(lines) +        args += "\n            params=params," + +    data = "" +    if flow.request.body: +        data = "\n        data = '''%s'''\n" % flow.request.body +        args += "\n            data=data," + +    code = code.format( +        url=url, +        headers=headers, +        params=params, +        data=data, +        method=flow.request.method, +        args=args, +    ) + +    host = flow.request.scheme + "://" + flow.request.host +    code = code.replace(host, "' + self.locust.host +'") +    code = code.replace(quote_plus(host), "' + quote_plus(self.locust.host) +'") +    code = code.replace(quote(host), "' + quote(self.locust.host) +'") + +    return code + + +def locust_task(flow): +    code = dedent(""" +    @task() +    def {name}(self): +        url = '{url}' +        {headers}{params}{data} +        self.response = self.client.request( +            method='{method}', +            url=url,{args} +        ) +    """).strip() + +    components = map(lambda x: quote(x, safe=""), flow.request.path_components) +    file_name = "_".join(components) +    name = re.sub('\W|^(?=\d)','_', file_name) +    url = flow.request.scheme + "://" + flow.request.host + "/" + "/".join(components) + +    args = "" +    headers = "" +    if flow.request.headers: +        lines = ["        '%s': '%s',\n" % (k, v) for k, v in flow.request.headers.fields if k.lower() not in ["host", "cookie"]] +        headers += "\n    headers = {\n%s    }\n" % "".join(lines) +        args += "\n        headers=headers," + +    params = "" +    if flow.request.query: +        lines = ["        '%s': '%s',\n" % (k, v) for k, v in flow.request.query] +        params = "\n    params = {\n%s    }\n" % "".join(lines) +        args += "\n        params=params," + +    data = "" +    if flow.request.body: +        data = "\n    data = '''%s'''\n" % flow.request.body +        args += "\n        data=data," + +    code = code.format( +        name=name, +        url=url, +        headers=headers, +        params=params, +        data=data, +        method=flow.request.method, +        args=args, +    ) + +    host = flow.request.scheme + "://" + flow.request.host +    code = code.replace(host, "' + self.locust.host +'") +    code = code.replace(quote_plus(host), "' + quote_plus(self.locust.host) +'") +    code = code.replace(quote(host), "' + quote(self.locust.host) +'") + +    code = "\n".join("    " + i for i in code.splitlines()) + +    return code diff --git a/test/mitmproxy/test_flow_export.py b/test/mitmproxy/test_flow_export.py index 62161d5d..a4264e10 100644 --- a/test/mitmproxy/test_flow_export.py +++ b/test/mitmproxy/test_flow_export.py @@ -142,7 +142,7 @@ class TestExportPythonCode():          assert flow_export.python_code(flow) == result -def TestRawRequest(): +class TestRawRequest():      def test_get(self):          flow = tutils.tflow(req=req_get) @@ -159,9 +159,10 @@ def TestRawRequest():          flow = tutils.tflow(req=req_post)          result = dedent("""              POST /path HTTP/1.1\r +            content-type: application/json\r              host: address:22\r              \r -            content +            {"name": "example", "email": "example@example.com"}          """).strip()          assert flow_export.raw_request(flow) == result @@ -176,3 +177,187 @@ def TestRawRequest():              content          """).strip()          assert flow_export.raw_request(flow) == result + +class TestExportLocustCode(): + +    def test_get(self): +        flow = tutils.tflow(req=req_get) +        result = """ +from locust import HttpLocust, TaskSet, task + +class UserBehavior(TaskSet): +    def on_start(self): +        ''' on_start is called when a Locust start before any task is scheduled ''' +        self.flow() + +    @task() +    def flow(self): +        url = '' + self.locust.host +'/path' +         +        headers = { +            'header': 'qvalue', +            'content-length': '7', +        } + +        self.response = self.client.request( +            method='GET', +            url=url, +            headers=headers, +        ) + +class WebsiteUser(HttpLocust): +    task_set = UserBehavior +    min_wait=1000 +    max_wait=3000 +        """.strip() + +        assert flow_export.locust_code(flow) == result + +    def test_post(self): +        req_post.content = '''content''' +        req_post.headers = '' +        flow = tutils.tflow(req=req_post) +        result = """ +from locust import HttpLocust, TaskSet, task + +class UserBehavior(TaskSet): +    def on_start(self): +        ''' on_start is called when a Locust start before any task is scheduled ''' +        self.flow() + +    @task() +    def flow(self): +        url = '' + self.locust.host +'/path' +         +        data = '''content''' + +        self.response = self.client.request( +            method='POST', +            url=url, +            data=data, +        ) + +class WebsiteUser(HttpLocust): +    task_set = UserBehavior +    min_wait=1000 +    max_wait=3000 + +        """.strip() + +        assert flow_export.locust_code(flow) == result + + +    def test_patch(self): +        flow = tutils.tflow(req=req_patch) +        result = """ +from locust import HttpLocust, TaskSet, task + +class UserBehavior(TaskSet): +    def on_start(self): +        ''' on_start is called when a Locust start before any task is scheduled ''' +        self.flow() + +    @task() +    def flow(self): +        url = '' + self.locust.host +'/path' +         +        headers = { +            'header': 'qvalue', +            'content-length': '7', +        } + +        params = { +            'query': 'param', +        } + +        data = '''content''' + +        self.response = self.client.request( +            method='PATCH', +            url=url, +            headers=headers, +            params=params, +            data=data, +        ) + +class WebsiteUser(HttpLocust): +    task_set = UserBehavior +    min_wait=1000 +    max_wait=3000 + +        """.strip() + +        assert flow_export.locust_code(flow) == result + + +class TestExportLocustTask(): + +    def test_get(self): +        flow = tutils.tflow(req=req_get) +        result = '    ' + """ +    @task() +    def path(self): +        url = '' + self.locust.host +'/path' +         +        headers = { +            'header': 'qvalue', +            'content-length': '7', +        } +     +        self.response = self.client.request( +            method='GET', +            url=url, +            headers=headers, +        ) +        """.strip() + +        assert flow_export.locust_task(flow) == result + +    def test_post(self): +        flow = tutils.tflow(req=req_post) +        result = '    ' + """ +    @task() +    def path(self): +        url = '' + self.locust.host +'/path' +         +        data = '''content''' +     +        self.response = self.client.request( +            method='POST', +            url=url, +            data=data, +        ) +     +        """.strip() + +        assert flow_export.locust_task(flow) == result + + +    def test_patch(self): +        flow = tutils.tflow(req=req_patch) +        result = '    ' + """ +    @task() +    def path(self): +        url = '' + self.locust.host +'/path' +         +        headers = { +            'header': 'qvalue', +            'content-length': '7', +        } +     +        params = { +            'query': 'param', +        } +     +        data = '''content''' +     +        self.response = self.client.request( +            method='PATCH', +            url=url, +            headers=headers, +            params=params, +            data=data, +        ) +        """.strip() + +        assert flow_export.locust_task(flow) == result | 
