aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/addons/view.py66
-rw-r--r--test/mitmproxy/addons/test_view.py34
2 files changed, 50 insertions, 50 deletions
diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py
index af366d67..8c0567a5 100644
--- a/mitmproxy/addons/view.py
+++ b/mitmproxy/addons/view.py
@@ -15,15 +15,15 @@ import datetime
import blinker
import sortedcontainers
-from mitmproxy import flow
+import mitmproxy.flow
from mitmproxy import flowfilter
-def key_request_start(f: flow.Flow) -> datetime.datetime:
+def key_request_start(f: mitmproxy.flow.Flow) -> datetime.datetime:
return f.request.timestamp_start or 0
-def key_request_method(f: flow.Flow) -> str:
+def key_request_method(f: mitmproxy.flow.Flow) -> str:
return f.request.method
@@ -67,19 +67,19 @@ class View(collections.Sequence):
def __len__(self):
return len(self._view)
- def __getitem__(self, offset) -> flow.Flow:
+ def __getitem__(self, offset) -> mitmproxy.flow.Flow:
return self._view[self._rev(offset)]
# Reflect some methods to the efficient underlying implementation
- def bisect(self, f: flow.Flow) -> int:
+ def bisect(self, f: mitmproxy.flow.Flow) -> int:
v = self._view.bisect(f)
# Bisect returns an item to the RIGHT of the existing entries.
if v == 0:
return v
return self._rev(v - 1) + 1
- def index(self, f: flow.Flow) -> int:
+ def index(self, f: mitmproxy.flow.Flow) -> int:
return self._rev(self._view.index(f))
# API
@@ -116,7 +116,7 @@ class View(collections.Sequence):
self._view.clear()
self.sig_refresh.send(self)
- def add(self, f: flow.Flow):
+ def add(self, f: mitmproxy.flow.Flow):
"""
Adds a flow to the state. If the flow already exists, it is
ignored.
@@ -127,7 +127,7 @@ class View(collections.Sequence):
self._view.add(f)
self.sig_add.send(self, flow=f)
- def remove(self, f: flow.Flow):
+ def remove(self, f: mitmproxy.flow.Flow):
"""
Removes the flow from the underlying store and the view.
"""
@@ -137,7 +137,7 @@ class View(collections.Sequence):
self._view.remove(f)
self.sig_remove.send(self, flow=f)
- def update(self, f: flow.Flow):
+ def update(self, f: mitmproxy.flow.Flow):
"""
Updates a flow. If the flow is not in the state, it's ignored.
"""
@@ -180,65 +180,65 @@ class Focus:
"""
def __init__(self, v: View) -> None:
self.view = v
- self._focusflow = None
+ self._flow = None
if len(self.view):
- self.focusflow = self.view[0]
+ self.flow = self.view[0]
v.sig_add.connect(self._sig_add)
v.sig_remove.connect(self._sig_remove)
v.sig_refresh.connect(self._sig_refresh)
@property
- def focusflow(self) -> typing.Optional[flow.Flow]:
- return self._focusflow
+ def flow(self) -> typing.Optional[mitmproxy.flow.Flow]:
+ return self._flow
- @focusflow.setter
- def focusflow(self, f: flow.Flow):
+ @flow.setter
+ def flow(self, f: mitmproxy.flow.Flow):
if f is not None and f not in self.view:
raise ValueError("Attempt to set focus to flow not in view")
- self._focusflow = f
+ self._flow = f
@property
def index(self) -> typing.Optional[int]:
- if self.focusflow:
- return self.view.index(self.focusflow)
+ if self.flow:
+ return self.view.index(self.flow)
def next(self):
"""
Sets the focus to the next flow.
"""
- if self.focusflow:
+ if self.flow:
idx = min(self.index + 1, len(self.view) - 1)
- self.focusflow = self.view[idx]
+ self.flow = self.view[idx]
def prev(self):
"""
Sets the focus to the previous flow.
"""
- if self.focusflow:
+ if self.flow:
idx = max(self.index - 1, 0)
- self.focusflow = self.view[idx]
+ self.flow = self.view[idx]
def _nearest(self, f, v):
return min(v.bisect(f), len(v) - 1)
def _sig_remove(self, view, flow):
if len(view) == 0:
- self.focusflow = None
- elif flow is self.focusflow:
- self.focusflow = view[self._nearest(self.focusflow, view)]
+ self.flow = None
+ elif flow is self.flow:
+ self.flow = view[self._nearest(self.flow, view)]
def _sig_refresh(self, view):
if len(view) == 0:
- self.focusflow = None
- elif self.focusflow is None:
- self.focusflow = view[0]
- elif self.focusflow not in view:
- self.focusflow = view[self._nearest(self.focusflow, view)]
+ self.flow = None
+ elif self.flow is None:
+ self.flow = view[0]
+ elif self.flow not in view:
+ self.flow = view[self._nearest(self.flow, view)]
def _sig_add(self, view, flow):
# We only have to act if we don't have a focus element
- if not self.focusflow:
- self.focusflow = flow
+ if not self.flow:
+ self.flow = flow
class Settings(collections.Mapping):
@@ -254,7 +254,7 @@ class Settings(collections.Mapping):
def __len__(self) -> int:
return len(self.values)
- def __getitem__(self, f: flow.Flow) -> dict:
+ def __getitem__(self, f: mitmproxy.flow.Flow) -> dict:
if f.id not in self.view._store:
raise KeyError
return self.values.setdefault(f.id, {})
diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py
index e4083bac..15cf534e 100644
--- a/test/mitmproxy/addons/test_view.py
+++ b/test/mitmproxy/addons/test_view.py
@@ -186,45 +186,45 @@ def test_focus():
v.add(tft())
f = view.Focus(v)
assert f.index is 0
- assert f.focusflow is v[0]
+ assert f.flow is v[0]
# Start empty
v = view.View()
f = view.Focus(v)
assert f.index is None
- assert f.focusflow is None
+ assert f.flow is None
v.add(tft(start=1))
assert f.index == 0
- assert f.focusflow is v[0]
+ assert f.flow is v[0]
v.add(tft(start=0))
assert f.index == 1
- assert f.focusflow is v[1]
+ assert f.flow is v[1]
v.add(tft(start=2))
assert f.index == 1
- assert f.focusflow is v[1]
+ assert f.flow is v[1]
v.remove(v[1])
assert f.index == 1
- assert f.focusflow is v[1]
+ assert f.flow is v[1]
v.remove(v[1])
assert f.index == 0
- assert f.focusflow is v[0]
+ assert f.flow is v[0]
v.remove(v[0])
assert f.index is None
- assert f.focusflow is None
+ assert f.flow is None
v.add(tft(method="get", start=0))
v.add(tft(method="get", start=1))
v.add(tft(method="put", start=2))
v.add(tft(method="get", start=3))
- f.focusflow = v[2]
- assert f.focusflow.request.method == "PUT"
+ f.flow = v[2]
+ assert f.flow.request.method == "PUT"
filt = flowfilter.parse("~m get")
v.set_filter(filt)
@@ -243,21 +243,21 @@ def test_focus_nextprev():
# Nops on a single-flow view
v.add(tft(start=0))
- assert v.focus.focusflow == v[0]
+ assert v.focus.flow == v[0]
v.focus.next()
- assert v.focus.focusflow == v[0]
+ assert v.focus.flow == v[0]
v.focus.prev()
- assert v.focus.focusflow == v[0]
+ assert v.focus.flow == v[0]
v.add(tft(start=1))
v.focus.next()
- assert v.focus.focusflow == v[1]
+ assert v.focus.flow == v[1]
v.focus.next()
- assert v.focus.focusflow == v[1]
+ assert v.focus.flow == v[1]
v.focus.prev()
- assert v.focus.focusflow == v[0]
+ assert v.focus.flow == v[0]
v.focus.prev()
- assert v.focus.focusflow == v[0]
+ assert v.focus.flow == v[0]
def test_settings():