|
| 1 | +#!/usr/bin/env escript |
| 2 | +%% -*- erlang -*- |
| 3 | +%%! -pa _build/default/lib/erlang_python/ebin |
| 4 | + |
| 5 | +%%% @doc Benchmark script for Channel API vs Reactor comparison. |
| 6 | +%%% |
| 7 | +%%% Run with: |
| 8 | +%%% rebar3 compile && escript examples/bench_channel.erl |
| 9 | + |
| 10 | +-mode(compile). |
| 11 | + |
%% @doc Script entry point: prints the banner, boots the Erlang/Python
%% bridge, reports environment details, runs the three benchmark
%% suites, and exits with status 0.
main(_Args) ->
    print_header(),
    start_runtime(),
    print_system_info(),
    run_all_benchmarks(),
    print_footer(),
    halt(0).

%% Opening banner.
print_header() ->
    io:format("~n========================================~n"),
    io:format("Channel API Benchmark~n"),
    io:format("========================================~n~n").

%% Start the application and the Python bridge; any failure crashes
%% the script via the assertive matches.
start_runtime() ->
    {ok, _} = application:ensure_all_started(erlang_python),
    {ok, _} = py:start_contexts(),
    ok = py_channel:register_callbacks().

%% Report the Erlang/OTP release and the embedded Python version.
print_system_info() ->
    io:format("System Information:~n"),
    io:format(" Erlang/OTP: ~s~n", [erlang:system_info(otp_release)]),
    {ok, PyVer} = py:version(),
    io:format(" Python: ~s~n", [PyVer]),
    io:format("~n").

%% Run every benchmark suite in order.
run_all_benchmarks() ->
    run_channel_throughput_bench(),
    run_channel_vs_reactor_bench(),
    run_channel_latency_bench().

%% Closing banner.
print_footer() ->
    io:format("~n========================================~n"),
    io:format("Benchmark Complete~n"),
    io:format("========================================~n").
| 39 | + |
%% @doc Measures channel throughput (send-only, receive-only, and
%% send+receive round-trip, in messages/second) for a range of
%% payload sizes, printing one table row per size.
%%
%% Fixes:
%%   * The header "Round (msg/s)" is 13 characters; with the former
%%     `~12s` field Erlang fills the overflowed field with '*', so the
%%     last column header printed as "************". The last column
%%     is now 13 wide (data row and separator widened to match).
%%   * The three copy-pasted timing loops are factored into rate/2.
%%   * rate/2 clamps the elapsed time to >= 1 us so a pathologically
%%     fast run cannot raise badarith on division by zero.
run_channel_throughput_bench() ->
    io:format("~n--- Channel Throughput Benchmark ---~n"),
    io:format("Messages per batch: 1000~n~n"),

    Sizes = [64, 256, 1024, 4096, 16384],

    io:format("~8s | ~12s | ~12s | ~13s~n",
              ["Size", "Send (msg/s)", "Recv (msg/s)", "Round (msg/s)"]),
    io:format("~s~n", [lists:duplicate(54, $-)]),

    lists:foreach(fun(Size) ->
        {ok, Ch} = py_channel:new(),
        Data = binary:copy(<<0>>, Size),
        Iterations = 1000,

        %% Send-only throughput (queues Iterations messages).
        SendRate = rate(fun() -> ok = py_channel:send(Ch, Data) end,
                        Iterations),

        %% Receive-only throughput: drains exactly the messages
        %% queued by the send loop above.
        RecvRate = rate(fun() -> {ok, _} = py_nif:channel_try_receive(Ch) end,
                        Iterations),

        %% Round-trip throughput: one send immediately followed by
        %% one receive per iteration.
        RoundRate = rate(fun() ->
                             ok = py_channel:send(Ch, Data),
                             {ok, _} = py_nif:channel_try_receive(Ch)
                         end,
                         Iterations),

        io:format("~8B | ~12w | ~12w | ~13w~n",
                  [Size, round(SendRate), round(RecvRate), round(RoundRate)]),

        py_channel:close(Ch)
    end, Sizes),
    ok.

%% Runs Fun N times and returns the observed rate in operations per
%% second, measured with the monotonic clock. Elapsed time is clamped
%% to at least one microsecond to avoid division by zero.
rate(Fun, N) ->
    T0 = erlang:monotonic_time(microsecond),
    lists:foreach(fun(_) -> Fun() end, lists:seq(1, N)),
    T1 = erlang:monotonic_time(microsecond),
    N / (max(T1 - T0, 1) / 1000000).
| 89 | + |
%% @doc Compares the Reactor Protocol echo pattern (timed from the
%% Python side via an embedded script) against a Channel round-trip
%% (timed from the Erlang side), printing one table per approach plus
%% a "vs React" speed ratio.
%%
%% The binary below is executed verbatim by py:exec/1; its bytes
%% (including the Python indentation) are part of the program's
%% behavior and must not be edited.
run_channel_vs_reactor_bench() ->
    io:format("~n--- Channel vs Reactor Comparison ---~n"),
    io:format("Iterations: 5000~n~n"),

    Code = <<"
import time
import socket
import erlang
import erlang.reactor as reactor

def bench_channel_python(iterations=5000):
    '''Benchmark Channel from Python side'''
    results = {}
    sizes = [64, 256, 1024, 4096, 16384]

    for size in sizes:
        test_data = b'X' * size

        # We can't easily create channels from Python yet,
        # so we benchmark try_receive on pre-filled channels
        # This measures Python-side overhead

        # Measure try_receive with None ref (measures call overhead)
        start = time.perf_counter()
        for _ in range(iterations):
            try:
                # Just measure the erlang.call overhead
                pass
            except:
                pass
        call_time = time.perf_counter() - start

        results[size] = {
            'call_overhead_ms': call_time * 1000,
            'ops_per_sec': iterations / max(call_time, 0.0001),
        }

    return results

def bench_reactor_protocol(iterations=200):
    '''Benchmark Reactor Protocol pattern'''
    results = {}
    sizes = [64, 256, 1024, 4096, 16384]

    class EchoProtocol(reactor.Protocol):
        def data_received(self, data):
            self.write_buffer.extend(data)
            return 'write_pending'

        def write_ready(self):
            if not self.write_buffer:
                return 'read_pending'
            written = self.write(bytes(self.write_buffer))
            del self.write_buffer[:written]
            return 'continue' if self.write_buffer else 'read_pending'

    for size in sizes:
        test_data = b'X' * size
        times = []

        for _ in range(iterations):
            s1, s2 = socket.socketpair()
            s1.setblocking(False)
            s2.setblocking(False)

            try:
                reactor.set_protocol_factory(EchoProtocol)
                reactor.init_connection(s1.fileno(), {'type': 'test'})

                s2.send(test_data)

                start = time.perf_counter()
                action = reactor.on_read_ready(s1.fileno())
                elapsed = time.perf_counter() - start
                times.append(elapsed)

                reactor.close_connection(s1.fileno())
            finally:
                s1.close()
                s2.close()

        import statistics
        avg_time = statistics.mean(times)
        results[size] = {
            'avg_time_ms': avg_time * 1000,
            'ops_per_sec': 1.0 / avg_time,
        }

    return results

_reactor_results = bench_reactor_protocol()
">>,

    %% Run the script, then pull the result map the script bound at
    %% Python module scope back into Erlang.
    ok = py:exec(Code),
    {ok, ReactorResults} = py:eval(<<"_reactor_results">>),

    %% ReactorResults maps each integer size to a map with binary
    %% keys <<"avg_time_ms">> and <<"ops_per_sec">> (presumably the
    %% py bridge converts Python str keys to binaries — confirm
    %% against the bridge's type mapping).
    io:format("Reactor Protocol (echo pattern):~n"),
    io:format("~8s | ~12s | ~12s~n",
              ["Size", "Avg (ms)", "Ops/sec"]),
    io:format("~s~n", [string:copies("-", 36)]),

    %% Must match the `sizes` list inside the embedded Python above.
    Sizes = [64, 256, 1024, 4096, 16384],
    lists:foreach(fun(Size) ->
        Data = maps:get(Size, ReactorResults),
        AvgMs = maps:get(<<"avg_time_ms">>, Data),
        OpsPerSec = maps:get(<<"ops_per_sec">>, Data),
        io:format("~8B | ~12.3f | ~12w~n",
                  [Size, AvgMs, round(OpsPerSec)])
    end, Sizes),

    %% Now compare with Channel round-trip
    io:format("~nChannel Round-trip (Erlang send + NIF receive):~n"),
    io:format("~8s | ~12s | ~12s | ~8s~n",
              ["Size", "Avg (ms)", "Ops/sec", "vs React"]),
    io:format("~s~n", [string:copies("-", 48)]),

    lists:foreach(fun(Size) ->
        {ok, Ch} = py_channel:new(),
        Data = binary:copy(<<0>>, Size),
        Iterations = 1000,

        %% Time Iterations send + try_receive pairs as one batch.
        Start = erlang:monotonic_time(microsecond),
        lists:foreach(fun(_) ->
            ok = py_channel:send(Ch, Data),
            {ok, _} = py_nif:channel_try_receive(Ch)
        end, lists:seq(1, Iterations)),
        End = erlang:monotonic_time(microsecond),

        TotalTime = (End - Start) / 1000000,
        AvgMs = (TotalTime / Iterations) * 1000,
        OpsPerSec = Iterations / TotalTime,

        %% Ratio > 1.0 means the Channel path is faster than the
        %% Reactor echo path for this payload size.
        ReactorData = maps:get(Size, ReactorResults),
        ReactorOps = maps:get(<<"ops_per_sec">>, ReactorData),
        Ratio = OpsPerSec / ReactorOps,

        io:format("~8B | ~12.3f | ~12w | ~.1fx~n",
                  [Size, AvgMs, round(OpsPerSec), Ratio]),

        py_channel:close(Ch)
    end, Sizes),
    ok.
| 232 | + |
%% @doc Reports the latency distribution (min / mean / p99 / max, in
%% microseconds) of a single channel send + try_receive round-trip,
%% one table row per payload size.
run_channel_latency_bench() ->
    io:format("~n--- Channel Latency Distribution ---~n"),
    io:format("Iterations: 10000~n~n"),

    io:format("~8s | ~10s | ~10s | ~10s | ~10s~n",
              ["Size", "Min (us)", "Avg (us)", "P99 (us)", "Max (us)"]),
    io:format("~s~n", [string:copies("-", 56)]),

    [latency_row(Size) || Size <- [64, 1024, 16384]],
    ok.

%% Samples 10000 round-trips on a fresh channel, prints one summary
%% row for the given payload size, and closes the channel.
latency_row(Size) ->
    {ok, Ch} = py_channel:new(),
    Payload = binary:copy(<<0>>, Size),
    Count = 10000,

    Samples = [round_trip_us(Ch, Payload) || _ <- lists:seq(1, Count)],

    Ordered = lists:sort(Samples),
    Fastest = hd(Ordered),
    Slowest = lists:last(Ordered),
    Mean = lists:sum(Samples) / Count,
    %% 99th percentile: 1-based index into the sorted samples.
    P99 = lists:nth(round(Count * 0.99), Ordered),

    io:format("~8B | ~10.1f | ~10.1f | ~10.1f | ~10.1f~n",
              [Size, float(Fastest), Mean, float(P99), float(Slowest)]),

    py_channel:close(Ch).

%% Wall-clock time, in microseconds, of one send + try_receive pair.
round_trip_us(Ch, Payload) ->
    T0 = erlang:monotonic_time(microsecond),
    ok = py_channel:send(Ch, Payload),
    {ok, _} = py_nif:channel_try_receive(Ch),
    erlang:monotonic_time(microsecond) - T0.
%% End of benchmark script.