rocksdb 一些堆栈记录

一个堆栈信息,merge_test

(gdb) bt
#0  CountMergeOperator::Merge (this=0xebe410, key=..., existing_value=0x7fffffffd350, value=..., new_value=0x7fffffffd150, logger=0xebd220)
    at db/merge_test.cc:39
#1  0x00000000006123a4 in rocksdb::AssociativeMergeOperator::FullMergeV2 (this=0xebe410, merge_in=..., merge_out=0x7fffffffd1d0)
    at db/merge_operator.cc:62
#2  0x000000000060d3e4 in rocksdb::MergeHelper::TimedFullMerge (merge_operator=merge_operator@entry=0xebe410, key=...,
    value=value@entry=0x7fffffffd350, operands=..., result=0x7fffffffd9e0, logger=0xebd220, statistics=0x0,
    env=0xbc9340 <rocksdb::Env::Default()::default_env>, result_operand=0x0, update_num_ops_stats=true) at db/merge_helper.cc:81
#3  0x0000000000604b7f in rocksdb::SaveValue (arg=0x7fffffffd550, entry=<optimized out>) at db/memtable.cc:735
#4  0x00000000006a81ea in rocksdb::(anonymous namespace)::SkipListRep::Get (this=<optimized out>, k=..., callback_args=0x7fffffffd550,
    callback_func=0x604680 <rocksdb::SaveValue(void*, char const*)>) at memtable/skiplistrep.cc:77
#5  0x0000000000603c4f in rocksdb::MemTable::Get (this=0xed7150, key=..., value=0x7fffffffd9e0, s=s@entry=0x7fffffffd9d0,
    merge_context=merge_context@entry=0x7fffffffd680, range_del_agg=range_del_agg@entry=0x7fffffffd700, seq=0x7fffffffd670, read_opts=...,
    callback=0x0, is_blob_index=0x0) at db/memtable.cc:856
#6  0x0000000000580822 in Get (is_blob_index=0x0, callback=0x0, read_opts=..., range_del_agg=0x7fffffffd700, merge_context=0x7fffffffd680,
    s=0x7fffffffd9d0, value=<optimized out>, key=..., this=<optimized out>) at ./db/memtable.h:203
#7  rocksdb::DBImpl::GetImpl (this=0xec3730, read_options=..., column_family=<optimized out>, key=..., pinnable_val=0x7fffffffd920,
    value_found=0x0, callback=0x0, is_blob_index=0x0) at db/db_impl.cc:1163
#8  0x0000000000580fb7 in rocksdb::DBImpl::Get (this=<optimized out>, read_options=..., column_family=<optimized out>, key=...,
    value=<optimized out>) at db/db_impl.cc:1098
#9  0x0000000000517da5 in rocksdb::DB::Get (this=this@entry=0xec3730, options=..., column_family=0xed0800, key=...,
    value=value@entry=0x7fffffffd9e0) at ./include/rocksdb/db.h:335
#10 0x00000000005196bc in Get (value=0x7fffffffd9e0, key=..., options=..., this=0xec3730) at ./include/rocksdb/db.h:345
#11 Counters::get (this=this@entry=0x7fffffffde10, key=..., value=value@entry=0x7fffffffda68) at db/merge_test.cc:172
#12 0x0000000000519a5c in Counters::assert_get (this=this@entry=0x7fffffffde10, key=...) at db/merge_test.cc:209
#13 0x000000000051296f in (anonymous namespace)::testCounters (counters=..., db=0xec3730, test_compaction=test_compaction@entry=false)
    at db/merge_test.cc:280
#14 0x0000000000513c62 in (anonymous namespace)::runTest (argc=argc@entry=1, dbname=..., use_ttl=use_ttl@entry=false) at db/merge_test.cc:433
#15 0x000000000040ebe0 in main (argc=1) at db/merge_test.cc:510

#0  rocksdb::AssociativeMergeOperator::FullMergeV2 (this=0xebe410, merge_in=..., merge_out=0x7fffffffd1d0) at db/merge_operator.cc:68
#1  0x000000000060d3e4 in rocksdb::MergeHelper::TimedFullMerge (merge_operator=merge_operator@entry=0xebe410, key=..., value=value@entry=0x0,
    operands=..., result=0x7fffffffd9e0, logger=0xebd220, statistics=0x0, env=0xbc9340 <rocksdb::Env::Default()::default_env>,
    result_operand=0x0, update_num_ops_stats=true) at db/merge_helper.cc:81
#2  0x0000000000604e2f in rocksdb::SaveValue (arg=0x7fffffffd550, entry=<optimized out>) at db/memtable.cc:757
#3  0x00000000006a81ea in rocksdb::(anonymous namespace)::SkipListRep::Get (this=<optimized out>, k=..., callback_args=0x7fffffffd550,
    callback_func=0x604680 <rocksdb::SaveValue(void*, char const*)>) at memtable/skiplistrep.cc:77
#4  0x0000000000603c4f in rocksdb::MemTable::Get (this=0xed7150, key=..., value=0x7fffffffd9e0, s=s@entry=0x7fffffffd9d0,
    merge_context=merge_context@entry=0x7fffffffd680, range_del_agg=range_del_agg@entry=0x7fffffffd700, seq=0x7fffffffd670, read_opts=...,
    callback=0x0, is_blob_index=0x0) at db/memtable.cc:856
#5  0x0000000000580822 in Get (is_blob_index=0x0, callback=0x0, read_opts=..., range_del_agg=0x7fffffffd700, merge_context=0x7fffffffd680,
    s=0x7fffffffd9d0, value=<optimized out>, key=..., this=<optimized out>) at ./db/memtable.h:203
#6  rocksdb::DBImpl::GetImpl (this=0xec3730, read_options=..., column_family=<optimized out>, key=..., pinnable_val=0x7fffffffd920,
    value_found=0x0, callback=0x0, is_blob_index=0x0) at db/db_impl.cc:1163
#7  0x0000000000580fb7 in rocksdb::DBImpl::Get (this=<optimized out>, read_options=..., column_family=<optimized out>, key=...,
    value=<optimized out>) at db/db_impl.cc:1098
#8  0x0000000000517da5 in rocksdb::DB::Get (this=this@entry=0xec3730, options=..., column_family=0xed0800, key=...,
    value=value@entry=0x7fffffffd9e0) at ./include/rocksdb/db.h:335
#9  0x00000000005196bc in Get (value=0x7fffffffd9e0, key=..., options=..., this=0xec3730) at ./include/rocksdb/db.h:345
#10 Counters::get (this=this@entry=0x7fffffffde10, key=..., value=value@entry=0x7fffffffda68) at db/merge_test.cc:172
#11 0x0000000000519a5c in Counters::assert_get (this=this@entry=0x7fffffffde10, key=...) at db/merge_test.cc:209
#12 0x0000000000512a1b in (anonymous namespace)::testCounters (counters=..., db=0xec3730, test_compaction=test_compaction@entry=false)
    at db/merge_test.cc:292
#13 0x0000000000513c62 in (anonymous namespace)::runTest (argc=argc@entry=1, dbname=..., use_ttl=use_ttl@entry=false) at db/merge_test.cc:433
#14 0x000000000040ebe0 in main (argc=1) at db/merge_test.cc:510

get内部会调用merge _operator


#0  (anonymous namespace)::UInt64AddOperator::Merge (this=0xec1500, existing_value=0x7fffffffd140, value=..., new_value=0x7fffffffd150,
    logger=0xebd220) at utilities/merge_operators/uint64add.cc:27
#1  0x00000000006123a4 in rocksdb::AssociativeMergeOperator::FullMergeV2 (this=0xebe410, merge_in=..., merge_out=0x7fffffffd1d0)
    at db/merge_operator.cc:62
#2  0x000000000060d3e4 in rocksdb::MergeHelper::TimedFullMerge (merge_operator=merge_operator@entry=0xebe410, key=..., value=value@entry=0x0,
    operands=..., result=0x7fffffffd9e0, logger=0xebd220, statistics=0x0, env=0xbc9340 <rocksdb::Env::Default()::default_env>,
    result_operand=0x0, update_num_ops_stats=true) at db/merge_helper.cc:81
#3  0x0000000000604e2f in rocksdb::SaveValue (arg=0x7fffffffd550, entry=<optimized out>) at db/memtable.cc:757
#4  0x00000000006a81ea in rocksdb::(anonymous namespace)::SkipListRep::Get (this=<optimized out>, k=..., callback_args=0x7fffffffd550,
    callback_func=0x604680 <rocksdb::SaveValue(void*, char const*)>) at memtable/skiplistrep.cc:77
#5  0x0000000000603c4f in rocksdb::MemTable::Get (this=0xed7150, key=..., value=0x7fffffffd9e0, s=s@entry=0x7fffffffd9d0,
    merge_context=merge_context@entry=0x7fffffffd680, range_del_agg=range_del_agg@entry=0x7fffffffd700, seq=0x7fffffffd670, read_opts=...,
    callback=0x0, is_blob_index=0x0) at db/memtable.cc:856
#6  0x0000000000580822 in Get (is_blob_index=0x0, callback=0x0, read_opts=..., range_del_agg=0x7fffffffd700, merge_context=0x7fffffffd680,
    s=0x7fffffffd9d0, value=<optimized out>, key=..., this=<optimized out>) at ./db/memtable.h:203
#7  rocksdb::DBImpl::GetImpl (this=0xec3730, read_options=..., column_family=<optimized out>, key=..., pinnable_val=0x7fffffffd920,
    value_found=0x0, callback=0x0, is_blob_index=0x0) at db/db_impl.cc:1163
#8  0x0000000000580fb7 in rocksdb::DBImpl::Get (this=<optimized out>, read_options=..., column_family=<optimized out>, key=...,
    value=<optimized out>) at db/db_impl.cc:1098
#9  0x0000000000517da5 in rocksdb::DB::Get (this=this@entry=0xec3730, options=..., column_family=0xed0800, key=...,
    value=value@entry=0x7fffffffd9e0) at ./include/rocksdb/db.h:335
#10 0x00000000005196bc in Get (value=0x7fffffffd9e0, key=..., options=..., this=0xec3730) at ./include/rocksdb/db.h:345
#11 Counters::get (this=this@entry=0x7fffffffde10, key=..., value=value@entry=0x7fffffffda68) at db/merge_test.cc:172
#12 0x0000000000519a5c in Counters::assert_get (this=this@entry=0x7fffffffde10, key=...) at db/merge_test.cc:209
#13 0x0000000000512a1b in (anonymous namespace)::testCounters (counters=..., db=0xec3730, test_compaction=test_compaction@entry=false)
    at db/merge_test.cc:292
#14 0x0000000000513c62 in (anonymous namespace)::runTest (argc=argc@entry=1, dbname=..., use_ttl=use_ttl@entry=false) at db/merge_test.cc:433
#15 0x000000000040ebe0 in main (argc=1) at db/merge_test.cc:510

iter

#0  CountMergeOperator::Merge (this=0xd79bc0, key=..., existing_value=0x7fffffffd7d0, value=..., new_value=0x7fffffffd4c0, logger=0xeab050)
    at db/merge_test.cc:44
#1  0x000000000060ded4 in rocksdb::AssociativeMergeOperator::FullMergeV2 (this=0xd79bc0, merge_in=..., merge_out=0x7fffffffd540)
    at db/merge_operator.cc:62
#2  0x0000000000608f74 in rocksdb::MergeHelper::TimedFullMerge (merge_operator=0xd79bc0, key=..., value=value@entry=0x7fffffffd7d0,
    operands=..., result=0x7fffe0000a40, logger=0xeab050, statistics=0x0, env=0xbb8260 <rocksdb::Env::Default()::default_env>,
    result_operand=0x7fffe0000a60, update_num_ops_stats=true) at db/merge_helper.cc:81
#3  0x00000000005c7c45 in rocksdb::DBIter::MergeValuesNewToOld (this=this@entry=0x7fffe0000960) at db/db_iter.cc:657
#4  0x00000000005c9070 in rocksdb::DBIter::FindNextUserEntryInternal (this=this@entry=0x7fffe0000960, skipping=skipping@entry=false,
    prefix_check=prefix_check@entry=false) at db/db_iter.cc:551
#5  0x00000000005ce37e in rocksdb::DBIter::FindNextUserEntry (this=0x7fffe0000960, skipping=<optimized out>, prefix_check=<optimized out>)
    at db/db_iter.cc:410
#6  0x00000000005c9f84 in rocksdb::DBIter::SeekToFirst (this=0x7fffe0000960) at db/db_iter.cc:1366
#7  0x000000000050e9e6 in (anonymous namespace)::dumpDb (db=db@entry=0xea9520) at db/merge_test.cc:250
#8  0x000000000050ed5a in (anonymous namespace)::testCounters (counters=..., db=0xea9520, test_compaction=test_compaction@entry=false)
    at db/merge_test.cc:280
#9  0x000000000050fff4 in (anonymous namespace)::runTest (argc=argc@entry=1, dbname=..., use_ttl=use_ttl@entry=false) at db/merge_test.cc:431
#10 0x000000000040ebc0 in main (argc=1) at db/merge_test.cc:508

dumpdb

0  CountMergeOperator::Merge (this=0xd79bc0, key=..., existing_value=0x7fffffffd540, value=..., new_value=0x7fffffffd550, logger=0xeab050)
    at db/merge_test.cc:44
#1  0x000000000060ded4 in rocksdb::AssociativeMergeOperator::FullMergeV2 (this=0xd79bc0, merge_in=..., merge_out=0x7fffffffd5d0)
    at db/merge_operator.cc:62
#2  0x0000000000608f74 in rocksdb::MergeHelper::TimedFullMerge (merge_operator=0xd79bc0, key=..., value=value@entry=0x0, operands=...,
    result=0x7fffe0000a40, logger=0xeab050, statistics=0x0, env=0xbb8260 <rocksdb::Env::Default()::default_env>, result_operand=0x7fffe0000a60,
    update_num_ops_stats=true) at db/merge_helper.cc:81
#3  0x00000000005c7ae1 in rocksdb::DBIter::MergeValuesNewToOld (this=this@entry=0x7fffe0000960) at db/db_iter.cc:704
#4  0x00000000005c9070 in rocksdb::DBIter::FindNextUserEntryInternal (this=this@entry=0x7fffe0000960, skipping=skipping@entry=true,
    prefix_check=prefix_check@entry=false) at db/db_iter.cc:551
#5  0x00000000005c93b5 in FindNextUserEntry (prefix_check=false, skipping=true, this=0x7fffe0000960) at db/db_iter.cc:410
#6  rocksdb::DBIter::Next (this=0x7fffe0000960) at db/db_iter.cc:384
#7  0x000000000050ea1d in (anonymous namespace)::dumpDb (db=db@entry=0xea9520) at db/merge_test.cc:250
#8  0x000000000050ee14 in (anonymous namespace)::testCounters (counters=..., db=0xea9520, test_compaction=test_compaction@entry=false)
    at db/merge_test.cc:293
#9  0x000000000050fff4 in (anonymous namespace)::runTest (argc=argc@entry=1, dbname=..., use_ttl=use_ttl@entry=false) at db/merge_test.cc:431
#10 0x000000000040ebc0 in main (argc=1) at db/merge_test.cc:508

merge

#0  CountMergeOperator::Merge (this=0x7fffe0001320, key=..., existing_value=0x7fffffffc520, value=..., new_value=0x7fffffffc530,
    logger=0x7fffe00034b0) at db/merge_test.cc:44
#1  0x000000000060ded4 in rocksdb::AssociativeMergeOperator::FullMergeV2 (this=0x7fffe0001320, merge_in=..., merge_out=0x7fffffffc5b0)
    at db/merge_operator.cc:62
#2  0x0000000000608f74 in rocksdb::MergeHelper::TimedFullMerge (merge_operator=0x7fffe0001320, key=..., value=value@entry=0x7fffffffc730,
    operands=..., result=0x7fffffffcdb0, logger=0x7fffe00034b0, statistics=0x0, env=0xbb8260 <rocksdb::Env::Default()::default_env>,
    result_operand=0x0, update_num_ops_stats=true) at db/merge_helper.cc:81
#3  0x0000000000600877 in rocksdb::SaveValue (arg=0x7fffffffc930, entry=<optimized out>) at db/memtable.cc:709
#4  0x00000000006a369a in rocksdb::(anonymous namespace)::SkipListRep::Get (this=<optimized out>, k=..., callback_args=0x7fffffffc930,
    callback_func=0x6000c0 <rocksdb::SaveValue(void*, char const*)>) at memtable/skiplistrep.cc:77
#5  0x00000000005ff6df in rocksdb::MemTable::Get (this=0x7fffe0013a80, key=..., value=0x7fffffffcdb0, s=s@entry=0x7fffffffcd50,
    merge_context=merge_context@entry=0x7fffffffca60, range_del_agg=range_del_agg@entry=0x7fffffffcae0, seq=0x7fffffffca50, read_opts=...,
    callback=0x0, is_blob_index=0x0) at db/memtable.cc:830
#6  0x000000000057c8fb in Get (is_blob_index=0x0, callback=0x0, read_opts=..., range_del_agg=0x7fffffffcae0, merge_context=0x7fffffffca60,
    s=0x7fffffffcd50, value=<optimized out>, key=..., this=<optimized out>) at ./db/memtable.h:203
#7  rocksdb::DBImpl::GetImpl (this=0x7fffe0002510, read_options=..., column_family=<optimized out>, key=..., pinnable_val=0x7fffffffce90,
    value_found=0x0, callback=0x0, is_blob_index=0x0) at db/db_impl.cc:1145
#8  0x000000000057d067 in rocksdb::DBImpl::Get (this=<optimized out>, read_options=..., column_family=<optimized out>, key=...,
    value=<optimized out>) at db/db_impl.cc:1084
#9  0x00000000006652cd in Get (value=0x7fffffffcdb0, key=..., column_family=0x7fffe000e8b8, options=..., this=0x7fffe0002510)
    at ./include/rocksdb/db.h:335
#10 rocksdb::MemTableInserter::MergeCF (this=0x7fffffffd170, column_family_id=0, key=..., value=...) at db/write_batch.cc:1497
#11 0x000000000065e491 in rocksdb::WriteBatch::Iterate (this=0x7fffffffd910, handler=handler@entry=0x7fffffffd170) at db/write_batch.cc:479
#12 0x00000000006623cf in rocksdb::WriteBatchInternal::InsertInto (write_group=..., sequence=sequence@entry=21, memtables=<optimized out>,
    flush_scheduler=flush_scheduler@entry=0x7fffe0002e80, ignore_missing_column_families=<optimized out>, recovery_log_number=0,
    db=0x7fffe0002510, concurrent_memtable_writes=false, seq_per_batch=false) at db/write_batch.cc:1731
#13 0x00000000005c203c in rocksdb::DBImpl::WriteImpl (this=0x7fffe0002510, write_options=..., my_batch=<optimized out>,
    callback=callback@entry=0x0, log_used=log_used@entry=0x0, log_ref=0, disable_memtable=false, seq_used=0x0, batch_cnt=0,
    pre_release_callback=0x0) at db/db_impl_write.cc:309
#14 0x00000000005c24c1 in rocksdb::DBImpl::Write (this=<optimized out>, write_options=..., my_batch=<optimized out>) at db/db_impl_write.cc:54
#15 0x00000000005c2972 in rocksdb::DB::Merge (this=this@entry=0x7fffe0002510, opt=..., column_family=column_family@entry=0x7fffe000ebf0,
    key=..., value=...) at db/db_impl_write.cc:1501
#16 0x00000000005c2a2f in rocksdb::DBImpl::Merge (this=0x7fffe0002510, o=..., column_family=0x7fffe000ebf0, key=..., val=...)
    at db/db_impl_write.cc:33
#17 0x0000000000512ecd in rocksdb::DB::Merge (this=0x7fffe0002510, options=..., key=..., value=...) at ./include/rocksdb/db.h:312
#18 0x00000000005121f8 in MergeBasedCounters::add (this=<optimized out>, key=..., value=<optimized out>) at db/merge_test.cc:236
---Type <return> to continue, or q <return> to quit---
#19 0x000000000050eda0 in assert_add (value=18, key=..., this=0x7fffffffdda0) at db/merge_test.cc:214
#20 (anonymous namespace)::testCounters (counters=..., db=0x7fffe0002510, test_compaction=test_compaction@entry=false) at db/merge_test.cc:287
#21 0x0000000000510119 in (anonymous namespace)::runTest (argc=argc@entry=1, dbname=..., use_ttl=use_ttl@entry=false) at db/merge_test.cc:442
#22 0x000000000040ebc0 in main (argc=1) at db/merge_test.cc:508

reference

  1. 官方文档 https://github.com/facebook/rocksdb/wiki/Merge-Operator
  2. 上面的一个翻译https://www.jianshu.com/p/e13338a3f161

或者到博客上提issue 我能收到邮件提醒。

Read More

rocksdb

不整理就忘了

悲报 调用图网站挂了。只能手动doc gen了。有空再搞

另外,版本是5.x,现在已经8了。应该考虑更新一编

[toc]


rocksdb的整体框架和数据模型,leveldb的整体框架,网上文章遍地都是了。细分到组件,又一大堆分析。看的头都大了。事实上我现在还是分不清的。术语看了一遍又一遍,还是卡壳,看代码,还是会忘记。感觉不用硬看不行啊。还是上图吧。

dbimpl 属性图

img

column family 属性图

img

supervision属性图

img

version 属性图

img

version set 属性图

img

概念

Column Family

这个不是Cassandra那个CF,就是键空间。这个用来实现多种需求,比如上层元数据拆分。这涉及到编码的定制。我也不很懂

CRUD

插入就是插入,更新是追加插入,删除也是更新,value空。支持writebatch,原子性

put https://www.jianshu.com/p/daa18eebf6e1

https://glitterisme.github.io/2018/07/03/%E6%89%8B%E6%92%95RocksDB-1/

myrocks put http://mysql.taobao.org/monthly/2017/07/05/

wirte详解? https://blog.csdn.net/wang_xijue/article/details/46521605

http://mysql.taobao.org/monthly/2018/07/04/

并发写?

http://www.pandademo.com/2016/10/parallel-write-rocksdb-source-dissect-3/

https://www.jianshu.com/p/fd7e98fd5ee6

https://youjiali1995.github.io/rocksdb/write-batch/

write stall? 什么时候会write stall?

Gets Iterators Snapshots

iterator 和snapshot是一致的。底层引用计数保证不被删除。但是这个数据是不被持久化的,对比git,这就是拉个分支,然后在分支上修改,但是这个分支信息不记录,不保存就丢,可以在这基础上创建checkpoint然后搞备份

?疑问 checkpoint实现? backupdb实现?是不是基于checkpoint

minifest

这东西就是个一个index 内部实现, 一致性保证

文件具体为

  • MANIFEST-xx 内部是相关的一堆version edit
  • CURRENT 记录最新的MANIFEST-xx(类似git里的head)

这里要说下version edit version set 和version

version 和versionset version这就是个snapshot snapshot这东西就是版本号啦,或者可以理解成git里的commit,每个commit都能抽出来一个分支,这就是snapshot了,version由version edit组成,version edit可以理解成 modify。git中的相关思想在大多kv中都有。

version是某个状态,version set就是所有状态的集合(一个branch?)

参考3 ,介绍了实现,一个cv队列,versionSet 拥有manifest写,新的mannifest入队列,判断current。另外ldd文件可以导出内容,一个例子

 ../ldb manifest_dump --path=./MANIFEST-000006
--------------- Column family "default"  (ID 0) --------------
log number: 15
comparator: leveldb.BytewiseComparator
--- level 0 --- version# 1 ---
 16:31478713['00010' seq:5732151, type:1 .. '0004999995' seq:6766579, type:1]
 14:31206213['00011000020' seq:4753765, type:1 .. '00049999990' seq:4266837, type:1]
 12:31367971['00011000023' seq:3559804, type:1 .. '00049999988' seq:3516765, type:1]
 10:31331378['00011000025' seq:560662, type:1 .. '00049999994' seq:1786954, type:1]
--- level 1 --- version# 1 ---
...省略60行打印
--- level 63 --- version# 1 ---

next_file_number 18 last_sequence 7515368  prev_log_number 0 max_column_family 0 min_log_number_to_keep 15

WAL

这东西就是oplog一种表达,有编码。可以看参考链接3中的实现

什么时候调用NewWritableFile?

  • 打开db,肯定要生成对应的log
  • 切换memtable,写满了,生成新的memtable,旧的imm等持久化,wal日志的生命周期结束

什么时候清理? FIndObsoleteFiles 和sst manifest文件一起清理。

谁负责写?当然是dbimpl,对于memtable切换时全程感知的

考虑场景,WAL文件已经写满,但是memtable还没写满怎么办?

至于内容编码和配置选项,以及相关优化选项,又是另一个话题

内容一例

../ldb dump_wal --walfile=./000015.log |tail -f

7515329,4,76,495457,NOOP PUT(0) : 0x3030303334383431343337 PUT(0) : 0x3030303431353233363738 PUT(0) : 0x30303032383839343733 PUT(0) : 0x3030303135373830323031

` 事务`

这又是个复杂的话题,见参考链接

memtable

一般为了支持并发写,会使用skiplist

compaction

leveldb的由来,compaction gc

什么时候会compaction?

compaction原则?

compaction中的snapshot?

https://yq.aliyun.com/articles/655902

flush

什么时候会flush?

Get

DBImpl::GetImpl

  • db, cfd, sv之间的关系: column family data,cfd就相当于db键空间,cfd 有super version sv,这个就是version的上层抽象,version的集合
  • snapshot,lsn 。 get指定快照多是在事务中。快照实现事务。当然建立过快照,指定快照来取也可以。和git拉分支类似。增加引用计数。snapshot实际上就是lsn的一层薄封装,内部是链表。lsn,版本号。可以理解成两者等同
    • 如果不指定,就会算出一个,通常是versionSet中有的最后一个发布的lsn。versionSet会维护这个lsn
    • rocksdb内部key是有lsn编码的,会有查找构造类lookupkey,需要snapshot参数。

如果read_option没有指定跳过memtable,那就会从memtable开始遍历

sv->mem->Get

  • bloomFilter
  • Saver
  • MemTableRep,table_->Get memtable抽象。后面调用具体的实现。
  • 会有对Merge的判断。Merge途中就跳过。

sv->imm->Get

  • imm实际上是MemTableListVersion 是一系列version的list。不可插入,add就是插入链表。数据本身不变了。
  • 调用GetFromList最后内部遍历还是调用mem->Get,lsn做边界条件,同上,会有对Merge的判断

sv->current->Get

  • 通过Version中记录的信息遍历Level中的所有SST文件,利用SST文件记录的边界最大,最小key- smallest_keylargest_key来判断查找的key是否有可能存在
  • 如果在该Level中可能存在,调用TableReader的接口来查找SST文件
    • 首先通过SST文件中的Filter来初判key是否存在
    • 如果存在key存在,进入Data Block中查找
    • 在Data Block利用Block的迭代器BlockIter利用二分查找BinarySeek或者前缀查找PrefixSeek
  • 如果查找成功则返回,如果该Level不存在查找的Key,继续利用Version中的信息进行下一个Level的查找,直到最大的一个Level

table_cache_->Get(k)

#4   ./db_test() [0x6ae065] rocksdb::MemTable::KeyComparator::operator()(char const*, rocksdb::Slice const&) const      ??:?
#5   ./db_test() [0x7525f2] rocksdb::InlineSkipList<rocksdb::MemTableRep::KeyComparator const&>::FindGreaterOrEqual(char const*) const  ??:?
#6   ./db_test() [0x7516d5] rocksdb::(anonymous namespace)::SkipListRep::Get(rocksdb::LookupKey const&, void*, bool (*)(void*, char const*))skiplistrep.cc:?
#7   ./db_test() [0x6af31f] rocksdb::MemTable::Get(rocksdb::LookupKey const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >*, rocksdb::Status*, rocksdb::MergeContext*, rocksdb::RangeDelAggregator*, unsigned long*, rocksdb::ReadOptions const&, rocksb::ReadCallback*, bool*)        ??:?
#8   ./db_test() [0x62f49b] rocksdb::DBImpl::GetImpl(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::PinnableSlice*, bool*, rocksdb::ReadCallback*, bool*)        ??:?
#9   ./db_test() [0x62fc07] rocksdb::DBImpl::Get(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::PinnableSlice*)  ??:?
#10  ./db_test() [0x59c9cd] rocksdb::DB::Get(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >*)     ??:?
#11  ./db_test() [0x58fc9d] rocksdb::DB::Get(rocksdb::ReadOptions const&, rocksdb::Slice const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >*)   ??:?
#12  ./db_test() [0x51c3a3] rocksdb::DBTest_MockEnvWithTimestamp_Test::TestBody()       ??:?
#13  ./db_test() [0x9401c8] void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) ??:?
#14  ./db_test() [0x93799c] testing::Test::Run() [clone .part.476]      gtest-all.cc:?
#15  ./db_test() [0x937b6d] testing::TestInfo::Run() [clone .part.477]  gtest-all.cc:?
#16  ./db_test() [0x937ce5] testing::TestCase::Run() [clone .part.478]  gtest-all.cc:?
#17  ./db_test() [0x93812d] testing::internal::UnitTestImpl::RunAllTests()      ??:?
#18  ./db_test() [0x940678] bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*)   ??:?
#19  ./db_test() [0x93843f] testing::UnitTest::Run()    ??:?
#20  ./db_test() [0x40d9bd] main        ??:?
#21  /lib64/libc.so.6(__libc_start_main+0xf5) [0x7f3f6c423bb5] ??       ??:0
#22  ./db_test() [0x513761] _start      ??:?

OptimizeForPointLookup

ColumnFamilyOptions* ColumnFamilyOptions::OptimizeForPointLookup(
    uint64_t block_cache_size_mb) {
  prefix_extractor.reset(NewNoopTransform());
  BlockBasedTableOptions block_based_options;
  block_based_options.index_type = BlockBasedTableOptions::kHashSearch;
  block_based_options.filter_policy.reset(NewBloomFilterPolicy(10));
  block_based_options.block_cache =
      NewLRUCache(static_cast<size_t>(block_cache_size_mb * 1024 * 1024));
  table_factory.reset(new BlockBasedTableFactory(block_based_options));
  memtable_prefix_bloom_size_ratio = 0.02;
  return this;
}

优化点查询,实际上是改变了BlockBasedTable的index_type, 变成hash自然就不可迭代,主要是bloomfilter prefix tranform哪里会有hash查找

  bool const may_contain =
      nullptr == prefix_bloom_
          ? false
          : prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key));

write

主要流程1中写的也很详细。我重新复述一下,加深印象

WriteBatch, WriteBatchInternal, WriteBatch::handler, WriteAheadLog

db调用的put等操作,最终调用的都是WriteBatch内部的put,然后调用WriteBatchInternal内部的动作,WriteBatch是一组操作,一起写要比一条条写要高效,并且每一组WriteBatch拥有同一个lsn。

由于WriteBatch是一组动作,putputdelget等等,每一条WriteBatch内部就是个string字节流,第一个字节记录动作类型。内部进行遍历iterate时,调用Handler实现的相关接口,判断是什么动作,最后由memtable执行,putCF等。

另外一个疑问,如何具体区分每一个动作和相关的kv呢,Slice。也就是string_view,每个kv都会有size,这个size提前编码到type的后面,相当于 type+(cfid)+ size+ sv+size+ sv 作为一个单元。del没有第二个size+sv

所以WriteBatch就是 lsn-count-header+{type size sv}…, 多条WriteBatch字节流组合,就是Write Ahead Log

WriteBatch::Handler是个接口类,memtable最终插入数据,memtableinserter要继承handler。

WriteBatchInternal内部工具类。当然是WriteBatch的友元类啦。可以操作WriteBatch的字节流string

编解码是一套很麻烦的动作,rocksdb util/coding.cc/h有很多小函数。可以看看抄一抄。由于string_view是不拥有值的,view嘛,所以复杂的编码动作都需要搞回到string在搞回来。直接搞到string上就append了。如果sv和sv连接,免不了string做交换,有机会找找解决办法。

Writer, WriteGroup, WriteBatch, 并发写

每个Writer是持有WriteBatch的。调用db put接口后就写到WriteBatch中,WriteBatch相当于缓存,这种异步和事件触发的异步不太一样,是同步造出一个二阶段缓存搞的异步。WriteBatch越大吞吐量越大,但是写到memtable越多则会影响后面的flush造成write stall,是个负反馈过程。

WriteBatch+WriteGroup 主要是为了写WAL,WAL是没办法并发写的,Leader Writer负责写,写完唤醒其他Writer,大家一起写memtable,无锁队列+condvar。无锁队列是用原子量实现的。主要逻辑在LinkOne

Write Stall, Write Controller

dbformat

internalkey setinternalkey parsedinternalkey userkey

简单来说 userkey就是internalkey 因为一个key会把lsn和类型编码进去(PackSequenceAndType),去掉这八个字节就是userkey,在rocksdb代码中,有些地方就直接是硬编码,-8了,有些地方转换回来还要+8, Jesus

Slice MemTableRep::UserKey(const char* key) const {
  Slice slice = GetLengthPrefixedSlice(key);
  return Slice(slice.data(), slice.size() - 8);
}

其实应该有个更小的封装,因为-8可能变(对于想要用rocksdb来修改的人来说)

memtable inserter, memtable, iterate

最终并发写入会调用memtableInserter, 也会生成memtable对象(就那么一个new入口),memtable inserter实现了memtable::handler,内部接口,操作memtable,调用add。

inserter会遍历当前的WriteBatch,解析出每个kv头的tag,然后分别调用putcf,deletecf 等等,内部都是memtable->add

merge


  options.merge_operator = MergeOperators::CreatePutOperator();

merge有时候语义上就是put

iterator

iteration

  1. ArenaWrappedDBIter是暴露给用户的Iterator,它包含DBIter,DBIter则包含InternalIterator,InternalIterator顾名思义,是内部定义,MergeIterator、TwoLevelIterator、BlockIter、MemTableIter、LevelFileNumIterator等都是继承自InternalIterator
  2. 图中绿色实体框对应的是各个Iterator,按包含顺序颜色由深至浅
  3. 图中虚线框对应的是创建各个Iterator的方法
  4. 图中蓝色框对应的则是创建过程中需要的类的对象及它的方法


注意到第二步,构造DBIter, 其中的Seek有IsVisiable的判断,也就是实现快照的关键。

每层iterator都会实现iterator头文件中定义好的接口,Seek Next等等,迭代过程中也是逐层调用

另外要注意Seek的语义(SeekForPrev正好相反),不同于Get,查不到不一定返回无效, 会返回所查找的元素的下一个存在于db中的数据

// Position at the first key in the source that at or past target.
// The iterator is Valid() after this call iff the source contains
// an entry that comes at or past target.
// All Seek*() methods clear any error status() that the iterator had prior to
// the call; after the seek, status() indicates only the error (if any) that
// happened during the seek, not any past errors.
virtual void Seek(const Slice& target) = 0;

iterator 牵连的引用以及生命周期, reseek

iterator迭代是依赖快照的,所以创建的迭代器是无法获得新加入的数据的。

iterator不释放 ->versionSet析构->cf set析构-> cfd析构,内部会持有引用。debug模式会assert挂掉

根据上面的结论, 如果版本skip设定比较小,是很容易发生reseek的,设定在Options中

// An iteration->Next() sequentially skips over keys with the same
// user-key unless this option is set. This number specifies the number
// of keys (with the same userkey) that will be sequentially
// skipped before a reseek is issued.
//
// Default: 8
//
// Dynamically changeable through SetOptions() API
uint64_t max_sequential_skip_in_iterations = 8;

比如这个没有flush的场景, option中设定skip=3

Put("a","v1");
Put("a","v2");
Put("b","v1");
Put("a","v3");
iter = NewIterator(RO, CF);
iter->SeekToFirst();//iter指向v4 ,这是最新的数据
int num_reseek = options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION);
iter->Next();// Seek本身是逆序搜最新的,按理说应该访问下一个,b在a后面,但是没击中,步进大于3,所以就需要重新Seek
assert(options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)==num_reseek+1);

由于没有合并key a的所有版本都在memtable中,所以就需要找最旧的a,重新seek,下一个就是b

反过来,如果插入了新的b,先找b后找a,也会出现同样的场景

Put("b","v2");
// 现在是 a3 a2 a1 b2 b1 假设这个iter不指定snapshot
iter = NewIterator(RO, CF);
iter->SeekToLast();// iter指向b v2
iter->Prev();//正常来说是a,但是版本不确定,迭代大于3,就得reseek

iterate_lower_bound, iterate_upper_bound

ReadOption中带有的这两个选项,限定iterator的有效与否

  // `iterate_lower_bound` defines the smallest key at which the backward
  // iterator can return an entry. Once the bound is passed, Valid() will be
  // false. `iterate_lower_bound` is inclusive ie the bound value is a valid
  // entry.
  //
  // If prefix_extractor is not null, the Seek target and `iterate_lower_bound`
  // need to have the same prefix. This is because ordering is not guaranteed
  // outside of prefix domain.
  //
  // Default: nullptr
  const Slice* iterate_lower_bound;

  // "iterate_upper_bound" defines the extent upto which the forward iterator
  // can returns entries. Once the bound is reached, Valid() will be false.
  // "iterate_upper_bound" is exclusive ie the bound value is
  // not a valid entry.  If iterator_extractor is not null, the Seek target
  // and iterator_upper_bound need to have the same prefix.
  // This is because ordering is not guaranteed outside of prefix domain.
  //
  // Default: nullptr
  const Slice* iterate_upper_bound;

如果有key a b y z,设定upper_bound x,seek c,这个场景下iter是invalid的,过了x也没找到c,那就是无效的,同理lower_bound

tombstones, Next

看SeekAfterHittingManyInternalKeys这个测试用例

这里实际上是涉及到一个prefix bloom filter的,参考链接2给出了很详细的介绍和使用说明,我就直接抄过来了

[toc]

Prefix Seek

Prefix seek是RocksDB的一种模式,主要影响Iterator的行为。 在这种模式下,RocksDB的Iterator并不保证所有key是有序的,而只保证具有相同前缀的key是有序的。这样可以保证具有相同特征的key(例如具有相同前缀的key)尽量地被聚合在一起。

使用方法

prefix seek模式的使用如下,

int main() {
  DB* db;
  Options options;
  options.IncreaseParallelism();
  options.OptimizeLevelStyleCompaction();
  options.create_if_missing = true;
  options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(3));

  Status s = DB::Open(options, kDBPath, &db);
  assert(s.ok());

  s = db->Put(WriteOptions(), "key1", "value1");
  assert(s.ok());
  s = db->Put(WriteOptions(), "key2", "value2");
  assert(s.ok());
  s = db->Put(WriteOptions(), "key3", "value3");
  assert(s.ok());
  s = db->Put(WriteOptions(), "key4", "value4");
  assert(s.ok());
  s = db->Put(WriteOptions(), "otherPrefix1", "otherValue1");
  assert(s.ok());
  s = db->Put(WriteOptions(), "abc", "abcvalue1");
  assert(s.ok());

  auto iter = db->NewIterator(ReadOptions());
  for (iter->Seek("key2"); iter->Valid(); iter->Next())
  {
    std::cout << iter->key().ToString() << ": " << iter->value().ToString() << std::endl;
  }

  delete db;
  return 0;
}

输出如下

key2: value2
key3: value3
key4: value4
otherPrefix1: otherValue1

从上面的输出,我们可以看到prefix seek模式下iterator的几个特性

  1. 首先seek(targetKey)方法会将iterator定位到具有相同前缀的区间,并且大于或等于targetKey的位置.
  2. Next()方法是会跨prefix的,就像例子中,当以”key”为前缀的key都遍历完之后,跨到了下一个prefix “oth”上继续遍历,直到遍历完所有的key,Valid()方法返回false.
  3. 在遍历的时候,如果只想遍历相同前缀的key,需要在每一次Next之后,判断一次key是前缀是否符合预期.
  auto iter = db->NewIterator(ReadOptions());
  Slice prefix = options.prefix_extractor->Transform("key0");
  for (iter->Seek("key0"); iter->Valid() && iter->key().starts_with(prefix); iter->Next())
  {
    std::cout << iter->key().ToString() << ": " << iter->value().ToString() << std::endl;
  }

输出如下:

key1: value1
key2: value2
key3: value3
key4: value4

Prefix Seek相关的一些优化

为了更快地定位指定prefix的key,或者排除不存在的key,RocksDB做了一些优化,包括:

  1. prefix bloom filter for block based table
  2. prefix bloom for memtable
  3. memtable底层使用hash skiplist
  4. 使用plain table

一个典型的优化设置见下例

Options options;

// Enable prefix bloom for mem tables
options.prefix_extractor.reset(NewFixedPrefixTransform(3));
options.memtable_prefix_bloom_bits = 100000000;
options.memtable_prefix_bloom_probes = 6;

// Enable prefix bloom for SST files
BlockBasedTableOptions table_options;
table_options.filter_policy.reset(NewBloomFilterPolicy(10, true));
options.table_factory.reset(NewBlockBasedTableFactory(table_options));

DB* db;
Status s = DB::Open(options, "/tmp/rocksdb",  &db);

......

auto iter = db->NewIterator(ReadOptions());
iter->Seek("foobar"); // Seek inside prefix "foo"

Prefix Seek相关类图

img

prefix seek相关的iterator类图

说是Prefix Seek相关的类图,其实就是rocksdb的Iterator类图。这里列出了几个主要的类。

  • InternalIterator 接口类,定义了iterator的接口,包括Seek、Next、Prev等接口。
  • MergingIterator 返回给用户的实际Iterator类型,之所以叫”Merging”,是因为它持有memtable和sst文件的iterater,对外提供了统一的iterator接口。
  • MemTableIterator 用于遍历memtable的iterator。主要的分析对象。
  • TwoLevelIterator 用于遍历sst文件的iterator。

Prefix Seek工作流程

Iterator简单分析

通过Iterator的相关接口,来逐层分析,持有PrefixTransform对象的option.prefix_extractor选项是如何工作的。 DB的Iterator由三部分组成

  1. 当前memtable的iterator
  2. 所有不可修改的memtable的iterator
  3. 所有level上的sst文件的iterator(TwoLevelIterator)

这些iterator统一由MergeIterator来管理。MergeIterator持有一个vector成员,上面三类iterator都会添加到该成员中。

autovector<IteratorWrapper, kNumIterReserve> children_;

autovector是对std::vector的完全模板特化版本

class autovector : public std::vector<T> {
  using std::vector<T>::vector;
};

autovector主要针对在栈上分配的数组,保存数量较少的item这种使用场景做了优化。 IteratorWrapper是对InternalIterator*类型的对象的一个简单封装,主要cache了iter)->Valid()和iter_->key()的结果,提供更好的局部性访问性能。其他接口和InternalIterator一样,只是转发请求给iter_。

DB的Iterator添加当前使用的memtable的iterator的代码如下

InternalIterator* DBImpl::NewInternalIterator(
    const ReadOptions& read_options, ColumnFamilyData* cfd,
    SuperVersion* super_version, Arena* arena,
    RangeDelAggregator* range_del_agg) {
  ...
    merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_options, arena));
  ...

arena是为所有iterator已经分配的一块内存。 向MergeIterator添加Iterator的实现:

  1. 先将要添加的iter加入到vector成员children_中
  2. 如果iter是有效的iterator,将该iter添加到一个最小堆成员中。
  virtual void AddIterator(InternalIterator* iter) {
    assert(direction_ == kForward);
    children_.emplace_back(iter);
    if (pinned_iters_mgr_) {
      iter->SetPinnedItersMgr(pinned_iters_mgr_);
    }
    auto new_wrapper = children_.back();
    if (new_wrapper.Valid()) {
      minHeap_.push(&new_wrapper);
      current_ = CurrentForward();
    }
  }

这里的最小堆minHeap_用于维护上面所有合法iterator的访问顺序,指向key较小的iter,越靠近堆顶,这样,当连续多次调用Next接口时,都在同一个child iterator上,可以直接通过堆顶的iterator来获取Next,而不用遍历所有children。 如果堆顶iterator的下一个元素的不是合法的,则将该iterator从堆中pop出去,调整堆之后,拿到新的堆顶元素。 DB的Iterator MergingIterator的Next接口主要实现如下

  virtual void Next() override {
    assert(Valid());
    ...
    // For the heap modifications below to be correct, current_ must be the
    // current top of the heap.
    assert(current_ == CurrentForward());

    // as the current points to the current record. move the iterator forward.
    current_->Next();
    if (current_->Valid()) {
      // current is still valid after the Next() call above.  Call
      // replace_top() to restore the heap property.  When the same child
      // iterator yields a sequence of keys, this is cheap.
      minHeap_.replace_top(current_);
    } else {
      // current stopped being valid, remove it from the heap.
      minHeap_.pop();
    }
    current_ = CurrentForward();

Seek接口的实现

Seek一个target的实现,通过遍历每个child iterator,然后将合法的iterator插入到最小堆minHeap中,将current_成员指向minHeap的堆顶。

  virtual void Seek(const Slice& target) override {
    ClearHeaps();
    for (auto& child : children_) {
      {
        PERF_TIMER_GUARD(seek_child_seek_time);
        child.Seek(target);
      }
      PERF_COUNTER_ADD(seek_child_seek_count, 1);

      if (child.Valid()) {
        PERF_TIMER_GUARD(seek_min_heap_time);
        minHeap_.push(&child);
      }
    }
    direction_ = kForward;
    {
      PERF_TIMER_GUARD(seek_min_heap_time);
      current_ = CurrentForward();
    }
  }

以当前可写的memtable的iterator为例,当用户调用Seek接口时,如何定位到memtable中的目标key。

MemTableIterator的Seek

在memtable中持有一个MemTableRep::Iterator*类型的iterator指针iter_,iter_指向的对象的实际类型由MemTableRep的子类中分别定义。以SkipListRep为例, 在MemTableIterator的构造函数中,当设置了prefix_seek模式时,调用MemTableRep的GetDynamicPrefixIterator接口来获取具体的iterator对象。

  MemTableIterator(const MemTable& mem, const ReadOptions& read_options,
                   Arena* arena, bool use_range_del_table = false)
      : bloom_(nullptr),
        prefix_extractor_(mem.prefix_extractor_),
        comparator_(mem.comparator_),
        valid_(false),
        arena_mode_(arena != nullptr),
        value_pinned_(!mem.GetMemTableOptions()->inplace_update_support) {
    if (use_range_del_table) {
      iter_ = mem.range_del_table_->GetIterator(arena);
    } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
      bloom_ = mem.prefix_bloom_.get();
      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
    } else {
      iter_ = mem.table_->GetIterator(arena);
    }
  }

对于SkipList来说,GetDynamicPrefixIterator也是调用GetIterator,只有对HashLinkListRep和HashSkipListRe,这个方法才有不同的实现。

  virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
    if (lookahead_ > 0) {
      void *mem =
        arena ? arena->AllocateAligned(sizeof(SkipListRep::LookaheadIterator))
              : operator new(sizeof(SkipListRep::LookaheadIterator));
      return new (mem) SkipListRep::LookaheadIterator(*this);
    } else {
      void *mem =
        arena ? arena->AllocateAligned(sizeof(SkipListRep::Iterator))
              : operator new(sizeof(SkipListRep::Iterator));
      return new (mem) SkipListRep::Iterator(&skip_list_);
    }
  }

这样就创建了MemTableIterator。 当调用到MemTableIterator的Seek接口时:

  • 首先在DynamicBloom成员bloom_中查找目标key的prefix是否可能在当前的memtable中存在。
  • 如果不存在,则一定不存在,可以直接返回。
  • 如果可能存在,则调用MemTableRep的iterator进行Seek。
  virtual void Seek(const Slice& k) override {
    PERF_TIMER_GUARD(seek_on_memtable_time);
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
    if (bloom_ != nullptr) {
      if (!bloom_->MayContain(
              prefix_extractor_->Transform(ExtractUserKey(k)))) {
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
        valid_ = false;
        return;
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
    }
    iter_->Seek(k, nullptr);
    valid_ = iter_->Valid();
  }

当调用MemTableRep的iterator进行Seek时,则是通过调用实际使用的数据结构的Seek接口,找到第一个大于或等于key的目标。

    // Advance to the first entry with a key >= target    virtual void Seek(const Slice& user_key, const char* memtable_key)        override {      if (memtable_key != nullptr) {        iter_.Seek(memtable_key);      } else {        iter_.Seek(EncodeKey(&tmp_, user_key));      }    }

第一个分支主要用于确定在memtable中的key,做update类的操作.

MemTable Prefix Bloom的构建

prefix bloom作为memtable的一个成员,当向memtable插入一个key value时,会截取key的prefix,插入到prefix bloom中。

void MemTable::Add(SequenceNumber s, ValueType type,                   const Slice& key, /* user key */                   const Slice& value, bool allow_concurrent,                   MemTablePostProcessInfo* post_process_info) {  ...  if (!allow_concurrent) {    ...    if (prefix_bloom_) {      assert(prefix_extractor_);      prefix_bloom_->Add(prefix_extractor_->Transform(key));    }    ...  else {    ...    if (prefix_bloom_) {      assert(prefix_extractor_);      prefix_bloom_->AddConcurrently(prefix_extractor_->Transform(key));    }    ...  }  ...}

关于bloom filter的原理和实现可以参考前文。传送门 因为再BlockBasedTable中,对bloom filter的用法不同,所以使用了不同实现的bloom filter。基本原理是相同的。

WAL

对RocksDB的每一次update都会写入两个位置:1) 内存表(内存数据结构,后续会flush到SST file) 2)磁盘中的write ahead log(WAL)。在故障发生时,WAL可以用来恢复内存表中的数据。默认情况下,RocksDB通过在每次用户写时调用fflush WAL文件来保证一致性。

Life Cycle of a WAL

举个例子,RocksDB实例创建了两个column families,分别是 new_cf和default。一旦db被open,就会在磁盘上创建一个WAL来持久化所有的写操作。

DB* db;
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(ColumnFamilyDescriptor(
    kDefaultColumnFamilyName, ColumnFamilyOptions()));
column_families.push_back(ColumnFamilyDescriptor(
    "new_cf", ColumnFamilyOptions()));
std::vector<ColumnFamilyHandle*> handles;
s = DB::Open(DBOptions(), kDBPath, column_families, &handles, &db);

添加一些kv对数据

db->Put(WriteOptions(), handles[1], Slice("key1"), Slice("value1"));
db->Put(WriteOptions(), handles[0], Slice("key2"), Slice("value2"));
db->Put(WriteOptions(), handles[1], Slice("key3"), Slice("value3"));
db->Put(WriteOptions(), handles[0], Slice("key4"), Slice("value4"));

此时,WAL已经记录了所有的写操作。WAL文件会保持打开状态,一直记录后续所有的写,直到WAL 大小达到DBOptions::max_total_wal_size为止。 如果用户决定要flush new_cf中的数据时,会有以下操作:1) new_cf的数据 key1和key3会flush到一个新的SST file。 2)新建一个WAL,后续对所有列族的写都通过新的WAL记录。3)老的WAL不再记录新的写

db->Flush(FlushOptions(), handles[1]);
// key5 and key6 will appear in a new WAL
db->Put(WriteOptions(), handles[1], Slice("key5"), Slice("value5"));
db->Put(WriteOptions(), handles[0], Slice("key6"), Slice("value6"));

此时,会有两个WAL文件,老的WAL会包含key1、key2、key3和key4,新的WAL文件包含key5和key6。因为老的WAL仍然含有default 列族的数据,所以还没有被删除。只有当用户决定flush default列族的数据之后,老的WAL 才会被归档然后从磁盘中删除。

db->Flush(FlushOptions(), handles[0]);
// The older WAL will be archived and purged separetely

总之,在以下情况下会创建一个WAL:

  • 新打开一个DB
  • flush了一个column family。一个WAL文件只有当所有的列族数据都已经flush到SST file之后才会被删除,或者说,所有的WAL中数据都持久化到SST file之后,才会被删除。归档的WAL文件会move到一个单独的目录,后续从磁盘中删除。

WAL Configurations

  • DBOptions::wal_dir : RocksDB保存WAL file的目录,可以将目录与数据目录配置在不同的路径下。
  • DBOptions::WAL_ttl_seconds, DBOptions::WAL_size_limit_MB:这两个选项影响归档WAL被删除的快慢。
  • DBOptions::max_total_wal_size : 为了限制WALs的大小,RocksDB使用该配置来触发 column family flush到SST file。一旦,WALs超过了这个大小,RocksDB会强制将所有列族的数据flush到SST file,之后就可以删除最老的WALs。
  • DBOptions::avoid_flush_during_recovery
  • DBOptions::manual_wal_flush: 这个参数决定了WAL flush是否在每次写之后自动执行,或者是纯手动执行(用户调用FlushWAL来触发)。
  • DBOptions::wal_filter:在恢复数据时可以过滤掉WAL中某些记录
  • WriteOptions::disableWAL: 打开或者关闭WAL支持

WAL LOG File Format

OverView

WAL将内存表的操作记录序列化后持久化存储到日志文件。当DB故障时可以使用WAL 文件恢复到DB故障前的一致性状态。当内存表的数据安全地flush到持久化存储文件中后,对应的WAL log(s)被归档,然后在某个时刻被删除。

WAL Manager

在WAL目录中,WAL文件按照序列号递增命名。为了重建数据库故障之前的一致性状态,必须按照序号号递增的顺序读取WAL文件。WAL manager封装了读WAL文件的操作。WAL manager内部使用Reader or Writer abstraction 来读取WAL file。

Reader/Writer

Writer提供了将log记录append到log 文件的操作接口(内部使用WriteableFile接口)。Reader提供了从log文件中顺序读日志记录的操作接口(内部使用SequentialFile接口)。

Log File Format

日志文件保安了一系列不停长度的记录。Record按照kBlockSize分配。如果一个特定的记录不能完全适配剩余的空间,那么就会将剩余的空间补零。writer按照kBlockSize去写一个数据库,reader按照kBlockSIze去读一个数据库。

       +-----+-------------+--+----+----------+------+-- ... ----+
 File  | r0  |        r1   |P | r2 |    r3    |  r4  |           |
       +-----+-------------+--+----+----------+------+-- ... ----+
       <--- kBlockSize ------>|<-- kBlockSize ------>|

  rn = variable size records
  P = Padding

Record Format

+---------+-----------+-----------+--- ... ---+
|CRC (4B) | Size (2B) | Type (1B) | Payload   |
+---------+-----------+-----------+--- ... ---+

CRC = 32bit hash computed over the payload using CRC
Size = Length of the payload data
Type = Type of record
       (kZeroType, kFullType, kFirstType, kLastType, kMiddleType )
       The type is used to group a bunch of records together to represent
       blocks that are larger than kBlockSize
Payload = Byte stream as long as specified by the payload size

日志文件由连续的32KB大小的block组成。唯一的例外是文件尾部数据包含一个不是完整的block。 每一个block包含连续的Records:

block := record* trailer?
record :=
  checksum: uint32  // crc32c of type and data[]
  length: uint16
  type: uint8       // One of FULL, FIRST, MIDDLE, LAST 
  data: uint8[length]

record type

FULL == 1
FIRST == 2
MIDDLE == 3
LAST == 4

FULL 表示包含了全部用户record记录 FIRST、MIDDLE、LAST用户表示一个record太大然后拆分为多个数据片。FIRST表示是用户record的第一块数据,LAST表示最后一个,MID是用户Record中间部分的数据。 Example

A: length 1000
B: length 97270
C: length 8000

A会在第一个block中存储一个完整的Record。 B会被拆分为3个分片,第一个分片占用了第一个block的剩余全部空间,第二个分片占用第二个block的全部空间,第三个分片占用第三个block的前面部分空间。会在第三个block中预留6个字节,置为空来表示结束。 C将会完整存储在第四个block。FULL Record

WAL Recovery Modes

每个应用都是唯一的,都需要RocksDB保证特定状态的一致性。RocksDB中每一个提交的记录都是持久化的。没有提交的记录保存在WAL file中。当DB正常退出时,在退出之前会提交所有没有提交的数据,所以总是能够保证一致性。当RocksDB进程被kill或者服务器重启时,RocksDB需要恢复到一个一致性状态。最重要的恢复操作之一就是replay所有WAL中没有提交的记录。不同的WAL recovery 模式定义了不同的replay WAL的行为。

kTolerateCorruptedTailRecords

在这种模式下,WAL replay 会忽视日志文件末尾的任何error。这些error主要来源于不安全的进程退出后产生的不完全的写错误日志。这是一种启发式的或者探索式的模式,系统并不能区分是日志文件tail的data corruption还是不完全写操作。任何其他的IO error,都会被视作data corruption。

这种模式被大部分应用使用,这是因为该模式提供了一种比较合理的tradeoff between 不安全退出后重启和数据一致性。

kAbsoluteConsistency

在这种模式下,在replay WAL过程中任何IO 错误都被视为data corruption。这种模式适用于以下场景:应用不能接受丢失任何记录,而且有方法恢复未提交的数据。

kPointInTimeConsistency

在这种模式下,当发生IO errror时,WAL replay会stop,系统恢复到一个满足一致性的时间点上。这种模式适用于有副本的集群,来自另外一个副本的数据可以用来恢复当前的实例。

kSkipAnyCorruptedRecord

在这种模式下,WAL replay会忽视日志文件中的任何错误。系统尝试恢复尽可能多的数据。适用于灾后重建的场景。

MISC

Write Buffer Manager

其实rocksdb整体很多这种插件接口语义,比如Write Buffer Manager, 比如Merge Operator,比如Rate Limiter,sst_file_maniger,eventlistener,用户创建,传进指针(有默认的,也可以继承重写接口)

Write Buffer Manager正如名字,就是控制写入buffer的,这个正是memtable的总大小上限,没有设置的话会有个默认的设定比如两个memtable大小

什么时候会切换memtable?

  • Memtable的大小在一次写入后超过write_buffer_size。
  • 所有列族中的memtable大小超过db_write_buffer_size了,或者write_buffer_manager要求落盘。在这种场景,最大的memtable会被落盘
  • WAL文件的总大小超过max_total_wal_size。在这个场景,有着最老数据的memtable会被落盘,这样才允许携带有跟这个memtable相关数据的WAL文件被删除。
ScheduleFlushes HandleWALFull HandleWriteBufferFull FlushMemTable

条件

  1. 当前线程持有db的大锁

  2. 当前线程是写wal文件的leader

    • 如果开启了enable_pipelined_write选项(写wal和写memtable分开), 那么同时要等到成为写memtable的leader

    • 判断当前wal文件是否为空,如果不为空就创建新的wal文件(recycle_log_file_num选项开启就复用),然后构建新memtable。刷盘当前wal文件
    • 把之前的memtable变成immutable,然后切到新memtable

    • 调用InstallSuperVersionAndScheduleWork函数,这个函数会更新super_version_

memtable

影响memtable的几个选项

影响memtable的最重要的几个选项是:

  • memtable_factory: memtable对象的工厂。通过声明一个工厂对象,用户可以改变底层memtable的实现,并提供事先声明的选项。
  • write_buffer_size:一个memtable的大小
  • db_write_buffer_size:多个列族的memtable的大小总和。这可以用来管理memtable使用的总内存数。
  • write_buffer_manager:除了声明memtable的总大小,用户还可以提供他们自己的写缓冲区管理器,用来控制总体的memtable使用量。这个选项会覆盖db_write_buffer_size
  • max_write_buffer_number:内存中可以拥有刷盘到SST文件前的最大memtable数。
Memtable类型 SkipList HashSkipList HashLinkList Vector
最佳使用场景 通用 带特殊key前缀的范围查询 带特殊key前缀,并且每个前缀都只有很小数量的行 大量随机写压力
索引类型 二分搜索 哈希+二分搜索 哈希+线性搜索 线性搜索
是否支持全量db有序扫描? 天然支持 非常耗费资源(拷贝以及排序一生成一个临时视图) 同HashSkipList 同HashSkipList
额外内存 平均(每个节点有多个指针) 高(哈希桶+非空桶的skiplist元数据+每个节点多个指针) 稍低(哈希桶+每个节点的指针) 低(vector尾部预分配的内存)
Memtable落盘 快速,以及固定数量的额外内存 慢,并且大量临时内存使用 同HashSkipList 同HashSkipList
并发插入 支持 不支持 不支持 不支持
带Hint插入 支持(在没有并发插入的时候) 不支持 不支持 不支持

Direct IO

关于buffered IO和DirectIO可以看参考链接3, 这个图不错,偷过来

rocksdb属于自缓存应用,有memtable+blockcache来保证了一定的局部性。本身已经实现了缓存算法(LRU,clock)可以不使用系统的页缓存,使用DirectIO,在options中有配置

  // Use O_DIRECT for user reads
  // Default: false
  // Not supported in ROCKSDB_LITE mode!
  bool use_direct_reads = false;

  // Use O_DIRECT for both reads and writes in background flush and compactions
  // When true, we also force new_table_reader_for_compaction_inputs to true.
  // Default: false
  // Not supported in ROCKSDB_LITE mode!
  bool use_direct_io_for_flush_and_compaction = false;

对于use_direct_io_for_flush_and_compaction, compaction有个compaction block cache 不建议用,具体可以参考链接4

Rate Limiter限流器

Rocksdb内置控制写入速率降低延迟的,当然包括compaction速率。如果compaction导致了较多毛刺,考虑一下是不是没使用limiter

通过调用NewGenericRateLimiter创建一个RateLimiter对象,可以对每个RocksDB实例分别创建,或者在RocksDB实例之间共享,以此控制落盘和压缩的写速率总量。

RateLimiter* rate_limiter = NewGenericRateLimiter(
    rate_bytes_per_sec /* int64_t */, 
    refill_period_us /* int64_t */,
    fairness /* int32_t */);

参数:

  • rate_bytes_per_sec:通常这是唯一一个你需要关心的参数。他控制压缩和落盘的总速率,单位为bytes/秒。现在,RocksDB并不强制限制除了落盘和压缩以外的操作(如写WAL)
  • refill_period_us:这个控制令牌被填充的频率。例如,当rate_bytes_per_sec被设置为10MB/s然后refill_period_us被设置为100ms,那么就每100ms会从新填充1MB的限量。更大的数值会导致突发写,而更小的数值会导致CPU过载。默认数值100,000应该在大多数场景都能很好工作了
  • fairness:RateLimiter接受高优先级请求和低优先级请求。一个低优先级任务会被高优先级任务挡住。现在,RocksDB把来自压缩的请求认为是低优先级的,把来自落盘的任务认为是高优先级的。如果落盘请求不断地过来,低优先级请求会被拦截。这个fairness参数保证低优先级请求,在即使有高优先级任务的时候,也会有1/fairness的机会被执行,以避免低优先级任务的饿死。默认是10通常是可以的。

尽管令牌会以refill_period_us设定的时间按照间隔来填充,我们仍然需要保证一次写爆发中的最大字节数,因为我们不希望看到令牌堆积了很久,然后在一次写爆发中一次性消耗光,这显然不符合我们的需求。GetSingleBurstBytes会返回这个令牌数量的上限。

这样,每次写请求前,都需要申请令牌。如果这个请求无法被满足,请求会被阻塞,直到令牌被填充到足够完成请求。比如:

// block if tokens are not enough
rate_limiter->Request(1024 /* bytes */, rocksdb::Env::IO_HIGH); 
Status s = db->Flush();

如果有需要,用户还可以通过SetBytesPerSecond动态修改限流器每秒流量。参考include/rocksdb/rate_limiter.h 了解更多细节

对那些RocksDB提供的原生限流器无法满足需求的用户,他们可以通过继承include/rocksdb/rate_limiter.h 来实现自己的限流器。

memory

主要几大块, blockcache,memtable,index & filter block, pinned by iterator

memtable也可以被write buffer manager来控制

table_options.block_cache->GetPinnedUsage()获得第四个

文件的快速导入rocksdb::SstFileWriter/ IngestExternalFile

具体可以看这篇文档https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files

经常用于文件导入,快速导入,调用write接口还是太慢,直接生成L0sst更快。但是存在问题,L0文件过多,触发compaction,所以要先停掉compaction,这点在rockset使用该功能的时候分享过。这里标记备忘

Arena

分配器需要做什么

  • 状态,这里的分配器不需要实时的回收,也就仅仅是分配器,不管回收,生命周期内析构就可以了
  • 分配,尽可能的分配成功

看看arena的api什么样的

class Arena {
 public:
  Arena();
  ~Arena();
  // Return a pointer to a newly allocated memory block of "bytes" bytes.
  char* Allocate(size_t bytes);
  // Allocate memory with the normal alignment guarantees provided by malloc
  char* AllocateAligned(size_t bytes);
  // Returns an estimate of the total memory usage of data allocated
  // by the arena.
  size_t MemoryUsage() const {
    return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
  }
 private:
  char* AllocateFallback(size_t bytes);
  char* AllocateNewBlock(size_t block_bytes);
  // Allocation state
  char* alloc_ptr_;
  size_t alloc_bytes_remaining_;
  // Array of new[] allocated memory blocks
  std::vector<char*> blocks_;
  // Total memory usage of the arena.
  port::AtomicPointer memory_usage_;
  Arena(const Arena&);
  void operator=(const Arena&);
};

能看出来就是个二维动态数组,vector<char*>,每次分一个数组,也就是block,如果不够用了在分配一个新的block

Allocate会调用AllocateFallBack和AllocateNewBlock 完全取决于剩余内存和新分配内存的关系

  1. 如果需求的内存小于剩余的内存,那么直接在剩余的内存分配就可以了;
  2. 如果需求的内存大于剩余的内存,而且大于4096/4,则给这内存单独分配一块bytes(函数参数)大小的内存。
  3. 如果需求的内存大于剩余的内存,而且小于4096/4,则重新分配一个内存块,默认大小4096,用于存储数据。

rocksdb的优化做了哪些工作

抽象出allocator api,以及支持hugepage

char* Arena::AllocateFromHugePage(size_t bytes)

###

reference

Read More

c++中文件操作的坑

  • 使用ofstream读写文件,记得一定要关闭,否则同进程看不到这个文件的修改内容
std::ofstream f(config_file);
f.write(content.data(), content.size());
// f.close(); // missing!
SomeConfig.Load(config_file); // error, it's empty!

当然,更推荐scope_guard 或者gsl::finally来管理

  • stream的继承关系
void foo(const std::istream&) {
    puts("istream");
}
void foo(const std::ifstream&) {
    puts("ifstream");
}
int main() {
    std::fstream t;
    foo(t);
}

猜猜调用那个?第一个

继承关系,深坑

  • stream是有状态的

  • 占用很大

using namespace std;
cout<<"std::fstream = "<<sizeof(fstream)<<endl
  <<"std::ifstream = "<<sizeof(ifstream)<<endl
  <<"std::ofstream = "<<sizeof(ofstream)<<endl;

  //  std::fstream = 528
  //  std::ifstream = 520
  //  std::ofstream = 512
  • 检验文件是否存在 ifstream并不怎么快。不过确实挺好用的
#include <sys/stat.h>
#include <unistd.h>
#include <string>
#include <fstream>

inline bool exists_test0 (const std::string& name) {
    ifstream f(name.c_str());
    return f.good();
}

inline bool exists_test1 (const std::string& name) {
    if (FILE *file = fopen(name.c_str(), "r")) {
        fclose(file);
        return true;
    } else {
        return false;
    }   
}

inline bool exists_test2 (const std::string& name) {
    return ( access( name.c_str(), F_OK ) != -1 );
}

inline bool exists_test3 (const std::string& name) {
  struct stat buffer;   
  return (stat (name.c_str(), &buffer) == 0); 
}
ifstream 0.485s
FILE fopen 0.302s
posix access() 0.202s
posix stat() 0.134s

ref

  • https://quuxplusone.github.io/blog/2018/11/26/remember-the-ifstream/
  • https://zhuanlan.zhihu.com/p/90194868
  • 检查文件的benchmark代码在这里https://stackoverflow.com/questions/12774207/fastest-way-to-check-if-a-file-exist-using-standard-c-c11-c
  • finally https://www.bfilipek.com/2017/04/finalact-follow-up.html

Read More

rust学习笔记(c++ based)

为什么又要写一个?不写一遍真的记不住,写的也没有参考中的文章写得好。详细的版本直接跳到参考。

rust这个语言真是热闹。很多特性的融合,对标c++,必须要探究一下! 首先,这个社区+api文档 + 手把手demo 真的非常好,上手难度还好。写好的程序不好说,这文档是真方便,随便点一下就跳到实现上了。 这也说明新语言没那么大包袱。c++打开头文件看实现,天书,还是点开cppreference看接口文档吧。这东西和cppcoreguideline文档也是新搞的。c++也有人注意到了这个问题吧。

列几点我看书(rust book)过程中记录的好玩的地方。

  • 默认不可变语义,这个和c++是正好相反的,c/c++ 想要保证不可变,程序员自觉不改,或者加const,加上const也会有阴招(const_cast)绕过去,c/c++原则,充分信任程序员,程序员要知道自己在做什么,rust不信任,默认不可变,可变得手动标mut,不然分分钟报错
  • 引用语义。c++中,值语义,引用语义的切换非常自然。这个代价就是类型转换天花乱坠,但是自然。rust的引用语义(以及衍生的slice)需要显式指定,传参数得带&,这语法太难看。
  • 接上面,引用在rust中有借用语义,编译器强保证,所以用起来会很痛苦。
  • 宏,rust的宏有点像c++的变参模板,但是语法很邪恶,像perl shell这种脚本语法。很邪恶。
  • enum,这个东西就是个std::variant,配合rust本身的match语法,更变态一些。match这东西从函数式语言中抄过来的。也是先进性体现了。
  • traits 好东西,和c++的type_traits也有点像,也像c++ concept这种静态接口。rust直接去掉了继承,大家实现接口就好了,or enum
  • 这个函数声明风格挺像go的。说起go,go也有interface这种东西,也有magic方法,magic方法有点不适应,就比如make defer这种,没规律。难受。感觉还是c风格,指针回调的感觉。相比rust的traits方法还算系统,c++ concept当初也是这个计划,
  • 最后一句可以当返回值,这是ruby perl语言的优势,不用return。这个有利有弊
  • 很多语法和go很像,但是更系统一些(或者是我对go有偏见)

暂时就这么多。具体还是要写代码才懂。

reference

  • 一个英语博客 https://github.com/nrc/r4cppp
  • 一个不错的总结,针对c++语法 写的不错,基本把我想写的写了。
    • https://xr1s.me/2018/03/01/rust-learning-notes-for-cxx-programmer-part-one/
    • https://xr1s.me/2018/03/01/rust-learning-notes-for-cxx-programmer-part-two/
    • https://xr1s.me/2018/03/01/rust-learning-notes-for-cxx-programmer-part-three/
Read More

Apache Kafka源码剖析笔记



特点

  • Kafka具有近乎实时性的消息处理能力,即使面对海量消息也能够高效地存储消息和查询消息。
  • Kafka将消息保存在磁盘中,在其设计理念中并不惧怕磁盘操作,它以顺序读写的方式访问磁盘, 从而避免了随机读写磁盘导致的性能瓶颈。
  • Kafka支持批量读写消息,并且会对消息进行批量压缩,这样既提高了网络的利用率,也提高了压 缩效率。
  • Kafka支持消息分区,每个分区中的消息保证顺序传输,而分区之间则可以并发操作,这样就提高 了Kafka的并发能力。
  • Kafka也支持在线增加分区,支持在线水平扩展。
  • Kafka支持为每个分区创建多个副本,其中只会有一个Leader副本负责读写,其他副本只负责与 Leader副本进行同步,这种方式提高了数据的容灾能力。Kafka会将Leader副本均匀地分布在集群 中的服务器上,实现性能最大化

为什么引入mq中间件,以及,原有方案的弊端

  • 由于子系统之间存在的耦合性,两个存储之间要进行数据交换的话,开发人员就必须了解这两个 存储系统的API,不仅是开发成本,就连维护成本也会很高。一旦其中一个子系统发生变化,就可 能影响其他多个子系统,这简直就是一场灾难。
  • 在某些应用场景中,数据的顺序性尤为重要,一旦数据出现乱序,就会影响最终的计算结果,降 低用户体验,这就提高了开发的难度。
  • 除了考虑数据顺序性的要求,还要考虑数据重传等提高可靠性的机制,毕竟通过网络进行传输并 不可靠,可能出现丢失数据的情况。
  • 进行数据交换的两个子系统,无论哪一方宕机,重新上线之后,都应该恢复到之前的传输位置, 继续传输。尤其是对于非幂等性的操作,恢复到错误的传输位置,就会导致错误的结果。
  • 随着业务量的增长,系统之间交换的数据量会不断地增长,水平可扩展的数据传输方式就显得尤 为重要。

应对的解决方案

  • 中间件解耦
  • 数据持久化
  • 扩展与容灾
    • topic,partition,replica
    • 多个consumer消费,各自记录消费标签信息,由consumer决定cursor

核心概念

  • 消息
  • topic/partition/log
    • partition offset保证顺序性
    • partition 用Log表述,大小有限,可以分节,顺序io追加,不会有性能问题
      • 索引文件 稀疏索引 内存中定位加速
    • 保留策略 顺序写总会写满,设置消息保留时间或者总体大小
    • 日志压缩 类似rocksdb compaction
  • Broker kafka server概念,接受生产者的消息,分配offset保存到磁盘中
  • 副本(并不是实时同步)
    • ISR集合,可用的没有lag太多的可升leader的节点的集合
    • HighWatermar LEO,描述可消费的属性
      • ①Producer向此Partition推送消息。
      • ②Leader副本将消息追加到Log中,并递增其LEO。
      • ③Follower副本从Leader副本拉取消息进行同步。
      • ④Follower副本将拉取到的消息更新到本地Log中,并递增其LEO。
      • ⑤当ISR集合中所有副本都完成了对offset=11的消息的同步,Leader副本会递增HW。
      • 在①~⑤步完成之后,offset=11的消息就对生产者可见了。
  • 消费者,消费组、

image-20200914163714353

细心的读者可能会问,为什么GZIP压缩方式会直接使用new创建,而Snappy则使用反射方式呢?这主要是因为GZIP使用的GZIPOutputStream是JDK自带的包,而Snappy则需要引入额外的依赖包,为了在不使用Snappy压缩方式时,减少依赖包,这里使用反射的方式动态创建。这种设计的小技巧,值得读者积累。在Compressor中还提供了wrapForInput()方法,用于创建解压缩输入流,逻辑与wrapForOutput()类似,不再赘述。

生产者KafkaProducer分析

同步还是异步就差在future本身调用不调用get

image-20200916095525486

结构很清晰

KafkaProducer

  • send()方法:发送消息,实际是将消息放入RecordAccumulator暂存,等待发送。
  • flush()方法:刷新操作,等待RecordAccumulator中所有消息发送完成,在刷新完成之前会阻塞调用的线程。
  • partitionsFor()方法:在KafkaProducer中维护了一个Metadata对象用于存储Kafka集群的元数据,Metadata中的元数据会定期更新。partitionsFor()方法负责从Metadata中获取指定Topic中的分区信息。
    • topic, verison, timestamp….
  • close()方法:关闭此Producer对象,主要操作是设置close标志,等待RecordAccumulator中的消息清 空,关闭Sender线程

RecordAccumulator

RecordAccumulator中有一个以TopicPartition为key的ConcurrentMap,每个value是ArrayDeque<RecordBatch>(ArrayDeque并不是线程安全的集合,后面会详细介绍其加锁处理过程),其中缓存了发往对应TopicPartition的消息。每个RecordBatch拥有一个MemoryRecords对象的引用

(1)Deque中有多个RecordBatch或是第一个RecordBatch是否满了。 (2)是否超时了。 (3)是否有其他线程在等待BufferPool释放空间(即BufferPool的空间耗尽了)。 (4)是否有线程正在等待flush操作完成。 (5)Sender线程准备关闭。

BufferPool

  • 有锁。全局分配器

Sender

据RecordAccumulator的缓存情况,筛选出可以向哪些Node节点发送消息,RecordAccumulator.ready();然后,根据生产者与各个节点的连接情况(由NetworkClient管理),过滤Node节点;之后,生成相应的请求,这里要特别注意的是,每个Node节点只生成一个请求;最后,调用NetWorkClient将请求发送出去

image-20200921165610705

controller

http://www.sirann.cn/blog/kafka-controller-%E6%A8%A1%E5%9D%97%E4%B8%80%E6%A6%82%E8%BF%B0/

删除消息,支持按照offset SO,按照topic来删

支持时间删除和大小删除

痛点

对于单纯运行Kafka的集群而言,首先要注意的就是为Kafka设置合适(不那么大)的JVM堆大小。从上面的分析可知,Kafka的性能与堆内存关系并不大,而对page cache需求巨大。根据经验值,为Kafka分配6~8GB的堆内存就已经足足够用了,将剩下的系统内存都作为page cache空间,可以最大化I/O效率。

另一个需要特别注意的问题是lagging consumer,即那些消费速率慢、明显落后的consumer。它们要读取的数据有较大概率不在broker page cache中,因此会增加很多不必要的读盘操作。比这更坏的是,lagging consumer读取的“冷”数据仍然会进入page cache,污染了多数正常consumer要读取的“热”数据,连带着正常consumer的性能变差。在生产环境中,这个问题尤为重要。

解决方案,调整page cache https://www.jianshu.com/p/92f33aa0ff52

实时消费与延迟消费的作业在 PageCache 层次产生竞争,导致实时消费产生非预期磁盘读。线上存在 20%的延迟消费作业,污染cache

传统 HDD 随着读并发升高性能急剧下降。

解决方案

  • 引入SSD 做cache https://www.infoq.cn/article/k6dqfqqihpjfepl3y3hs
  • 重新设计kafka cache系统 https://www.jiqizhixin.com/articles/2019-07-23-11

优化细节

zero-copy技术

减少甚至避免用户空间和内核空间之间的数据拷贝:在一些场景下,用户进程在数据传输过程中并不需要对数据进行访问和处理,那么数据在 Linux 的 Page Cache 和用户进程的缓冲区之间的传输就完全可以避免,让数据拷贝完全在内核里进行,甚至可以通过更巧妙的方式避免在内核里的数据拷贝。这一类实现一般是通过增加新的系统调用来完成的,比如 Linux 中的 mmap(),sendfile() 以及 splice()

这里用的sendfile


Read More


memcached源码剖析笔记



memcache特点

  • 协议简单
    • add/set/replace/get/get/delete
  • 基于libevent
  • 内存存储,LRU,抗干扰?

内存存储机制

slab allocation,感觉像linux系统的slab

  • page 1M chunk内存空间slab 特定大小的chunk组
  • 解决内存碎片但是空间利用存在浪费
    • chunk可以动态増缩?
  • 增长因子可以设定 growth factor

image-20200911173653707


ref


Read More


nginx源码剖析笔记


linux内核参数优化

;最大句柄数
fs.file-max = 99999 
;time_wait状态的socket重新用于新的tcp链接
net.ipv4.tcp_tw_reuse = 1
;tcp发送keeptime的时间,调小可以快速清除无效连接(?单位是什么)
net.ipv4.tcp_keepalive_time = 600
;服务器主动关闭保持FIN_WAIT_2的最大时间
net.ipv4.tcp_fin_timeout = 30
;TIME_WAIT的socket最大值,上限,超过这个值会清掉所有TIME_WAIT TIME_WAIT过多会卡
net.ipv4.tcp_max_tw_buckets = 5000
net.ipv4.ip_local_port_range = 1024 61000
net.ipv4.tcp_rmem = 4096 32768 262142
net.ipv4.tcp_wmem = 4096 32768 262142
;内核处理接收包队列的长度上限
net.core.netdev_max_backlog = 8096
net.core.rmem_default = 262144
net.core.wmem_default = 262144
net.core.rmem_max = 2097152
net.core.wmem_max = 2097152
;tcp syn攻击
net.ipv4.tcp_syncookies =1
;正在三次握手建立阶段的请求队列,可以调高以免丢失客户端连接
net.ipv4.tcp_max_syn.backlog = 1024

滑动窗口大小与套接字缓存设置会在一定程度上影响并发 每个tcp链接都会为了维护滑动窗口而消耗内存

命令行相关

快速退出进程

kill -s SIGTERM <pid>

kill -s SIGINT <pid>

优雅退出

kill -s SIGQUIT <master pid>

kill -s SIGWINCH <worker pid>

重读配置

kill -s SIGHUP <master pid>

日志回滚

kill -s SIGUSR1 <master pid>

还真有这么实现日志回滚的。。。我惊了

平滑升级

kill -s SIGUSR2 <master pid>

这些是对信号的handler做自定义了

性能调优

  • 指定worker个数
    • worker绑核 (代码层怎么实现的?)
  • SSL硬件加速
  • 系统调用gettimeofday执行频率 (现在开销没那么大,也可以限制)
  • worker优先级 nice值设定

事件类配置型

  • 是否打开accept锁 连接负载均衡锁
    • 延迟时间设定

Nginx基础架构

  • 通用的ngx_module_t

  • TCP_DEFER_ACCEPT以及post_accept_timeout 如果连接很久没事件就踢掉

  • ngx_cycle_t

    • void ****conf_ctx 所有模块配置项结构体指针(数组->指针->指针数组->指针)

    image-20200910174704792

讲了很多模块知识。。我对模块不太感兴趣。不看了先


ref

  • 关键字 陶辉的博客。

Read More

overloaded trick

why

这篇就是参考链接2的总结,还是从参考链接1中单独拎出来说一下


之前学std::variant 和 std::visit 学到了overloaded这个模板,

#include <variant>
#include <cstdio>
#include <vector>

template<class... Ts> struct overloaded : Ts... { using Ts::operator()...; }; // (1)
template<class... Ts> overloaded(Ts...) -> overloaded<Ts...>;  // (2)

using var_t = std::variant<int, const char*>;

int main() {
    std::vector<var_t> vars = {1, 2, "Hello, World!"};

    for (auto& v : vars) {
        std::visit(overloaded {  // (3)
            [](int i) { printf("%d\n", i); },
            [](const char* str) { puts(str); }
        }, v);
    }

    return 0;
}

(2)是c++17引入的新特性,乍一看看不懂,咱们一点一点顺一下

首先,这个overloaded模板就是一个转发继承而来的operator (), 一个粗暴的版本,需要基类实现operator()

struct PrintInt { //(1)
    void operator() (int i) {
        printf("%d\n", i);
    }
};

struct PrintCString { // (2)
    void operator () (const char* str) {
        puts(str);
    }
};

struct Print : PrintInt, PrintCString { // (3)
    using PrintInt::operator();
    using PrintCString::operator();
};

如果写成模板形式,那就是

template <class... Ts> // (1)
struct Print : Ts... {
    using Ts::operator()...;
};

int main() {
    std::vector<var_t> vars = {1, 2, "Hello, World!"};

    for (auto& v : vars) {
        std::visit(Print<PrintCString, PrintInt>{}, v); // (2)
    }

    return 0;
}

注意到这个写法特别重,考虑到开头这个优雅的用法,使用lambda,代码写起来就更难看了

int main() {
    std::vector<var_t> vars = {1, 2, "Hello, World!"};
    auto PrintInt = [](int i) { printf("%d\n", i); }; // (1)
    auto PrintCString = [](const char* str) { puts(str); };

    for (auto& v : vars) {
        std::visit(
            Print<decltype(PrintCString), decltype(PrintInt)>{PrintCString, PrintInt}, // (2)
            v);
    }

    return 0;
}

所以理所当然,推导动作应该放在一个helper函数里, 上面这个调用模式还是很容写出一个推导helper的

template <class... Ts> // (1)
auto MakePrint(Ts... ts) {
    return Print<Ts...>{ts...};
}

int main() {
    std::vector<var_t> vars = {1, 2, "Hello, World!"};

    for (auto& v : vars) {
        std::visit(
            MakePrint( // (2)
                [](const char* str) { puts(str); },
                [](int i) { printf("%d\n", i); }
                ),
            v);
    }

    return 0;
}

这已经和overload非常接近了,回到一开始我们提到的,如何写成开头那个样子呢,这就需要c++

17 的新特性,类模板实参推导, 自定义推导指引,User-defined deduction guides,简单说,就是构造函数能做helper的活(make_tuple, make_pair),只要定义好规则就可以

在c++17中,可以干净的写出

std::tuple t(4, 3, 2.5); // same as auto t = std::make_tuple(4, 3, 2.5);

在tuple中,写好了推导规则

#ifndef _LIBCPP_HAS_NO_DEDUCTION_GUIDES
// NOTE: These are not yet standardized, but are required to simulate the
// implicit deduction guide that should be generated had libc++ declared the
// tuple-like constructors "correctly"
template <class _Alloc, class ..._Args>
tuple(allocator_arg_t, const _Alloc&, tuple<_Args...> const&) -> tuple<_Args...>;
template <class _Alloc, class ..._Args>
tuple(allocator_arg_t, const _Alloc&, tuple<_Args...>&&) -> tuple<_Args...>;
#endif

make_tuple就下岗了

类似的,只要为Print写好推导,就可以省掉MakePrint

#include <variant>
#include <cstdio>
#include <vector>

using var_t = std::variant<int, const char*>;

template <class... Ts>
struct Print : Ts... {
    using Ts::operator()...;
};

template <class...Ts> Print(Ts...) -> Print<Ts...>; // (1)

int main() {
    std::vector<var_t> vars = {1, 2, "Hello, World!"};
    for (auto& v : vars) {
        std::visit(
            Print{ // (2)
                [](const char* str) { puts(str); },
                [](int i) { printf("%d\n", i); }
            },
            v);
    }
    return 0;
}

到此,overloaded trick就解释完了

reference

Read More

^