The last modification in my series about ticket locks is about aborting locks. In an ideal world every function waiting on a mutex will eventually obtain it. However, there are times we would like a blocking function to stop blocking but not get the mutex. The most common example is during application shutdown when we need all threads to stop blocking and rejoin the main thread.
Our ticket lock implementation uses condition variables to signal threads waiting for the mutex that the mutex has been released. During an abort we simply need to signal the waiting threads, but with a flag set that denotes the mutex is aborting. This information needs to be relayed to the parent function requesting the lock, and that function needs to check this status. If the lock didn’t take place, the mutex wasn’t obtained, and whatever operations were supposed to happen should be aborted. In C++ throwing an exception would be appropriate. For our C implementation, simply returning a failure code for the mutex lock will suffice.
//=============================================================================
// Uses: Ticket lock with early abort example.
// Date: 2018-12-26
// Author: Andrew Que <https://www.DrQue.net>
// Notes:
// Uses several threads that all hog a shared resource. The ticket lock
// will ensure all threads are able to access the resource equally.
// Compile:
// gcc -Wall -Wextra -std=c99 -pedantic abortExample.c -O2 -pthread -o example
//
// Andrew Que, 2018
// Public domain
//=============================================================================
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
#include <time.h>
// Ticket lock context.  Holds all state for one ticket mutex instance.
typedef struct
{
  // Mutex taken by the acquire/release functions while they work on the
  // fields below, and the condition variable waiting threads sleep on.
  pthread_mutex_t lock;
  pthread_cond_t condition;

  // True while the lock is in service.  False signifies a shutdown.
  volatile bool isRunning;

  // Id of the thread that currently possesses the lock.
  pthread_t lockingThread;

  // Recursive lock depth of the possessing thread.
  // The lock is free when this is 0.
  unsigned int count;

  // Ticket counters.
  // `head` is the ticket being served, `tail` is the next ticket number to
  // issue.
  volatile unsigned int head;
  volatile unsigned int tail;
} TicketLock;
//-----------------------------------------------------------------------------
// Uses:
//   Unlock mutex.  Must be called once for each successful call to
//   `ticketLockAcquire`.
// Input:
//   ticketLock - Pointer to context.
// Output:
//   True if there was an error, false if not.
// Note:
//   Releasing a lock that is not held (lock count already 0) is reported as
//   an error and leaves the lock state untouched.
//-----------------------------------------------------------------------------
bool ticketLockRelease( TicketLock * ticketLock )
{
  // Without the internal mutex the shared state must not be touched, and
  // there is nothing to unlock afterwards.
  if ( 0 != pthread_mutex_lock( &ticketLock->lock ) )
  {
    return true;
  }

  bool isError = false;

  if ( 0 == ticketLock->count )
  {
    // Nothing to release.  Decrementing here would wrap the unsigned count
    // and could hand out a turn nobody gave up.
    isError = true;
  }
  else
  {
    // Undo one level of (possibly recursive) locking.
    ticketLock->count -= 1;

    // If the mutex is now free.
    if ( 0 == ticketLock->count )
    {
      // Clear the owner.  `pthread_t` is an opaque type, so a compound
      // literal is used rather than assigning a plain integer.
      ticketLock->lockingThread = (pthread_t){ 0 };

      // Advance to the next ticket in line.
      ticketLock->head += 1;

      // Wake all waiters so the one holding the new `head` ticket proceeds.
      isError |= ( 0 != pthread_cond_broadcast( &ticketLock->condition ) );
    }
  }

  isError |= ( 0 != pthread_mutex_unlock( &ticketLock->lock ) );

  return isError;
}
//-----------------------------------------------------------------------------
// Uses:
//   Lock mutex.  Blocks until this thread's ticket is served.
// Input:
//   ticketLock - Pointer to context.
// Output:
//   True if there was an error, false if not.
// Note:
//   May unblock if the mutex has been signaled to shutdown.  Returns `true`
//   in that case.  When `true` is returned the lock has NOT been obtained:
//   no ownership is recorded and `ticketLockRelease` must not be called.
//   The same thread may lock recursively; each successful lock needs a
//   matching release.
//-----------------------------------------------------------------------------
bool ticketLockAcquire( TicketLock * ticketLock )
{
  // Without the internal mutex the shared state must not be touched, and
  // there is nothing to unlock afterwards.
  if ( 0 != pthread_mutex_lock( &ticketLock->lock ) )
  {
    return true;
  }

  // Don't try for a lock if not running--just return an error.  The flag is
  // read with the mutex held, the same way the wait loop below reads it.
  bool isError = ( ! ticketLock->isRunning );

  if ( ! isError )
  {
    pthread_t const currentThread = pthread_self();

    // `pthread_t` is opaque, so thread ids are compared with `pthread_equal`.
    // `lockingThread` is only meaningful while `count` is non-zero.
    if ( ( ticketLock->count > 0 )
      && ( 0 != pthread_equal( currentThread, ticketLock->lockingThread ) ) )
    {
      // Recursive lock by the current owner--just go one level deeper.
      ticketLock->count += 1;
    }
    else
    {
      // Get a ticket number.
      unsigned const ticket = ticketLock->tail;
      ticketLock->tail += 1;

      // Wait for our ticket number, shutdown, or an error.
      while ( ( ticketLock->head != ticket )
           && ( ticketLock->isRunning )
           && ( ! isError ) )
      {
        // Wait for mutex release (or shutdown broadcast).
        isError |=
          ( 0 != pthread_cond_wait( &ticketLock->condition, &ticketLock->lock ) );
      }

      // A shutdown always fails the lock, even if our ticket came up at the
      // same moment.
      isError |= ( ! ticketLock->isRunning );

      // Only take ownership when the lock was really obtained.  On failure
      // the ticket is simply abandoned; after a shutdown the lock is not
      // usable again anyway.
      if ( ! isError )
      {
        ticketLock->lockingThread = currentThread;
        ticketLock->count += 1;
      }
    }
  }

  isError |= ( 0 != pthread_mutex_unlock( &ticketLock->lock ) );

  return isError;
}
//-----------------------------------------------------------------------------
// Uses:
//   Signal shutdown to cause all blocking threads to unblock.
// Input:
//   ticketLock - Pointer to context.
// Output:
//   True if there was an error, false if not.
// Note:
//   Calling this function will cause all locks on this mutex (pending and
//   future) to fail.
//   The mutex is not functional after shutdown and should be destroyed.
//-----------------------------------------------------------------------------
bool ticketLockShutdown( TicketLock * ticketLock )
{
  // The running flag is part of the predicate waiters test before calling
  // `pthread_cond_wait`, so it is changed with the mutex held.  Otherwise a
  // waiter that has just tested the flag, but not yet started waiting, would
  // miss the broadcast below and stay blocked.
  bool const isLocked = ( 0 == pthread_mutex_lock( &ticketLock->lock ) );
  bool isError = ( ! isLocked );

  // Denote that we are no longer running.  Done even if the mutex could not
  // be taken so the shutdown request is never lost.
  ticketLock->isRunning = false;

  // Signal all waiting threads about the shutdown so they unblock.
  isError |= ( 0 != pthread_cond_broadcast( &ticketLock->condition ) );

  if ( isLocked )
  {
    isError |= ( 0 != pthread_mutex_unlock( &ticketLock->lock ) );
  }

  return isError;
}
//-----------------------------------------------------------------------------
// Uses:
//   Setup a ticket mutex.  Must be called before mutex is used.
// Input:
//   ticketLock - Pointer to context to setup.
// Note:
//   Call `ticketLockDestroy` when done with mutex to release resources.
//   Results of the pthread initialization calls are not reported.
//-----------------------------------------------------------------------------
void ticketLockInitialize( TicketLock * ticketLock )
{
  // Initialize the mutex and conditional wait context with default attributes.
  pthread_mutex_init( &ticketLock->lock, NULL );
  pthread_cond_init( &ticketLock->condition, NULL );

  // Lock starts in service.
  ticketLock->isRunning = true;

  // No owner yet.  `pthread_t` is an opaque type, so a compound literal is
  // used rather than assigning a plain integer.
  ticketLock->lockingThread = (pthread_t){ 0 };
  ticketLock->count = 0;

  // First ticket issued (tail) is the first ticket served (head).
  ticketLock->head = 0;
  ticketLock->tail = 0;
}
//-----------------------------------------------------------------------------
// Uses:
//   Tear down a ticket mutex.  Call once the mutex is no longer needed.
// Input:
//   ticketLock - Pointer to context to destroy.
//-----------------------------------------------------------------------------
void ticketLockDestroy( TicketLock * ticketLock )
{
  pthread_mutex_t * const mutex = &ticketLock->lock;
  pthread_cond_t * const waiters = &ticketLock->condition;

  // Results are intentionally not reported (function has no return value).
  (void)pthread_mutex_destroy( mutex );
  (void)pthread_cond_destroy( waiters );
}
// Number of worker threads started by the example.
enum { THREADS = 8 };

// Handles of the worker threads.
static pthread_t threads[ THREADS ];

// Ticket lock shared by all worker threads.
static TicketLock printMutex;

// Cleared by `main` to tell the worker threads to stop looping.
static volatile bool isRunning = true;
//-----------------------------------------------------------------------------
// Uses:
//   Sleep specified number of milliseconds.
// Input:
//   ms - Milliseconds to sleep.  Any value is accepted.
// Note:
//   The sleep may end early if `nanosleep` is interrupted; the remaining time
//   is not slept.
//-----------------------------------------------------------------------------
static void sleep_ms( unsigned ms )
{
  // `tv_nsec` must stay below one second, so whole seconds go in `tv_sec`
  // and only the remainder is converted to nanoseconds.
  struct timespec sleepValue;
  sleepValue.tv_sec  = (time_t)( ms / 1000u );
  sleepValue.tv_nsec = (long)( ms % 1000u ) * 1000000L;

  nanosleep( &sleepValue, NULL );
}
//-----------------------------------------------------------------------------
// Uses:
//   Thread to hog print mutex.  Repeatedly takes the lock, prints its index,
//   holds the lock for a while, and releases it.
// Input:
//   parameter - Thread index, passed as an integer cast to a pointer.
// Output:
//   Output is unused and simply returns parameter.
// Note:
//   Stops when the global `isRunning` flag is cleared or when a lock
//   operation reports an error (such as the lock being shut down).
//-----------------------------------------------------------------------------
static void * threadFunction( void * parameter )
{
  // Recast parameter as unsigned.
  // Double-cast needed: pointer to pointer-sized integer, then narrow.
  unsigned threadIndex = (unsigned)(uintptr_t)parameter;

  // Random sleep before thread begins.
  sleep_ms( rand() % 50 );

  bool isError = false;
  while ( ( isRunning )
       && ( ! isError ) )
  {
    isError |= ticketLockAcquire( &printMutex );

    // Only use--and only release--the lock if it was actually obtained.
    // A failed acquire means this thread does not hold the lock.
    if ( ! isError )
    {
      printf( "Thread: %u\n", threadIndex );

      // Hog the lock for 150 to 299 milliseconds.
      sleep_ms( 150 + rand() % 150 );

      isError |= ticketLockRelease( &printMutex );
    }
  }

  if ( isError )
  {
    fprintf( stderr, "****** Thread %u had an error! ******\n", threadIndex );
  }

  return parameter;
}
//-----------------------------------------------------------------------------
// Uses:
//   Ticket lock example.  Starts the worker threads, waits for Enter, then
//   shuts the lock down and waits for the workers to finish.
// Output:
//   0 on success, `EXIT_FAILURE` if a worker thread could not be created.
//-----------------------------------------------------------------------------
int main( void )
{
  // Setup random number generator.
  srand( (unsigned)time( NULL ) );

  ticketLockInitialize( &printMutex );

  printf( "Hit a Enter to stop.\n" );

  // Create and start all threads.  Stop at the first failure and remember
  // how many were started so only those are joined later.
  unsigned started = 0;
  bool isError = false;
  while ( ( started < THREADS )
       && ( ! isError ) )
  {
    int const result =
      pthread_create( &threads[ started ], NULL, threadFunction, (void*)(uintptr_t)started );

    if ( 0 == result )
    {
      started += 1;
    }
    else
    {
      isError = true;
      fprintf( stderr, "Unable to create thread %u (error %d)\n", started, result );
    }
  }

  // Wait for enter to be pressed (skipped if startup failed).
  if ( ! isError )
  {
    (void)fgetc( stdin );
  }

  // Shutdown print mutex.  Should cause an error in the worker threads.
  ticketLockShutdown( &printMutex );

  // Signal shutdown.
  isRunning = false;

  // Wait for all started threads to stop.
  for ( unsigned index = 0; index < started; index += 1 )
  {
    pthread_join( threads[ index ], NULL );
  }

  ticketLockDestroy( &printMutex );

  return isError ? EXIT_FAILURE : 0;
}
There are just two changes and one new function. The ticket lock context will get a flag called isRunning. By default this flag is true. When forcibly shutting down, the flag is set to false. A new function called ticketLockShutdown sets the flag to false, and broadcasts a condition change to all blocking threads. The last change is in the while loop used to wait for the desired ticket number. It will additionally stop if isRunning is not set and return an error should that condition arise.