miniselect 一个选择算法库


原文链接 这个是作者用在clickhouse上的,抽出来做成公共库了。借着这个库重新复习一下选择/排序算法

选择算法,见图

Name Average Best Case Worst Case Comparisons Memory
pdqselect \large O(n) \large O(n) \large O(n\log n) At least \large 2n. Random data \large 2.5n \large O(1)
Floyd-Rivest \large O(n) \large O(n) \large O(n^2 ) Avg: \large n + \min(k, n - k) + O(\sqrt{n \log n}) \large O(\log \log n)
Median Of Medians \large O(n) \large O(n) \large O(n) Between \large 2n and \large 22n. Random data \large 2.5n \large O(\log n)
Median Of Ninthers \large O(n) \large O(n) \large O(n) Between \large 2n and \large 21n. Random data \large 2n \large O(\log n)
Median Of 3 Random \large O(n) \large O(n) \large O(n^2 ) At least \large 2n. Random data \large 3n \large O(\log n)
HeapSelect \large O(n\log k) \large O(n) \large O(n\log k) \large n\log k on average, for some data patterns might be better \large O(1)
libstdc++ (introselect) \large O(n) \large O(n) \large O(n\log n) At least \large 2n. Random data \large 3n \large O(1)
libc++ (median of 3) \large O(n) \large O(n) \large O(n^2 ) At least \large 2n. Random data \large 3n \large O(1)

作者总结了这些快速选择的使用特点

  • pdqselect 改自pdqsort 使用场景 Use it when you need to sort a big chunk so that \large k is close to \large n.
  • FR 快速选择算法,非常牛逼,大部分场景性能都很好,除非重复元素多
  • Median Of Medians,别用
  • Median Of Ninthers , AA大作,除非非常悲观,追求保底性能,否则别用
  • Median Of 3Random,别用
  • introselect ,也就是标准库的nth_select,别用
  • Median Of 3 别用
  • std::partial_sort or HeapSelect 非常随机的数据,用,否则别用

复习一下各种排序算法

排序算法 最好时间 平均时间 最坏时间 辅助空间
归并排序 \large O(n\log n) \large O(n\log n) \large O(n\log n) \large O(n)
堆排序 \large O(n\log n) \large O(n\log n) \large O(n\log n) \large O(1)
快速排序 \large O(n\log n) \large O(n\log n) \large O(n^2 ) O(logn) O(n)
自省排序 \large O(n\log n) \large O(n\log n) \large O(n\log n) -
PDQSort \large O(n) \large O(n\log n) \large O(n\log n) -
K路归并 - O(nklogk)) or O(nk^2) - -
并行归并 O((log n)^2) O((log n )^2) O((logn)3) -

简单原理归纳见参考链接,这里简单说一下

  • 归并,两指针比较移动 比较经典
  • 堆,构造一个最小堆/最大堆,然后重复构造交换节点等
  • 快排,选定一个pivot点,左边小于P右边大于P 子区间重复划分
    • 这个效率取决于P的选取,如果P选的不好,那就是最差的冒泡
    • 一般有三分采样 Median of three,也可以算个平均值,还有什么好的P选取法?
    • java DualPivotQuickSort,取两个Pivot,实现快速三向切分的快速排序,也是个有意思的点子
    • 数据集小,不如插入排序,也就产生了自省排序,也就是快排+插入+堆排序组合
  • pdqsort Pattern-defeating Quicksort,这也是主要说的,rust库用的sort_unstable就是这个,也是当前发展上最快的sort,论文关键字 BlockQuicksort: Avoiding Branch Mispredictions in Quicksort 实现看参考链接 是快排的优化版,参考链接的实现是std::sort的优化版,其中快排部分采用了论文提到的优化:部分打乱,小范围内交换位置等等避免分支预测

pdqsort gets a great speedup over the traditional way of implementing quicksort when sorting large arrays (1000+ elements). This is due to a new technique described in “BlockQuicksort: How Branch Mispredictions don’t affect Quicksort” by Stefan Edelkamp and Armin Weiss. In short, we bypass the branch predictor by using small buffers (entirely in L1 cache) of the indices of elements that need to be swapped. We fill these buffers in a branch-free way that’s quite elegant (in pseudocode):

buffer_num = 0; buffer_max_size = 64;
for (int i = 0; i < buffer_max_size; ++i) {
    // With branch:
    if (elements[i] < pivot) { buffer[buffer_num] = i; buffer_num++; }
    // Without:
    buffer[buffer_num] = i; buffer_num += (elements[i] < pivot);
}

上文作者介绍原理

  1. If there are n < 24 elements, use insertion sort to partition or even sort them. As insertion sort is really fast for a small amount of elements, it is reasonable
  2. If it is more, choose pivot:
    1. If there are less or equal than 128 elements, choose pseudomedian (or “ninther”, or median of medians which are all them same) of the following 3 groups:
      1. begin, mid, end
      2. begin + 1, mid – 1, end – 1
      3. begin + 2, mid + 1, end – 2
    2. If there are more than 128 elements, choose median of 3 from begin, mid, end
  3. Partition the array by the chosen pivot with avoiding branches
    1. The partition is called bad if it splits less than 1/8n elements
    2. If the total number of bad partitions exceeds \log n, use std::nth_element or any other fallback algorithm and return
    3. Otherwise, try to defeat some patterns in the partition by (sizes are l_size and r_size respectively):
      1. Swapping begin, begin + l_size / 4
      2. Swapping p – 1 and p – l_size / 4
      3. And if the number of elements is more than 128
        1. begin + 1, begin + l_size / 4 + 1
        2. begin + 2, begin + l_size / 4 + 2
        3. p – 2, p – l_size / 4 + 1
        4. p – 3, p – l_size / 4 + 2
      4. Do the same with the right partition
  4. Choose the right partition part and repeat like in QuickSelect
  • k路归并就是归并个数增加,并行归并就是开并发,不表

多提一点,基数排序 Radix Sort在某些场景上要比快排快的。当然,只能用于整数

n std::sort radix_sort
10 3.3 ns 284.2 ns
100 6.1 ns 91.6 ns
1 000 19.3 ns 59.8 ns
10 000 54.8 ns 46.8 ns
100 000 66.9 ns 40.1 ns
1 000 000 81.1 ns 40.8 ns
10 000 000 95.1 ns 40.7 ns
100 000 000 108.4 ns 40.6 ns

ref

  • 作者还提到了learned sort,机器学习真牛逼啊 https://blog.acolyer.org/2020/10/19/the-case-for-a-learned-sorting-algorithm/ 看不太懂
  • FR select https://zhuanlan.zhihu.com/p/109385885
  • https://rongyi.blog/fast-sorting
  • https://github.com/orlp/pdqsort/blob/master/pdqsort.h
  • sound of sorting https://panthema.net/2013/sound-of-sorting/
  • 基数排序快过std::sort https://sortingsearching.com/2015/09/26/radix-sort.html
    • 作者提到快过基数排序的 Kirkpatrick-Reisch 排序,感觉是基数排序的优化版https://sortingsearching.com/2020/06/06/kirkpatrick-reisch.html

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

flatbuffers使用细节以及和PB做一下对比


关注了anna用的也是fbs, smf rpc框架用的也是fbs

anna感觉技术更像是seastar那套

类似protobuf的介绍,先关注一下使用细节

// Example IDL file for our monster's schema.
namespace MyGame.Sample;
enum Color:byte { Red = 0, Green, Blue = 2 }
union Equipment { Weapon } // Optionally add more tables.
struct Vec3 {
  x:float;
  y:float;
  z:float;
}
table Monster {
  pos:Vec3; // Struct.
  mana:short = 150;
  hp:short = 100;
  name:string;
  friendly:bool = false (deprecated);
  inventory:[ubyte];  // Vector of scalars.
  color:Color = Blue; // Enum.
  weapons:[Weapon];   // Vector of tables.
  equipped:Equipment; // Union.
  path:[Vec3];        // Vector of structs.
}
table Weapon {
  name:string;
  damage:short;
}
root_type Monster;

Tables

  • 新增字段只能增加在table定义末尾, 比如旧版本schema定义 table { a:int; b:int;},新版本新增一个字段在末尾table { a:int; b:int; c:int; },那么

    • 用旧版本schema读取新的数据结构会忽略新字段 c 的存在,因为新增字段在末尾。
    • 用新版本schema读取旧的数据结构,将会取到新增字段的默认值。

    如果新增字段出现在中间,会导致版本不兼容问题。 table { a:int; c:int; b:int; } - 用旧版本schema读取新的数据结构,读取b字段的时候,会读取到c字段 - 用新版本schema读取旧的数据结构,读取c字段的时候,会读取到b字段

    如果不想新增字段到末尾,用id属性可以实现 table { c:int (id: 2); a:int (id: 0); b:int (id: 1); } 引入 id 以后,table 中的字段顺序就无所谓了,新的与旧的 schema 完全兼容,只要我们保留 id 序列即可。

    用id的方案和PB一致

  • schema不能删除任何字段,写数据的时候可以不再写废弃字段的值,在schema中把这个字段标记为deprecated,那么生成的代码里就不会出现废弃字段。table { a:int (deprecated); b:int; }

    • 用旧版本schema读取新的数据结构,将会取到字段a的默认值,因为不存在。
    • 用新版本schema不能读取也不能写入字段a,会导致编译错误
  • 可以改变字段类型,在类型大小相同情况下代码随之改动,是ok的。比如 table { a:uint; b:uint; } -> table { a:int = 1; b:int = 2; } 代码必须保证正确性。

Structs

和Table类似,区别是没有字段是optional的,字段不能增加或者废弃deprecation。Structs只接受标量或者其他Structs。使用这个对象的时候,必须是非常确定这个结构将来不会任何改动。Structs比Table内存占用更少,检索速度更快。

Types

build-in标量类型有

  • 8 bit: byte (int8), ubyte (uint8), bool
  • 16 bit: short (int16), ushort (uint16)
  • 32 bit: int (int32), uint (uint32), float (float32)
  • 64 bit: long (int64), ulong (uint64), double (float64) 括号内名称可以相互替换,不会影响代码生成。

build-in非标量类型有

  • 任何类型的数组。不过不支持嵌套数组,可以用 table 内定义数组的方式来取代嵌套数组。
  • UTF-8 和 7-bit ASCII 的字符串。其他格式的编码字符串或者二进制数据,需要用 [byte] 或者 [ubyte] 来替代。
  • table、structs、enums、unions

这些字段的类型一旦使用之后就无法再更改,可以用reinterpret_cast强转,比如如果当前数据的最符号位没有值得话,可以将uint强转成int

Attributes

Attributes 可以附加到字段声明,放在字段后面或者 table/struct/enum/union 的名称之后。这些字段可能有值也有可能没有值。

一些 Attributes 只能被编译器识别,比如 deprecated。用户也可以定义一些 Attributes,但是需要提前进行 Attributes 声明。声明以后可以在运行时解析 schema 的时候进行查询。这个对于开发一个属于自己的代码编译/生成器来说是非常有用的。或者是想添加一些特殊信息(一些帮助信息等等)到自己的 FlatBuffers 工具之中。

目前最新版能识别到的 Attributes 有 11 种。

  • id:n (on a table field) id 代表设置某个字段的标识符为 n 。一旦启用了这个 id 标识符,那么所有字段都必须使用 id 标识,并且 id 必须是从 0 开始的连续数字。需要特殊注意的是 Union,由于 Union 是由 2 个字段构成的,并且隐藏字段是排在 union 字段的前面。(假设在 union 前面字段的 id 排到了6,那么 union 将会占据 7 和 8 这两个 id 编号,7 是隐藏字段,8 是 union 字段)添加了 id 标识符以后,字段在 schema 内部的相互顺序就不重要了。新字段用的 id 必须是紧接着的下一个可用的 id(id 不能跳,必须是连续的)。
  • deprecated (on a field) deprecated 代表不再为此字段生成访问器,代码应停止使用此数据。旧数据可能仍包含此字段,但不能再通过新的代码去访问这个字段。请注意,如果您弃用先前所需的字段,旧代码可能无法验证新数据(使用可选验证器时)。
  • required (on a non-scalar table field) required 代表该字段不能被省略。默认情况下,所有字段都是可选的,即可以省略。这是可取的,因为它有助于向前/向后兼容性以及数据结构的灵活性。这也是阅读代码的负担,因为对于非标量字段,它要求您检查 NULL 并采取适当的操作。通过指定 required 字段,可以强制构建 FlatBuffers 的代码确保此字段已初始化,因此读取的代码可以直接访问它,而不检查 NULL。如果构造代码没有初始化这个字段,他们将得到一个断言,并提示缺少必要的字段。请注意,如果将此属性添加到现有字段,则只有在现有数据始终包含此字段/现有代码始终写入此字段,这两种情况下才有效。
  • force_align: size (on a struct) force_align 代表强制这个结构的对齐比它自然对齐的要高。如果 buffer 创建的时候是以 force_align 声明创建的,那么里面的所有 structs 都会被强制对齐。(对于在 FlatBufferBuilder 中直接访问的缓冲区,这种情况并不是一定的)
  • bit_flags (on an enum) bit_flags 这个字段的值表示比特,这意味着在 schema 中指定的任何值 N 最终将代表1 « N,或者默认不指定值的情况下,将默认得到序列1,2,4,8 ,…
  • nested_flatbuffer: "table_name" (on a field) nested_flatbuffer 代表该字段(必须是 ubyte 的数组)嵌套包含 flatbuffer 数据,其根类型由 table_name 给出。生成的代码将为嵌套的 FlatBuffer 生成一个方便的访问器。
  • flexbuffer (on a field) flexbuffer 表示该字段(必须是 ubyte 的数组)包含 flexbuffer 数据。生成的代码将为 FlexBuffer 的 root 创建一个方便的访问器。
  • key (on a field) key 字段用于当前 table 中,对其所在类型的数组进行排序时用作关键字。可用于就地查找二进制搜索。
  • hash (on a field) 这是一个不带符号的 32/64 位整数字段,因为在 JSON 解析过程中它的值允许为字符串,然后将其存储为其哈希。属性的值是要使用的散列算法,即使用 fnv1_32、fnv1_64、fnv1a_32、fnv1a_64 其中之一。
  • original_order (on a table) 由于表中的元素不需要以任何特定的顺序存储,因此通常为了优化空间,而对它们大小进行排序。而 original_order 阻止了这种情况发生。通常应该没有任何理由使用这个标志。
  • ‘native_*’ 已经添加了几个属性来支持基于 C++ 对象的 API,所有这些属性都以 “native_” 作为前缀。具体可以点链接查看支持的说明,native_inlinenative_defaultnative_custom_allocnative_typenative_include: "path"

Enums

enum Color : byte { Red = 1, Green, Blue } 定义一系列命名常量,每个命名常量可以分别给一个定值,也可以默认的从前一个值增加一。默认的第一个值是 0。enum声明的时候可以指定底层的整数类型,只能指定整数类型。 enum只能增加,不能删除或者废弃deprecation,因此代码必须保证兼容性,处理未知的枚举值。

Unions

Unions和Enums有很多类似之处,但是union包含的是table,enum包含的是scalar或者 struct。 可以声明一个 Unions 字段,该字段可以包含对这些类型中的任何一个的引用,即这块内存区域只能由其中一种类型使用。另外还会生成一个带有后缀 _type 的隐藏字段,该字段包含相应的枚举值,从而可以在运行时知道要将哪些类型转换为类型。

Namespaces

C++代码中生成namespace,Java代码中生成package

Includes

包含另一个schama文件,保证每个文件可以不被多次引用,但是只被解析一次。

Root type

声明序列化数据中的根表root table,在解析JSON数据时尤为重要,因为他们不包含对象信息。

设计建议

由于 FlatBuffers 的灵活性和可扩展性,将任何类型的数据表示为字典(如在 JSON 中)是非常普遍的做法。尽管可以在 FlatBuffers(作为具有键和值的表的数组)中模拟这一点,但这对于像 FlatBuffers 这样的强类型系统来说,这样做是一种低效的方式,会导致生成相对较大的二进制文件。在大多数系统中,FlatBuffer table 比 classes/structs 更灵活,因为 table 在处理 field 数量非常多,但是实际使用只有其中少数几个 field 这种情况,效率依旧非常高。因此,组织数据应该尽可能的组织成 table 的形式。

同样,如果可能的话,尽量使用枚举的形式代替字符串。

FlatBuffers 中没有继承的概念,所以想表示一组相关数据结构的方式是 union。但是,union 确实有成本,另外一种高效的做法就是建立一个 table 。如果这些数据结构有很多相似或者可以共享的 field ,那么建议一个 table 是非常高效的。在这个 table 中包含所有数据结构的所有字段即可。高效的原因就是 optional 字段是非常廉价的,消耗少。

FlatBuffers 默认可以支持存放的下所有整数,因此尽量选择所需的最小大小,而不是默认为 int/long。

可以考虑用 buffer 中一个字符串或者 table 来共享一些公共的数据,这样做会提高效率,因此将重复的数据拆成共享数据结构 + 私有数据结构,这样做是非常值得的。

CURD

// 写
flatbuffers::FlatBufferBuilder builder(1024);
// 用builder.createxx创建基本类型
auto weapon_one_name = builder.CreateString("Sword");
short weapon_one_damage = 3;
auto weapon_two_name = builder.CreateString("Axe");
short weapon_two_damage = 5;
// Use the `CreateWeapon` shortcut to create Weapons with all the fields set.
auto sword = CreateWeapon(builder, weapon_one_name, weapon_one_damage);
auto axe = CreateWeapon(builder, weapon_two_name, weapon_two_damage);

// Create the position struct
auto position = Vec3(1.0f, 2.0f, 3.0f);
// Set his hit points to 300 and his mana to 150.
int hp = 300;
int mana = 150;
// Finally, create the monster using the `CreateMonster` helper function
// to set all fields.
auto orc = CreateMonster(builder, &position, mana, hp, name, inventory,
                        Color_Red, weapons, Equipment_Weapon, axe.Union(),
                        path);
// You can use this code instead of `CreateMonster()`, to create our orc
// manually.
MonsterBuilder monster_builder(builder);
monster_builder.add_pos(&position);
monster_builder.add_hp(hp);
monster_builder.add_name(name);
monster_builder.add_inventory(inventory);
monster_builder.add_color(Color_Red);
monster_builder.add_weapons(weapons);
monster_builder.add_equipped_type(Equipment_Weapon);
monster_builder.add_equipped(axe.Union());
auto orc = monster_builder.Finish();
builder.Finish(orc);
// This must be called after `Finish()`.
uint8_t *buf = builder.GetBufferPointer();
int size = builder.GetSize(); // Returns the size of the buffer that
                              // `GetBufferPointer()` points to.
//这一套下来就可以直接拷贝/传range了



// 读
uint8_t *buffer_pointer = /* the data you just read */;
// Get a pointer to the root object inside the buffer.
auto monster = GetMonster(buffer_pointer);
auto hp = monster->hp();
auto mana = monster->mana();
auto name = monster->name()->c_str();
auto pos = monster->pos();
auto x = pos->x();
auto y = pos->y();
auto z = pos->z();

// 原地改,接口类似pb的mutable_xx

auto monster = GetMutableMonster(buffer_pointer);  // non-const
monster->mutate_hp(10);                      // Set the table `hp` field.
monster->mutable_pos()->mutate_z(4);         // Set struct field.
monster->mutable_inventory()->Mutate(0, 1);  // Set vector element.


参考链接

  • 教程 https://google.github.io/flatbuffers/flatbuffers_guide_tutorial.html
  • https://halfrost.com/flatbuffers_schema/

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

protobuf使用细节


这东西和什么语言无关,是中间接口描述语言IDL,

具体细节不说了,在参考链接里都有的,这里记录下我关注的细节

[toc]

字段

消息对象的字段 组成主要是:字段 = 字段修饰符 + 字段类型 +字段名 +标识号

类型是有个表格来描述具体的字节占用 (原来的表格有java我不太关注就去掉了)

.proto类型 C++类型 Go 备注
double double float64  
float float float32  
int32 int32   使用可变长编码方式。编码负数时不够高效——如果你的字段可能含有负数,那么请使用sint32。
int64 int64   使用可变长编码方式。编码负数时不够高效——如果你的字段可能含有负数,那么请使用sint64。
uint32 uint32   Uses variable-length encoding.
uint64 uint64   Uses variable-length encoding.
sint32 int32   使用可变长编码方式。有符号的整型值。编码时比通常的int32高效。
sint64 int64   使用可变长编码方式。有符号的整型值。编码时比通常的int64高效。
fixed32 uint32   总是4个字节。如果数值总是比总是比228大的话,这个类型会比uint32高效。
fixed64 uint64   总是8个字节。如果数值总是比总是比256大的话,这个类型会比uint64高效。
sfixed32 int32   总是4个字节。
sfixed64 int64   总是8个字节。
bool bool    
string string   一个字符串必须是UTF-8编码或者7-bit ASCII编码的文本。
bytes string   可能包含任意顺序的字节数据。 使用和string一样的,传参数也是string

如果字段更新类型,转换规则可以看字段更新部分

标识号分配

[1,15]之内的标识号在编码的时候会占用一个字节。[16,2047]之内的标识号则占用2个字节。所以应该为那些频繁出现的消息元素保留 [1,15]之内的标识号。要为将来有可能添加的/频繁出现的标识号预留一些标识号。

字段更新

如果一个已有的消息格式已无法满足新的需求——如,要在消息中添加一个额外的字段——但是同时旧版本写的代码仍然可用。不用担心!更新消息而不破坏已有代码是非常简单的。在更新时只要记住以下的规则即可。

  • 不要更改任何已有的字段的数值标识号。
  • 所添加的任何字段都必须是optional或repeated的。这就意味着任何使用“旧”的消息格式的代码序列化的消息可以被新的代码所解析,因为它们 不会丢掉任何required的元素。应该为这些元素设置合理的默认值,这样新的代码就能够正确地与老代码生成的消息交互了。类似地,新的代码创建的消息 也能被老的代码解析:老的二进制程序在解析的时候只是简单地将新字段忽略。然而,未知的字段是没有被抛弃的。此后,如果消息被序列化,未知的字段会随之一 起被序列化——所以,如果消息传到了新代码那里,则新的字段仍然可用。注意:对Python来说,对未知字段的保留策略是无效的。
  • 非required的字段可以移除——只要它们的标识号在新的消息类型中不再使用(更好的做法可能是重命名那个字段,例如在字段前添加“OBSOLETE_”前缀,那样的话,使用的.proto文件的用户将来就不会无意中重新使用了那些不该使用的标识号)。
  • 一个非required的字段可以转换为一个扩展,反之亦然——只要它的类型和标识号保持不变。
  • int32, uint32, int64, uint64,和bool是全部兼容的,这意味着可以将这些类型中的一个转换为另外一个,而不会破坏向前、 向后的兼容性。如果解析出来的数字与对应的类型不相符,那么结果就像在C++中对它进行了强制类型转换一样(例如,如果把一个64位数字当作int32来 读取,那么它就会被截断为32位的数字)。
  • sint32和sint64是互相兼容的,但是它们与其他整数类型不兼容。
  • string和bytes是兼容的——只要bytes是有效的UTF-8编码。
  • 嵌套消息与bytes是兼容的——只要bytes包含该消息的一个编码过的版本。
  • fixed32与sfixed32是兼容的,fixed64与sfixed64是兼容的。

说实话,兼容不是特别关注,主要关注标识号改动部分,最开始开发要做预留,然后改动标识号不要改动已有的,找缝隙加上就行

字段的操作(CURD)

一般来说,消息的字段会自动生成set_xxx方法

package message;                                                                                                                message MessageRequest {
    required string msg = 10;
}

对应的

message::MessageRequest::MessageRequest req;
req.set_msg("yoyoyo");

下面列举几个特殊场景

repeated字段的更新

repeat可以理解成数组,处理方法也多了几步, 会提供一个接口

package message;                                                                                                                
message Pair {
    required string key;
    required string value;
}
message MessageRequest {
    required string msg = 10;
    required int32 seq = 20;
    repeated Pair pairs = 30;
}

对应的修改

message::MessageRequest req;
std::vector<message::Pair> pairs;
for (auto& v: pairs) {
  //type: message::MessageRequest::field*  
  auto pair = req.add_pairs();
  pair->set_key('kk');
  pair->set_value('vv');
}
有人说,通过repeated字段来更新数据,当返回为空的时候,可能分不清是应该清空还是保持不变

需要加字段来纠正这个歧义,这里不细说了,感觉就是想要便捷(返回空)强行创造的歧义。约定好的话没啥问题,不需要加字段

package message;
message Pair {
    required string key;
    required string value;
}
message MessageRequest {
    required string msg = 10;
    required int32 seq = 20;
    repeated Pair pairs = 30;
    optional bool modify = 31; //如果是0个field modify是1那就是清空,modify是0那就是没更新
}

嵌套结构的消息, 生成了set_allocated_xxx方法, 没有普通的set_xxx方法

这里传进set_allocated_xxx 的对象必须是new好的,这个不太好用,protobuf内部会帮你delete,你自己也可以调用release_xx(最好别)

也可以用mutable_xx 内部帮你new好,你自己赋值,类似这样的

mutable_xx()->set_xx(/*xx*/);

也可以用copyfrom比较省心,其实都不太好用,尽量别嵌套

optional字段会生成has_xx方法 但proto3不支持怎么办

https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3

message Foo {
    int32 bar = 1;
    oneof optional_baz {
        int32 baz = 2;
    }
}

用oneof来封装一层 proto3新版本也支持optional了

merge

支持字段的merge操作,设置fieldMask


参考链接

  • 官方文档的翻译 https://colobu.com/2015/01/07/Protobuf-language-guide/
  • https://www.jianshu.com/p/e06ba6249edc 这篇感觉就是上一篇的细化
  • 这有个整理的更细致的 https://juejin.im/post/6844903687089831944
  • Repeated 修改 http://lambda.hk/protobuf/2015/06/05/protobuf-repeated/
  • repeated 歧义 https://blog.csdn.net/love_newzai/article/details/6929430
  • 嵌套 https://blog.csdn.net/xiaxiazls/article/details/50118161
    • copyFrom https://blog.csdn.net/u014088857/article/details/53291545
  • https://jergoo.gitbooks.io/go-grpc-practice-guide/content/chapter1/protobuf-go.html go的使用说明

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

quick-bench的原理是啥?


找到两个bench的实现,这里记录一下,做个备忘,以后记得整理一下

https://nanobench.ankerl.com/tutorial.html#usage

https://github.com/p-ranav/criterion

https://github.com/martinus/nanobench


看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

神秘的40ms bug


原文链接

这个是比较经典的问题了,你一搜40ms,网上一堆结果

我去年就见到pika解决这个问题 https://github.com/Qihoo360/pink/pull/4/

这个链接也提到了喜马拉雅团队定位并解决这个问题 上面的合入就是这么引入的

简单说,就是Nagle’s algorithm开了,没设置tcp_nodelay,但是对端有delayed ack优化,这边不发,那边不回,正好超时,超时时间40ms

最近逛lobsters看到参考链接1,他们也遇到了这个问题,分析了一波应用自身,各种重写改写链路的接口,没定位到,最后才发现Nagle算法,以及背后的delayed ack,关掉Nagle algorithm设置tcp_nodelay就行了


ref

  • 一个解释 https://cloud.tencent.com/developer/article/1648761
  • https://mysteries.wizardzines.com/50ms-request.html 这有个小练习题,挺有意思

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

重载返回值


本文是这篇文章的翻译整理

主要是这个场景

// this is OK
std::string to_string(int i);
std::string to_string(bool b);

std::string si = to_string(0);
std::string sb = to_string(true);

// this is not OK
int from_string(std::string_view s);
bool from_string(std::string_view s);

int i = from_string("7");
bool b = from_string("false");

想让返回值更统一

做法是做一个统一的返回类型,然后重载 类型操作符

struct to_string_t
{
    std::string_view s;

    operator int() const;  // int  from_string(std::string_view s);
    operator bool() const; // bool from_string(std::string_view s);
};

int i = to_string_t{"7"};
bool b = to_string_t{"true"};

每个类型都要写?模版话,给内建类型定义好,自己的类型,自己用sfinae拼

// base template, specialize and provide a static from_string method
template <class T, class = void>
struct to_string_impl 
{
};

namespace detail // hide impl detail
{
template <class T>
auto has_from_string(int) -> decltype(
    to_string_impl<T>::from_string(std::declval<std::string_view>()), 
    std::true_type{});

template <class T>
std::false_type has_from_string(char);
}

// check if T has a from_string
template <class T>
constexpr bool has_from_string = decltype(detail::has_from_string<T>(0))::value;

// return-type overload mechanism
struct to_string_t
{
    std::string_view s;

    template <class T>
    operator T() const 
    {
        static_assert(has_from_string<T>, "conversion to T not supported");
        return to_string_impl<T>::from_string(s); 
    }
};

// convenience wrapper to provide a "return-type overloaded function"
to_string_t from_string(std::string_view s) { return to_string_t{s}; }



//各种类型的实现自己拼好就可以了
template <>
struct to_string_impl<int>
{
    static int from_string(std::string_view s);
};

template <>
struct to_string_impl<bool>
{
    static bool from_string(std::string_view s);
};

//自定义实现
template <class T>
struct my_range { /* ... */ };

template <class T>
struct to_string_impl<my_range<T>, std::enable_if_t<has_from_string<T>>>
{
    static my_range<T> from_string(std::string_view s);
};

就是偏特化+sfinae套路


看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

线程池/任务队列调研


解决什么问题:资源隔离,不同的任务通过不同的线程(池)/任务队列 来做

需要一个能动态调整(弹性线程数调整),区分优先级,且能做到绑核(一个核一个线程池?) 租户隔离的线程池/任务队列

优先级更进一步:动态调整线程的优先级?如何判定?

更更进一步:租户级别优先级?

解决方案:

  1. 线程池 (内部有任务队列) 比如rocksdb的线程池 每个线程有自己的优先级(IO有IO优先级,CPU有CPU优先级),不同的任务,IO的和cpu的放到不同的池子里,注意rocksdb的线程池是没有主动schedule的,设置线程的优先级,然后通过系统调用来调度(仅支持linux)

  2. 异步事件处理 + future/promise+线程池 ,线程池纯粹一点,就是资源池。资源池分成不同的种类,future/promise调用能穿起不同的资源,比如folly ,没有线程级别的优先级,但可以指定不同的线程池,比如IO线程池,CPU线程池等等,一个future可以串多个线程池,把任务分解掉

  3. 异步事件框架 +线程池 线程池没有别的作用,就是资源池,事件框架可以是reactor/proactor,有调度器 schedule,负责选用资源 比如boost::asio

  4. 异步事件处理(一个主事件线程+一个工作线程+一个无锁队列) + future/promise + 任务队列 比如seastar (侵入比较强,系统级)

以rocksdb的线程池做基线

  动态调整线程池 任务可以区分优先级 内部有队列? 统计指标 使用负担
rocksdb的线程池
可以调整池子大小
✅ rocksdb线程池的优先级是系统级别的优先级,有系统调用的。而不是自定义schedule循环,自己维护优先级的 ✅ std::duque<BGItem> worker线程的各种状态统计idle等待 组件级,可以理解成高级点的任务队列
boost::asio::thread_pool 没有队列,一般使用不需要队列,如果有任务队列需要自己维护
结合post使用静态的池子
X 组件级,但是得配合asio使用,摘出来没什么意义
Folly::threadpoolExecutor 没有队列,add直接选线程调用可以定制各种类型的executor 结合future使用 future then串起队列 worker线程的各种状态统计idle等待 单独用相当于epoll + 多线程worker
seastar 有队列,每个核一个reator一个队列,核间通信靠转发,而不是同步 X 系统级,想用必须得用整个框架来组织应用
grpc的线程池
一般的简单线程池

调整优先级

//cpu 优先级

    if (cpu_priority < current_cpu_priority) {
      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority",
                               &current_cpu_priority);
      // 0 means current thread.
      port::SetCpuPriority(0, cpu_priority);
      current_cpu_priority = cpu_priority;
      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority",
                               &current_cpu_priority);
    }
//IO优先级
#ifdef OS_LINUX
    if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
      // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
      // These system calls only have an effect when used in conjunction
      // with an I/O scheduler that supports I/O priorities. As at
      // kernel 2.6.17 the only such scheduler is the Completely
      // Fair Queuing (CFQ) I/O scheduler.
      // To change scheduler:
      //  echo cfq > /sys/block/<device_name>/queue/schedule
      // Tunables to consider:
      //  /sys/block/<device_name>/queue/slice_idle
      //  /sys/block/<device_name>/queue/slice_sync
      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
              0,                  // current thread
              IOPRIO_PRIO_VALUE(3, 0));
      low_io_priority = true;
    }
#else
    (void)decrease_io_priority;  // avoid 'unused variable' error
#endif

其实实现上都是queue(cond var + mutex) + threads (+ event (reactor/proactor))

cond var可以隐藏在队列上,也可以隐藏在future里,当然,更高级点,不用cond var用原子变量

简单的线程池+队列实现

#include <condition_variable>  
#include <functional>
#include <list>
#include <mutex>  
#include <string>
#include <thread> 
#include <vector>

class ThreadPool {
 private:
  const int num_workers_;
  std::list<std::function<void()> > tasks_;
  std::mutex mutex_;
  std::condition_variable condition_;
  std::condition_variable capacity_condition_;
  bool waiting_to_finish_ = false;
  bool waiting_for_capacity_ = false;
  bool started_ = false;
  int queue_capacity_ = 2e9;
  std::vector<std::thread> all_workers_;

  void RunWorker(void* data) {
    ThreadPool* const thread_pool = reinterpret_cast<ThreadPool*>(data);
    std::function<void()> work = thread_pool->GetNextTask();
    while (work != NULL) {
      work();
      work = thread_pool->GetNextTask();
    }

 public:
  ThreadPool(const std::string& prefix, int num_workers)
      : num_workers_(num_workers) {}

  ~ThreadPool() {
    if (started_) {
      std::unique_lock<std::mutex> mutex_lock(mutex_);
      waiting_to_finish_ = true;
      mutex_lock.unlock();
      condition_.notify_all();
      for (int i = 0; i < num_workers_; ++i) {
        all_workers_[i].join();
      }
    }
  }

  void SetQueueCapacity(int capacity) {
    queue_capacity_ = capacity;
  }

  void StartWorkers() {
    started_ = true;
    for (int i = 0; i < num_workers_; ++i) {
      all_workers_.push_back(std::thread(&RunWorker, this));
    }
  }

  std::function<void()> GetNextTask() {
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;) {
      if (!tasks_.empty()) {
        std::function<void()> task = tasks_.front();
        tasks_.pop_front();
        if (tasks_.size() < queue_capacity_ && waiting_for_capacity_) {
          waiting_for_capacity_ = false;
          capacity_condition_.notify_all();
        }
        return task;
      }
      if (waiting_to_finish_) {
        return nullptr;
      } else {
        condition_.wait(lock);
      }
    }
    return nullptr;
  }

  void Schedule(std::function<void()> closure) {
    std::unique_lock<std::mutex> lock(mutex_);
    while (tasks_.size() >= queue_capacity_) {
      waiting_for_capacity_ = true;
      capacity_condition_.wait(lock);
    }
    tasks_.push_back(closure);
    if (started_) {
      lock.unlock();
      condition_.notify_all();
    }
  }
};

怎么动态增删线程?判断依据是啥?

#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>
#include <list>

class DynamicThreadPool
{
public:
    explicit DynamicThreadPool(int reserve_threads)
      :shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0), threads_waiting_(0){
          for (int i = 0; i < reserve_threads_; i++) {
              std::lock_guard<std::mutex> lock(mu_);
              nthreads_++;
              new DynamicThread(this);
          }
      }
  
    ~DynamicThreadPool() {
        std::unique_lock<std::mutex> lock_(mu_);
        shutdown_ = true;
        cv_.notify_all();

        while (nthreads_ != 0) {
            shutdown_cv_.wait(lock_);        
        }

        ReapThreads(&dead_threads_);    
    }

    void Add(const std::function<void()> &callback) {
        std::lock_guard<std::mutex> lock(mu_);

        // Add works to the callbacks list
        callbacks_.push(callback);

        // Increase pool size or notify as needed
        if (threads_waiting_ == 0) {
            // Kick off a new thread
            nthreads_++;
            new DynamicThread(this);
        } else {
            cv_.notify_one();
        }

        // Also use this chance to harvest dead threads
        if (!dead_threads_.empty()) {
            ReapThreads(&dead_threads_);
        }
    }

private:
    class DynamicThread {
    public:
        DynamicThread(DynamicThreadPool* pool):pool_(pool),thd_(new std::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, this)){}
        ~DynamicThread() {
            thd_->join();
            thd_.reset();    
        }

    private:
        DynamicThreadPool* pool_;
        std::unique_ptr<std::thread> thd_;
        void ThreadFunc() {
          pool_->ThreadFunc();

          // Now that we have killed ourselves, we should reduce the thread count
          std::unique_lock<std::mutex> lock(pool_->mu_);
          pool_->nthreads_--;

          // Move ourselves to dead list
          pool_->dead_threads_.push_back(this);

          if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
              pool_->shutdown_cv_.notify_one();
          }
      }
    };
    
    std::mutex mu_;
    std::condition_variable cv_;
    std::condition_variable shutdown_cv_;
    bool shutdown_;
    std::queue<std::function<void()>> callbacks_;
    int reserve_threads_;
    int nthreads_;
    int threads_waiting_;
    std::list<DynamicThread*> dead_threads_;

    void ThreadFunc() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mu_);
            // Wait until work is available or we are shutting down.
            if (!shutdown_ && callbacks_.empty()) {
                // If there are too many threads waiting, then quit this thread
                if (threads_waiting_ >= reserve_threads_)
                    break;
                threads_waiting_++;
                cv_.wait(lock);
                threads_waiting_--;
            }

            // Drain callbacks before considering shutdown to ensure all work gets completed.
            if (!callbacks_.empty()) {
                auto cb = callbacks_.front();
                callbacks_.pop();
                lock.unlock();
                cb();            
            } else if (shutdown_)
                break;            

        }
    }
  
    static void ReapThreads(std::list<DynamicThread*>* tlist) {
        for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
            delete *t;
        }
    }
};

work steal 概念

taskflow https://github.com/taskflow/work-stealing-queue

taskflow 文档 https://taskflow.github.io/taskflow/chapter2.html#C2_CreateAnExecutor


ref

  • https://www.jianshu.com/p/abf15e5e306b
  • https://blog.csdn.net/weixin_36145588/article/details/78545778

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

又招人啦


工作内容:

  • 负责分布式存储相关方向的技术的研发以及维护;
  • 负责多模型NoSQL服务的架构设计和研发;
  • 负责多模型NoSQL服务的架构优化;

岗位要求:

精通Linux/Unix平台上的C++/GO编程; 熟悉常用的算法和数据结构; 熟悉网络编程,多线程编程技术,并且具备丰富的后台研发项目经验; 熟悉存储设备、文件系统、 Linux操作系统原理;

富有激情和创造力,学习能力强,良好的沟通能力和团队合作能力; 优秀的分析问题和解决问题的能力,勇于接受挑战。

具有以下条件者优先:

  • 熟悉分布式系统理论; 有大规模分布式系统设计架构经验; 有数据库存储引擎或NoSQL存储引擎开发经验;

  • 有业界大规模分布式系统经验,包Redis、Mongodb、CosmosDB、Dynamodb、AWS-S3、GFS、BigTable、HDFS、HBASE等;

  • 对新技术敏感,求知欲强,能快速学习并具备较强的技术领悟能力; 开源社区的活跃贡献者;主导过企业私有云或公有云平台的实施和架构设计;

工作地点在深圳南山区生态园, 腾讯那几个楼,来邮件我告诉你是啥公司。有想法的简历发到wanghenshui@qq.com 多谢!


看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More

future promise实现程度调研

Future promise
接口实现程度
std::future boost::future seastar::future Folly::Future
continuations/then() X
when_all X √*
when_any X X
whenN X X X
future超时 X*
指定Executor/线程池 X* X* √*
Executor动态调度
(包括增删线程/主动调度)
X X X* X*
异步文件io X X √* √*
  • 关于continuations,folly的when_all/when_any接口叫collect*

  • 关于超时,api不太一样,std::future::wait_for (从wait来的) 没有回调接口,folly::future::onTimeout

  • 关于指定Executor, boost::future 可以在then里指定 , 有接口但是没有样例。executor和future暂时不能结合使用

  • folly指定Executor通过via接口,不同的异步流程交给不同的executor来工作,,避免引入数据竞争,Executor可以说线程池(ThreadPollExecutor)也可以一个事件循环EventBase(封装libevent)

makeFutureWith(x)
  .via(exe1).then(y)
  .via(exe2).then(z);

可以中途变更executor,改执行策略,这也是标准库演进的方向,尽可能的泛型

  • std::future 和executor配合使用本来计划本来concurrency-ts中实现,规划c++20,后来作废了。支持std::experiental::executor 和std::experiential::future没有编译器实现,ASIO作者有个实现但是是很久以前的了

    新的executor得等到c++23了,目前标准库还在评审,一时半会是用不上了

但是ASIO是实现了executor了的,这里的异步抽象更高一些,和future接口没啥关系

void connection::do_read() 
{
  socket_.async_read_some(in_buffer_, 
    wrap(strand_, [this](error_code ec, size_t n)
      {
        if (!ec) do_read();
      }));
}
//strand_ 是asio中的概念,可以简单理解成executor,换成pool之类的也是可以的 
  • seastar 可以使用scheduling_group来规划不同的future,分阶段调度

    • 文件异步io AIO 系统api封装

ref

  • https://www.cnblogs.com/chenyangyao/p/folly-future.html
  • https://engineering.fb.com/developer-tools/futures-for-c-11-at-facebook/
  • https://www.modernescpp.com/index.php/std-future-extensions
  • https://www.modernescpp.com/index.php/a-short-detour-executors
  • https://stackoverflow.com/questions/44355747/how-to-implement-stdwhen-any-without-polling
  • asio的概念 executor/strandhttps://www.cnblogs.com/bbqzsl/p/11919502.html

看到这里或许你有建议或者疑问或者指出我的错误,请留言评论或者邮件mailto:wanghenshui@qq.com, 多谢!

觉得写的不错可以点开扫码赞助几毛 微信转账
Read More


^