マルチプラットフォームなスレッドクラス

pthreadとwin32 threadのインターフェースを統一したライブラリです.インターフェースおよびpthread側の実装はboost::threadベース,win32 thread側の実装はいろいろなWebサイトを参考に行いました*1

実装したものは,thread,mutex (mutex, recursive_mutex),condition,call_onceです.劣化boost::threadですが,ヘッダファイルだけなのでboostのビルドが面倒なときに使う分には良いかな.pthreadを先に作ったせいで,win32 thread側は,インターフェースを合わせるために取り合えず作ったメソッドもいくつかあって少し不恰好です.もう少し,修正する必要がありそうです.

動作確認は,gcc 4.x.xとVisualStudio 2005で行っています.

使用関数(API)一覧

ものぐさ備忘録:スレッドの関数一覧(pthread, win32スレッド)の対応表が分かり易かったので,実装に使った関数一覧を載せてみます(各クラスのインターフェースは後述).

thread

機能(メソッド名) pthread win32 thread
スレッドの作成(コンストラクタ) pthread_create CreateThread
使い終わったスレッドidを開放する(デストラクタ) -*2 CloseHandle
自スレッドの終了(clx::exit) pthread_exit ExitThread
スレッド終了時に自動的に確保したリソース
を解放する(detach)
pthread_detach -*3
他スレッドの終了を待つ(join) pthread_join WaitForSingleObject
現在実行中のスレッドidを取得する(get_id) pthread_self GetCurrentThread

mutex,recursive_mutex

機能(メソッド名) pthread win32 thread
mutexの作成(コンストラクタ) pthread_mutex_init CreateMutex
mutexの破壊(デストラクタ) pthread_mutex_destroy CloseHandle
mutexのロックを取る(lock) pthread_mutex_lock WaitForSingleObject
mutexのロックを解除する(unlock) pthread_mutex_unlock ReleaseMutex

condition

conditionは,スレッド間での待受/通知制御を行うためのクラスです.

機能(メソッド名) pthread win32 thread
conditionの作成(コンストラクタ) pthread_cond_init CreateEvent
conditionの破壊(デストラクタ) pthread_cond_destroy CloseHandle
イベントが発生するまで待機する(wait) pthread_cond_wait WaitForSingleObject
同上,タイマー付き(timed_wait) pthread_cond_timedwait WaitForSingleObject
待機しているスレッドに通知する(notify_one) pthread_cond_signal SetEvent
待機している全てのスレッドに通知する(notify_all) pthread_cond_broadcast -*4

call_once

機能(関数名) pthread win32 thread
指定した関数を一度だけ実行する(call_once) pthread_once -*5

サンプルコード

スレッド作成時に渡せる関数(オブジェクト)は,

void function();

または,

class functor {
public:
    void operator()();
};

となります.pthread_createやCreateThreadでは引数を一つ渡せるのですが,実装の都合上,引数は渡せません.

mutex::scoped_lockは,コンストラクタでmutex.lock(),デストラクタでmutex.unlock()を呼ぶクラスです.lock()/unlock()を呼ぶ代わりに,mutexを引数にmutex::scoped_lockオブジェクトを作成することにより,スコープ範囲から抜けると同時にunlock()が呼ばれるので,unlock()のし忘れを防ぐことができます(boost::threadを参照).

call_once()は,関数ポインタの他に関数オブジェクトも指定することができます.ただし,mutexやconditionはコピー不可なクラスなので,指定した関数オブジェクトがmutexやconditionなどのコピー不可なメンバ変数を持っていた場合は,うまく機能しません.

#include <iostream>
#include <sstream>
#include <string>
#include <list>
#include <stdexcept>
#include "clx/thread.h"

/* ------------------------------------------------------------------------- */
//  trivial_queue
/* ------------------------------------------------------------------------- */
template <class Type>
class trivial_queue {
public:
    typedef Type value_type;
    
    trivial_queue() : list_(), mutex_(), not_empty_() {}
    virtual ~trivial_queue() {}
    
    void enqueue(const value_type& x) {
        clx::mutex::scoped_lock lock(mutex_);
        std::cout << "PSH: " << x << " (rest " << list_.size() << ")" << std::endl;
        list_.push_back(x);
        not_empty_.notify_one();
    }
    
    value_type dequeue() {
        clx::mutex::scoped_lock lock(mutex_);
        while (list_.empty()) {
            std::cout << "empty buffer" << std::endl;
            not_empty_.wait(lock);
        }
        value_type tmp = list_.front();
        std::cout << "POP: " << tmp << " (rest " << list_.size() << ")" << std::endl;
        list_.pop_front();
        return tmp;
    }
    
    void reset() {
        clx::mutex::scoped_lock lock(mutex_);
        list_.clear();
    }
    
private:
    std::list<value_type> list_;
    clx::mutex mutex_;
    clx::condition not_empty_;
};

trivial_queue<std::string> data_;
clx::once_flag once_ = CLX_ONCE_INIT;

/* ------------------------------------------------------------------------- */
//  init_data
/* ------------------------------------------------------------------------- */
void init_data() {
    std::cout << "reset data queue" << std::endl;
    data_.reset();
}

/* ------------------------------------------------------------------------- */
//  send_something
/* ------------------------------------------------------------------------- */
void send_something() {
    for (int i = 0; i < 10; i++) {
        clx::call_once(once_, init_data); // call_once test
        std::stringstream ss;
        ss << "element[" << i << "]";
        data_.enqueue(ss.str());
    }
}

/* ------------------------------------------------------------------------- */
//  recv_something
/* ------------------------------------------------------------------------- */
void recv_something() {
    std::string s;
    for (int i = 0; i < 10; i++) {
        clx::call_once(once_, init_data); // call_once test
        s = data_.dequeue();
    }
}

/* ------------------------------------------------------------------------- */
//  main
/* ------------------------------------------------------------------------- */
int main(int argc, char* argv[]) {
    clx::thread enq_th[2];
    clx::thread deq_th[2];
    
    // 第2引数がtrueの場合は,detach属性(joinで後処理を行う必要がない)
    for (size_t i = 0; i < 2; i++) enq_th[i].start(send_something, true);
    for (size_t i = 0; i < 2; i++) deq_th[i].start(recv_something, true);
    
    /*
     * detach属性の場合,作成したスレッドが仕事を終える前にメインスレッドが
     * 終了する可能性があるので,clx::exit(0)で待つ.
     */
    clx::exit(0);
}

実行結果

$ ./test
reset data queue
empty buffer
empty buffer
PSH: element[0] (rest 0)
PSH: element[0] (rest 1)
PSH: element[1] (rest 2)
PSH: element[1] (rest 3)
POP: element[0] (rest 4)
PSH: element[2] (rest 3)
POP: element[0] (rest 4)

・・・(以下略)・・・

インターフェース

全てのクラス,関数はclx名前空間の中に定義してあります.

/* ------------------------------------------------------------------------- */
//  thread
/* ------------------------------------------------------------------------- */
class thread {
public:
    thread();
    template <class Functor>
    explicit thread(Functor f, bool detached = false);
    virtual ~thread();
    
    template <class Functor>
    void start(Functor f, bool detached = false);
    
    bool joinable() const;
    void join();
    void detach();
    void sleep(double sec);
    
    handle_id_type get_id();
};

/* ------------------------------------------------------------------------- */
/*
 *  mutex
 *
 *  recursive_mutexもインターフェースは同じ
 */
/* ------------------------------------------------------------------------- */
class mutex {
public:
    typedef unique_lock<mutex> scoped_lock;
    typedef ... handle_pointer;
    
    mutex();
    virtual ~mutex();
    
    void lock();
    void unlock();
    handle_pointer native_handle();
};

/* ------------------------------------------------------------------------- */
//  condition
/* ------------------------------------------------------------------------- */
class condition {
public:
    typedef ... handle_pointer;
    
    condition();
    virtual ~condition();
    
    template <class LockT>
    void wait(LockT& lock);
    
    template <class LockT>
    bool timed_wait(LockT& lock, double sec);
    
    void notify_one();
    void notify_all();
    handle_pointer native_handle();
};

/* ------------------------------------------------------------------------- */
//  call_once
/* ------------------------------------------------------------------------- */
template <class Functor>
void call_once(once_flag& flag, Functor f);

*1:win32 thread側の実装もboost::threadベースにしようと思ったのですが,ソースを追いきれなかったorz

*2:開放する必要なし

*3:デフォルトでこの設定

*4:取り合えず,インターフェースを合わせるためにnotify_oneと同じ実装にしている

*5:InterlockedIncrementを利用して実装している