Using reduce() for Multithreaded Aggregation
This example demonstrates how to use AtomicDict.reduce() to aggregate key-value pairs across multiple threads.
We'll compare the performance of the built-in Python dict with AtomicDict in both single-threaded and multithreaded scenarios.
The example consists of:
- synthetic dataset generation
- a baseline single-threaded summation using the built-in dict
- a multithreaded implementation using AtomicDict.reduce() and reduce_sum()
- a performance comparison between the approaches
Source Code
The source code is available on GitHub. We'll briefly go through its structure.
Generating Data
First, we have some functions that generate synthetic data.
Essentially, we repeat the keys 0–9 size times.
We use 5 as the value for each item in the data so that we have a predictable total which we can later use to check the outputs.
import threading

from cereggii import AtomicDict, NOT_FOUND

size = 3628800
expected_total = size * 5
print(f"{expected_total=}")


def make_keys():
    return list(range(10))


def make_data(data_size):
    keys = make_keys()
    return [(keys[_ % len(keys)], 5) for _ in range(data_size)]
Single-threaded Baseline
Next, we introduce a single-threaded function that uses Python's built-in dict.
This will be useful to compare our multithreaded implementation against a baseline.
This function is quite straightforward: make sure the keys are present in the dictionary
with an initial value of 0, proceed with summing the values in the input, and then
return the sum of the values in the dictionary.
def builtin_dict_sum():
    d = {}
    keys = make_keys()
    for k in keys:
        d[k] = 0
    for k, v in make_data(size):
        d[k] += v
    return sum(d.values())
Multithreaded Implementation
Now let's go through the multithreaded equivalent of builtin_dict_sum.
This is what's going on in the function below:
- create a new AtomicDict()
- split the dataset into equally sized partitions
- instantiate and start the threads
- wait for the threads to finish
- sum the values in the dictionary
def threaded_sum(threads_num, thread_target):
    atomic_dict = AtomicDict()
    data_size = size // threads_num
    threads = [
        threading.Thread(target=thread_target, args=(atomic_dict, data_size))
        for _ in range(threads_num)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    threaded_total = 0
    for _, v in atomic_dict.fast_iter():
        threaded_total += v
    return threaded_total
Note
The variable size is intentionally set to 10! = 3628800 so that partitioning works cleanly with any number of threads between 1 and 10.
A more serious program that doesn't use synthetic data should look for a better way to partition it.
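As a sketch of what such partitioning might look like, the hypothetical helper below (not part of the example's source code) splits a dataset into chunks whose sizes differ by at most one, so it also works when the size isn't divisible by the thread count:

```python
def partition(data, n_parts):
    """Split data into n_parts chunks whose sizes differ by at most one."""
    base, extra = divmod(len(data), n_parts)
    chunks = []
    start = 0
    for i in range(n_parts):
        # the first `extra` chunks get one additional element
        end = start + base + (1 if i < extra else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


parts = partition(list(range(10)), 3)
# every element lands in exactly one chunk, with chunk sizes 4, 3, 3
```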
The target function that the threads actually run has been intentionally left out of the snippet above.
This is because we'll compare two variants of it: one uses a custom input function for reduce() (the aggregate argument), while the other uses a specialized reduce function, reduce_sum(), that sums the values in the input data exactly as the custom variant does.
As we'll see later, the specialized variant is not only more convenient but also faster.
Understanding reduce() — Using a Custom Function
AtomicDict.reduce() applies a user-defined function to every key-value pair in a thread-safe way.
This function, such as my_reduce_sum() below, receives:
- key: the key (first item) of the pair
- current: the current value in the dictionary (or NOT_FOUND if absent)
- new: the new value to aggregate
Note that NOT_FOUND is a special object that cannot be used as either a key or a value in AtomicDict.
When current is NOT_FOUND, it means that key is not present in the dictionary, so in this case it makes sense to return the new value.
def thread(atomic_dict, iterations):
    data = make_data(iterations)

    def my_reduce_sum(key, current, new):
        if current is NOT_FOUND:
            return new
        return current + new

    atomic_dict.reduce(data, my_reduce_sum)
This is somewhat more compact than the single-threaded version based on dict.
The reduce() method takes over the reads and writes to the shared dictionary so that we don't inadvertently introduce data races.
Since the dictionary may be shared with multiple threads, the current thread executing reduce() may try to update an item and fail because another thread has updated the same item.
To cope with this contention, reduce() will call my_reduce_sum() again, and current will hold the value that the other thread had put into the shared dictionary.
Tip
Do not assume a fixed number of calls to your function. It will be called at least once per input item, but potentially more.
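To build intuition for why the function may run more than once, here is a toy single-process emulation of an optimistic retry loop. The names and the injected race are hypothetical; this is not cereggii's actual implementation:

```python
NOT_FOUND = object()  # stand-in for cereggii's NOT_FOUND sentinel


def reduce_with_retries(d, key, new, fn, simulate_race_once=True):
    """Emulate an optimistic update: read the current value, compute the
    aggregate, and retry if the stored value changed in the meantime.
    Here a concurrent update is injected once to force a single retry."""
    calls = 0
    while True:
        current = d.get(key, NOT_FOUND)
        calls += 1
        desired = fn(key, current, new)
        if simulate_race_once:
            # another thread wins the race: the stored value changes,
            # so our compare-and-swap would fail and we must retry
            d[key] = (0 if current is NOT_FOUND else current) + 100
            simulate_race_once = False
            continue
        d[key] = desired
        return calls


def my_reduce_sum(key, current, new):
    return new if current is NOT_FOUND else current + new


d = {}
calls = reduce_with_retries(d, "a", 5, my_reduce_sum)
# my_reduce_sum ran twice for this single input item: once before the
# injected race and once after, with current holding the racing update
```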
Limitations of Reduce
Because of this implicit retry mechanism, reduce() imposes some limitations on the aggregate input functions.
Most importantly, those functions need to be commutative and associative: the final result must not depend on the order of application.
More on how to cope with this limitation in the Using reduce() for Averaging example.
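To see why order independence matters, compare addition with subtraction in plain Python (no cereggii needed). Under contention, updates may be applied in any order, so only the order-insensitive function is safe to use as an aggregate:

```python
from functools import reduce
from itertools import permutations

values = [5, 3, 7]

# addition is commutative and associative: every application
# order yields the same result
sums = {reduce(lambda a, b: a + b, p) for p in permutations(values)}

# subtraction is neither: the result depends on the order
# in which the values are folded together
diffs = {reduce(lambda a, b: a - b, p) for p in permutations(values)}

# sums contains a single result; diffs contains several
```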
Specialized Function
Summation is a simple and common way of using reduce().
For this reason, a specialized version is available as AtomicDict.reduce_sum().
The code using the specialized reduction is very compact:
def thread_specialized(atomic_dict, iterations):
    data = make_data(iterations)
    atomic_dict.reduce_sum(data)
Multithreaded summation is a solved problem. It should be easily accessible, too.
Results
The results shown here were obtained with a beta version of the free-threaded build of Python 3.14.
$ python -VV
Python 3.14.0b2 experimental free-threading build (main, Jun 13 2025, 18:57:59) [Clang 17.0.0 (clang-1700.0.13.3)]
Note
Speedup is relative to the single-threaded baseline. A value below 1.0x means it performed worse than the baseline.
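The speedup figures below are presumably computed as the baseline time divided by the measured time; for instance, for the fastest run in the tables:

```python
baseline = 0.428  # built-in dict, single thread, seconds
measured = 0.086  # reduce_sum() with 4 threads, seconds

speedup = baseline / measured
print(f"{speedup:.1f}x faster")  # about 5.0x
```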
Correctness
We'll go through the performance numbers soon, I promise, but first let me spend a few words on the correctness of the results.
As you'll see from the program outputs below, all the methods we've used, both the one based on dict and those based on AtomicDict, produced the expected total. For dict this really isn't big news, but I think it's worth observing how AtomicDict arrived at the correct result.
There were data races in this multithreaded operation.
Multiple threads computed their partial sums and all concurrently tried to update the shared dictionary, racing one against the others.
This is not a problem for reduce(): the races are transparently coped with, without explicit intervention in the code.
Internally, a race is detected and the operation retried.
The family of reduce() methods effectively guarantees to observe the single-threaded result, regardless of the number of threads.
Specialization
Using the specialized reduce_sum() function can yield higher performance even for single-threaded code.
With multiple threads, the difference becomes even more significant.
expected_total=18144000
Counting keys using the built-in dict with a single thread:
- Took 0.428s (total=18144000)
Counting keys using cereggii.AtomicDict.reduce_sum():
- Took 0.324s with 1 threads (1.3x faster, total=18144000)
- Took 0.167s with 2 threads (2.6x faster, total=18144000)
- Took 0.113s with 3 threads (3.8x faster, total=18144000)
- Took 0.086s with 4 threads (5.0x faster, total=18144000)
Using the Custom Reduce Function
Looking at the running time of the non-specialized function, we can observe lower performance. It is recommended to use one of the specialized reduce functions whenever possible.
If you're implementing a custom function to use with reduce(), make sure to read the documentation about its requirements.
expected_total=18144000
Counting keys using the built-in dict with a single thread:
- Took 0.428s (total=18144000)
Counting keys using cereggii.AtomicDict.reduce():
- Took 0.453s with 1 threads (0.9x faster, total=18144000)
- Took 0.228s with 2 threads (1.9x faster, total=18144000)
- Took 0.153s with 3 threads (2.8x faster, total=18144000)
- Took 0.114s with 4 threads (3.7x faster, total=18144000)
Comparison Table
| method | threads | time (s) | speedup |
|---|---|---|---|
| built-in dict | 1 | 0.428 | 1.0x |
| AtomicDict.reduce() | 1 | 0.453 | 0.9x |
| | 2 | 0.228 | 1.9x |
| | 3 | 0.153 | 2.8x |
| | 4 | 0.114 | 3.7x |
| AtomicDict.reduce_sum() | 1 | 0.324 | 1.3x |
| | 2 | 0.167 | 2.6x |
| | 3 | 0.113 | 3.8x |
| | 4 | 0.086 | 5.0x |
Note
We haven't considered multithreaded scenarios for dict because performance would be strictly worse than in the single-threaded scenario.
Summary
- complying with the requirements of reduce() lets us write data-race-free aggregations for AtomicDict
- reduce_sum() is both more convenient and more performant
- multithreading brings significant speedups when using AtomicDict
Refer to the AtomicDict.reduce() documentation for full details on writing compatible reduce functions, or check the Using reduce() for Averaging example for a more advanced use case.