LLM Engine¶
约 501 个字 115 行代码 预计阅读时间 3 分钟
LLMEngine 是 NanoVLLM 的用户侧入口:负责构造配置、初始化 tokenizer 、scheduler 和 model_runner,把外部请求转换成 Sequence,并通过 Scheduler 推进 prefill / decode。
Tensor parallel config¶
self.ps = []
self.events = []
ctx = mp.get_context("spawn")
for i in range(1, config.tensor_parallel_size): # 从 1 开始
event = ctx.Event()
process = ctx.Process(target=ModelRunner, args=(config, i, event))
process.start()
self.ps.append(process)
self.events.append(event)
-
self.ps:TP worker 的进程列表。rank 0 是主进程,rank1/2 各是一个mp.Process -
self.events: 同步时间列表,每个 worker 对应一个 event,用于 shared memory 的读写同步。
NanoVLLM 里面只实现了 tensor parallel,所以选择了比较简单的实现方法:shared_memory + pickle + Event , 只有 model_runner 进程,没有独立的 scheduler。
vLLM 需要 pipeline parallel、disaggedated Prefill 等更复杂的并行策略,所以在架构上会更复杂
generate¶
- 入队:prompt tokenize 后包装成 Sequence,然后进入
scheduler.waiting - 循环 step:每轮 schedule 选 seq
->model_runner 跑->postprocess 更新状态,直到 waiting 和 running 为空。 - 按序输出:按照
seq_id排序输出,只包含生成的 token 部分,decode 回文本。
def step(self):
seqs, is_prefill = self.scheduler.schedule()
num_tokens = sum(seq.num_scheduled_tokens for seq in seqs) if is_prefill else -len(seqs)
token_ids = self.model_runner.call("run", seqs, is_prefill)
self.scheduler.postprocess(seqs, token_ids, is_prefill)
outputs = [(seq.seq_id, seq.completion_token_ids) for seq in seqs if seq.is_finished]
return outputs, num_tokens
Continues Batching¶
先把 NanoVLLM 拆开看,完整的 Continuous Batching 应该包含三个能力:
1. 动态加请求¶
vLLM:add_request() 随时可调,有独立的 API server,请求异步进来,引擎后台循环,队列持续灌入。
NanoVLLM:
# generate() 里面
for prompt, sp in zip(prompts, sampling_params):
self.add_request(prompt, sp) # ← 全部在 while 前入队
while not self.is_finished():
self.step() # ← 这里不再 add_request
没有 中途加请求的能力。所有 seq 都是 while 循环开始前一次性灌入,之后 step() 循环里再没有任何新血。外层的 add_request() 是公开的,但没有线程安全、没有事件通知,它不是设计给多线程调用的。
2. 动态拼 batch + 动态出队¶
这是 NanoVLLM 真正做到了的部分。每轮 step() 都从零拼 batch:
schedule():
waiting 有东西 → 从 waiting 取 seq → prefill 轮
waiting 空了 → 从 running 取 seq → decode 轮
seq 做完标记 FINISHED → postprocess 里从 running 移除
batch 成分每轮都可能不同,seq 完成即退出,不等别人。这确实是 Continuous Batching 的核心行为。
3. Prefill 和 Decode 混合在同一 batch¶
vLLM:可以。用 token budget 在 prefill 和 decode 之间分配,同一轮 forward 里既有 prefill 也有 decode。
NanoVLLM:不可以。代码里 prefill 的 while 结束后有东西就直接 return,不会落到 decode 分支:
Source Code¶
import atexit
from dataclasses import fields
from time import perf_counter
from tqdm.auto import tqdm
from transformers import AutoTokenizer
import torch.multiprocessing as mp
from nanovllm.config import Config
from nanovllm.sampling_params import SamplingParams
from nanovllm.engine.sequence import Sequence
from nanovllm.engine.scheduler import Scheduler
from nanovllm.engine.model_runner import ModelRunner
class LLMEngine:
def __init__(self, model, **kwargs):
config_fields = {field.name for field in fields(Config)}
config_kwargs = {k: v for k, v in kwargs.items() if k in config_fields}
config = Config(model, **config_kwargs)
Sequence.block_size = config.kvcache_block_size
self.ps = []
self.events = []
ctx = mp.get_context("spawn")
for i in range(1, config.tensor_parallel_size):
event = ctx.Event()
process = ctx.Process(target=ModelRunner, args=(config, i, event))
process.start()
self.ps.append(process)
self.events.append(event)
self.model_runner = ModelRunner(config, 0, self.events)
self.tokenizer = AutoTokenizer.from_pretrained(config.model, use_fast=True)
config.eos = self.tokenizer.eos_token_id
self.scheduler = Scheduler(config)
atexit.register(self.exit)
def exit(self):
self.model_runner.call("exit")
del self.model_runner
for p in self.ps:
p.join()
def add_request(self, prompt: str | list[int], sampling_params: SamplingParams):
if isinstance(prompt, str):
prompt = self.tokenizer.encode(prompt)
seq = Sequence(prompt, sampling_params)
self.scheduler.add(seq)
def step(self):
seqs, is_prefill = self.scheduler.schedule()
num_tokens = sum(seq.num_scheduled_tokens for seq in seqs) if is_prefill else -len(seqs)
token_ids = self.model_runner.call("run", seqs, is_prefill)
self.scheduler.postprocess(seqs, token_ids, is_prefill)
outputs = [(seq.seq_id, seq.completion_token_ids) for seq in seqs if seq.is_finished]
return outputs, num_tokens
def is_finished(self):
return self.scheduler.is_finished()
def generate(
self,
prompts: list[str] | list[list[int]],
sampling_params: SamplingParams | list[SamplingParams],
use_tqdm: bool = True,
) -> list[str]:
pbar = tqdm(total=len(prompts), desc="Generating", dynamic_ncols=True, disable=not use_tqdm)
if not isinstance(sampling_params, list):
sampling_params = [sampling_params] * len(prompts)
for prompt, sp in zip(prompts, sampling_params):
self.add_request(prompt, sp)
outputs = {}
prefill_throughput = decode_throughput = 0.
while not self.is_finished():
t = perf_counter()
output, num_tokens = self.step()
if num_tokens > 0:
prefill_throughput = num_tokens / (perf_counter() - t)
else:
decode_throughput = -num_tokens / (perf_counter() - t)
pbar.set_postfix({
"Prefill": f"{int(prefill_throughput)}tok/s",
"Decode": f"{int(decode_throughput)}tok/s",
})
for seq_id, token_ids in output:
outputs[seq_id] = token_ids
pbar.update(1)
pbar.close()
outputs = [outputs[seq_id] for seq_id in sorted(outputs.keys())]
outputs = [{"text": self.tokenizer.decode(token_ids), "token_ids": token_ids} for token_ids in outputs]
return outputs
Last update: May 12, 2026
Discussion