flowchart TD
subgraph "多进程架构"
API_SERVER["API Server进程
处理用户请求/分词/解码"]
ENGINE_CORE["EngineCore进程
调度/GPU推理"]
API_SERVER <--> |"ZeroMQ IPC
SyncMPClient/AsyncMPClient"| ENGINE_CORE
end
subgraph "EngineCore内部结构"
CORE["EngineCore
core.py"]
SCHEDULER["Scheduler
core/scheduler.py"]
KV_CACHE_MGR["KVCacheManager
core/kv_cache_manager.py"]
EXECUTOR["ModelExecutor
executor/"]
CORE --> SCHEDULER
CORE --> KV_CACHE_MGR
CORE --> EXECUTOR
end
subgraph "Executor架构"
EXECUTOR_CLASS["ExecutorClass
executor/__init__.py"]
MULTIPROC_EXEC["MultiprocExecutor
executor/mp.py"]
WORKER_PROCS["Worker进程
WorkerProc.worker_main()"]
GPU_MODEL["GPUModelRunner
worker/gpu_model_runner.py"]
EXECUTOR_CLASS --> MULTIPROC_EXEC
MULTIPROC_EXEC --> |"创建
make_worker_process()"| WORKER_PROCS
WORKER_PROCS --> GPU_MODEL
end
ENGINE_CORE --- CORE
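
The diagram above shows the process split: the API server and EngineCore run as separate processes and exchange serialized requests and results over ZeroMQ. The sketch below illustrates that pattern in miniature with a REQ/REP socket pair and pickled payloads; it is not vLLM's actual wire protocol, and the endpoint and field names are made up for the example.

```python
# Minimal sketch (not vLLM's real protocol): an "engine core" process that
# receives pickled requests over ZeroMQ and sends results back, mirroring the
# API-server <-> EngineCore split in the diagram. Endpoint and fields are
# illustrative.
import multiprocessing
import pickle

import zmq

ENDPOINT = "ipc:///tmp/toy_engine_core"  # hypothetical IPC endpoint


def engine_core_main() -> None:
    """Busy loop of the toy engine-core process."""
    sock = zmq.Context().socket(zmq.REP)
    sock.bind(ENDPOINT)
    while True:
        request = pickle.loads(sock.recv())        # deserialize the request
        if request is None:                        # shutdown sentinel
            sock.send(pickle.dumps(None))
            break
        result = {"request_id": request["request_id"], "token_ids": [1, 2, 3]}
        sock.send(pickle.dumps(result))            # serialize the result


if __name__ == "__main__":
    proc = multiprocessing.Process(target=engine_core_main, daemon=True)
    proc.start()

    sock = zmq.Context().socket(zmq.REQ)
    sock.connect(ENDPOINT)
    sock.send(pickle.dumps({"request_id": "req-0", "prompt": "hello"}))
    print(pickle.loads(sock.recv()))               # {'request_id': 'req-0', ...}
    sock.send(pickle.dumps(None))                  # ask the core to exit
    sock.recv()
    proc.join()
```
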
flowchart TD
subgraph "EngineCore初始化
engine/core.py"
INIT_ENGINE["EngineCore.__init__()
引擎初始化"]
INIT_EXECUTOR["初始化Executor
executor_class.__init__()"]
AVAIL_MEM["determine_available_memory()
确定可用显存"]
GET_KV_CONFIG["get_kv_cache_config()
获取KV缓存配置"]
INIT_KV["initialize_from_config()
初始化KV缓存"]
INIT_SCHED["初始化Scheduler
Scheduler()"]
end
subgraph "Executor初始化
执行者初始化"
MP_INIT["MultiprocExecutor._init_executor()
多进程执行者初始化"]
CREATE_MQ["创建消息队列
rpc_broadcast_mq"]
CREATE_WORKERS["创建Worker进程
make_worker_process()"]
WORKER_INIT["Worker进程初始化
worker_main()"]
INIT_DEVICE["初始化设备
init_device()"]
LOAD_MODEL["加载模型
load_model()"]
end
subgraph "Worker KV缓存
worker/gpu_model_runner.py"
GPU_RUNNER["GPUModelRunner.__init__()
GPU模型运行器初始化"]
ALLOC_KV["分配KV缓存Tensor
每层2个(K/V)"]
KV_SHAPE["KV缓存形状
(2, num_blocks, block_size, kv_head, head_size)"]
end
INIT_ENGINE --> INIT_EXECUTOR
INIT_ENGINE --> AVAIL_MEM
AVAIL_MEM --> GET_KV_CONFIG
GET_KV_CONFIG --> INIT_KV
INIT_KV --> INIT_SCHED
INIT_EXECUTOR --> MP_INIT
MP_INIT --> CREATE_MQ
MP_INIT --> CREATE_WORKERS
CREATE_WORKERS --> WORKER_INIT
WORKER_INIT --> INIT_DEVICE
WORKER_INIT --> LOAD_MODEL
LOAD_MODEL --> GPU_RUNNER
GPU_RUNNER --> ALLOC_KV
ALLOC_KV --> KV_SHAPE
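
To make the KV-cache sizing step concrete, the sketch below derives a block count from a memory budget and allocates one tensor per layer in the (2, num_blocks, block_size, kv_head, head_size) layout shown above. The helper name, the config numbers, and the 256 MiB budget are illustrative stand-ins for determine_available_memory() and get_kv_cache_config(), not reproductions of them.

```python
# Simplified KV-cache sizing: derive num_blocks from a memory budget and
# allocate one (2, num_blocks, block_size, num_kv_heads, head_size) tensor per
# layer, matching the shape in the diagram. Helper names and numbers are
# illustrative, not vLLM's config objects.
import torch


def num_blocks_for_budget(budget_bytes: int, num_layers: int, block_size: int,
                          num_kv_heads: int, head_size: int,
                          dtype: torch.dtype = torch.float16) -> int:
    bytes_per_elem = torch.finfo(dtype).bits // 8
    # Each cached token needs a K vector and a V vector in every layer.
    bytes_per_block = num_layers * 2 * block_size * num_kv_heads * head_size * bytes_per_elem
    return budget_bytes // bytes_per_block


if __name__ == "__main__":
    num_layers, block_size, num_kv_heads, head_size = 32, 16, 8, 128
    budget = 256 * 1024**2  # pretend 256 MiB remain after weights/activations
    n_blocks = num_blocks_for_budget(budget, num_layers, block_size,
                                     num_kv_heads, head_size)
    kv_caches = {
        f"layers.{i}.attn": torch.zeros(2, n_blocks, block_size, num_kv_heads,
                                        head_size, dtype=torch.float16)
        for i in range(num_layers)
    }
    print(n_blocks, kv_caches["layers.0.attn"].shape)
```
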
flowchart LR
subgraph "Client进程
API Server"
PROMPT["用户输入Prompt"]
LLM_GEN["LLM.generate()
entrypoints/llm.py"]
ADD_REQ["_add_request()
LLMEngine.add_request()"]
TOKENIZE["processor.encode_request()
输入处理/分词"]
RUN_ENGINE["_run_engine()
while循环调用step"]
GET_OUT["LLMEngine.step()
engine_core.get_output()"]
DE_TOKEN["processor.process_output()
解码token"]
RESULT["返回结果给用户"]
PROMPT --> LLM_GEN
LLM_GEN --> ADD_REQ
ADD_REQ --> TOKENIZE
LLM_GEN --> RUN_ENGINE
RUN_ENGINE --> GET_OUT
GET_OUT --> DE_TOKEN --> RESULT
end
subgraph "EngineCore进程"
INPUT_Q["input_queue
EngineCoreProc成员"]
PROC_INPUT["process_input_socket线程
接收请求"]
BUSY_LOOP["run_busy_loop
主线程循环"]
HANDLE_REQ["_handle_client_request()
处理客户端请求"]
ADD_TO_SCHED["EngineCore.add_request()
添加到Scheduler"]
SCHED_STEP["EngineCore.step()
执行一次调度"]
OUTPUT_Q["output_queue
EngineCoreProc成员"]
PROC_OUTPUT["process_output_socket线程
发送结果"]
PROC_INPUT --> INPUT_Q
INPUT_Q --> BUSY_LOOP
BUSY_LOOP --> HANDLE_REQ
HANDLE_REQ --> ADD_TO_SCHED
BUSY_LOOP --> SCHED_STEP
SCHED_STEP --> OUTPUT_Q
OUTPUT_Q --> PROC_OUTPUT
end
ADD_REQ -->|"socket.send_multipart()
序列化请求"| PROC_INPUT
PROC_OUTPUT -->|"socket.recv_multipart()
反序列化结果"| GET_OUT
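
The EngineCore process in this diagram is essentially a producer/consumer loop: one thread feeds input_queue, the main thread runs the busy loop (handle requests, then step), and another thread drains output_queue. The toy version below uses plain queue.Queue objects in place of the ZeroMQ sockets; the class and method names mirror the diagram but are simplified stand-ins, not vLLM's EngineCoreProc.

```python
# Toy version of the EngineCoreProc pattern above: a receiver thread feeds
# input_queue, the main thread runs the busy loop, and a sender thread drains
# output_queue. Plain queues stand in for the ZeroMQ sockets; names are
# illustrative.
import queue
import threading


class ToyEngineCoreProc:
    def __init__(self) -> None:
        self.input_queue: queue.Queue = queue.Queue()
        self.output_queue: queue.Queue = queue.Queue()
        self.waiting: list = []

    def _handle_client_request(self, req: dict) -> None:
        self.waiting.append(req)

    def run_busy_loop(self, max_steps: int = 10) -> None:
        for _ in range(max_steps):
            # Drain requests that arrived since the last step.
            while not self.input_queue.empty():
                self._handle_client_request(self.input_queue.get())
            # One "scheduling step": pretend the oldest request finishes.
            if self.waiting:
                req = self.waiting.pop(0)
                self.output_queue.put({"request_id": req["request_id"],
                                       "token_ids": [42]})


if __name__ == "__main__":
    core = ToyEngineCoreProc()

    def process_input_socket() -> None:   # stands in for the recv thread
        core.input_queue.put({"request_id": "req-0", "prompt_token_ids": [1, 2]})

    def process_output_socket() -> None:  # stands in for the send thread
        print("output:", core.output_queue.get())

    in_t = threading.Thread(target=process_input_socket)
    in_t.start()
    in_t.join()                            # ensure the request is queued first
    out_t = threading.Thread(target=process_output_socket)
    out_t.start()
    core.run_busy_loop()
    out_t.join()
```
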
flowchart TD
subgraph "Client进程
请求处理"
START["用户发送请求"]
ENCODE["processor.encode_request()
将文本转为token"]
WRAP["EngineCoreRequest
封装请求"]
SEND["MPClient.add_request()
发送请求"]
WAIT["_run_engine()
等待结果"]
RECEIVE["MPClient.get_output()
接收结果"]
DECODE["processor.process_output()
解码结果"]
END["返回给用户"]
START --> ENCODE --> WRAP --> SEND --> WAIT --> RECEIVE --> DECODE --> END
end
subgraph "EngineCore进程
请求执行"
RECV_REQ["接收请求
process_input_socket线程"]
ADD_REQ["add_request()
添加到Scheduler"]
SCH_WAIT["等待调度
waiting队列"]
GET_PREFIX["get_computed_blocks()
检查prefix cache命中"]
ALLOC["allocate_slots()
分配KVCache块"]
SCH_RUN["调度执行
running队列"]
PREPARE["_prepare_inputs()
准备模型输入"]
EXEC["execute_model()
执行模型推理"]
UPDATE["_update_states()
更新状态"]
CACHE["cache_full_blocks()
缓存满的块"]
OUT["输出结果
process_output_socket线程"]
RECV_REQ --> ADD_REQ --> SCH_WAIT
SCH_WAIT --> GET_PREFIX --> ALLOC --> SCH_RUN
SCH_RUN --> PREPARE --> EXEC --> UPDATE --> CACHE --> OUT
end
SEND -.->|"socket.send_multipart()
序列化请求"| RECV_REQ
OUT -.->|"socket.recv_multipart()
序列化结果"| RECEIVE
subgraph "抢占恢复流程"
FULL["KVCache空间不足"]
SELECT["select_requests_to_preempt()
选择抢占对象"]
PREEMPT["preempt_requests()
执行抢占"]
FREE["free_blocks()
释放KVCache"]
WAIT_AGAIN["重新进入waiting队列
保留prefix hash"]
RESUME["恢复执行
命中prefix cache"]
FULL --> SELECT --> PREEMPT --> FREE --> WAIT_AGAIN --> RESUME
end
ALLOC -->|"空间不足"| FULL
RESUME --> GET_PREFIX
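
On the client side the path is: tokenize, wrap into an EngineCoreRequest, send, wait, then detokenize the returned token IDs. The minimal sketch below walks those steps in-process with a toy vocabulary and a fake engine step; the request fields and helper names are illustrative, not vLLM's real Processor/MPClient API.

```python
# In-process walk-through of the client path: encode -> wrap -> "engine" ->
# decode. The toy vocabulary, request fields, and engine step are illustrative
# stand-ins for the Processor/MPClient calls named in the diagram.
from dataclasses import dataclass, field


@dataclass
class ToyEngineCoreRequest:
    request_id: str
    prompt_token_ids: list
    sampling_params: dict = field(default_factory=dict)


VOCAB = {"hello": 1, "world": 2, "!": 3}
INV_VOCAB = {v: k for k, v in VOCAB.items()}


def encode_request(request_id: str, prompt: str) -> ToyEngineCoreRequest:
    """Tokenize the prompt and wrap it into a request object."""
    return ToyEngineCoreRequest(request_id, [VOCAB[w] for w in prompt.split()])


def toy_engine_step(req: ToyEngineCoreRequest) -> list:
    """Pretend the engine echoes the prompt and appends one generated token."""
    return req.prompt_token_ids + [3]


def process_output(token_ids: list) -> str:
    """Detokenize the returned token IDs."""
    return " ".join(INV_VOCAB[t] for t in token_ids)


if __name__ == "__main__":
    req = encode_request("req-0", "hello world")
    print(process_output(toy_engine_step(req)))  # "hello world !"
```
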
flowchart TD
subgraph "Scheduler
core/scheduler.py"
INIT_SCHED["__init__()
初始化调度器"]
WAITING["waiting队列
等待调度的请求"]
RUNNING["running队列
正在执行的请求"]
REQ_MAP["requests字典
请求ID到请求对象映射"]
end
subgraph "调度流程
schedule()方法"
SCHED_RUN["schedule_running_requests()
调度running队列中的请求"]
ALLOCATE_RUN["为running请求分配KVCache
allocate_slots()"]
CHECK_PREEMPT["抢占模式检查
is_preemption_needed()"]
EXEC_PREEMPT["执行抢占
execute_preemption()"]
SELECT_VICTIM["选择抢占对象
select_requests_to_preempt()"]
PREEMPT_REQ["抢占请求
preempt_requests()"]
FREE_BLOCKS["释放KVCache块
free_blocks()"]
SCHED_WAIT["schedule_waiting_requests()
调度waiting队列"]
GET_COMPUTED["获取命中prefix cache的块
get_computed_blocks()"]
ALLOCATE_WAIT["为waiting请求分配KVCache
allocate_slots()"]
SCHED_RUN --> ALLOCATE_RUN
ALLOCATE_RUN -->|"分配失败"| CHECK_PREEMPT
CHECK_PREEMPT -->|"需要抢占"| EXEC_PREEMPT
EXEC_PREEMPT --> SELECT_VICTIM
SELECT_VICTIM --> PREEMPT_REQ
PREEMPT_REQ --> FREE_BLOCKS
FREE_BLOCKS --> ALLOCATE_RUN
SCHED_WAIT --> GET_COMPUTED
GET_COMPUTED --> ALLOCATE_WAIT
end
subgraph "模型执行
worker/gpu_model_runner.py"
PREPARE["_prepare_inputs()
准备输入数据"]
UPDATE["_update_states()
更新状态"]
EXECUTE["execute_model()
执行模型推理"]
end
INIT_SCHED --> WAITING
INIT_SCHED --> RUNNING
INIT_SCHED --> REQ_MAP
WAITING --> SCHED_WAIT
RUNNING --> SCHED_RUN
ALLOCATE_RUN -->|"成功"| PREPARE
ALLOCATE_WAIT -->|"成功"| PREPARE
PREPARE --> UPDATE --> EXECUTE
ALLOCATE_WAIT -->|"成功后加入running"| RUNNING
PREEMPT_REQ -->|"被抢占的请求加入waiting"| WAITING
flowchart TD
%% Initialization phase
subgraph "Initialization phase"
INIT["GPUModelRunner.__init__()"] --> GET_BACKEND["get_attn_backend()"]
GET_BACKEND --> BUILDER["attn_metadata_builder = backend.get_builder_cls()()"]
INIT --> CONFIG["配置参数设置"]
CONFIG --> SLIDING["sliding_window/interleaved_sliding_window
window_size = sliding_window or interleaved_sliding_window"]
CONFIG --> BLOCK_SIZE["block_size = cache_config.block_size"]
CONFIG --> MM_CHECK["is_multimodal_model = model_config.is_multimodal_model"]
CONFIG --> MROPE_CHECK["uses_mrope = model_config.uses_mrope"]
CONFIG --> ALIBI_CHECK["use_alibi = check_use_alibi(model_config)"]
end
%% KV cache initialization
subgraph "KV cache initialization"
INIT_KV["initialize_kv_cache(kv_cache_config)"] --> GET_SPEC["get_kv_cache_spec()"]
GET_SPEC --> CREATE_SPECS["Create a KVCacheSpec for each attention layer"]
CREATE_SPECS --> SW_SPEC["SlidingWindowSpec
if attention.sliding_window is not None"]
CREATE_SPECS --> FULL_SPEC["FullAttentionSpec
Standard full attention"]
SW_SPEC --> CREATE_KV["kv_caches[layer] = torch.zeros()
using attn_backend.get_kv_cache_shape()"]
FULL_SPEC --> CREATE_KV
CREATE_KV --> BIND_KV["bind_kv_cache(kv_caches, forward_context, self.kv_caches)"]
end
%% Request processing flow
subgraph "Request processing phase"
EXEC_MODEL["execute_model(scheduler_output)"] --> UPDATE_STATES["_update_states(scheduler_output)"]
UPDATE_STATES --> CHECK_MM["if self.is_multimodal_model"]
CHECK_MM -->|"True"| MM_EXEC["_execute_mm_encoder(scheduler_output)
_gather_mm_embeddings(scheduler_output)"]
UPDATE_STATES --> PREPARE["_prepare_inputs(scheduler_output)"]
PREPARE --> REORDER["attn_metadata_builder.reorder_batch()"]
PREPARE --> CALC_PREFIX["_compute_cascade_attn_prefix_len(
num_scheduled_tokens, num_common_prefix_blocks)"]
CALC_PREFIX --> USE_CASCADE["attn_backend.use_cascade_attention()"]
PREPARE --> BUILD_META["attn_metadata_builder.build(
num_reqs, num_actual_tokens, max_query_len, common_prefix_len)"]
BUILD_META --> FLASH_META["FlashAttentionMetadata"]
end
%% Forward pass phase
subgraph "Forward pass phase"
PREPARE --> SET_CTX["set_forward_context(attn_metadata, vllm_config)"]
SET_CTX --> MODEL_FWD["model(input_ids, positions, intermediate_tensors, inputs_embeds)"]
CHECK_MM -->|"True"| PREPARE_MM["准备多模态输入"]
PREPARE_MM --> GET_EMBEDS["inputs_embeds = model.get_input_embeddings(input_ids, mm_embeds)"]
MROPE_CHECK -->|"True"| USE_MROPE["positions = mrope_positions[:, :num_tokens]
_calc_mrope_positions()"]
MROPE_CHECK -->|"False"| USE_STD_POS["positions = self.positions[:num_tokens]"]
end
%% Output processing phase
subgraph "Output processing phase"
MODEL_FWD --> GET_HIDDEN["hidden_states = model.forward()"]
GET_HIDDEN --> COMPUTE_LOG["logits = model.compute_logits(hidden_states, None)"]
COMPUTE_LOG --> GRAMMAR["apply_grammar_bitmask()"]
GRAMMAR --> SAMPLE["sampler_output = model.sample(logits, sampling_metadata)"]
SAMPLE --> OUT_TOKENS["Get the sampled token IDs"]
OUT_TOKENS --> SPEC_CHECK["Check whether speculative decoding is enabled"]
SPEC_CHECK -->|"True"| DRAFT["draft_token_ids = generate_draft_token_ids()/drafter.propose()"]
end
%% Key connections
FLASH_META --> SET_CTX
MM_EXEC --> PREPARE_MM
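
One execute_model() iteration boils down to: gather the scheduled token IDs, run the model forward, take the hidden state at each request's last position, compute logits, and sample. The sketch below shows that skeleton with a tiny embedding-plus-LM-head model and greedy sampling; it omits KV caches, attention metadata, multimodal inputs, and speculative decoding, and the signatures are illustrative rather than GPUModelRunner's.

```python
# Skeleton of one execute_model() step: flatten the scheduled token IDs, run
# the forward pass, take logits at each request's last position, and sample
# greedily. The tiny model and signatures are illustrative; KV caches,
# attention metadata, multimodal inputs, and speculative decoding are omitted.
import torch
import torch.nn as nn


class TinyModel(nn.Module):
    def __init__(self, vocab_size: int = 100, hidden: int = 32) -> None:
        super().__init__()
        self.embed = nn.Embedding(vocab_size, hidden)
        self.lm_head = nn.Linear(hidden, vocab_size, bias=False)

    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        return self.embed(input_ids)              # [num_tokens, hidden]

    def compute_logits(self, hidden_states: torch.Tensor) -> torch.Tensor:
        return self.lm_head(hidden_states)        # [..., vocab_size]


def execute_model(model: TinyModel, scheduled_token_ids: list) -> list:
    # Flatten the batch the way a continuous-batching runner would.
    flat = torch.tensor([t for req in scheduled_token_ids for t in req])
    last_idx = torch.tensor([len(r) for r in scheduled_token_ids]).cumsum(0) - 1
    hidden = model(flat)
    logits = model.compute_logits(hidden[last_idx])   # one row per request
    return logits.argmax(dim=-1).tolist()             # greedy sampling


if __name__ == "__main__":
    torch.manual_seed(0)
    print(execute_model(TinyModel(), [[1, 2, 3], [4, 5]]))
```
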
flowchart TD
%% Attention layer initialization
subgraph "Attention layer initialization"
ATTN_INIT["Attention.__init__(
num_heads, head_size, scale, num_kv_heads...)"]
ATTN_INIT --> CONFIG_SETUP["Set configuration parameters:
sliding_window, block_size
kv_cache_dtype, is_attention_free"]
ATTN_INIT --> SCALES_INIT["Initialize scaling factors:
self._q_scale, self._k_scale, self._v_scale = 1.0"]
ATTN_INIT --> GET_BACKEND["Get the attention backend:
get_attn_backend(head_size, dtype, kv_cache_dtype...)"]
GET_BACKEND --> IMPL_CLS["Get the implementation class:
impl_cls = attn_backend.get_impl_cls()"]
IMPL_CLS --> CREATE_IMPL["Create the implementation instance:
self.impl = impl_cls(num_heads, head_size, scale...)"]
ATTN_INIT --> KV_CACHE_SETUP["Initialize KV cache placeholders:
self.kv_cache = [torch.tensor([]) for ...]"]
end
%% Attention forward pass
subgraph "Attention forward pass (forward)"
ATTN_FWD["Attention.forward(
query, key, value, output_shape=None)"]
ATTN_FWD --> CHECK_CALC["Check whether K/V scales must be computed
if self.calculate_kv_scales"]
CHECK_CALC -->|"Yes"| CALC_SCALES["calc_kv_scales(query, key, value)
Compute the ratio of tensor extrema to reference ranges"]
ATTN_FWD --> CHECK_OUTPUT["Whether an output buffer is used
if self.use_output"]
CHECK_OUTPUT -->|"Yes"| WITH_OUTPUT["Use a pre-allocated output buffer"]
WITH_OUTPUT --> RESHAPE_QKV["Reshape Q/K/V:
query.view(-1, num_heads, head_size)
key/value.view(-1, num_kv_heads, head_size)"]
RESHAPE_QKV --> CHECK_DIRECT["Check the call path
if self.use_direct_call"]
CHECK_DIRECT -->|"Direct call"| GET_CTX["Get the forward context:
forward_context = get_forward_context()"]
GET_CTX --> GET_META["Get the attention metadata:
attn_metadata = forward_context.attn_metadata"]
GET_META --> GET_KV["Get the KV cache:
self_kv_cache = self.kv_cache[virtual_engine]"]
GET_KV --> CALL_IMPL["Call the concrete implementation:
self.impl.forward(self, query, key, value,
self_kv_cache, attn_metadata, output=output)"]
CHECK_DIRECT -->|"Custom op"| CALL_OP["Call the unified attention op:
torch.ops.vllm.unified_attention_with_output(...)"]
WITH_OUTPUT --> RESHAPE_OUT["Reshape the output:
output.view(-1, hidden_size)"]
CHECK_OUTPUT -->|"No"| NO_OUTPUT["No pre-allocated output"]
NO_OUTPUT --> CHECK_DIRECT2["Check the call path"]
CHECK_DIRECT2 -->|"Direct call"| DIRECT_IMPL["Call the concrete implementation directly"]
CHECK_DIRECT2 -->|"Custom op"| CALL_OP2["Call the unified attention op"]
end
%% KV cache scale computation
subgraph "KV cache scale computation (calc_kv_scales)"
CALC_KV["calc_kv_scales(query, key, value)"]
CALC_KV --> Q_SCALE["Compute the Q scale: self._q_scale = torch.abs(query).max() / self.q_range"]
CALC_KV --> K_SCALE["Compute the K scale: self._k_scale = torch.abs(key).max() / self.k_range"]
CALC_KV --> V_SCALE["Compute the V scale: self._v_scale = torch.abs(value).max() / self.v_range"]
CALC_KV --> SET_FLOAT["Store float scale values:
self._k_scale_float = self._k_scale.item()
self._v_scale_float = self._v_scale.item()"]
CALC_KV --> DISABLE_CALC["Disable recomputation:
self.calculate_kv_scales = False"]
end
%% Custom op implementation
subgraph "Unified attention op implementation"
UNIFIED["unified_attention(query, key, value, layer_name)"]
UNIFIED --> GET_CTX2["Get the forward context"]
GET_CTX2 --> GET_LAYER["Get the layer instance:
self = forward_context.no_compile_layers[layer_name]"]
GET_LAYER --> GET_KV2["Get the KV cache"]
GET_KV2 --> CALL_BACKEND["Call the backend implementation:
self.impl.forward(...)"]
UNIFIED_OUT["unified_attention_with_output(
query, key, value, output, layer_name)"]
UNIFIED_OUT --> SIMILAR["Same flow, but writes the result into the pre-allocated output"]
end
%% Main connections
CALL_IMPL --> RESHAPE_OUT
CALL_OP --> RESHAPE_OUT
DIRECT_IMPL --> ATTN_FWD
CALL_OP2 --> ATTN_FWD
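
The calc_kv_scales() step in the diagram is simple enough to show directly: each scale is max(abs(tensor)) divided by a reference range, the K/V scales are also cached as Python floats, and the flag is cleared so the computation runs only once. The class below is a stripped-down stand-in for the attention layer's scale bookkeeping; the q_range/k_range/v_range defaults are placeholders, not vLLM's configured ranges.

```python
# Stripped-down stand-in for the attention layer's scale bookkeeping: each
# scale is max(abs(x)) / reference_range, the K/V scales are cached as Python
# floats, and calculate_kv_scales is cleared so this runs only once. The range
# defaults are placeholders.
import torch


class ToyAttentionScales:
    def __init__(self, q_range: float = 1.0, k_range: float = 1.0,
                 v_range: float = 1.0) -> None:
        self.q_range, self.k_range, self.v_range = q_range, k_range, v_range
        self._q_scale = self._k_scale = self._v_scale = torch.tensor(1.0)
        self._k_scale_float = self._v_scale_float = 1.0
        self.calculate_kv_scales = True

    def calc_kv_scales(self, query: torch.Tensor, key: torch.Tensor,
                       value: torch.Tensor) -> None:
        self._q_scale = torch.abs(query).max() / self.q_range
        self._k_scale = torch.abs(key).max() / self.k_range
        self._v_scale = torch.abs(value).max() / self.v_range
        self._k_scale_float = self._k_scale.item()
        self._v_scale_float = self._v_scale.item()
        self.calculate_kv_scales = False   # only computed on the first forward


if __name__ == "__main__":
    scales = ToyAttentionScales()
    q, k, v = (torch.randn(4, 8, 64) for _ in range(3))
    if scales.calculate_kv_scales:
        scales.calc_kv_scales(q, k, v)
    print(scales._k_scale_float, scales._v_scale_float)
```
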
flowchart TD
subgraph "KVCacheManager
core/kv_cache_manager.py"
INIT_KV["__init__()
初始化KVCache管理器"]
BLOCK_POOL["BlockPool
block_pool.py"]
REQ_BLOCKS["req_to_blocks
请求ID到KVCache块映射"]
REQ_HASH["req_to_block_hashes
请求ID到块Hash映射"]
INIT_KV --> BLOCK_POOL
INIT_KV --> REQ_BLOCKS
INIT_KV --> REQ_HASH
end
subgraph "BlockPool
core/block_pool.py"
INIT_BP["__init__()
创建所有KVCacheBlock"]
FREE_Q["free_block_queue
空闲块LRU链表"]
CACHED_MAP["cached_block_hash_to_block
Hash值到块的映射"]
ALLOC["allocate()
分配一个块"]
FREE["free()
释放一个块"]
INIT_BP --> FREE_Q
INIT_BP --> CACHED_MAP
ALLOC --> FREE_Q
FREE --> FREE_Q
end
subgraph "调度函数
core/scheduler.py"
GET_COMP["get_computed_blocks()
获取命中prefix cache的块"]
ALLOCATE["allocate_slots()
分配KVCache块"]
CACHE_FULL["cache_full_blocks()
缓存已满的块"]
ALLOCATE --> |"调用"| CACHE_FULL
end
GET_COMP --> |"查询"| REQ_HASH
GET_COMP --> |"返回命中的块"| ALLOCATE
ALLOCATE --> |"申请块
block_pool.allocate()"| ALLOC
ALLOCATE --> |"更新"| REQ_BLOCKS
CACHE_FULL --> |"计算hash并添加
cached_block_hash_to_block[hash]=block"| CACHED_MAP
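
The prefix-caching bookkeeping shown above hinges on chained block hashes: a full block's hash covers its own token IDs plus the hash of its parent block, cache_full_blocks() inserts those hashes into cached_block_hash_to_block, and get_computed_blocks() walks a new request's prompt block by block until the first miss. The sketch below models just that idea with a dict and SHA-256; block size, hash function, and eviction are simplified relative to vLLM's BlockPool.

```python
# Simplified model of chained block hashing for prefix caching: a full block's
# hash covers the parent hash plus its token IDs, cache_full_blocks() records
# hash -> physical block, and get_computed_blocks() walks a prompt until the
# first miss. Block size, hashing, and eviction are simplified vs. BlockPool.
import hashlib

BLOCK_SIZE = 4
cached_block_hash_to_block: dict = {}            # hash -> physical block id


def block_hash(parent_hash: str, token_ids: tuple) -> str:
    return hashlib.sha256(f"{parent_hash}|{token_ids}".encode()).hexdigest()


def cache_full_blocks(token_ids: list, block_ids: list) -> None:
    parent = ""
    for i in range(len(token_ids) // BLOCK_SIZE):
        chunk = tuple(token_ids[i * BLOCK_SIZE:(i + 1) * BLOCK_SIZE])
        parent = block_hash(parent, chunk)
        cached_block_hash_to_block[parent] = block_ids[i]


def get_computed_blocks(token_ids: list) -> list:
    hits, parent = [], ""
    for i in range(len(token_ids) // BLOCK_SIZE):
        chunk = tuple(token_ids[i * BLOCK_SIZE:(i + 1) * BLOCK_SIZE])
        parent = block_hash(parent, chunk)
        if parent not in cached_block_hash_to_block:
            break
        hits.append(cached_block_hash_to_block[parent])
    return hits


if __name__ == "__main__":
    cache_full_blocks([1, 2, 3, 4, 5, 6, 7, 8], block_ids=[0, 1])
    print(get_computed_blocks([1, 2, 3, 4, 5, 6, 7, 8, 9]))  # [0, 1]
```
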