From 91834f98ccb1e586dabb9c269c369a094a00f2f0 Mon Sep 17 00:00:00 2001
From: Arushit Mudgal
Date: Sun, 4 Feb 2018 02:07:33 +0530
Subject: Extend mypy checking, fix #2194 (#2819)

---
 examples/complex/change_upstream_proxy.py |  7 ++-
 examples/complex/har_dump.py              |  6 ++-
 examples/complex/sslstrip.py              | 11 ++--
 examples/complex/xss_scanner.py           | 87 ++++++++++++++++---------
 test/examples/test_xss_scanner.py         |  8 +--
 tox.ini                                   |  5 +-
 6 files changed, 68 insertions(+), 56 deletions(-)

diff --git a/examples/complex/change_upstream_proxy.py b/examples/complex/change_upstream_proxy.py
index 49d5379f..089a9df5 100644
--- a/examples/complex/change_upstream_proxy.py
+++ b/examples/complex/change_upstream_proxy.py
@@ -1,3 +1,6 @@
+from mitmproxy import http
+import typing
+
 # This scripts demonstrates how mitmproxy can switch to a second/different upstream proxy
 # in upstream proxy mode.
 #
@@ -6,7 +9,7 @@
 # If you want to change the target server, you should modify flow.request.host and flow.request.port


-def proxy_address(flow):
+def proxy_address(flow: http.HTTPFlow) -> typing.Tuple[str, int]:
     # Poor man's loadbalancing: route every second domain through the alternative proxy.
     if hash(flow.request.host) % 2 == 1:
         return ("localhost", 8082)
@@ -14,7 +17,7 @@ def proxy_address(flow):
         return ("localhost", 8081)


-def request(flow):
+def request(flow: http.HTTPFlow) -> None:
     if flow.request.method == "CONNECT":
         # If the decision is done by domain, one could also modify the server address here.
         # We do it after CONNECT here to have the request data available as well.
diff --git a/examples/complex/har_dump.py b/examples/complex/har_dump.py
index 66a81a7d..9e287a19 100644
--- a/examples/complex/har_dump.py
+++ b/examples/complex/har_dump.py
@@ -7,22 +7,24 @@
 import json
 import base64
 import zlib
 import os
+import typing  # noqa

 from datetime import datetime
 from datetime import timezone

 import mitmproxy
+from mitmproxy import connections  # noqa
 from mitmproxy import version
 from mitmproxy import ctx
 from mitmproxy.utils import strutils
 from mitmproxy.net.http import cookies

-HAR = {}
+HAR = {}  # type: typing.Dict

 # A list of server seen till now is maintained so we can avoid
 # using 'connect' time for entries that use an existing connection.
-SERVERS_SEEN = set()
+SERVERS_SEEN = set()  # type: typing.Set[connections.ServerConnection]


 def load(l):
diff --git a/examples/complex/sslstrip.py b/examples/complex/sslstrip.py
index 2f60c8b9..c3f8c4f7 100644
--- a/examples/complex/sslstrip.py
+++ b/examples/complex/sslstrip.py
@@ -3,13 +3,16 @@ This script implements an sslstrip-like attack based on mitmproxy.
 https://moxie.org/software/sslstrip/
 """
 import re
-import urllib
+import urllib.parse
+import typing  # noqa
+
+from mitmproxy import http

 # set of SSL/TLS capable hosts
-secure_hosts = set()
+secure_hosts = set()  # type: typing.Set[str]


-def request(flow):
+def request(flow: http.HTTPFlow) -> None:
     flow.request.headers.pop('If-Modified-Since', None)
     flow.request.headers.pop('Cache-Control', None)

@@ -27,7 +30,7 @@ def request(flow):
         flow.request.host = flow.request.pretty_host


-def response(flow):
+def response(flow: http.HTTPFlow) -> None:
     flow.response.headers.pop('Strict-Transport-Security', None)
     flow.response.headers.pop('Public-Key-Pins', None)

diff --git a/examples/complex/xss_scanner.py b/examples/complex/xss_scanner.py
index 4b35c6c1..0ee38cd4 100755
--- a/examples/complex/xss_scanner.py
+++ b/examples/complex/xss_scanner.py
@@ -35,14 +35,17 @@ Line: 1029zxcs'd"ao<ac>so[sb]po(pc)se;sl/bsl\eq=3847asd

 """

-from mitmproxy import ctx
+from html.parser import HTMLParser
+from typing import Dict, Union, Tuple, Optional, List, NamedTuple
 from socket import gaierror, gethostbyname
 from urllib.parse import urlparse
-import requests
 import re
-from html.parser import HTMLParser
+
+import requests
+
 from mitmproxy import http
-from typing import Dict, Union, Tuple, Optional, List, NamedTuple
+from mitmproxy import ctx
+

 # The actual payload is put between a frontWall and a backWall to make it easy
 # to locate the payload with regular expressions
@@ -83,15 +86,16 @@ def get_cookies(flow: http.HTTPFlow) -> Cookies:
     return {name: value for name, value in flow.request.cookies.fields}


-def find_unclaimed_URLs(body: Union[str, bytes], requestUrl: bytes) -> None:
+def find_unclaimed_URLs(body: str, requestUrl: bytes) -> None:
     """ Look for unclaimed URLs in script tags and log them if found"""
-    def getValue(attrs: List[Tuple[str, str]], attrName: str) -> str:
+    def getValue(attrs: List[Tuple[str, str]], attrName: str) -> Optional[str]:
         for name, value in attrs:
             if attrName == name:
                 return value
+        return None

     class ScriptURLExtractor(HTMLParser):
-        script_URLs = []
+        script_URLs = []  # type: List[str]

         def handle_starttag(self, tag, attrs):
             if (tag == "script" or tag == "iframe") and "src" in [name for name, value in attrs]:
@@ -100,13 +104,10 @@ def find_unclaimed_URLs(body: Union[str, bytes], requestUrl: bytes) -> None:
                 self.script_URLs.append(getValue(attrs, "href"))

     parser = ScriptURLExtractor()
-    try:
-        parser.feed(body)
-    except TypeError:
-        parser.feed(body.decode('utf-8'))
+    parser.feed(body)
     for url in parser.script_URLs:
-        parser = urlparse(url)
-        domain = parser.netloc
+        url_parser = urlparse(url)
+        domain = url_parser.netloc
         try:
             gethostbyname(domain)
         except gaierror:
@@ -178,10 +179,11 @@ def log_SQLi_data(sqli_info: Optional[SQLiData]) -> None:
     if not sqli_info:
         return
     ctx.log.error("===== SQLi Found =====")
-    ctx.log.error("SQLi URL: %s" % sqli_info.url.decode('utf-8'))
-    ctx.log.error("Injection Point: %s" % sqli_info.injection_point.decode('utf-8'))
-    ctx.log.error("Regex used: %s" % sqli_info.regex.decode('utf-8'))
-    ctx.log.error("Suspected DBMS: %s" % sqli_info.dbms.decode('utf-8'))
+    ctx.log.error("SQLi URL: %s" % sqli_info.url)
+    ctx.log.error("Injection Point: %s" % sqli_info.injection_point)
+    ctx.log.error("Regex used: %s" % sqli_info.regex)
+    ctx.log.error("Suspected DBMS: %s" % sqli_info.dbms)
+    return


 def get_SQLi_data(new_body: str, original_body: str, request_URL: str, injection_point: str) -> Optional[SQLiData]:
@@ -202,20 +204,21 @@ def get_SQLi_data(new_body: str, original_body: str, request_URL: str, injection
         "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
     }
     for dbms, regexes in DBMS_ERRORS.items():
-        for regex in regexes:
+        for regex in regexes:  # type: ignore
             if re.search(regex, new_body, re.IGNORECASE) and not re.search(regex, original_body, re.IGNORECASE):
                 return SQLiData(request_URL,
                                 injection_point,
                                 regex,
                                 dbms)
+    return None


 # A qc is either ' or "
-def inside_quote(qc: str, substring: bytes, text_index: int, body: bytes) -> bool:
+def inside_quote(qc: str, substring_bytes: bytes, text_index: int, body_bytes: bytes) -> bool:
     """ Whether the Numberth occurence of the first string in the second
         string is inside quotes as defined by the supplied QuoteChar """
-    substring = substring.decode('utf-8')
-    body = body.decode('utf-8')
+    substring = substring_bytes.decode('utf-8')
+    body = body_bytes.decode('utf-8')
     num_substrings_found = 0
     in_quote = False
     for index, char in enumerate(body):
@@ -238,20 +241,20 @@ def inside_quote(qc: str, substring: bytes, text_index: int, body: bytes) -> boo
     return False


-def paths_to_text(html: str, str: str) -> List[str]:
+def paths_to_text(html: str, string: str) -> List[str]:
     """ Return list of Paths to a given str in the given HTML tree
         - Note that it does a BFS """

-    def remove_last_occurence_of_sub_string(str: str, substr: str):
+    def remove_last_occurence_of_sub_string(string: str, substr: str) -> str:
        """ Delete the last occurence of substr from str
            String String -> String """
-        index = str.rfind(substr)
-        return str[:index] + str[index + len(substr):]
+        index = string.rfind(substr)
+        return string[:index] + string[index + len(substr):]

     class PathHTMLParser(HTMLParser):
         currentPath = ""
-        paths = []
+        paths = []  # type: List[str]

         def handle_starttag(self, tag, attrs):
             self.currentPath += ("/" + tag)
@@ -260,7 +263,7 @@ def paths_to_text(html: str, str: str) -> List[str]:
             self.currentPath = remove_last_occurence_of_sub_string(self.currentPath, "/" + tag)

         def handle_data(self, data):
-            if str in data:
+            if string in data:
                 self.paths.append(self.currentPath)

     parser = PathHTMLParser()
@@ -268,7 +271,7 @@ def paths_to_text(html: str, str: str) -> List[str]:
     return parser.paths


-def get_XSS_data(body: str, request_URL: str, injection_point: str) -> Optional[XSSData]:
+def get_XSS_data(body: Union[str, bytes], request_URL: str, injection_point: str) -> Optional[XSSData]:
     """ Return a XSSDict if there is a XSS otherwise return None """
     def in_script(text, index, body) -> bool:
         """ Whether the Numberth occurence of the first string in the second
@@ -314,9 +317,9 @@ def get_XSS_data(body: str, request_URL: str, injection_point: str) -> Optional[
     matches = regex.findall(body)
     for index, match in enumerate(matches):
         # Where the string is injected into the HTML
-        in_script = in_script(match, index, body)
-        in_HTML = in_HTML(match, index, body)
-        in_tag = not in_script and not in_HTML
+        in_script_val = in_script(match, index, body)
+        in_HTML_val = in_HTML(match, index, body)
+        in_tag = not in_script_val and not in_HTML_val
         in_single_quotes = inside_quote("'", match, index, body)
         in_double_quotes = inside_quote('"', match, index, body)
         # Whether you can inject:
         inject_open_angle = b"ao<ac" in match  # open angle brackets
         inject_close_angle = b"ac>so" in match  # close angle brackets
         inject_single_quotes = b"s'd" in match  # single quotes
         inject_double_quotes = b'd"ao' in match  # double quotes
         inject_slash = b"sl/bsl" in match  # forward slashes
         inject_semi = b"se;sl" in match  # semicolons
         inject_equals = b"eq=" in match  # equals sign
-        if in_script and inject_slash and inject_open_angle and inject_close_angle:  # e.g. <script>PAYLOAD</script>
+        if in_script_val and inject_slash and inject_open_angle and inject_close_angle:  # e.g. <script>PAYLOAD</script>
             return XSSData(request_URL,
                            injection_point,
                            '</script><script>alert(0)</script><script>',
                            match.decode('utf-8'))
-        elif in_script and in_single_quotes and inject_single_quotes and inject_semi:  # e.g. <script>t='PAYLOAD';</script>
+        elif in_script_val and in_single_quotes and inject_single_quotes and inject_semi:  # e.g. <script>t='PAYLOAD';</script>
             return XSSData(request_URL,
                            injection_point,
                            "';alert(0);g='",
                            match.decode('utf-8'))
-        elif in_script and in_double_quotes and inject_double_quotes and inject_semi:  # e.g. <script>t="PAYLOAD";</script>
+        elif in_script_val and in_double_quotes and inject_double_quotes and inject_semi:  # e.g. <script>t="PAYLOAD";</script>
             return XSSData(request_URL,
                            injection_point,
                            '";alert(0);g="',
                            match.decode('utf-8'))
@@ -380,33 +383,35 @@ def get_XSS_data(body: str, request_URL: str, injection_point: str) -> Optional[
                            injection_point,
                            " onmouseover=alert(0) t=",
                            match.decode('utf-8'))
-        elif in_HTML and not in_script and inject_open_angle and inject_close_angle and inject_slash:  # e.g. <html>PAYLOAD</html>
+        elif in_HTML_val and not in_script_val and inject_open_angle and inject_close_angle and inject_slash:  # e.g. <html>PAYLOAD</html>
             return XSSData(request_URL,
                            injection_point,
                            '<script>alert(0)</script>',
                            match.decode('utf-8'))
         else:
             return None
+    return None


 # response is mitmproxy's entry point
 def response(flow: http.HTTPFlow) -> None:
-    cookiesDict = get_cookies(flow)
+    cookies_dict = get_cookies(flow)
+    resp = flow.response.get_text(strict=False)
     # Example: http://xss.guru/unclaimedScriptTag.html
-    find_unclaimed_URLs(flow.response.content, flow.request.url)
-    results = test_end_of_URL_injection(flow.response.content.decode('utf-8'), flow.request.url, cookiesDict)
+    find_unclaimed_URLs(resp, flow.request.url)
+    results = test_end_of_URL_injection(resp, flow.request.url, cookies_dict)
     log_XSS_data(results[0])
     log_SQLi_data(results[1])
     # Example: https://daviddworken.com/vulnerableReferer.php
-    results = test_referer_injection(flow.response.content.decode('utf-8'), flow.request.url, cookiesDict)
+    results = test_referer_injection(resp, flow.request.url, cookies_dict)
     log_XSS_data(results[0])
     log_SQLi_data(results[1])
     # Example: https://daviddworken.com/vulnerableUA.php
-    results = test_user_agent_injection(flow.response.content.decode('utf-8'), flow.request.url, cookiesDict)
+    results = test_user_agent_injection(resp, flow.request.url, cookies_dict)
     log_XSS_data(results[0])
     log_SQLi_data(results[1])
     if "?" in flow.request.url:
         # Example: https://daviddworken.com/vulnerable.php?name=
-        results = test_query_injection(flow.response.content.decode('utf-8'), flow.request.url, cookiesDict)
+        results = test_query_injection(resp, flow.request.url, cookies_dict)
         log_XSS_data(results[0])
         log_SQLi_data(results[1])
diff --git a/test/examples/test_xss_scanner.py b/test/examples/test_xss_scanner.py
index e15d7e10..8cf06a2a 100644
--- a/test/examples/test_xss_scanner.py
+++ b/test/examples/test_xss_scanner.py
@@ -343,10 +343,10 @@ class TestXSSScanner():
         monkeypatch.setattr("mitmproxy.ctx.log", logger)
         xss.log_SQLi_data(None)
         assert logger.args == []
-        xss.log_SQLi_data(xss.SQLiData(b'https://example.com',
-                                       b'Location',
-                                       b'Oracle.*Driver',
-                                       b'Oracle'))
+        xss.log_SQLi_data(xss.SQLiData('https://example.com',
+                                       'Location',
+                                       'Oracle.*Driver',
+                                       'Oracle'))
         assert logger.args[0] == '===== SQLi Found ====='
         assert logger.args[1] == 'SQLi URL: https://example.com'
         assert logger.args[2] == 'Injection Point: Location'
diff --git a/tox.ini b/tox.ini
index 17790b96..d4ec2543 100644
--- a/tox.ini
+++ b/tox.ini
@@ -27,9 +27,8 @@ commands =
     flake8 --jobs 8 mitmproxy pathod examples test release
     python test/filename_matching.py
    rstcheck README.rst
-    mypy --ignore-missing-imports ./mitmproxy
-    mypy --ignore-missing-imports ./pathod
-    mypy --ignore-missing-imports --follow-imports=skip ./examples/simple/
+    mypy --ignore-missing-imports ./mitmproxy ./pathod
+    mypy --ignore-missing-imports --follow-imports=skip ./examples/simple/ ./examples/pathod/ ./examples/complex/

 [testenv:individual_coverage]
 deps =
--
cgit v1.2.3
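
The pattern running through the whole patch is PEP 484 comment-style annotations ("# type: ...") on module-level containers, plus ordinary function annotations, so the example scripts keep working on older Python 3 versions while mypy (invoked through the tox commands above with --ignore-missing-imports and --follow-imports=skip) can still check them. Below is a minimal standalone sketch of that idiom; it is not taken from the mitmproxy sources, and the names are illustrative only:

    import typing

    # Comment-style annotations, in the same spirit as HAR and SERVERS_SEEN in har_dump.py.
    REPORT = {}  # type: typing.Dict[str, int]
    HOSTS_SEEN = set()  # type: typing.Set[str]


    def proxy_address(host: str) -> typing.Tuple[str, int]:
        # Route every second host through an alternative port, as change_upstream_proxy.py does.
        if hash(host) % 2 == 1:
            return ("localhost", 8082)
        return ("localhost", 8081)


    if __name__ == "__main__":
        HOSTS_SEEN.add("example.com")
        REPORT["example.com"] = 1
        print(proxy_address("example.com"))

Checking a file like this with "mypy --ignore-missing-imports" should pass cleanly; changing one of the returns to a bare string would be reported, which is the class of mistake the stricter tox targets above are meant to catch in the example scripts.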