AtomicDict

A concurrent dictionary (hash table).

Features of AtomicDict:

  1. you don't need an external lock to synchronize changes (mutations) to the dictionary:

    1. you don't have to manually guard your code against deadlocks (reentrancy-caused deadlocks can still be an issue)
    2. when AtomicDict is correctly configured (setting min_size so that no resizing occurs), even if the OS decides to interrupt or terminate a thread which was accessing an AtomicDict, all remaining threads will continue to make progress
  2. mutations are atomic and can be aborted or retried under contention

  3. scalability:

    1. TODO
    2. for some workloads scalability is already quite good: see AtomicDict.reduce.

Note

The special cereggii.NOT_FOUND, cereggii.ANY, and cereggii.EXPECTATION_FAILED objects cannot be used as keys or values.

__init__(initial={}, *, min_size=None, buffer_size=4)

Correctly configuring the min_size parameter avoids resizing the AtomicDict. Inserts that spill over this size will not fail, but may require resizing. Resizing prevents concurrent mutations until completed.

Parameters:

  • initial (dict, default: {}): A dict to initialize this AtomicDict with.
  • min_size (int | None, default: None): The size initially allocated.
  • buffer_size (int, default: 4): The number of entries that a thread reserves for future insertions. A larger value can help reduce contention, but may lead to increased fragmentation. Min: 1, max: 64.
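
As a rough sketch of how the two sizing parameters might be chosen, the helper below derives constructor keyword arguments from an expected number of items. The helper name atomic_dict_kwargs and the headroom factor are illustrative assumptions and not part of cereggii; only the parameter names min_size and buffer_size, and the 1 to 64 range, come from the description above.

```python
def atomic_dict_kwargs(expected_items, buffer_size=4, headroom=2.0):
    """Build keyword arguments for AtomicDict(...) (hypothetical helper).

    `headroom` is an assumed safety factor: over-allocating makes it less
    likely that inserts spill past min_size and trigger a resize.
    """
    if expected_items < 0:
        raise ValueError("expected_items must be non-negative")
    if not 1 <= buffer_size <= 64:
        raise ValueError("buffer_size must be between 1 and 64")
    return {
        "min_size": int(expected_items * headroom),
        "buffer_size": buffer_size,
    }


kwargs = atomic_dict_kwargs(expected_items=1_000)
print(kwargs)
# Intended use (requires cereggii to be installed):
#     d = AtomicDict(**kwargs)
```

The right amount of headroom depends on the workload; the factor of 2 here is only a placeholder.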

__getitem__(key)

Atomically read the value associated with key:

my_atomic_dict[key]

Also see get.

__setitem__(key, value)

Unconditionally set the value associated with key to value:

my_atomic_dict[key] = value

Warning

Use compare_and_set instead.

When an item is inserted or updated with this usual Python idiom, there is no way to check that the value currently associated with key is the expected one: another thread may mutate it before this mutation is applied. Use this method only when no other thread may be writing to key.
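
The hazard described in this warning is the classic lost update. The sketch below reproduces it deterministically with a plain dict standing in for the shared dictionary and two "threads" interleaved by hand; it does not use cereggii, and the key name is made up for illustration.

```python
# A plain dict stands in for the shared dictionary; the two "threads" are
# interleaved by hand so the outcome is deterministic.
shared = {"hits": 0}

# Both threads read the current value before either one writes.
seen_by_a = shared["hits"]
seen_by_b = shared["hits"]

# Each thread writes back its own increment unconditionally.
shared["hits"] = seen_by_a + 1
shared["hits"] = seen_by_b + 1  # overwrites thread A's update

print(shared["hits"])  # two increments were attempted, only one survived
```

A conditional write that checks the previously read value would have rejected the second assignment instead of silently discarding the first.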

__delitem__(key)

Atomically delete an item:

del my_atomic_dict[key]

get(key, default=None)

Just like Python's dict.get:

my_atomic_dict.get(key, default=cereggii.NOT_FOUND)

Tip

The special cereggii.NOT_FOUND object can never be inserted into an AtomicDict, so when it is returned, you can be sure that the key was not in the dictionary.

Conversely, None can be both a key and a value.
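
The reasoning in this tip can be sketched with the standard library alone. Here MISSING is a local stand-in for cereggii.NOT_FOUND and a plain dict stands in for the AtomicDict; both substitutions are assumptions made so the snippet runs without cereggii.

```python
# MISSING is a local stand-in for cereggii.NOT_FOUND (assumption: any unique
# object that can never be stored behaves the same way for this purpose).
MISSING = object()

settings = {"timeout": None}  # None is a legitimate stored value here


def describe(key):
    value = settings.get(key, MISSING)
    if value is MISSING:
        return "absent"
    return f"present with value {value!r}"


print(describe("timeout"))
print(describe("retries"))

# With the default of None, both lookups are indistinguishable.
ambiguous = settings.get("timeout") is settings.get("retries")
```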

Parameters:

  • key (Key, required): The key to be looked up.
  • default (Value | None, default: None): The value to return when the key is not found.

Returns:

  • Value: The value associated with key, or default.

compare_and_set(key, expected, desired)

Atomically read the value associated with key:

  • if it is expected, then replace it with desired
  • else, don't change it and raise ExpectationFailed.
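
To make the contract above concrete, here is a small lock-based model of it. This is not how AtomicDict is implemented (the point of AtomicDict is that no external lock is needed); ModelDict, NOT_FOUND and ExpectationFailed below are local stand-ins, and the identity comparison is an assumption that reads "is expected" literally.

```python
import threading

NOT_FOUND = object()  # stand-in for cereggii.NOT_FOUND


class ExpectationFailed(Exception):
    """Stand-in for cereggii.ExpectationFailed."""


class ModelDict:
    """Lock-based model of the compare_and_set contract (not lock-free)."""

    def __init__(self):
        self._items = {}
        self._lock = threading.Lock()

    def get(self, key, default=None):
        with self._lock:
            return self._items.get(key, default)

    def compare_and_set(self, key, expected, desired):
        with self._lock:
            current = self._items.get(key, NOT_FOUND)
            # Assumption: "is expected" is read literally as an identity check.
            if current is not expected:
                raise ExpectationFailed(key)
            self._items[key] = desired


d = ModelDict()
d.compare_and_set("spam", expected=NOT_FOUND, desired=42)  # insert succeeds

try:
    d.compare_and_set("spam", expected=NOT_FOUND, desired=0)  # key now exists
except ExpectationFailed:
    outcome = "rejected"
else:
    outcome = "applied"

print(outcome, d.get("spam"))
```

The read and the conditional write happen as one indivisible step, which is what lets a caller detect a concurrent mutation and retry.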

Example

Insert only

The expected value can be cereggii.NOT_FOUND, in which case the call will succeed only when the item is inserted, and not updated:

my_atomic_dict.compare_and_set(
    key="spam",
    expected=cereggii.NOT_FOUND,
    desired=42,
)

Example

Counter

Correct way to increment the value associated with key, coping with concurrent mutations:

done = False
while not done:
    expected = my_atomic_dict.get(key, default=0)
    try:
        my_atomic_dict.compare_and_set(key, expected, desired=expected + 1)
    except cereggii.ExpectationFailed:
        # my_atomic_dict[key] was concurrently mutated: retry
        pass
    else:
        done = True

The reduce method removes a lot of boilerplate.

Tip

This family of methods is the recommended way to mutate an AtomicDict. However, you will probably want to use a higher-level method than compare_and_set, such as reduce.

Raises:

  • ExpectationFailed: If the found value was not expected.

reduce(iterable, aggregate, chunk_size=0)

Aggregate the values in this dictionary with those found in iterable, as computed by aggregate.

The aggregate function takes as input a key, the value currently stored in the dictionary, and the new value from iterable. It returns the aggregated value.

Note

The aggregate function must be:

  • total: it should handle both the case in which the key is present and the case in which it is not
  • stateless: you should not rely on the number of times this function is called (it will be called at least once for each item in iterable, but there is no upper bound)
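
The totality requirement can be illustrated without cereggii. In the sketch below, NOT_FOUND is a local stand-in for cereggii.NOT_FOUND, and apply is a hypothetical sequential model of a single aggregation step on a plain dict; neither is part of the library.

```python
NOT_FOUND = object()  # stand-in for cereggii.NOT_FOUND


def partial_sum(key, current, new):
    # Not total: assumes the key is always present.
    return current + new


def total_sum(key, current, new):
    # Total: handles the first occurrence of a key explicitly.
    if current is NOT_FOUND:
        return new
    return current + new


def apply(aggregate, store, key, new):
    """Sequential model of one aggregation step on a plain dict."""
    store[key] = aggregate(key, store.get(key, NOT_FOUND), new)


store = {}
apply(total_sum, store, "red", 1)
apply(total_sum, store, "red", 5)

try:
    apply(partial_sum, {}, "red", 1)  # key absent: current is NOT_FOUND
    partial_ok = True
except TypeError:
    partial_ok = False

print(store, partial_ok)
```

The non-total function fails on the very first item for a key, because it tries to add a number to the sentinel.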

Example

Counter

import cereggii
from cereggii import AtomicDict

d = AtomicDict()

data = [
    ("red", 1),
    ("green", 42),
    ("blue", 3),
    ("red", 5),
]

def count(key, current, new):
    if current is cereggii.NOT_FOUND:
        return new
    return current + new

d.reduce(data, count)

Info

This method exploits the skewness in the data.

First, an intermediate result is aggregated into a thread-local dictionary, and then applied to the shared AtomicDict. This can greatly reduce contention when the keys in the input are repeated.
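
The two-phase idea described here can be sketched as follows. This is a model under stated assumptions, not the library's implementation: a plain dict guarded by a lock stands in for the shared AtomicDict, NOT_FOUND is a local sentinel, and two_phase_reduce is a hypothetical name. It also assumes that partial results can be merged with the same aggregate function, which holds for addition.

```python
import threading

NOT_FOUND = object()  # stand-in for cereggii.NOT_FOUND


def count(key, current, new):
    return new if current is NOT_FOUND else current + new


def two_phase_reduce(shared, lock, iterable, aggregate):
    """Aggregate locally first, then merge into `shared` once per distinct key."""
    local = {}
    for key, value in iterable:
        local[key] = aggregate(key, local.get(key, NOT_FOUND), value)
    with lock:  # the real AtomicDict needs no external lock; a dict does
        for key, partial in local.items():
            shared[key] = aggregate(key, shared.get(key, NOT_FOUND), partial)
    return len(local)  # number of writes made to the shared dictionary


shared, lock = {}, threading.Lock()
data = [("red", 1), ("green", 42), ("blue", 3), ("red", 5)]
writes = []


def worker():
    writes.append(two_phase_reduce(shared, lock, data, count))


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(shared, writes)
```

Each worker processes four items but touches the shared dictionary only three times, once per distinct key. Note also that the aggregate function runs three times for the two "red" items (twice locally, once in the merge), which is one reason it must not depend on how often it is called.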

__len__()

Get the number of items in this AtomicDict:

len(my_atomic_dict)

This method is sequentially consistent. When invoked, it temporarily locks the AtomicDict instance to compute the result. If you need to invoke this method frequently while the dictionary is being mutated, consider using AtomicDict.approx_len instead.

approx_len()

Retrieve the approximate length of this AtomicDict.

Calling this method does not prevent other threads from mutating the dictionary.

Note

Unlike AtomicDict.__len__, this method does not return a sequentially consistent result.

This is not so bad!

Suppose you call AtomicDict.__len__ instead. The result would be correct at some point during the call to len(), but by the very next line it may already have been invalidated by another thread inserting or deleting an item.

If you need to know the size of an AtomicDict while other threads are mutating it, calling approx_len should be more performant and still return a fairly good approximation.

len_bounds()

Get a lower and an upper-bound for the number of items stored in this AtomicDict.

Also see AtomicDict.approx_len.

fast_iter(partitions=1, this_partition=0)

A fast, not sequentially consistent iterator.

Calling this method does not prevent other threads from mutating this AtomicDict.

Danger

This method can return sequentially-inconsistent results.

Only use fast_iter when you know that no other thread is mutating this AtomicDict.

Depending on the execution, the following may happen:

  • it can return the same key multiple times (with the same or different values)
  • an update 1, that happened strictly before another update 2, is not seen, but update 2 is seen.

Tip

If no other thread is mutating this AtomicDict while a thread is calling this method, then this method is safe to use.

Example

Summing the values with partitioning

import threading

# d is an existing AtomicDict whose values are numbers
n = 3
partials = [0 for _ in range(n)]

def iterator(i):
    current = 0
    for k, v in d.fast_iter(partitions=n, this_partition=i):
        current += v
    partials[i] = current

threads = [
    threading.Thread(target=iterator, args=(i,))
    for i in range(n)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

print(sum(partials))

Also see the partitioned iterations example.

Parameters:

  • partitions (default: 1): The number of partitions to split this iterator into. It should be equal to the number of threads that participate in the iteration.
  • this_partition (default: 0): This thread's assigned partition. Valid values are from 0 to partitions-1.
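
The intent of the two parameters can be modelled on a plain list. How AtomicDict actually assigns items to partitions is not specified here, so the round-robin rule below is purely an assumption; partition_items is a hypothetical helper that only demonstrates the property that matters, namely that the partitions are disjoint and together cover every item.

```python
def partition_items(items, partitions, this_partition):
    """Yield the slice of `items` that one partition would visit (model only)."""
    if not 0 <= this_partition < partitions:
        raise ValueError("this_partition must be in range(partitions)")
    for position, item in enumerate(items):
        if position % partitions == this_partition:
            yield item


items = [("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5)]
n = 3
parts = [list(partition_items(items, n, i)) for i in range(n)]

# Every item is visited by exactly one partition.
visited = sorted(item for part in parts for item in part)
print(parts)
```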

batch_getitem(batch, chunk_size=128)

Batch many lookups together for efficient memory access.

The values provided in batch will be substituted with the values found in the AtomicDict instance, or with cereggii.NOT_FOUND. No exception is raised for a missing key: cereggii.NOT_FOUND is returned as its value instead.

Notice that the cereggii.NOT_FOUND object can never be inserted into an AtomicDict.

The values themselves, provided in batch, will always be substituted.

Example

With:

foo = AtomicDict({'a': 1, 'b': 2, 'c': 3})

calling

foo.batch_getitem({
    'a': None,
    'b': None,
    'f': None,
})

returns:

{
   'a': 1,
   'b': 2,
   'f': <cereggii.NOT_FOUND>,
}

Parameters:

  • chunk_size (int, default: 128): Subdivide the keys to look up from batch into smaller chunks of size chunk_size to prevent memory over-prefetching.

Returns:

  • dict: The input batch dictionary, with substituted values.
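
The observable behaviour described above (values substituted in place, missing keys mapped to a sentinel, the same dictionary returned) can be modelled on plain dicts. batch_getitem_model and NOT_FOUND are local stand-ins, not cereggii names, and the chunking in this model only mirrors the parameter: it has none of the memory-access benefits of the real method.

```python
from itertools import islice

NOT_FOUND = object()  # stand-in for cereggii.NOT_FOUND


def batch_getitem_model(source, batch, chunk_size=128):
    """Replace each value in `batch` with the lookup result from `source`."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be positive")
    keys = iter(list(batch))  # snapshot the keys before mutating values
    while chunk := list(islice(keys, chunk_size)):
        for key in chunk:
            batch[key] = source.get(key, NOT_FOUND)
    return batch  # the same dict object that was passed in


foo = {"a": 1, "b": 2, "c": 3}
request = {"a": None, "b": None, "f": None}
result = batch_getitem_model(foo, request, chunk_size=2)

print(result is request, result["a"], result["b"], result["f"] is NOT_FOUND)
```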

cereggii.NOT_FOUND: object module-attribute

A singleton object. Used in AtomicDict to signal that a key was not found.

cereggii.ANY: object module-attribute

A singleton object. Used in AtomicDict as input for an unconditional update (upsert).

cereggii.EXPECTATION_FAILED: object module-attribute

A singleton object. Used in AtomicDict to return that an operation was aborted due to a failed expectation.