本文是参考链接1 的翻译,感谢原作者。
考虑一个简单的需求,Compute函数接口,有这么两个接口
// lib V1:
void Compute(int in, int& out) { }
// lib V2:
void Compute(double in, double& out) { }
假如我想用v2的接口但是用不到,可能就需要自己适配,如何灵活的检查这个接口呢?
首先想到的就是宏,判断接口使用的版本,区分出,然后自己适配接口。但是问题在于接口宏混乱以后无法灵活的适配
这时候就需要第三种方法,就是模板探测惯用法了
template <typename T, typename = void>
struct is_compute_available : std::false_type {};
template <typename T>
struct is_compute_available<T,
std::void_t<decltype(Compute(std::declval<T>(),
std::declval<T&>())) >> : std::true_type {};
其中std::void_t吃掉decltye的类型。如果Compute对于指定的T没有匹配,整体就会退化到第一个匹配
关于std::void_t 参考链接2 3也可以看一下。有很多类似的用法。新时代的std::enable_if
上面的整体封装一下
// helper variable template
template< class T> inline constexpr bool is_compute_available_v =
is_compute_available<T>::value;
template <typename T>
void ComputeTest(T val)
{
if constexpr (is_compute_available_v<T>)
{
T out { };
Compute(val, out);
}
else
{
std::cout << "fallback...\n";
}
}
回到这篇文章想要讨论的函数,std::from_chars.鉴于这些功能实现的还不完整,msvc全实现了,gcc和clang只实现了T=int
所以套用上面的解决方案,如果用宏,能区分gcc clang和msvc,但是具体到gcc和clang呢?
考虑feature test macros,但是这个只能确定有没有这个功能,不能确定这个功能支持啥类型
最终,使用模板探测惯用法
template <typename T, typename = void>
struct is_from_chars_convertible : false_type {};
template <typename T>
struct is_from_chars_convertible<T,
void_t<decltype(from_chars(declval<const char*>(), declval<const char*>(), declval<T&>()))>>
: true_type {};
template <typename T>
[[nodiscard]] std::optional<T> TryConvert(std::string_view sv) noexcept {
T value{ };
if constexpr (is_from_chars_convertible<T>::value) {
const auto last = sv.data() + sv.size();
const auto res = std::from_chars(sv.data(), last, value);
if (res.ec == std::errc{} && res.ptr == last)
return value;
}
else {
try {
std::string str{ sv };
size_t read = 0;
if constexpr (std::is_same_v<T, double>)
value = std::stod(str, &read);
else if constexpr (std::is_same_v<T, float>)
value = std::stof(str, &read);
if (str.size() == read)
return value;
}
catch (...) { }
}
}
不支持的类型放到try里用以前的api来搞。全文完。参考6中提到了作者提到的几个参考链接。非常棒。
这篇文章是参考链接的翻译,简而言之,如何写一个反向代理
第一版,代码在这里 https://github.com/gpjt/rsp/blob/f214f5a75e112311b90c81ad823bf86c3900b03d/src/rsp.c
下面的介绍错误处理都去掉了
首先要明白反向代理干啥的,就是接收一个连接,转发给设定好的后端节点。代理/转发
那大概就有了框架
主程序负责接收,处理客户端连接 handle_client_connection
int main(int argc, char *argv[]) {
...
if (argc != 4) {
fprintf(stderr,
"Usage: %s <server_port> <backend_addr> <backend_port>\n",
argv[0]);
exit(1);
}
server_port_str = argv[1];
backend_addr = argv[2];
backend_port_str = argv[3];
server_socket_fd = socket(addr_iter->ai_family,
addr_iter->ai_socktype,
addr_iter->ai_protocol);
bind(server_socket_fd, addr_iter->ai_addr, addr_iter->ai_addrlen)
listen(server_socket_fd, MAX_LISTEN_BACKLOG);
while (1) {
client_socket_fd = accept(server_socket_fd, NULL, NULL);
handle_client_connection(client_socket_fd, backend_addr, backend_port_str);
}
}
处理客户端连接呢,就是客户端来啥,我转发啥,后端节点来啥,原样转发给客户端fd
void handle_client_connection(int client_socket_fd,
char *backend_host,
char *backend_port_str)
{
backend_socket_fd = socket(addrs_iter->ai_family,
addrs_iter->ai_socktype,
addrs_iter->ai_protocol);
connect(backend_socket_fd,
addrs_iter->ai_addr,
addrs_iter->ai_addrlen) != -1)
bytes_read = read(client_socket_fd, buffer, BUFFER_SIZE);
write(backend_socket_fd, buffer, bytes_read);
while (bytes_read = read(backend_socket_fd, buffer, BUFFER_SIZE)) {
write(client_socket_fd, buffer, bytes_read);
}
}
这是简单的同步写法。当然nginx肯定要用epoll来尽量异步
转自 http://spiritsaway.info/lockfree-hashtable.html
对于让hashtable变成无锁,主要要解决如下几个问题:
在preshing的一篇文章里面谈到了无锁线性扫描的实现,这里定义了一个基本的Entry
:
struct Entry
{
mint_atomic32_t key;
mint_atomic32_t value;
};
Entry *m_entries;
在这个Entry
里,我们规定如果key
的值为0,则代表这个entry
还没有被使用,所以插入的时候禁止传入为0的key
。
在此结构之下,定义的setItem
操作如下:
void ArrayOfItems::SetItem(uint32_t key, uint32_t value)
{
for (uint32_t idx = 0;; idx++)
{
uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
if ((prevKey == 0) || (prevKey == key))
{
mint_store_32_relaxed(&m_entries[idx].value, value);
return;
}
}
}
类似的getItem
的操作如下:
uint32_t ArrayOfItems::GetItem(uint32_t key)
{
for (uint32_t idx = 0;; idx++)
{
uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
if (probedKey == key)
return mint_load_32_relaxed(&m_entries[idx].value);
if (probedKey == 0)
return 0;
}
}
现在的疑问在于,这里的原子操作使用的都是relaxed
语义,这个语义在x86
上基本等于没有任何作用,如何在使得SetItem
里的第8行能够被GetItem
的第7行可见。事实上这压根做不到,因为一个线程在执行到SetItem
的第8行之前被换出, 然后另外一个线程执行到了GetItem
的第7行,这里读取的还是老的值。除了这种情况之外,还可能出现SetItem
里的CAS
操作并没有将数据更新的通知发放到其他的core
上去,然而第8行的store
操作已经被另外一个执行GetItem
的线程可见的情况,此时GetItem
会返回0。这两种情况都是合法的,因为在多线程中读取数据的时机是不确定的,因此读取老数据也是正常的。甚至可以说在没有通知机制的情况下,是不是最新根本没有意义。如果要实现publish-listen
的机制,则需要在SetItem
的时候将一个原子的bool
变量设置为True
,同时这个Store
操作要使用Release
语义,同时另外一个线程在CAS
这个值的时候,要使用Acquire
语义。
// Shared variables
char message[256];
ArrayOfItems collection;
void PublishMessage()
{
// Write to shared memory non-atomically.
strcpy(message, "I pity the fool!");
// Release fence: The only way to safely pass non-atomic data between threads using Mintomic.
mint_thread_fence_release();
// Set a flag to indicate to other threads that the message is ready.
collection.SetItem(SHARED_FLAG_KEY, 1)
}
这样才能使得数据更改完并通知完之后,另外一方能够得到最新数据。因此,当前设计的无锁hashtable
在多线程上唯一做的事情就是防止了多个线程对同一个entry
同时做SetItem
操作。
preshing对SetItem
有一个优化:减少不必要的CAS
操作。在原来的实现中会遍历所有的元素去执行CAS
操作,其实只有key == 0 or key == my_key
的时候我们才需要去做CAS
。所以这里的优化就是预先作一次load
,发现可以去set
的时候才去CAS
。
void ArrayOfItems::SetItem(uint32_t key, uint32_t value)
{
for (uint32_t idx = 0;; idx++)
{
// Load the key that was there.
uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
if (probedKey != key)
{
// The entry was either free, or contains another key.
if (probedKey != 0)
continue; // Usually, it contains another key. Keep probing.
// The entry was free. Now let's try to take it using a CAS.
uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
if ((prevKey != 0) && (prevKey != key))
continue; // Another thread just stole it from underneath us.
// Either we just added the key, or another thread did.
}
// Store the value in this array entry.
mint_store_32_relaxed(&m_entries[idx].value, value);
return;
}
}
在上面的lockfree linear scan
的基础上,做一个lockfree hashtable
还是比较简单的。这里定义了三个函数intergerHash, SetItem, GetItem
:
inline static uint32_t integerHash(uint32_t h)
{
h ^= h >> 16;
h *= 0x85ebca6b;
h ^= h >> 13;
h *= 0xc2b2ae35;
h ^= h >> 16;
return h;
}
这个hash
函数的来源是MurmurHash3’s integer finalizer , 据说这样可以让每一位都起到差不多的作用。
void HashTable1::SetItem(uint32_t key, uint32_t value)
{
for (uint32_t idx = integerHash(key);; idx++)
{
idx &= m_arraySize - 1;
// Load the key that was there.
uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
if (probedKey != key)
{
// The entry was either free, or contains another key.
if (probedKey != 0)
continue; // Usually, it contains another key. Keep probing.
// The entry was free. Now let's try to take it using a CAS.
uint32_t prevKey = mint_compare_exchange_strong_32_relaxed(&m_entries[idx].key, 0, key);
if ((prevKey != 0) && (prevKey != key))
continue; // Another thread just stole it from underneath us.
// Either we just added the key, or another thread did.
}
// Store the value in this array entry.
mint_store_32_relaxed(&m_entries[idx].value, value);
return;
}
}
这里的SetItem
正确工作有一个前提:整个hashtable
不是满的,是满的一定会出错。
GetItem
还是老样子:
uint32_t HashTable1::GetItem(uint32_t key)
{
for (uint32_t idx = integerHash(key);; idx++)
{
idx &= m_arraySize - 1;
uint32_t probedKey = mint_load_32_relaxed(&m_entries[idx].key);
if (probedKey == key)
return mint_load_32_relaxed(&m_entries[idx].value);
if (probedKey == 0)
return 0;
}
}
所谓的publish
函数也是一样:
// Shared variables
char message[256];
HashTable1 collection;
void PublishMessage()
{
// Write to shared memory non-atomically.
strcpy(message, "I pity the fool!");
// Release fence: The only way to safely pass non-atomic data between threads using Mintomic.
mint_thread_fence_release();
// Set a flag to indicate to other threads that the message is ready.
collection.SetItem(SHARED_FLAG_KEY, 1)
}
至于delete
操作,我们可以规定value
是某个值的时候代表当前entry
是被删除的,这样就可以用SetItem(key, 0)
来模拟delete
操作了。
上面的无锁hashtable
有一个致命缺陷,他没有处理整个hashtable
满了的情况。为了处理满的情况,我们需要设置最大探查数量为当前hashtable
的容量, 同时维护多个独立的hashtable
,用一个无锁的链表将所有的hashtable
的指针串联起来。如果最大探查数量达到上限,且当前hashtable
没有下一个hashtable
的指针,且则先建立一个新的hashtable
,并挂载到无锁链表上,回到了有下一个hashtable
的情况,然后对下一个hashtable
做递归遍历。
这样做的确解决了扩容的问题,但是会出现性能下降的问题。后面过来的key
在查询的时候会变得越来越慢,因为经常需要查询多层的hashtable
。为了避免这个问题,出现了一种新的设计:每次添加一层hashtable
的时候,都将容量扩大一倍,然后将上一个hashtable
的内容拷贝到新的hashtable
里。这个新的hashtable
也叫做main_hashtable
,由于我们无法在无锁的情况下把整个hashtable
拷贝过去,所以采用lazy
的方式,这个方式的步骤如下:
hashtable
的无锁链表,链表的头节点就叫做main_hashtable
,所有的hashtable
通过一个next
指针相连;main_hashtable
的装载因子(这个装载因子考虑了所有的key
)已经大于0.5,则新建一个hashtable
,然后插入到新的hashtable
里;hashtable
是空的,大小为当前main_hashtable
的两倍,每次新加入一个hashtable
的时候都插入到头部,使之成为新的main_hashtable
;next
指针一直查询,直到最后一个hashtable
;hashtable
并不是main_hashtable
,则把当前的key value
对插入到main_hashtable
里,这就是核心的lazy copy
的过程这个lazy
的过程代码如下:
auto id = details::thread_id();
auto hashedId = details::hash_thread_id(id);
auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
// Look for the id in this hash
auto index = hashedId;
while (true) { // Not an infinite loop because at least one slot is free in the hash table
index &= hash->capacity - 1;
auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
if (probedKey == id) {
// Found it! If we had to search several hashes deep, though, we should lazily add it
// to the current main hash table to avoid the extended search next time.
// Note there's guaranteed to be room in the current hash table since every subsequent
// table implicitly reserves space for all previous tables (there's only one
// implicitProducerHashCount).
auto value = hash->entries[index].value;
if (hash != mainHash) {
index = hashedId;
while (true) {
index &= mainHash->capacity - 1;
probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
auto reusable = details::invalid_thread_id2;
if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed)) ||
(probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire))) {
#else
if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed))) {
#endif
mainHash->entries[index].value = value;
break;
}
++index;
}
}
return value;
}
if (probedKey == details::invalid_thread_id) {
break; // Not in this hash table
}
++index;
}
}
现在的核心则转移到了扩容的过程,扩容涉及到了动态内存分配和初始内容的填充,是比较耗的操作,所以要避免多个线程在争抢扩容的控制权。在moodycamel的设计里,是这样处理扩容的。
// Insert!
auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
while (true)
{
if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire))
{
// We've acquired the resize lock, try to allocate a bigger hash table.
// Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
// we reload implicitProducerHash it must be the most recent version (it only gets changed within this
// locked block).
mainHash = implicitProducerHash.load(std::memory_order_acquire);
if (newCount >= (mainHash->capacity >> 1))
{
auto newCapacity = mainHash->capacity << 1;
while (newCount >= (newCapacity >> 1))
{
newCapacity <<= 1;
}
auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
if (raw == nullptr)
{
// Allocation failed
implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
return nullptr;
}
auto newHash = new (raw) ImplicitProducerHash;
newHash->capacity = newCapacity;
newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
for (size_t i = 0; i != newCapacity; ++i)
{
new (newHash->entries + i) ImplicitProducerKVP;
newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
}
newHash->prev = mainHash;
implicitProducerHash.store(newHash, std::memory_order_release);
implicitProducerHashResizeInProgress.clear(std::memory_order_release);
mainHash = newHash;
}
else
{
implicitProducerHashResizeInProgress.clear(std::memory_order_release);
}
}
// If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
// to finish being allocated by another thread (and if we just finished allocating above, the condition will
// always be true)
if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2))
{
bool recycled;
auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
if (producer == nullptr)
{
implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
return nullptr;
}
if (recycled)
{
implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed);
}
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
producer->threadExitListener.userData = producer;
details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
#endif
auto index = hashedId;
while (true)
{
index &= mainHash->capacity - 1;
auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
auto reusable = details::invalid_thread_id2;
if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed)) ||
(probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire)))
{
#else
if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed)))
{
#endif
mainHash->entries[index].value = producer;
break;
}
++index;
}
return producer;
}
// Hmm, the old hash is quite full and somebody else is busy allocating a new one.
// We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
// we try to allocate ourselves).
mainHash = implicitProducerHash.load(std::memory_order_acquire);
}
上面的第五行就是抢夺控制权的过程,进入扩容的条件就是当前装载因子已经大于0.5,且扩容标志位没有设置。
newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)
扩容的时候设置一个标志位,相当于一个锁,扩容完成之后清空标志位。但是由于线程换出的存在,这个标志位可能导致其他线程永远抢不到控制权,进入无限死循环。所以这里又对没有抢夺到扩容控制权的线程,还有另外的一个判断,如果装载因子小于0.75,则直接尝试插入,不用管。
newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)
这个分支里还做了一些事情,就是当真正的获得了一个implicit producer
之后,注册一个线程退出的callback
,这个callback
会把当前producer
销毁,并在hashtable
里删除对应的key
。
最后剩下的一种情况就是:拿不到扩容所有权,且当前装载因子已经上了0.75,此时除了死循环没有办法,约等于死锁。这种情况很罕见,但是仍然可以构造出来:正在扩容的线程被换出。不知原作者如何处理这个情况。
这有几个hashtable实现,上亿qps,无锁
这里挖个坑,后面分析一下 http://cmph.sourceforge.net/chd.html
抄自这里https://zhuanlan.zhihu.com/p/55583561
Herb Sutter在DDJ pillars of concurrency
一文中抛出并行编程的三个简单论点,一是分离任务,使用更细粒度的锁或者无锁编程;二是尽量通过并行任务使用CPU资源,以提高系统吞吐量及扩展性;三是保证对共享资源访问的一致性。第三点已经被atomic
、mutex
、lock
、condition_variable
解决了,第一点和第二点则可以归结为如何对任务进行粒度划分并投递到任务的执行单元中去调度执行。任务划分依赖于各种不同业务的理解,例如网络和渲染,很难抽取出其共性。而任务的调度执行则是一种通用的结构,可以分为四个部分:
c++11
里提供了三种最基本的任务封装形式future, promise,packaged_task
c++17
里补全了任务结构控制,主要是提供了then, when_all, when_any
这三个用来关联多个future
的函数在整个并发任务系统中,在任务容器集合之上的任务调度结构是核心。现在使用的最广泛的任务容器是concurrent queue
,下面我们来对concurrent queue
的多种实现来做一下分析。
queue
是一个维持先进先出(FIFO
)队列的结构,在很多STL的实现之中采取的是多块连续内存的双向链表来维持其先进先出结构。为了在多线程中使用std::queue
,最简单的方法就是使用锁来解决data race
,同时修改原始提供的接口,使得这个数据结构不会被用错。
#include <queue>
#include <atomic>
#include <mutex>
template<typename T>
class concurrent_queue
{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
concurrent_queue()
{
// pass
}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(std::move(data));
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{
return !data_queue.empty();
})
value = std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{
return !data_queue.empty();
})
auto res = std::make_shared<T>(std::move(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
{
return false;
}
value = std::move(data_queue.front());
data_queue.pop()
return True;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
{
return std::shared_ptr<T>();
}
auto res = std::make_shared<T>(std::move(data_queue.front()));
data_queue.pop()
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
}
上述代码的主要考量如下:
empty
之后再pop
的处理流程是错误的,这两个操作必须封装在一起,所以这里提供了try_pop
和wait_and_pop
这两个接口来获取数据。shared_ptr
。这样就保证了如果在拷贝构造front
的时候出了trace也能维持整个queue
的结构完整。这个concurrent_queue
并不是很高效,主要的drawback
包括如下三个方面:
yield
,从而导致线程切换对应的常见解决方案:
mutex
,同时由于无锁最大的问题是内存分配,有些并发队列通过预先设置最大大小的方式来预分配内存,从而绕过了著名的ABA
问题queue
,这样我们就可以分离头节点和尾节点的访问;如果是固定大小的队列则可以采取ring buffer
的形式来维持队列结构。yield
,也就是对condition_variable
封装了一层。事实上,在这是一个并发queue
的时候,首先要明确如下几个问题:
这个queue
的生产者和消费者各有多少个,常见的有单生产者单消费者(SPSC
)、单生产者多消费者(SPMC
)、多生产者单消费者(MPSC
)和多生产者多消费者(MPMC
)
这个queue
的最大元素大小是否确定,如果可以确定最大大小,则动态内存分配就可以避免,直接采取环形队列当作容器即可;如果无法确定最大大小,则只能通过动态内存分配的形式去处理,这里的难度加大了很多,因为要处理多线程的内存分配。
下面我们来看一下现在主流的几种concurrent_queue
的实现,来分析一下他们对concurrent_queue
的实现优化。
intel官方网站上提供了一个SPSC queue
,但是这个queue
没有限制最大元素大小,如果临时内存不够的话会调用new
,可能会触发锁。
// load with 'consume' (data-dependent) memory ordering
template<typename T>
T load_consume(T const* addr)
{
// hardware fence is implicit on x86
T v = *const_cast<T const volatile*>(addr);
__memory_barrier(); // compiler fence
return v;
}
// store with 'release' memory ordering
template<typename T>
void store_release(T* addr, T v)
{
// hardware fence is implicit on x86
__memory_barrier(); // compiler fence
*const_cast<T volatile*>(addr) = v;
}
// cache line size on modern x86 processors (in bytes)
size_t const cache_line_size = 64;
// single-producer/single-consumer queue
template<typename T>
class spsc_queue
{
public:
spsc_queue()
{
node* n = new node;
n->next_ = 0;
tail_ = head_ = first_= tail_copy_ = n;
}
~spsc_queue()
{
node* n = first_;
do
{
node* next = n->next_;
delete n;
n = next;
}
while (n);
}
void enqueue(T v)
{
node* n = alloc_node();
n->next_ = 0;
n->value_ = v;
store_release(&head_->next_, n);
head_ = n;
}
// returns 'false' if queue is empty
bool dequeue(T& v)
{
if (load_consume(&tail_->next_))
{
v = tail_->next_->value_;
store_release(&tail_, tail_->next_);
return true;
}
else
{
return false;
}
}
private:
// internal node structure
struct node
{
node* next_;
T value_;
};
// consumer part
// accessed mainly by consumer, infrequently be producer
node* tail_; // tail of the queue
// delimiter between consumer part and producer part,
// so that they situated on different cache lines
char cache_line_pad_ [cache_line_size];
// producer part
// accessed only by producer
node* head_; // head of the queue
node* first_; // last unused node (tail of node cache)
node* tail_copy_; // helper (points somewhere between first_ and tail_)
node* alloc_node()
{
// first tries to allocate node from internal node cache,
// if attempt fails, allocates node via ::operator new()
if (first_ != tail_copy_)
{
node* n = first_;
first_ = first_->next_;
return n;
}
tail_copy_ = load_consume(&tail_);
if (first_ != tail_copy_)
{
node* n = first_;
first_ = first_->next_;
return n;
}
node* n = new node;
return n;
}
spsc_queue(spsc_queue const&);
spsc_queue& operator = (spsc_queue const&);
};
// usage example
int main()
{
spsc_queue<int> q;
q.enqueue(1);
q.enqueue(2);
int v;
bool b = q.dequeue(v);
b = q.dequeue(v);
q.enqueue(3);
q.enqueue(4);
b = q.dequeue(v);
b = q.dequeue(v);
b = q.dequeue(v);
}
这个代码的实现很简单粗暴,核心是一个单链表,对于单链表的任何操作都是wait_free
的,这个链表有四个指针:
tail
指针,指向下一个应该dequeue
的位置head
指针,指向最新的一个enqueue
的位置first_
指针,指向第一个可以回收node
的位置tail_copy
指针,指向一个安全的可以回收的node
的next
位置,他不一定指向tail
。在这个链表里,指针之间有如下关系:$first \le tail_copy \le tail \le head$ 。这里做的核心优化就是按需去更新tail_copy
,没必要每次更新tail
的时候都把tail_copy
更新一遍,只有发现first == tail_copy
的时候才去更新一下。每个操作都没有使用到CAS
,因此都是wait_free
的,当然那一行调用了new
的除外。
这里为了避免False Sharing
使用了padding
。由于读线程只需要更改tail
,所以只需要在tail
之后加个padding
即可。
facebook
提供了固定大小的SPSC queue
,代码在folly
的ProducerConsumerQueue
里。
/*
* Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)
#pragma once
#include <atomic>
#include <cassert>
#include <cstdlib>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <folly/concurrency/CacheLocality.h>
namespace folly {
/*
* ProducerConsumerQueue is a one producer and one consumer queue
* without locks.
*/
template<class T>
struct ProducerConsumerQueue {
typedef T value_type;
ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
ProducerConsumerQueue& operator = (const ProducerConsumerQueue&) = delete;
// size must be >= 2.
//
// Also, note that the number of usable slots in the queue at any
// given time is actually (size-1), so if you start with an empty queue,
// isFull() will return true after size-1 insertions.
explicit ProducerConsumerQueue(uint32_t size)
: size_(size)
, records_(static_cast<T*>(std::malloc(sizeof(T) * size)))
, readIndex_(0)
, writeIndex_(0)
{
assert(size >= 2);
if (!records_) {
throw std::bad_alloc();
}
}
~ProducerConsumerQueue() {
// We need to destruct anything that may still exist in our queue.
// (No real synchronization needed at destructor time: only one
// thread can be doing this.)
if (!std::is_trivially_destructible<T>::value) {
size_t readIndex = readIndex_;
size_t endIndex = writeIndex_;
while (readIndex != endIndex) {
records_[readIndex].~T();
if (++readIndex == size_) {
readIndex = 0;
}
}
}
std::free(records_);
}
template<class ...Args>
bool write(Args&&... recordArgs) {
auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
auto nextRecord = currentWrite + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
writeIndex_.store(nextRecord, std::memory_order_release);
return true;
}
// queue is full
return false;
}
// move (or copy) the value at the front of the queue to given variable
bool read(T& record) {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
// queue is empty
return false;
}
auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
record = std::move(records_[currentRead]);
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
return true;
}
// pointer to the value at the front of the queue (for use in-place) or
// nullptr if empty.
T* frontPtr() {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
// queue is empty
return nullptr;
}
return &records_[currentRead];
}
// queue must not be empty
void popFront() {
auto const currentRead = readIndex_.load(std::memory_order_relaxed);
assert(currentRead != writeIndex_.load(std::memory_order_acquire));
auto nextRecord = currentRead + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
records_[currentRead].~T();
readIndex_.store(nextRecord, std::memory_order_release);
}
bool isEmpty() const {
return readIndex_.load(std::memory_order_acquire) ==
writeIndex_.load(std::memory_order_acquire);
}
bool isFull() const {
auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
if (nextRecord == size_) {
nextRecord = 0;
}
if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
return false;
}
// queue is full
return true;
}
// * If called by consumer, then true size may be more (because producer may
// be adding items concurrently).
// * If called by producer, then true size may be less (because consumer may
// be removing items concurrently).
// * It is undefined to call this from any other thread.
size_t sizeGuess() const {
int ret = writeIndex_.load(std::memory_order_acquire) -
readIndex_.load(std::memory_order_acquire);
if (ret < 0) {
ret += size_;
}
return ret;
}
private:
char pad0_[CacheLocality::kFalseSharingRange];
const uint32_t size_;
T* const records_;
FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> readIndex_;
FOLLY_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned int> writeIndex_;
char pad1_[CacheLocality::kFalseSharingRange - sizeof(writeIndex_)];
};
}
这里就是使用环形队列来作为容器,双指针来作为头尾,读线程读取readIndex
直接采用relaxed
,写线程读取writeIndex
的时候也是采取relaxed
,因为这两个变量只会在对应的线程内修改,可以认为是对应线程的私有变量,如果要读取另外一个线程的变量则需要采取acquire
,当然前提是修改的时候使用了release
。为了避免False Sharing
这里也使用了padding
,只不过是用宏做的。
其实这里也可以做一点优化,就像前面intel
的延迟处理tail_copy
一样,首次读取另外一个线程变量的时候先用relaxed
,如果发现不能操作了,则再使用acquire
。
总的来说,这个无锁spsc queue
也是wait_free
的。
moody camel 在上面的基础上做了一些改进:在支持无大小限制的情况下,将动态内存分配的需求降得很低,同时支持了容量的动态增长。其容器结构是两层的queue of queue
,第一层是循环链表,第二层是循环队列。第一层循环链表的控制基本等价于intel
的spsc
里的代码,而第二层的循环队列的控制基本等价于folly
的代码。当enqueue
的时候,发现没有空闲内存的时候会调用malloc
,不过这种动态内存分配比起intel
的每个新node
都分配来说简单多了,总的来说还是比较符合wait_free
的。这个的代码我就不分析了,直接贴作者的解释吧。
# Enqueue
If room in tail block, add to tail
Else check next block
If next block is not the head block, enqueue on next block
Else create a new block and enqueue there
Advance tail to the block we just enqueued to
# Dequeue
Remember where the tail block is
If the front block has an element in it, dequeue it
Else
If front block was the tail block when we entered the function, return false
Else advance to next block and dequeue the item there
在这前面介绍的spsc
并发队列的基础上,我们可以比较容易的构建出一个spmc
的并发队列,而构造一个mpsc
的并发队列则难很多。其原因主要是在enqueue
的时候,可能会涉及到动态内存分配,如果有好几个线程都抢着进行动态内存分配的话,就会出现malloc
的锁征用。而多个线程抢占dequeue
的时候,只需要采取CAS
来保持tail
的更新即可,虽说这个不是waitfree
的,但是lockfree
还是可以基本保证的。
boost concurrent queue
通过模板参数的方式来支持固定大小的队列和不定大小的队列。
如果是固定大小队列,则会使用一个带dummy head
的ring buffer
来存储内容,同时使用一个头节点索引和一个尾节点索引来标记队列的头尾位置。为了一次性的修改头尾节点索引,这里将队列大小的上限设置为了$2^{16} - 2$ ,这样两个索引就可以合并为一个int32
来处理,修改的时候可以使用compare_exchange_
来同时修改。如果在支持int64
类型的compare_exchange_
操作的平台,队列大小的上限可以放到$2^{32} -2$ ,同时两个索引会被压缩为一个int64
来做同时修改。
如果是不定大小的队列,则会使用链表的形式来维持队列结构, 代码见下。
struct BOOST_LOCKFREE_CACHELINE_ALIGNMENT node
{
typedef typename detail::select_tagged_handle<node, node_based>::tagged_handle_type tagged_node_handle;
typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;
node(T const & v, handle_type null_handle):
data(v)//, next(tagged_node_handle(0, 0))
{
/* increment tag to avoid ABA problem */
tagged_node_handle old_next = next.load(memory_order_relaxed);
tagged_node_handle new_next (null_handle, old_next.get_next_tag());
next.store(new_next, memory_order_release);
}
node (handle_type null_handle):
next(tagged_node_handle(null_handle, 0))
{}
node(void)
{}
atomic<tagged_node_handle> next;
T data;
};
这里比较有意思的就是第九行的注释:对指针的tag
位置进行自增来避免ABA
问题。这里的next
指针是一个tagged_pointer
,其分配位置是内存对齐的,对齐的大小由BOOST_LOCKFREE_CACHELINE_BYTES
定义,在WIN
平台下,这个宏定义如下:
#define BOOST_LOCKFREE_CACHELINE_BYTES 64
#ifdef _MSC_VER
#define BOOST_LOCKFREE_CACHELINE_ALIGNMENT __declspec(align(BOOST_LOCKFREE_CACHELINE_BYTES))
当这个指针是64字节对齐时,最底的6位是没有意义的,所以这6位我们可以用来存储额外的数据,这种指针就叫做tagged_pointer
,在llvm
里这个指针结构也很常见。
在boost lockfree queue
里,数据成员定义如下:
typedef typename detail::queue_signature::bind<A0, A1, A2>::type bound_args;
static const bool has_capacity = detail::extract_capacity<bound_args>::has_capacity;
static const size_t capacity = detail::extract_capacity<bound_args>::capacity + 1; // the queue uses one dummy node
static const bool fixed_sized = detail::extract_fixed_sized<bound_args>::value;
static const bool node_based = !(has_capacity || fixed_sized);
static const bool compile_time_sized = has_capacity;
typedef typename detail::extract_allocator<bound_args, node>::type node_allocator;
typedef typename detail::select_freelist<node, node_allocator, compile_time_sized, fixed_sized, capacity>::type pool_t;
typedef typename pool_t::tagged_node_handle tagged_node_handle;
typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;
atomic<tagged_node_handle> head_;
static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle);
char padding1[padding_size];
atomic<tagged_node_handle> tail_;
char padding2[padding_size];
pool_t pool; //代表node的pool 可以当作内存分配器
因为atomic<T*>
内部只包含一个T*
作为成员变量,所以atomic<T*>
与T*
的内存布局是一样的,所以这里的padding_size
才会这样计算出来。这里的padding
的意义在于让poll
的开始地址是BOOST_LOCKFREE_CACHELINE_BYTES
对齐的,同时这里分为了两个padding
而不是一个padding
主要是考虑到将tail head
分离在两个cache_line
上,避免不同线程之间的缓存竞争。
现在我们来看这个lockfree queue
提供的接口。
首先查看empty
。
/** Check if the queue is empty
*
* \return true, if the queue is empty, false otherwise
* \note The result is only accurate, if no other thread modifies the queue. Therefore it is rarely practical to use this
* value in program logic.
* */
bool empty(void)
{
return pool.get_handle(head_.load()) == pool.get_handle(tail_.load());
}
注释里写的很清楚了,这个函数的返回值是不准确的,因为在没有锁的情况下无法同时获得head tail
的准确值。
现在来看push
,这里分为了两个接口push bounded_push
,区分在于如果内存池已经用完,第一个push
在当前队列是大小固定的情况下会返回false
,不固定的情况下会向操作系统尝试申请更多的内存并返回;而第二个bounded_push
则直接返回false
。
/** Pushes object t to the queue.
*
* \post object will be pushed to the queue, if internal node can be allocated
* \returns true, if the push operation is successful.
*
* \note Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated
* from the OS. This may not be lock-free.
* */
bool push(T const & t)
{
return do_push<false>(t);
}
/** Pushes object t to the queue.
*
* \post object will be pushed to the queue, if internal node can be allocated
* \returns true, if the push operation is successful.
*
* \note Thread-safe and non-blocking. If internal memory pool is exhausted, operation will fail
* \throws if memory allocator throws
* */
bool bounded_push(T const & t)
{
return do_push<true>(t);
}
这两个函数都调用了do_push
,这个函数的定义如下:
template <bool Bounded>
bool do_push(T const & t)
{
using detail::likely;
node * n = pool.template construct<true, Bounded>(t, pool.null_handle());
handle_type node_handle = pool.get_handle(n);
if (n == NULL)
return false;
for (;;) {
tagged_node_handle tail = tail_.load(memory_order_acquire);
node * tail_node = pool.get_pointer(tail);
tagged_node_handle next = tail_node->next.load(memory_order_acquire);
node * next_ptr = pool.get_pointer(next);
tagged_node_handle tail2 = tail_.load(memory_order_acquire);
if (likely(tail == tail2)) {
if (next_ptr == 0) {
tagged_node_handle new_tail_next(node_handle, next.get_next_tag());
if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) {
tagged_node_handle new_tail(node_handle, tail.get_next_tag());
tail_.compare_exchange_strong(tail, new_tail);
return true;
}
}
else {
tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag());
tail_.compare_exchange_strong(tail, new_tail);
}
}
}
}
这里比较难理解的一点就是tail tail2
,以及最后30行的compare_exchange_strong
。这里在19行使用判断的意义是避免在内部做无用功,虽然不使用19行的判断, tail
改变之后,20行所在分支的检测都会fail
掉,对正确性没影响,对性能上来说提升很大。在一个完整的成功push
流程中有两个cas
操作,我们需要担心的是在两个cas
操作之间线程被换出之后会出现何种结果,也就是在24行之前被换出。此时老的tail
的next
已经被修正为了新数据,而新tail
却没有更新。在下一个线程进来的时候会发现tail->next != 0
, 因此会进28号的分支,在此分支之内会尝试将tail->next
更新为tail
,这样就避免了数据更新到一半的尴尬局面。
对于pop
则只有一个函数:
/** Pops object from queue.
*
* \pre type U must be constructible by T and copyable, or T must be convertible to U
* \post if pop operation is successful, object will be copied to ret.
* \returns true, if the pop operation is successful, false if queue was empty.
*
* \note Thread-safe and non-blocking
* */
template <typename U>
bool pop (U & ret)
{
using detail::likely;
for (;;) {
tagged_node_handle head = head_.load(memory_order_acquire);
node * head_ptr = pool.get_pointer(head);
tagged_node_handle tail = tail_.load(memory_order_acquire);
tagged_node_handle next = head_ptr->next.load(memory_order_acquire);
node * next_ptr = pool.get_pointer(next);
tagged_node_handle head2 = head_.load(memory_order_acquire);
if (likely(head == head2)) {
if (pool.get_handle(head) == pool.get_handle(tail)) {
if (next_ptr == 0)
return false;
tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
tail_.compare_exchange_strong(tail, new_tail);
} else {
if (next_ptr == 0)
/* this check is not part of the original algorithm as published by michael and scott
*
* however we reuse the tagged_ptr part for the freelist and clear the next part during node
* allocation. we can observe a null-pointer here.
* */
continue;
detail::copy_payload(next_ptr->data, ret);
tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
if (head_.compare_exchange_weak(head, new_head)) {
pool.template destruct<true>(head);
return true;
}
}
}
}
}
这里就简单多了,成功的pop
只需要一个compare_exchange_weak
即可,所以就不需要担心数据更改到一半的问题,这里的28行处理的还是tail
数据更新到一半的问题。
这里比较有意思的一点就是42行的.template
,这个叫做template disambiguator, 其作用就是通知编译器destruct<true>
是一个模板,而不是destruct < true
。
总的来说, boost lockfree queue
的注意点完全在lock free
上,并没有采取每个生产者单独一个queue
的方式来解决争用,虽然我们可以在lockfree queue
的基础上做一个这样的东西。
其实这个的总体实现与boost类似。占坑,以后填。粗看起来,这个东西的实现很具有STL
的风格。
这个concurrent queue
的实现被很多项目使用过, 值得重点分析。这个实现的突出之处在于,每个生产者都维持自己的专属queue
,而不同的消费者会以不同的顺序去访问各个生产者的queue
,直到遇到一个不为空的queue
。简而言之,他所实现的MPMC(multiple producer multiple consumer)
的队列建立在了SPMC
的多线程队列的基础上。这个SPMC
的实现是lockfree
的,同时还增加了bulk
操作。下面来慢慢介绍这个的设计。
首先就是在构建消费者的时候,尽可能的让消费者与生产者均衡绑定,内部实现是通过使用一个token
来维持消费者与生产者之间的亲和性。其实最简单的亲和性分配的方法就是每个消费者分配一个生产者的编号,dequeue
的时候采取轮询的方式,每次开始轮询的时候都以上次dequeue
成功的生产者queue
开始。
处理完了多生产者多消费者之间的映射,现在剩下的内容就是如何更高效的处理单生产者多消费者。moodycamel这里的主要改进就是单个queue
的存储结构,这里采取的是两层的循环队列,第一层循环队列存储的是第二层循环队列的指针。一个队列只需要维护四个索引,考虑到原子性修改可以把消费者的两个索引合并为一个uint64
或者uint32t
,因为只有消费者会发生数据竞争,为了方便比较,也顺便把生产者的两个索引合并为一个uint64t or uint32t
,这样就可以直接使用整数比较了。在enqueue
的时候,数据复制完成之后,直接对生产者的索引自增即可。而dequeue
的时候则没这么容易,此时首先自增消费者索引,然后判断当前消费者索引是否已经越过生产者索引,如果越过了,则对则对另外一个overcommit
计数器进行自增,三个计数器合用才能获得真正的容量。
这里使用环形缓冲来扩容而不是采取列表来扩容,主要是因为连续的空间操作可以支持批量的enqueue
和dequeue
操作,直接预先占据一些索引就行了。
实际上,基于状态和基于操作的CRDT已经在数学上被证明可以相互转换。
具体概念看参考链接2 ,图非常清晰
下面介绍几个比较有意思的CRDT类型。
这是一个只增不减的计数器。对于N个节点,每个节点上维护一个长度为N的向量$V={P_0, P_1, P2, …, P{n-1}}。。P_m表示节点m上的计数。当需要增加这个计数器时,只需要任意选择一个节点操作,操作会将对应节点的计数器表示节点m上的计数。当需要增加这个计数器时,只需要任意选择一个节点操作,操作会将对应节点的计数器P_m := P_m + 1。当要统计整个集群的计数器总数时,只需要对向量。当要统计整个集群的计数器总数时,只需要对向量V$中的所有元素求和即可。
举个例子,我们有A, B, C三个节点上分别部署了同一个G-Counter。初始状态如下:
A: {A: 0, B: 0, C: 0} = 0
B: {A: 0, B: 0, C: 0} = 0
C: {A: 0, B: 0, C: 0} = 0
每个节点都维护着其他节点的计数器状态,初始状态下,所有计数器都为0。现在我们假设有三个客户端a,b,c,a和b先后在节点A上增加了一次计数器,c在节点B上增加了一次计数器:
A: {A: 2, B: 0, C: 0} = 2
B: {A: 0, B: 1, C: 0} = 1
C: {A: 0, B: 0, C: 0} = 0
此时如果分别向A, B, C查询当前计数器的值,得到的结果分别是{2, 1, 0}。而实际上一共有3次增加计数器的操作,因此全局计数器的正确值应该为3,此时系统内状态是不一致的。不过没关系,我们追求的是最终一致性。假设经过一段时间,B向A发起了合并的请求:
A: {A: 2, B: 1, C: 0} = 3
B: {A: 2, B: 1, C: 0} = 3
C: {A: 2, B: 1, C: 0} = 3
经过合并后,A和B的计数器状态合并,现在从A和B读取到的计数器的值变为3。接下来C和A进行合并:
A: {A: 2, B: 1, C: 0} = 3
B: {A: 2, B: 1, C: 0} = 3
C: {A: 2, B: 1, C: 0} = 3
现在节点ABC都达到了相同的状态,从任意一个节点获取到的计数器值都为3。也就是说3个节点达成了最终一致性。
我们可以用以下伪代码描述G-Counter的逻辑:
payload integer[n] P
initial [0, 0, 0, ..., 0]
update increment()
let g = myId()
P[g] := P[g] + 1 // 只更新当前节点对应的计数器
query value() : integer v
let v = P[0] + P[1] + ... + P[n-1] // 将每个节点的计数器进行累加
merge (X, Y) : payload Z
for i = 0, 1, ... , n-1:
Z.P[i] = max(X.P[i], Y.P[i]) // 通过max操作来合并各个节点的计数器
G-Counter使用max()操作来进行各个状态的合并,我们知道函数max满足可交换性,结合性,幂等性,即:
所以G-Counter可以在分布式系统中使用,并且可以无冲突合并。
G-Counter有一个限制,就是计数器只能增加,不能减少。不过我们可以通过使用两个G-Counter来实现一个既能增加也能减少计数器(PN-Counter)。简单来说,就是用一个G-Counter来记录所有累加结果,另一个G-Counter来记录累减结果,需要查询当前计数器时,只需要计算两个G-Counter的差即可。
payload integer[n] P, integer[n] N
initial [0, 0, ..., 0], [0, 0, ..., 0]
update increment()
let g = myId()
P[g] := P[g] + 1 // 只更新当前节点对应的计数器
update decrement()
let g = myId()
N[g] := N[g] + 1
query value() : integer v
let v = sum(P) - sum(N) // 用P向量的和减去N向量的和
merge (X, Y) : payload Z
for i = 0, 1, ... , n-1:
Z.P[i] = max(X.P[i], Y.P[i]) // 通过max操作来合并各个节点的计数器
Z.N[i] = max(N.P[i], Y.N[i])
register有assign()及value()两种操作
LWW-Register
)给每个assign操作添加unique ids,比如timestamps或者vector clock,使用max函数进行merge
MV-Register
)类似G-Counter,每次assign都会新增一个版本,使用max函数进行merge
G-Set
)使用union操作进行merge
2P-Set
)使用两个G-Set来实现,一个addSet用于添加,一个removeSet用于移除
LWW-element Set
)类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了timestamp信息,且timestamp较高的add及remove优先
OR-Set
)类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了tag信息,对于同一个tag的操作add优先于remove
关于Array有Replicated Growable Array(
RGA
),支持addRight(v, a)操作
Graph可以基于Sets结构实现,不过需要处理并发的addEdge(u, v)、removeVertex(u)操作
Map需要处理并发的put、rmv操作
https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type
http://galudisu.info/2018/06/06/akka/ddata/Akka-Distributed-Data-Deep-Dive/
http://liyu1981.github.io/what-is-CRDT/
https://lfwen.site/2018/06/09/crdt-counter/
详细介绍了各种CRDT实现https://juejin.im/post/5cd25e886fb9a0322758d23f
论文 https://hal.inria.fr/file/index/docid/555588/filename/techreport.pdf
一个系统的阅读导航http://christophermeiklejohn.com/crdt/2014/07/22/readings-in-crdts.html
这活真恶心。瞎忙还没意义。
错误提示
CMake Error at cmake-3.10.3/share/cmake-3.10/Modules/FindPackageHandleStandardArgs.cmake:137 (message):
Could NOT find LibXml2 (missing: LIBXML2_LIBRARY LIBXML2_INCLUDE_DIR)
用的cmake版本是3.10.
编译机是docker容器。本地测试好使
yum install -y libxml2-devel
依旧不好使。我在其他环境的docker机器上测试cmake 3 cmake2都没问题
看到了错误日志 发现 dladdr 连接错误,加上链接ldl修了半天才发现方向不对
手动指定 LIBXML2_LIBRARY LIBXML2_INCLUDE_DIR目录到bootstrap,cmake命令家伙是哪个
-DLIBXML2_INCLUDE_DIR=/usr/include/libxml2/ -DLIBXML2_LIBRARY=/usr/lib64/
,但是编译不过
安装的日志是这样的
--> Running transaction check
---> Package libxml2-devel.x86_64 0:2.9.1-6.3 will be installed
--> Processing Dependency: libxml2 = 2.9.1-6.3 for package: libxml2-devel-2.9.1-6.3.x86_64
--> Processing Dependency: xz-devel for package: libxml2-devel-2.9.1-6.3.x86_64
--> Running transaction check
---> Package libxml2.x86_64 0:2.9.1-6.el7_2.3 will be updated
--> Processing Dependency: libxml2 = 2.9.1-6.el7_2.3 for package: libxml2-python-2.9.1-6.el7_2.3.x86_64
---> Package libxml2.x86_64 0:2.9.1-6.3 will be an update
---> Package xz-devel.x86_64 0:5.2.2-1.el7 will be installed
--> Running transaction check
---> Package libxml2-python.x86_64 0:2.9.1-6.el7_2.3 will be updated
---> Package libxml2-python.x86_64 0:2.9.1-6.3.h3 will be an update
--> Processing Dependency: libxml2 = 2.9.1-6.3.h3 for package: libxml2-python-2.9.1-6.3.h3.x86_64
--> Running transaction check
---> Package libxml2.x86_64 0:2.9.1-6.el7_2.3 will be updated
---> Package libxml2.x86_64 0:2.9.1-6.el7_2.3 will be updated
---> Package libxml2.x86_64 0:2.9.1-6.3 will be an update
--> Processing Dependency: libxml2 = 2.9.1-6.3 for package: libxml2-devel-2.9.1-6.3.x86_64
---> Package libxml2.x86_64 0:2.9.1-6.3.h3 will be an update
--> Finished Dependency Resolution
You could try using --skip-broken to work around the problem
You could try running: rpm -Va --nofiles --nodigest
卧槽,没装成功。妈的
加上–skip-broken还是不行
错误冲突实际上是这样的
Error: Package: libxml2-devel-2.9.1-6.3.x86_64 (current)
Requires: libxml2 = 2.9.1-6.3
Installed: libxml2-2.9.1-6.3.h3.x86_64 (@stable)
libxml2 = 2.9.1-6.3.h3
Available: libxml2-2.9.1-6.el7_2.3.i686 (base)
libxml2 = 2.9.1-6.el7_2.3
Available: libxml2-2.9.1-6.3.x86_64 (current)
libxml2 = 2.9.1-6.3
试着删掉libxml2提示 yum is protecte,换另外的删除方法
rpm -e --nodeps libxml2
但是会报错
There was a problem importing one of the Python modules
required to run yum. The error leading to this problem was:
libxml2.so.2: cannot open shared object file: No such file or directory
Please install a package which provides this module, or
verify that the module is installed correctly.
It's possible that the above module doesn't match the
current version of Python, which is:
2.7.5 (default, Nov 6 2016, 00:28:07)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-11)]
If you cannot solve this problem yourself, please go to
the yum faq at:
yum完全就不工作了。
唉。我最后自带了一个xml源码包自己编。
我只是想把编好的二进制和相关lib库打包后放到rpm里。网上搜了一天。各种从头开始。各种源码开始编。一点意义都没有。
rpmbuild打包就有三种模式,ba bb bs。还有各种奇怪的宏。还有各种make install流程。很容易就陷入这些没用的细节中。我根本都不需要的。最终我使用了bs。就当我打包源码包了。
另外rpmbuild默认目录是root,还需要指定
rpmbuild --define '_topdir ./rpmbuild' -bs rpmbuild/SPECS/rpm.spec
另外,解压rpm
rpm2cpio xxx.rpm | cpio -div
又遇到了个问题,要求目录。只能从二进制打包,又浪费一上午
照着这个文档看了半天 http://kuanghy.github.io/2015/11/13/rpmbuild
我的步骤里总会删掉buildroot,原来还需要install流程里加二进制。
最后发一个我的模板
Name: xx
Version: 1.0
Release: 1%{?dist}
Summary: xx binary
#Group:
License: WTF
%install
mkdir -p %{buildroot}/dir/dir/somedir/
cp -a %(pwd)/your.tar.gz %{buildroot}/dir/dir/somedir/
echo "install done"
%files
%defattr (-,root,root,-)
/dir/
然后打包,bb是二进制打包,之前用的bs 源码打包
rpmbuild --define '_topdir ./rpmbuild' --buildroot=$(pwd)/rpmbuild/BUILDROOT -bb rpm.spec
找不到iddr https://github.com/ContinuumIO/libhdfs3-downstream/issues/9 严重误导我。没有没啥影响。
https://www.mail-archive.com/dev@hawq.incubator.apache.org/msg04104.html 严重误导
https://stackoverflow.com/questions/12993460/why-am-i-getting-undefined-reference-to-dladdr-even-with-ldl-for-this-simpl 严重误导。不是同一个场景。
https://github.com/libical/libical/issues/248 这个链接让我误以为装了glibc2-devel就能找到xml2了。没仔细读就用,误导
https://gitlab.kitware.com/cmake/cmake/issues/18078 这个给了我一点思路,我可以定义这两个变量
https://trac.macports.org/ticket/55386 误导
https://stackoverflow.com/questions/15799047/trying-to-remove-yum-which-is-protected-in-centos 删除包
https://serverfault.com/questions/742160/conflicts-in-files-when-installing-libxml2
https://www.centos.org/forums/viewtopic.php?t=68202 这个和我遇到的问题差不多,但是是arm机器
https://forums.cpanel.net/threads/requires-libxml2-update-error.636661/ 真正的原因,但没说解决办法
https://stackoverflow.com/questions/416983/why-is-topdir-set-to-its-default-value-when-rpmbuild-called-from-tcl rpmbuild更改目录的办法
网上一大堆rpm详解封装。都是翻译的外国的文章。说不到重点。比如
hdfs client 使用log4cplus是需要配置文件的,一般长这个样子
log.properties
log4cplus.logger.defaultLogger=INFO, LogToFile
log4cplus.appender.LogToFile=log4cplus::RollingFileAppender
log4cplus.appender.LogToFile.MaxFileSize=64MB
log4cplus.appender.LogToFile.MaxBackupIndex=20
log4cplus.appender.LogToFile.File=/opt/hdfs-client.log
log4cplus.appender.LogToFile.ImmediateFlush=true
log4cplus.appender.LogToFile.Append=true
log4cplus.appender.LogToFile.CreateDirs=true
log4cplus.appender.LogToFile.layout=log4cplus::PatternLayout
log4cplus.appender.LogToFile.layout.ConversionPattern=%D{\%Y-%m-%d %H:%M:%S,%Q} %-5p [pid:%i pth:%t] %m %l%n
注意%D{\%Y-%m
中间多加了个斜线,githubpage编译不过只好出此下策
通常库是所有模块共用同一个,动态库也是放在系统PATH下的。
如果单独打包可能就有问题
log4cplus:ERROR could not open file ./log.properties
log4cplus:ERROR No appenders could be found for logger (root).
log4cplus:ERROR Please initialize the log4cplus system properly.
需要把该文件放到模块同目录下,并且需要提供rootLogger
改成
log4cplus.rootLogger=INFO, LogToFile
...
就不会报错了。
Google一圈搜到了各种解决方案。被严重误导,只有1 贴点边
2是别人使用log4cplus遇到该问题以及解决方案。
https://bbs.csdn.net/topics/370248522
https://github.com/nsacyber/HIRS/issues/95
编译好了自己的模块,依赖很多动态库,打包
mkdir -p package
cp myserver package
for i in `ldd myserver | awk '{print $3}' | grep "/" | sort | uniq`; do cp $i package; done
tar -czvf package.tar.gz package
把包挪到另一个机器上,试着运行,提示 找不到动态库
error while loading shared libraries: libxxx.so.1.0.0: cannot open shared object file: No such file or directory
搜到了一个解决方案1 先添加到LD_LIBRARY_PATH 然后再ldconfig
但是我执行发现
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$PWD
Segmentation fault
然后后面的命令都会报错,开始我以为是机器问题,然后换了个机器也这样,然后我直接把PWD加到/etc/ld.so.conf里,结果机器立马报错,然后就打不开了。
在此警告,千万不要这么搞,这么搞是全局的,整个机器就挂了,export只是当前shell,所以还好。幸亏我有镜像,重新复原就搞定了。
应该是打包的动态库有问题
后面我逐个排查重试会不会报错
删到libc之后就不会报错了。。。确定了是libc的问题。由于上面的打包脚本会把libc也带上。需要去掉一下
也搜到了一个解决记录2 可惜看到的比较晚,不然能更早定位到是libc的问题。不过链接里说的strace和ldconfig -iv都没用,直接就segmentation fault了。不会执行的。
为什么两个libc.so会导致ldconfig崩溃目前还没有找到原因。总之打包的时候去掉libc
mkdir -p package
cp myserver package
for i in `ldd myserver | awk '{print $3}' | grep "/" | grep -v libc | sort | uniq`; do cp $i package; done
tar -czvf package.tar.gz package
遇到两个问题
这个解决办法只能是重启,并且需要预估硬盘的使用情况避免这种问题的发生。
这个是发生过compaction已经收回了空间,但是之前有的bg_error一直存在没有清掉。这个是rocksdb故意设计成这样的,上层应用需要决定是否处理这种场景,选择重启还是自己实现高可用
rocksdb的issue在这里
crdb的设计讨论在这里
ardb的解决办法,重启
crdb的讨论我没有仔细看,有时间看完补充一下