aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRouli <rouli.net@gmail.com>2013-01-28 17:37:25 +0200
committerRouli <rouli.net@gmail.com>2013-01-28 17:37:25 +0200
commit330fbfe8cc54449b3e7c0fe8305b13f29b47c15c (patch)
tree3e18b2cad965d75357119fe2ef0d4420c38130c2
parent1e0bab65e3131829560383d147a192e0912c2fe9 (diff)
downloadmitmproxy-330fbfe8cc54449b3e7c0fe8305b13f29b47c15c.tar.gz
mitmproxy-330fbfe8cc54449b3e7c0fe8305b13f29b47c15c.tar.bz2
mitmproxy-330fbfe8cc54449b3e7c0fe8305b13f29b47c15c.zip
adding helper functions to make HAR export easier
-rw-r--r--libmproxy/flow.py59
-rw-r--r--test/test_flow.py113
2 files changed, 172 insertions, 0 deletions
diff --git a/libmproxy/flow.py b/libmproxy/flow.py
index 2c4c5513..9238cfbf 100644
--- a/libmproxy/flow.py
+++ b/libmproxy/flow.py
@@ -197,6 +197,16 @@ class decoded(object):
class HTTPMsg(controller.Msg):
+ def get_decoded_content(self):
+ """
+ Returns the decoded content based on the current Content-Encoding header.
+ Doesn't change the message iteself or its headers.
+ """
+ ce = self.headers.get_first("content-encoding")
+ if not self.content or ce not in encoding.ENCODINGS:
+ return self.content
+ return encoding.decode(ce, self.content)
+
def decode(self):
"""
Decodes content based on the current Content-Encoding header, then
@@ -232,7 +242,15 @@ class HTTPMsg(controller.Msg):
else:
return hl
+ def get_content_type(self):
+ return self.headers.get_first("content-type")
+ def get_transmitted_size(self):
+ # FIXME: this is inprecise in case chunking is used
+ # (we should count the chunking headers)
+ if not self.content:
+ return 0
+ return len(self.content)
class Request(HTTPMsg):
"""
@@ -459,6 +477,28 @@ class Request(HTTPMsg):
self.scheme, self.host, self.port, self.path = parts
return True
+ def get_cookies(self):
+ cookie_headers = self.headers.get("cookie")
+ if not cookie_headers:
+ return None
+
+ cookies = []
+ for header in cookie_headers:
+ pairs = [pair.partition("=") for pair in header.split(';')]
+ cookies.extend((pair[0],(pair[2],{})) for pair in pairs)
+ return dict(cookies)
+
+ def get_header_size(self):
+ FMT = '%s %s HTTP/%s.%s\r\n%s\r\n'
+ assembled_header = FMT % (
+ self.method,
+ self.path,
+ self.httpversion[0],
+ self.httpversion[1],
+ str(self.headers)
+ )
+ return len(assembled_header)
+
def _assemble_head(self, proxy=False):
FMT = '%s %s HTTP/%s.%s\r\n%s\r\n'
FMT_PROXY = '%s %s://%s:%s%s HTTP/%s.%s\r\n%s\r\n'
@@ -713,6 +753,25 @@ class Response(HTTPMsg):
c += self.headers.replace(pattern, repl, *args, **kwargs)
return c
+ def get_header_size(self):
+ FMT = '%s\r\n%s\r\n'
+ proto = "HTTP/%s.%s %s %s"%(self.httpversion[0], self.httpversion[1], self.code, str(self.msg))
+ assembled_header = FMT % (proto, str(self.headers))
+ return len(assembled_header)
+
+ def get_cookies(self):
+ cookie_headers = self.headers.get("set-cookie")
+ if not cookie_headers:
+ return None
+
+ cookies = []
+ for header in cookie_headers:
+ pairs = [pair.partition("=") for pair in header.split(';')]
+ cookie_name = pairs[0][0] # the key of the first key/value pairs
+ cookie_value = pairs[0][2] # the value of the first key/value pairs
+ cookie_parameters = {key.strip().lower():value.strip() for key,sep,value in pairs[1:]}
+ cookies.append((cookie_name, (cookie_value, cookie_parameters)))
+ return dict(cookies)
class ClientDisconnect(controller.Msg):
"""
diff --git a/test/test_flow.py b/test/test_flow.py
index 2af702ce..ed3a988f 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -892,6 +892,57 @@ class TestRequest:
assert not r.headers["content-encoding"]
assert r.content == "falafel"
+ def test_get_cookies_single(self):
+ h = flow.ODictCaseless()
+ h["Cookie"] = ["cookiename=cookievalue"]
+ c = flow.ClientConnect(("addr", 2222))
+ r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
+ result = r.get_cookies()
+ assert len(result)==1
+ assert result['cookiename']==('cookievalue',{})
+
+ def test_get_cookies_double(self):
+ h = flow.ODictCaseless()
+ h["Cookie"] = ["cookiename=cookievalue;othercookiename=othercookievalue"]
+ c = flow.ClientConnect(("addr", 2222))
+ r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
+ result = r.get_cookies()
+ assert len(result)==2
+ assert result['cookiename']==('cookievalue',{})
+ assert result['othercookiename']==('othercookievalue',{})
+
+ def test_get_cookies_withequalsign(self):
+ h = flow.ODictCaseless()
+ h["Cookie"] = ["cookiename=coo=kievalue;othercookiename=othercookievalue"]
+ c = flow.ClientConnect(("addr", 2222))
+ r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
+ result = r.get_cookies()
+ assert len(result)==2
+ assert result['cookiename']==('coo=kievalue',{})
+ assert result['othercookiename']==('othercookievalue',{})
+
+ def test_get_header_size(self):
+ h = flow.ODictCaseless()
+ h["headername"] = ["headervalue"]
+ c = flow.ClientConnect(("addr", 2222))
+ r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
+ result = r.get_header_size()
+ assert result==43
+
+ def test_get_transmitted_size(self):
+ h = flow.ODictCaseless()
+ h["headername"] = ["headervalue"]
+ c = flow.ClientConnect(("addr", 2222))
+ r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
+ result = r.get_transmitted_size()
+ assert result==len("content")
+
+ def test_get_content_type(self):
+ h = flow.ODictCaseless()
+ h["Content-Type"] = ["text/plain"]
+ c = flow.ClientConnect(("addr", 2222))
+ r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
+ assert r.get_content_type()=="text/plain"
class TestResponse:
def test_simple(self):
@@ -989,6 +1040,68 @@ class TestResponse:
assert not r.headers["content-encoding"]
assert r.content == "falafel"
+ def test_get_header_size(self):
+ r = tutils.tresp()
+ result = r.get_header_size()
+ assert result==49
+
+ def test_get_transmitted_size(self):
+ r = tutils.tresp()
+ r.headers["content-encoding"] = ["identity"]
+ r.content = "falafel"
+ result = r.get_transmitted_size()
+ assert result==len("falafel")
+
+ def test_get_cookies_simple(self):
+ h = flow.ODictCaseless()
+ h["Set-Cookie"] = ["cookiename=cookievalue"]
+ resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None)
+ result = resp.get_cookies()
+ assert len(result)==1
+ assert "cookiename" in result
+ assert result["cookiename"] == ("cookievalue", {})
+
+ def test_get_cookies_with_parameters(self):
+ h = flow.ODictCaseless()
+ h["Set-Cookie"] = ["cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
+ resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None)
+ result = resp.get_cookies()
+ assert len(result)==1
+ assert "cookiename" in result
+ assert result["cookiename"][0] == "cookievalue"
+ assert len(result["cookiename"][1])==4
+ assert result["cookiename"][1]["domain"]=="example.com"
+ assert result["cookiename"][1]["expires"]=="Wed Oct 21 16:29:41 2015"
+ assert result["cookiename"][1]["path"]=="/"
+ assert result["cookiename"][1]["httponly"]==""
+
+ def test_get_cookies_no_value(self):
+ h = flow.ODictCaseless()
+ h["Set-Cookie"] = ["cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"]
+ resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None)
+ result = resp.get_cookies()
+ assert len(result)==1
+ assert "cookiename" in result
+ assert result["cookiename"][0] == ""
+ assert len(result["cookiename"][1])==2
+
+ def test_get_cookies_twocookies(self):
+ h = flow.ODictCaseless()
+ h["Set-Cookie"] = ["cookiename=cookievalue","othercookie=othervalue"]
+ resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None)
+ result = resp.get_cookies()
+ assert len(result)==2
+ assert "cookiename" in result
+ assert result["cookiename"] == ("cookievalue", {})
+ assert "othercookie" in result
+ assert result["othercookie"] == ("othervalue", {})
+
+ def test_get_content_type(self):
+ h = flow.ODictCaseless()
+ h["Content-Type"] = ["text/plain"]
+ resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None)
+ assert resp.get_content_type()=="text/plain"
+
class TestError:
def test_getset_state(self):