2.2. Elementary Multi-threading

To implement a communication system in software, we often need to run functions that send and/or receive signal samples to and/or from the USRP, as well as many other processing functions, asynchronously and simultaneously. This can be done with multi-threaded programming, in which different functions are implemented in different threads. Exactly how many threads to use, and which functions to implement in which threads, are typical design questions.

Multi-threading imposes overheads. If you are pushing the CPU to its limits and are concerned about these overheads, a common rule of thumb is to keep the number of simultaneous CPU-hogging threads roughly equal to the number of processors (or cores) available. In our Linux boxes, the processors have 4 cores with hyper-threading support, so the maximum number of simultaneous CPU-hogging threads should be about 8.
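The number of concurrent hardware threads a machine supports can be queried at run time. A minimal sketch (not one of the example files for this section):

#include <iostream>
#include <thread>

int main() {
    // Number of concurrent threads the hardware supports,
    // e.g., 8 on a 4-core CPU with hyper-threading.
    unsigned int n = std::thread::hardware_concurrency();
    std::cout << "This machine supports " << n
              << " concurrent hardware threads." << std::endl;
    return 0;
}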
Generally we should have a main thread, the one that starts main(), and other spawned threads. We will use the std::thread class to create, launch, and manage threads.

Caution

std::thread is available in C++11 and later.

Dig deeper

You may also use the somewhat more powerful Boost.Thread library to do multi-threading.
2.2.1. Create, launch, and join threads

The simplest way to create and launch a thread is to instantiate a std::thread object, passing to its constructor a function reference pointing to the function that the thread is to execute. Other input arguments of that function can also be passed to the constructor. See the code example below.

After a thread's job is completed, we should join the thread back to the main thread for clean termination. This is done using the std::thread::join() method, which blocks the calling thread until the thread represented by the thread object has terminated. To make sure that a thread can be joined, we may check whether it is joinable using the std::thread::joinable() method. A thread is not joinable if it has already been joined or detached.

One may use the namespace std::this_thread to invoke a set of functions that access the current thread. For example, calling std::this_thread::sleep_for() causes the current thread to pause for a specified amount of time. We should do that whenever possible in a thread to vacate CPU resources for other running threads.

Example: age_threads.cpp
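As a minimal sketch of these basic operations (the actual age_threads.cpp may differ; the worker function, its argument, and the sleep duration here are made up for illustration):

#include <chrono>
#include <iostream>
#include <thread>

// Hypothetical worker function executed by a spawned thread.
void worker(int id) {
    for (int i = 0; i < 3; i++) {
        std::cout << "thread " << id << ": iteration " << i << std::endl;
        // Pause to vacate the CPU for other running threads.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

int main() {
    // Create and launch two threads, passing the function and its argument.
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);

    // Join the threads back to the main thread for clean termination.
    if (t1.joinable()) t1.join();
    if (t2.joinable()) t2.join();

    return 0;
}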
2.2.2. UHD thread priority

The priority of the main thread can be set by calling uhd::set_thread_priority_safe() in main(). The same function can be used to set the thread priority inside the function that a spawned thread executes. uhd::set_thread_priority_safe() is a UHD wrapper that sets the thread priority using Pthreads on UNIX-based systems. On most such systems, the gcc compiler implements the std::thread class on top of Pthreads. Hence, you may also use the same function to set the priority of threads that you create using std::thread. This also applies if you use the Boost.Thread library instead.

The two input parameters of uhd::set_thread_priority_safe() are:

priority: a float in [0, 1], with 0 and 1 meaning the lowest and highest thread priority, respectively (default = 0.5)

realtime: a bool specifying whether real-time scheduling should be enabled (default = true)

Setting priority to 1 and realtime to true typically gives the thread the highest priority. This is what we want for threads that perform time-critical processing, such as sending samples from the host to the USRP and grabbing samples back from the USRP to the host.

Note

Real-time scheduling means a particular way of scheduling fair and constant access to the CPU. Such a restriction does not necessarily reduce the run time of a program.
Example: age_threads.cpp
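As an illustration, the sketch below sets the priority in both the main thread and a spawned thread. The transmit-worker function and its body are hypothetical, and the header path assumed here is uhd/utils/thread.hpp (older UHD versions provide uhd/utils/thread_priority.hpp instead).

#include <thread>
#include <uhd/utils/thread.hpp>

// Hypothetical time-critical worker, e.g., one that streams samples to the USRP.
void tx_worker() {
    // Highest priority with real-time scheduling for this spawned thread.
    uhd::set_thread_priority_safe(1.0, true);
    // ... send samples to the USRP here ...
}

int main() {
    // Set the priority of the main thread (defaults: priority = 0.5, realtime = true).
    uhd::set_thread_priority_safe();

    std::thread tx_thread(tx_worker);
    tx_thread.join();
    return 0;
}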
2.2.3. Thread synchronization with mutex

If you run age_threads.cpp, the console printout may not be what you expect. Although the messages printed by different threads sometimes come out cleanly, they often get garbled together. This is because the threads run asynchronously without regard to each other's existence. The characters of the message strings from different threads may be put into the buffer of std::cout by the respective << operators in some unpredictable order, garbling the output.

The situation can be worse when multiple threads are writing to the same memory location that holds a pointer to an object. A race condition may occur, and the content of the memory location may become unpredictable. Any subsequent reference to the object may cause the program to crash! The technical terminology for this situation is that such an implementation is not thread-safe.

Note

Strictly speaking, both std::cout and the << operator are thread-safe, because the implementation does prevent multiple threads from writing to the buffer of std::cout simultaneously. In fact, age_threads.cpp doesn't crash. However, this thread-safe property applies only on a character-by-character basis; hence the messages from different threads can still be garbled together.

Caution

Neither uhd::tx_streamer::send() nor uhd::rx_streamer::recv() is thread-safe.

In order to make age_threads.cpp work the way we want, we need to provide a form of synchronization among the threads such that a thread can get exclusive access to std::cout, blocking the other threads from using std::cout until it has finished streaming its whole message to std::cout.

Mutexes provide a basic mechanism for achieving this form of synchronization among threads. We will use the std::mutex class to construct mutexes.

Dig deeper

You may also use the somewhat more versatile boost::mutex class from the Boost.Thread library.

There are many ways to use mutexes for thread synchronization. The following three simple ways are pretty much all we need for most of our purposes.
2.2.3.1. Internal locking

Consider

class SharedPrinter {
private:
    std::mutex mtx;
public:
    void print(std::string message) {
        mtx.lock();
        std::cout << message;
        mtx.unlock();
    }
};

which will be used in place of std::cout in age_threads.cpp.

Suppose that a thread calls SharedPrinter::print(). The std::mutex::lock() call in SharedPrinter::print() gives ownership of the mutex SharedPrinter::mtx to the thread and blocks other threads from accessing the object. After the whole message is streamed to std::cout, the std::mutex::unlock() call releases the current thread's ownership of the mutex, allowing other threads to access the SharedPrinter object again. As a result, only one thread can stream a message to std::cout at a time, and no other thread can stream to std::cout until the current thread finishes streaming its whole message.

Note that std::mutex::lock() is a blocking call. It does not return until the mutex is unlocked by its current owner and the calling thread obtains ownership of the mutex. The method std::mutex::try_lock() attempts to obtain ownership of the mutex for the current thread without blocking, and returns true if it succeeds.

One may also use a scoped lock in the definition of SharedPrinter in place of the explicit calls to std::mutex::lock() and std::mutex::unlock():

class SharedPrinter {
private:
    std::mutex mtx;
public:
    void print(std::string message) {
        std::lock_guard<std::mutex> scoped_lock(mtx);
        std::cout << message;
    }
};

The constructor and destructor of the object scoped_lock lock and release the mutex, respectively.

Example code showing how to use SharedPrinter: age_mutex.cpp
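A minimal sketch of how such a printer might be shared by two threads (the actual age_mutex.cpp may differ; the worker function and message strings here are made up):

#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

class SharedPrinter {
private:
    std::mutex mtx;
public:
    void print(std::string message) {
        std::lock_guard<std::mutex> scoped_lock(mtx);
        std::cout << message;
    }
};

// Hypothetical worker that streams whole messages through the shared printer.
void worker(SharedPrinter& printer, int id) {
    for (int i = 0; i < 3; i++) {
        printer.print("thread " + std::to_string(id) +
                      ": message " + std::to_string(i) + "\n");
    }
}

int main() {
    SharedPrinter printer;  // one printer shared by all threads
    std::thread t1(worker, std::ref(printer), 1);
    std::thread t2(worker, std::ref(printer), 2);
    t1.join();
    t2.join();
    return 0;
}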
2.2.3.2. Caller-ensured locking

Consider the following implementation of the SharedPrinter class:

class SharedPrinter {
private:
    std::mutex mtx;
public:
    void print(std::string message) {
        std::cout << message;
    }
    void lock() { mtx.lock(); }
    void unlock() { mtx.unlock(); }
};

where the mutex locking and unlocking methods are exposed to the caller. The class method SharedPrinter::print() is not thread-safe by itself; the caller is responsible for blocking other threads from accessing it.

Example code showing how to use this caller-ensured locking version of SharedPrinter: age_mutex_caller.cpp
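A minimal sketch of the caller-side locking (the actual age_mutex_caller.cpp may differ); here the caller locks the printer, streams its whole message in several pieces, and then unlocks:

#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

// Caller-ensured locking version of SharedPrinter from above.
class SharedPrinter {
private:
    std::mutex mtx;
public:
    void print(std::string message) { std::cout << message; }
    void lock() { mtx.lock(); }
    void unlock() { mtx.unlock(); }
};

// Hypothetical worker: the caller holds the lock while it streams
// its whole multi-part message, then releases it.
void worker(SharedPrinter& printer, int id) {
    printer.lock();
    printer.print("thread " + std::to_string(id));
    printer.print(" says hello\n");
    printer.unlock();
}

int main() {
    SharedPrinter printer;
    std::thread t1(worker, std::ref(printer), 1);
    std::thread t2(worker, std::ref(printer), 2);
    t1.join();
    t2.join();
    return 0;
}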
2.2.3.3. Internal + Caller-ensured locking

We may obtain the best of both methods above with the following implementation of the SharedPrinter class:

class SharedPrinter {
private:
    std::recursive_mutex mtx;
public:
    void print(std::string message) {
        std::lock_guard<std::recursive_mutex> scoped_lock(mtx);
        std::cout << message;
    }
    void lock() { mtx.lock(); }
    void unlock() { mtx.unlock(); }
};

where the class method SharedPrinter::print() is now thread-safe.

Note that we have to use std::recursive_mutex in this implementation to allow for the possibility of the same mutex being locked repeatedly by the same thread, both internally and by the caller.
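For instance, in the hypothetical sketch below the same thread locks the mutex twice, once through the exposed lock() method and once inside print(); std::recursive_mutex allows this repeated locking by the owning thread:

#include <iostream>
#include <mutex>
#include <string>

// Internal + caller-ensured locking version of SharedPrinter from above.
class SharedPrinter {
private:
    std::recursive_mutex mtx;
public:
    void print(std::string message) {
        std::lock_guard<std::recursive_mutex> scoped_lock(mtx);
        std::cout << message;
    }
    void lock() { mtx.lock(); }
    void unlock() { mtx.unlock(); }
};

int main() {
    SharedPrinter printer;
    // The same thread locks the recursive mutex here and again inside print().
    printer.lock();
    printer.print("caller-locked message\n");
    printer.unlock();

    // print() also works on its own, since it locks internally.
    printer.print("internally locked message\n");
    return 0;
}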
in this implementation to allow for the possibility of the same mutex being locked repeatedly, internally and by the caller.Experiment
Use this new implementation of
SharedPrinter
in age_mutex_caller.cpp. Compile and run your modified code to observe the results.Instead of
std::recursive_mutex
in the implementation ofSharedPrinter
above, usestd::mutex
to construct the scoped lock. Compile and test the resulting code. Explain what happens.Hint
If the program hangs, don’t panic. You may press CTRL-Z to stop it. Then run
ps
to find out its process number andkill -9 <process number>
to remove it.
2.2.4. Passing information/signal between threads using atomic objects

Information and data may be passed between threads via shared objects. The methods that access these shared objects should be thread-safe. That can be achieved by employing the mutex-based techniques discussed above.

When we want to share a variable of a fundamental type, such as bool, int, or float, we may effectively achieve locking by declaring the variable as atomic using the std::atomic<> template. An atomic object is free from data races: if one thread writes to an atomic object while another thread reads from it, the behavior is well-defined.

The basic methods to write to and read from an atomic variable are std::atomic::store() and std::atomic::load(), respectively. One may also use the = operator in place of std::atomic::store(). Other operators, such as ++ and --, and other operation methods are also available for atomic objects. See this link for details.

Dig deeper

An atomic object may be used to synchronize access to other non-atomic objects near it in different threads by specifying different memory orders. See [1] for details. By default, std::atomic::store() and std::atomic::load() employ the strictest memory order. The = operator is equivalent to std::atomic::store() with the strictest memory order.

One may also use atomic objects to pass signals from one thread to others in order to trigger their operations.

Example: count_atomic.cpp
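A minimal sketch of using an atomic flag to signal a worker thread to stop (the actual count_atomic.cpp may differ; the counter loop and timing here are made up for illustration):

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

std::atomic<bool> stop_signal(false);  // shared flag, safe to access from any thread

// Hypothetical worker that counts until signaled to stop.
void counter() {
    unsigned long count = 0;
    while (!stop_signal.load()) {   // read the atomic flag
        count++;
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    std::cout << "counted to " << count << std::endl;
}

int main() {
    std::thread t(counter);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    stop_signal.store(true);        // signal the worker to stop
    t.join();
    return 0;
}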
Dig deeper
A more sophisticated, CPU-efficient way to pass signals between threads is to use mutexes together with condition variables. See this simple example and [1] for a more detailed exploration.
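For reference, a condition variable lets a waiting thread sleep until another thread notifies it, instead of repeatedly polling a flag. A minimal sketch (the names here are made up, and the linked example may differ):

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;   // shared state protected by mtx

void waiter() {
    std::unique_lock<std::mutex> lock(mtx);
    // Sleep (without consuming CPU) until notified and ready becomes true.
    cv.wait(lock, [] { return ready; });
    std::cout << "waiter woke up" << std::endl;
}

int main() {
    std::thread t(waiter);
    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true;             // update the shared state
    }
    cv.notify_one();              // wake the waiting thread
    t.join();
    return 0;
}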
2.2.5. Thread-safe FIFO queue

We will often encounter the common scenario in which the output generated by one thread needs to be passed on to another thread for further processing. Since all threads run asynchronously, we need to provide thread-safe buffering between the data-generating (producer) and data-receiving (consumer) threads.

A simple example of such thread-safe buffering is the FIFO queue implemented in the MutexFIFO class template defined in the header file queue.hpp.

Example code that shows how to use this FIFO queue: age_fifo.cpp

One should make sure that the arrival rate to the FIFO queue is smaller than its service rate. Otherwise the queue will keep growing and eventually cause memory problems.
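To illustrate the idea only (this is not the actual MutexFIFO interface in queue.hpp, whose methods may differ), a mutex-protected FIFO can be as simple as wrapping std::queue with internal locking:

#include <cstddef>
#include <mutex>
#include <queue>

// A minimal thread-safe FIFO sketch: every access locks an internal mutex.
template <typename T>
class SimpleMutexFifo {
private:
    std::queue<T> q;
    std::mutex mtx;
public:
    void push(const T& item) {
        std::lock_guard<std::mutex> lock(mtx);
        q.push(item);
    }
    // Returns false if the queue is empty; otherwise pops the front item into `item`.
    bool pop(T& item) {
        std::lock_guard<std::mutex> lock(mtx);
        if (q.empty()) return false;
        item = q.front();
        q.pop();
        return true;
    }
    std::size_t size() {
        std::lock_guard<std::mutex> lock(mtx);
        return q.size();
    }
};

A producer thread calls push() while a consumer thread repeatedly calls pop(), typically sleeping briefly when the queue is empty to vacate the CPU for other running threads.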