Let the RWQueue be stoppable

This commit is contained in:
kcgen 2023-05-23 09:48:01 -07:00 committed by kcgen
parent 0ce4ff7e29
commit 59a6933f26
5 changed files with 277 additions and 62 deletions

View file

@ -25,20 +25,18 @@
/* RW (Read/Write) Queue
* ---------------------
* A fixed-size thread-safe queue that blocks both the
* producer until space is available and the consumer until
* items are available.
* A fixed-size thread-safe queue that blocks both the producer until space is
* available, and the consumer until items are available.
*
* For some background, this class was authored to replace
* the one that resulted from this discussion:
* https://github.com/cameron314/readerwriterqueue/issues/112
* because the MoodyCamel implementation:
* - Is roughly 5-fold larger (and more latent)
* - Consumes more CPU by spinning (instead of locking)
* - Lacks bulk queue/dequeue methods (request was rejected
* https://github.com/cameron314/readerwriterqueue/issues/130)
* For optimal performance inside the rwqueue, blocking is accomplished by
* putting the thread into the waiting state, then waking it up via notify when
* the required conditions are met.
*
* Producer and consumer thread(s) are expected to simply call the enqueue and
* dequeue methods directly without any thread state management.
*/
#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>
@ -52,6 +50,7 @@ private:
std::condition_variable has_room = {};
std::condition_variable has_items = {};
size_t capacity = 0;
std::atomic<bool> is_running = true;
using difference_t = typename std::vector<T>::difference_type;
public:
@ -63,8 +62,20 @@ public:
void Resize(size_t queue_capacity);
bool IsEmpty();
// non-blocking call
bool IsRunning() const;
// non-blocking call
size_t Size();
// non-blocking calls
void Stop();
// non-blocking call
size_t MaxCapacity() const;
// non-blocking call
float GetPercentFull();
// Discourage copying into the queue. Instead, use std::move into the
@ -73,9 +84,19 @@ public:
//
// void Enqueue(const T& item);
// items will be empty (moved-out) after call
void Enqueue(T&& item);
// Items will be empty (moved-out) after call.
// The method potentially blocks until the queue has enough capacity to
// queue a single item.
 * If queuing has stopped prior to enqueuing, then this will immediately
// return false and the item will not be queued.
bool Enqueue(T&& item);
// The method potentially blocks until there is at least a single item
// in the queue to dequeue.
// If queuing has stopped, this will continue to return item(s) until
// none remain in the queue, at which point it immediately returns T{}.
T Dequeue();
// Bulk operations move multiple items from/to the given vector, which
@ -90,11 +111,28 @@ public:
// Items are std::move'd out of the source vector into the queue. This
// function clear()s the vector such that it's in a defined state on
// return (and can be reused).
void BulkEnqueue(std::vector<T>& from_source, const size_t num_requested);
// return (and can be reused). The method potentially blocks until there
// is enough capacity in the queue for the new items.
// The target vector will be resized to accommodate, if needed.
void BulkDequeue(std::vector<T>& into_target, const size_t num_requested);
// If queuing has stopped prior to bulk enqueuing, then this will
// immediately return false and no items will be queued.
// If queuing stops in the middle of enqueuing, prior to completion, then
// this will immediately return false. The items queued /before/
// stopping will be available in the queue; however, the items that came
// after stopping will not be queued.
bool BulkEnqueue(std::vector<T>& from_source, const size_t num_requested);
// The target vector will be resized to accommodate, if needed. The
// method potentially blocks until the requested number of items have
// been dequeued.
// If queuing has stopped:
// - Returns true when one or more item(s) have been dequeued.
// - Returns false when no items can be dequeued.
//
// The vector is always sized to match the number of items returned.
bool BulkDequeue(std::vector<T>& into_target, const size_t num_requested);
};
#endif

View file

@ -544,9 +544,8 @@ void MidiHandlerFluidsynth::Close()
// Stop rendering and drain the fifo
keep_rendering = false;
while (audio_frame_fifo.Size()) {
(void)audio_frame_fifo.Dequeue();
}
work_fifo.Stop();
audio_frame_fifo.Stop();
// Wait for the rendering thread to finish
if (renderer.joinable())
@ -682,15 +681,17 @@ void MidiHandlerFluidsynth::RenderAudioFramesToFifo(const uint16_t num_audio_fra
void MidiHandlerFluidsynth::ProcessWorkFromFifo()
{
const auto work = work_fifo.Dequeue();
if (!work_fifo.IsRunning()) {
return;
}
/* // Comment-in to log inter-cycle rendering
if ( work.num_pending_audio_frames > 0) {
LOG_MSG("FSYNTH: %2u audio frames prior to %s message, followed
by "
LOG_MSG("FSYNTH: %2u audio frames prior to %s message, followed by "
"%2lu more messages. Have %4lu audio frames queued",
work.num_pending_audio_frames,
work.message_type == MessageType::Channel ? "channel" :
"sysex", work_fifo.Size(), audio_frame_fifo.Size());
"sysex", work_fifo.Size(), audio_frame_fifo.Size());
}*/
if (work.num_pending_audio_frames > 0) {
@ -708,7 +709,7 @@ void MidiHandlerFluidsynth::ProcessWorkFromFifo()
// Keep the fifo populated with freshly rendered buffers
void MidiHandlerFluidsynth::Render()
{
while (keep_rendering.load()) {
while (work_fifo.IsRunning()) {
work_fifo.IsEmpty() ? RenderAudioFramesToFifo()
: ProcessWorkFromFifo();
}

View file

@ -694,9 +694,8 @@ void MidiHandler_mt32::Close()
// Stop rendering and drain the fifo
keep_rendering = false;
while (audio_frame_fifo.Size()) {
(void)audio_frame_fifo.Dequeue();
}
work_fifo.Stop();
audio_frame_fifo.Stop();
// Wait for the rendering thread to finish
if (renderer.joinable())
@ -807,15 +806,17 @@ void MidiHandler_mt32::RenderAudioFramesToFifo(const uint16_t num_frames)
void MidiHandler_mt32::ProcessWorkFromFifo()
{
const auto work = work_fifo.Dequeue();
if (!work_fifo.IsRunning()) {
return;
}
/* // Comment-in to log inter-cycle rendering
if (work.num_pending_audio_frames > 0) {
LOG_MSG("MT32: %2u audio frames prior to %s message, followed by
"
LOG_MSG("MT32: %2u audio frames prior to %s message, followed by "
"%2lu more messages. Have %4lu audio frames queued",
work.num_pending_audio_frames,
work.message_type == MessageType::Channel ? "channel" :
"sysex", work_fifo.Size(), audio_frame_fifo.Size());
"sysex", work_fifo.Size(), audio_frame_fifo.Size());
}*/
if (work.num_pending_audio_frames > 0) {
@ -839,7 +840,7 @@ void MidiHandler_mt32::ProcessWorkFromFifo()
// Keep the fifo populated with freshly rendered buffers
void MidiHandler_mt32::Render()
{
while (keep_rendering.load()) {
while (work_fifo.IsRunning()) {
work_fifo.IsEmpty() ? RenderAudioFramesToFifo()
: ProcessWorkFromFifo();
}

View file

@ -43,6 +43,19 @@ size_t RWQueue<T>::Size()
return queue.size();
}
template <typename T>
void RWQueue<T>::Stop()
{
	// Already stopped: nothing to do (Stop is idempotent).
	if (!is_running) {
		return;
	}
	// Flip the flag while holding the queue's mutex. Even though
	// is_running is atomic, setting it outside the lock allows a
	// lost wakeup: a waiter can evaluate its wait-predicate (still
	// seeing is_running == true) and block just after the flag flips
	// and the notification fires, leaving it asleep forever.
	{
		std::lock_guard<std::mutex> lock(mutex);
		is_running = false;
	}
	// Wake *all* waiters on both conditions: any number of producer
	// and consumer threads may be blocked, and every one of them
	// needs to re-check the stopped state and bail out. notify_one
	// would release only a single thread and strand the rest.
	has_items.notify_all();
	has_room.notify_all();
}
template <typename T>
size_t RWQueue<T>::MaxCapacity() const
{
@ -65,16 +78,29 @@ bool RWQueue<T>::IsEmpty()
}
template <typename T>
void RWQueue<T>::Enqueue(T&& item)
bool RWQueue<T>::IsRunning() const
{
// wait until the queue has room to accept the item
return is_running;
}
template <typename T>
bool RWQueue<T>::Enqueue(T&& item)
{
// wait until we're stopped or the queue has room to accept the item
std::unique_lock<std::mutex> lock(mutex);
has_room.wait(lock, [this] { return queue.size() < capacity; });
has_room.wait(lock,
[this] { return !is_running || queue.size() < capacity; });
// add it, and notify the next waiting thread that we've got an item
queue.emplace(queue.end(), std::move(item));
if (is_running) {
queue.emplace(queue.end(), std::move(item));
}
// If we stopped while enqueing, then anything that was enqueued prior
// to being stopped is safely in the queue.
lock.unlock();
has_items.notify_one();
return is_running;
}
// In both bulk methods, the best case scenario is if the queue can absorb or
@ -86,7 +112,7 @@ void RWQueue<T>::Enqueue(T&& item)
// room for just one item to avoid spinning with a zero count (which burns CPU).
template <typename T>
void RWQueue<T>::BulkEnqueue(std::vector<T>& from_source, const size_t num_requested)
bool RWQueue<T>::BulkEnqueue(std::vector<T>& from_source, const size_t num_requested)
{
constexpr size_t min_items = 1;
assert(num_requested >= min_items);
@ -105,42 +131,58 @@ void RWQueue<T>::BulkEnqueue(std::vector<T>& from_source, const size_t num_reque
std::min(num_remaining,
free_capacity));
// wait until the queue has enough room for the items
has_room.wait(lock, [&] { return capacity - queue.size() >= num_items; });
// wait until we're stopped or the queue has enough room for the
// items
has_room.wait(lock, [&] {
return !is_running || capacity - queue.size() >= num_items;
});
const auto source_end = source_start +
static_cast<difference_t>(num_items);
queue.insert(queue.end(),
std::move_iterator(source_start),
std::move_iterator(source_end));
if (is_running) {
const auto source_end = source_start +
static_cast<difference_t>(num_items);
queue.insert(queue.end(),
std::move_iterator(source_start),
std::move_iterator(source_end));
source_start = source_end;
num_remaining -= num_items;
} else {
// If we stopped while bulk enqueing, then stop here.
// Anything that was enqueued prior to being stopped is
// safely in the queue.
num_remaining = 0;
}
// notify the next waiting thread that we have an item
lock.unlock();
has_items.notify_one();
source_start = source_end;
num_remaining -= num_items;
}
from_source.clear();
return is_running;
}
template <typename T>
T RWQueue<T>::Dequeue()
{
// wait until the queue has an item
// wait until we're stopped or the queue has an item
std::unique_lock<std::mutex> lock(mutex);
has_items.wait(lock, [this] { return !queue.empty(); });
has_items.wait(lock, [this] { return !is_running || !queue.empty(); });
// get it, and notify the first waiting thread that the queue has room
T item = std::move(queue.front());
queue.pop_front();
T item = {};
// Even if the queue has stopped, we need to drain the (previously)
// queued items before we're done.
if (is_running || !queue.empty()) {
item = std::move(queue.front());
queue.pop_front();
}
lock.unlock();
// notify the first waiting thread that the queue has room
has_room.notify_one();
return item;
}
template <typename T>
void RWQueue<T>::BulkDequeue(std::vector<T>& into_target, const size_t num_requested)
bool RWQueue<T>::BulkDequeue(std::vector<T>& into_target, const size_t num_requested)
{
constexpr size_t min_items = 1;
assert(num_requested >= min_items);
@ -159,22 +201,36 @@ void RWQueue<T>::BulkDequeue(std::vector<T>& into_target, const size_t num_reque
std::min(num_remaining,
queue.size()));
// wait until the queue has enough items
has_items.wait(lock, [&] { return queue.size() >= num_items; });
// wait until we're stopped or the queue has enough items
has_items.wait(lock, [&] {
return !is_running || queue.size() >= num_items;
});
const auto source_start = queue.begin();
const auto source_end = source_start +
static_cast<difference_t>(num_items);
std::move(source_start, source_end, target_start);
queue.erase(source_start, source_end);
// Even if the queue has stopped, we need to drain the
// (previously) queued items before we're done.
if (is_running || !queue.empty()) {
const auto queue_end = queue.begin() +
static_cast<difference_t>(num_items);
std::move(queue.begin(), queue_end, target_start);
queue.erase(queue.begin(), queue_end);
target_start += static_cast<difference_t>(num_items);
num_remaining -= num_items;
} else {
// If we stopped while dequeing, cap off the target
// vector based on the subset that were dequeued.
assert(num_remaining <= num_requested);
into_target.resize(num_requested - num_remaining);
num_remaining = 0;
}
// notify the first waiting thread that the queue now has room
lock.unlock();
has_room.notify_one();
target_start += static_cast<difference_t>(num_items);
num_remaining -= num_items;
}
return !into_target.empty();
}
// Explicit template instantiations

View file

@ -439,4 +439,123 @@ TEST(RWQueue,ContainerMoveAsync)
EXPECT_EQ(q.Size(), 0);
}
// A queue stopped before any use must reject enqueues and hand back the
// default-constructed value on dequeue, all without blocking.
TEST(RWQueue, StopImmediately)
{
RWQueue<int> q(65);
q.Stop();
EXPECT_FALSE(q.IsRunning());
EXPECT_FALSE(q.Enqueue(1)); // shouldn't block
EXPECT_TRUE(q.IsEmpty());
const auto value = q.Dequeue(); // shouldn't block
EXPECT_EQ(value, 0); // once stopped, dequeuing returns the default value
}
// Stopping with an item still queued: dequeuing drains the remaining
// item first, then hands back the default-constructed value.
TEST(RWQueue, StopMidway)
{
	RWQueue<int> q(2);

	q.Enqueue(1);
	EXPECT_FALSE(q.IsEmpty());
	EXPECT_EQ(q.Size(), 1);
	EXPECT_TRUE(q.IsRunning());

	q.Stop();
	EXPECT_FALSE(q.IsRunning());

	// Enqueuing fails once the queue has been stopped
	EXPECT_FALSE(q.Enqueue(2));

	// The item queued before the stop is still drainable
	EXPECT_EQ(q.Dequeue(), 1);

	// Once stopped and drained, dequeuing yields the default value
	EXPECT_EQ(q.Dequeue(), 0);
}
// Bulk operations on a queue that was stopped before any items were
// added: both bulk enqueue and bulk dequeue fail immediately and
// without blocking.
TEST(RWQueue, StopBulkImmediately)
{
	RWQueue<int> q(3);
	q.Stop();
	EXPECT_FALSE(q.IsRunning());

	std::vector<int> items = {1, 2, 3};
	// Capture the count up front: BulkEnqueue clears the vector
	const auto num_items = items.size();

	// Bulk enqueuing fails after being stopped
	EXPECT_FALSE(q.BulkEnqueue(items, num_items));
	EXPECT_TRUE(q.IsEmpty());

	// Bulk dequeuing fails after being stopped with nothing to dequeue
	EXPECT_FALSE(q.BulkDequeue(items, num_items));
	EXPECT_TRUE(items.empty());
}
// Stopping part-way through bulk use: enqueuing is rejected after the
// stop, while dequeuing (single and bulk) keeps draining the items that
// made it in before the stop, and only then fails / returns defaults.
TEST(RWQueue, StopBulkMidway)
{
	RWQueue<int> q(8);

	// Bulk enqueue a handful of items before stopping
	std::vector<int> items = {1, 2, 3, 4, 5};
	auto bulk_enqueue_result = q.BulkEnqueue(items, items.size());
	EXPECT_TRUE(bulk_enqueue_result);
	EXPECT_TRUE(q.IsRunning());
	EXPECT_EQ(q.Size(), 5);

	q.Stop();

	// Bulk enqueuing fails after being stopped
	items = {6, 7};
	bulk_enqueue_result = q.BulkEnqueue(items, items.size());
	EXPECT_FALSE(bulk_enqueue_result);
	EXPECT_FALSE(q.IsRunning());
	EXPECT_EQ(q.Size(), 5);

	// But the five items queued before the stop remain drainable:
	// bulk dequeue the first couple
	auto num_items = 2u;
	auto bulk_dequeue_result = q.BulkDequeue(items, num_items);
	EXPECT_TRUE(bulk_dequeue_result);
	EXPECT_EQ(q.Size(), 3);
	std::vector<int> expected_items = {1, 2};
	EXPECT_EQ(items, expected_items);

	// Dequeue the middle value
	auto value = q.Dequeue();
	EXPECT_EQ(q.Size(), 2);
	EXPECT_EQ(value, 3);

	// Bulk dequeue the last couple, deliberately over-requesting
	num_items = 3u;
	bulk_dequeue_result = q.BulkDequeue(items, num_items);
	EXPECT_TRUE(bulk_dequeue_result);
	EXPECT_TRUE(q.IsEmpty());
	EXPECT_EQ(q.Size(), 0);
	expected_items = {4, 5};
	EXPECT_EQ(items, expected_items);

	// Out of items: bulk dequeuing now fails and empties the target
	num_items = 10u;
	bulk_dequeue_result = q.BulkDequeue(items, num_items);
	EXPECT_FALSE(bulk_dequeue_result);
	EXPECT_TRUE(items.empty());

	// Out of items: single dequeuing returns the default value.
	// (Was `EXPECT_EQ(value, {})`, which doesn't compile: a braced
	// init-list can't be deduced as a macro/function argument.)
	value = q.Dequeue();
	EXPECT_EQ(value, 0);
	EXPECT_TRUE(items.empty());
}
} // namespace