…what were we talking about?

Last time, we coded a small OpenMP-style parallel construct using some macro directives and a class wrapping a vector of threads.

This time we will add replacements for two OpenMP library functions: omp_get_num_threads() and omp_get_thread_num(). These are among the most used (and useful) OpenMP functions.

I’ll show you several implementations, but to start we need a little clean-up in the thread_pool class.

first try: adding methods

Below is the source for the thread_pool class, with a few modifications that I'll explain later. I assume you put it in a file called “thread_pool.h” in your working directory; if you put it elsewhere, change the #include in the samples accordingly.

#include <thread>
#include <algorithm>
#include <vector>
#include <iostream>
#include <functional>

using namespace std;

typedef function <void ()> task;
class thread_pool {
  private:
    vector<thread> the_pool;

  public:
    thread_pool(unsigned int num_threads, task tbd) {
      for(int i = 0; i < num_threads; ++i) {
        the_pool.push_back(thread(tbd));
      }
    }

    void join() {
      for_each(the_pool.begin(), the_pool.end(), 
        [] (thread& t) {t.join();});
    }

    void nowait() {
      for_each(the_pool.begin(), the_pool.end(), 
        [] (thread& t) {t.detach();});
    }
    
    int get_num_threads() { return the_pool.size(); }
    
    int get_thread_num() {
      for(int i = 0; i < the_pool.size(); ++i)
        if(the_pool[i].get_id()==this_thread::get_id()) return i;
      return -1;
    }

};

#define parallel_do_(N) thread_pool (N, []()
#define parallel_do parallel_do_(thread::hardware_concurrency())
#define parallel_end ).join();
#define parallel_end_nowait ).nowait();

The first important change in the thread_pool class is the definition of task: it is no longer a function pointer but, more correctly, a std::function returning void and taking no arguments. This way it works with both function pointers and lambdas, and allows us to capture variables in lambdas.
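To make the difference concrete, here is a minimal sketch of what the std::function typedef buys us. The names plain_function, calls and run_both_kinds are made up for this illustration and are not part of thread_pool.h. A capturing lambda cannot be converted to a plain function pointer, but it converts to task without trouble.

```cpp
#include <functional>

typedef std::function<void ()> task;   // same typedef as in thread_pool.h

static int calls = 0;                  // hypothetical counter, only for this demo

void plain_function() { ++calls; }     // an ordinary function

// Runs a plain function and a capturing lambda through the same task type
// and returns the total number of increments performed.
int run_both_kinds() {
    calls = 0;
    int local = 0;
    task a = plain_function;           // a function pointer converts to task
    task b = [&local]() { ++local; };  // a capturing lambda converts too
    a();
    b();
    return calls + local;              // one increment from each task
}
```

With the old function-pointer typedef, the second assignment would not compile, because only lambdas with an empty capture list convert to function pointers.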

The mandatory usage example:

#include "thread_pool.h"

int main() {

    thread_pool p(4, [&p] () {
        cout << "I'm thread n. " << p.get_thread_num() 
             << " in a pool of " << p.get_num_threads() << endl;
    });

    // You could do other things before joining...
    p.join();

    return 0;
}

As you see, there's a thread_pool instance named p, and in the code passed to the constructor (and executed by four threads), the object p itself is used to call the methods get_num_threads() and get_thread_num(). This "magic" is made possible because the variable p has been captured by reference. The square brackets in lambdas are used for this purpose (as usual, I'm not explaining the whole thing, but there's plenty of information on the net).
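If captures are new to you, this small sketch shows the difference between the two basic forms. The function names are invented for the illustration. A by-value capture copies the variable when the lambda is created, while a by-reference capture (the form used with [&p] above) keeps looking at the original object.

```cpp
// Returns what a by-value capture sees after the original has changed.
int value_capture_result() {
    int x = 1;
    auto by_value = [x]() { return x; };   // x is copied here
    x = 2;
    return by_value();                     // still the copied value
}

// Returns what a by-reference capture sees after the original has changed.
int reference_capture_result() {
    int x = 1;
    auto by_ref = [&x]() { return x; };    // refers to the original x
    x = 2;
    return by_ref();                       // sees the updated value
}
```

Capturing the pool by reference is what lets the threads call methods on the very same object, but it also means the lambda must not outlive that object.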

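This solution works, but requires our pools to be named, so we should modify our macro definition to include the pool name as a parameter. It also has a subtle flaw: the threads start running while the constructor is still filling the_pool, so an early call to get_thread_num() races with push_back. We can do better.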

second try: the global map

I want to say it loud and clear: I don't like this second solution at all. It's not elegant and it uses a global object. I'm not even going to show you a complete example, just a modified version of the thread_pool class to give you the idea of how it could be done.

//... includes omitted (same as before, plus <map>)
int get_thread_num();
int get_num_threads();

typedef function <void ()> task;
class thread_pool {
  private:
    typedef map<thread::id, thread_pool*> thread_map;
    static thread_map allthreads;
    friend int get_thread_num();
    friend int get_num_threads();
    vector<thread> the_pool;

  public:
    thread_pool(unsigned int num_threads, task tbd) {
      for(int i = 0; i < num_threads; ++i) {
        the_pool.push_back(thread(tbd));
        allthreads.insert(
          thread_map::value_type(the_pool[i].get_id(), this));
      }
    }
    
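    ~thread_pool() {
      // Caveat: once a thread has been joined or detached, get_id() returns
      // a default-constructed id, so this erase no longer finds its entry.
      for_each(the_pool.begin(), the_pool.end(), [] (thread& t) {
        allthreads.erase(t.get_id());
      });
    }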

    void join() {
      for_each(the_pool.begin(), the_pool.end(), 
        [] (thread& t) {t.join();});
    }

    void nowait() {
      for_each(the_pool.begin(), the_pool.end(), 
        [] (thread& t) {t.detach();});
    }
    
    int get_num_threads() { return the_pool.size(); }
    
    int get_thread_num() {
      for(int i = 0; i < the_pool.size(); ++i)
        if(the_pool[i].get_id()==this_thread::get_id()) return i;
      return -1;
    }

};

int get_thread_num() {
  thread_pool * p = thread_pool::allthreads[this_thread::get_id()];
  return p->get_thread_num();
}

int get_num_threads() {
  thread_pool * p = thread_pool::allthreads[this_thread::get_id()];
  return p->get_num_threads();
}

// This should be in a .cc file!
map<thread::id, thread_pool*> thread_pool::allthreads;

How it works:

  • the object allthreads associates every thread in a thread_pool with its pool
  • thread_pool's constructor and destructor take care of adding and removing entries in the map
  • the friend functions get_num_threads() and get_thread_num() use allthreads to get a pointer to the calling thread's pool and invoke the member functions of the same name
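For the curious, the lookup idea can be sketched in a self-contained form. This is a variation of mine, not the class above: the names registry, registry_mutex, lookup_thread_num and run_registered are hypothetical, each thread registers itself instead of being registered by the constructor, the map stores the index directly, and a mutex guards every access because std::map is not safe for concurrent modification.

```cpp
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical global registry: thread id -> index inside its pool.
static std::map<std::thread::id, int> registry;
static std::mutex registry_mutex;

// Looks up the calling thread; returns -1 if it is not registered.
int lookup_thread_num() {
    std::lock_guard<std::mutex> lock(registry_mutex);
    auto it = registry.find(std::this_thread::get_id());
    return it == registry.end() ? -1 : it->second;
}

// Starts n threads. Each one registers itself, stores the result of the
// lookup in its own slot, and unregisters before it ends.
std::vector<int> run_registered(int n) {
    std::vector<int> seen(n, -1);
    std::vector<std::thread> pool;
    for (int i = 0; i < n; ++i) {
        pool.push_back(std::thread([i, &seen]() {
            {
                std::lock_guard<std::mutex> lock(registry_mutex);
                registry[std::this_thread::get_id()] = i;
            }
            seen[i] = lookup_thread_num();   // each thread writes only its slot
            std::lock_guard<std::mutex> lock(registry_mutex);
            registry.erase(std::this_thread::get_id());
        }));
    }
    for (auto& t : pool) t.join();
    return seen;
}
```

Even in this reduced form every lookup pays for a lock and a map search, which is one more reason to look for something simpler.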

I'm not going to complete or discuss this example further, because I want to show you a better way.

a better way: thread local storage

Thread-local storage is a way to let each thread maintain its own version of a global variable or memory region. The idea is to use two thread-local variables to store num_threads and thread_num.

C++11 introduces the storage class specifier thread_local to declare thread-local variables. Sadly, at the time of writing many compilers don't support it yet, and GCC is one of them (support arrived in GCC 4.8), so I'll use the __thread extension for this compiler, but the principle is the same.
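Here is a minimal sketch of the behaviour we are going to rely on, assuming a compiler that already accepts thread_local. The names counter, bump and bump_in_new_thread are invented for the illustration. Every thread gets its own counter, starting from the initial value, no matter what other threads did to theirs.

```cpp
#include <thread>

thread_local int counter = 0;   // one independent copy per thread

// Increments the calling thread's copy and returns its new value.
int bump() { return ++counter; }

// Calls bump() three times in a fresh thread and returns the last result.
int bump_in_new_thread() {
    int result = 0;
    std::thread t([&result]() {
        bump();
        bump();
        result = bump();        // counts from this thread's own zero
    });
    t.join();
    return result;
}
```

Calling bump() in the main thread before and after bump_in_new_thread() shows that the main thread's copy is not touched by the other thread.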

Here is the resulting thread_pool class.

#include <thread>
#include <algorithm>
#include <vector>
#include <iostream>
#include <functional>

using namespace std;

#ifdef __GNUG__
static __thread int thread_num;
static __thread int num_threads;
#else
static thread_local int thread_num;
static thread_local int num_threads;
#endif 

typedef function <void ()> task;
class thread_pool {
  private:
    vector<thread> the_pool;

  public:
    thread_pool(unsigned int n_threads, task tbd) {
      for(int i = 0; i < n_threads; ++i) {
        the_pool.push_back(thread([=] () {
          thread_num = i;
          num_threads = n_threads;
          tbd();
        }));
      }
    }
    
    void join() {
      for_each(the_pool.begin(), the_pool.end(), 
        [] (thread& t) {t.join();});
    }

    void nowait() {
      for_each(the_pool.begin(), the_pool.end(), 
        [] (thread& t) {t.detach();});
    }
    
};

#define parallel_(N) thread_pool (N, []()
#define parallel parallel_(thread::hardware_concurrency())
#define parallel_end ).join();
#define parallel_end_nowait ).nowait();
#define single if(thread_num==0) 

The per-thread copies of num_threads and thread_num are set at the start of each thread, by the wrapper lambda created in thread_pool's constructor. Again, we are using variable capture, this time by value ([=]), to make i, n_threads and tbd available inside the thread code.
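The wrapper trick can be isolated in a few lines. What follows is a sketch under my own names (my_index and collect_indices are hypothetical, and it assumes thread_local is available): the task never receives the loop index, it only reads the thread-local variable that the wrapper set just before calling it. Capturing i by value matters here, because a reference to the loop variable would be read after the loop has moved on.

```cpp
#include <functional>
#include <thread>
#include <vector>

thread_local int my_index = -1;   // hypothetical stand-in for thread_num

// Starts n threads and returns, for each slot, the index the task saw.
std::vector<int> collect_indices(unsigned n) {
    std::vector<int> seen(n, -1);
    // The task knows nothing about i; it only reads the thread-local variable.
    std::function<void ()> tbd = [&seen]() { seen[my_index] = my_index; };
    std::vector<std::thread> pool;
    for (unsigned i = 0; i < n; ++i) {
        // i and tbd are copied into the wrapper, as with [=] in the constructor.
        pool.push_back(std::thread([i, tbd]() {
            my_index = static_cast<int>(i);   // set this thread's copy first
            tbd();                            // then run the user code
        }));
    }
    for (auto& t : pool) t.join();
    return seen;
}
```

Each thread writes a different element of seen, so no locking is needed in this sketch.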

Example:

#include "thread_pool.h"
#include <iostream>

int main() {

    parallel_(4)
    {
      cout << "I'm thread " << thread_num << " of " 
           << num_threads << endl;
      single
      {
        cout << "This region is executed only by thread " 
             << thread_num << endl;
      }
    }
    parallel_end
    
    return 0;
}

In the example we have a parallel region executed by four threads, with a nested region executed only by the first thread in the pool. Thanks to the thread-local variables and to the macros, the code is both readable and concise.

That's it. Leave a comment to ask a question, suggest an improvement or share a thought.
