基本原理?

liburing api介绍 7

// 非系统调用,初始化io_uring,entries:队列深度 queue depth
//开启 IORING_SETUP_IOPOLL 后,io_uring 会使用轮询的方式执行所有的操作。
//开启 IORING_SETUP_SQPOLL 后,io_uring 会创建一个内核线程专门用来收割用户提交的任务。
extern int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags);

// 非系统调用,清理io_uring
extern void io_uring_queue_exit(struct io_uring *ring);

// 非系统调用,获取一个可用的 submit_queue_entry,用来提交IO
extern struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);

// 非系统调用,准备阶段,和libaio封装的io_prep_writev一样
static inline void io_uring_prep_writev(struct io_uring_sqe *sqe, int fd,const struct iovec *iovecs, unsigned nr_vecs, off_t offset)

// 非系统调用,准备阶段,和libaio封装的io_prep_readv一样
static inline void io_uring_prep_readv(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset)
 
// 非系统调用,把准备阶段准备的data放进 submit_queue_entry
static inline void io_uring_sqe_set_data(struct io_uring_sqe *sqe, void *data)
 
// 非系统调用,设置submit_queue_entry的flag
static inline void io_uring_sqe_set_flags(struct io_uring_sqe *sqe, unsigned flags)
 
// 非系统调用,提交sq的entry,不会阻塞等到其完成,内核在其完成后会自动将sqe的偏移信息加入到cq,在提交时需要加锁
extern int io_uring_submit(struct io_uring *ring);

// 非系统调用,提交sq的entry,阻塞等到其完成,在提交时需要加锁。
extern int io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr);

// 非系统调用 宏定义,会遍历cq从head到tail,来处理完成的IO
#define io_uring_for_each_cqe(ring, head, cqe)

// 非系统调用 遍历时,可以获取cqe的data
static inline void *io_uring_cqe_get_data(const struct io_uring_cqe *cqe)

// 非系统调用 遍历完成时,需要调整head往后移nr
static inline void io_uring_cq_advance(struct io_uring *ring, unsigned nr)

默认是IOPOLL啊

rocksdb怎么用io uring?

只在mread 上用

Io_uring管理比较简单

// io_uring instance queue depth
const unsigned int kIoUringDepth = 256;

inline void DeleteIOUring(void* p) {
  struct io_uring* iu = static_cast<struct io_uring*>(p);
  delete iu;
}

inline struct io_uring* CreateIOUring() {
  struct io_uring* new_io_uring = new struct io_uring;
  int ret = io_uring_queue_init(kIoUringDepth, new_io_uring, 0);
  if (ret) {
    delete new_io_uring;
    new_io_uring = nullptr;
  }
  return new_io_uring;
}

mread

IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
                                          size_t num_reqs,
                                          const IOOptions& options,
                                          IODebugContext* dbg) {
  if (use_direct_io()) {
    for (size_t i = 0; i < num_reqs; i++) {
      assert(IsSectorAligned(reqs[i].offset, GetRequiredBufferAlignment()));
      assert(IsSectorAligned(reqs[i].len, GetRequiredBufferAlignment()));
      assert(IsSectorAligned(reqs[i].scratch, GetRequiredBufferAlignment()));
    }
  }

#if defined(ROCKSDB_IOURING_PRESENT)
  struct io_uring* iu = nullptr;
  if (thread_local_io_urings_) {
    iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
    if (iu == nullptr) {
      iu = CreateIOUring();
      if (iu != nullptr) {
        thread_local_io_urings_->Reset(iu);
      }
    }
  }

  // Init failed, platform doesn't support io_uring. Fall back to
  // serialized reads
  if (iu == nullptr) {
    return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
  }

  IOStatus ios = IOStatus::OK();

  struct WrappedReadRequest {
    FSReadRequest* req;
    struct iovec iov;
    size_t finished_len;
    explicit WrappedReadRequest(FSReadRequest* r) : req(r), finished_len(0) {}
  };

  autovector<WrappedReadRequest, 32> req_wraps;
  autovector<WrappedReadRequest*, 4> incomplete_rq_list;

  for (size_t i = 0; i < num_reqs; i++) {
    req_wraps.emplace_back(&reqs[i]);
  }

  size_t reqs_off = 0;
  while (num_reqs > reqs_off || !incomplete_rq_list.empty()) {
    size_t this_reqs = (num_reqs - reqs_off) + incomplete_rq_list.size();

    // If requests exceed depth, split it into batches
    if (this_reqs > kIoUringDepth) this_reqs = kIoUringDepth;

    assert(incomplete_rq_list.size() <= this_reqs);
    //循环,设置sqe
    for (size_t i = 0; i < this_reqs; i++) {
      WrappedReadRequest* rep_to_submit;
      if (i < incomplete_rq_list.size()) {
        rep_to_submit = incomplete_rq_list[i];
      } else {
        rep_to_submit = &req_wraps[reqs_off++];
      }
      assert(rep_to_submit->req->len > rep_to_submit->finished_len);
      rep_to_submit->iov.iov_base =
          rep_to_submit->req->scratch + rep_to_submit->finished_len;
      rep_to_submit->iov.iov_len =
          rep_to_submit->req->len - rep_to_submit->finished_len;

      struct io_uring_sqe* sqe;
      sqe = io_uring_get_sqe(iu);
      io_uring_prep_readv(
          sqe, fd_, &rep_to_submit->iov, 1,
          rep_to_submit->req->offset + rep_to_submit->finished_len);
      io_uring_sqe_set_data(sqe, rep_to_submit);
    }
    incomplete_rq_list.clear();
    //这个wait类似epollwait,下面的io_uring_wait_cqe是真正的拿cqe数据
    ssize_t ret =
        io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
    TEST_SYNC_POINT_CALLBACK(
        "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1",
        &ret);
    TEST_SYNC_POINT_CALLBACK(
        "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
        iu);

    if (static_cast<size_t>(ret) != this_reqs) {
      fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs);
      // If error happens and we submitted fewer than expected, it is an
      // exception case and we don't retry here. We should still consume
      // what is is submitted in the ring.
      for (ssize_t i = 0; i < ret; i++) {
        struct io_uring_cqe* cqe = nullptr;
        io_uring_wait_cqe(iu, &cqe);
        if (cqe != nullptr) {
          io_uring_cqe_seen(iu, cqe);
        }
      }
      return IOStatus::IOError("io_uring_submit_and_wait() requested " +
                               ToString(this_reqs) + " but returned " +
                               ToString(ret));
    }

    for (size_t i = 0; i < this_reqs; i++) {
      struct io_uring_cqe* cqe = nullptr;
      WrappedReadRequest* req_wrap;

      // We could use the peek variant here, but this seems safer in terms
      // of our initial wait not reaping all completions
      ret = io_uring_wait_cqe(iu, &cqe);
      TEST_SYNC_POINT_CALLBACK(
          "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", &ret);
      if (ret) {
        ios = IOStatus::IOError("io_uring_wait_cqe() returns " + ToString(ret));

        if (cqe != nullptr) {
          io_uring_cqe_seen(iu, cqe);
        }
        continue;
      }
			//io_uring_wait_cqe 拿数据
      req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
      FSReadRequest* req = req_wrap->req;
      if (cqe->res < 0) {
        req->result = Slice(req->scratch, 0);
        req->status = IOError("Req failed", filename_, cqe->res);
      } else {
        size_t bytes_read = static_cast<size_t>(cqe->res);
        TEST_SYNC_POINT_CALLBACK(
            "PosixRandomAccessFile::MultiRead:io_uring_result", &bytes_read);
        if (bytes_read == req_wrap->iov.iov_len) {
          req->result = Slice(req->scratch, req->len);
          req->status = IOStatus::OK();
        } else if (bytes_read == 0) {
          // cqe->res == 0 can means EOF, or can mean partial results. See
          // comment
          // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
          // Fall back to pread in this case.
          if (use_direct_io() &&
              !IsSectorAligned(req_wrap->finished_len,
                               GetRequiredBufferAlignment())) {
            // Bytes reads don't fill sectors. Should only happen at the end
            // of the file.
            req->result = Slice(req->scratch, req_wrap->finished_len);
            req->status = IOStatus::OK();
          } else {
            Slice tmp_slice;
            req->status =
                Read(req->offset + req_wrap->finished_len,
                     req->len - req_wrap->finished_len, options, &tmp_slice,
                     req->scratch + req_wrap->finished_len, dbg);
            req->result =
                Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
          }
        } else if (bytes_read < req_wrap->iov.iov_len) {
          assert(bytes_read > 0);
          assert(bytes_read + req_wrap->finished_len < req->len);
          req_wrap->finished_len += bytes_read;
          incomplete_rq_list.push_back(req_wrap);
        } else {
          req->result = Slice(req->scratch, 0);
          req->status = IOError("Req returned more bytes than requested",
                                filename_, cqe->res);
        }
      }
      io_uring_cqe_seen(iu, cqe);
    }
  }
  return ios;
#else
  return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
#endif
}

io_uring_cqe_seen处理index,如果错误了,比如提前读了,需要标记一下,如果读完了,也要标记一下

  /*
There's a failure case where an application gets a cqe entry, but
the kernel can then overwrite it before the application is done
reading it. This can happen since the io_uring_{get,wait}_completion()
interface both returns a CQE pointer AND increments the ring index.
If the kernel reuses this entry before the applications is done reading
it, the contents may be corrupted.

Remove the CQ head increment from the CQE retrieval, and put it into
a separate helper, io_uring_cqe_seen(). The application must call this
helper when it got a new CQE entry through one of the above calls, and
it's now done reading it.
  

Must be called after io_uring_{get,wait}_completion() after the cqe has
been processed by the application.
*/
static inline void io_uring_cqe_seen(struct io_uring *ring,
				     struct io_uring_cqe *cqe)
{
	if (cqe) {
		struct io_uring_cq *cq = &ring->cq;

		(*cq->khead)++;
		/*
		 * Ensure that the kernel sees our new head, the kernel has
		 * the matching read barrier.
		 */
		write_barrier();
	}
}

tikv团队做了一些额外的工作 http://openinx.github.io/ppt/io-uring.pdf

试着将compaction WAL都用上iouring

  1. wal和sstbale的写入使用io_uring,但是测完之后性能提升不明显。

  2. compaction file write的时间降低了一半。


ref

  1. https://unixism.net/loti/
  2. https://unixism.net/2020/04/io-uring-by-example-article-series/
  3. http://kernel.dk/io_uring.pdf
    • 讨论https://news.ycombinator.com/item?id=19843464
  4. https://lwn.net/Articles/810414/
    • 讨论https://news.ycombinator.com/item?id=22262964
  5. https://cor3ntin.github.io/posts/iouring/
  6. https://zhuanlan.zhihu.com/p/380726590 好文
  7. 一部分内容,抄自这篇文章 https://zhuanlan.zhihu.com/p/361955546