Buffering SQL Writes with Redis


why

这篇文章Sentry公司的博客,介绍他们怎么用redis的(好像是一个运营监控服务软件公司)


Sentry公司大量使用pg,两个集群,一个集群存储事件元数据,另一个存储Sentry平台数据,都有冷备,只在灾备或者维护期间使用

数据流分成三类,时序数据

  • 事件blob数据,不变的,直接存储到Riak集群里
  • kv对,表示tag,通常是设备名,操作系统之类的
  • 属性 聚合操作用

大多数数据是SQL,事件Blob数据存在Riak来保持扩展性,实际上也是用来做KV存储

突发情况

Sentry的数据信息有特征,比如突然传来一个错误事件,这些事件重复程度高,出现频率也高,所以需要一个去重-聚合动作,又分两种情况

  • 相同错误出现多次
  • 大量不同错误生成新的聚合

可以从下面两个属性来观测到

  • 基数计数器 Cardinality counters,
  • Latest Value 看聚合操作中出现key用个字段记录

第一种设置个counter

UPDATE table SET counter = counter + 1 WHERE key = %s;

第二种就设置个字段记录

UPDATE table SET last_seen = %s WHERE key = %s;

count 引入的锁还有可能影响性能,所以把count拆开,拆成多行,避免行锁竞争

下面这个例子是一百行的

UPDATE table SET counter = counter + 1 WHERE key = %s AND partition = ABS(RANDOM() * 100);

pg提供强一致,浪费很多时间在锁上,所以用buffer来省掉这些竞争

buffer write,先聚合一堆写,定时刷回去,需要考虑最低损失数据,据此来决定回写周期,Sentry实践是10秒一次

这引入两种问题

  • UI不会自动更新,十秒需要触发一次更新
  • 可能丢失数据,比如网络问题

使用Redis

  • 每个entity 一个hash

  • flush 一组hash集合

当数据来

  • 对每个entity hashkey
    • hincrby counter
    • 更新其他字段,hset,比如last seen timestamp
  • zadd 把pending key加入集合中

然后类似cron任务,10秒一次,zrange拿到集合,对于pending hash key 加入到队列,然后zrem移除集合

worker从队列中拿到job,开始写

  • pipeline
    • zrem,保证没副作用
    • hgetall 拿到key
    • rem hashkey
  • 转换成sql,直接执行最后结果
    • set counter = counter +%d
    • set value = %s

限制条数,使用sorted set(zset)

可以水平扩展,加redis节点

这个模型保证一行sql就更新一次,降低锁竞争,达到预期

问题:为什么不用时序数据库?

博客还提到了几个提升的点子

  • 通常优先级高的事情发生的概率比较低,当前是模型是LIFO,也可改成FIFO 用zadd nx
  • 数据冲突踩踏,大量数据任务可能同时触发,虽然可能实际处理后和noop没区别,但还是有这种突发增加延迟的问题,当前是push模型,可以改成pull,控制主动权 限流器也行 实现上的复杂导致没能采用
  • 丢写,加个备份集合

ref

contact

其他联系方式在主页

Read More

unique_ptr实现pimpl惯用法


why

这篇文章是参考链接的总结


pimpl惯用法,pointer to implementation,就是用指针来拆分实现,这样改动不会导致所有文件都编译一遍,也是一种解耦

以前的实现

#include "Engine.h"
 
class Fridge
{
public:
   void coolDown();
private:
   Engine engine_;
};

这样改动Engine就会重编Fridge

引入指针分离

class Fridge
{
public:
   Fridge();
   ~Fridge();
 
   void coolDown();
private:
   class FridgeImpl;
   FridgeImpl* impl_;
};

FridgeImpl封装一层Engine,不可见

#include "Engine.h"
#include "Fridge.h"
 
class FridgeImpl
{
public:
   void coolDown()
   {
      /* ... */
   }
private:
   Engine engine_;
};
 
Fridge::Fridge() : impl_(new FridgeImpl) {}
 
Fridge::~Fridge(){
   delete impl_;
}
 
void Fridge::coolDown(){
   impl_->coolDown();
}

这样还是需要管理impl_生命周期,如果用unique_ptr就更好了

改进的代码

#include <memory>
 
class Fridge
{
public:
   Fridge();
   void coolDown();
private:
   class FridgeImpl;
   std::unique_ptr<FridgeImpl> impl_;
};
#include "Engine.h"
#include "Fridge.h"

class FridgeImpl
{
public:
   void coolDown()
   {
      /* ... */
   }
private:
   Engine engine_;
};

Fridge::Fridge() : impl_(new FridgeImpl) {}

这样会有新问题,编译不过

use of undefined type ‘FridgeImpl’ can’t delete an incomplete type

因为unique_ptr需要知道托管对象的析构,最起码要保证可见性

析构可见性

c++规则,以下两种情况,delete pointer会有未定义行为

  • void* 类型
  • 指针的类型不完整,比如这种前向声明类指针

由于unique_ptr检查,会在编译期直接拒绝 同理的还有boost::checked_delete

进一步讨论,Fridge 和FridgeImpl的析构函数都是没定义的,编译器会自动定义并内联,在Fridge的编译单元,就已经见到了Fridge的析构了,但是见不到FridgeImpl的析构,解决办法就是加上Fridge的析构声明,并把实现放到实现文件中,让Fridge和FridgeImpl的析构同时可见

#include <memory>
 
class Fridge
{
public:
   Fridge();
   +~Fridge();
...
};
#include "Engine.h"
#include "Fridge.h"
 
class FridgeImpl
....
Fridge::Fridge() : impl_(new FridgeImpl) {}
 
+Fridge::~Fridge() = default;

ref

contact

Read More


jemalloc 原理


先说下glibc自带的ptmalloc

多线程支持

  • Ptmalloc2有一个主分配区(main arena), 有多个非主分配区。 非主分配区只能使用mmap向操作系统批发申请HEAP_MAX_SIZE(64位系统为64MB)大小的虚拟内存。 当某个线程调用malloc的时候,会先查看线程私有变量中是否已经存在一个分配区,如果存在则尝试加锁,如果加锁失败则遍历arena链表试图获取一个没加锁的arena, 如果依然获取不到则创建一个新的非主分配区。
  • free()的时候也要获取锁。分配小块内存容易产生碎片,ptmalloc在整理合并的时候也要对arena做加锁操作。在线程多的时候,锁的开销就会增大。

ptmalloc内存管理

  • 用户请求分配的内存在ptmalloc中使用chunk表示, 每个chunk至少需要8个字节额外的开销。 用户free掉的内存不会马上归还操作系统,ptmalloc会统一管理heap和mmap区域的空闲chunk,避免了频繁的系统调用。

  • ptmalloc 将相似大小的 chunk 用双向链表链接起来, 这样的一个链表被称为一个 bin。Ptmalloc 一共 维护了 128 个 bin,并使用一个数组来存储这些 bin(图就像二维数组,或者std::deque底层实现那种感觉,rocksdb arena实现也这样的)

    1558506408332

    • 数组中的第一个为 unsorted bin, 数组中从 2 开始编号的前 64 个 bin 称为 small bins, 同一个small bin中的chunk具有相同的大小。small bins后面的bin被称作large bins。
    • 当free一个chunk并放入bin的时候, ptmalloc 还会检查它前后的 chunk 是否也是空闲的, 如果是的话,ptmalloc会首先把它们合并为一个大的 chunk, 然后将合并后的 chunk 放到 unstored bin 中。 另外ptmalloc 为了提高分配的速度,会把一些小的(不大于64B) chunk先放到一个叫做 fast bins 的容器内。
    • 在fast bins和bins都不能满足需求后,ptmalloc会设法在一个叫做top chunk的空间分配内存。 对于非主分配区会预先通过mmap分配一大块内存作为top chunk, 当bins和fast bins都不能满足分配需要的时候, ptmalloc会设法在top chunk中分出一块内存给用户, 如果top chunk本身不够大, 分配程序会重新mmap分配一块内存chunk, 并将 top chunk 迁移到新的chunk上,并用单链表链接起来。如果free()的chunk恰好 与 top chunk 相邻,那么这两个 chunk 就会合并成新的 top chunk,如果top chunk大小大于某个阈值才还给操作系统。主分配区类似,不过通过sbrk()分配和调整top chunk的大小,只有heap顶部连续内存空闲超过阈值的时候才能回收内存。
    • 需要分配的 chunk 足够大,而且 fast bins 和 bins 都不能满足要求,甚至 top chunk 本身也不能满足分配需求时,ptmalloc 会使用 mmap 来直接使用内存映射来将页映射到进程空间。

ptmalloc的缺陷

  • 后分配的内存先释放,因为 ptmalloc 收缩内存是从 top chunk 开始,如果与 top chunk 相邻的 chunk 不能释放, top chunk 以下的 chunk 都无法释放。
  • 多线程锁开销大, 需要避免多线程频繁分配释放。
  • 内存从thread的arena中分配, 内存不能从一个arena移动到另一个arena, 就是说如果多线程使用内存不均衡,容易导致内存的浪费。 比如说线程1使用了300M内存,完成任务后glibc没有释放给操作系统,线程2开始创建了一个新的arena, 但是线程1的300M却不能用了。
  • 每个chunk至少8字节的开销很大
  • 不定期分配长生命周期的内存容易造成内存碎片,不利于回收。 64位系统最好分配32M以上内存,这是使用mmap的阈值。

这里的问题在于arena是全局的 jemalloc和tcmalloc都针对这个做优化

tcmalloc的数据结构看参考链接

主要的优化 size分类,TheadCache

小对象分配

  • tcmalloc为每个线程分配了一个线程本地ThreadCache,小内存从ThreadCache分配,此外还有个中央堆(CentralCache),ThreadCache不够用的时候,会从CentralCache中获取空间放到ThreadCache中。
  • 小对象(<=32K)从ThreadCache分配,大对象从CentralCache分配。大对象分配的空间都是4k页面对齐的,多个pages也能切割成多个小对象划分到ThreadCache中。
  • 小对象有将近170个不同的大小分类(class),每个class有个该大小内存块的FreeList单链表,分配的时候先找到best fit的class,然后无锁的获取该链表首元素返回。如果链表中无空间了,则到CentralCache中划分几个页面并切割成该class的大小,放入链表中。

CentralCache分配管理

  • 大对象(>32K)先4k对齐后,从CentralCache中分配。 CentralCache维护的PageHeap如下图所示, 数组中第256个元素是所有大于255个页面都挂到该链表中。

  • 当best fit的页面链表中没有空闲空间时,则一直往更大的页面空间则,如果所有256个链表遍历后依然没有成功分配。 则使用sbrk, mmap, /dev/mem从系统中分配。
  • tcmalloc PageHeap管理的连续的页面被称为span. 如果span未分配, 则span是PageHeap中的一个链表元素 如果span已经分配,它可能是返回给应用程序的大对象, 或者已经被切割成多小对象,该小对象的size-class会被记录在span中
  • 在32位系统中,使用一个中央数组(central array)映射了页面和span对应关系, 数组索引号是页面号,数组元素是页面所在的span。 在64位系统中,使用一个3-level radix tree记录了该映射关系。

回收

  • 当一个object free的时候,会根据地址对齐计算所在的页面号,然后通过central array找到对应的span。
  • 如果是小对象,span会告诉我们他的size class,然后把该对象插入当前线程的ThreadCache中。如果此时ThreadCache超过一个预算的值(默认2MB),则会使用垃圾回收机制把未使用的object从ThreadCache移动到CentralCache的central free lists中。
  • 如果是大对象,span会告诉我们对象锁在的页面号范围。 假设这个范围是[p,q], 先查找页面p-1和q+1所在的span,如果这些临近的span也是free的,则合并到[p,q]所在的span, 然后把这个span回收到PageHeap中。
  • CentralCache的central free lists类似ThreadCache的FreeList,不过它增加了一级结构,先根据size-class关联到spans的集合, 然后是对应span的object链表。如果span的链表中所有object已经free, 则span回收到PageHeap中。

jemalloc

  • 与tcmalloc类似,每个线程同样在<32KB的时候无锁使用线程本地cache。

  • Jemalloc在64bits系统上使用下面的size-class分类:

    categories Spacing Size
    Small 8 [8]
      16 [16, 32, 48, …, 128]
      32 [160, 192, 224, 256]
      64 [320, 384, 448, 512]
      128 [640, 768, 896, 1024]
      256 [1280, 1536, 1792, 2048]
      512 [2560, 3072, 3584]
    Large 4 KiB [4 KiB, 8 KiB, 12 KiB, …, 4072 KiB]
    Huge 4 KiB [4 MiB, 8 MiB, 12 MiB, …]
  • small/large对象查找metadata需要常量时间, huge对象通过全局红黑树在对数时间内查找。

  • 虚拟内存被逻辑上分割成chunks(默认是4MB,1024个4k页),应用线程通过round-robin算法在第一次malloc的时候分配arena, 每个arena都是相互独立的,维护自己的chunks, chunk切割pages到small/large对象。free()的内存总是返回到所属的arena中,而不管是哪个线程调用free()。

结构图

jemalloc 的内存分配,可分成四类:

  • small( size < min(arena中得bin) ):如果请求size不大于arena的最小的bin,那么就通过线程对应的tcache来进行分配。首先确定size的大小属于哪一个tbin,比如2字节的size就属于最小的8字节的tbin,然后查找tbin中有没有缓存的空间,如果有就进行分配,没有则为这个tbin对应的arena的bin分配一个run,然后把这个run里面的部分块的地址依次赋给tcache的对应的bin的avail数组,相当于缓存了一部分的8字节的块,最后从这个availl数组中选取一个地址进行分配
  • large( max(tcache中的块) > size > min(arean中的bin) ): 如果请求size大于arena的最小的bin,同时不大于tcache能缓存的最大块,也会通过线程对应的tcache来进行分配,但方式不同。首先看tcache对应的tbin里有没有缓存块,如果有就分配,没有就从chunk里直接找一块相应的page整数倍大小的空间进行分配(当这块空间后续释放时,这会进入相应的tcache对应的tbin里)
  • large( chunk > size > tcache ): 如果请求size大于tcache能缓存的最大块,同时不大于chunk大小(默认是4M),具体分配和第2类请求相同,区别只是没有使用tcache
  • huge(size> chunk ):如果请求大于chunk大小,直接通过mmap进行分配。

简而言之,就是:

小内存(small class): 线程缓存bin -> 分配区bin(bin加锁) -> 问系统要 中型内存(large class):分配区bin(bin加锁) -> 问系统要 大内存(huge class): 直接mmap组织成N个chunk+全局huge红黑树维护(带缓存)

回收流程大体和分配流程类似,有tcache机制的会将回收的块进行缓存,没有tcache机制的直接回收(不大于chunk的将对应的page状态进行修改,回收对应的run;大于chunk的直接munmap)。需要关注的是jemalloc何时会将内存还给操作系统,因为ptmalloc中存在因为使用top_chunk机制(详见华庭的文章)而使得内存无法还给操作系统的问题。目前看来,除了大内存直接munmap,jemalloc还有两种机制可以释放内存:

  1. 当释放时发现某个chunk的所有内存都已经为脏(即分配后又回收)就把整个chunk释放
  2. 当arena中的page分配情况满足一个阈值时对dirty page进行purge(通过调用madvise来进行)。这个阈值的具体含义是该arena中的dirty page大小已经达到一个chunk的大小且占到了active page的1/opt_lg_dirty_mult(默认为1/32)。active page的意思是已经正在使用中的run的page,而dirty page就是其中已经分配后又回收的page。

Ref

这里把搜集的一些资料列举一下,后续做整理

contact

Read More

c++17 std::pmr::polymorphic_allocator

why

了解学习allocator

源标题 An Allocator is a Handle to a Heap by Arthur O’Dwyer


object和value语义

object有地址,value无地址。

allocator是否需要状态?

c++14 以前:stateless allocator

想要个statefull allocator怎么办 以前的方法是实现一个allocator

template <class T>
struct Bad{
    alignas(16)  char data[1024];
    size_t used = 0;
    T* allocate(size_t){
        auto k = n*sizeof(T);
        used+=k;
        return(T*)(data+used-k);
    }
};

现在的方法是->std::pmr::polymorphic_allocator + memory_resource

可以继承memory_resource 来实现statefull, 从allocator中拆分出来

template <class T>
struct trivial_res : std::pmr::memory_resource {
    alignas(16)  char data[1024];
    size_t used = 0;
    T* allocate(size_t){
        auto k = n*sizeof(T);
        used+=k;
        return(T*)(data+used-k);
    }
};
trivial_res<int> mr;
std::vector<int, polymorphic_allocator<int>> vec(&mr);

allocator本身还是值语义的,不需要考虑拷贝移动背后的危险(statefull allocator就会有这种问题)

重新实现allocator

语义上只负责分配,(调用全局new delete)就是个singleton

std::pmr::memory_resource就是为了提供存储空间,剩下的由allocator调用

std::pmr::new_delete_resource 实际上就是个singleton::get

std::pmr::polymorphic_allocator只要持有memory_resource的指针就行了

还要注意,这里allocator仅能有copy语义而不能有move语义。见参考链接2

rebind

之前一直不理解rebind 作者列出了一个rebindable的例子 https://wandbox.org/permlink/mHrj7Y55k3Gqu4Q5

实际上是各种容器内部实现的区别,比如vector<int> 内部allocator分配的T就是int,但是 list<T>就不一样了,内部实际上是Node<int> 由于这种原因才有不同的allocator构造接口(rebind接口)

要让allocator和T无关,这就回到了 std::pmr这个上了,干掉背后的类型,虚函数来搞,T交给背后的memory_resource,看起来好像这里的allocator就起到了一个指针的作用 allocator_traits<AllocT>::pointer可能是T*,也可能是藏了好几层的玩意儿,fancy pointer,比如boost::interprocess::offset_ptr<T>

fancy pointer

此外,这个指针还有问题,比如他到底在堆还是在栈中?有可能都在,也就是fancy pointer场景,比如std::list<T> 声明一个局部对象,考虑list的内部实现,头结点是对象本身持有的,分配在栈上,但是其他链表结点是在堆中的。

这个话题太长了,Bob Steagall 也有个PPT,100多页,还要看看消化一下这里暂时跳过

reference

  1. https://github.com/CppCon/CppCon2018/blob/master/Presentations/an_allocator_is_a_handle_to_a_heap/an_allocator_is_a_handle_to_a_heap__arthur_odwyer__cppcon_2018.pdf

  2. allocator的move问题 https://cplusplus.github.io/LWG/issue2593

  3. 关于fancy pointer的深度讨论http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0773r0.html

  4. 关于allocator的讨论https://www.zhihu.com/question/274802525

    1. 这个讨论里提到了cppcon2015 allocator Is to Allocation what vector Is to Vexation by Andrei Alexandrescu 有时间总结一下
  5. rebind的讨论https://bbs.csdn.net/topics/200079053 结论https://www.cnblogs.com/whyandinside/archive/2011/10/23/2221675.html

    连接中的rebind指的语义上的rebind,作者的rebind例子是是这样的 注意rebind copy ctor和move ctor

    #include <list>
    #include <vector>
    #include <memory>
    #include <stdio.h>
       
    namespace Stateless {
    template<class T>
    struct A {
        A() { puts("default-constructed"); }
        A(const A&) { puts("copy-constructed"); }
        A(A&&) { puts("move-constructed"); }
        void operator=(const A&) { puts("copy-assigned"); }
        void operator=(A&&) { puts("move-assigned"); }
       
        template<class U>
        A(const A<U>&) { puts("rebind-copy-constructed"); }
        template<class U>
        A(A<U>&&) { puts("rebind-move-constructed"); }
       
        using value_type = T;
        T *allocate(size_t n) { return std::allocator<T>{}.allocate(n); }
        void deallocate(T *p, size_t n) { return std::allocator<T>{}.deallocate(p, n); }
    };
    static_assert(std::allocator_traits<A<int>>::is_always_equal::value == true);
    } // namespace Stateless
       
    namespace Stateful {
    template<class T>
    struct A {
        int i = 0;
        A() { puts("default-constructed"); }
        A(const A&) { puts("copy-constructed"); }
        A(A&&) { puts("move-constructed"); }
        void operator=(const A&) { puts("copy-assigned"); }
        void operator=(A&&) { puts("move-assigned"); }
       
        template<class U>
        A(const A<U>&) { puts("rebind-copy-constructed"); }
        template<class U>
        A(A<U>&&) { puts("rebind-move-constructed"); }
           
        using value_type = T;
        T *allocate(size_t n) { return std::allocator<T>{}.allocate(n); }
        void deallocate(T *p, size_t n) { return std::allocator<T>{}.deallocate(p, n); }
    };
    static_assert(std::allocator_traits<A<int>>::is_always_equal::value == false);
    } // namespace Stateful
       
    template<template<class...> class CONTAINER>
    void test()
    {
        puts(__PRETTY_FUNCTION__);
        puts("--------Stateless:--------");
        {
            using namespace Stateless;
            puts("--- during default-construction:");
            CONTAINER<int, A<int>> a;
            puts("--- during copy-construction:");
            CONTAINER<int, A<int>> b(a);
            puts("--- during move-construction:");
            CONTAINER<int, A<int>> c(std::move(a));
            puts("--- during destructions:");
        }
        puts("--------Stateful:--------");
        {
            using namespace Stateful;
            puts("--- during default-construction:");
            CONTAINER<int, A<int>> a;
            puts("--- during copy-construction:");
            CONTAINER<int, A<int>> b(a);
            puts("--- during move-construction:");
            CONTAINER<int, A<int>> c(std::move(a));
            puts("--- during destructions:");
        }
    }
       
    int main()
    {
        test<std::list>();
        test<std::vector>();
    }
    

看到这里或许你有建议或者疑问或者指出我的错误,我的邮箱wanghenshui@qq.com 先谢指教。或者到博客上提issue 我能收到邮件提醒。

Read More

c++17引入的函数介绍 std::launder

why

在semimap的ppt里见到了std::launder,一个c++17引入的函数,学习了解一下


看提案就能大概了解std::launder到底是为了啥,解决掉placement new和reinterpret_cast在const场景下带来的未定义行为,比如这个例子

struct X { const int n; };
union U { X x; float f; };
void tong() {
  U u = 1;
  u.f = 5.f;               // OK, creates new subobject of 'u'
  X *p = new (&u.x) X {2}; // OK, creates new subobject of 'u'
  assert(p->n == 2);       // OK
  assert(*std::launder(&u.x.n) == 2); // OK

  // undefined behavior, 'u.x' does not name new subobject
  assert(u.x.n == 2);
}

如果内部有const,可能编译器会优化掉访问值的行为,std::launder就是为了解决掉这个场景,阻止编译器优化

std::launder acts as an optimization barrier that prevents the optimizer from performing constant propagation.

实现,就是一层阻止优化的封装,参考链接3中详细列举了gcc实现以及相应的问题

folly的实现见参考链接5,因为是应用,就封装了一层

template <typename T>
FOLLY_NODISCARD inline T* launder(T* in) noexcept {
#if FOLLY_HAS_BUILTIN(__builtin_launder) || __GNUC__ >= 7
  // The builtin has no unwanted side-effects.
  return __builtin_launder(in);
#elif __GNUC__
  // This inline assembler block declares that `in` is an input and an output,
  // so the compiler has to assume that it has been changed inside the block.
  __asm__("" : "+r"(in));
  return in;
#elif defined(_WIN32)
  // MSVC does not currently have optimizations around const members of structs.
  // _ReadWriteBarrier() will prevent compiler reordering memory accesses.
  _ReadWriteBarrier();
  return in;
#else
  static_assert(
      false, "folly::launder is not implemented for this environment");
#endif
}

对于gcc环境,看asm那条就懂了,优化让他强制访问地址而不是读const

llvm的实现见参考链接6,比较复杂,就不列代码了

reference

  1. 提案http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0532r0.pdf
  2. https://zh.cppreference.com/w/cpp/utility/launder 翻译的偏学术,结合例子看更佳
  3. 这个博客讲的很好https://miyuki.github.io/2016/10/21/std-launder.html
    1. 其中涉猎的很多链接我都是先看了一遍,才搜到这个博客。。走了点弯路
    2. 我也搜了launder的实现,结果发现这个博客已经列举了。
  4. 上面的博客援引的链接 ,也是用的同样的实例https://stackoverflow.com/questions/39382501/what-is-the-purpose-of-stdlaunder
  5. folly的实现 https://github.com/facebook/folly/blob/master/folly/lang/Launder.h
  6. llvm的实现https://reviews.llvm.org/D40218

contact

Read More

class template argument deduction for everyone

演讲人是Stephan T. Lavavej ,负责msvc stl的

这个演讲的主题就是Class template argument deduction,简称CTAD,之前也接触过。借这个ppt彻底顺一下


简而言之这个c++17新特性给了类模板像函数模板一样的推导能力,可以省去指定参数,让编译器帮你推导

std::vector v{2}; //推导成vector<size_t>
lock_guard l(mtx); //推导成lock_guard<mutex>

还有pair,tuple 以及智能指针,以及guard,从此make_函数基本可以下岗

对比函数模板的推导,为什么类模板之前没有推导,以pair举例

  • 可能推导的类型很多
    • pair<int, int> p(11,22); 如果不指定int,有可能是long,类型很多
    • 解决办法,make_pair,通过函数模板转发和退化forwarding and decay
      • 缺点,低效,是个函数调用,调试烦人,且很繁琐(其实还行吧,多打个make_)

基本语法 初始化(){},直接初始化和赋值初始化

应用的场景

  • 容器,包括pair和tuple

    • 容器的范围初始化

      list<pair<int, string>> lst; 
      ...;//添加元素等等
      vector v(lst.begin(),lst.end());
      
  • lock_guard

  • functor 比如sort中的greater<\void>{}

遇到的错误以及引起的原因

pair p{};// 无法推导类型,不像上面的greater, 没有默认参数
array arr = {1,2,3.14};//推导不出个数
shared_ptr sp(new string);//由于数组指针的退化,这个指针类型区分不出是数组指针还是普通指针,shared_ptr<string> shared_ptr<string []>

上面的例子也引入了,缺少信息和歧义的推导条件会错误,这和函数模板类似

  • P1021提案 设计的场景可以fix
  • 别名模板,显式模板参数这两个场景无法使用CTAD
  • 需要推导指引,比如array,继承构造函数
推导指引

比如字符串,类型很容易推导成char[N] ,如果想要string,就需要写推导指引规则

能构造不等于能推导
template <typename A, typename B> 
struct Peppermint{
    explicit Peppermint(const A&){}
};
Peppermint<int, double> p1{11};
Peppermint p2{22};//err

第二种情况缺少了参数,也没有提示参数,推导不出来

再比如指针

template <typename A, typename B>
struct jazz{
    jazz(A*, B*){}
};
int i = 1;
const double d = 3.14;
jazz j1{&i, &d};
jazz j2{&i, nullptr};//err
  • CTAD自动推导原则:模板参数构造中都带上了,或者有默认的
  • 不能自动推导的原因
    • 模板参数没带上
    • 提供的参数没有帮助推导的额外信息 nullptr
    • 参数不可推导 list<T>::iterator

一个推导规则

template <typename T>
struct MyVec{
    template <typename It> MyVec(It,It){}
};
template <typename it> MyVec(It,It) -> MyVec<typename iterator_traits<It>::value_type>;//建立关系
int *ptr = nullptr;
MyVec v(ptr,ptr);
  • 如果是转发构造呢,CTAD是支持的,而make_函数有退化

值退化

reference

  1. https://zh.cppreference.com/w/cpp/language/class_template_argument_deduction
  2. 之前也提到过这个东西https://wanghenshui.github.io/2018/08/17/overload-trick
  3. ppt地址 https://github.com/CppCon/CppCon2018/tree/master/Presentations/class_template_argument_deduction_for_everyone

contact

Read More

rocksdb 小知识 WalTerminationPoint

这个数据是在WriteBatch中的,和写WAL相关,数据结构是SavePoint

  // When sending a WriteBatch through WriteImpl we might want to
  // specify that only the first x records of the batch be written to
  // the WAL.
  SavePoint wal_term_point_;
struct SavePoint {
  size_t size;  // size of rep_
  int count;    // count of elements in rep_
  uint32_t content_flags;

  SavePoint() : size(0), count(0), content_flags(0) {}

  SavePoint(size_t _size, int _count, uint32_t _flags)
      : size(_size), count(_count), content_flags(_flags) {}

  void clear() {
    size = 0;
    count = 0;
    content_flags = 0;
  }

  bool is_cleared() const { return (size | count | content_flags) == 0; }
};

具体用法看这个测试用例,在db/fault_injection_test.cc

TEST_P(FaultInjectionTest, WriteBatchWalTerminationTest) {
  ReadOptions ro;
  Options options = CurrentOptions();
  options.env = env_;

  WriteOptions wo;
  wo.sync = true;
  wo.disableWAL = false;
  WriteBatch batch;
  batch.Put("cats", "dogs");
  batch.MarkWalTerminationPoint();
  batch.Put("boys", "girls");
  ASSERT_OK(db_->Write(wo, &batch));

  env_->SetFilesystemActive(false);
  NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
  ASSERT_OK(OpenDB());

  std::string val;
  ASSERT_OK(db_->Get(ro, "cats", &val));
  ASSERT_EQ("dogs", val);
  ASSERT_EQ(db_->Get(ro, "boys", &val), Status::NotFound());
}

能看到wal termination point 的作用,就是在WAL层的一层隔离,这个点之后的kv不写入wal,除非clear,从writebatch append能看出来

Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
                                  const bool wal_only) {
  size_t src_len;
  int src_count;
  uint32_t src_flags;

  const SavePoint& batch_end = src->GetWalTerminationPoint();

  if (wal_only && !batch_end.is_cleared()) {
    src_len = batch_end.size - WriteBatchInternal::kHeader;
    src_count = batch_end.count;
    src_flags = batch_end.content_flags;
  } else {
    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
    src_count = Count(src);
    src_flags = src->content_flags_.load(std::memory_order_relaxed);
  }

  SetCount(dst, Count(dst) + src_count);
  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
  dst->content_flags_.store(
      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
      std::memory_order_relaxed);
  return Status::OK();
}

contact

Read More

rocksdb 目录创建的一个坑

虽然有CreateIfMiss,但有时候目录还会创建失败,场景就是级联目录 rocksdb没有这种语义

在env_posix.cc 里,实现如下

  virtual Status CreateDirIfMissing(const std::string& name) override {
    Status result;
    if (mkdir(name.c_str(), 0755) != 0) {
      if (errno != EEXIST) {
        result = IOError("While mkdir if missing", name, errno);
      } else if (!DirExists(name)) { // Check that name is actually a
                                     // directory.
        // Message is taken from mkdir
        result = Status::IOError("`"+name+"' exists but is not a directory");
      }
    }
    return result;
  };

如果报错 While mkdir if missing dirxx 就是这里了。

当然接口是透明的,rocksdb推荐自己实现env,实现对应的接口就可以了

mkdir(1)命令是有-p指令的(p for parent) 而mkdir(3)函数接口没有这个语义,所以就得循环调用,参考链接1有个粗糙版本

可以看看mkdir(1)的实现,见参考链接2

reference

  1. mkdir 多级目录 https://www.jianshu.com/p/4468e388f85c
  2. mkdir linux实现 <https://github.com/coreutils/coreutils/blob/master/src/mkdir.c
  3. mkdir api https://linux.die.net/man/3/mkdir

或者到博客上提issue 我能收到邮件提醒。

Read More

rocksdb merge_test 单测不过问题定位

背景,给key加了个字段,改写了了WriteBatch, 每组数据会重写,加这个字段。

问题就是mergetest不能过,具体未通过的代码片

  {
    std::cout << "Test merge in memtable... \n";
    size_t max_merge = 5;
    auto db = OpenDb(dbname, use_ttl, max_merge);
    MergeBasedCounters counters(db, 0);
    testCounters(counters, db.get(), compact);
    testSuccessiveMerge(counters, max_merge, max_merge * 2);
    testSingleBatchSuccessiveMerge(db.get(), 5, 7);
    DestroyDB(dbname, Options());
  }

其中,testSuccessiveMerge未通过,代码是这样的

void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
                         size_t num_merges) {

  counters.assert_remove("z");
  uint64_t sum = 0;

  for (size_t i = 1; i <= num_merges; ++i) {
    resetNumMergeOperatorCalls();
    counters.assert_add("z", i);
    sum += i;

    if (i % (max_num_merges + 1) == 0) {
      assert(num_merge_operator_calls == max_num_merges + 1);
    } else {
      assert(num_merge_operator_calls == 0);
    }

    resetNumMergeOperatorCalls();
    assert(counters.assert_get("z") == sum);
    assert(num_merge_operator_calls == i % (max_num_merges + 1));
  }
}

正常情况下,i=6的时候,会触发内部的merge,此时num_merge_operator_calls==6

测试代码逻辑:定义了MergeOperator,实际上assert_add调用是MergeBasedCounters::add, 里面调用了merge

  // mapped to a rocksdb Merge operation
  virtual bool add(const std::string& key, uint64_t value) override {
    char encoded[sizeof(uint64_t)];
    EncodeFixed64(encoded, value);
    Slice slice(encoded, sizeof(uint64_t));
    auto s = db_->Merge(merge_option_, key, slice);
...
  }

其实merge底层还是write,但是在内部会判断key是否存在,有seek get动作

我开始一直在在merge_test.cc中加各种打印,猜测是不是数据集的问题,排除,编了个原生的merge_test,然后gdb,

watch num_merge_operator_calls=6

抓到了正常流程下的堆栈

#0  CountMergeOperator::Merge (this=0x7fffe0001320, key=..., existing_value=0x7fffffffc520, value=..., new_value=0x7fffffffc530,
    logger=0x7fffe00034b0) at db/merge_test.cc:44
#1  0x000000000060ded4 in rocksdb::AssociativeMergeOperator::FullMergeV2 (this=0x7fffe0001320, merge_in=..., merge_out=0x7fffffffc5b0)
    at db/merge_operator.cc:62
#2  0x0000000000608f74 in rocksdb::MergeHelper::TimedFullMerge (merge_operator=0x7fffe0001320, key=..., value=value@entry=0x7fffffffc730,
    operands=..., result=0x7fffffffcdb0, logger=0x7fffe00034b0, statistics=0x0, env=0xbb8260 <rocksdb::Env::Default()::default_env>,
    result_operand=0x0, update_num_ops_stats=true) at db/merge_helper.cc:81
#3  0x0000000000600877 in rocksdb::SaveValue (arg=0x7fffffffc930, entry=<optimized out>) at db/memtable.cc:709
#4  0x00000000006a369a in rocksdb::(anonymous namespace)::SkipListRep::Get (this=<optimized out>, k=..., callback_args=0x7fffffffc930,
    callback_func=0x6000c0 <rocksdb::SaveValue(void*, char const*)>) at memtable/skiplistrep.cc:77
#5  0x00000000005ff6df in rocksdb::MemTable::Get (this=0x7fffe0013a80, key=..., value=0x7fffffffcdb0, s=s@entry=0x7fffffffcd50,
    merge_context=merge_context@entry=0x7fffffffca60, range_del_agg=range_del_agg@entry=0x7fffffffcae0, seq=0x7fffffffca50, read_opts=...,
    callback=0x0, is_blob_index=0x0) at db/memtable.cc:830
#6  0x000000000057c8fb in Get (is_blob_index=0x0, callback=0x0, read_opts=..., range_del_agg=0x7fffffffcae0, merge_context=0x7fffffffca60,
    s=0x7fffffffcd50, value=<optimized out>, key=..., this=<optimized out>) at ./db/memtable.h:203
#7  rocksdb::DBImpl::GetImpl (this=0x7fffe0002510, read_options=..., column_family=<optimized out>, key=..., pinnable_val=0x7fffffffce90,
    value_found=0x0, callback=0x0, is_blob_index=0x0) at db/db_impl.cc:1145
#8  0x000000000057d067 in rocksdb::DBImpl::Get (this=<optimized out>, read_options=..., column_family=<optimized out>, key=...,
    value=<optimized out>) at db/db_impl.cc:1084
#9  0x00000000006652cd in Get (value=0x7fffffffcdb0, key=..., column_family=0x7fffe000e8b8, options=..., this=0x7fffe0002510)
    at ./include/rocksdb/db.h:335
#10 rocksdb::MemTableInserter::MergeCF (this=0x7fffffffd170, column_family_id=0, key=..., value=...) at db/write_batch.cc:1497
#11 0x000000000065e491 in rocksdb::WriteBatch::Iterate (this=0x7fffffffd910, handler=handler@entry=0x7fffffffd170) at db/write_batch.cc:479
#12 0x00000000006623cf in rocksdb::WriteBatchInternal::InsertInto (write_group=..., sequence=sequence@entry=21, memtables=<optimized out>,
    flush_scheduler=flush_scheduler@entry=0x7fffe0002e80, ignore_missing_column_families=<optimized out>, recovery_log_number=0,
    db=0x7fffe0002510, concurrent_memtable_writes=false, seq_per_batch=false) at db/write_batch.cc:1731
#13 0x00000000005c203c in rocksdb::DBImpl::WriteImpl (this=0x7fffe0002510, write_options=..., my_batch=<optimized out>,
    callback=callback@entry=0x0, log_used=log_used@entry=0x0, log_ref=0, disable_memtable=false, seq_used=0x0, batch_cnt=0,
    pre_release_callback=0x0) at db/db_impl_write.cc:309
#14 0x00000000005c24c1 in rocksdb::DBImpl::Write (this=<optimized out>, write_options=..., my_batch=<optimized out>) at db/db_impl_write.cc:54
#15 0x00000000005c2972 in rocksdb::DB::Merge (this=this@entry=0x7fffe0002510, opt=..., column_family=column_family@entry=0x7fffe000ebf0,
    key=..., value=...) at db/db_impl_write.cc:1501
#16 0x00000000005c2a2f in rocksdb::DBImpl::Merge (this=0x7fffe0002510, o=..., column_family=0x7fffe000ebf0, key=..., val=...)
    at db/db_impl_write.cc:33
#17 0x0000000000512ecd in rocksdb::DB::Merge (this=0x7fffe0002510, options=..., key=..., value=...) at ./include/rocksdb/db.h:312
#18 0x00000000005121f8 in MergeBasedCounters::add (this=<optimized out>, key=..., value=<optimized out>) at db/merge_test.cc:236
---Type <return> to continue, or q <return> to quit---
#19 0x000000000050eda0 in assert_add (value=18, key=..., this=0x7fffffffdda0) at db/merge_test.cc:214
#20 (anonymous namespace)::testCounters (counters=..., db=0x7fffe0002510, test_compaction=test_compaction@entry=false) at db/merge_test.cc:287
#21 0x0000000000510119 in (anonymous namespace)::runTest (argc=argc@entry=1, dbname=..., use_ttl=use_ttl@entry=false) at db/merge_test.cc:442
#22 0x000000000040ebc0 in main (argc=1) at db/merge_test.cc:508

最终会调用之前已经添加到db中的merge_operator,merge_test里的merge_operator长这个样子

class CountMergeOperator : public AssociativeMergeOperator {
 public:
  CountMergeOperator() {
    mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
  }

  virtual bool Merge(const Slice& key,
                     const Slice* existing_value,
                     const Slice& value,
                     std::string* new_value,
                     Logger* logger) const override {
    assert(new_value->empty());
    ++num_merge_operator_calls;
    if (existing_value == nullptr) {
      new_value->assign(value.data(), value.size());
      return true;
    }
      ...

开始我还给gdb给merge加断点。触发的场景太多,还需要先断点testSuccessMerge,然后在断点merge,单步走也看不清楚,不熟悉流程。

所以只要捋顺清除为什么出问题的版本没走到调用get,调用merge_operator就可以了

开始调试出问题的merge_test,顺着正常流程的堆栈一步一步走,走到insertto都是正常的,走到mergeCF也能证明编码数据是没问题的,下面看为什么没有走Get

memtableinserter::mergeCF长这样 在write_batch.cc中

 virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
                         const Slice& value) override {
    assert(!concurrent_memtable_writes_);
    // optimize for non-recovery mode
    ...

    Status ret_status;
    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetImmutableMemTableOptions();
    bool perform_merge = false;

    // If we pass DB through and options.max_successive_merges is hit
    // during recovery, Get() will be issued which will try to acquire
    // DB mutex and cause deadlock, as DB mutex is already held.
    // So we disable merge in recovery
    if (moptions->max_successive_merges > 0 && db_ != nullptr &&
        recovering_log_number_ == 0) {
      LookupKey lkey(key, sequence_);

      // Count the number of successive merges at the head
      // of the key in the memtable
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);

      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }

    if (perform_merge) {
      // 1) Get the existing value
      std::string get_value;

      // Pass in the sequence number so that we also include previous merge
      // operations in the same batch.
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions read_options;
      read_options.snapshot = &read_from_snapshot;

      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      db_->Get(read_options, cf_handle, key, &get_value);
      Slice get_value_slice = Slice(get_value);

      // 2) Apply this merge
      auto merge_operator = moptions->merge_operator;
      assert(merge_operator);

      std::string new_value;

      Status merge_status = MergeHelper::TimedFullMerge(
          merge_operator, key, &get_value_slice, {value}, &new_value,
          moptions->info_log, moptions->statistics, Env::Default());

      if (!merge_status.ok()) {
        // Failed to merge!
        // Store the delta in memtable
        perform_merge = false;
      } else {
        // 3) Add value to memtable
        bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
        if (UNLIKELY(!mem_res)) {
          assert(seq_per_batch_);
          ret_status = Status::TryAgain("key+seq exists");
          const bool BATCH_BOUNDRY = true;
          MaybeAdvanceSeq(BATCH_BOUNDRY);
        }
      }
    }

    if (!perform_merge) {
      // Add merge operator to memtable
      bool mem_res = mem->Add(sequence_, kTypeMerge, key, value);
      if (UNLIKELY(!mem_res)) {
        assert(seq_per_batch_);
        ret_status = Status::TryAgain("key+seq exists");
        const bool BATCH_BOUNDRY = true;
        MaybeAdvanceSeq(BATCH_BOUNDRY);
      }
    }

...
  }

上面的不相关代码省略了,出问题的merge_test没走get,直接走的add,也就没有merge,这两个分支的关键点就是perform_merge的值,也就是mem->CountSuccessiveMergeEntries的值, 加了断点打印了一下,结果是0,所以问题进一步缩小,这个函数在memtable.cc 中,长这个样子

size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
  Slice memkey = key.memtable_key();

  // A total ordered iterator is costly for some memtablerep (prefix aware
  // reps). By passing in the user key, we allow efficient iterator creation.
  // The iterator only needs to be ordered within the same user key.
  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(key.internal_key(), memkey.data());
  size_t num_successive_merges = 0;

  for (; iter->Valid(); iter->Next()) {
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (!comparator_.comparator.user_comparator()->Equal(
            UserKeyFromRawInternalKey(iter_key_ptr, key_length),
            key.user_key())) {
      break;
    }

    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
    ValueType type;
    uint64_t unused;
    UnPackSequenceAndType(tag, &unused, &type);
    if (type != kTypeMerge) {
      break;
    }

    ++num_successive_merges;
  }

  return num_successive_merges;
}

我加了断点,根本没有进循环,也就是说 iter->Valid()=false 到这里,已经很清晰了,就是SeekKey的问题,之前添加的字段这个Prefixiterator可能处理不了,需要预处理一下

这里又涉及到了一个非常恶心的问题,userkey internal key memkey区别到底在哪里

本身一个key Slice,会被Lookupkey重新编码一下,编码内部的memkey就是原来的key。internalkey是带上sequence number的memkey

理清这些概念,注意到背景,修改lookupkey初始化。因为进入mergeCf之前已经编码了要加的字段,lookupkey有加了一次,需要预处理一下。结束。

#  删除断点
delete 5
# 单步调试想返回,需要提前record
record
reverse-next
record stop

reference

  1. 官方文档 https://github.com/facebook/rocksdb/wiki/Merge-Operator

或者到博客上提issue 我能收到邮件提醒。

Read More

^