Blog review, issue 6
The tags tell you what each entry is about. [toc]
## cd is not a program
`cd` is a shell builtin, and its lookup behavior is governed by `CDPATH`. In scripts it is best to avoid `cd`, or to clear `CDPATH` before using it; otherwise you can get surprising accidents.
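A quick way to see the surprise (a sketch, not from the post; the directory names `work` and `other` are made up, and it assumes `bash` is available):

```python
import os
import subprocess
import tempfile

# Layout: <tmp>/work is the script's cwd (it has NO "build" subdir),
# while <tmp>/other/build is an unrelated directory that CDPATH exposes.
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "work"))
os.makedirs(os.path.join(root, "other", "build"))

env = dict(os.environ, CDPATH=os.path.join(root, "other"))
r = subprocess.run(["bash", "-c", "cd build && pwd"],
                   cwd=os.path.join(root, "work"),
                   env=env, capture_output=True, text=True)
# With CDPATH set, `cd` silently jumped out of the working tree.
print(r.stdout.splitlines()[-1])  # ends with .../other/build

env.pop("CDPATH")
r2 = subprocess.run(["bash", "-c", "cd build && pwd"],
                    cwd=os.path.join(root, "work"),
                    env=env, capture_output=True, text=True)
# With CDPATH cleared, the cd fails, as the script author expected.
print(r2.returncode)
```

Note that when `cd` resolves a directory through `CDPATH`, bash also prints the resolved path to stdout, which is itself a common source of breakage in scripts that capture output.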
## Advanced usage of Python requests - timeouts, retries, hooks
The post walks through several usages of a helper library; the code is at https://github.com/requests/toolbelt
### hook

```python
import requests

response = requests.get('https://api.github.com/user/repos?page=1')
# Assert that there were no errors
response.raise_for_status()
```

Improved approach: use a custom Session instead of the global `requests` module, and register a response hook so every response is checked automatically:

```python
import requests

http = requests.Session()
assert_status_hook = lambda response, *args, **kwargs: response.raise_for_status()
http.hooks["response"] = [assert_status_hook]
http.get("https://api.github.com/user/repos?page=1")
```
Logging: a hook can also dump the full request/response:

```python
import requests
from requests_toolbelt.utils import dump

def logging_hook(response, *args, **kwargs):
    data = dump.dump_all(response)
    print(data.decode('utf-8'))

http = requests.Session()
http.hooks["response"] = [logging_hook]
http.get("https://api.openaq.org/v1/cities", params={"country": "BA"})
```
### Setting a base URL

```python
import requests

requests.get('https://api.org/list/')
requests.get('https://api.org/list/3/item')
```

Improved approach:

```python
from requests_toolbelt import sessions

http = sessions.BaseUrlSession(base_url="https://api.org")
http.get("/list")
http.get("/list/item")
```
### timeout

```python
requests.get('https://github.com/', timeout=0.001)
```

Improved approach: subclass `HTTPAdapter` to apply a default timeout to every request:

```python
import requests
from requests.adapters import HTTPAdapter

DEFAULT_TIMEOUT = 5  # seconds

class TimeoutHTTPAdapter(HTTPAdapter):
    def __init__(self, *args, **kwargs):
        self.timeout = DEFAULT_TIMEOUT
        if "timeout" in kwargs:
            self.timeout = kwargs["timeout"]
            del kwargs["timeout"]
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        timeout = kwargs.get("timeout")
        if timeout is None:
            kwargs["timeout"] = self.timeout
        return super().send(request, **kwargs)

http = requests.Session()
# Mount it for both http and https usage
adapter = TimeoutHTTPAdapter(timeout=2.5)
http.mount("https://", adapter)
http.mount("http://", adapter)

# Use the default 2.5s timeout
response = http.get("https://api.twilio.com/")

# Override the timeout as usual for specific requests
response = http.get("https://api.twilio.com/", timeout=10)
```
### retry

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

retry_strategy = Retry(
    total=3,
    status_forcelist=[429, 500, 502, 503, 504],
    # was `method_whitelist` before urllib3 1.26
    allowed_methods=["HEAD", "GET", "OPTIONS"],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)
response = http.get("https://en.wikipedia.org/w/api.php")
```
## Query Engines: Push vs. Pull

> Push-based execution refers to the fact that relational operators push their results to their downstream operators, rather than waiting for these operators to pull data (classic Volcano-style model). Push-based execution improves cache efficiency, because it removes control flow logic from tight loops. It also enables Snowflake to efficiently process DAG-shaped plans, as opposed to just trees, creating additional opportunities for sharing and pipelining of intermediate results.

Push-based processing of DAG-shaped plans has become the de facto state of the art; new data systems are designed around push-based execution.

The article analyzes the advantages listed in the quote above and gives a brief argument for why they hold.

A typical current example is Naiad / timely dataflow.
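The control-flow difference can be sketched with toy operators (illustrative only, nothing like a real engine): in the pull model each operator asks its child for the next row, while in the push model the producer drives its consumers' callbacks, which makes fan-out to multiple consumers (DAG plans) natural.

```python
# Pull (Volcano style): the consumer drives; each operator pulls from its child.
def scan_pull(rows):
    yield from rows

def filter_pull(child, pred):
    for row in child:
        if pred(row):
            yield row

pulled = list(filter_pull(scan_pull([1, 2, 3, 4]), lambda x: x % 2 == 0))

# Push style: the producer drives; one scan can feed several consumers,
# i.e. the plan is a DAG rather than a tree.
def scan_push(rows, *consumers):
    for row in rows:
        for consume in consumers:
            consume(row)

evens, odds = [], []
scan_push([1, 2, 3, 4],
          lambda r: evens.append(r) if r % 2 == 0 else None,
          lambda r: odds.append(r) if r % 2 else None)

print(pulled, evens, odds)  # [2, 4] [2, 4] [1, 3]
```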
## Buffer

The basic design idea is just a ring-style buffer. Here is muduo's Buffer; the one in libevent is about the same.
```cpp
std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;

const char* peek() const
{ return begin() + readerIndex_; }

size_t writableBytes() const
{ return buffer_.size() - writerIndex_; }

size_t readableBytes() const
{ return writerIndex_ - readerIndex_; }

size_t prependableBytes() const
{ return readerIndex_; }
```
Growing the buffer here still requires a copy. How do you get zero-copy? See the iobuf design in brpc: https://github.com/apache/incubator-brpc/blob/master/src/butil/iobuf.h
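The reader-index / writer-index design above can be modeled in a few lines of Python (a toy sketch, not muduo's actual API; here the writer index is simply the end of the byte array):

```python
class Buffer:
    """Toy model of a muduo-style buffer: one contiguous byte array
    plus a reader index; bytes in [0, _reader) are already consumed."""
    def __init__(self):
        self._data = bytearray()
        self._reader = 0

    def readable_bytes(self):
        return len(self._data) - self._reader

    def append(self, data: bytes):
        self._data += data  # growing may reallocate and copy, as noted above

    def retrieve(self, n: int) -> bytes:
        out = bytes(self._data[self._reader:self._reader + n])
        self._reader += n
        return out

buf = Buffer()
buf.append(b"hello")
print(buf.retrieve(3), buf.readable_bytes())  # b'hel' 2
```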
## Hosting SQLite databases on Github Pages

Compile SQLite to wasm. Brilliant.

It also suggests a pattern: build a binary program, compile it to wasm, and put it on GitHub Pages as a live demo.
## Dropping cache didn't drop cache

A bug-hunting writeup from Twitter, and a good way to pick up the page cache's logic along the way. Twitter runs a fairly aggressive kernel, and the unstable upstream changes it pulls in can introduce bugs.

The patch that finally fixed the problem: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=8199be001a470209f5c938570cc199abb012fe53
```diff
diff --git a/mm/list_lru.c b/mm/list_lru.c
index 8de5e37..1e61161 100644
--- a/mm/list_lru.c
+++ b/mm/list_lru.c
@@ -534,7 +534,6 @@ static void memcg_drain_list_lru_node(struct list_lru *lru, int nid,
 	struct list_lru_node *nlru = &lru->node[nid];
 	int dst_idx = dst_memcg->kmemcg_id;
 	struct list_lru_one *src, *dst;
-	bool set;
 
 	/*
 	 * Since list_lru_{add,del} may be called under an IRQ-safe lock,
@@ -546,11 +545,12 @@ static void memcg_drain_list_lru_node(struct list_lru *lru, int nid,
 	dst = list_lru_from_memcg_idx(nlru, dst_idx);
 
 	list_splice_init(&src->list, &dst->list);
-	set = (!dst->nr_items && src->nr_items);
-	dst->nr_items += src->nr_items;
-	if (set)
+
+	if (src->nr_items) {
+		dst->nr_items += src->nr_items;
 		memcg_set_shrinker_bit(dst_memcg, nid, lru_shrinker_id(lru));
-	src->nr_items = 0;
+		src->nr_items = 0;
+	}
 	spin_unlock_irq(&nlru->lock);
 }
```
## How the snowflake ID algorithm works
```cpp
#pragma once
#include <cstdint>
#include <chrono>
#include <stdexcept>
#include <mutex>

class snowflake_nonlock
{
public:
    void lock() {}
    void unlock() {}
};

template<int64_t Twepoch, typename Lock = snowflake_nonlock>
class snowflake
{
    using lock_type = Lock;
    static constexpr int64_t TWEPOCH = Twepoch;
    static constexpr int64_t WORKER_ID_BITS = 5L;
    static constexpr int64_t DATACENTER_ID_BITS = 5L;
    static constexpr int64_t MAX_WORKER_ID = (1 << WORKER_ID_BITS) - 1;
    static constexpr int64_t MAX_DATACENTER_ID = (1 << DATACENTER_ID_BITS) - 1;
    static constexpr int64_t SEQUENCE_BITS = 12L;
    static constexpr int64_t WORKER_ID_SHIFT = SEQUENCE_BITS;
    static constexpr int64_t DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
    static constexpr int64_t TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS;
    static constexpr int64_t SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1;

    using time_point = std::chrono::time_point<std::chrono::steady_clock>;
    time_point start_time_point_ = std::chrono::steady_clock::now();
    int64_t start_millsecond_ = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
    int64_t last_timestamp_ = -1;
    int64_t workerid_ = 0;
    int64_t datacenterid_ = 0;
    int64_t sequence_ = 0;
    lock_type lock_;

public:
    snowflake() = default;
    snowflake(const snowflake&) = delete;
    snowflake& operator=(const snowflake&) = delete;

    void init(int64_t workerid, int64_t datacenterid)
    {
        if (workerid > MAX_WORKER_ID || workerid < 0) {
            throw std::runtime_error("worker Id can't be greater than 31 or less than 0");
        }
        if (datacenterid > MAX_DATACENTER_ID || datacenterid < 0) {
            throw std::runtime_error("datacenter Id can't be greater than 31 or less than 0");
        }
        workerid_ = workerid;
        datacenterid_ = datacenterid;
    }

    int64_t nextid()
    {
        std::lock_guard<lock_type> lock(lock_);
        // std::chrono::steady_clock cannot decrease as physical time moves forward
        auto timestamp = millsecond();
        if (last_timestamp_ == timestamp)
        {
            // Ids generated within the same timestamp (millisecond precision)
            // increment the sequence number.
            sequence_ = (sequence_ + 1) & SEQUENCE_MASK;
            if (sequence_ == 0)
            {
                // The sequence overflowed its 12 bits, so wait for the next
                // millisecond, which starts again with sequence 0.
                // (Why not simply +1 here?)
                timestamp = wait_next_millis(last_timestamp_);
            }
        }
        else
        {
            sequence_ = 0;
        }
        last_timestamp_ = timestamp;
        return ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT)
            | (datacenterid_ << DATACENTER_ID_SHIFT)
            | (workerid_ << WORKER_ID_SHIFT)
            | sequence_;
    }

private:
    int64_t millsecond() const noexcept
    {
        auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time_point_);
        return start_millsecond_ + diff.count();
    }

    int64_t wait_next_millis(int64_t last) const noexcept
    {
        auto timestamp = millsecond();
        while (timestamp <= last)
        {
            timestamp = millsecond();
        }
        return timestamp;
    }
};
```
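To make the bit layout concrete, here is a small Python sketch of the same 41/5/5/12 split used by the constants above (`compose`/`decompose` are illustrative names, not part of the C++ code):

```python
SEQUENCE_BITS = 12
WORKER_ID_BITS = 5
DATACENTER_ID_BITS = 5
WORKER_ID_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS

def compose(ts, dc, worker, seq):
    # | 41-bit timestamp | 5-bit datacenter | 5-bit worker | 12-bit sequence |
    return (ts << TIMESTAMP_SHIFT) | (dc << DATACENTER_ID_SHIFT) \
         | (worker << WORKER_ID_SHIFT) | seq

def decompose(i):
    return (i >> TIMESTAMP_SHIFT,
            (i >> DATACENTER_ID_SHIFT) & ((1 << DATACENTER_ID_BITS) - 1),
            (i >> WORKER_ID_SHIFT) & ((1 << WORKER_ID_BITS) - 1),
            i & ((1 << SEQUENCE_BITS) - 1))

print(decompose(compose(123456789, 3, 17, 42)))  # (123456789, 3, 17, 42)
```

Within one millisecond the timestamp field is fixed, so uniqueness rests on the (datacenter, worker) pair plus the 12-bit sequence, which is exactly why the C++ code must wait for the next millisecond once the sequence wraps.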
## ClickHouse usage notes: distributed JOIN

The several ways of writing a JOIN.

### Best practices for distributed JOIN

Having seen how ClickHouse implements distributed JOIN queries, here are some lessons from practice.

- 1. Minimize the amount of data in the right-hand table

  ClickHouse builds a hash map from the JOIN's right-hand table and reads every column the SQL needs into memory. If the right table is too large for a node's memory, the computation cannot complete. In practice, we usually put the smaller table on the right and add as many filter conditions as possible to cut down the data entering the JOIN.

- 2. Use GLOBAL JOIN to avoid the performance loss from query amplification

  If the data volume of the right table or subquery is manageable, GLOBAL JOIN avoids read amplification. Note that GLOBAL JOIN ships data between nodes and consumes some network bandwidth; with a large data volume it, too, hurts performance.

- 3. Pre-distribute data to enable colocated JOIN

  When every table in the JOIN is very large and either read amplification or network broadcast would be prohibitively expensive, another approach is needed. Relying on the principle that the same JOIN key always lands on the same shard, we shard the tables involved in the JOIN by JOIN key across the cluster. The distributed JOIN becomes a local JOIN on each node, greatly reducing query amplification.
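The colocation idea can be sketched in a few lines (a toy model, not ClickHouse: tables are lists of `(key, value)` rows, and `key % N_SHARDS` stands in for the cluster's sharding function):

```python
from collections import defaultdict

N_SHARDS = 4

def shard_of(key):
    return key % N_SHARDS  # deterministic for integer keys

def partition(rows):
    shards = defaultdict(list)
    for row in rows:
        shards[shard_of(row[0])].append(row)
    return shards

orders = [(1, "order_a"), (2, "order_b"), (5, "order_c")]
users  = [(1, "alice"), (5, "bob")]

# Both tables are partitioned by the SAME function of the join key,
# so matching rows always land on the same shard.
left, right = partition(orders), partition(users)

# Each shard joins locally; no cross-shard data movement is needed.
joined = []
for s in range(N_SHARDS):
    index = {k: v for k, v in right.get(s, [])}
    joined += [(k, v, index[k]) for k, v in left.get(s, []) if k in index]

print(sorted(joined))  # [(1, 'order_a', 'alice'), (5, 'order_c', 'bob')]
```

The union of the per-shard local joins equals the global join precisely because the partitioning function is shared; if the two tables were sharded differently, matching keys could end up on different shards and rows would be lost.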
Also worth a look: https://jiamaoxiang.top/2020/11/01/Spark%E7%9A%84%E4%BA%94%E7%A7%8DJOIN%E6%96%B9%E5%BC%8F%E8%A7%A3%E6%9E%90/
## A robust distributed locking algorithm based on Google Cloud Storage

Covers a simple implementation of distributed locks, the classic failure modes, and the fixes (TTLs).
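The TTL idea can be sketched as a toy single-process model (illustrative only; the article's real algorithm relies on GCS object preconditions for atomicity, which this sketch does not model):

```python
import time

class TtlLock:
    """Toy model of a TTL-based lock: acquire succeeds if the lock is
    free, or if the previous holder's TTL has already lapsed."""
    def __init__(self, ttl, clock=time.monotonic):
        self.ttl, self.clock = ttl, clock
        self.owner, self.expires = None, 0.0

    def acquire(self, who):
        now = self.clock()
        if self.owner is None or now >= self.expires:
            # A stale owner's TTL has lapsed: the lock can be taken over.
            self.owner, self.expires = who, now + self.ttl
            return True
        return False

    def release(self, who):
        if self.owner == who:
            self.owner = None

# Drive it with a fake clock to show TTL takeover deterministically.
t = [0.0]
lock = TtlLock(ttl=10.0, clock=lambda: t[0])
assert lock.acquire("a")
assert not lock.acquire("b")   # still held and not expired
t[0] = 11.0
assert lock.acquire("b")       # "a" died without releasing; TTL expired
```

The TTL solves the "holder crashed without releasing" problem, but it introduces the classic hazard the article discusses: a paused holder may wake up after its TTL expired and still believe it owns the lock, which is why fencing (e.g. a generation number checked by the protected resource) is also needed.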
## Recent ideas

Port https://github.com/a8m/rql to C++
https://github.com/stateright/stateright
https://docs.rs/stateright/0.28.0/stateright/
## To read
https://github.com/stedolan/jq/wiki/Internals:-the-interpreter
https://github.com/riba2534/TCP-IP-NetworkNote/tree/master/ch03
https://github.com/arximboldi/lager
https://github.com/eatingtomatoes/pure_simd
https://github.com/logicalclocks/rondb
https://github.com/Jason2013/gslcl/blob/master/ch04.rst
Page cache https://www.yuque.com/jdxj/bq4chm/rug9eq
https://github.com/lj1208/binloglistener/tree/master/src
OceanBase is now open source: https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.0/overall-architecture
Code walkthrough: https://zhuanlan.zhihu.com/p/392107745
https://github.com/wangzzu/awesome/issues/31