From e727446f14870f087df2687883c3445fe10c3d95 Mon Sep 17 00:00:00 2001 From: madt1m Date: Mon, 23 Jul 2018 21:12:53 +0200 Subject: benchmark: some improvements - limit to queue size --- test/bench/serialization-bm.py | 52 +++++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 19 deletions(-) mode change 100755 => 100644 test/bench/serialization-bm.py diff --git a/test/bench/serialization-bm.py b/test/bench/serialization-bm.py old mode 100755 new mode 100644 index 6bac6415..665b72cb --- a/test/bench/serialization-bm.py +++ b/test/bench/serialization-bm.py @@ -1,6 +1,8 @@ +import tempfile import asyncio +import typing import time -import os + from statistics import mean from mitmproxy import ctx @@ -16,18 +18,20 @@ class StreamTester: """ def __init__(self): - self.loop = asyncio.get_event_loop() - self.queue = asyncio.Queue(loop=self.loop) self.dbh = None self.streaming = False self.tf = None self.out = None self.hot_flows = [] self.results = [] - self._fflushes = 0 + self._flushes = 0 self._stream_period = 0.001 self._flush_period = 3.0 self._flush_rate = 150 + self._target = 2000 + self.loop = asyncio.get_event_loop() + self.queue = asyncio.Queue(maxsize=self._flush_rate * 3, loop=self.loop) + self.temp = tempfile.NamedTemporaryFile() def load(self, loader): loader.add_option( @@ -38,21 +42,30 @@ class StreamTester: ) loader.add_option( "benchmark_save_path", - str, - "/tmp/stats", + typing.Optional[str], + None, "Destination for the stats result file" ) + def _log(self, msg): + if self.out: + self.out.write(msg + '\n') + else: + ctx.log(msg) + def running(self): if not self.streaming: ctx.log("<== Serialization Benchmark Enabled ==>") self.tf = tflow.tflow() self.tf.request.content = b'A' * ctx.options.testflow_size ctx.log(f"With content size: {len(self.tf.request.content)} B") - self.dbh = db.DBHandler("/tmp/temp.sqlite", mode='write') - self.out = open(ctx.options.benchmark_save_path, "w") + if ctx.options.benchmark_save_path: + ctx.log(f"Storing results to {ctx.options.benchmark_save_path}") + self.out = open(ctx.options.benchmark_save_path, "w") + self.dbh = db.DBHandler(self.temp.name, mode='write') self.streaming = True - self.loop.create_task(asyncio.gather(self.writer(), self.stream(), self.stats())) + tasks = (self.stream, self.writer, self.stats) + self.loop.create_task(asyncio.gather(*(t() for t in tasks))) async def stream(self): while True: @@ -65,7 +78,7 @@ class StreamTester: count = 1 f = await self.queue.get() self.hot_flows.append(f) - while not self.queue.empty() and count < self._flush_rate: + while count < self._flush_rate: try: self.hot_flows.append(self.queue.get_nowait()) count += 1 @@ -74,20 +87,21 @@ class StreamTester: start = time.perf_counter() n = self._fflush() end = time.perf_counter() - self.out.write(f"dumps/time ratio: {n} / {end-start} -> {n/(end-start)}\n") + self._log(f"dumps/time ratio: {n} / {end-start} -> {n/(end-start)}") self.results.append(n / (end - start)) - self._fflushes += 1 - ctx.log(f"Flushes: {self._fflushes}") + self._flushes += n + self._log(f"Flows dumped: {self._flushes}") + ctx.log(f"Progress: {min(100.0, 100.0 * (self._flushes / self._target))}%") async def stats(self): while True: await asyncio.sleep(1.0) - if self._fflushes == 21: - self.out.write(f"AVG : {mean(self.results)}\n") - ctx.log(f"<== Benchmark Ended. Collect results at {ctx.options.benchmark_save_path} ==>") - self.out.close() - del self.dbh - os.remove("/tmp/temp.sqlite") + if self._flushes >= self._target: + self._log(f"AVG : {mean(self.results)}") + ctx.log(f"<== Benchmark Ended. Shutting down... ==>") + if self.out: + self.out.close() + self.temp.close() ctx.master.shutdown() def _fflush(self): -- cgit v1.2.3