上一节咱们学习了LLaMA 2的补全和谈天两种API的运用办法。本节咱们来看看LLaMA 2的源码。

补全函数text_completion源码解析

上一节咱们讲了LLaMA 2的编程办法。咱们来温习一下:

    generator = Llama.build(
        ckpt_dir=ckpt_dir,
        tokenizer_path=tokenizer_path,
        max_seq_len=max_seq_len,
        max_batch_size=max_batch_size,
    )
    prompts = [
        "上下五千年,英豪万万千。黄沙百战穿金甲,不破楼兰终不还",
    ]
    results = generator.text_completion(
        prompts,
        max_gen_len=max_gen_len,
        temperature=temperature,
        top_p=top_p,
    )

咱们先来看看text_completion函数的参数是什么意思,该函数的原型为:

def text_completion(
self,
prompts: List[str],
temperature: float = 0.6,
top_p: float = 0.9,
max_gen_len: Optional[int] = None,
logprobs: bool = False,
echo: bool = False,
) -> List[CompletionPrediction]:

咱们来看下这些参数的含义:

  • prompts:这是一个字符串列表,每个字符串都是一个用于生成文本的提示。
  • temperature(默许值为0.6):这是一个操控生成文本随机性的参数。温度值越高,生成的文本就越随机;温度值越低,生成的文本就越倾向于最可能的输出。
  • top_p(默许值为0.9):这是一个操控生成文本多样性的参数,它设定了从最高概率的词开端,累计到总概率超过top_p的词中止,然后从这些词中随机挑选一个词作为生成的词。这种办法也被称为nucleus sampling或top-p sampling。
  • max_gen_len:可选参数,表明生成的文本的最大长度。假如未指定,那么将运用模型参数中的最大序列长度减1。
  • logprobs(默许值为False):假如为True,那么在回来的成果中会包含生成的每个词的对数概率。
  • echo(默许值为False):这是一个操控是否在生成的文本中包含输入提示的参数。

参数明白了之后咱们看text_completion完好完成:

    def text_completion(
        self,
        prompts: List[str],
        temperature: float = 0.6,
        top_p: float = 0.9,
        max_gen_len: Optional[int] = None,
        logprobs: bool = False,
        echo: bool = False,
    ) -> List[CompletionPrediction]:
        if max_gen_len is None:
            max_gen_len = self.model.params.max_seq_len - 1
        prompt_tokens = [self.tokenizer.encode(x, bos=True, eos=False) for x in prompts]
        generation_tokens, generation_logprobs = self.generate(
            prompt_tokens=prompt_tokens,
            max_gen_len=max_gen_len,
            temperature=temperature,
            top_p=top_p,
            logprobs=logprobs,
            echo=echo,
        )
        if logprobs:
            return [
                {
                    "generation": self.tokenizer.decode(t),
                    "tokens": [self.tokenizer.decode(x) for x in t],
                    "logprobs": logprobs_i,
                }
                for t, logprobs_i in zip(generation_tokens, generation_logprobs)
            ]
        return [{"generation": self.tokenizer.decode(t)} for t in generation_tokens]

总结起来就三步,这个text_completion其实便是generate的包装函数:

  • 编码:调用tokenizer.encode
  • 生成:调用generate
  • 解码:调用tokenizer.decode

分词

2023年的深度学习入门指南(19) - LLaMA 2源码解析

import os
from logging import getLogger
from typing import List
from sentencepiece import SentencePieceProcessor
logger = getLogger()
class Tokenizer:
    def __init__(self, model_path: str):
        # reload tokenizer
        assert os.path.isfile(model_path), model_path
        self.sp_model = SentencePieceProcessor(model_file=model_path)
        logger.info(f"Reloaded SentencePiece model from {model_path}")
        # BOS / EOS token IDs
        self.n_words: int = self.sp_model.vocab_size()
        self.bos_id: int = self.sp_model.bos_id()
        self.eos_id: int = self.sp_model.eos_id()
        self.pad_id: int = self.sp_model.pad_id()
        logger.info(
            f"#words: {self.n_words} - BOS ID: {self.bos_id} - EOS ID: {self.eos_id}"
        )
        assert self.sp_model.vocab_size() == self.sp_model.get_piece_size()
    def encode(self, s: str, bos: bool, eos: bool) -> List[int]:
        assert type(s) is str
        t = self.sp_model.encode(s)
        if bos:
            t = [self.bos_id] + t
        if eos:
            t = t + [self.eos_id]
        return t
    def decode(self, t: List[int]) -> str:
        return self.sp_model.decode(t)

首先是用到了分词组件SentencePieceProcessor。SentencePieceProcessor是SentencePiece库中的一个组件,它完成了子词(subword)tokenize和detokenize的功用。

其首要作用包含:

  • 将文本tokenize成子词(subword)。SentencePiece 运用的数据驱动办法,能够学习文本的词汇表并将文本tokenize成子词单元。
  • 将子词detokenize合并成原始文本。能够将tokenize后的子词序列从头合并为原始文本。
  • 供给vocab办理。能够获得tokenize的子词词汇表等信息。
  • 支撑多种语言文本的tokenize和detokenize。
  • 供给高效的完成。底层运用C++完成,能够快速处理大规模文本。
  • 供给多种模型挑选,如BPE、unigram等。
  • 支撑自定义练习子词化模型。

好,下面咱们回到这段代码自身。这段代码完成了一个依据SentencePiece的Tokenizer类,能够进行文本的tokenize和detokenize。

首要逻辑:

  • 在初始化时加载SentencePiece模型文件model_path。
  • 获取模型的词汇表巨细n_words,以及特殊token的id(bos_id,eos_id,pad_id)。
  • encode办法能够将字符串文本s tokenize成id列表。能够挑选在开端加入bos_id,完毕加入eos_id。
  • decode办法能够将id列表解码还原为字符串文本。

这样就构建了一个封装SentencePiece tokenize/detokenize的Tokenizer类。能够加载自定义的SentencePiece模型,然后就能够方便地对文本进行子词化处理。

这种方式能够重复运用已练习好的SentencePiece模型,为下游NLP使命供给牢靠的tokenize和detokenize功用。

最终咱们再讲一讲几个特殊的符号bos_id、eos_id和pad_id:

  • bos_id: 开端符(Beginning of Sentence)的id。用于表明一个序列的开端。
  • eos_id: 完毕符(End of Sentence)的id。用于表明一个序列的完毕。
  • pad_id: 填充符(Padding)的id。当需要将多个序列长度对齐时,能够运用pad_id在较短序列后面填充。

谈天函数chat_completion

在进入generate函数之前,咱们再看看chat_completion是怎么完成的。

    def chat_completion(
        self,
        dialogs: List[Dialog],
        temperature: float = 0.6,
        top_p: float = 0.9,
        max_gen_len: Optional[int] = None,
        logprobs: bool = False,
    ) -> List[ChatPrediction]:
        if max_gen_len is None:
            max_gen_len = self.model.params.max_seq_len - 1
        prompt_tokens = []
        for dialog in dialogs:
            if dialog[0]["role"] != "system":
                dialog = [
                    {
                        "role": "system",
                        "content": DEFAULT_SYSTEM_PROMPT,
                    }
                ] + dialog
            dialog = [
                {
                    "role": dialog[1]["role"],
                    "content": B_SYS
                    + dialog[0]["content"]
                    + E_SYS
                    + dialog[1]["content"],
                }
            ] + dialog[2:]
            assert all([msg["role"] == "user" for msg in dialog[::2]]) and all(
                [msg["role"] == "assistant" for msg in dialog[1::2]]
            ), (
                "model only supports 'system', 'user' and 'assistant' roles, "
                "starting with 'system', then 'user' and alternating (u/a/u/a/u...)"
            )
            dialog_tokens: List[int] = sum(
                [
                    self.tokenizer.encode(
                        f"{B_INST} {(prompt['content']).strip()} {E_INST} {(answer['content']).strip()} ",
                        bos=True,
                        eos=True,
                    )
                    for prompt, answer in zip(
                        dialog[::2],
                        dialog[1::2],
                    )
                ],
                [],
            )
            assert (
                dialog[-1]["role"] == "user"
            ), f"Last message must be from user, got {dialog[-1]['role']}"
            dialog_tokens += self.tokenizer.encode(
                f"{B_INST} {(dialog[-1]['content']).strip()} {E_INST}",
                bos=True,
                eos=False,
            )
            prompt_tokens.append(dialog_tokens)
        generation_tokens, generation_logprobs = self.generate(
            prompt_tokens=prompt_tokens,
            max_gen_len=max_gen_len,
            temperature=temperature,
            top_p=top_p,
            logprobs=logprobs,
        )
        if logprobs:
            return [
                {
                    "generation": {
                        "role": "assistant",
                        "content": self.tokenizer.decode(t),
                    },
                    "tokens": [self.tokenizer.decode(x) for x in t],
                    "logprobs": logprobs_i,
                }
                for t, logprobs_i in zip(generation_tokens, generation_logprobs)
            ]
        return [
            {"generation": {"role": "assistant", "content": self.tokenizer.decode(t)}}
            for t in generation_tokens
        ]

咱们先看一下参数:

  • dialogs:一个对话列表,其中每个对话都是一个字典列表,表明一段对话。
  • temperature:一个浮点数,表明生成文本时运用的温度。默许值为 0.6。
  • top_p:一个浮点数,表明生成文本时运用的 top-p 采样。默许值为 0.9。
  • max_gen_len:一个可选的整数,表明生成文本的最大长度。假如未指定,则运用模型参数中的最大序列长度减一。
  • logprobs:一个布尔值,表明是否回来生成文本的对数概率。默许值为 False。 函数回来一个 ChatPrediction 列表,其中每个元素都是一个字典,包含生成的回复和相关信息。

函数首先查看 max_gen_len 是否为 None,假如是,则将其设置为模型参数中的最大序列长度减一。然后,关于每个对话,函数执行以下操作:

  • 假如第一条音讯的人物不是 “system”,则在对话的最初添加一条默许的体系提示。
  • 将第一条和第二条音讯合并为一条音讯,并更新对话。
  • 查看对话中音讯的人物是否符合要求(即以 “system” 开端,然后交替出现 “user” 和 “assistant”)。
  • 关于每一组相邻的提示和答复(即每两条音讯),运用 tokenizer 对其进行编码,并将编码后的 token 连接起来。
  • 查看最终一条音讯是否来自用户。
  • 对最终一条音讯进行编码,并将编码后的 token 添加到 token 列表中。

接下来,函数调用 generate 办法生成回复,并依据 logprobs 参数的值回来相应的成果。假如 logprobs 为 True,则回来包含生成回复、token 和对数概率的字典列表;否则,回来仅包含生成回复的字典列表。这些生成回复都具有 “assistant” 人物,并运用 tokenizer 进行解码。

总体来说,仅仅增加了关于对话人物的业务逻辑处理,中心还是调用generate函数。

温度与top p采样

在进入解说generate函数之前,咱们先讲一个小知识点,便是温度temperature的作用。咱们看下面的代码:

            if temperature > 0:
                probs = torch.softmax(logits[:, -1] / temperature, dim=-1)
                next_token = sample_top_p(probs, top_p)
            else:
                next_token = torch.argmax(logits[:, -1], dim=-1)

temperature 是一个超参数,用于操控生成文本的多样性。当 temperature 较高时,概率散布更加平坦,因此采样出的符号更具多样性。当 temperature 较低时,概率散布更加尖锐,因此采样出的符号更倾向于概率最大的那个。当 temperature 等于 0 时,直接挑选概率最大的符号。

2023年的深度学习入门指南(19) - LLaMA 2源码解析

那么,sample_top_p是怎么完成的呢?我把解说写在代码注释里面了:

def sample_top_p(probs, p):
    # 这行代码将输入的概率 probs 按照降序排序。probs_sort 是排序后的概率,probs_idx 是对应的索引。
    probs_sort, probs_idx = torch.sort(probs, dim=-1, descending=True)
    # 这行代码核算 probs_sort 的累积和。累积和是从第一个元素开端,依次将序列中的每个元素与前面一切元素的和相加得到的。
    probs_sum = torch.cumsum(probs_sort, dim=-1)
    # 这行代码生成一个布尔掩码,用于指示哪些累积和减去当时概率的值大于 p。这用于确认哪些概率应该被设为0,以保证被抽样的概率和不超过 p。
    mask = probs_sum - probs_sort > p
    # 这行代码运用上述生成的掩码,将那些使累积和减去当时概率的值大于 p 的 probs_sort 中的元素设为0。
    probs_sort[mask] = 0.0
    # 这行代码将 probs_sort 中的每个元素除以它们的和,以便从头归一化概率散布。
    probs_sort.div_(probs_sort.sum(dim=-1, keepdim=True))
    # 这行代码从归一化的 probs_sort 中抽取一个样本。torch.multinomial 是PyTorch中的多项式散布抽样函数,它依据每个元素的权重抽取样本。
    next_token = torch.multinomial(probs_sort, num_samples=1)
    # 这行代码运用 torch.gather 函数从 probs_idx 中搜集对应 next_token 的索引,这样就能得到原始概率 probs 中对应的索引。
    next_token = torch.gather(probs_idx, -1, next_token)
    return next_token

总的来说,sample_top_p保存了按概率高低排序的大致散布,但过滤了长尾部分的低概率噪声。然后从重归一化的散布中采样,既保证了质量,又增加了恰当的随机性。

generate函数

好,咱们终于开端探究最中心的生成函数上了:

    @torch.inference_mode()
    def generate(
        self,
        prompt_tokens: List[List[int]],
        max_gen_len: int,
        temperature: float = 0.6,
        top_p: float = 0.9,
        logprobs: bool = False,
        echo: bool = False,
    ) -> Tuple[List[List[int]], Optional[List[List[float]]]]:

首先是这个函数的参数,其实咱们现已比较了解了。包含输入的提示 tokens(prompt_tokens),最大生成长度(max_gen_len),温度参数(temperature,影响生成文本的随机性), top_p(用于决定采样过程中保存的 token 集合的概率阈值,也被称为 “nucleus sampling”),是否回来每个 token 的对数概率(logprobs),以及是否将输入的提示回来(echo)。

        params = self.model.params
        bsz = len(prompt_tokens)
        assert bsz <= params.max_batch_size, (bsz, params.max_batch_size)
        min_prompt_len = min(len(t) for t in prompt_tokens)
        max_prompt_len = max(len(t) for t in prompt_tokens)
        assert max_prompt_len <= params.max_seq_len
        total_len = min(params.max_seq_len, max_gen_len + max_prompt_len)
        pad_id = self.tokenizer.pad_id
        tokens = torch.full((bsz, total_len), pad_id, dtype=torch.long, device="cuda")
        for k, t in enumerate(prompt_tokens):
            tokens[k, : len(t)] = torch.tensor(t, dtype=torch.long, device="cuda")
        if logprobs:
            token_logprobs = torch.zeros_like(tokens, dtype=torch.float)
        prev_pos = 0
        eos_reached = torch.tensor([False] * bsz, device="cuda")
        input_text_mask = tokens != pad_id

接着,依据供给的 prompt_tokens 初始化一个 tokens 张量,长度为 total_len,并填充模型的 pad_id。然后,将 prompt_tokens 的内容复制到 tokens 张量的对应位置。

        for cur_pos in range(min_prompt_len, total_len):
            logits = self.model.forward(tokens[:, prev_pos:cur_pos], prev_pos)
            if logprobs:
                token_logprobs[:, prev_pos + 1 : cur_pos + 1] = -F.cross_entropy(
                    input=logits.transpose(1, 2),
                    target=tokens[:, prev_pos + 1 : cur_pos + 1],
                    reduction="none",
                    ignore_index=pad_id,
                )
            if temperature > 0:
                probs = torch.softmax(logits[:, -1] / temperature, dim=-1)
                next_token = sample_top_p(probs, top_p)
            else:
                next_token = torch.argmax(logits[:, -1], dim=-1)
            next_token = next_token.reshape(-1)
            # only replace token if prompt has already been generated
            next_token = torch.where(
                input_text_mask[:, cur_pos], tokens[:, cur_pos], next_token
            )
            tokens[:, cur_pos] = next_token
            eos_reached |= (~input_text_mask[:, cur_pos]) & (
                next_token == self.tokenizer.eos_id
            )
            prev_pos = cur_pos
            if all(eos_reached):
                break

然后,关于 tokens 张量中的每一个位置,核算下一个 token 的 logits,并依据这些 logits 生成下一个 token。假如 logprobs 参数为真,则核算每个 token 的对数概率。假如温度大于 0,则运用 softmax 函数和温度参数对 logits 进行缩放,然后运用 top-p 采样生成下一个 token。否则,直接挑选 logits 最大的 token。新生成的 token 会替换 tokens 张量中的对应位置。

假如生成的 token 是完毕符号(eos_id),则更新 eos_reached 符号。假如一切的序列都现已生成了完毕符号,则中止生成。

        if logprobs:
            token_logprobs = token_logprobs.tolist()
        out_tokens, out_logprobs = [], []
        for i, toks in enumerate(tokens.tolist()):
            # cut to max gen len
            start = 0 if echo else len(prompt_tokens[i])
            toks = toks[start : len(prompt_tokens[i]) + max_gen_len]
            probs = None
            if logprobs:
                probs = token_logprobs[i][start : len(prompt_tokens[i]) + max_gen_len]
            # cut to eos tok if any
            if self.tokenizer.eos_id in toks:
                eos_idx = toks.index(self.tokenizer.eos_id)
                toks = toks[:eos_idx]
                probs = probs[:eos_idx] if logprobs else None
            out_tokens.append(toks)
            out_logprobs.append(probs)
        return (out_tokens, out_logprobs if logprobs else None)

最终,假如 logprobs 参数为真,则将 token_logprobs 转换为列表。然后,关于 tokens 张量中的每一行(即每一个生成的序列),假如 echo 参数为假,则去掉提示部分。然后,假如存在完毕符号,则去掉完毕符号之后的部分。最终,回来生成的 tokens 和对数概率(假如 logprobs 参数为真)。

这个函数回来的是一个元组,第一个元素是一个列表,包含每一个生成的 token 序列。第二个元素是一个列表,包含每一个生成的对数概率序列(假如 logprobs 参数为真)。

build结构函数

最终咱们再说下结构Llama的部分:

    @staticmethod
    def build(
        ckpt_dir: str,
        tokenizer_path: str,
        max_seq_len: int,
        max_batch_size: int,
        model_parallel_size: Optional[int] = None,
    ) -> "Llama":
        if not torch.distributed.is_initialized():
            torch.distributed.init_process_group("nccl")
        if not model_parallel_is_initialized():
            if model_parallel_size is None:
                model_parallel_size = int(os.environ.get("WORLD_SIZE", 1))
            initialize_model_parallel(model_parallel_size)
        local_rank = int(os.environ.get("LOCAL_RANK", 0))
        torch.cuda.set_device(local_rank)
        # seed must be the same in all processes
        torch.manual_seed(1)
        if local_rank > 0:
            sys.stdout = open(os.devnull, "w")
        start_time = time.time()
        checkpoints = sorted(Path(ckpt_dir).glob("*.pth"))
        assert len(checkpoints) > 0, f"no checkpoint files found in {ckpt_dir}"
        assert model_parallel_size == len(
            checkpoints
        ), f"Loading a checkpoint for MP={len(checkpoints)} but world size is {model_parallel_size}"
        ckpt_path = checkpoints[get_model_parallel_rank()]
        checkpoint = torch.load(ckpt_path, map_location="cpu")
        with open(Path(ckpt_dir) / "params.json", "r") as f:
            params = json.loads(f.read())
        model_args: ModelArgs = ModelArgs(
            max_seq_len=max_seq_len,
            max_batch_size=max_batch_size,
            **params,
        )
        tokenizer = Tokenizer(model_path=tokenizer_path)
        model_args.vocab_size = tokenizer.n_words
        torch.set_default_tensor_type(torch.cuda.HalfTensor)
        model = Transformer(model_args)
        model.load_state_dict(checkpoint, strict=False)
        print(f"Loaded in {time.time() - start_time:.2f} seconds")
        return Llama(model, tokenizer)

尽管这么一大段,但其实都是一些初始化的工作。

  • 散布式设置:首先,这段代码查看是否现已初始化了 PyTorch 的散布式环境,假如没有则进行初始化。然后,查看是否现已初始化了模型并行环境,假如没有,则获取环境变量 WORLD_SIZE 的值作为模型并行的巨细,并进行初始化。

  • 设备设置:获取环境变量 LOCAL_RANK 的值作为本地排名,并设置当时设备为该排名对应的 GPU。

  • 随机种子设置:为了保证一切进程生成的随机数相同,设置随机种子为 1。

  • 规范输出设置:假如本地排名大于 0,则将规范输出重定向到空设备,即不显现任何输出。

  • 加载模型查看点:找到查看点目录中的一切查看点文件,并按照文件名排序。然后,依据模型并行的排名挑选一个查看点文件,并加载该查看点。然后,加载模型参数。

  • 构建模型和分词器:运用加载的模型参数和供给的 max_seq_len 和 max_batch_size 构建模型参数目标。然后,加载分词器,并设置模型参数的词汇表巨细为分词器的词汇表巨细。然后,设置默许的张量类型为半精度浮点型(以节约内存和核算资源)。然后,构建 Transformer 模型,并加载模型查看点。

  • 最终,构建一个 Llama 目标,包含加载的模型和分词器,并回来该目标。

小结

本节咱们学习了LLaMA 2的源码,包含补全函数text_completion和谈天函数chat_completion的完成,以及它们的真正完成generate函数的原理。咱们还学习了温度temperature和top p采样的原理。

关于没有搞到深度学习生成的同学,可能有一点难度。

LLaMA的代码还差一部分便是模型的部分,咱们放到下一节来讲。要不然知识点太多大家简单大脑缺氧:)