cppnow2012 John Wiegley C++11 High-Level Threading

这个ppt讲的是std::thread, 算是一个教学指南

构造函数声明是这个样子的

 struct thread{
 template<class F, class ...Args> explicit
         thread(F&&f, Args&&... args);
 };

一例

#include <thread>
std::map<std::string, std::string> french 
{ {"hello","bonjour"},{"world","tout le monde"} };

int main(){
    std::string greet = french["hello"];
    std::thread t([&]{std::cout<<greet<<", ";});
    std::string audience = freanch["world"];
    t.join();
    std::cout<< audience<<std::endl;
}

这是普通用法,如果想传参数引用,而不是捕获怎么办 ->转成ref,std::ref

std::thread t([](const std::string& s){std::cout<<s<<", ";}, std::ref(greet));

` std::thread overreview`

不可复制 ->移动语义

系统相关的细节不涉及(调度,优先级)

joinability

几个条件

  • 肯定有线程id,不是默认的
  • 可以join或detach的,或者调用过的,肯定是joinable的, 如果joinable没调用join/detach,析构就会调用std::terminate
  • 当析构或移动了之后肯定不是joinable的,这时候去访问(join)肯定会挂的。

所以这就有几个典型异常场景

  • system_error
  • 资源耗尽导致launch fail
  • detach /join fail,比如不是joinable,或者死锁了,join挂掉
  • 线程函数抛异常,这个就比较傻逼了,肯定调不到join或者detach

就上面的例子,第八行如果挂掉,线程就忘记join了。这样thread析构会直接调用std::terminate,所以需要catch住,在catch里join一下,然后把join转发出去

try {
    std::string audience = french["world"];
} catch(...) {
    t.join; 
    throw;
}

可真难看啊。

this_thread

接口是这样的

namespace this_thread{
    thread::id get_id() noexcept;
    void yield() noexcept;
    
    template <class Clock class Duration>
    void sleep_until(
      const chrono::time_point<Clock, Duration>& abs_time);
    
    template <class Rep, class Period>
    void sleep_for(
      const chrono::duration<Rep,Period>& rel_time);
}

太长了不好记

后面两个可以记成

void sleep_until(time_point);
void sleep_for(duration);

std::async std::future

上面例子的写法很难看,作者使用async 和future重写了一下

#include <future>
...
std::fucure<void> f = std:;async([&]{std::cout<< greet<<", ";});
std::string audience = french["world"];
f.get();
...

Or…

...
auto greet = std::async([]{return french["hellp"];});
std::string audience = french["world"];
std::cout<<greet.get()<<", "<<audience<<std::endl;

这样好看多了

async的launch逻辑

async deferred

一个例子

template <class Iter>
void parallel_merge_sort(Iter start, Itera finish) {
    std::size_t d = std::distance(start, finish);
    if(d<=1) return;
    Iter mid = start; std::advance(mid, d/2);
    auto f = std::async(
        d<768?std::launch::deferred
        : std::launch:deferred | std::launch::async,
        [=]{parallel_merge_sort(start,mid);});
    parallel_merge_sort(mid, finish);
    f.get();
    std::inplace_merge(start,mid,finish);
}

std::future

api是这个样子

enum class future_status{ready, timeout, deferred};
template <class R> struct future {
    future() noexcept;
    bool valid() const noexcept;// ready
    R get();
    void wait() const;// wait for ready
    future_status wait_for(duration) const;
    future_status wait_untile(time_point) const;
    shared_future<R> share();
};

整体是move-only的,有个share接口用于共享

std::shared_future

接口和future差不多,提供copy干脏活的

enum class future_status{ready, timeout, deferred};
template <class R> struct shared_future {
    future() noexcept;
    bool valid() const noexcept;// ready
    R get();
    void wait() const;// wait for ready
    future_status wait_for(duration) const;
    future_status wait_untile(time_point) const;
    shared_future(future<R>&& f) noexcept;
};

std::promise

如果用promise来重构,能彻底异步解耦,代码更好看一些

int main() {
    std::promise<std::string> audience_send;
    auto greet = std::async(
        [](std::future<std::string> audience_rcv)
        {
            std::cout<<french["hello"]  <<", ";
            std::cout<<audience_rcv.get()<<std:;endl;
        },
        audience_send.get_future()//pull
    );
    
    audience_send_value(french["world"]);//push
    greet.wait();
}

std::promise api

template <class R>
struct promise{
    promise();
    template <class Allocator>
        promise(allocator_arg_t, const Allocator& a);
    future<R> get_future(); //pull the future
    void set_value(R); //push, make the future ready
    void set_exception(exception_ptr p);
    void set_value_at_thread_exit(R); //push result but defer readiness
    void set_exception_at_thread_exit(exception_ptr p);
};

推迟异步动作,deferring launch, -> std::packaged_task

更进一步,把 std::async 换成std::packaged_task, 必须显式调用operator()才会执行,不像async立即异步执行

int main(){
    std::packaged_task<std::sring()> do_lookup(
        []{return french["hello"];});
    auto greet = do_lookup.get_future();
    do_lookup();
    std::string audience = french["world"];
    std::cout<<greet.get()<<", "<<audience<<std::endl;
}

std::packaged_task长这个样子

template<class> class packaged_task;// undefined
template<class R, class... ArgTypes>
struct packaged_task<R(ArgTypes...)> {
  packaged_task() noexcept;
  template <class F> explicit packaged_task(F&& f);
  template <class F, class Alloc>
    explicit packaged_task(allocator_arg_t, const Alloc& a, F&& f);
  future<R> get_future();//pull
  bool valid() const noexcept;
  void operator()(ArgTypes...);//make the future ready(push)
  void make_ready_at_thread_exit(ArgTypes...);
  void reset();
};

基于锁的数据共享

基本概念,线程安全,强线程安全

std::mutex

实现一个强线程安全的堆栈

template <class T>
struct shared_stack{
    bool empty() const {
        std::lock_guard<std::mutex>  l(m);
        bool r = v.empty();
        return r;
    }
    T top() const{
        std::lock_guard<std::mutex>  l(m);
        T r = v.back();
        return r;
    }
    void pop();
    void push(T x);
private:
    mutable std::mutex m;
    std::vector<T> v;
};

这里讨论了锁,lock_guard和unique_lock

实现一个线程安全的队列

template<unsigned size, class T>
struct bounded_msg_queue{
    bounded_msg_queue()
        :begin(0),end(0),buffered(0){}
    void send(T x){
        {
            std::unique_lock<std::mutex> lk(broker);
            while(buffered == size)
                not_full.wait(lk);
            buf[end] = x;
            end = (end +1)%size;
            ++buffered;
        }
        not_empty.notify_all();
    }
    T receive(){
        T r;
        {
            std::unique_lock<std::mutex> lk(broker);
            while (buffered == 0)
                not_empty.wait(lk);
            
            r = buf[begin];
            begin = (begin+1) % size;
            -- buffered;
        }
        not_full.notify_all();
        return r;
    }    
private:
    std::mutex broker;
    unsigned int begin, end, buffered;
    T buf[size];
    std::condition_variable not_full, not_empty;
};

std::condition_variable 这个api和pthread原语差不多,不说了

boost::shared_mutex

这个可以用于多读少写的场景,有点读写锁封装的感觉。具体没有研究

ref