flatbuffers使用细节以及和PB做一下对比


关注了anna用的也是fbs, smf rpc框架用的也是fbs

anna感觉技术更像是seastar那套

类似protobuf的介绍,先关注一下使用细节

// Example IDL file for our monster's schema.
namespace MyGame.Sample;
enum Color:byte { Red = 0, Green, Blue = 2 }
union Equipment { Weapon } // Optionally add more tables.
struct Vec3 {
  x:float;
  y:float;
  z:float;
}
table Monster {
  pos:Vec3; // Struct.
  mana:short = 150;
  hp:short = 100;
  name:string;
  friendly:bool = false (deprecated);
  inventory:[ubyte];  // Vector of scalars.
  color:Color = Blue; // Enum.
  weapons:[Weapon];   // Vector of tables.
  equipped:Equipment; // Union.
  path:[Vec3];        // Vector of structs.
}
table Weapon {
  name:string;
  damage:short;
}
root_type Monster;

Tables

  • 新增字段只能增加在table定义末尾, 比如旧版本schema定义 table { a:int; b:int;},新版本新增一个字段在末尾table { a:int; b:int; c:int; },那么

    • 用旧版本schema读取新的数据结构会忽略新字段 c 的存在,因为新增字段在末尾。
    • 用新版本schema读取旧的数据结构,将会取到新增字段的默认值。

    如果新增字段出现在中间,会导致版本不兼容问题。 table { a:int; c:int; b:int; } - 用旧版本schema读取新的数据结构,读取b字段的时候,会读取到c字段 - 用新版本schema读取旧的数据结构,读取c字段的时候,会读取到b字段

    如果不想新增字段到末尾,用id属性可以实现 table { c:int (id: 2); a:int (id: 0); b:int (id: 1); } 引入 id 以后,table 中的字段顺序就无所谓了,新的与旧的 schema 完全兼容,只要我们保留 id 序列即可。

    用id的方案和PB一致

  • schema不能删除任何字段,写数据的时候可以不再写废弃字段的值,在schema中把这个字段标记为deprecated,那么生成的代码里就不会出现废弃字段。table { a:int (deprecated); b:int; }

    • 用旧版本schema读取新的数据结构,将会取到字段a的默认值,因为不存在。
    • 用新版本schema不能读取也不能写入字段a,会导致编译错误
  • 可以改变字段类型,在类型大小相同情况下代码随之改动,是ok的。比如 table { a:uint; b:uint; } -> table { a:int = 1; b:int = 2; } 代码必须保证正确性。

Structs

和Table类似,区别是没有字段是optional的,字段不能增加或者废弃deprecation。Structs只接受标量或者其他Structs。使用这个对象的时候,必须是非常确定这个结构将来不会任何改动。Structs比Table内存占用更少,检索速度更快。

Types

build-in标量类型有

  • 8 bit: byte (int8), ubyte (uint8), bool
  • 16 bit: short (int16), ushort (uint16)
  • 32 bit: int (int32), uint (uint32), float (float32)
  • 64 bit: long (int64), ulong (uint64), double (float64) 括号内名称可以相互替换,不会影响代码生成。

build-in非标量类型有

  • 任何类型的数组。不过不支持嵌套数组,可以用 table 内定义数组的方式来取代嵌套数组。
  • UTF-8 和 7-bit ASCII 的字符串。其他格式的编码字符串或者二进制数据,需要用 [byte] 或者 [ubyte] 来替代。
  • table、structs、enums、unions

这些字段的类型一旦使用之后就无法再更改,可以用reinterpret_cast强转,比如如果当前数据的最符号位没有值得话,可以将uint强转成int

Attributes

Attributes 可以附加到字段声明,放在字段后面或者 table/struct/enum/union 的名称之后。这些字段可能有值也有可能没有值。

一些 Attributes 只能被编译器识别,比如 deprecated。用户也可以定义一些 Attributes,但是需要提前进行 Attributes 声明。声明以后可以在运行时解析 schema 的时候进行查询。这个对于开发一个属于自己的代码编译/生成器来说是非常有用的。或者是想添加一些特殊信息(一些帮助信息等等)到自己的 FlatBuffers 工具之中。

目前最新版能识别到的 Attributes 有 11 种。

  • id:n (on a table field) id 代表设置某个字段的标识符为 n 。一旦启用了这个 id 标识符,那么所有字段都必须使用 id 标识,并且 id 必须是从 0 开始的连续数字。需要特殊注意的是 Union,由于 Union 是由 2 个字段构成的,并且隐藏字段是排在 union 字段的前面。(假设在 union 前面字段的 id 排到了6,那么 union 将会占据 7 和 8 这两个 id 编号,7 是隐藏字段,8 是 union 字段)添加了 id 标识符以后,字段在 schema 内部的相互顺序就不重要了。新字段用的 id 必须是紧接着的下一个可用的 id(id 不能跳,必须是连续的)。
  • deprecated (on a field) deprecated 代表不再为此字段生成访问器,代码应停止使用此数据。旧数据可能仍包含此字段,但不能再通过新的代码去访问这个字段。请注意,如果您弃用先前所需的字段,旧代码可能无法验证新数据(使用可选验证器时)。
  • required (on a non-scalar table field) required 代表该字段不能被省略。默认情况下,所有字段都是可选的,即可以省略。这是可取的,因为它有助于向前/向后兼容性以及数据结构的灵活性。这也是阅读代码的负担,因为对于非标量字段,它要求您检查 NULL 并采取适当的操作。通过指定 required 字段,可以强制构建 FlatBuffers 的代码确保此字段已初始化,因此读取的代码可以直接访问它,而不检查 NULL。如果构造代码没有初始化这个字段,他们将得到一个断言,并提示缺少必要的字段。请注意,如果将此属性添加到现有字段,则只有在现有数据始终包含此字段/现有代码始终写入此字段,这两种情况下才有效。
  • force_align: size (on a struct) force_align 代表强制这个结构的对齐比它自然对齐的要高。如果 buffer 创建的时候是以 force_align 声明创建的,那么里面的所有 structs 都会被强制对齐。(对于在 FlatBufferBuilder 中直接访问的缓冲区,这种情况并不是一定的)
  • bit_flags (on an enum) bit_flags 这个字段的值表示比特,这意味着在 schema 中指定的任何值 N 最终将代表1 « N,或者默认不指定值的情况下,将默认得到序列1,2,4,8 ,…
  • nested_flatbuffer: "table_name" (on a field) nested_flatbuffer 代表该字段(必须是 ubyte 的数组)嵌套包含 flatbuffer 数据,其根类型由 table_name 给出。生成的代码将为嵌套的 FlatBuffer 生成一个方便的访问器。
  • flexbuffer (on a field) flexbuffer 表示该字段(必须是 ubyte 的数组)包含 flexbuffer 数据。生成的代码将为 FlexBuffer 的 root 创建一个方便的访问器。
  • key (on a field) key 字段用于当前 table 中,对其所在类型的数组进行排序时用作关键字。可用于就地查找二进制搜索。
  • hash (on a field) 这是一个不带符号的 32/64 位整数字段,因为在 JSON 解析过程中它的值允许为字符串,然后将其存储为其哈希。属性的值是要使用的散列算法,即使用 fnv1_32、fnv1_64、fnv1a_32、fnv1a_64 其中之一。
  • original_order (on a table) 由于表中的元素不需要以任何特定的顺序存储,因此通常为了优化空间,而对它们大小进行排序。而 original_order 阻止了这种情况发生。通常应该没有任何理由使用这个标志。
  • ‘native_*’ 已经添加了几个属性来支持基于 C++ 对象的 API,所有这些属性都以 “native_” 作为前缀。具体可以点链接查看支持的说明,native_inlinenative_defaultnative_custom_allocnative_typenative_include: "path"

Enums

enum Color : byte { Red = 1, Green, Blue } 定义一系列命名常量,每个命名常量可以分别给一个定值,也可以默认的从前一个值增加一。默认的第一个值是 0。enum声明的时候可以指定底层的整数类型,只能指定整数类型。 enum只能增加,不能删除或者废弃deprecation,因此代码必须保证兼容性,处理未知的枚举值。

Unions

Unions和Enums有很多类似之处,但是union包含的是table,enum包含的是scalar或者 struct。 可以声明一个 Unions 字段,该字段可以包含对这些类型中的任何一个的引用,即这块内存区域只能由其中一种类型使用。另外还会生成一个带有后缀 _type 的隐藏字段,该字段包含相应的枚举值,从而可以在运行时知道要将哪些类型转换为类型。

Namespaces

C++代码中生成namespace,Java代码中生成package

Includes

包含另一个schama文件,保证每个文件可以不被多次引用,但是只被解析一次。

Root type

声明序列化数据中的根表root table,在解析JSON数据时尤为重要,因为他们不包含对象信息。

设计建议

由于 FlatBuffers 的灵活性和可扩展性,将任何类型的数据表示为字典(如在 JSON 中)是非常普遍的做法。尽管可以在 FlatBuffers(作为具有键和值的表的数组)中模拟这一点,但这对于像 FlatBuffers 这样的强类型系统来说,这样做是一种低效的方式,会导致生成相对较大的二进制文件。在大多数系统中,FlatBuffer table 比 classes/structs 更灵活,因为 table 在处理 field 数量非常多,但是实际使用只有其中少数几个 field 这种情况,效率依旧非常高。因此,组织数据应该尽可能的组织成 table 的形式。

同样,如果可能的话,尽量使用枚举的形式代替字符串。

FlatBuffers 中没有继承的概念,所以想表示一组相关数据结构的方式是 union。但是,union 确实有成本,另外一种高效的做法就是建立一个 table 。如果这些数据结构有很多相似或者可以共享的 field ,那么建议一个 table 是非常高效的。在这个 table 中包含所有数据结构的所有字段即可。高效的原因就是 optional 字段是非常廉价的,消耗少。

FlatBuffers 默认可以支持存放的下所有整数,因此尽量选择所需的最小大小,而不是默认为 int/long。

可以考虑用 buffer 中一个字符串或者 table 来共享一些公共的数据,这样做会提高效率,因此将重复的数据拆成共享数据结构 + 私有数据结构,这样做是非常值得的。

CURD

// 写
flatbuffers::FlatBufferBuilder builder(1024);
// 用builder.createxx创建基本类型
auto weapon_one_name = builder.CreateString("Sword");
short weapon_one_damage = 3;
auto weapon_two_name = builder.CreateString("Axe");
short weapon_two_damage = 5;
// Use the `CreateWeapon` shortcut to create Weapons with all the fields set.
auto sword = CreateWeapon(builder, weapon_one_name, weapon_one_damage);
auto axe = CreateWeapon(builder, weapon_two_name, weapon_two_damage);

// Create the position struct
auto position = Vec3(1.0f, 2.0f, 3.0f);
// Set his hit points to 300 and his mana to 150.
int hp = 300;
int mana = 150;
// Finally, create the monster using the `CreateMonster` helper function
// to set all fields.
auto orc = CreateMonster(builder, &position, mana, hp, name, inventory,
                        Color_Red, weapons, Equipment_Weapon, axe.Union(),
                        path);
// You can use this code instead of `CreateMonster()`, to create our orc
// manually.
MonsterBuilder monster_builder(builder);
monster_builder.add_pos(&position);
monster_builder.add_hp(hp);
monster_builder.add_name(name);
monster_builder.add_inventory(inventory);
monster_builder.add_color(Color_Red);
monster_builder.add_weapons(weapons);
monster_builder.add_equipped_type(Equipment_Weapon);
monster_builder.add_equipped(axe.Union());
auto orc = monster_builder.Finish();
builder.Finish(orc);
// This must be called after `Finish()`.
uint8_t *buf = builder.GetBufferPointer();
int size = builder.GetSize(); // Returns the size of the buffer that
                              // `GetBufferPointer()` points to.
//这一套下来就可以直接拷贝/传range了



// 读
uint8_t *buffer_pointer = /* the data you just read */;
// Get a pointer to the root object inside the buffer.
auto monster = GetMonster(buffer_pointer);
auto hp = monster->hp();
auto mana = monster->mana();
auto name = monster->name()->c_str();
auto pos = monster->pos();
auto x = pos->x();
auto y = pos->y();
auto z = pos->z();

// 原地改,接口类似pb的mutable_xx

auto monster = GetMutableMonster(buffer_pointer);  // non-const
monster->mutate_hp(10);                      // Set the table `hp` field.
monster->mutable_pos()->mutate_z(4);         // Set struct field.
monster->mutable_inventory()->Mutate(0, 1);  // Set vector element.


参考链接

  • 教程 https://google.github.io/flatbuffers/flatbuffers_guide_tutorial.html
  • https://halfrost.com/flatbuffers_schema/

Read More

protobuf使用细节


这东西和什么语言无关,是中间接口描述语言IDL,

具体细节不说了,在参考链接里都有的,这里记录下我关注的细节

[toc]

字段

消息对象的字段 组成主要是:字段 = 字段修饰符 + 字段类型 +字段名 +标识号

类型是有个表格来描述具体的字节占用 (原来的表格有java我不太关注就去掉了)

.proto类型 C++类型 Go 备注
double double float64  
float float float32  
int32 int32   使用可变长编码方式。编码负数时不够高效——如果你的字段可能含有负数,那么请使用sint32。
int64 int64   使用可变长编码方式。编码负数时不够高效——如果你的字段可能含有负数,那么请使用sint64。
uint32 uint32   Uses variable-length encoding.
uint64 uint64   Uses variable-length encoding.
sint32 int32   使用可变长编码方式。有符号的整型值。编码时比通常的int32高效。
sint64 int64   使用可变长编码方式。有符号的整型值。编码时比通常的int64高效。
fixed32 uint32   总是4个字节。如果数值总是比总是比228大的话,这个类型会比uint32高效。
fixed64 uint64   总是8个字节。如果数值总是比总是比256大的话,这个类型会比uint64高效。
sfixed32 int32   总是4个字节。
sfixed64 int64   总是8个字节。
bool bool    
string string   一个字符串必须是UTF-8编码或者7-bit ASCII编码的文本。
bytes string   可能包含任意顺序的字节数据。 使用和string一样的,传参数也是string

如果字段更新类型,转换规则可以看字段更新部分

标识号分配

[1,15]之内的标识号在编码的时候会占用一个字节。[16,2047]之内的标识号则占用2个字节。所以应该为那些频繁出现的消息元素保留 [1,15]之内的标识号。要为将来有可能添加的/频繁出现的标识号预留一些标识号。

字段更新

如果一个已有的消息格式已无法满足新的需求——如,要在消息中添加一个额外的字段——但是同时旧版本写的代码仍然可用。不用担心!更新消息而不破坏已有代码是非常简单的。在更新时只要记住以下的规则即可。

  • 不要更改任何已有的字段的数值标识号。
  • 所添加的任何字段都必须是optional或repeated的。这就意味着任何使用“旧”的消息格式的代码序列化的消息可以被新的代码所解析,因为它们 不会丢掉任何required的元素。应该为这些元素设置合理的默认值,这样新的代码就能够正确地与老代码生成的消息交互了。类似地,新的代码创建的消息 也能被老的代码解析:老的二进制程序在解析的时候只是简单地将新字段忽略。然而,未知的字段是没有被抛弃的。此后,如果消息被序列化,未知的字段会随之一 起被序列化——所以,如果消息传到了新代码那里,则新的字段仍然可用。注意:对Python来说,对未知字段的保留策略是无效的。
  • 非required的字段可以移除——只要它们的标识号在新的消息类型中不再使用(更好的做法可能是重命名那个字段,例如在字段前添加“OBSOLETE_”前缀,那样的话,使用的.proto文件的用户将来就不会无意中重新使用了那些不该使用的标识号)。
  • 一个非required的字段可以转换为一个扩展,反之亦然——只要它的类型和标识号保持不变。
  • int32, uint32, int64, uint64,和bool是全部兼容的,这意味着可以将这些类型中的一个转换为另外一个,而不会破坏向前、 向后的兼容性。如果解析出来的数字与对应的类型不相符,那么结果就像在C++中对它进行了强制类型转换一样(例如,如果把一个64位数字当作int32来 读取,那么它就会被截断为32位的数字)。
  • sint32和sint64是互相兼容的,但是它们与其他整数类型不兼容。
  • string和bytes是兼容的——只要bytes是有效的UTF-8编码。
  • 嵌套消息与bytes是兼容的——只要bytes包含该消息的一个编码过的版本。
  • fixed32与sfixed32是兼容的,fixed64与sfixed64是兼容的。

说实话,兼容不是特别关注,主要关注标识号改动部分,最开始开发要做预留,然后改动标识号不要改动已有的,找缝隙加上就行

字段的操作(CURD)

一般来说,消息的字段会自动生成set_xxx方法

package message;                                                                                                                message MessageRequest {
    required string msg = 10;
}

对应的

message::MessageRequest::MessageRequest req;
req.set_msg("yoyoyo");

下面列举几个特殊场景

repeated字段的更新

repeat可以理解成数组,处理方法也多了几步, 会提供一个接口

package message;                                                                                                                
message Pair {
    required string key;
    required string value;
}
message MessageRequest {
    required string msg = 10;
    required int32 seq = 20;
    repeated Pair pairs = 30;
}

对应的修改

message::MessageRequest req;
std::vector<message::Pair> pairs;
for (auto& v: pairs) {
  //type: message::MessageRequest::field*  
  auto pair = req.add_pairs();
  pair->set_key('kk');
  pair->set_value('vv');
}
有人说,通过repeated字段来更新数据,当返回为空的时候,可能分不清是应该清空还是保持不变

需要加字段来纠正这个歧义,这里不细说了,感觉就是想要便捷(返回空)强行创造的歧义。约定好的话没啥问题,不需要加字段

package message;
message Pair {
    required string key;
    required string value;
}
message MessageRequest {
    required string msg = 10;
    required int32 seq = 20;
    repeated Pair pairs = 30;
    optional bool modify = 31; //如果是0个field modify是1那就是清空,modify是0那就是没更新
}

嵌套结构的消息, 生成了set_allocated_xxx方法, 没有普通的set_xxx方法

这里传进set_allocated_xxx 的对象必须是new好的,这个不太好用,protobuf内部会帮你delete,你自己也可以调用release_xx(最好别)

也可以用mutable_xx 内部帮你new好,你自己赋值,类似这样的

mutable_xx()->set_xx(/*xx*/);

也可以用copyfrom比较省心,其实都不太好用,尽量别嵌套

optional字段会生成has_xx方法 但proto3不支持怎么办

https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3

message Foo {
    int32 bar = 1;
    oneof optional_baz {
        int32 baz = 2;
    }
}

用oneof来封装一层 proto3新版本也支持optional了

merge

支持字段的merge操作,设置fieldMask


参考链接

  • 官方文档的翻译 https://colobu.com/2015/01/07/Protobuf-language-guide/
  • https://www.jianshu.com/p/e06ba6249edc 这篇感觉就是上一篇的细化
  • 这有个整理的更细致的 https://juejin.im/post/6844903687089831944
  • Repeated 修改 http://lambda.hk/protobuf/2015/06/05/protobuf-repeated/
  • repeated 歧义 https://blog.csdn.net/love_newzai/article/details/6929430
  • 嵌套 https://blog.csdn.net/xiaxiazls/article/details/50118161
    • copyFrom https://blog.csdn.net/u014088857/article/details/53291545
  • https://jergoo.gitbooks.io/go-grpc-practice-guide/content/chapter1/protobuf-go.html go的使用说明

Read More


神秘的40ms bug


原文链接

这个是比较经典的问题了,你一搜40ms,网上一堆结果

我去年就见到pika解决这个问题 https://github.com/Qihoo360/pink/pull/4/

这个链接也提到了喜马拉雅团队定位并解决这个问题 上面的合入就是这么引入的

简单说,就是Nagle’s algorithm开了,没设置tcp_nodelay,但是对端有delayed ack优化,这边不发,那边不回,正好超时,超时时间40ms

最近逛lobsters看到参考链接1,他们也遇到了这个问题,分析了一波应用自身,各种重写改写链路的接口,没定位到,最后才发现Nagle算法,以及背后的delayed ack,关掉Nagle algorithm设置tcp_nodelay就行了


ref

  • 一个解释 https://cloud.tencent.com/developer/article/1648761
  • https://mysteries.wizardzines.com/50ms-request.html 这有个小练习题,挺有意思

Read More

重载返回值


本文是这篇文章的翻译整理

主要是这个场景

// this is OK
std::string to_string(int i);
std::string to_string(bool b);

std::string si = to_string(0);
std::string sb = to_string(true);

// this is not OK
int from_string(std::string_view s);
bool from_string(std::string_view s);

int i = from_string("7");
bool b = from_string("false");

想让返回值更统一

做法是做一个统一的返回类型,然后重载 类型操作符

struct to_string_t
{
    std::string_view s;

    operator int() const;  // int  from_string(std::string_view s);
    operator bool() const; // bool from_string(std::string_view s);
};

int i = to_string_t{"7"};
bool b = to_string_t{"true"};

每个类型都要写?模版话,给内建类型定义好,自己的类型,自己用sfinae拼

// base template, specialize and provide a static from_string method
template <class T, class = void>
struct to_string_impl 
{
};

namespace detail // hide impl detail
{
template <class T>
auto has_from_string(int) -> decltype(
    to_string_impl<T>::from_string(std::declval<std::string_view>()), 
    std::true_type{});

template <class T>
std::false_type has_from_string(char);
}

// check if T has a from_string
template <class T>
constexpr bool has_from_string = decltype(detail::has_from_string<T>(0))::value;

// return-type overload mechanism
struct to_string_t
{
    std::string_view s;

    template <class T>
    operator T() const 
    {
        static_assert(has_from_string<T>, "conversion to T not supported");
        return to_string_impl<T>::from_string(s); 
    }
};

// convenience wrapper to provide a "return-type overloaded function"
to_string_t from_string(std::string_view s) { return to_string_t{s}; }



//各种类型的实现自己拼好就可以了
template <>
struct to_string_impl<int>
{
    static int from_string(std::string_view s);
};

template <>
struct to_string_impl<bool>
{
    static bool from_string(std::string_view s);
};

//自定义实现
template <class T>
struct my_range { /* ... */ };

template <class T>
struct to_string_impl<my_range<T>, std::enable_if_t<has_from_string<T>>>
{
    static my_range<T> from_string(std::string_view s);
};

就是偏特化+sfinae套路


Read More

线程池/任务队列调研


解决什么问题:资源隔离,不同的任务通过不同的线程(池)/任务队列 来做

需要一个能动态调整(弹性线程数调整),区分优先级,且能做到绑核(一个核一个线程池?) 租户隔离的线程池/任务队列

优先级更进一步:动态调整线程的优先级?如何判定?

更更进一步:租户级别优先级?

解决方案:

  1. 线程池 (内部有任务队列) 比如rocksdb的线程池 每个线程有自己的优先级(IO有IO优先级,CPU有CPU优先级),不同的任务,IO的和cpu的放到不同的池子里,注意rocksdb的线程池是没有主动schedule的,设置线程的优先级,然后通过系统调用来调度(仅支持linux)

  2. 异步事件处理 + future/promise+线程池 ,线程池纯粹一点,就是资源池。资源池分成不同的种类,future/promise调用能穿起不同的资源,比如folly ,没有线程级别的优先级,但可以指定不同的线程池,比如IO线程池,CPU线程池等等,一个future可以串多个线程池,把任务分解掉

  3. 异步事件框架 +线程池 线程池没有别的作用,就是资源池,事件框架可以是reactor/proactor,有调度器 schedule,负责选用资源 比如boost::asio

  4. 异步事件处理(一个主事件线程+一个工作线程+一个无锁队列) + future/promise + 任务队列 比如seastar (侵入比较强,系统级)

以rocksdb的线程池做基线

  动态调整线程池 任务可以区分优先级 内部有队列? 统计指标 使用负担
rocksdb的线程池
可以调整池子大小
✅ rocksdb线程池的优先级是系统级别的优先级,有系统调用的。而不是自定义schedule循环,自己维护优先级的 ✅ std::duque<BGItem> worker线程的各种状态统计idle等待 组件级,可以理解成高级点的任务队列
boost::asio::thread_pool 没有队列,一般使用不需要队列,如果有任务队列需要自己维护
结合post使用静态的池子
X 组件级,但是得配合asio使用,摘出来没什么意义
Folly::threadpoolExecutor 没有队列,add直接选线程调用可以定制各种类型的executor 结合future使用 future then串起队列 worker线程的各种状态统计idle等待 单独用相当于epoll + 多线程worker
seastar 有队列,每个核一个reator一个队列,核间通信靠转发,而不是同步 X 系统级,想用必须得用整个框架来组织应用
grpc的线程池
一般的简单线程池

调整优先级

//cpu 优先级

    if (cpu_priority < current_cpu_priority) {
      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority",
                               &current_cpu_priority);
      // 0 means current thread.
      port::SetCpuPriority(0, cpu_priority);
      current_cpu_priority = cpu_priority;
      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority",
                               &current_cpu_priority);
    }
//IO优先级
#ifdef OS_LINUX
    if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
      // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
      // These system calls only have an effect when used in conjunction
      // with an I/O scheduler that supports I/O priorities. As at
      // kernel 2.6.17 the only such scheduler is the Completely
      // Fair Queuing (CFQ) I/O scheduler.
      // To change scheduler:
      //  echo cfq > /sys/block/<device_name>/queue/schedule
      // Tunables to consider:
      //  /sys/block/<device_name>/queue/slice_idle
      //  /sys/block/<device_name>/queue/slice_sync
      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
              0,                  // current thread
              IOPRIO_PRIO_VALUE(3, 0));
      low_io_priority = true;
    }
#else
    (void)decrease_io_priority;  // avoid 'unused variable' error
#endif

其实实现上都是queue(cond var + mutex) + threads (+ event (reactor/proactor))

cond var可以隐藏在队列上,也可以隐藏在future里,当然,更高级点,不用cond var用原子变量

简单的线程池+队列实现

#include <condition_variable>  
#include <functional>
#include <list>
#include <mutex>  
#include <string>
#include <thread> 
#include <vector>

class ThreadPool {
 private:
  const int num_workers_;
  std::list<std::function<void()> > tasks_;
  std::mutex mutex_;
  std::condition_variable condition_;
  std::condition_variable capacity_condition_;
  bool waiting_to_finish_ = false;
  bool waiting_for_capacity_ = false;
  bool started_ = false;
  int queue_capacity_ = 2e9;
  std::vector<std::thread> all_workers_;

  void RunWorker(void* data) {
    ThreadPool* const thread_pool = reinterpret_cast<ThreadPool*>(data);
    std::function<void()> work = thread_pool->GetNextTask();
    while (work != NULL) {
      work();
      work = thread_pool->GetNextTask();
    }

 public:
  ThreadPool(const std::string& prefix, int num_workers)
      : num_workers_(num_workers) {}

  ~ThreadPool() {
    if (started_) {
      std::unique_lock<std::mutex> mutex_lock(mutex_);
      waiting_to_finish_ = true;
      mutex_lock.unlock();
      condition_.notify_all();
      for (int i = 0; i < num_workers_; ++i) {
        all_workers_[i].join();
      }
    }
  }

  void SetQueueCapacity(int capacity) {
    queue_capacity_ = capacity;
  }

  void StartWorkers() {
    started_ = true;
    for (int i = 0; i < num_workers_; ++i) {
      all_workers_.push_back(std::thread(&RunWorker, this));
    }
  }

  std::function<void()> GetNextTask() {
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;) {
      if (!tasks_.empty()) {
        std::function<void()> task = tasks_.front();
        tasks_.pop_front();
        if (tasks_.size() < queue_capacity_ && waiting_for_capacity_) {
          waiting_for_capacity_ = false;
          capacity_condition_.notify_all();
        }
        return task;
      }
      if (waiting_to_finish_) {
        return nullptr;
      } else {
        condition_.wait(lock);
      }
    }
    return nullptr;
  }

  void Schedule(std::function<void()> closure) {
    std::unique_lock<std::mutex> lock(mutex_);
    while (tasks_.size() >= queue_capacity_) {
      waiting_for_capacity_ = true;
      capacity_condition_.wait(lock);
    }
    tasks_.push_back(closure);
    if (started_) {
      lock.unlock();
      condition_.notify_all();
    }
  }
};

怎么动态增删线程?判断依据是啥?

#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>
#include <list>

class DynamicThreadPool
{
public:
    explicit DynamicThreadPool(int reserve_threads)
      :shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0), threads_waiting_(0){
          for (int i = 0; i < reserve_threads_; i++) {
              std::lock_guard<std::mutex> lock(mu_);
              nthreads_++;
              new DynamicThread(this);
          }
      }
  
    ~DynamicThreadPool() {
        std::unique_lock<std::mutex> lock_(mu_);
        shutdown_ = true;
        cv_.notify_all();

        while (nthreads_ != 0) {
            shutdown_cv_.wait(lock_);        
        }

        ReapThreads(&dead_threads_);    
    }

    void Add(const std::function<void()> &callback) {
        std::lock_guard<std::mutex> lock(mu_);

        // Add works to the callbacks list
        callbacks_.push(callback);

        // Increase pool size or notify as needed
        if (threads_waiting_ == 0) {
            // Kick off a new thread
            nthreads_++;
            new DynamicThread(this);
        } else {
            cv_.notify_one();
        }

        // Also use this chance to harvest dead threads
        if (!dead_threads_.empty()) {
            ReapThreads(&dead_threads_);
        }
    }

private:
    class DynamicThread {
    public:
        DynamicThread(DynamicThreadPool* pool):pool_(pool),thd_(new std::thread(&DynamicThreadPool::DynamicThread::ThreadFunc, this)){}
        ~DynamicThread() {
            thd_->join();
            thd_.reset();    
        }

    private:
        DynamicThreadPool* pool_;
        std::unique_ptr<std::thread> thd_;
        void ThreadFunc() {
          pool_->ThreadFunc();

          // Now that we have killed ourselves, we should reduce the thread count
          std::unique_lock<std::mutex> lock(pool_->mu_);
          pool_->nthreads_--;

          // Move ourselves to dead list
          pool_->dead_threads_.push_back(this);

          if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
              pool_->shutdown_cv_.notify_one();
          }
      }
    };
    
    std::mutex mu_;
    std::condition_variable cv_;
    std::condition_variable shutdown_cv_;
    bool shutdown_;
    std::queue<std::function<void()>> callbacks_;
    int reserve_threads_;
    int nthreads_;
    int threads_waiting_;
    std::list<DynamicThread*> dead_threads_;

    void ThreadFunc() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mu_);
            // Wait until work is available or we are shutting down.
            if (!shutdown_ && callbacks_.empty()) {
                // If there are too many threads waiting, then quit this thread
                if (threads_waiting_ >= reserve_threads_)
                    break;
                threads_waiting_++;
                cv_.wait(lock);
                threads_waiting_--;
            }

            // Drain callbacks before considering shutdown to ensure all work gets completed.
            if (!callbacks_.empty()) {
                auto cb = callbacks_.front();
                callbacks_.pop();
                lock.unlock();
                cb();            
            } else if (shutdown_)
                break;            

        }
    }
  
    static void ReapThreads(std::list<DynamicThread*>* tlist) {
        for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
            delete *t;
        }
    }
};

work steal 概念

taskflow https://github.com/taskflow/work-stealing-queue

taskflow 文档 https://taskflow.github.io/taskflow/chapter2.html#C2_CreateAnExecutor


ref

  • https://www.jianshu.com/p/abf15e5e306b
  • https://blog.csdn.net/weixin_36145588/article/details/78545778

Read More

又招人啦

不招了。腾讯云架构平台部门。JD留着吧

我在这混了几年。没啥意思

Read More

future promise实现程度调研

Future promise
接口实现程度
std::future boost::future seastar::future Folly::Future
continuations/then() X
when_all X √*
when_any X X
whenN X X X
future超时 X*
指定Executor/线程池 X* X* √*
Executor动态调度
(包括增删线程/主动调度)
X X X* X*
异步文件io X X √* √*
  • 关于continuations,folly的when_all/when_any接口叫collect*

  • 关于超时,api不太一样,std::future::wait_for (从wait来的) 没有回调接口,folly::future::onTimeout

  • 关于指定Executor, boost::future 可以在then里指定 , 有接口但是没有样例。executor和future暂时不能结合使用

  • folly指定Executor通过via接口,不同的异步流程交给不同的executor来工作,,避免引入数据竞争,Executor可以说线程池(ThreadPollExecutor)也可以一个事件循环EventBase(封装libevent)

makeFutureWith(x)
  .via(exe1).then(y)
  .via(exe2).then(z);

可以中途变更executor,改执行策略,这也是标准库演进的方向,尽可能的泛型

  • std::future 和executor配合使用本来计划本来concurrency-ts中实现,规划c++20,后来作废了。支持std::experiental::executor 和std::experiential::future没有编译器实现,ASIO作者有个实现但是是很久以前的了

    新的executor得等到c++23了,目前标准库还在评审,一时半会是用不上了

但是ASIO是实现了executor了的,这里的异步抽象更高一些,和future接口没啥关系

void connection::do_read() 
{
  socket_.async_read_some(in_buffer_, 
    wrap(strand_, [this](error_code ec, size_t n)
      {
        if (!ec) do_read();
      }));
}
//strand_ 是asio中的概念,可以简单理解成executor,换成pool之类的也是可以的 
  • seastar 可以使用scheduling_group来规划不同的future,分阶段调度

    • 文件异步io AIO 系统api封装

ref

  • https://www.cnblogs.com/chenyangyao/p/folly-future.html
  • https://engineering.fb.com/developer-tools/futures-for-c-11-at-facebook/
  • https://www.modernescpp.com/index.php/std-future-extensions
  • https://www.modernescpp.com/index.php/a-short-detour-executors
  • https://stackoverflow.com/questions/44355747/how-to-implement-stdwhen-any-without-polling
  • asio的概念 executor/strandhttps://www.cnblogs.com/bbqzsl/p/11919502.html

Read More


十月待读

https://unixism.net/2019/04/linux-applications-performance-introduction/

https://www.moritz.systems/blog/mastering-unix-pipes-part-1/

https://neilmadden.blog/2020/11/25/parse-dont-type-check/

https://my.oschina.net/evilunix/blog/3003736

https://danlark.org/2020/11/11/miniselect-practical-and-generic-selection-algorithms/

https://github.com/y123456yz/reading-and-annotate-mongodb-3.6

https://zhuanlan.zhihu.com/p/265701877

https://blog.csdn.net/baijiwei/article/details/80504715

https://blog.csdn.net/baijiwei/article/details/78070355

qbe 一个小编译器后端 https://c9x.me/compile/

https://github.com/jsoysouvanh/Refureku

https://github.com/foonathan/lexy

http://stlab.cc/2020/12/01/forest-introduction.html

https://github.com/stlab/libraries/blob/develop/stlab/forest.hpp

https://medium.com/build-and-learn/fun-with-text-generation-pt-1-markov-models-in-awk-5e1b55fe560c

这个我估计以后也不会看。。。

Cli 程序设计规范

https://clig.dev/

https://danlark.org/2020/06/14/128-bit-division/

https://brevzin.github.io/c++/2020/12/01/tag-invoke/

Read More

^