Skip to content

Commit c87d602

Browse files
Feature/multi processing (#134)
* ✨ Implement Scheduler for multi processing - Add scheduler singleton to handle creating pools and closing pools. - Remove Pool references and replace with scheduler - Update tests and benchmarks * Modify scheduler to include only one method; add tally methods to tally in batches * fix naming * fix unused imports * return int * increase the deadline on slow tests * Address PR Feedback Co-authored-by: Keith Fung <keith.fung@infernored.com> Co-authored-by: Keith Fung <keithrfung@users.noreply.github.com>
1 parent 0150373 commit c87d602

12 files changed

Lines changed: 564 additions & 262 deletions

File tree

Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
.PHONY: all environment install install-mac install-linux install-windows lint validate test test-example coverage coverage-html coverage-xml coverage-erase generate-sample-data
1+
.PHONY: all bench environment install install-mac install-linux install-windows lint validate test test-example coverage coverage-html coverage-xml coverage-erase generate-sample-data
22

33
CODE_COVERAGE ?= 90
44
WINDOWS_32BIT_GMPY2 ?= packages/gmpy2-2.0.8-cp38-cp38-win32.whl
@@ -8,6 +8,10 @@ IS_64_BIT ?= $(shell python -c 'from sys import maxsize; print(maxsize > 2**32)'
88

99
all: environment install validate lint coverage
1010

11+
bench:
12+
@echo 📊 BENCHMARKS
13+
pipenv run python -s bench/bench_chaum_pedersen.py
14+
1115
environment:
1216
@echo 🔧 PIPENV SETUP
1317
pip install pipenv

Pipfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ typish = '*'
2222
[packages]
2323
numpy = '>=1.18.2'
2424
jsons = '>=1.1.2'
25+
psutil = '>=5.7.2'
2526
cryptography = "*"
2627

2728
[requires]

Pipfile.lock

Lines changed: 191 additions & 135 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bench/bench_chaum_pedersen.py

Lines changed: 55 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
from multiprocessing import Pool, cpu_count
21
from timeit import default_timer as timer
3-
from typing import Tuple, NamedTuple, Dict
2+
from typing import Dict, List, NamedTuple, Tuple
43

54
from numpy import average, std
65

@@ -13,7 +12,7 @@
1312
)
1413
from electionguard.group import ElementModQ, int_to_q_unchecked
1514
from electionguard.nonces import Nonces
16-
15+
from electionguard.scheduler import Scheduler
1716
from electionguard.utils import get_optional
1817

1918

@@ -49,62 +48,64 @@ def identity(x: int) -> int:
4948
problem_sizes = (100, 500, 1000, 5000)
5049
rands = Nonces(int_to_q_unchecked(31337))
5150
speedup: Dict[int, float] = {}
52-
print(f"CPUs detected: {cpu_count()}, launching thread pool")
53-
pool = Pool(cpu_count())
5451

5552
# warm up the pool to help get consistent measurements
56-
results = pool.map(identity, range(1, 30000))
57-
assert results == list(range(1, 30000))
58-
59-
bench_start = timer()
60-
61-
for size in problem_sizes:
62-
print("Benchmarking on problem size: ", size)
63-
seeds = rands[0:size]
64-
inputs = [
65-
BenchInput(
66-
get_optional(elgamal_keypair_from_secret(a)),
67-
rands[size],
68-
rands[size + 1],
69-
)
70-
for a in seeds
71-
]
72-
start_all_scalar = timer()
73-
timing_data = [chaum_pedersen_bench(i) for i in inputs]
74-
end_all_scalar = timer()
75-
76-
print(f" Creating Chaum-Pedersen proofs ({size} iterations)")
77-
avg_proof_scalar = average([t[0] for t in timing_data])
78-
std_proof_scalar = std([t[0] for t in timing_data])
79-
print(f" Avg = {avg_proof_scalar:.6f} sec")
80-
print(f" Stddev = {std_proof_scalar:.6f} sec")
81-
82-
print(f" Validating Chaum-Pedersen proofs ({size} iterations)")
83-
avg_verify_scalar = average([t[1] for t in timing_data])
84-
std_verify_scalar = std([t[1] for t in timing_data])
85-
print(f" Avg = {avg_verify_scalar:.6f} sec")
86-
print(f" Stddev = {std_verify_scalar:.6f} sec")
87-
88-
# Run in parallel
89-
start_all_parallel = timer()
90-
timing_data_parallel = pool.map(chaum_pedersen_bench, inputs)
91-
end_all_parallel = timer()
92-
93-
speedup[size] = (end_all_scalar - start_all_scalar) / (
94-
end_all_parallel - start_all_parallel
53+
with Scheduler() as scheduler:
54+
results: List[int] = scheduler.schedule(
55+
identity, [list([x]) for x in range(1, 30000)]
9556
)
96-
print(f" Parallel speedup: {speedup[size]:.3f}x")
57+
assert results == list(range(1, 30000))
58+
59+
bench_start = timer()
60+
61+
for size in problem_sizes:
62+
print("Benchmarking on problem size: ", size)
63+
seeds = rands[0:size]
64+
inputs = [
65+
BenchInput(
66+
get_optional(elgamal_keypair_from_secret(a)),
67+
rands[size],
68+
rands[size + 1],
69+
)
70+
for a in seeds
71+
]
72+
start_all_scalar = timer()
73+
timing_data = [chaum_pedersen_bench(i) for i in inputs]
74+
end_all_scalar = timer()
75+
76+
print(f" Creating Chaum-Pedersen proofs ({size} iterations)")
77+
avg_proof_scalar = average([t[0] for t in timing_data])
78+
std_proof_scalar = std([t[0] for t in timing_data])
79+
print(f" Avg = {avg_proof_scalar:.6f} sec")
80+
print(f" Stddev = {std_proof_scalar:.6f} sec")
81+
82+
print(f" Validating Chaum-Pedersen proofs ({size} iterations)")
83+
avg_verify_scalar = average([t[1] for t in timing_data])
84+
std_verify_scalar = std([t[1] for t in timing_data])
85+
print(f" Avg = {avg_verify_scalar:.6f} sec")
86+
print(f" Stddev = {std_verify_scalar:.6f} sec")
87+
88+
# Run in parallel
89+
start_all_parallel = timer()
90+
timing_data_parallel: List[Tuple[float, float]] = scheduler.schedule(
91+
chaum_pedersen_bench, [list([input]) for input in inputs]
92+
)
93+
end_all_parallel = timer()
9794

98-
print()
99-
print("PARALLELISM SPEEDUPS")
100-
print("Size / Speedup")
101-
for size in problem_sizes:
102-
print(f"{size:4d} / {speedup[size]:.3f}x")
103-
pool.close()
95+
speedup[size] = (end_all_scalar - start_all_scalar) / (
96+
end_all_parallel - start_all_parallel
97+
)
98+
print(f" Parallel speedup: {speedup[size]:.3f}x")
10499

105-
bench_end = timer()
106-
print()
107-
print(f"Total benchmark runtime: {bench_end - bench_start} sec")
100+
print()
101+
print("PARALLELISM SPEEDUPS")
102+
print("Size / Speedup")
103+
for size in problem_sizes:
104+
print(f"{size:4d} / {speedup[size]:.3f}x")
105+
106+
bench_end = timer()
107+
print()
108+
print(f"Total benchmark runtime: {bench_end - bench_start} sec")
108109

109110
##############################################################################################################
110111
# Performance conclusions (Dan Wallach, 21 March 2020):
@@ -117,11 +118,3 @@ def identity(x: int) -> int:
117118
# 500 / 5.765x
118119
# 1000 / 5.507x
119120
# 5000 / 5.548x
120-
121-
# I've never seen this break 6x, and tweaking various parameters (e.g., cpu_count() returns 12, so I've
122-
# tried 6) yielded no significant improvement. One thing that seems to matter a lot: creating a Pool is
123-
# a heavyweight operation. Keeping it around and reusing it has a significant impact on performance.
124-
125-
# Moral of the story: the Pool.map() method is very much "good enough" to squeeze useful parallelism out
126-
# of any machine where we'll be verifying a lot of ballots. If we need radically more throughput, we're
127-
# probably going to need to move to running on clusters.

setup.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,11 @@
6262
classifiers=CLASSIFIERS,
6363
project_urls=PROJECT_URLS,
6464
python_requires="~=3.8",
65-
install_requires=["gmpy2>=2.0.8", "numpy>=1.18.2", "jsons>=1.1.2", "cryptography",],
65+
install_requires=[
66+
"gmpy2>=2.0.8",
67+
"numpy>=1.18.2",
68+
"jsons>=1.1.2",
69+
"cryptography",
70+
"psutil>=5.7.2",
71+
],
6672
)

src/electionguard/decryption.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
from multiprocessing import cpu_count
2-
from multiprocessing.dummy import Pool
3-
from typing import Dict, Optional
1+
from typing import Dict, List, Optional
42

53
from .auxiliary import AuxiliaryDecrypt
64
from .ballot import CiphertextAcceptedBallot, CiphertextSelection
@@ -21,6 +19,7 @@
2119
from .key_ceremony import ElectionPublicKey
2220
from .logs import log_warning
2321
from .rsa import rsa_decrypt
22+
from .scheduler import Scheduler
2423
from .tally import (
2524
CiphertextTally,
2625
CiphertextTallyContest,
@@ -111,17 +110,20 @@ def compute_decryption_share_for_cast_contests(
111110
"""
112111
Compute the decryption for all of the cast contests in the Ciphertext Tally
113112
"""
114-
cpu_pool = Pool(cpu_count())
115113
contests: Dict[CONTEST_ID, CiphertextDecryptionContest] = {}
114+
scheduler = Scheduler()
116115

117116
for contest in tally.cast.values():
118117
selections: Dict[SELECTION_ID, CiphertextDecryptionSelection] = {}
119-
selection_decryptions = cpu_pool.starmap(
118+
selection_decryptions: List[
119+
Optional[CiphertextDecryptionSelection]
120+
] = scheduler.schedule(
120121
compute_decryption_share_for_selection,
121122
[
122123
(guardian, selection, context)
123124
for (_, selection) in contest.tally_selections.items()
124125
],
126+
with_shared_resources=True,
125127
)
126128

127129
# verify the decryptions are received and add them to the collection
@@ -136,7 +138,6 @@ def compute_decryption_share_for_cast_contests(
136138
contests[contest.object_id] = CiphertextDecryptionContest(
137139
contest.object_id, guardian.object_id, contest.description_hash, selections
138140
)
139-
cpu_pool.close()
140141
return contests
141142

142143

@@ -150,17 +151,20 @@ def compute_compensated_decryption_share_for_cast_contests(
150151
"""
151152
Compute the compensated decryption for all of the cast contests in the Ciphertext Tally
152153
"""
153-
cpu_pool = Pool(cpu_count())
154+
scheduler = Scheduler()
154155
contests: Dict[CONTEST_ID, CiphertextCompensatedDecryptionContest] = {}
155156

156157
for contest in tally.cast.values():
157158
selections: Dict[SELECTION_ID, CiphertextCompensatedDecryptionSelection] = {}
158-
selection_decryptions = cpu_pool.starmap(
159+
selection_decryptions: List[
160+
Optional[CiphertextCompensatedDecryptionSelection]
161+
] = scheduler.schedule(
159162
compute_compensated_decryption_share_for_selection,
160163
[
161164
(guardian, missing_guardian_id, selection, context, decrypt)
162165
for (_, selection) in contest.tally_selections.items()
163166
],
167+
with_shared_resources=True,
164168
)
165169

166170
# verify the decryptions are received and add them to the collection
@@ -179,7 +183,6 @@ def compute_compensated_decryption_share_for_cast_contests(
179183
contest.description_hash,
180184
selections,
181185
)
182-
cpu_pool.close()
183186
return contests
184187

185188

@@ -189,19 +192,22 @@ def compute_decryption_share_for_spoiled_ballots(
189192
"""
190193
Compute the decryption for all spoiled ballots in the Ciphertext Tally
191194
"""
192-
cpu_pool = Pool(cpu_count())
193195
spoiled_ballots: Dict[BALLOT_ID, BallotDecryptionShare] = {}
196+
scheduler = Scheduler()
194197

195198
for spoiled_ballot in tally.spoiled_ballots.values():
196199
contests: Dict[CONTEST_ID, CiphertextDecryptionContest] = {}
197200
for contest in spoiled_ballot.contests:
198201
selections: Dict[SELECTION_ID, CiphertextDecryptionSelection] = {}
199-
selection_decryptions = cpu_pool.starmap(
202+
selection_decryptions: List[
203+
Optional[CiphertextDecryptionSelection]
204+
] = scheduler.schedule(
200205
compute_decryption_share_for_selection,
201206
[
202207
(guardian, selection, context)
203208
for selection in contest.ballot_selections
204209
],
210+
with_shared_resources=True,
205211
)
206212
# verify the decryptions are received and add them to the collection
207213
for decryption in selection_decryptions:
@@ -225,7 +231,6 @@ def compute_decryption_share_for_spoiled_ballots(
225231
spoiled_ballot.object_id,
226232
contests,
227233
)
228-
cpu_pool.close()
229234
return spoiled_ballots
230235

231236

@@ -239,21 +244,24 @@ def compute_compensated_decryption_share_for_spoiled_ballots(
239244
"""
240245
Compute the compensated decryption for all spoiled ballots in the Ciphertext Tally
241246
"""
242-
cpu_pool = Pool(cpu_count())
243247
spoiled_ballots: Dict[BALLOT_ID, CompensatedBallotDecryptionShare] = {}
248+
scheduler = Scheduler()
244249

245250
for spoiled_ballot in tally.spoiled_ballots.values():
246251
contests: Dict[CONTEST_ID, CiphertextCompensatedDecryptionContest] = {}
247252
for contest in spoiled_ballot.contests:
248253
selections: Dict[
249254
SELECTION_ID, CiphertextCompensatedDecryptionSelection
250255
] = {}
251-
selection_decryptions = cpu_pool.starmap(
256+
selection_decryptions: List[
257+
Optional[CiphertextCompensatedDecryptionSelection]
258+
] = scheduler.schedule(
252259
compute_compensated_decryption_share_for_selection,
253260
[
254261
(guardian, missing_guardian_id, selection, context, decrypt)
255262
for selection in contest.ballot_selections
256263
],
264+
with_shared_resources=True,
257265
)
258266
# verify the decryptions are received and add them to the collection
259267
for decryption in selection_decryptions:
@@ -279,7 +287,6 @@ def compute_compensated_decryption_share_for_spoiled_ballots(
279287
spoiled_ballot.object_id,
280288
contests,
281289
)
282-
cpu_pool.close()
283290
return spoiled_ballots
284291

285292

0 commit comments

Comments
 (0)