/images/avatar.png

vllbc

agent概览

根据Anthropic的定义,agent定义如下:

At Anthropic, we categorize all these variations as agentic systems, but draw an important architectural distinction between workflows and agents: Workflows are systems where LLMs and tools are orchestrated through predefined code paths. Agents, on the other hand, are systems where LLMs dynamically direct their own processes and tool usage, maintaining control over how they accomplish tasks.

verl总体概览

Hybrid Engine

在 RLHF 流程中,actor model 的 generation 和 rollout 占据了绝大多数运行时间(在 veRL 是 58.9%)。并且,由于 PPO 是 online 算法,经验(experiences)必须来自于被 train 的模型本身,因此,rollout 和 training 是必须串行的。如果这两者使用不同的资源组,比如 rollout 用 2 张卡,而 training 用 4 张卡,rollout 的时候 training 的资源闲置,training 的时候 rollout 的资源闲置,无论如何都会浪费大量的计算资源。由此,veRL 将 training 和 rollout engine 放置在同一个资源组中串行执行。training 时,将 rollout engine 的显存回收(offload 到 CPU 上 或者直接析构掉),rollout 时,再将 training engine 的显存释放掉。这种将 actor model 的不同 engine 放置在同一个资源组上的方案,就称为 hybrid engine。

reward_mananger

最原生的reward_mananger:

class NaiveRewardManager:
    """The reward manager."""

    def __init__(self, tokenizer, num_examine, compute_score=None, reward_fn_key="data_source") -> None:
        self.tokenizer = tokenizer
        self.num_examine = num_examine  # the number of batches of decoded responses to print to the console
        self.compute_score = compute_score or default_compute_score
        self.reward_fn_key = reward_fn_key

    def __call__(self, data: DataProto, return_dict=False):
        """We will expand this function gradually based on the available datasets"""

        # If there is rm score, we directly return rm score. Otherwise, we compute via rm_score_fn
        if "rm_scores" in data.batch.keys():
            if return_dict:
                return {"reward_tensor": data.batch["rm_scores"]}
            else:
                return data.batch["rm_scores"]

        reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32)
        reward_extra_info = defaultdict(list)

        already_print_data_sources = {}

        for i in range(len(data)):
            data_item = data[i]  # DataProtoItem

            prompt_ids = data_item.batch["prompts"]

            prompt_length = prompt_ids.shape[-1]

            valid_prompt_length = data_item.batch["attention_mask"][:prompt_length].sum()
            valid_prompt_ids = prompt_ids[-valid_prompt_length:]

            response_ids = data_item.batch["responses"]
            valid_response_length = data_item.batch["attention_mask"][prompt_length:].sum()
            valid_response_ids = response_ids[:valid_response_length]

            # decode
            prompt_str = self.tokenizer.decode(valid_prompt_ids, skip_special_tokens=True)
            response_str = self.tokenizer.decode(valid_response_ids, skip_special_tokens=True)

            ground_truth = data_item.non_tensor_batch["reward_model"]["ground_truth"]

            data_source = data_item.non_tensor_batch[self.reward_fn_key]

            extra_info = data_item.non_tensor_batch.get("extra_info", None)

            score = self.compute_score(
                data_source=data_source,
                solution_str=response_str,
                ground_truth=ground_truth,
                extra_info=extra_info,
            )

            if isinstance(score, dict):
                reward = score["score"]
                # Store the information including original reward
                for key, value in score.items():
                    reward_extra_info[key].append(value)
            else:
                reward = score

            reward_tensor[i, valid_response_length - 1] = reward

            if data_source not in already_print_data_sources:
                already_print_data_sources[data_source] = 0

            if already_print_data_sources[data_source] < self.num_examine:
                already_print_data_sources[data_source] += 1
                print("[prompt]", prompt_str)
                print("[response]", response_str)
                print("[ground_truth]", ground_truth)
                if isinstance(score, dict):
                    for key, value in score.items():
                        print(f"[{key}]", value)
                else:
                    print("[score]", score)

        if return_dict:
            return {
                "reward_tensor": reward_tensor,
                "reward_extra_info": reward_extra_info,
            }
        else:
            return reward_tensor

逻辑很简单,就是通过compute_score函数来计算score。

MCTS和PRM

核心总结

  • PRM和MCTS实际上是两种可以独立使用的技术,只不过,往往它们组合使用时往往能产生1+1>2的效果。例如,
    • 单独使用PRM:我们可以让模型对同一个prompt采样多个不同solution,无需MCTS,只需利用模型的temperature等随机参数让每次生成结果不同,然后用PRM对每个solution的每一步打分,最终选择分数最高的路径返回。
    • 单独使用MCTS:使用MCTS生成多个解题路径时,不一定要用PRM来决定哪个节点值得扩展,可以用外部大模型(如GPT-4)来选择,也可以用模型自身的perplexity来判断。本质上,我们需要的是找到最值得扩展的节点,PRM只是挑选的众多方法之一。
  • PRM 和 MCTS 既可以应用于优化训练数据,也可以用来预测用
    • 用于得到高质量训练数据:如rStar论文中,可以用PRM和MCTS的方式来迭代地筛选得到质量更好的思维链SFT数据或者RLHF数据,还可以生成更精确的reward model训练数据。
    • 用于推理:很简单,推理用MCTS的方式把 test-scaling 做上来,再结合PRM的方式从众多路径中挑选最佳答案。
  • PRM和MCTS的缺点
    这方面 DeepSeek-R1和 kimi1.5的论文已经说得很情况了。
  • Process Reward Model(PRM) 在实际应用中有三大局限:
    • 第一,难以清晰界定一般推理中的细粒度步骤,说白了,怎么定义什么为一个步骤。
    • 第二,判断当前步骤的正误难度大,模型自动化标注不如人意,人工标注又难以拓展。
    • 第三,引入基于模型的PRM易致reward hacking,有时为了训练 policy model,但反而更多时间去优化 reward model 去了。
  • 对MCTS的看法:
    • 文本的生成搜索空间指数级增长,为应对,给节点设扩展上限,却容易让模型陷入局部最优解困境。
    • MCTS往往要结合一个精确的PRM来用才能发挥最大效果,但PRM又有上述的问题,陷入一个死循环。

参考

https://zhuanlan.zhihu.com/p/27278317894 rStar-Math: Small LLMs Can Master Math Reasoning with Self-Evolved Deep Thinking

LEGB

a = 'global'

def outer():

    # def len(in_var):
    #     print('called my len() function: ', end="")
    #     l = 0
    #     for i in in_var:
    #         l += 1
    #     return l

    a = 'local'

    def inner():
        nonlocal a
        a += ' variable'
    inner()
    print('a is', a)
    # print(len(a))


outer()

# print(len(a))
print('a is', a)

此时为nonlocal a,会按照local-闭包-global的顺序找到闭包变量a。a的值为local variable

debugger

python调试工具,类似于vscode的调试工具,使用命令行进行调试。

使用方法

插入式

import pdb; pdb.set_trace()

或者

breakpoint()

非插入式

python -m pdb [-c command] (-m module | pyfile) [args ...]

常用命令

h

即help,可用命令如下 image.png

generate

理论部分在这:generate相关 ## generate参数

    def generate(
        self,
        inputs: Optional[torch.Tensor] = None,
        generation_config: Optional[GenerationConfig] = None,
        logits_processor: Optional[LogitsProcessorList] = None,
        stopping_criteria: Optional[StoppingCriteriaList] = None,
        prefix_allowed_tokens_fn: Optional[Callable[[int, torch.Tensor], List[int]]] = None,
        synced_gpus: Optional[bool] = None,
        assistant_model: Optional["PreTrainedModel"] = None,
        streamer: Optional["BaseStreamer"] = None,
        negative_prompt_ids: Optional[torch.Tensor] = None,
        negative_prompt_attention_mask: Optional[torch.Tensor] = None,
        **kwargs,
    ) -> Union[GenerateOutput, torch.LongTensor]:

在代码中可以看到在函数入口显式的定义了很多参数。他们的具体含义如下

einsum

einops.einsum calls einsum operations with einops-style named axes indexing, computing tensor products with an arbitrary number of tensors. Unlike typical einsum syntax, here you must pass tensors first, and then the pattern.

Also, note that rearrange operations such as "(batch chan) out", or singleton axes (), are not currently supported.

爱因斯坦求和

image.png image.png