There was a recent post on Hacker News by Juho Snellman in which he claims “I have been writing single producer, single consumer queues *wrong* my entire life” and presents a solution for it.

The problem seems to be that SPSC queues as usually defined will never be able to fill all N slots because of index wrapping. Imagine a simple queue as defined below.

```cpp
template< typename T, typename IndexT = uint32_t, typename AllocT = std::allocator<T> >
class SimpleRing
{
public:
    static const uint32_t SLOTSIZE = sizeof(T);

    SimpleRing( uint32_t sz ) : size(sz) {
        data = allocator.allocate(size);
        read_idx = 0;
        write_idx = 0;
    }
    ~SimpleRing() {
        allocator.deallocate( data, size );
    }
    bool push( const T& obj ) { ... };
    bool pop( T& obj ) { ... };
private:
    SimpleRing();
    SimpleRing( const SimpleRing& );

    std::atomic<IndexT> read_idx;
    std::atomic<IndexT> write_idx;
    const uint32_t size;
    T* data;
    AllocT allocator;
};
```

The write/push function can be defined as

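```cpp
bool push( const T& obj ) {
    // Where write_idx lands after this push, wrapping back to 0 at the end
    uint32_t next_idx = write_idx+1<size ? write_idx+1 : 0;
    if ( next_idx != read_idx ) {
        data[write_idx] = obj;
        write_idx = next_idx;
        return true;
    }
    return false;  // full: advancing would make write_idx equal read_idx
}
```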

The respective read/pop method is defined as

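```cpp
bool pop( T& obj ) {
    // Where read_idx lands after this pop, wrapping back to 0 at the end
    uint32_t next_idx = read_idx+1<size ? read_idx+1 : 0;
    if ( read_idx != write_idx ) {
        obj = data[read_idx];
        read_idx = next_idx;
        return true;
    }
    return false;  // empty: read_idx has caught up with write_idx
}
```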

Notice that push() refuses to write whenever next_idx would equal read_idx, because read_idx == write_idx is already reserved to mean the queue is empty. So write_idx can advance at most to read_idx-1. This implies the last slot before the read pointer will always be unused.
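To make the wasted slot concrete, here is a minimal single-threaded sketch of the same index logic, stripped of templates, atomics and allocators. `TinyRing` and `count_accepted_pushes` are names I made up for this illustration; they are not part of the code above.

```cpp
#include <cstdint>
#include <vector>

// Stripped-down, single-threaded copy of the SimpleRing index logic
// (illustrative only: ints only, no atomics, no pop()).
struct TinyRing {
    explicit TinyRing(uint32_t sz) : size(sz), data(sz) {}

    bool push(int v) {
        uint32_t next_idx = write_idx + 1 < size ? write_idx + 1 : 0;
        if (next_idx == read_idx) return false;  // reported full, one slot still free
        data[write_idx] = v;
        write_idx = next_idx;
        return true;
    }

    uint32_t size;
    uint32_t read_idx = 0;
    uint32_t write_idx = 0;
    std::vector<int> data;
};

// Counts how many pushes succeed on an empty ring with `size` slots.
inline uint32_t count_accepted_pushes(uint32_t size) {
    TinyRing ring(size);
    uint32_t accepted = 0;
    while (ring.push(0)) ++accepted;
    return accepted;
}
```

Calling `count_accepted_pushes(8)` returns 7: a ring allocated with eight slots only ever holds seven elements.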

Personally, I have never seen this as a problem. Many other queue implementations waste one divider node. But let’s assume that this is a big deal in some memory-constrained cases, such as embedded systems.

By the way, I tend not to use modulus in implementations because on most platforms I use, a guaranteed integer division is more expensive than the rare branch miss. And PLEASE do not go ape on the const-ness of the proposed C++ code or on the syntax used to assign the objects. I wrote this in the past half hour just to show how the indices would work, with no regard to usability.
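For the record, the two ways of advancing an index are interchangeable as far as the result goes; the choice is purely about cost. The small sketch below checks this. The function names are hypothetical and exist only for this comparison.

```cpp
#include <cstdint>

// Advance an index with a compare-and-reset, as the code in this post does.
inline uint32_t next_branch(uint32_t idx, uint32_t size) {
    return idx + 1 < size ? idx + 1 : 0;
}

// Advance an index with an integer modulus.
inline uint32_t next_modulus(uint32_t idx, uint32_t size) {
    return (idx + 1) % size;
}

// Returns true if both forms agree for every valid index of a ring
// with `size` slots.
inline bool wraps_agree(uint32_t size) {
    for (uint32_t idx = 0; idx < size; ++idx)
        if (next_branch(idx, size) != next_modulus(idx, size)) return false;
    return true;
}
```

Which one is actually faster depends on the target and the compiler, so treat my preference as a habit rather than a measurement.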

Snellman’s post offers a solution: instead of wrapping the indices, let them grow unbounded and cap only the array indices. The implementation is the following:

```cpp
bool push( const T& obj ) {
    uint32_t numel = write_idx - read_idx;
    if ( numel < size ) {
        data[write_idx%size] = obj;
        write_idx++;
        return true;
    }
    return false;
}

bool pop( T& obj ) {
    uint32_t numel = write_idx - read_idx;
    if ( numel>0 ) {
        obj = data[read_idx%size];
        read_idx++;
        return true;
    }
    return false;
}
```

This is very elegant and simple because you no longer need to cycle the indices manually. As write_idx is never behind read_idx, the modular arithmetic on the indices themselves is thrown away. The obvious drawback is that both indices can grow unbounded and cause all sorts of problems.

One could go crazy and make the indices uint64_t, but that causes two problems. First, on most platforms an atomic read or write is guaranteed only up to a certain size. On Intel this is 32 bits for the general case of unaligned access. Second, the basic motivation for this article is an embedded case where the memory of a single node is precious, so this waste of resources should not be allowed. Snellman’s solution also forces the number of elements to be a power of two.
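My reading of where the power-of-two requirement comes from: when a free-running unsigned index overflows, it wraps modulo 2^bits, and the slot number `idx % size` only stays consecutive across that wrap if size divides 2^bits. The sketch below shows this with 8-bit indices standing in for uint32_t so the wrap is easy to follow. Both function names are made up for the illustration.

```cpp
#include <cstdint>

// Returns true if slot numbers stay consecutive (mod size) when a
// free-running 8-bit index wraps from 255 back to 0.
inline bool slots_survive_wrap(uint8_t size) {
    uint8_t idx = 255;
    uint8_t slot_before = idx % size;
    ++idx;  // wraps to 0: unsigned arithmetic is modulo 2^8
    uint8_t slot_after = idx % size;
    return slot_after == (slot_before + 1) % size;
}

// The element count itself survives the wrap for any size, because
// unsigned subtraction is also modulo 2^8.
inline uint8_t numel_across_wrap() {
    uint8_t read_idx = 250;
    uint8_t write_idx = 3;  // logically 259, already wrapped
    return static_cast<uint8_t>(write_idx - read_idx);
}
```

With size 8 the slot goes from 7 to 0 as expected; with size 6 it jumps from 3 to 0, skipping slots 4 and 5, which would corrupt the queue. The count of stored elements, on the other hand, comes out right in both cases.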

All of this gives me the chills just thinking of a production crash. Even if someone proves to me that this is correct in some way, I would still think it is too close to the cliff to be comfortable.

# Solution

However, there is an easy fix for these problems. The basic idea is that we need to reset the two indices at some point in time, but when? There are only two places in the code where we can possibly do this: push() or pop(). In an SPSC queue you can only modify write_idx in push() and, respectively, read_idx in pop(). As you cannot change both indices at exactly the same time, we need to cycle one index and then the other, and in the meantime the modular difference must remain the same.

It is obvious that we need to play some modular arithmetic magic here. First, we need to allow the result of the modular arithmetic to be equal to size. Therefore we cannot mod by size since that would yield zero. The obvious conclusion is to widen the divisor – let’s try with 2*size then. This means we will cycle both indices every 2*size steps.

However, if both indices can roam freely through the [0,2*size) interval, we will have the inconvenient case where write_idx is less than read_idx. That creates a nuisance because with unsigned indices such a difference will underflow, and that will surely give us a segfault for Christmas. So we somehow need to make write_idx always greater than read_idx. How? Just make the two ranges non-overlapping. Would that work? Fortunately, yes!

The simple trick is to employ three rules:

- read_idx is allowed to occupy values in the range [0, 2*size)
- write_idx is allowed to occupy values in the range [2*size, 4*size)
- mod the resulting difference by 2*size

Using this method, the expression for the number of elements in the queue

```cpp
uint32_t numel = (write_idx-read_idx) % (2*size);
```

will always be in the range [0, 2*size). This allows us to fill up the queue to the last element because numel can be equal to the queue size.

Also, because the ranges do not overlap and the write index range sits above the read range, we can use unsigned indices: the difference will always be a positive integer. And because both ranges are bounded, neither of the two indices can ever overflow.

Let’s mentally run through some cases for size=8:

```
// Let's push some into the queue
read_idx=0, write_idx=16, numel = (16-0)%16 = 0
read_idx=0, write_idx=17, numel = (17-0)%16 = 1
read_idx=0, write_idx=23, numel = (23-0)%16 = 7
read_idx=0, write_idx=24, numel = (24-0)%16 = 8
// from here on we can't push() because numel==size

// Let's pull one from the queue:
read_idx=1, write_idx=24, numel = (24-1)%16 = 7

// Now we can push more
read_idx=1, write_idx=25, numel = (25-1)%16 = 8

// Pushing until the edge case
read_idx=7, write_idx=31, numel = (31-7)%16 = 8

// Let's pop one so it allows to proceed (cannot write anymore)
read_idx=8, write_idx=31, numel = (31-8)%16 = 7

// The next write will cycle through write_idx such that it goes back to 16
read_idx=8, write_idx=16, numel = (16-8)%16 = 8

// Reading/writing through the next edge case
read_idx=15, write_idx=23, numel = (23-15)%16 = 8

// Pop one since we cannot write anymore - here the read_idx will cycle back to 0
read_idx=0, write_idx=23, numel = (23-0)%16 = 7
```

So you can see that the modular difference between write_idx and read_idx with this arrangement will always reflect perfectly the correct number of elements in the queue.
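If you would rather not trust a hand-run trace, the check is easy to automate. The sketch below drives only the two indices with a pseudo-random mix of pushes and pops and compares numel against an independently tracked count after every step. The function name, the seed and the tiny LCG are all my own scaffolding, not part of the queue.

```cpp
#include <cstdint>

// Simulates the two-range index scheme for a ring with `size` slots and
// compares numel with a separately tracked element count. Returns true if
// they agree after every one of `steps` operations.
inline bool numel_tracks_count(uint32_t size, uint32_t steps) {
    uint32_t read_idx = 0;          // lives in [0, 2*size)
    uint32_t write_idx = 2 * size;  // lives in [2*size, 4*size)
    uint32_t expected = 0;          // reference count of stored elements
    uint32_t seed = 12345;          // tiny LCG so the run is deterministic

    for (uint32_t i = 0; i < steps; ++i) {
        seed = seed * 1664525u + 1013904223u;
        bool want_push = ((seed >> 16) & 1u) != 0;
        uint32_t numel = (write_idx - read_idx) % (2 * size);

        if (want_push && numel < size) {
            write_idx = (write_idx + 1 < 4 * size) ? write_idx + 1 : 2 * size;
            ++expected;
        } else if (!want_push && numel > 0) {
            read_idx = (read_idx + 1 < 2 * size) ? read_idx + 1 : 0;
            --expected;
        }

        if ((write_idx - read_idx) % (2 * size) != expected) return false;
        if (expected > size) return false;  // never more than size elements
    }
    return true;
}
```

Note that nothing here requires size to be a power of two; the check passes for size=5 just as it does for size=8.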

Let’s have a look at the C++ implementation. The constructor for this class has to be slightly different from the previous queues. The only modification from the simple ring is the initialization of write_idx with 2*size:

```cpp
template< typename T, typename IndexT = uint32_t, typename AllocT = std::allocator<T> >
class VitorianRing
{
public:
    static const uint32_t SLOTSIZE = sizeof(T);

    VitorianRing( uint32_t sz ) : size(sz) {
        data = allocator.allocate( size );
        read_idx = 0;
        write_idx = 2*size;
    }
    ~VitorianRing() {
        allocator.deallocate( data, size );
    }
    bool push( const T& obj ) { ... };
    bool pop( T& obj ) { ... };
private:
    VitorianRing();
    VitorianRing( const VitorianRing& );

    std::atomic<IndexT> read_idx;
    std::atomic<IndexT> write_idx;
    const uint32_t size;
    T* data;
    AllocT allocator;
};
```

The push() method for this class is not much different from the previous classes. The only differences are the test for a queue that is not full (numel < size) and the wrapping of write_idx, which goes back to 2*size rather than 0.

```cpp
bool push( const T& obj ) {
    uint32_t numel = (write_idx-read_idx) % (2*size);
    if ( numel < size ) {
        data[write_idx%size] = obj;
        // write_idx cycles through [2*size, 4*size)
        write_idx = ( write_idx+1 < 4*size ) ? write_idx + 1 : 2*size;
        return true;
    }
    return false;  // full: all size slots are in use
}
```

The read/pop() method is very similar:

```cpp
bool pop( T& obj ) {
    uint32_t numel = (write_idx-read_idx) % (2*size);
    if ( numel > 0 ) {
        obj = data[read_idx%size];
        // read_idx cycles through [0, 2*size)
        read_idx = ( read_idx+1 < 2*size ) ? read_idx+1 : 0;
        return true;
    }
    return false;  // empty
}
```
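Putting the pieces together, here is a compact, self-contained sketch that can be compiled as-is. It deviates from the listings above in ways that are my own assumptions: the class is called `FullRing` (a made-up name), storage is a std::vector<T> rather than a raw allocation so that elements are properly constructed, each index is loaded once per call, and the atomics use explicit acquire/release ordering instead of the default sequentially consistent operations. The two helper functions exist only to exercise it.

```cpp
#include <atomic>
#include <cstdint>
#include <vector>

// Sketch of the two-range ring with explicit acquire/release ordering.
template <typename T>
class FullRing {
public:
    explicit FullRing(uint32_t sz)
        : size_(sz), data_(sz), read_idx_(0), write_idx_(2 * sz) {}

    // Producer thread only.
    bool push(const T& obj) {
        const uint32_t w = write_idx_.load(std::memory_order_relaxed);
        const uint32_t r = read_idx_.load(std::memory_order_acquire);
        if ((w - r) % (2 * size_) >= size_) return false;  // full
        data_[w % size_] = obj;
        write_idx_.store(w + 1 < 4 * size_ ? w + 1 : 2 * size_,
                         std::memory_order_release);
        return true;
    }

    // Consumer thread only.
    bool pop(T& obj) {
        const uint32_t r = read_idx_.load(std::memory_order_relaxed);
        const uint32_t w = write_idx_.load(std::memory_order_acquire);
        if ((w - r) % (2 * size_) == 0) return false;  // empty
        obj = data_[r % size_];
        read_idx_.store(r + 1 < 2 * size_ ? r + 1 : 0,
                        std::memory_order_release);
        return true;
    }

private:
    const uint32_t size_;
    std::vector<T> data_;
    std::atomic<uint32_t> read_idx_;
    std::atomic<uint32_t> write_idx_;
};

// Fills an empty ring with `size` slots; returns how many pushes succeeded.
inline uint32_t count_accepted(uint32_t size) {
    FullRing<int> ring(size);
    uint32_t accepted = 0;
    while (ring.push(static_cast<int>(accepted))) ++accepted;
    return accepted;
}

// Runs `rounds` fill/drain passes through a ring and checks FIFO order.
inline bool fifo_order_holds(uint32_t size, int rounds) {
    FullRing<int> ring(size);
    int next_in = 0, next_out = 0, v = 0;
    for (int i = 0; i < rounds; ++i) {
        while (ring.push(next_in)) ++next_in;
        while (ring.pop(v)) {
            if (v != next_out) return false;
            ++next_out;
        }
    }
    return next_in == next_out;
}
```

Here `count_accepted(8)` returns 8, so every slot is usable. The helpers run on a single thread, so they exercise the index arithmetic but say nothing about the memory ordering; that part should be validated separately on the target platform.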

All these implementations are available in my GitHub help repository.

POST EDIT: we added some contributions that readers have been making in the comments: use of std::atomic<T>, custom allocators and a choice of index type. The changes are on GitHub but are not yet reflected in this text.