
LLM Engine


LLMEngine is NanoVLLM's user-facing entry point: it builds the config, initializes the tokenizer, scheduler, and model_runner, converts incoming requests into Sequence objects, and drives prefill / decode through the Scheduler.

Tensor parallel config

self.ps = []
self.events = []
ctx = mp.get_context("spawn")
for i in range(1, config.tensor_parallel_size):  # start from rank 1; rank 0 stays in the main process
    event = ctx.Event()
    process = ctx.Process(target=ModelRunner, args=(config, i, event))
    process.start()
    self.ps.append(process)
    self.events.append(event)
  • self.ps: the list of TP worker processes. Rank 0 is the main process; rank 1 / rank 2 each run as a separate mp.Process.

  • self.events: the list of synchronization events, one per worker, used to coordinate reads and writes of the shared memory.

NanoVLLM only implements tensor parallelism, so it takes the simplest workable approach: shared_memory + pickle + Event. There are only model_runner processes, with no standalone scheduler process.

vLLM has to support more complex parallel strategies such as pipeline parallelism and disaggregated prefill, so its architecture is considerably more involved.
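
To make the shared_memory + pickle + Event pattern concrete, here is a minimal sketch of how rank 0 could broadcast a method call to the worker ranks. The buffer name and size, the 4-byte length header, and the function names are illustrative assumptions, not NanoVLLM's actual protocol.

import pickle
from multiprocessing.shared_memory import SharedMemory

# Rank 0 creates the buffer; worker ranks attach with SharedMemory(name=..., create=False).
shm = SharedMemory(name="nanovllm_demo", create=True, size=1 << 20)  # name/size are illustrative

def write_call(shm, events, method, *args):
    """Rank 0: pickle (method, args) into the shared buffer, then wake every worker."""
    data = pickle.dumps((method, args))
    shm.buf[0:4] = len(data).to_bytes(4, "little")  # 4-byte length header
    shm.buf[4:4 + len(data)] = data
    for event in events:                            # one Event per worker rank
        event.set()

def read_call(shm, event):
    """Worker rank: block until rank 0 signals, then unpickle the pending call."""
    event.wait()
    event.clear()
    n = int.from_bytes(shm.buf[0:4], "little")
    return pickle.loads(shm.buf[4:4 + n])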

generate

  1. Enqueue: the prompt is tokenized, wrapped into a Sequence, and pushed into scheduler.waiting.
  2. Step loop: each round schedule() picks seqs -> model_runner runs them -> postprocess updates their state, until both waiting and running are empty.
  3. Ordered output: results are sorted by seq_id, contain only the generated tokens, and are decoded back to text.

step() returns num_tokens as the total number of scheduled prompt tokens in a prefill round and as -len(seqs) in a decode round; generate() uses the sign to tell the two phases apart when reporting throughput.

def step(self):
    seqs, is_prefill = self.scheduler.schedule()
    num_tokens = sum(seq.num_scheduled_tokens for seq in seqs) if is_prefill else -len(seqs)
    token_ids = self.model_runner.call("run", seqs, is_prefill)
    self.scheduler.postprocess(seqs, token_ids, is_prefill)
    outputs = [(seq.seq_id, seq.completion_token_ids) for seq in seqs if seq.is_finished]
    return outputs, num_tokens
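
For orientation, a short usage sketch of the public surface shown above. The import path, the model name, and the SamplingParams fields (temperature, max_tokens) are assumptions for illustration, not confirmed by this page.

from nanovllm.sampling_params import SamplingParams
from nanovllm.engine.llm_engine import LLMEngine  # import path assumed

engine = LLMEngine("Qwen/Qwen2.5-0.5B-Instruct", tensor_parallel_size=1)
params = SamplingParams(temperature=0.8, max_tokens=128)   # field names assumed
outputs = engine.generate(["Hello, my name is"], params)   # one {"text", "token_ids"} dict per prompt
print(outputs[0]["text"])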

Continuous Batching

Let's first take NanoVLLM apart: full Continuous Batching consists of three capabilities:

1. Dynamically adding requests

vLLM: add_request() can be called at any time. There is a standalone API server, requests arrive asynchronously, the engine loops in the background, and the queue keeps being fed.

NanoVLLM:

# inside generate()
for prompt, sp in zip(prompts, sampling_params):
    self.add_request(prompt, sp)    # ← everything is enqueued before the while loop

while not self.is_finished():
    self.step()                     # ← no add_request ever happens in here

There is no way to add requests mid-run. All seqs are enqueued in one shot before the while loop starts, and nothing new ever enters during the step() loop. The outer add_request() is public, but it is not thread-safe and has no event notification; it is not designed for multi-threaded callers.
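
For contrast, a minimal, hypothetical sketch of what thread-safe mid-run addition could look like: a lock guards the scheduler and a background thread keeps stepping while callers enqueue requests at any time. None of this exists in NanoVLLM.

import threading
import time

class BackgroundEngine:
    """Hypothetical wrapper around LLMEngine, not part of NanoVLLM."""

    def __init__(self, engine):
        self.engine = engine
        self.lock = threading.Lock()    # serialize access to the scheduler
        threading.Thread(target=self._loop, daemon=True).start()

    def add_request(self, prompt, sampling_params):
        with self.lock:                 # safe to call from any thread, at any time
            self.engine.add_request(prompt, sampling_params)

    def _loop(self):
        while True:
            with self.lock:
                busy = not self.engine.is_finished()
                if busy:
                    self.engine.step()  # the batch is rebuilt each step, so new seqs join naturally
            if not busy:
                time.sleep(0.001)       # idle until new requests arrive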

2. Dynamic batch assembly + dynamic exit

This is the part NanoVLLM actually delivers. Every step() rebuilds the batch from scratch:

schedule():
  waiting non-empty → take seqs from waiting → prefill round
  waiting empty     → take seqs from running → decode round
  a seq that completes is marked FINISHED → removed from running in postprocess

The batch composition can change every round, and a seq exits the moment it finishes instead of waiting for the others. This is indeed the core behavior of Continuous Batching.

3. Mixing prefill and decode in the same batch

vLLM: yes. A token budget is split between prefill and decode, so a single forward pass can contain both.

NanoVLLM: no. In the code, if the prefill while loop produced anything, schedule() returns right away and never reaches the decode branch:

if scheduled_seqs:
    return scheduled_seqs, True    # prefill round: return immediately
# the decode loop below never executes in this case
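
For comparison, a simplified, hypothetical token-budget schedule in the vLLM spirit: decode seqs are admitted first at one token each, and leftover budget is spent on prefill, so both phases can share one forward pass. The attribute name prompt_token_ids and the deque-based waiting queue are assumptions, not NanoVLLM's scheduler.

from collections import deque

def schedule_mixed(running: list, waiting: deque, token_budget: int = 8192):
    """Hypothetical sketch: one batch carries decode seqs and whole-prompt prefills together."""
    scheduled, budget = [], token_budget
    for seq in running:                          # each decode seq consumes 1 token of budget
        if budget == 0:
            break
        scheduled.append(seq)
        budget -= 1
    while waiting and budget > 0:                # spend the leftover budget on prefill
        seq = waiting[0]
        if len(seq.prompt_token_ids) > budget:   # attribute name assumed
            break                                # a chunked-prefill scheduler would split it instead
        waiting.popleft()
        running.append(seq)
        scheduled.append(seq)
        budget -= len(seq.prompt_token_ids)
    return scheduled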

Source Code

import atexit
from dataclasses import fields
from time import perf_counter
from tqdm.auto import tqdm
from transformers import AutoTokenizer
import torch.multiprocessing as mp

from nanovllm.config import Config
from nanovllm.sampling_params import SamplingParams
from nanovllm.engine.sequence import Sequence
from nanovllm.engine.scheduler import Scheduler
from nanovllm.engine.model_runner import ModelRunner


class LLMEngine:

    def __init__(self, model, **kwargs):
        config_fields = {field.name for field in fields(Config)}
        config_kwargs = {k: v for k, v in kwargs.items() if k in config_fields}
        config = Config(model, **config_kwargs)
        Sequence.block_size = config.kvcache_block_size
        self.ps = []
        self.events = []
        ctx = mp.get_context("spawn")
        for i in range(1, config.tensor_parallel_size):
            event = ctx.Event()
            process = ctx.Process(target=ModelRunner, args=(config, i, event))
            process.start()
            self.ps.append(process)
            self.events.append(event)
        self.model_runner = ModelRunner(config, 0, self.events)
        self.tokenizer = AutoTokenizer.from_pretrained(config.model, use_fast=True)
        config.eos = self.tokenizer.eos_token_id
        self.scheduler = Scheduler(config)
        atexit.register(self.exit)

    def exit(self):
        self.model_runner.call("exit")
        del self.model_runner
        for p in self.ps:
            p.join()

    def add_request(self, prompt: str | list[int], sampling_params: SamplingParams):
        if isinstance(prompt, str):
            prompt = self.tokenizer.encode(prompt)
        seq = Sequence(prompt, sampling_params)
        self.scheduler.add(seq)

    def step(self):
        seqs, is_prefill = self.scheduler.schedule()
        num_tokens = sum(seq.num_scheduled_tokens for seq in seqs) if is_prefill else -len(seqs)
        token_ids = self.model_runner.call("run", seqs, is_prefill)
        self.scheduler.postprocess(seqs, token_ids, is_prefill)
        outputs = [(seq.seq_id, seq.completion_token_ids) for seq in seqs if seq.is_finished]
        return outputs, num_tokens

    def is_finished(self):
        return self.scheduler.is_finished()

    def generate(
        self,
        prompts: list[str] | list[list[int]],
        sampling_params: SamplingParams | list[SamplingParams],
        use_tqdm: bool = True,
    ) -> list[str]:
        pbar = tqdm(total=len(prompts), desc="Generating", dynamic_ncols=True, disable=not use_tqdm)
        if not isinstance(sampling_params, list):
            sampling_params = [sampling_params] * len(prompts)
        for prompt, sp in zip(prompts, sampling_params):
            self.add_request(prompt, sp)
        outputs = {}
        prefill_throughput = decode_throughput = 0.
        while not self.is_finished():
            t = perf_counter()
            output, num_tokens = self.step()
            if num_tokens > 0:
                prefill_throughput = num_tokens / (perf_counter() - t)
            else:
                decode_throughput = -num_tokens / (perf_counter() - t)
            pbar.set_postfix({
                "Prefill": f"{int(prefill_throughput)}tok/s",
                "Decode": f"{int(decode_throughput)}tok/s",
            })
            for seq_id, token_ids in output:
                outputs[seq_id] = token_ids
                pbar.update(1)
        pbar.close()
        outputs = [outputs[seq_id] for seq_id in sorted(outputs.keys())]
        outputs = [{"text": self.tokenizer.decode(token_ids), "token_ids": token_ids} for token_ids in outputs]
        return outputs

Created: May 5, 2026
Last update: May 12, 2026
