flowchart TD subgraph "多进程架构" API_SERVER["API Server进程
处理用户请求/分词/解码"] ENGINE_CORE["EngineCore进程
调度/GPU推理"] API_SERVER <--> |"ZeroMQ IPC
SyncMPClient/AsyncMPClient"| ENGINE_CORE end subgraph "EngineCore内部结构" CORE["EngineCore
core.py"] SCHEDULER["Scheduler
core/scheduler.py"] KV_CACHE_MGR["KVCacheManager
core/kv_cache_manager.py"] EXECUTOR["ModelExecutor
executor/"] CORE --> SCHEDULER CORE --> KV_CACHE_MGR CORE --> EXECUTOR end subgraph "Executor架构" EXECUTOR_CLASS["ExecutorClass
executor/__init__.py"] MULTIPROC_EXEC["MultiprocExecutor
executor/mp.py"] WORKER_PROCS["Worker进程
WorkerProc.worker_main()"] GPU_MODEL["GPUModelRunner
worker/gpu_model_runner.py"] EXECUTOR_CLASS --> MULTIPROC_EXEC MULTIPROC_EXEC --> |"创建
make_worker_process()"| WORKER_PROCS WORKER_PROCS --> GPU_MODEL end ENGINE_CORE --- CORE
flowchart TD subgraph "EngineCore初始化
engine/core.py" INIT_ENGINE["EngineCore.__init__()
引擎初始化"] INIT_EXECUTOR["初始化Executor
executor_class.__init__()"] AVAIL_MEM["determine_available_memory()
确定可用显存"] GET_KV_CONFIG["get_kv_cache_config()
获取KV缓存配置"] INIT_KV["initialize_from_config()
初始化KV缓存"] INIT_SCHED["初始化Scheduler
Scheduler()"] end subgraph "Executor初始化
执行者初始化" MP_INIT["MultiprocExecutor._init_executor()
多进程执行者初始化"] CREATE_MQ["创建消息队列
rpc_broadcast_mq"] CREATE_WORKERS["创建Worker进程
make_worker_process()"] WORKER_INIT["Worker进程初始化
worker_main()"] INIT_DEVICE["初始化设备
init_device()"] LOAD_MODEL["加载模型
load_model()"] end subgraph "Worker KV缓存
worker/gpu_model_runner.py" GPU_RUNNER["GPUModelRunner.__init__()
GPU模型运行器初始化"] ALLOC_KV["分配KV缓存Tensor
每层2个(K/V)"] KV_SHAPE["KV缓存形状
(2, num_blocks, block_size, kv_head, head_size)"] end INIT_ENGINE --> INIT_EXECUTOR INIT_ENGINE --> AVAIL_MEM AVAIL_MEM --> GET_KV_CONFIG GET_KV_CONFIG --> INIT_KV INIT_KV --> INIT_SCHED INIT_EXECUTOR --> MP_INIT MP_INIT --> CREATE_MQ MP_INIT --> CREATE_WORKERS CREATE_WORKERS --> WORKER_INIT WORKER_INIT --> INIT_DEVICE WORKER_INIT --> LOAD_MODEL LOAD_MODEL --> GPU_RUNNER GPU_RUNNER --> ALLOC_KV ALLOC_KV --> KV_SHAPE
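The KV cache sizing at the bottom of this diagram is simple arithmetic: the free-memory budget from `determine_available_memory()` is divided by the per-block footprint across all layers, and each layer then gets one tensor of the shape shown. A sketch under made-up sizes (the byte budget and model dimensions below are placeholders, not values vLLM computes):

```python
import torch

# Placeholder model/cache dimensions (illustrative, not from a real config).
num_layers, num_kv_heads, head_size = 32, 8, 128
block_size = 16
dtype = torch.float16
available_bytes = 512 * 1024**2  # stand-in for determine_available_memory()

# One block holds K and V for block_size tokens in one layer.
dtype_bytes = torch.finfo(dtype).bits // 8
bytes_per_block_per_layer = 2 * block_size * num_kv_heads * head_size * dtype_bytes
num_blocks = available_bytes // (bytes_per_block_per_layer * num_layers)

# Per-layer KV cache tensor: (2, num_blocks, block_size, kv_head, head_size).
kv_caches = {
    f"layers.{i}.attn": torch.zeros(
        2, num_blocks, block_size, num_kv_heads, head_size, dtype=dtype
    )
    for i in range(num_layers)
}
print(num_blocks, kv_caches["layers.0.attn"].shape)
```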
```mermaid
flowchart LR
    subgraph "Client process (API Server)"
        PROMPT["User input prompt"]
        LLM_GEN["LLM.generate()<br/>entrypoints/llm.py"]
        ADD_REQ["_add_request()<br/>LLMEngine.add_request()"]
        TOKENIZE["processor.encode_request()<br/>input processing / tokenization"]
        RUN_ENGINE["_run_engine()<br/>calls step() in a while loop"]
        GET_OUT["LLMEngine.step()<br/>engine_core.get_output()"]
        DE_TOKEN["processor.process_output()<br/>detokenize output"]
        RESULT["Return result to user"]
        PROMPT --> LLM_GEN
        LLM_GEN --> ADD_REQ
        ADD_REQ --> TOKENIZE
        LLM_GEN --> RUN_ENGINE
        RUN_ENGINE --> GET_OUT
        GET_OUT --> DE_TOKEN --> RESULT
    end
    subgraph "EngineCore process"
        INPUT_Q["input_queue<br/>EngineCoreProc member"]
        PROC_INPUT["process_input_socket thread<br/>receives requests"]
        BUSY_LOOP["run_busy_loop<br/>main-thread loop"]
        HANDLE_REQ["_handle_client_request()<br/>handle a client request"]
        ADD_TO_SCHED["EngineCore.add_request()<br/>add to the Scheduler"]
        SCHED_STEP["EngineCore.step()<br/>run one scheduling step"]
        OUTPUT_Q["output_queue<br/>EngineCoreProc member"]
        PROC_OUTPUT["process_output_socket thread<br/>sends results"]
        PROC_INPUT --> INPUT_Q
        INPUT_Q --> BUSY_LOOP
        BUSY_LOOP --> HANDLE_REQ
        HANDLE_REQ --> ADD_TO_SCHED
        BUSY_LOOP --> SCHED_STEP
        SCHED_STEP --> OUTPUT_Q
        OUTPUT_Q --> PROC_OUTPUT
    end
    ADD_REQ -->|"socket.send_multipart()<br/>serialize the request"| PROC_INPUT
    PROC_OUTPUT -->|"socket.recv_multipart()<br/>deserialize the result"| GET_OUT
```
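From the user's point of view, the entire left-hand column is hidden behind `LLM.generate()`; a typical offline-inference call looks like this (the model name is just an example):

```python
from vllm import LLM, SamplingParams

# LLM.generate() adds each prompt via _add_request() (tokenizing it with the
# input processor) and then drives _run_engine(), which calls LLMEngine.step()
# in a loop until every request has finished.
llm = LLM(model="facebook/opt-125m")  # example model
params = SamplingParams(temperature=0.8, max_tokens=64)

outputs = llm.generate(["The capital of France is"], params)
for out in outputs:
    print(out.outputs[0].text)
```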
flowchart TD subgraph "Client进程
请求处理" START["用户发送请求"] ENCODE["processor.encode_request()
将文本转为token"] WRAP["EngineCoreRequest
封装请求"] SEND["MPClient.add_request()
发送请求"] WAIT["_run_engine()
等待结果"] RECEIVE["MPClient.get_output()
接收结果"] DECODE["processor.process_output()
解码结果"] END["返回给用户"] START --> ENCODE --> WRAP --> SEND --> WAIT --> RECEIVE --> DECODE --> END end subgraph "EngineCore进程
请求执行" RECV_REQ["接收请求
process_input_socket线程"] ADD_REQ["add_request()
添加到Scheduler"] SCH_WAIT["等待调度
waiting队列"] GET_PREFIX["get_computed_blocks()
检查prefix cache命中"] ALLOC["allocate_slots()
分配KVCache块"] SCH_RUN["调度执行
running队列"] PREPARE["_prepare_inputs()
准备模型输入"] EXEC["execute_model()
执行模型推理"] UPDATE["_update_states()
更新状态"] CACHE["cache_full_blocks()
缓存满的块"] OUT["输出结果
process_output_socket线程"] RECV_REQ --> ADD_REQ --> SCH_WAIT SCH_WAIT --> GET_PREFIX --> ALLOC --> SCH_RUN SCH_RUN --> PREPARE --> EXEC --> UPDATE --> CACHE --> OUT end SEND -.->|"socket.send_multipart()
序列化请求"| RECV_REQ OUT -.->|"socket.recv_multipart()
序列化结果"| RECEIVE subgraph "抢占恢复流程" FULL["KVCache空间不足"] SELECT["select_requests_to_preempt()
选择抢占对象"] PREEMPT["preempt_requests()
执行抢占"] FREE["free_blocks()
释放KVCache"] WAIT_AGAIN["重新进入waiting队列
保留prefix hash"] RESUME["恢复执行
命中prefix cache"] FULL --> SELECT --> PREEMPT --> FREE --> WAIT_AGAIN --> RESUME end ALLOC -->|"空间不足"| FULL RESUME --> GET_PREFIX
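The preemption subgraph boils down to: when `allocate_slots()` cannot find enough free blocks, a running request is evicted, its blocks go back to the pool, and it re-enters the waiting queue with its block hashes intact, so `get_computed_blocks()` can later restore most of its context from the prefix cache instead of recomputing it. A simplified sketch of that idea (the class and policy here are illustrative, not vLLM's actual data structures):

```python
from collections import deque

class Req:
    """Illustrative request record: token ids plus the hashes of its full blocks."""
    def __init__(self, req_id, token_ids):
        self.req_id = req_id
        self.token_ids = token_ids
        self.block_hashes = []   # survives preemption -> later prefix cache hits
        self.blocks = []         # KV cache block ids currently held

def preempt_one(running: deque, waiting: deque, free_blocks: list) -> None:
    """Evict the most recently admitted running request (simplified policy)."""
    victim = running.pop()
    free_blocks.extend(victim.blocks)  # free_blocks(): return blocks to the pool
    victim.blocks = []                 # the KV data itself is dropped...
    waiting.appendleft(victim)         # ...but block_hashes are kept, so on resume
                                       # get_computed_blocks() can hit the prefix cache.
```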
flowchart TD subgraph "Scheduler
core/scheduler.py" INIT_SCHED["__init__()
初始化调度器"] WAITING["waiting队列
等待调度的请求"] RUNNING["running队列
正在执行的请求"] REQ_MAP["requests字典
请求ID到请求对象映射"] end subgraph "调度流程
schedule()方法" SCHED_RUN["schedule_running_requests()
调度running队列中的请求"] ALLOCATE_RUN["为running请求分配KVCache
allocate_slots()"] CHECK_PREEMPT["抢占模式检查
is_preemption_needed()"] EXEC_PREEMPT["执行抢占
execute_preemption()"] SELECT_VICTIM["选择抢占对象
select_requests_to_preempt()"] PREEMPT_REQ["抢占请求
preempt_requests()"] FREE_BLOCKS["释放KVCache块
free_blocks()"] SCHED_WAIT["schedule_waiting_requests()
调度waiting队列"] GET_COMPUTED["获取命中prefix cache的块
get_computed_blocks()"] ALLOCATE_WAIT["为waiting请求分配KVCache
allocate_slots()"] SCHED_RUN --> ALLOCATE_RUN ALLOCATE_RUN -->|"分配失败"| CHECK_PREEMPT CHECK_PREEMPT -->|"需要抢占"| EXEC_PREEMPT EXEC_PREEMPT --> SELECT_VICTIM SELECT_VICTIM --> PREEMPT_REQ PREEMPT_REQ --> FREE_BLOCKS FREE_BLOCKS --> ALLOCATE_RUN SCHED_WAIT --> GET_COMPUTED GET_COMPUTED --> ALLOCATE_WAIT end subgraph "模型执行
worker/gpu_model_runner.py" PREPARE["_prepare_inputs()
准备输入数据"] UPDATE["_update_states()
更新状态"] EXECUTE["execute_model()
执行模型推理"] end INIT_SCHED --> WAITING INIT_SCHED --> RUNNING INIT_SCHED --> REQ_MAP WAITING --> SCHED_WAIT RUNNING --> SCHED_RUN ALLOCATE_RUN -->|"成功"| PREPARE ALLOCATE_WAIT -->|"成功"| PREPARE PREPARE --> UPDATE --> EXECUTE ALLOCATE_WAIT -->|"成功后加入running"| RUNNING PREEMPT_REQ -->|"被抢占的请求加入waiting"| WAITING
```mermaid
flowchart TD
    %% Initialization
    subgraph "Initialization"
        INIT["GPUModelRunner.__init__()"] --> GET_BACKEND["get_attn_backend()"]
        GET_BACKEND --> BUILDER["attn_metadata_builder = backend.get_builder_cls()()"]
        INIT --> CONFIG["Configuration setup"]
        CONFIG --> SLIDING["sliding_window / interleaved_sliding_window<br/>window_size = sliding_window or interleaved_sliding_window"]
        CONFIG --> BLOCK_SIZE["block_size = cache_config.block_size"]
        CONFIG --> MM_CHECK["is_multimodal_model = model_config.is_multimodal_model"]
        CONFIG --> MROPE_CHECK["uses_mrope = model_config.uses_mrope"]
        CONFIG --> ALIBI_CHECK["use_alibi = check_use_alibi(model_config)"]
    end
    %% KV cache initialization
    subgraph "KV cache initialization"
        INIT_KV["initialize_kv_cache(kv_cache_config)"] --> GET_SPEC["get_kv_cache_spec()"]
        GET_SPEC --> CREATE_SPECS["Create a KVCacheSpec for each attention layer"]
        CREATE_SPECS --> SW_SPEC["SlidingWindowSpec<br/>if attention.sliding_window is not None"]
        CREATE_SPECS --> FULL_SPEC["FullAttentionSpec<br/>standard full attention"]
        SW_SPEC --> CREATE_KV["kv_caches[layer] = torch.zeros()<br/>shape from attn_backend.get_kv_cache_shape()"]
        FULL_SPEC --> CREATE_KV
        CREATE_KV --> BIND_KV["bind_kv_cache(kv_caches, forward_context, self.kv_caches)"]
    end
    %% Request processing
    subgraph "Request processing"
        EXEC_MODEL["execute_model(scheduler_output)"] --> UPDATE_STATES["_update_states(scheduler_output)"]
        UPDATE_STATES --> CHECK_MM["if self.is_multimodal_model"]
        CHECK_MM -->|"True"| MM_EXEC["_execute_mm_encoder(scheduler_output)<br/>_gather_mm_embeddings(scheduler_output)"]
        UPDATE_STATES --> PREPARE["_prepare_inputs(scheduler_output)"]
        PREPARE --> REORDER["attn_metadata_builder.reorder_batch()"]
        PREPARE --> CALC_PREFIX["_compute_cascade_attn_prefix_len(<br/>num_scheduled_tokens, num_common_prefix_blocks)"]
        CALC_PREFIX --> USE_CASCADE["attn_backend.use_cascade_attention()"]
        PREPARE --> BUILD_META["attn_metadata_builder.build(<br/>num_reqs, num_actual_tokens, max_query_len, common_prefix_len)"]
        BUILD_META --> FLASH_META["FlashAttentionMetadata"]
    end
    %% Forward pass
    subgraph "Forward pass"
        PREPARE --> SET_CTX["set_forward_context(attn_metadata, vllm_config)"]
        SET_CTX --> MODEL_FWD["model(input_ids, positions, intermediate_tensors, inputs_embeds)"]
        CHECK_MM -->|"True"| PREPARE_MM["Prepare multimodal inputs"]
        PREPARE_MM --> GET_EMBEDS["inputs_embeds = model.get_input_embeddings(input_ids, mm_embeds)"]
        MROPE_CHECK -->|"True"| USE_MROPE["positions = mrope_positions[:, :num_tokens]<br/>_calc_mrope_positions()"]
        MROPE_CHECK -->|"False"| USE_STD_POS["positions = self.positions[:num_tokens]"]
    end
    %% Output processing
    subgraph "Output processing"
        MODEL_FWD --> GET_HIDDEN["hidden_states = model.forward()"]
        GET_HIDDEN --> COMPUTE_LOG["logits = model.compute_logits(hidden_states, None)"]
        COMPUTE_LOG --> GRAMMAR["apply_grammar_bitmask()"]
        GRAMMAR --> SAMPLE["sampler_output = model.sample(logits, sampling_metadata)"]
        SAMPLE --> OUT_TOKENS["Get sampled token IDs"]
        OUT_TOKENS --> SPEC_CHECK["Check whether speculative decoding is enabled"]
        SPEC_CHECK -->|"True"| DRAFT["draft_token_ids = generate_draft_token_ids() / drafter.propose()"]
    end
    %% Key cross-links
    FLASH_META --> SET_CTX
    MM_EXEC --> PREPARE_MM
```
```mermaid
flowchart TD
    %% Attention layer initialization
    subgraph "Attention layer initialization"
        ATTN_INIT["Attention.__init__(<br/>num_heads, head_size, scale, num_kv_heads...)"]
        ATTN_INIT --> CONFIG_SETUP["Set config parameters:<br/>sliding_window, block_size<br/>kv_cache_dtype, is_attention_free"]
        ATTN_INIT --> SCALES_INIT["Initialize scale factors:<br/>self._q_scale, self._k_scale, self._v_scale = 1.0"]
        ATTN_INIT --> GET_BACKEND["Get the attention backend:<br/>get_attn_backend(head_size, dtype, kv_cache_dtype...)"]
        GET_BACKEND --> IMPL_CLS["Get the implementation class:<br/>impl_cls = attn_backend.get_impl_cls()"]
        IMPL_CLS --> CREATE_IMPL["Create the implementation instance:<br/>self.impl = impl_cls(num_heads, head_size, scale...)"]
        ATTN_INIT --> KV_CACHE_SETUP["Initialize KV cache placeholders:<br/>self.kv_cache = [torch.tensor([]) for ...]"]
    end
    %% Attention forward pass
    subgraph "Attention forward pass (forward)"
        ATTN_FWD["Attention.forward(<br/>query, key, value, output_shape=None)"]
        ATTN_FWD --> CHECK_CALC["Do K/V scales need computing?<br/>if self.calculate_kv_scales"]
        CHECK_CALC -->|"yes"| CALC_SCALES["calc_kv_scales(query, key, value)<br/>ratio of the tensor's max value to its reference range"]
        ATTN_FWD --> CHECK_OUTPUT["Use a preallocated output buffer?<br/>if self.use_output"]
        CHECK_OUTPUT -->|"yes"| WITH_OUTPUT["Use the preallocated output buffer"]
        WITH_OUTPUT --> RESHAPE_QKV["Reshape Q/K/V:<br/>query.view(-1, num_heads, head_size)<br/>key/value.view(-1, num_kv_heads, head_size)"]
        RESHAPE_QKV --> CHECK_DIRECT["Check the call path<br/>if self.use_direct_call"]
        CHECK_DIRECT -->|"direct call"| GET_CTX["Get the forward context:<br/>forward_context = get_forward_context()"]
        GET_CTX --> GET_META["Get the attention metadata:<br/>attn_metadata = forward_context.attn_metadata"]
        GET_META --> GET_KV["Get the KV cache:<br/>self_kv_cache = self.kv_cache[virtual_engine]"]
        GET_KV --> CALL_IMPL["Call the concrete implementation:<br/>self.impl.forward(self, query, key, value,<br/>self_kv_cache, attn_metadata, output=output)"]
        CHECK_DIRECT -->|"custom op"| CALL_OP["Call the unified attention op:<br/>torch.ops.vllm.unified_attention_with_output(...)"]
        WITH_OUTPUT --> RESHAPE_OUT["Reshape the output:<br/>output.view(-1, hidden_size)"]
        CHECK_OUTPUT -->|"no"| NO_OUTPUT["No preallocated output"]
        NO_OUTPUT --> CHECK_DIRECT2["Check the call path"]
        CHECK_DIRECT2 -->|"direct call"| DIRECT_IMPL["Call the concrete implementation directly"]
        CHECK_DIRECT2 -->|"custom op"| CALL_OP2["Call the unified attention op"]
    end
    %% KV cache scale computation
    subgraph "KV cache scale computation (calc_kv_scales)"
        CALC_KV["calc_kv_scales(query, key, value)"]
        CALC_KV --> Q_SCALE["Q scale: self._q_scale = torch.abs(query).max() / self.q_range"]
        CALC_KV --> K_SCALE["K scale: self._k_scale = torch.abs(key).max() / self.k_range"]
        CALC_KV --> V_SCALE["V scale: self._v_scale = torch.abs(value).max() / self.v_range"]
        CALC_KV --> SET_FLOAT["Store float scale values:<br/>self._k_scale_float = self._k_scale.item()<br/>self._v_scale_float = self._v_scale.item()"]
        CALC_KV --> DISABLE_CALC["Disable further recomputation:<br/>self.calculate_kv_scales = False"]
    end
    %% Unified attention op
    subgraph "Unified attention op implementation"
        UNIFIED["unified_attention(query, key, value, layer_name)"]
        UNIFIED --> GET_CTX2["Get the forward context"]
        GET_CTX2 --> GET_LAYER["Get the layer instance:<br/>self = forward_context.no_compile_layers[layer_name]"]
        GET_LAYER --> GET_KV2["Get the KV cache"]
        GET_KV2 --> CALL_BACKEND["Call the backend implementation:<br/>self.impl.forward(...)"]
        UNIFIED_OUT["unified_attention_with_output(<br/>query, key, value, output, layer_name)"]
        UNIFIED_OUT --> SIMILAR["Same flow, but writes the result into the preallocated output"]
    end
    %% Main cross-links
    CALL_IMPL --> RESHAPE_OUT
    CALL_OP --> RESHAPE_OUT
    DIRECT_IMPL --> ATTN_FWD
    CALL_OP2 --> ATTN_FWD
```
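The `calc_kv_scales` subgraph translates almost directly into code; a faithful-in-spirit sketch (the class is a stand-in for `Attention`, and `q_range`/`k_range`/`v_range` are the reference ranges of the quantized KV cache dtype):

```python
import torch

class AttentionScales:
    """Stand-in for the scale bookkeeping inside the Attention layer."""

    def __init__(self, q_range: float, k_range: float, v_range: float):
        self.q_range, self.k_range, self.v_range = q_range, k_range, v_range
        # Initialized to 1.0 in Attention.__init__(), as shown above.
        self._q_scale = self._k_scale = self._v_scale = torch.tensor(1.0)
        self.calculate_kv_scales = True

    def calc_kv_scales(self, query, key, value):
        # Scale = max absolute value observed / representable reference range.
        self._q_scale = torch.abs(query).max() / self.q_range
        self._k_scale = torch.abs(key).max() / self.k_range
        self._v_scale = torch.abs(value).max() / self.v_range
        # Plain-float copies for backends that want Python numbers.
        self._k_scale_float = self._k_scale.item()
        self._v_scale_float = self._v_scale.item()
        # Computed once, on the first forward pass only.
        self.calculate_kv_scales = False
```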
flowchart TD subgraph "KVCacheManager
core/kv_cache_manager.py" INIT_KV["__init__()
初始化KVCache管理器"] BLOCK_POOL["BlockPool
block_pool.py"] REQ_BLOCKS["req_to_blocks
请求ID到KVCache块映射"] REQ_HASH["req_to_block_hashes
请求ID到块Hash映射"] INIT_KV --> BLOCK_POOL INIT_KV --> REQ_BLOCKS INIT_KV --> REQ_HASH end subgraph "BlockPool
core/block_pool.py" INIT_BP["__init__()
创建所有KVCacheBlock"] FREE_Q["free_block_queue
空闲块LRU链表"] CACHED_MAP["cached_block_hash_to_block
Hash值到块的映射"] ALLOC["allocate()
分配一个块"] FREE["free()
释放一个块"] INIT_BP --> FREE_Q INIT_BP --> CACHED_MAP ALLOC --> FREE_Q FREE --> FREE_Q end subgraph "调度函数
core/scheduler.py" GET_COMP["get_computed_blocks()
获取命中prefix cache的块"] ALLOCATE["allocate_slots()
分配KVCache块"] CACHE_FULL["cache_full_blocks()
缓存已满的块"] ALLOCATE --> |"调用"| CACHE_FULL end GET_COMP --> |"查询"| REQ_HASH GET_COMP --> |"返回命中的块"| ALLOCATE ALLOCATE --> |"申请块
block_pool.allocate()"| ALLOC ALLOCATE --> |"更新"| REQ_BLOCKS CACHE_FULL --> |"计算hash并添加
cached_block_hash_to_block[hash]=block"| CACHED_MAP
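The prefix-caching machinery in this last diagram reduces to two lookups: a content hash per fully filled block (chained over the parent block so equal prefixes map to the same blocks), and a pool that either returns a cached block for a known hash or hands out the least-recently-freed block. A compact sketch (hashing and eviction are simplified relative to `block_pool.py`):

```python
import hashlib
from collections import OrderedDict

class BlockPoolSketch:
    """Sketch of free_block_queue + cached_block_hash_to_block."""

    def __init__(self, num_blocks: int):
        # LRU order: the oldest freed block is the first eviction candidate.
        self.free_block_queue = OrderedDict((i, None) for i in range(num_blocks))
        self.cached_block_hash_to_block = {}  # block hash -> block id

    @staticmethod
    def block_hash(parent_hash: bytes, token_ids: tuple) -> bytes:
        # Chain over the parent hash so identical prefixes share blocks.
        return hashlib.sha256(parent_hash + repr(token_ids).encode()).digest()

    def get_cached_block(self, block_hash: bytes):
        # get_computed_blocks(): reuse a block if this prefix was seen before.
        return self.cached_block_hash_to_block.get(block_hash)

    def allocate(self) -> int:
        # allocate(): pop the least-recently-freed block from the LRU queue.
        block_id, _ = self.free_block_queue.popitem(last=False)
        return block_id

    def free(self, block_id: int, block_hash=None):
        # free()/cache_full_blocks(): keep the hash so the block stays reusable
        # until it is evicted from the free queue.
        if block_hash is not None:
            self.cached_block_hash_to_block[block_hash] = block_id
        self.free_block_queue[block_id] = None
```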