diff options
-rw-r--r-- | mitmproxy/flow_export.py | 123 | ||||
-rw-r--r-- | test/mitmproxy/test_flow_export.py | 189 |
2 files changed, 308 insertions, 4 deletions
diff --git a/mitmproxy/flow_export.py b/mitmproxy/flow_export.py index 6333de57..a1e07953 100644 --- a/mitmproxy/flow_export.py +++ b/mitmproxy/flow_export.py @@ -1,10 +1,11 @@ import json -import urllib from textwrap import dedent import netlib.http from netlib.utils import parse_content_type +import re +from six.moves.urllib.parse import urlparse, quote, quote_plus def curl_command(flow): data = "curl " @@ -38,7 +39,7 @@ def python_code(flow): print(response.text) """).strip() - components = map(lambda x: urllib.quote(x, safe=""), flow.request.path_components) + components = map(lambda x: quote(x, safe=""), flow.request.path_components) url = flow.request.scheme + "://" + flow.request.host + "/" + "/".join(components) args = "" @@ -93,3 +94,121 @@ def is_json(headers, content): except ValueError: return False return False + + +def locust_code(flow): + code = dedent(""" + from locust import HttpLocust, TaskSet, task + + class UserBehavior(TaskSet): + def on_start(self): + ''' on_start is called when a Locust start before any task is scheduled ''' + self.flow() + + @task() + def flow(self): + url = '{url}' + {headers}{params}{data} + self.response = self.client.request( + method='{method}', + url=url,{args} + ) + + class WebsiteUser(HttpLocust): + task_set = UserBehavior + min_wait=1000 + max_wait=3000 + + """).strip() + + components = map(lambda x: quote(x, safe=""), flow.request.path_components) + url = flow.request.scheme + "://" + flow.request.host + "/" + "/".join(components) + + args = "" + headers = "" + if flow.request.headers: + lines = [" '%s': '%s',\n" % (k, v) for k, v in flow.request.headers.fields if k.lower() not in ["host", "cookie"]] + headers += "\n headers = {\n%s }\n" % "".join(lines) + args += "\n headers=headers," + + params = "" + if flow.request.query: + lines = [" '%s': '%s',\n" % (k, v) for k, v in flow.request.query] + params = "\n params = {\n%s }\n" % "".join(lines) + args += "\n params=params," + + data = "" + if flow.request.body: + data = "\n data = '''%s'''\n" % flow.request.body + args += "\n data=data," + + code = code.format( + url=url, + headers=headers, + params=params, + data=data, + method=flow.request.method, + args=args, + ) + + host = flow.request.scheme + "://" + flow.request.host + code = code.replace(host, "' + self.locust.host +'") + code = code.replace(quote_plus(host), "' + quote_plus(self.locust.host) +'") + code = code.replace(quote(host), "' + quote(self.locust.host) +'") + + return code + + +def locust_task(flow): + code = dedent(""" + @task() + def {name}(self): + url = '{url}' + {headers}{params}{data} + self.response = self.client.request( + method='{method}', + url=url,{args} + ) + """).strip() + + components = map(lambda x: quote(x, safe=""), flow.request.path_components) + file_name = "_".join(components) + name = re.sub('\W|^(?=\d)','_', file_name) + url = flow.request.scheme + "://" + flow.request.host + "/" + "/".join(components) + + args = "" + headers = "" + if flow.request.headers: + lines = [" '%s': '%s',\n" % (k, v) for k, v in flow.request.headers.fields if k.lower() not in ["host", "cookie"]] + headers += "\n headers = {\n%s }\n" % "".join(lines) + args += "\n headers=headers," + + params = "" + if flow.request.query: + lines = [" '%s': '%s',\n" % (k, v) for k, v in flow.request.query] + params = "\n params = {\n%s }\n" % "".join(lines) + args += "\n params=params," + + data = "" + if flow.request.body: + data = "\n data = '''%s'''\n" % flow.request.body + args += "\n data=data," + + code = code.format( + name=name, + url=url, + headers=headers, + params=params, + data=data, + method=flow.request.method, + args=args, + ) + + host = flow.request.scheme + "://" + flow.request.host + code = code.replace(host, "' + self.locust.host +'") + code = code.replace(quote_plus(host), "' + quote_plus(self.locust.host) +'") + code = code.replace(quote(host), "' + quote(self.locust.host) +'") + + code = "\n".join(" " + i for i in code.splitlines()) + + return code diff --git a/test/mitmproxy/test_flow_export.py b/test/mitmproxy/test_flow_export.py index 62161d5d..a4264e10 100644 --- a/test/mitmproxy/test_flow_export.py +++ b/test/mitmproxy/test_flow_export.py @@ -142,7 +142,7 @@ class TestExportPythonCode(): assert flow_export.python_code(flow) == result -def TestRawRequest(): +class TestRawRequest(): def test_get(self): flow = tutils.tflow(req=req_get) @@ -159,9 +159,10 @@ def TestRawRequest(): flow = tutils.tflow(req=req_post) result = dedent(""" POST /path HTTP/1.1\r + content-type: application/json\r host: address:22\r \r - content + {"name": "example", "email": "example@example.com"} """).strip() assert flow_export.raw_request(flow) == result @@ -176,3 +177,187 @@ def TestRawRequest(): content """).strip() assert flow_export.raw_request(flow) == result + +class TestExportLocustCode(): + + def test_get(self): + flow = tutils.tflow(req=req_get) + result = """ +from locust import HttpLocust, TaskSet, task + +class UserBehavior(TaskSet): + def on_start(self): + ''' on_start is called when a Locust start before any task is scheduled ''' + self.flow() + + @task() + def flow(self): + url = '' + self.locust.host +'/path' + + headers = { + 'header': 'qvalue', + 'content-length': '7', + } + + self.response = self.client.request( + method='GET', + url=url, + headers=headers, + ) + +class WebsiteUser(HttpLocust): + task_set = UserBehavior + min_wait=1000 + max_wait=3000 + """.strip() + + assert flow_export.locust_code(flow) == result + + def test_post(self): + req_post.content = '''content''' + req_post.headers = '' + flow = tutils.tflow(req=req_post) + result = """ +from locust import HttpLocust, TaskSet, task + +class UserBehavior(TaskSet): + def on_start(self): + ''' on_start is called when a Locust start before any task is scheduled ''' + self.flow() + + @task() + def flow(self): + url = '' + self.locust.host +'/path' + + data = '''content''' + + self.response = self.client.request( + method='POST', + url=url, + data=data, + ) + +class WebsiteUser(HttpLocust): + task_set = UserBehavior + min_wait=1000 + max_wait=3000 + + """.strip() + + assert flow_export.locust_code(flow) == result + + + def test_patch(self): + flow = tutils.tflow(req=req_patch) + result = """ +from locust import HttpLocust, TaskSet, task + +class UserBehavior(TaskSet): + def on_start(self): + ''' on_start is called when a Locust start before any task is scheduled ''' + self.flow() + + @task() + def flow(self): + url = '' + self.locust.host +'/path' + + headers = { + 'header': 'qvalue', + 'content-length': '7', + } + + params = { + 'query': 'param', + } + + data = '''content''' + + self.response = self.client.request( + method='PATCH', + url=url, + headers=headers, + params=params, + data=data, + ) + +class WebsiteUser(HttpLocust): + task_set = UserBehavior + min_wait=1000 + max_wait=3000 + + """.strip() + + assert flow_export.locust_code(flow) == result + + +class TestExportLocustTask(): + + def test_get(self): + flow = tutils.tflow(req=req_get) + result = ' ' + """ + @task() + def path(self): + url = '' + self.locust.host +'/path' + + headers = { + 'header': 'qvalue', + 'content-length': '7', + } + + self.response = self.client.request( + method='GET', + url=url, + headers=headers, + ) + """.strip() + + assert flow_export.locust_task(flow) == result + + def test_post(self): + flow = tutils.tflow(req=req_post) + result = ' ' + """ + @task() + def path(self): + url = '' + self.locust.host +'/path' + + data = '''content''' + + self.response = self.client.request( + method='POST', + url=url, + data=data, + ) + + """.strip() + + assert flow_export.locust_task(flow) == result + + + def test_patch(self): + flow = tutils.tflow(req=req_patch) + result = ' ' + """ + @task() + def path(self): + url = '' + self.locust.host +'/path' + + headers = { + 'header': 'qvalue', + 'content-length': '7', + } + + params = { + 'query': 'param', + } + + data = '''content''' + + self.response = self.client.request( + method='PATCH', + url=url, + headers=headers, + params=params, + data=data, + ) + """.strip() + + assert flow_export.locust_task(flow) == result |