Predixy

layout: post title: predixy代码走读 categories: [database] tags: [proxy,redis,predixy]


我发现很多公司用redis proxy,开源选型都用了predixy,所以来读一读

入口是handler->run函数

    Request::init();
    Response::init();
    auto conf = mProxy->conf();
    refreshServerPool();
    while (!mStop) {
        mEventLoop->wait(100000, this);
        postEvent();
        long timeout = conf->clientTimeout();
        if (timeout > 0) {
            int num = checkClientTimeout(timeout);
            if (num > 0) {
                postEvent();
            }
        }
        refreshServerPool();
        checkConnectionPool();
        timeout = mProxy->serverPool()->serverTimeout();
        if (timeout > 0) {
            int num = checkServerTimeout(timeout);
            if (num > 0) {
                postEvent();
            }
        }
    }

其中 eventloop的wait是epoll_wait的封装

template<class T>
int EpollMultiplexor::wait(long usec, T* handler)
{
    int timeout = usec < 0 ? usec : usec / 1000;
    logVerb("epoll wait with timeout %ld usec", usec);
    int num = epoll_wait(mFd, mEvents, MaxEvents, timeout);
    logVerb("epoll wait return with event count:%d", num);
    if (num == -1) {
        if (errno == EINTR) {
            return 0;
        }
        Throw(EpollWaitFail, "handler %d epoll wait fail %s", handler->id(), StrError());
    }
    for (int i = 0; i < num; ++i) {
        Socket* s = static_cast<Socket*>(mEvents[i].data.ptr);
        int evts = 0;
        evts |= (mEvents[i].events & EPOLLIN) ? ReadEvent : 0;
        evts |= (mEvents[i].events & EPOLLOUT) ? WriteEvent : 0;
        evts |= (mEvents[i].events & (EPOLLERR|EPOLLHUP)) ? ErrorEvent : 0;
        handler->handleEvent(s, evts);
    }
    return num;
}

hanldeEvent根据来的连接类型,处理不同的connect,listen,accept和connect(主要是后面这俩)

如果是读事件,就直接读了,一口气读完?readEvent

主要是把epoll拿到的fd和事件进行处理,标记,主要是addPostEvent来做,标记之后,真正的epoll_ctl动作封装在postEvent中,内部有postConnectConnectionEvent和postAcceptConnectionEvent来处理

在这样框架下,主要的工作就在于维护内部的数据

转发,有接收前端消息有发送后端消息

  • PostConnectConn的列表,PostAcceptConnect的列表,主要是addPostEvent和PostEvent来维护改动
  • epoll_wait拿到的事件属性也保存了,根据这个属性,在post函数中处理,如果是写事件,一次写不完,重新加数据,下次写。总之不要出现互相影响(一个重要的优化点)
    • 在处理期间会检测这个connect是不是正常的,不正常直接修改成err
  • PostConnectConn针对不同的命令区分成共用的和私有的
ref
  1. https://github.com/joyieldInc/predixy

Read More

代码覆盖率以及gcov lcov


why

QA要求对接gcov,不得已研究了一下


gcov是gcc自带的,需要两个编译选项和一个链接选项,

如果代码是makefile组织的,很简单

指定编译flag加上-fprofile-arcs -ftest-coverage,链接flag加上lgcov

比如rocksdb的写法

coverage:
	$(MAKE) clean
	COVERAGEFLAGS="-fprofile-arcs -ftest-coverage" LDFLAGS+="-lgcov" $(MAKE) J=1 all check
	cd coverage && ./coverage_test.sh
        # Delete intermediate files
	$(FIND) . -type f -regex ".*\.\(\(gcda\)\|\(gcno\)\)" -exec rm {} \;

如果是cmake组织的也很好办,cmake配置见参考链接1,写的很周全

我的项目也是cmake组织的,简化了一下

if(USE_GCOV)
	message(STATUS "using gcov")
	SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fprofile-arcs -ftest-coverage" )
#   SET(GCC_COVERAGE_LINK_FLAGS "-lgcov")
#   SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_LINKER_FLAGS} ${GCC_COVERAGE_LINK_FLAGS}" )               
    target_link_libraries(binary-namexxx gcov) 
endif(USE_GCOV)

其中两行set和下面的target_link_libraries是一个意思,set写法需要把所有的link管理都放在CMAKE_EXE_LINKER_FLAGS里。我的项目都是用target_link_libraries处理的,就放弃了这种写法

cmake生成makefile指定一下USE_GCOV就可以了,比如cmake -DUSE_GCOV=1

编译成功之后需要使用lcov进行分析,有两类文件.gcda .gcno需要分析

其中gcno是编译就会生成的,gcda是运行才会生成的。所以需要执行以下编译好的二进制文件

如果程序不会自动退出,需要graceful杀掉

ps -ef| pgrep binary-namexxx |while read -r num
do 
    kill -15 "$num"
done

或者

pkill binary-namexxx

注意,杀掉后可能停的比较慢,gcda文件handle还在写有可能破坏,手动延时一下

注意,两个文件所在的目录,一般来说,在二进制目录下,如果是cmake,在build/CMakeFile/binary-namexxx.dir/ 下,可以去目录下确定是否生成了文件

lcov安装可以yum或者git clone make install,github有教程

lcov依赖PerlIO::gzip和JSON,如果有报错,可以cpan安装或者手动下载,没报错跳过

cpan PerlIO::gzip
cpan JSON
# or
cd xxx
perl Makefile.PL 
make install

安装好后

执行

lcov -d . -c -o report.info

如果没有明显的报错提示,就是成功,然后生成report即可

genhtml report.info -o result
Reading data file test.info
Found 28 entries.
Found common filename prefix "/usr/local/include/c++"
Writing .css and .png files.
...
Processing file 5.4.0/ext/aligned_buffer.h
Writing directory view page.
Overall coverage rate:
  lines......: 84.5% (410 of 485 lines)
  functions..: 84.8% (28 of 33 functions)

就结束了,可以点开html观看

另外,参考链接中有gcov原理,可以学习一下

ref

  1. https://codeist.zone/2018/03/16/code-coverage-with-gcov-and-cmake/
  2. rocksdb coverage
    1. https://github.com/facebook/rocksdb/blob/5dc9fbd1175ad10454b877d9044c4b909d00ae3b/Makefile
    2. https://github.com/facebook/rocksdb/blob/master/coverage/coverage_test.sh
  3. lcov https://github.com/linux-test-project/lcov
  4. gcov 原理 http://rdc.hundsun.com/portal/article/704.html

contact

Read More

DeleteRange以及删除范围key


Why

这是Rocksdb API使用记录。作为一个掌握的过程


DeleteRange见参考链接1

在5.18.3之前,只能Seek开头结尾,然后遍历调用删除

Slice start, end;
// set start and end
auto it = db->NewIterator(ReadOptions());

for (it->Seek(start); cmp->Compare(it->key(), end) < 0; it->Next()) {
  db->Delete(WriteOptions(), it->key());
}

后面增加了新的API DeleteRange,可以一次性搞定,注意,是半开半闭区间,end是不删除的

实现原理见参考链接23

如何删除整个CF?

如果CF名字不是rocksdb::kDefaultColumnFamilyName 可以直接调用Drop CF,然后重新Create CF就行了,如果是的话,直接Drop会报错,最好改掉,或者先SeekToFirstSeekToLast找到范围[first, end),然后DeleteRange,然后在删除end.挺繁琐的。最好还是命个名,就可以删Drop了

注意DropCF后需要deletehandle不然会泄露

ref

  1. https://github.com/facebook/rocksdb/wiki/DeleteRange
  2. https://github.com/facebook/rocksdb/wiki/DeleteRange-Implementation
  3. https://rocksdb.org/blog/2018/11/21/delete-range.html
  4. https://stackoverflow.com/questions/54438787/faster-way-to-clear-a-column-family-in-rocksdb

contact

Read More

std::tie的原理,以及结构化绑定


why

std::tie既能解包,也能赋值,怎么做到的?

template< class... Types >
tuple<Types&...> tie( Types&... args ) noexcept;

一个使用的例子,见参考链接2,直接贴代码

  • 比较自定义结构体, 非常干净,生成tuple,在c++17前,比std::make_tuple干净一些
#include <string>
#include <tuple>

struct person {
  std::string first_name;
  std::string last_name;
  int age;
};

inline auto tie_members(const person & x) noexcept {
  return std::tie(x.first_name, x.last_name, x.age);
}

inline bool operator<(const person & lhs, const person & rhs) noexcept {
  return tie_members(lhs) < tie_members(rhs);
}
  • 返回值解包,这也是cppref上的例子
bool was_inserted;
std::tie(std::ignore, was_inserted) = some_set.insert(some_value);

能看出来,传的是引用,然后改引用了。参考链接三有更详细的解释

简单说,std::tie实际上是引用形态的std::tuple<T>,或者std::tuple<T&>,通过std::tuple::operator=()来干活

template< class... UTypes >
tuple& operator=( const tuple<UTypes...>& other );

先声明变量,然后把引用拿过来,然后修改引用,这样变量就修改了。是比较猥琐的引用用法,这种用法实现上层组件还是很干净的,但还是需要声明一个变量来搞引用

c++17 结构化绑定彻底干掉了第二种解包需求。第一种还是可以继续用的。第一种在c++17上和make_tuple没区别(make_tuple可以通过类模板推导来省掉尖括号)

ref

  1. https://zh.cppreference.com/w/cpp/utility/tuple/tie
  2. http://bajamircea.github.io/coding/cpp/2017/03/10/std-tie.html
    1. 作者博客不错,这篇文章可以看一眼http://bajamircea.github.io/coding/cpp/2018/05/06/alex-stepanov-pearls.html
  3. https://stackoverflow.com/questions/43762651/how-does-stdtie-work

contact

Read More

copy elision


Why

这是参考链接中ppt的总结

copy elision到底做了什么


RVO 返回值优化

这个在copy elision之前就有的优化,copy elision算是大一统理论?

简单说

std::string f(){
    ...
    std::string a;
    ...
    return a;
}

这里的a不会在stack上分配空间,而是直接放到返回值那个地址上。

Argument Passing 参数传递优化

在copy elision之前,传值的赋值就是复制,直接拷贝一份,有了copy elision,直接折叠掉

注意,产生条件只是右值,左值不会省略。因为左值地址不能直接用。

void f(std::string a){
    ...
    int b{23};
    ...
    return;
}
void g(){
    ...
    f(std::string{"A"});
    ...
}

这里原本是为这个临时变量在当前栈生成一个然后压栈再生成一个,直接折叠复用同一个。

ref

  1. https://github.com/boostcon/cppnow_presentations_2018/blob/master/lightning_talks/copy_elision__jon_kalb__cppnow_05092018.pdf

contact

Read More

valgrind sup文件的作用以及生成


redis的runtest支持valgrind,里面有这么一条

if {$::valgrind} {
        set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/redis-server $config_file > $stdout 2> $stderr &]
    } elseif ($::stack_logging) {
        set pid [exec /usr/bin/env MallocStackLogging=1 MallocLogFile=/tmp/malloc_log.txt src/redis-server $config_file > $stdout 2> $stderr &]
    } else {
        set pid [exec src/redis-server $config_file > $stdout 2> $stderr &]
    }

里面引用到sup文件https://github.com/antirez/redis/blob/unstable/src/valgrind.sup

sup表示suppress,避免valgrind出错的意思,这个文件定义一系列规则,valgrind检测的时候跳过这些触发条件,比如redis的是这样的

{
   <lzf_unitialized_hash_table>
   Memcheck:Cond
   fun:lzf_compress
}

{
   <lzf_unitialized_hash_table>
   Memcheck:Value4
   fun:lzf_compress
}

{
   <lzf_unitialized_hash_table>
   Memcheck:Value8
   fun:lzf_compress
}

对于提示unitialized hash table提醒,跳过不处理

再比如我遇到的一个epoll提示

...
==13711== Syscall param epoll_ctl(event) points to uninitialised byte(s)
==13711==    at 0x6E30CBA: epoll_ctl (in /usr/lib64/libc-2.17.so)
...
==13711==  Address 0xffefffad8 is on thread 1's stack
==13711==  in frame #1, created by xxxx
==13711==  Uninitialised value was created by a stack allocation
==13711==    at 0x561500: xxxx

这个很明显,valgrind有问题,这段代码没问题(问题不大)却告警了

  struct epoll_event ee;
  ee.data.fd = fd;
  ee.events = mask;
  return epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ee);

这个原因见参考链接,实际上需要加个memset,由于padding问题。

但是这步完全可以省掉,生成类似的sup规则

{
   <epoll_add>
   Memcheck:Param
   epoll_ctl(event)
   fun:epoll_ctl
}

读入,就会避免这些告警

如何生成sup规则?

规则肯定不是自己手写的,如果很多告警挨个手写也太低效了,实际上valgrind支持导出sup规则 ,见参考链接, 比如二进制是minimal

valgrind --leak-check=full --show-reachable=yes --error-limit=no --gen-suppressions=all --log-file=minimalraw.log ./minimal

另外,valgrind可以指定log文件。我一直都是重定向 >log 2>&1 特傻

也可以不像redis这样读入,直接写到 .valgrindrc里,套路类似bashrc(不过有污染也不好)

参考链接中的文章用了两步,先是上面这个命令,提取出log,然后log中已经有一系列sup信息了,再通过一个脚本parse一下。不过据我分析,第一步就能整理出来,不过有重复,冗余比较多,可以整理成上面epoll_ctl这种形式的四行一组就行了。

详情可以读参考链接中的文章。写的很好

ref

contact

Read More

一个 Valgrind Address is on Thread's 1 stack 搞笑场景


用valgrind扫了一遍模块,有个函数附近报错 ,简单说这个函数是读取配置文件,解析配置文件保存,提示Address is on Thread’s 1 stack,各种string越界

搜索了好几个案例,见参考链接,我仔细检查了这一系列函数,最后发现了问题。比较搞笑,就记录下来了

bool Config::Load()
{ 
  if (!FileExists(path_)) {
    return -1;
  }
  // read conf items

  char line[CONFLEN];
  char name[CONFLEN], value[CONFLEN];
  int line_len = 0;
  int name_len = 0, value_len = 0;
  int sep_sign = 0;
...
}

这个CONFLEN的长度是

static const int CONFLEN = 1024 * 1024;

也就是说是物理意义上的栈溢出。。我还以为代码有bug写穿了。。改小,改成一半,告警就消失了

案例中的第四个链接有点类似

ref

contact

Read More

valgrind跑一个rocksdb应用出现错误,以及背后的write hint


valgrind 3.10 日志是这样的

valgrind: m_syswrap/syswrap-linux.c:5255 (vgSysWrap_linux_sys_fcntl_before): Assertion 'Unimplemented functionality' failed.
valgrind: valgrind

host stacktrace:
==26531==    at 0x3805DC16: ??? (in /usr/lib64/valgrind/memcheck-amd64-linux)
==26531==    by 0x3805DD24: ??? (in /usr/lib64/valgrind/memcheck-amd64-linux)
==26531==    by 0x3805DEA6: ??? (in /usr/lib64/valgrind/memcheck-amd64-linux)
==26531==    by 0x380D53A0: ??? (in /usr/lib64/valgrind/memcheck-amd64-linux)
==26531==    by 0x380B0834: ??? (in /usr/lib64/valgrind/memcheck-amd64-linux)
==26531==    by 0x380AD242: ??? (in /usr/lib64/valgrind/memcheck-amd64-linux)
==26531==    by 0x380AE6F6: ??? (in /usr/lib64/valgrind/memcheck-amd64-linux)
==26531==    by 0x380BDD7C: ??? (in /usr/lib64/valgrind/memcheck-amd64-linux)

sched status:
  running_tid=1

Thread 1: status = VgTs_Runnable
==26531==    at 0x60903A4: fcntl (in /usr/lib64/libpthread-2.17.so)
==26531==    by 0x53571F6: rocksdb::PosixWritableFile::SetWriteLifeTimeHint(rocksdb::Env::WriteLifeTimeHint) (io_posix.cc:897)
...

找到rocksdb代码,接口是这样的

env/io_posix.cc

void PosixWritableFile::SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) {
#ifdef OS_LINUX
// Suppress Valgrind "Unimplemented functionality" error.
#ifndef ROCKSDB_VALGRIND_RUN
  if (hint == write_hint_) {
    return;
  }
  if (fcntl(fd_, F_SET_RW_HINT, &hint) == 0) {
    write_hint_ = hint;
  }
#else
  (void)hint;
#endif // ROCKSDB_VALGRIND_RUN
#else
  (void)hint;
#endif // OS_LINUX
}

…结果显然了,不支持fcntl F_SET_RW_HINT选项。

注意:

  • 如果需要跑valgrind,编译的rocksdb需要定义ROCKSDB_VALGRIND_RUN
  • 如果有必要,最好也定义PORTABLE,默认是march=native可能会遇到指令集不支持

特意搜了一下,这个参数是这样定义的

env/io_posix.cc

#if defined(OS_LINUX) && !defined(F_SET_RW_HINT)
#define F_LINUX_SPECIFIC_BASE 1024
#define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12)
#endif

在linux中是这样的

https://github.com/torvalds/linux/blob/dd5001e21a991b731d659857cd07acc7a13e6789/include/uapi/linux/fcntl.h#L53

/*
 * Set/Get write life time hints. {GET,SET}_RW_HINT operate on the
 * underlying inode, while {GET,SET}_FILE_RW_HINT operate only on
 * the specific file.
 */
#define F_GET_RW_HINT		(F_LINUX_SPECIFIC_BASE + 11)
#define F_SET_RW_HINT		(F_LINUX_SPECIFIC_BASE + 12)
#define F_GET_FILE_RW_HINT	(F_LINUX_SPECIFIC_BASE + 13)
#define F_SET_FILE_RW_HINT	(F_LINUX_SPECIFIC_BASE + 14)

另外这里也移植了一个https://github.com/riscv/riscv-gnu-toolchain/blob/master/linux-headers/include/linux/fcntl.h

write life time hints

搜到了这个实现的patch https://patchwork.kernel.org/patch/9794403/

和这个介绍,linux 4.13引入的https://www.phoronix.com/scan.php?page=news_item&px=Linux-4.13-Write-Hints

简单说,这是为NVMe加上的功能,暗示写入数据是ssd的话,调度就会把数据尽可能靠近,这样方便后续回收

patchset讨论里还特意说到了rocksdb。。。https://lwn.net/Articles/726477/

降低写放大十分可观

A new iteration of this patchset, previously known as write streams. As before, this patchset aims at enabling applications split up writes into separate streams, based on the perceived life time of the data written. This is useful for a variety of reasons:

  • For NVMe, this feature is ratified and released with the NVMe 1.3 spec. Devices implementing Directives can expose multiple streams. Separating data written into streams based on life time can drastically reduce the write amplification. This helps device endurance, and increases performance. Testing just performed internally at Facebook with these patches showed up to a 25% reduction in NAND writes in a RocksDB setup.

  • Software caching solutions can make more intelligent decisions on how and where to place data.

这背后又有一个NVMe特性,Stream ID,https://lwn.net/Articles/717755/

一个用法,见pg的patch以及讨论 https://www.postgresql.org/message-id/CA%2Bq6zcX_iz9ekV7MyO6xGH1LHHhiutmHY34n1VHNN3dLf_4C4Q%40mail.gmail.com

这里还没有更深入讨论。只是罗列了资料。下班了,就到这里

ref

contacts

Read More


lsm-tree延迟分析


  • L0满, 无法接收 write-buffer不能及时Flush,阻塞客户端
    • 高层压缩占用IO
    • L0 L1压缩慢
    • L0 空间少
  • Flush 太慢,客户端阻塞 -> rate limiter限速

rocksdb的解决方案就是rate limiter限速

slik思路

(1)优先flushing和lower levels的compaction,低level的compaction优先于高level的compaction,这样保证flushing能尽快的写入level 0,level 0尽快compaction level 1,(a)尽量避免因为memtable到达上限卡client I/O,(b)尽量避免因为level 0的sstable文件数到达上限卡client I/O。实际实现的时候会有一个线程池专门用于Flushing;另外一个线程池用于其他的Compaction

(2)Preempting Compactions:支持抢占式Compaction,也就是高优先级的compaction可以抢占低优先级的compaction,也就是说另外一个用于L0~LN之间的compaction的线程池,优先级高的low level compaction可以抢占high level的compaction,这样L0->L1在这个线程池,最高优先级的compaction就能够在必要的时候抢占执行,保证尽量不会出现level 0的sstable数量超过阈值

(3)在low load的时候,给Internal Operation(compaction)分配更多的bandwidth,相反,在high load的时候,给Client operation分配更多的带宽,这样可以保证Compaction在适当的时候也能得到更多的处理,减少read放大和空间放大,这个调度策略也是基于通常的client operation load是波动这事实来设计的,如下图,就是一个真实环境的workload变化规律:

ref

  1. https://blog.csdn.net/Z_Stand/article/details/109683804
  2. https://zhuanlan.zhihu.com/p/77543818
  3. 论文地址 https://www.usenix.org/system/files/atc19-balmau.pdf

Read More

^