Added a test to measure the impact of a separate thread periodically locking the queue entirely.
This commit is contained in:
parent
bbc1ba223a
commit
b609519a5c
1 changed files with 99 additions and 26 deletions
|
@ -234,6 +234,11 @@ void RunEpicTest()
|
||||||
/**************************************************************************/
|
/**************************************************************************/
|
||||||
/* Lock-free FIFO test */
|
/* Lock-free FIFO test */
|
||||||
|
|
||||||
|
/* This is useful to test the impact of another thread locking the queue
|
||||||
|
entirely for heavy-weight manipulation.
|
||||||
|
*/
|
||||||
|
#define TEST_SPINLOCK_FIFO
|
||||||
|
|
||||||
#define NUM_READERS 4
|
#define NUM_READERS 4
|
||||||
#define NUM_WRITERS 4
|
#define NUM_WRITERS 4
|
||||||
#define EVENTS_PER_WRITER 1000000
|
#define EVENTS_PER_WRITER 1000000
|
||||||
|
@ -265,6 +270,14 @@ typedef struct
|
||||||
|
|
||||||
char cache_pad3[CACHELINE-sizeof(SDL_atomic_t)];
|
char cache_pad3[CACHELINE-sizeof(SDL_atomic_t)];
|
||||||
|
|
||||||
|
#ifdef TEST_SPINLOCK_FIFO
|
||||||
|
SDL_SpinLock lock;
|
||||||
|
SDL_atomic_t rwcount;
|
||||||
|
SDL_atomic_t watcher;
|
||||||
|
|
||||||
|
char cache_pad4[CACHELINE-sizeof(SDL_SpinLock)-2*sizeof(SDL_atomic_t)];
|
||||||
|
#endif
|
||||||
|
|
||||||
SDL_bool active;
|
SDL_bool active;
|
||||||
|
|
||||||
/* Only needed for the mutex test */
|
/* Only needed for the mutex test */
|
||||||
|
@ -281,6 +294,10 @@ static void InitEventQueue(SDL_EventQueue *queue)
|
||||||
}
|
}
|
||||||
SDL_AtomicSet(&queue->enqueue_pos, 0);
|
SDL_AtomicSet(&queue->enqueue_pos, 0);
|
||||||
SDL_AtomicSet(&queue->dequeue_pos, 0);
|
SDL_AtomicSet(&queue->dequeue_pos, 0);
|
||||||
|
#ifdef TEST_SPINLOCK_FIFO
|
||||||
|
queue->lock = 0;
|
||||||
|
SDL_AtomicSet(&queue->rwcount, 0);
|
||||||
|
#endif
|
||||||
queue->active = SDL_TRUE;
|
queue->active = SDL_TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -290,6 +307,15 @@ static SDL_bool EnqueueEvent_LockFree(SDL_EventQueue *queue, const SDL_Event *ev
|
||||||
unsigned queue_pos;
|
unsigned queue_pos;
|
||||||
unsigned entry_seq;
|
unsigned entry_seq;
|
||||||
int delta;
|
int delta;
|
||||||
|
SDL_bool status;
|
||||||
|
|
||||||
|
#ifdef TEST_SPINLOCK_FIFO
|
||||||
|
/* This is a gate so an external thread can lock the queue */
|
||||||
|
SDL_AtomicLock(&queue->lock);
|
||||||
|
SDL_assert(SDL_AtomicGet(&queue->watcher) == 0);
|
||||||
|
SDL_AtomicIncRef(&queue->rwcount);
|
||||||
|
SDL_AtomicUnlock(&queue->lock);
|
||||||
|
#endif
|
||||||
|
|
||||||
queue_pos = (unsigned)SDL_AtomicGet(&queue->enqueue_pos);
|
queue_pos = (unsigned)SDL_AtomicGet(&queue->enqueue_pos);
|
||||||
for ( ; ; ) {
|
for ( ; ; ) {
|
||||||
|
@ -300,22 +326,26 @@ static SDL_bool EnqueueEvent_LockFree(SDL_EventQueue *queue, const SDL_Event *ev
|
||||||
if (delta == 0) {
|
if (delta == 0) {
|
||||||
/* The entry and the queue position match, try to increment the queue position */
|
/* The entry and the queue position match, try to increment the queue position */
|
||||||
if (SDL_AtomicCAS(&queue->enqueue_pos, (int)queue_pos, (int)(queue_pos+1))) {
|
if (SDL_AtomicCAS(&queue->enqueue_pos, (int)queue_pos, (int)(queue_pos+1))) {
|
||||||
|
/* We own the object, fill it! */
|
||||||
|
entry->event = *event;
|
||||||
|
SDL_AtomicSet(&entry->sequence, (int)(queue_pos + 1));
|
||||||
|
status = SDL_TRUE;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else if (delta < 0) {
|
} else if (delta < 0) {
|
||||||
/* We ran into an old queue entry, which means it still needs to be dequeued */
|
/* We ran into an old queue entry, which means it still needs to be dequeued */
|
||||||
return SDL_FALSE;
|
status = SDL_FALSE;
|
||||||
|
break;
|
||||||
} else {
|
} else {
|
||||||
/* We ran into a new queue entry, get the new queue position */
|
/* We ran into a new queue entry, get the new queue position */
|
||||||
queue_pos = (unsigned)SDL_AtomicGet(&queue->enqueue_pos);
|
queue_pos = (unsigned)SDL_AtomicGet(&queue->enqueue_pos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We own the object, fill it! */
|
#ifdef TEST_SPINLOCK_FIFO
|
||||||
entry->event = *event;
|
SDL_AtomicDecRef(&queue->rwcount);
|
||||||
SDL_AtomicSet(&entry->sequence, (int)(queue_pos + 1));
|
#endif
|
||||||
|
return status;
|
||||||
return SDL_TRUE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static SDL_bool DequeueEvent_LockFree(SDL_EventQueue *queue, SDL_Event *event)
|
static SDL_bool DequeueEvent_LockFree(SDL_EventQueue *queue, SDL_Event *event)
|
||||||
|
@ -324,6 +354,15 @@ static SDL_bool DequeueEvent_LockFree(SDL_EventQueue *queue, SDL_Event *event)
|
||||||
unsigned queue_pos;
|
unsigned queue_pos;
|
||||||
unsigned entry_seq;
|
unsigned entry_seq;
|
||||||
int delta;
|
int delta;
|
||||||
|
SDL_bool status;
|
||||||
|
|
||||||
|
#ifdef TEST_SPINLOCK_FIFO
|
||||||
|
/* This is a gate so an external thread can lock the queue */
|
||||||
|
SDL_AtomicLock(&queue->lock);
|
||||||
|
SDL_assert(SDL_AtomicGet(&queue->watcher) == 0);
|
||||||
|
SDL_AtomicIncRef(&queue->rwcount);
|
||||||
|
SDL_AtomicUnlock(&queue->lock);
|
||||||
|
#endif
|
||||||
|
|
||||||
queue_pos = (unsigned)SDL_AtomicGet(&queue->dequeue_pos);
|
queue_pos = (unsigned)SDL_AtomicGet(&queue->dequeue_pos);
|
||||||
for ( ; ; ) {
|
for ( ; ; ) {
|
||||||
|
@ -334,22 +373,26 @@ static SDL_bool DequeueEvent_LockFree(SDL_EventQueue *queue, SDL_Event *event)
|
||||||
if (delta == 0) {
|
if (delta == 0) {
|
||||||
/* The entry and the queue position match, try to increment the queue position */
|
/* The entry and the queue position match, try to increment the queue position */
|
||||||
if (SDL_AtomicCAS(&queue->dequeue_pos, (int)queue_pos, (int)(queue_pos+1))) {
|
if (SDL_AtomicCAS(&queue->dequeue_pos, (int)queue_pos, (int)(queue_pos+1))) {
|
||||||
|
/* We own the object, fill it! */
|
||||||
|
*event = entry->event;
|
||||||
|
SDL_AtomicSet(&entry->sequence, (int)(queue_pos+MAX_ENTRIES));
|
||||||
|
status = SDL_TRUE;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else if (delta < 0) {
|
} else if (delta < 0) {
|
||||||
/* We ran into an old queue entry, which means we've hit empty */
|
/* We ran into an old queue entry, which means we've hit empty */
|
||||||
return SDL_FALSE;
|
status = SDL_FALSE;
|
||||||
|
break;
|
||||||
} else {
|
} else {
|
||||||
/* We ran into a new queue entry, get the new queue position */
|
/* We ran into a new queue entry, get the new queue position */
|
||||||
queue_pos = (unsigned)SDL_AtomicGet(&queue->dequeue_pos);
|
queue_pos = (unsigned)SDL_AtomicGet(&queue->dequeue_pos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We own the object, fill it! */
|
#ifdef TEST_SPINLOCK_FIFO
|
||||||
*event = entry->event;
|
SDL_AtomicDecRef(&queue->rwcount);
|
||||||
SDL_AtomicSet(&entry->sequence, (int)(queue_pos+MAX_ENTRIES));
|
#endif
|
||||||
|
return status;
|
||||||
return SDL_TRUE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static SDL_bool EnqueueEvent_Mutex(SDL_EventQueue *queue, const SDL_Event *event)
|
static SDL_bool EnqueueEvent_Mutex(SDL_EventQueue *queue, const SDL_Event *event)
|
||||||
|
@ -358,6 +401,7 @@ static SDL_bool EnqueueEvent_Mutex(SDL_EventQueue *queue, const SDL_Event *event
|
||||||
unsigned queue_pos;
|
unsigned queue_pos;
|
||||||
unsigned entry_seq;
|
unsigned entry_seq;
|
||||||
int delta;
|
int delta;
|
||||||
|
SDL_bool status = SDL_FALSE;
|
||||||
|
|
||||||
SDL_mutexP(queue->mutex);
|
SDL_mutexP(queue->mutex);
|
||||||
|
|
||||||
|
@ -368,21 +412,20 @@ static SDL_bool EnqueueEvent_Mutex(SDL_EventQueue *queue, const SDL_Event *event
|
||||||
delta = (int)(entry_seq - queue_pos);
|
delta = (int)(entry_seq - queue_pos);
|
||||||
if (delta == 0) {
|
if (delta == 0) {
|
||||||
++queue->enqueue_pos.value;
|
++queue->enqueue_pos.value;
|
||||||
} else if (delta < 0) {
|
|
||||||
/* We ran into an old queue entry, which means it still needs to be dequeued */
|
|
||||||
SDL_mutexV(queue->mutex);
|
|
||||||
return SDL_FALSE;
|
|
||||||
} else {
|
|
||||||
printf("ERROR: mutex failed!\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
/* We own the object, fill it! */
|
/* We own the object, fill it! */
|
||||||
entry->event = *event;
|
entry->event = *event;
|
||||||
entry->sequence.value = (int)(queue_pos + 1);
|
entry->sequence.value = (int)(queue_pos + 1);
|
||||||
|
status = SDL_TRUE;
|
||||||
|
} else if (delta < 0) {
|
||||||
|
/* We ran into an old queue entry, which means it still needs to be dequeued */
|
||||||
|
} else {
|
||||||
|
printf("ERROR: mutex failed!\n");
|
||||||
|
}
|
||||||
|
|
||||||
SDL_mutexV(queue->mutex);
|
SDL_mutexV(queue->mutex);
|
||||||
|
|
||||||
return SDL_TRUE;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SDL_bool DequeueEvent_Mutex(SDL_EventQueue *queue, SDL_Event *event)
|
static SDL_bool DequeueEvent_Mutex(SDL_EventQueue *queue, SDL_Event *event)
|
||||||
|
@ -391,6 +434,7 @@ static SDL_bool DequeueEvent_Mutex(SDL_EventQueue *queue, SDL_Event *event)
|
||||||
unsigned queue_pos;
|
unsigned queue_pos;
|
||||||
unsigned entry_seq;
|
unsigned entry_seq;
|
||||||
int delta;
|
int delta;
|
||||||
|
SDL_bool status = SDL_FALSE;
|
||||||
|
|
||||||
SDL_mutexP(queue->mutex);
|
SDL_mutexP(queue->mutex);
|
||||||
|
|
||||||
|
@ -401,21 +445,20 @@ static SDL_bool DequeueEvent_Mutex(SDL_EventQueue *queue, SDL_Event *event)
|
||||||
delta = (int)(entry_seq - (queue_pos + 1));
|
delta = (int)(entry_seq - (queue_pos + 1));
|
||||||
if (delta == 0) {
|
if (delta == 0) {
|
||||||
++queue->dequeue_pos.value;
|
++queue->dequeue_pos.value;
|
||||||
} else if (delta < 0) {
|
|
||||||
/* We ran into an old queue entry, which means we've hit empty */
|
|
||||||
SDL_mutexV(queue->mutex);
|
|
||||||
return SDL_FALSE;
|
|
||||||
} else {
|
|
||||||
printf("ERROR: mutex failed!\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
/* We own the object, fill it! */
|
/* We own the object, fill it! */
|
||||||
*event = entry->event;
|
*event = entry->event;
|
||||||
entry->sequence.value = (int)(queue_pos + MAX_ENTRIES);
|
entry->sequence.value = (int)(queue_pos + MAX_ENTRIES);
|
||||||
|
status = SDL_TRUE;
|
||||||
|
} else if (delta < 0) {
|
||||||
|
/* We ran into an old queue entry, which means we've hit empty */
|
||||||
|
} else {
|
||||||
|
printf("ERROR: mutex failed!\n");
|
||||||
|
}
|
||||||
|
|
||||||
SDL_mutexV(queue->mutex);
|
SDL_mutexV(queue->mutex);
|
||||||
|
|
||||||
return SDL_TRUE;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SDL_sem *writersDone;
|
static SDL_sem *writersDone;
|
||||||
|
@ -517,6 +560,29 @@ static int FIFO_Reader(void* _data)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef TEST_SPINLOCK_FIFO
|
||||||
|
/* This thread periodically locks the queue for no particular reason */
|
||||||
|
static int FIFO_Watcher(void* _data)
|
||||||
|
{
|
||||||
|
SDL_EventQueue *queue = (SDL_EventQueue *)_data;
|
||||||
|
|
||||||
|
while (queue->active) {
|
||||||
|
SDL_AtomicLock(&queue->lock);
|
||||||
|
SDL_AtomicIncRef(&queue->watcher);
|
||||||
|
while (SDL_AtomicGet(&queue->rwcount) > 0) {
|
||||||
|
SDL_Delay(0);
|
||||||
|
}
|
||||||
|
/* Do queue manipulation here... */
|
||||||
|
SDL_AtomicDecRef(&queue->watcher);
|
||||||
|
SDL_AtomicUnlock(&queue->lock);
|
||||||
|
|
||||||
|
/* Wait a bit... */
|
||||||
|
SDL_Delay(1);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif /* TEST_SPINLOCK_FIFO */
|
||||||
|
|
||||||
static void RunFIFOTest(SDL_bool lock_free)
|
static void RunFIFOTest(SDL_bool lock_free)
|
||||||
{
|
{
|
||||||
SDL_EventQueue queue;
|
SDL_EventQueue queue;
|
||||||
|
@ -541,6 +607,13 @@ static void RunFIFOTest(SDL_bool lock_free)
|
||||||
|
|
||||||
start = SDL_GetTicks();
|
start = SDL_GetTicks();
|
||||||
|
|
||||||
|
#ifdef TEST_SPINLOCK_FIFO
|
||||||
|
/* Start a monitoring thread */
|
||||||
|
if (lock_free) {
|
||||||
|
SDL_CreateThread(FIFO_Watcher, &queue);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Start the readers first */
|
/* Start the readers first */
|
||||||
printf("Starting %d readers\n", NUM_READERS);
|
printf("Starting %d readers\n", NUM_READERS);
|
||||||
SDL_zero(readerData);
|
SDL_zero(readerData);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue