Bug 1454385: Add a single producer single consumer lock and wait free queue to mfbt/
Status: RESOLVED FIXED (opened 7 years ago, closed 6 years ago)
Categories: Core :: MFBT, enhancement
Target milestone: mozilla62
People: Reporter: padenot, Assigned: padenot
References: Blocks 1 open bug
Attachments: 1 file
In bug 1267161, we need an SPSC (single producer, single consumer) lock-free and wait-free queue to communicate efficiently between two threads.
However, this is not the first time an SPSC queue has been needed in Gecko, and I'd like to have one readily available, so having it in mfbt is better.
This is adapted from code in `media/libcubeb` that I wrote some time ago, but I've converted everything to Gecko style (code and tests), and added more comments.
Assignee
Updated•7 years ago
Attachment #8968191 - Flags: review?(nfroyd)
Comment 2•7 years ago
mozreview-review
Comment on attachment 8968191 [details]
Bug 1454385 - Add a single producer single consumer lock and wait free queue to mfbt/.
https://reviewboard.mozilla.org/r/236880/#review243032
I have not thought too hard about whether the synchronization concerns are correct, but a first pass (and trying to find explanations around the web and such) suggests they are not too far off. I will try to find some better thinking time to review those. In the meantime, I have preliminary comments, which we can talk about.
::: mfbt/SPSCQueue.h:23
(Diff revision 1)
> +template<typename T, typename Trait>
> +void Copy(T * aDestination, const T * aSource, size_t aCount, Trait)
I think we have typically structured these such that the overloading for podness vs. copy constructors would happen via template parameters:
```
template<typename T, bool IsPod>
void Copy(T* aDst, const T* aSrc, size_t aCount)
```
This is obviously a pain, because you can't specialize function templates, but I think we should arrange things this way so we can be assured the compiler isn't going to do extra parameter passing for `Trait`.
At least you can common up the `Copy` and `ConstructDefault` functions into a common structure for specialization...
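A minimal sketch of the struct-based arrangement suggested here, for illustration only. The name `MemoryOperations` and the use of `std::is_trivially_copyable` as the "podness" test are assumptions, not necessarily what the patch uses.

```cpp
#include <cstddef>
#include <cstring>
#include <string>
#include <type_traits>

// Hypothetical helper (name is an assumption). The primary template handles
// non-trivial types by assigning element by element.
template <typename T, bool IsPod = std::is_trivially_copyable<T>::value>
struct MemoryOperations {
  static void Copy(T* aDst, const T* aSrc, std::size_t aCount) {
    for (std::size_t i = 0; i < aCount; i++) {
      aDst[i] = aSrc[i];
    }
  }
  static void ConstructDefault(T* aDst, std::size_t aCount) {
    for (std::size_t i = 0; i < aCount; i++) {
      aDst[i] = T();
    }
  }
};

// Partial specialization for trivially copyable types: raw memory operations.
// Class templates can be partially specialized, unlike function templates,
// so no extra `Trait` argument has to be passed at run time.
template <typename T>
struct MemoryOperations<T, true> {
  static void Copy(T* aDst, const T* aSrc, std::size_t aCount) {
    std::memcpy(aDst, aSrc, aCount * sizeof(T));
  }
  static void ConstructDefault(T* aDst, std::size_t aCount) {
    std::memset(aDst, 0, aCount * sizeof(T));
  }
};
```

With this shape, `MemoryOperations<int>::Copy` resolves to the `memcpy` path and `MemoryOperations<std::string>::Copy` to the element-wise path, both chosen at compile time.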
::: mfbt/SPSCQueue.h:23
(Diff revision 1)
> +template<typename T, typename Trait>
> +void Copy(T * aDestination, const T * aSource, size_t aCount, Trait)
Nit: `T * aDestination` (and related) should be `T* aDestination`. Here and throughout the file.
::: mfbt/SPSCQueue.h:125
(Diff revision 1)
> + {
> + MOZ_ASSERT(StorageCapacity() < std::numeric_limits<int>::max() / 2 &&
> + "buffer too large for the type of index used.");
> + MOZ_ASSERT(mCapacity > 0);
> +
> + mData.reset(new T[StorageCapacity()]);
I have a small preference for just `mData = MakeUnique<T[]>(StorageCapacity())`, to avoid bare `new` calls, but up to you.
::: mfbt/SPSCQueue.h:126
(Diff revision 1)
> + /* If this queue is using atomics, initializing those members as the last
> + * action in the constructor acts as a full barrier, and allow Capacity() to
> + * be thread-safe. */
Why say "If this queue is using atomics..."? The indices are always `std::atomic`, so I don't understand why this statement would be conditionally qualified.
::: mfbt/SPSCQueue.h:129
(Diff revision 1)
> + mWriteIndex = 0;
> + mReadIndex = 0;
Could we instead get away with initializing these members in the initializer list and then using an explicit barrier here? The explicit barrier would only require a single fence, whereas two sequentially consistent stores would require two fences.
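A rough sketch of what this suggestion could look like, assuming an `int` element type and a hypothetical class name. Whether a fence alone is enough to publish the object depends on how it is handed to the other thread, so treat this as an illustration of the shape, not a proof of correctness.

```cpp
#include <atomic>
#include <memory>

// Hypothetical, trimmed-down ring buffer: only construction is sketched here.
class RingBufferInitSketch {
public:
  explicit RingBufferInitSketch(int aCapacity)
    : mReadIndex(0)
    , mWriteIndex(0)
    , mCapacity(aCapacity + 1) // one extra slot to tell full from empty
    , mData(std::make_unique<int[]>(aCapacity + 1))
  {
    // One full fence after every member has been initialized, instead of two
    // sequentially consistent stores to the indices.
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }

  int Capacity() const { return mCapacity - 1; }
  int ReadIndex() const { return mReadIndex.load(std::memory_order_relaxed); }
  int WriteIndex() const { return mWriteIndex.load(std::memory_order_relaxed); }

private:
  // Members are declared in the same order as the initializer list.
  std::atomic<int> mReadIndex;
  std::atomic<int> mWriteIndex;
  const int mCapacity;
  std::unique_ptr<int[]> mData;
};
```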
::: mfbt/SPSCQueue.h:140
(Diff revision 1)
> + int EnqueueDefault(int aCount) {
> + return Enqueue(nullptr, aCount);
> + }
WDYT about making these `Enqueue`/`Dequeue` functions `MOZ_MUST_USE`? Is that reasonable?
::: mfbt/SPSCQueue.h:187
(Diff revision 1)
> + details::Copy(mData.get() + wrIdx, aElements, firstPart);
> + details::Copy(mData.get(), aElements + firstPart, secondPart);
Do we want to require copy semantics for transferring elements into the ring buffer, or can we be more clever and use move semantics where possible?
::: mfbt/SPSCQueue.h:229
(Diff revision 1)
> + details::Copy(elements, mData.get() + rdIdx, firstPart);
> + details::Copy(elements + firstPart, mData.get(), secondPart);
Likewise here for copying vs. moving.
::: mfbt/SPSCQueue.h:233
(Diff revision 1)
> + mReadIndex.store(IncrementIndex(rdIdx, toRead),
> + std::memory_order::memory_order_relaxed);
Why is relaxed memory ordering correct here? This is essentially publishing the read index, so relaxed ordering does not feel like the correct thing. (For instance, if the producer thread doesn't see updates to this, it will fail to write when there could be a bunch of space left...)
::: mfbt/SPSCQueue.h:239
(Diff revision 1)
> + std::memory_order::memory_order_relaxed);
> +
> + return toRead;
> + }
> + /**
> + * Get the number of available element for consuming.
Nit: "available elements"
::: mfbt/SPSCQueue.h:259
(Diff revision 1)
> + return AvailableReadInternal(
> + mReadIndex.load(std::memory_order::memory_order_relaxed),
> + mWriteIndex.load(std::memory_order::memory_order_relaxed));
> + }
> + /**
> + * Get the number of available elements for consuming.
I think this comment is too much copy-and-paste from `AvailableRead`, correct?
::: mfbt/SPSCQueue.h:271
(Diff revision 1)
> +#ifndef DEBUG
> + AssertCorrectThread(producerId);
> +#endif
I don't understand why some of the thread assertions are `#ifdef DEBUG` and some, like this one, are `#ifndef DEBUG`. Wouldn't it be better to just have the `AssertCorrectThread` call unconditionally, and then inside `AssertCorrectThread`, have the assert potentially conditioned on `#ifdef DEBUG`? Less `#ifdef` clutter and the compiler will almost certainly inline away the empty `AssertCorrectThread` in non-`DEBUG` builds.
(I realize relying on the compiler here and not in the above `Copy` comments is inconsistent...)
Oh, I see, the thread IDs are only defined in `DEBUG` builds anyway. Hm. In any event, the `#ifdef` conditions need to be straightened out.
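One possible way to straighten this out, sketched under assumptions: the class and method names are hypothetical, and "first caller becomes the producer" is a guess at the intended behaviour. The call site is unconditional, and the `#ifdef` lives inside the helper, next to the `DEBUG`-only member.

```cpp
#include <cassert>
#include <thread>

// Hypothetical class; only the thread-checking pattern is sketched.
class ThreadCheckedSketch {
public:
  int AvailableWrite() const {
    AssertProducerThread(); // unconditional call, no #ifdef at the call site
    return 0;               // placeholder result for the sketch
  }

private:
  void AssertProducerThread() const {
#ifdef DEBUG
    // Assumption: the first caller becomes the producer thread.
    if (mProducerId == std::thread::id()) {
      mProducerId = std::this_thread::get_id();
    }
    assert(mProducerId == std::this_thread::get_id());
#endif
  }

#ifdef DEBUG
  // The thread identifier only exists in DEBUG builds.
  mutable std::thread::id mProducerId;
#endif
};
```

In non-`DEBUG` builds the helper body is empty, so the compiler can be expected to inline it away.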
::: mfbt/SPSCQueue.h:272
(Diff revision 1)
> + * @return The number of empty slots in the buffer, available for writing.
> + */
> + int AvailableWrite() const
> + {
> +#ifndef DEBUG
> + AssertCorrectThread(producerId);
Nit: presumably this is `mProducerId`?
::: mfbt/SPSCQueue.h:306
(Diff revision 1)
> + *
> + * @param aReadIndex the read index to consider
> + * @param writeIndex the write index to consider
> + * @return true if the ring buffer is empty, false otherwise.
> + **/
> + bool EmptyInternal(int aReadIndex, int aWriteIndex) const
Perhaps `IsEmptyInternal`? Or just `IsEmpty`, since the private visibility makes the `Internal` a little redundant?
::: mfbt/SPSCQueue.h:321
(Diff revision 1)
> + *
> + * @param aReadIndex the read index to consider
> + * @param writeIndex the write index to consider
> + * @return true if the ring buffer is full, false otherwise.
> + **/
> + bool FullInternal(int aReadIndex, int aWriteIndex) const
Same comment here on the `Internal` suffix.
::: mfbt/SPSCQueue.h:323
(Diff revision 1)
> + * @param writeIndex the write index to consider
> + * @return true if the ring buffer is full, false otherwise.
> + **/
> + bool FullInternal(int aReadIndex, int aWriteIndex) const
> + {
> + return (aWriteIndex + 1) % StorageCapacity() == aReadIndex;
Do we want to require (via release assertions in the constructor or perhaps a `size_t Capacity` template parameter) that `StorageCapacity()` will return a power-of-two so we can just do masking here rather than a full modulo?
::: mfbt/SPSCQueue.h:342
(Diff revision 1)
> + * This can be called from the consummer or producer thread, but see the
> + * comment in `AvailableRead`.
> + *
> + * @return the number of available elements for reading.
> + */
> + int AvailableReadInternal(int aReadIndex, int aWriteIndex) const
Same comment here on the `Internal` suffix.
::: mfbt/SPSCQueue.h:358
(Diff revision 1)
> + * This can be called from the consummer or producer thread, but see the
> + * comment in `AvailableWrite`.
> + *
> + * @return the number of elements that can be written into the array.
> + */
> + int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const
Same comment here on the `Internal` suffix.
::: mfbt/SPSCQueue.h:360
(Diff revision 1)
> + *
> + * @return the number of elements that can be written into the array.
> + */
> + int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const
> + {
> + /* We substract one element here to always keep at least one sample
Nit: "subtract"
::: mfbt/SPSCQueue.h:363
(Diff revision 1)
> + int AvailableWriteInternal(int aReadIndex, int aWriteIndex) const
> + {
> + /* We substract one element here to always keep at least one sample
> + * free in the buffer, to distinguish between full and empty array. */
> + int rv = aReadIndex - aWriteIndex - 1;
> + if (aWriteIndex >= aReadIndex) {
Is it more clear to say `rv < 0` here?
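For reference, a standalone sketch of the computation with the `rv < 0` test. The function name and the explicit `aStorageCapacity` parameter are assumptions, as is the wrap-around step (adding the storage capacity), since the body of the branch is not quoted above. The two conditions are equivalent: `aWriteIndex >= aReadIndex` holds exactly when `aReadIndex - aWriteIndex - 1` is negative.

```cpp
// Hypothetical free-function version of AvailableWriteInternal.
// aStorageCapacity is the number of slots, i.e. the usable capacity plus one.
int AvailableWriteSketch(int aReadIndex, int aWriteIndex, int aStorageCapacity) {
  // One slot always stays free, to distinguish a full buffer from an empty one.
  int rv = aReadIndex - aWriteIndex - 1;
  if (rv < 0) {
    // The write index is at or ahead of the read index: wrap around.
    rv += aStorageCapacity;
  }
  return rv;
}
```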
::: mfbt/SPSCQueue.h:380
(Diff revision 1)
> + * @param increment the number by which `index` is incremented.
> + * @return the new index.
> + */
> + int IncrementIndex(int aIndex, int aIncrement) const
> + {
> + MOZ_ASSERT(aIncrement >= 0);
Do we also want to assert that `aIncrement` is less than `StorageCapacity()`? And maybe `aIndex`, too?
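A sketch of what the extra assertions could look like, using plain `assert` in place of `MOZ_ASSERT` and a hypothetical free function taking the storage capacity as a parameter:

```cpp
#include <cassert>

// Hypothetical free-function version of IncrementIndex.
int IncrementIndexSketch(int aIndex, int aIncrement, int aStorageCapacity) {
  assert(aIncrement >= 0);
  // Additional checks suggested in the review: both values stay in range.
  assert(aIncrement < aStorageCapacity);
  assert(aIndex >= 0 && aIndex < aStorageCapacity);
  return (aIndex + aIncrement) % aStorageCapacity;
}
```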
Attachment #8968191 - Flags: review?(nfroyd)
Assignee
Comment 3•7 years ago
Thanks for the comments. I've addressed everything, and I'm answering some points below.
(In reply to Nathan Froyd [:froydnj] from comment #2)
> ::: mfbt/SPSCQueue.h:23
> (Diff revision 1)
> > +template<typename T, typename Trait>
> > +void Copy(T * aDestination, const T * aSource, size_t aCount, Trait)
>
> I think we have typically structured these such that the overloading for
> podness vs. copy constructors would happen via template parameters:
>
> ```
> template<typename T, bool IsPod>
> void Copy(T* aDst, const T* aSrc, size_t aCount)
> ```
>
> This is obviously a pain, because you can't specialize function templates,
> but I think we should arrange things this way so we can be assured the
> compiler isn't going to do extra parameter passing for `Trait`.
>
> At least you can common up the `Copy` and `ConstructDefault` functions into
> a common structure for specialization...
Done. I find it a bit verbose, and I'm not sure I did it in the best way.
> ::: mfbt/SPSCQueue.h:125
> (Diff revision 1)
> > + {
> > + MOZ_ASSERT(StorageCapacity() < std::numeric_limits<int>::max() / 2 &&
> > + "buffer too large for the type of index used.");
> > + MOZ_ASSERT(mCapacity > 0);
> > +
> > + mData.reset(new T[StorageCapacity()]);
>
> I have a small preference for just `mData =
> MakeUnique<T[]>(StorageCapacity())`, to avoid bare `new` calls, but up to
> you.
It's not the first time I've seen something like this mentioned, and I don't understand the issue with bare `new` calls. Do you have something I can read to understand? In any case, I've removed the `new` here. Also, I'm using `std::unique_ptr`, so it's `make_unique`, but I can switch it to Gecko's version if you want.
> ::: mfbt/SPSCQueue.h:187
> (Diff revision 1)
> > + details::Copy(mData.get() + wrIdx, aElements, firstPart);
> > + details::Copy(mData.get(), aElements + firstPart, secondPart);
>
> Do we want to require copy semantics for transferring elements into the ring
> buffer, or can we be more clever and use move semantics where possible?
Yes. Added a test for this as well.
> Why say "If this queue is using atomics..."? The indices are always
> `std::atomic`, so I don't understand why this statement would be
> conditionally qualified.
I had a template policy to determine the index type (atomic integer or plain integer) before, and switched to atomic only later. It was useful, since a non-threadsafe ring buffer can certainly be useful on its own, but we ended up not needing it, so I removed the template policy and made the data structure unconditionally thread-safe.
It would be rather easy to put this feature back if we feel it would be useful.
> ::: mfbt/SPSCQueue.h:233
> (Diff revision 1)
> > + mReadIndex.store(IncrementIndex(rdIdx, toRead),
> > + std::memory_order::memory_order_relaxed);
>
> Why is relaxed memory ordering correct here? This is essentially publishing
> the read index, so relaxed ordering does not feel like the correct thing.
> (For instance, if the producer thread doesn't see updates to this, it will
> fail to write when there could be a bunch of space left...)
It should indeed be `release`. It does not matter on x86, but it does on ARM and others.
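To illustrate the ordering being discussed, here is a minimal single-element SPSC queue sketch. The class name and the one-element `Enqueue`/`Dequeue` signatures are assumptions made for brevity; the patch works on ranges of elements. Each thread loads its own index relaxed, loads the other thread's index with `acquire`, and publishes its own index with `release`.

```cpp
#include <atomic>
#include <memory>

// Hypothetical minimal queue of ints. One slot is kept free so that a full
// buffer can be told apart from an empty one.
class TinySpscQueue {
public:
  explicit TinySpscQueue(int aCapacity)
    : mReadIndex(0)
    , mWriteIndex(0)
    , mStorage(aCapacity + 1)
    , mData(std::make_unique<int[]>(aCapacity + 1))
  {}

  // Producer thread only.
  bool Enqueue(int aValue) {
    int wr = mWriteIndex.load(std::memory_order_relaxed);
    int rd = mReadIndex.load(std::memory_order_acquire);
    int next = (wr + 1) % mStorage;
    if (next == rd) {
      return false; // full
    }
    mData[wr] = aValue;
    // Publish the element: the consumer's acquire load pairs with this store.
    mWriteIndex.store(next, std::memory_order_release);
    return true;
  }

  // Consumer thread only.
  bool Dequeue(int* aOut) {
    int rd = mReadIndex.load(std::memory_order_relaxed);
    int wr = mWriteIndex.load(std::memory_order_acquire);
    if (rd == wr) {
      return false; // empty
    }
    *aOut = mData[rd];
    // Publish the freed slot: the producer's acquire load pairs with this store.
    mReadIndex.store((rd + 1) % mStorage, std::memory_order_release);
    return true;
  }

private:
  std::atomic<int> mReadIndex;
  std::atomic<int> mWriteIndex;
  const int mStorage;
  std::unique_ptr<int[]> mData;
};
```

The `release` store on the read index is what guarantees the consumer has finished reading a slot before the producer is allowed to overwrite it.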
> ::: mfbt/SPSCQueue.h:272
> (Diff revision 1)
> > + * @return The number of empty slots in the buffer, available for writing.
> > + */
> > + int AvailableWrite() const
> > + {
> > +#ifndef DEBUG
> > + AssertCorrectThread(producerId);
>
> Nit: presumably this is `mProducerId`?
I forgot to compile in debug mode before sending you the review, embarrassing...
> ::: mfbt/SPSCQueue.h:323
> (Diff revision 1)
> > + * @param writeIndex the write index to consider
> > + * @return true if the ring buffer is full, false otherwise.
> > + **/
> > + bool FullInternal(int aReadIndex, int aWriteIndex) const
> > + {
> > + return (aWriteIndex + 1) % StorageCapacity() == aReadIndex;
>
> Do we want to require (via release assertions in the constructor or perhaps
> a `size_t Capacity` template parameter) that `StorageCapacity()` will return
> a power-of-two so we can just do masking here rather than a full modulo?
I'd rather not for a number of reasons:
- We very often decide on the capacity of those queues at run time. For example, for the first use of this data structure, its capacity will depend on the type of audio output device Firefox is using (high latency vs. low latency, say, bluetooth audio vs. integrated sound chip). Picking a correct size is simply better for memory usage, because we're not rounding up.
- `AvailableWrite` is used for back-pressure on the producer side, so it's really important to choose the capacity based on the problem at hand: the producer can know if it's producing too much, and the consumer can know if it's consuming too much. This is only something that can be done if we have a queue of the right size, and pull and push of the right size as well.
- Finally, a minor concern: it's awkward to require a POT - 1 size from the user of the queue.
In all usages I've had for this queue, I've never seen this in any profile, but if we want, we could have a separate class that would have a static POT capacity and wrap around using masking. We could share most of the code.
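For context, the trade-off can be sketched as follows (helper names are hypothetical). Masking only agrees with modulo when the storage size is a power of two, and since the storage holds one more slot than the usable capacity, a power-of-two storage means a usable capacity of "POT - 1".

```cpp
#include <cstdint>

// Hypothetical helpers comparing the two ways of wrapping an index.
constexpr bool IsPowerOfTwo(uint32_t aValue) {
  return aValue != 0 && (aValue & (aValue - 1)) == 0;
}

// Works for any storage size, at the cost of an integer division.
constexpr uint32_t WrapModulo(uint32_t aIndex, uint32_t aStorageCapacity) {
  return aIndex % aStorageCapacity;
}

// Only valid when aStorageCapacity is a power of two.
constexpr uint32_t WrapMask(uint32_t aIndex, uint32_t aStorageCapacity) {
  return aIndex & (aStorageCapacity - 1);
}
```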
> ::: mfbt/SPSCQueue.h:342
> (Diff revision 1)
> > + * This can be called from the consummer or producer thread, but see the
> > + * comment in `AvailableRead`.
> > + *
> > + * @return the number of available elements for reading.
> > + */
> > + int AvailableReadInternal(int aReadIndex, int aWriteIndex) const
>
> Same comment here on the `Internal` suffix.
There are public `AvailableRead` and `AvailableWrite` methods, though, so I've kept the `Internal` suffix here.
Comment 5•7 years ago
mozreview-review
Comment on attachment 8968191 [details]
Bug 1454385 - Add a single producer single consumer lock and wait free queue to mfbt/.
https://reviewboard.mozilla.org/r/236880/#review247902
Lots of little comments, but nothing substantial. Sorry for not getting to this before you left. =/
::: commit-message-ee1d1:1
(Diff revision 2)
> +Bug 1454385 - Add a single producer single consumer lock and wait free queue to mfbt/. r?froydjn
Nit: the reviewer is "froydnj", not "froydjn". ;)
::: mfbt/SPSCQueue.h:27
(Diff revision 2)
> + * This allows zeroing (using memset) or default-constructing a number of
> + * elements calling the constructors and destructors if necessary.
> + */
> + static void ConstructDefault(T* aDestination, size_t aCount);
> + /**
> + * This allows either moving (if T supports it) or copying a number of
> + * elements from a `aSource` pointer to a `aDestination` pointer.
> + * If it is safe to do so and this call copies, this uses PodCopy. Otherwise,
> + * constructors and destructors are called in a loop.
I don't see any calling of destructors in the non-`IsPod` case. Perhaps the documentation should be revised to remove mention of destructors?
::: mfbt/SPSCQueue.h:64
(Diff revision 2)
> + for (size_t i = 0; i < aCount; i++) {
> + aDestination[i] = std::move(aSource[i]);
> + }
The C++ purist would ask for this to be `std::move(aDestination, aDestination + aCount, aSource)`.
::: mfbt/SPSCQueue.h:96
(Diff revision 2)
> + * thread can only touch a portion of the buffer that is not touched by the
> + * other thread.
> + * - Callers are expected to provide buffers. When writing to the queue,
> + * elements are copied into the internal storage from the buffer passed in.
> + * When reading from the queue, the user is expected to provide a buffer.
> + * Because this is a ring buffer, data might not be contiguous in memory,
Nit: the ending comma here should be a semicolon, or start a new sentence with the line below.
::: mfbt/SPSCQueue.h:112
(Diff revision 2)
> + * This performs an allocation on the heap, but is the only allocation that
> + * will happen for the life time of a `SPSCRingBufferBase`.
> + *
> + * @param Capacity The maximum number of element this ring buffer will hold.
> + */
> + SPSCRingBufferBase(int aCapacity)
Nit: this is going to want `explicit` for the static analysis checker.
You should also check your test code with the static analysis checker; I think there are some similar problems there.
::: mfbt/SPSCQueue.h:118
(Diff revision 2)
> + MOZ_ASSERT(StorageCapacity() < std::numeric_limits<int>::max() / 2 &&
> + "buffer too large for the type of index used.");
a) You don't need the `&&` trick, you can just:
`MOZ_ASSERT(condition, message);`
b) I guess we don't care about `aCapacity + 1` in the init-list overflowing?
::: mfbt/SPSCQueue.h:124
(Diff revision 2)
> + "buffer too large for the type of index used.");
> + MOZ_ASSERT(mCapacity > 0);
> +
> + mData = std::make_unique<T[]>(StorageCapacity());
> +
> + std::atomic_thread_fence(std::memory_order::memory_order_seq_cst);
To be clear, this fence is to "publish" the stored `mCapacity` value, correct?
::: mfbt/SPSCQueue.h:285
(Diff revision 2)
> + *
> + * @return The maximum Capacity of this ring buffer.
> + */
> + int Capacity() const { return StorageCapacity() - 1; }
> + /**
> + * Reset the consumer and producer thread identifier, in case the thread are
Nit: "the threads are..."
::: mfbt/SPSCQueue.h:407
(Diff revision 2)
> + /** Maximum number of elements that can be stored in the ring buffer. */
> + const int mCapacity;
I think this variable wants to be `std::atomic` or `mozilla::Atomic<int, Relaxed>` for TSan to be happy with it.
::: mfbt/SPSCQueue.h:410
(Diff revision 2)
> + * least one element ahead of `mReadIndex`. */
> + std::atomic<int> mWriteIndex;
> + /** Maximum number of elements that can be stored in the ring buffer. */
> + const int mCapacity;
> + /** Data storage, of size `mCapacity + 1` */
> + std::unique_ptr<T[]> mData;
Come to think of it, TSan is probably going to complain about this variable being touched on multiple threads. =/
::: mfbt/tests/TestSPSCQueue.cpp:207
(Diff revision 2)
> + };
> +
> + std::vector<Thing> vec_in;
> + std::vector<Thing> vec_out;
> +
> + for (uint32_t i = 0; i < 16; i++) {
Nit: Maybe make this bare `16`, that gets used all over the place below, a constant, and use the constant instead?
::: mfbt/tests/TestSPSCQueue.cpp:217
(Diff revision 2)
> + // Check that we've moved the std::string into the queue.
> + for (uint32_t i = 0; i < 16; i++) {
> + MOZ_RELEASE_ASSERT(vec_in[i].mStr.empty());
> + }
Technically, I don't think you can depend on this? Move construction just ensures that the moved-from object is in a destructible state ([string.cons] says "a valid but unspecified value"). It would be perfectly reasonable for `vec_in[i]` to still contain the string representation in the case of ref-counted string buffers, or short inline strings.
Maybe if you made the `Thing` move constructor explicitly truncate the moved-from string, this would be OK?
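A sketch of that idea, assuming `Thing` is a simple struct wrapping a `std::string` member named `mStr` (the actual test type is not shown in full here, so this definition is hypothetical). Clearing the source explicitly makes the emptiness check well-defined regardless of how the standard library implements string moves.

```cpp
#include <string>
#include <utility>

// Hypothetical definition of the test type.
struct Thing {
  Thing() = default;
  explicit Thing(std::string aStr) : mStr(std::move(aStr)) {}
  Thing(const Thing&) = default;
  Thing& operator=(const Thing&) = default;

  Thing(Thing&& aOther) : mStr(std::move(aOther.mStr)) {
    // Do not rely on the unspecified moved-from state: truncate explicitly.
    aOther.mStr.clear();
  }

  Thing& operator=(Thing&& aOther) {
    if (this != &aOther) {
      mStr = std::move(aOther.mStr);
      aOther.mStr.clear();
    }
    return *this;
  }

  std::string mStr;
};
```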
Attachment #8968191 - Flags: review?(nfroyd) → review+
Assignee
Comment 6•6 years ago
(In reply to Nathan Froyd [:froydnj] from comment #5)
> ::: mfbt/SPSCQueue.h:64
> (Diff revision 2)
> > + for (size_t i = 0; i < aCount; i++) {
> > + aDestination[i] = std::move(aSource[i]);
> > + }
>
> The C++ purist would ask for this to be `std::move(aDestination,
> aDestination + aCount, aSource)`.
`std::move(aSource, aSource + aCount, aDestination)`, rather.
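For clarity, a small sketch of the range form with this argument order (the wrapper name `MoveElements` is hypothetical). The three-argument `std::move` from `<algorithm>` takes the source range first and the destination last, and the source pointer must be non-const for elements to actually be moved.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

// Hypothetical wrapper: moves aCount elements from aSource to aDestination.
template <typename T>
void MoveElements(T* aDestination, T* aSource, std::size_t aCount) {
  // std::move(first, last, d_first): source range first, destination last.
  std::move(aSource, aSource + aCount, aDestination);
}
```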
> ::: mfbt/SPSCQueue.h:124
> (Diff revision 2)
> > + "buffer too large for the type of index used.");
> > + MOZ_ASSERT(mCapacity > 0);
> > +
> > + mData = std::make_unique<T[]>(StorageCapacity());
> > +
> > + std::atomic_thread_fence(std::memory_order::memory_order_seq_cst);
>
> To be clear, this fence is to "publish" the stored `mCapacity` value,
> correct?
Yes, and the mData member as well.
> ::: mfbt/SPSCQueue.h:407
> (Diff revision 2)
> > + /** Maximum number of elements that can be stored in the ring buffer. */
> > + const int mCapacity;
>
> I think this variable wants to be `std::atomic` or `mozilla::Atomic<int,
> Relaxed>` for TSan to be happy with it.
This one is fine thanks to the fence in the ctor.
> ::: mfbt/SPSCQueue.h:410
> (Diff revision 2)
> > + * least one element ahead of `mReadIndex`. */
> > + std::atomic<int> mWriteIndex;
> > + /** Maximum number of elements that can be stored in the ring buffer. */
> > + const int mCapacity;
> > + /** Data storage, of size `mCapacity + 1` */
> > + std::unique_ptr<T[]> mData;
>
> Come to think of it, TSan is probably going to complain about this variable
> being touched on multiple threads. =/
This also is fine thanks to the fence.
Mentioning TSan was a great idea; I ended up doing this:
> for i in *.h
> do
> sed -if 's-mozilla/--' $i
> done
> clang++ -g -fsanitize=thread -Imfbt mfbt/tests/TestSPSCQueue.cpp -lpthread
This errored out because we were too lax with the memory ordering for the read index; using a proper `acquire` for the read and `release` for the write made the error go away.
I think it was still safe (as long as we're dealing with atomic variables), because of the way this queue works.
Assignee
Comment 9•6 years ago
I'm pushing this with the test disabled on Linux (with Nathan's approval) because of bug 1464084. The test is green on other platforms, and this is not really platform dependent.
Comment 11•6 years ago
Pushed by paul@paul.cx:
https://hg.mozilla.org/integration/autoland/rev/977451308538
Add a single producer single consumer lock and wait free queue to mfbt/. r=froydnj
Comment 12•6 years ago
bugherder
Status: NEW → RESOLVED
Closed: 6 years ago
status-firefox62: --- → fixed
Resolution: --- → FIXED
Target Milestone: --- → mozilla62