Using reduce() for Multithreaded Aggregation

This example demonstrates how to use AtomicDict.reduce() to aggregate key-value pairs across multiple threads. We'll compare the performance of the built-in Python dict with AtomicDict in both single-threaded and multithreaded scenarios.

The example consists of:

  • synthetic dataset generation
  • baseline single-threaded dictionary summation using dict
  • multithreaded implementation using AtomicDict.reduce() and reduce_sum()
  • a performance comparison between the approaches

Source Code

The source code is available on GitHub. We'll briefly go through its structure.

Generating Data

First, we have some functions generating synthetic data. Essentially, we cycle through the ten keys 0–9 until we have size items. We use 5 as the value for each item in the data so that we have a predictable total, which we can later use to check the outputs.

size = 3628800
expected_total = size * 5
print(f"{expected_total=}")


def make_keys():
    return list(range(10))


def make_data(data_size):
    keys = make_keys()
    return [(keys[_ % len(keys)], 5) for _ in range(data_size)]
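As a quick sanity check of the data shape, the helpers above (re-declared here so the snippet runs standalone) produce pairs that cycle through the keys with a constant value of 5:

```python
def make_keys():
    return list(range(10))


def make_data(data_size):
    keys = make_keys()
    return [(keys[_ % len(keys)], 5) for _ in range(data_size)]


data = make_data(12)
print(data[:4])                  # [(0, 5), (1, 5), (2, 5), (3, 5)]
print(sum(v for _, v in data))   # 12 items * 5 = 60
```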

Single-threaded Baseline

Next, we introduce a single-threaded function that uses Python's built-in dict. This will be useful to compare our multithreaded implementation against a baseline. This function is quite straightforward: make sure the keys are present in the dictionary with an initial value of 0, proceed with summing the values in the input, and then return the sum of the values in the dictionary.

def builtin_dict_sum():
    d = {}
    keys = make_keys()
    for k in keys:
        d[k] = 0
    for k, v in make_data(size):
        d[k] += v
    return sum(d.values())

Multithreaded Implementation

Now let's go through the multithreaded equivalent of builtin_dict_sum. This is what's going on in the function below:

  1. create a new AtomicDict()
  2. split the dataset into equally sized partitions
  3. instantiate and start threads
  4. wait for threads to finish
  5. sum the values in the dictionary

def threaded_sum(threads_num, thread_target):
    atomic_dict = AtomicDict()
    data_size = size // threads_num

    threads = [
        threading.Thread(target=thread_target, args=(atomic_dict, data_size))
        for _ in range(threads_num)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    threaded_total = 0
    for _, v in atomic_dict.fast_iter():
        threaded_total += v
    return threaded_total

Note

The variable size is intentionally set to 10! = 3628800 so that partitioning works cleanly with any number of threads between 1 and 10. A more serious program that doesn't use synthetic data should look for a better way to partition it.
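For real data whose length may not divide evenly, one simple approach (the helper name here is illustrative, not part of cereggii) is to spread the remainder over the first few partitions:

```python
def partition_sizes(total, threads_num):
    """Split `total` items into `threads_num` chunk sizes differing by at most 1."""
    base, remainder = divmod(total, threads_num)
    # The first `remainder` partitions each take one extra item.
    return [base + 1 if i < remainder else base for i in range(threads_num)]


print(partition_sizes(10, 3))  # [4, 3, 3]
```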

The target function that the threads actually run has been intentionally left out of the snippet above, because we'll compare two variants of it: one uses a custom input function for reduce() (the aggregate argument), while the other uses a specialized reduce function that sums the values in the input data, exactly as the custom variant does. As we'll see later, this brings not only more convenience but also better performance.

Understanding reduce() — Using a Custom Function

AtomicDict.reduce() applies a user-defined function to every key-value pair in a thread-safe way. This function, such as my_reduce_sum() below, receives:

  • key: the key (first item) of the pair
  • current: the current value in the dictionary (or NOT_FOUND if absent)
  • new: the new value to aggregate

Note that NOT_FOUND is a special object that cannot be used as either a key or a value in AtomicDict. When current is NOT_FOUND, the key is not present in the dictionary, so in this case it makes sense to return the new value.

def thread(atomic_dict, iterations):
    data = make_data(iterations)

    def my_reduce_sum(key, current, new):
        if current is NOT_FOUND:
            return new
        return current + new

    atomic_dict.reduce(data, my_reduce_sum)

This is somewhat more compact than the single-threaded version based on dict. The reduce() method takes care of the reads and writes to the shared dictionary for us, so that we don't inadvertently introduce data races.

Since the dictionary may be shared among multiple threads, the thread executing reduce() may try to update an item and fail because another thread has concurrently updated the same item. To cope with this contention, reduce() calls my_reduce_sum() again, with current holding the value that the other thread put into the shared dictionary.

Tip

Do not assume a fixed number of calls to your function. It will be called at least once per input item, but potentially more.
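Conceptually, the retry behaves like a compare-and-swap loop. This pure-Python sketch (not cereggii's actual implementation; NOT_FOUND here is a stand-in for its sentinel) shows why the aggregate function can run more than once per item:

```python
NOT_FOUND = object()  # stand-in for cereggii's NOT_FOUND sentinel


def reduce_one(d, key, new, aggregate):
    """Sketch of a CAS-style update: re-read and re-apply on contention."""
    while True:
        current = d.get(key, NOT_FOUND)
        desired = aggregate(key, current, new)
        # In cereggii this store is an atomic compare-and-swap: if another
        # thread changed d[key] in the meantime, the loop runs again and
        # `aggregate` is called with the fresh `current`.
        if d.get(key, NOT_FOUND) is current:
            d[key] = desired
            return


def my_reduce_sum(key, current, new):
    if current is NOT_FOUND:
        return new
    return current + new


d = {}
reduce_one(d, "a", 5, my_reduce_sum)
reduce_one(d, "a", 5, my_reduce_sum)
print(d)  # {'a': 10}
```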

Limitations of Reduce

Because of this implicit retry mechanism, reduce() imposes some limitations on the aggregate input functions. Most importantly, those functions need to be commutative and associative: the final result must not depend on the order of application.

More on how to cope with this limitation in the Using reduce() for Averaging example.
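To see why order independence matters, compare a commutative aggregate (addition) with a non-commutative one (subtraction) over two orderings of the same data, here using plain functools.reduce for illustration; only the former would be safe with AtomicDict.reduce():

```python
from functools import reduce

data = [1, 2, 3, 4]
reordered = [4, 3, 2, 1]

# Addition: any order of application gives the same total.
assert reduce(lambda a, b: a + b, data) == reduce(lambda a, b: a + b, reordered) == 10

# Subtraction is neither commutative nor associative: the result depends on
# the order of application, so a function like this must not be used.
print(reduce(lambda a, b: a - b, data))       # -8
print(reduce(lambda a, b: a - b, reordered))  # -2
```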

Specialized Function

Summation is such a simple and common use of reduce() that a specialized version is available as AtomicDict.reduce_sum(). The code using the specialized reduction is very compact:

def thread_specialized(atomic_dict, iterations):
    data = make_data(iterations)
    atomic_dict.reduce_sum(data)

Multithreaded summation is a solved problem. It should be easily accessible, too.

Results

The results shown here were obtained with a beta version of the free-threaded build of Python 3.14.

$ python -VV
Python 3.14.0b2 experimental free-threading build (main, Jun 13 2025, 18:57:59) [Clang 17.0.0 (clang-1700.0.13.3)]

Note

Speedup is relative to the single-threaded baseline. A value below 1.0x means it performed worse than the baseline.
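The speedup figures reported below are simply the baseline time divided by each measured time:

```python
baseline = 0.428  # single-threaded dict time from the output below

# Timings of AtomicDict.reduce_sum() with 1-4 threads.
for t in (0.324, 0.167, 0.113, 0.086):
    print(f"{baseline / t:.1f}x")  # 1.3x, 2.6x, 3.8x, 5.0x
```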

Correctness

We'll go through the performance numbers soon, I promise, but first let me spend a few words on the correctness of the results.

As you'll see from the program outputs below, all the methods we've used, whether based on dict or AtomicDict, produced the expected total. For dict this really isn't big news, but I think it's worth observing how AtomicDict arrived at the correct result.

There is contention in this multithreaded operation: multiple threads computed their partial sums and all concurrently tried to update the shared dictionary, racing against each other. This is not a problem for reduce(): the races are handled transparently, without explicit intervention in our code. Internally, each conflict was detected and the operation retried.

The family of reduce() methods effectively guarantees the same result as a single-threaded computation, regardless of the number of threads.

Specialization

Using the specialized reduce_sum() function can yield higher performance even in single-threaded code. With multiple threads, the difference becomes even more significant.

expected_total=18144000

Counting keys using the built-in dict with a single thread:
 - Took 0.428s (total=18144000)

Counting keys using cereggii.AtomicDict.reduce_sum():
 - Took 0.324s with 1 threads (1.3x faster, total=18144000)
 - Took 0.167s with 2 threads (2.6x faster, total=18144000)
 - Took 0.113s with 3 threads (3.8x faster, total=18144000)
 - Took 0.086s with 4 threads (5.0x faster, total=18144000)

Using the Custom Reduce Function

Looking at the running times of the non-specialized function, we can observe lower performance. Whenever possible, it is recommended to use one of the specialized reduce functions.

If you're implementing a custom function to use with reduce(), make sure to read the documentation about its requirements.

expected_total=18144000

Counting keys using the built-in dict with a single thread:
 - Took 0.428s (total=18144000)

Counting keys using cereggii.AtomicDict.reduce():
 - Took 0.453s with 1 threads (0.9x faster, total=18144000)
 - Took 0.228s with 2 threads (1.9x faster, total=18144000)
 - Took 0.153s with 3 threads (2.8x faster, total=18144000)
 - Took 0.114s with 4 threads (3.7x faster, total=18144000)

Comparison Table

method                   threads  time (s)  speedup
built-in dict                  1    0.428     1.0x
AtomicDict.reduce()            1    0.453     0.9x
                               2    0.228     1.9x
                               3    0.153     2.8x
                               4    0.114     3.7x
AtomicDict.reduce_sum()        1    0.324     1.3x
                               2    0.167     2.6x
                               3    0.113     3.8x
                               4    0.086     5.0x

Note

We haven't considered multithreaded scenarios for dict because performance would be strictly worse than the single-threaded scenario.

Summary

  • complying with the requirements of reduce() lets us write data-race-free aggregations for AtomicDict
  • reduce_sum() is both more convenient and more performant
  • multithreading brings significant speedups when using AtomicDict

Refer to the AtomicDict.reduce() documentation for full details on writing compatible reduce functions, or check the Using reduce() for Averaging example for a more advanced use case.