AtomicDict
A concurrent dictionary (hash table).
Features of AtomicDict:
- you don't need an external lock to synchronize changes (mutations) to the dictionary:
    - you don't have to manually guard your code against deadlocks (reentrancy-caused deadlocks can still be an issue)
    - when AtomicDict is correctly configured (setting min_size so that no resizing occurs), even if the OS decides to interrupt or terminate a thread which was accessing an AtomicDict, all remaining threads will continue to make progress
- mutations are atomic and can be aborted or retried under contention
- scalability:
    - TODO
    - for some workloads scalability is already quite good: see AtomicDict.reduce.
Note
The special cereggii.NOT_FOUND, cereggii.ANY, and cereggii.EXPECTATION_FAILED objects cannot be used as keys or values.
__init__(initial={}, *, min_size=None, buffer_size=4)
Correctly configuring the min_size parameter avoids resizing the AtomicDict.
Inserts that spill over this size will not fail, but may require resizing.
Resizing prevents concurrent mutations until completed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
initial | dict | A | {} |
min_size | int \| None | The size initially allocated. | None |
buffer_size | int | The number of entries that a thread reserves for future insertions. A larger value can help reduce contention, but may lead to increased fragmentation. Min: 1, max: 64. | 4 |
__getitem__(key)
__setitem__(key, value)
Unconditionally set the value associated with key to value:
Warning
Use compare_and_set instead.
When an item is inserted or updated with this usual Python idiom, there is no way to know that the value currently associated with key is the one you expect: another thread may mutate it before this mutation is applied.
Use this method only when no other thread may be writing to key.
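To make the risk concrete, here is one possible interleaving replayed sequentially. It is only an illustration: a plain dict stands in for the shared dictionary, the two writers are simulated in a single thread, and the names `shared`, `read_by_a`, and `read_by_b` are made up for this sketch.

```python
# Deterministic replay of a lost update: writers A and B each intend to run
# `d[key] = d[key] + 1`, and their reads and writes interleave.
shared = {"hits": 0}  # a plain dict stands in for the shared dictionary

read_by_a = shared["hits"]  # A reads 0
read_by_b = shared["hits"]  # B reads 0, before A has written

shared["hits"] = read_by_a + 1  # A writes 1
shared["hits"] = read_by_b + 1  # B also writes 1: A's increment is lost

print(shared["hits"])  # 1, although two increments were issued
```

Neither write can detect that the value it read is stale, which is the check that compare_and_set adds.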
get(key, default=None)
Just like Python's dict.get:
Tip
The special cereggii.NOT_FOUND object can never be inserted into an AtomicDict, so when it is returned, you can be sure that the key was not in the dictionary.
Conversely, None can be both a key and a value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key | Key | The key to be looked up. | required |
default | Value \| None | The value to return when the key is not found. | None |
Returns:
Type | Description |
---|---|
Value | The value associated with |
compare_and_set(key, expected, desired)
Atomically read the value associated with key:
- if it is expected, then replace it with desired
- else, don't change it and raise ExpectationFailed.
Example
Insert only
The expected value can be cereggii.NOT_FOUND, in which case the call will succeed only when the item is inserted, and not updated:
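A minimal sketch of the insert-only behaviour, written against a small lock-based model rather than cereggii itself so that it runs with the standard library alone. `ModelDict`, and the local `NOT_FOUND` and `ExpectationFailed` stand-ins, are hypothetical names for this sketch; the model compares values by identity, which is an assumption, and it says nothing about how AtomicDict is implemented.

```python
import threading

NOT_FOUND = object()  # stand-in for cereggii.NOT_FOUND


class ExpectationFailed(Exception):
    """Stand-in for cereggii.ExpectationFailed."""


class ModelDict:
    """Lock-based model of the documented compare_and_set semantics."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = {}

    def compare_and_set(self, key, expected, desired):
        with self._lock:
            current = self._items.get(key, NOT_FOUND)
            if current is not expected:  # identity check: an assumption
                raise ExpectationFailed(key)
            self._items[key] = desired

    def get(self, key, default=None):
        with self._lock:
            return self._items.get(key, default)


d = ModelDict()
d.compare_and_set("spam", expected=NOT_FOUND, desired=1)  # absent: inserted

try:
    d.compare_and_set("spam", expected=NOT_FOUND, desired=2)  # present now
    inserted_twice = True
except ExpectationFailed:
    inserted_twice = False  # the existing value was left untouched

print(d.get("spam"), inserted_twice)  # 1 False
```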
Example
Counter
Correct way to increment the value associated with key, coping with concurrent mutations:

    done = False
    while not done:
        expected = my_atomic_dict.get(key, default=0)
        try:
            my_atomic_dict.compare_and_set(key, expected, desired=expected + 1)
        except cereggii.ExpectationFailed:
            # my_atomic_dict[key] was concurrently mutated: retry
            pass
        else:
            done = True

The reduce method removes a lot of boilerplate.
Tip
This family of methods is the recommended way to mutate an AtomicDict.
That said, you will probably want to use a higher-level method than compare_and_set, such as reduce.
Raises:
Type | Description |
---|---|
ExpectationFailed | If the found value was not |
reduce(iterable, aggregate, chunk_size=0)
Aggregate the values in this dictionary with those found in iterable, as computed by aggregate.
The aggregate function takes as input a key, the value currently stored in the dictionary, and the new value from iterable. It returns the aggregated value.
Note
The aggregate function must be:
- total: it should handle both the case in which the key is present and the case in which it is not
- state-less: you should not rely on the number of times this function is called (it will be called at least once for each item in iterable, but there is no upper bound)
Example
Counter
Info
This method exploits the skewness in the data.
First, an intermediate result is aggregated into a thread-local dictionary, and then applied to the shared AtomicDict. This can greatly reduce contention when the keys in the input are repeated.
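The two phases can be sketched in plain Python. This is a single-threaded model and not the library's implementation: `model_reduce` and `count` are hypothetical names, a plain dict stands in for the shared AtomicDict, and the sketch assumes that iterable yields (key, value) pairs and that a missing key is signalled to aggregate with a NOT_FOUND sentinel.

```python
NOT_FOUND = object()  # stand-in for cereggii.NOT_FOUND


def count(key, current, new):
    # Total: handles a missing key as well as a present one.
    # State-less: the result depends only on the arguments.
    if current is NOT_FOUND:
        return new
    return current + new


def model_reduce(shared, iterable, aggregate):
    # Phase 1: fold the input into a private dictionary, touching nothing shared.
    local = {}
    for key, value in iterable:
        local[key] = aggregate(key, local.get(key, NOT_FOUND), value)
    # Phase 2: one update of the shared dictionary per distinct key.
    for key, value in local.items():
        shared[key] = aggregate(key, shared.get(key, NOT_FOUND), value)
    return len(local)  # number of shared updates performed


shared = {"red": 10}
data = [("red", 1), ("green", 1), ("red", 1), ("red", 1)]
shared_updates = model_reduce(shared, data, count)

print(shared)          # {'red': 13, 'green': 1}
print(shared_updates)  # 2 shared updates for 4 input items
```

The more often keys repeat in the input, the fewer shared updates remain after the first phase, which is where the reduction in contention comes from.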
__len__()
Get the number of items in this AtomicDict:
This method is sequentially consistent.
When invoked, it temporarily locks the AtomicDict instance to compute the result.
If you need to invoke this method frequently while the dictionary is being mutated, consider using AtomicDict.approx_len instead.
approx_len()
Retrieve the approximate length of this AtomicDict.
Calling this method does not prevent other threads from mutating the dictionary.
Note
Unlike AtomicDict.__len__, this method does not return a sequentially consistent result.
This is not so bad!
Suppose you call AtomicDict.__len__ instead. You would get a result that was correct at some point during the invocation of len(). But by the very next line, that result might already have been invalidated by another thread inserting or deleting an item.
If you need to know the size of an AtomicDict while other threads are mutating it, calling approx_len should be more performant and still return a fairly good approximation.
len_bounds()
Get a lower and an upper bound for the number of items stored in this AtomicDict.
Also see AtomicDict.approx_len.
fast_iter(partitions=1, this_partition=0)
A fast, not sequentially consistent iterator.
Calling this method does not prevent other threads from mutating this AtomicDict.
Danger
This method can return sequentially-inconsistent results.
Only use fast_iter when you know that no other thread is mutating this AtomicDict.
Depending on the execution, the following may happen:
- it can return the same key multiple times (with the same or different values)
- an update 1, that happened strictly before another update 2, is not seen, but update 2 is seen.
Tip
If no other thread is mutating this AtomicDict while a thread is calling this method, then this method is safe to use.
Example
Summing an array with partitioning

    import threading

    # d is an existing AtomicDict whose values are numbers
    n = 3  # number of partitions, one per thread
    partials = [0 for _ in range(n)]

    def iterator(i):
        # Sum the values of partition i only
        current = 0
        for k, v in d.fast_iter(partitions=n, this_partition=i):
            current += v
        partials[i] = current

    threads = [
        threading.Thread(target=iterator, args=(i,))
        for i in range(n)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(sum(partials))

Also see the partitioned iterations example.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
partitions | | The number of partitions to split this iterator with. It should be equal to the number of threads that participate in the iteration. | 1 |
this_partition | | This thread's assigned partition. Valid values are from 0 to | 0 |
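The contract that the summing example relies on is that the partitions together cover every item, with no item in two partitions. A small model of that contract, assuming a round-robin split over a list: `model_partition` is a hypothetical helper, and the real iterator may divide the table differently.

```python
def model_partition(items, partitions=1, this_partition=0):
    # Hypothetical round-robin split, used only to illustrate the contract.
    for index, item in enumerate(items):
        if index % partitions == this_partition:
            yield item


items = [("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5)]
n = 3
parts = [list(model_partition(items, n, i)) for i in range(n)]

# Every item lands in exactly one partition.
seen = sorted(kv for part in parts for kv in part)
print(seen == sorted(items))  # True

# So per-partition sums add up to the overall sum.
print(sum(v for part in parts for _, v in part))  # 15
```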
batch_getitem(batch, chunk_size=128)
Batch many lookups together for efficient memory access.
The values provided in batch will be substituted with the values found in the AtomicDict instance, or with cereggii.NOT_FOUND.
Note that no exception is raised: cereggii.NOT_FOUND is instead the value returned for a key that wasn't present.
Recall that the cereggii.NOT_FOUND object can never be inserted into an AtomicDict.
The values themselves, provided in batch, will always be substituted.
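The substitution rule can be modelled with plain dicts. This is a sketch under assumptions, not the library's code: `model_batch_getitem` is a hypothetical function, a plain dict plays the role of the AtomicDict, and the model assumes the input dict is updated in place and returned.

```python
NOT_FOUND = object()  # stand-in for cereggii.NOT_FOUND


def model_batch_getitem(store, batch, chunk_size=128):
    keys = list(batch)
    # Walk the keys chunk by chunk. A plain-dict model gains nothing from
    # chunking; the loop only mirrors the chunk_size parameter.
    for start in range(0, len(keys), chunk_size):
        for key in keys[start:start + chunk_size]:
            # Whatever value the caller put in batch is overwritten.
            batch[key] = store.get(key, NOT_FOUND)
    return batch


store = {"a": 1, "b": 2, "c": 3}
batch = {"a": None, "b": None, "f": None}
result = model_batch_getitem(store, batch, chunk_size=2)

print(result is batch)           # True: the input dict is returned
print(result["a"], result["b"])  # 1 2
print(result["f"] is NOT_FOUND)  # True: no KeyError for the missing key
```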
Example
With:
calling
returns:
Parameters:
Name | Type | Description | Default |
---|---|---|---|
chunk_size | int | subdivide the keys to lookup from | 128 |
Returns:
Type | Description |
---|---|
dict | the input |
cereggii.NOT_FOUND: object
module-attribute
A singleton object.
Used in AtomicDict to signal that a key was not found.
cereggii.ANY: object
module-attribute
A singleton object.
Used in AtomicDict as input for an unconditional update (upsert).
cereggii.EXPECTATION_FAILED: object
module-attribute
A singleton object.
Used in AtomicDict to signal that an operation was aborted due to a failed expectation.