reward_manager
The most basic reward_manager is NaiveRewardManager:
```python
from collections import defaultdict

import torch
from verl import DataProto
from verl.utils.reward_score import default_compute_score


class NaiveRewardManager:
    """The reward manager."""

    def __init__(self, tokenizer, num_examine, compute_score=None, reward_fn_key="data_source") -> None:
        self.tokenizer = tokenizer
        self.num_examine = num_examine  # the number of batches of decoded responses to print to the console
        self.compute_score = compute_score or default_compute_score
        self.reward_fn_key = reward_fn_key

    def __call__(self, data: DataProto, return_dict=False):
        """We will expand this function gradually based on the available datasets"""

        # If there is rm score, we directly return rm score. Otherwise, we compute via rm_score_fn
        if "rm_scores" in data.batch.keys():
            if return_dict:
                return {"reward_tensor": data.batch["rm_scores"]}
            else:
                return data.batch["rm_scores"]

        reward_tensor = torch.zeros_like(data.batch["responses"], dtype=torch.float32)
        reward_extra_info = defaultdict(list)

        already_print_data_sources = {}

        for i in range(len(data)):
            data_item = data[i]  # DataProtoItem

            prompt_ids = data_item.batch["prompts"]
            prompt_length = prompt_ids.shape[-1]

            valid_prompt_length = data_item.batch["attention_mask"][:prompt_length].sum()
            valid_prompt_ids = prompt_ids[-valid_prompt_length:]

            response_ids = data_item.batch["responses"]
            valid_response_length = data_item.batch["attention_mask"][prompt_length:].sum()
            valid_response_ids = response_ids[:valid_response_length]

            # decode
            prompt_str = self.tokenizer.decode(valid_prompt_ids, skip_special_tokens=True)
            response_str = self.tokenizer.decode(valid_response_ids, skip_special_tokens=True)

            ground_truth = data_item.non_tensor_batch["reward_model"]["ground_truth"]
            data_source = data_item.non_tensor_batch[self.reward_fn_key]
            extra_info = data_item.non_tensor_batch.get("extra_info", None)

            score = self.compute_score(
                data_source=data_source,
                solution_str=response_str,
                ground_truth=ground_truth,
                extra_info=extra_info,
            )

            if isinstance(score, dict):
                reward = score["score"]
                # Store the information including original reward
                for key, value in score.items():
                    reward_extra_info[key].append(value)
            else:
                reward = score

            reward_tensor[i, valid_response_length - 1] = reward

            if data_source not in already_print_data_sources:
                already_print_data_sources[data_source] = 0

            if already_print_data_sources[data_source] < self.num_examine:
                already_print_data_sources[data_source] += 1
                print("[prompt]", prompt_str)
                print("[response]", response_str)
                print("[ground_truth]", ground_truth)
                if isinstance(score, dict):
                    for key, value in score.items():
                        print(f"[{key}]", value)
                else:
                    print("[score]", score)

        if return_dict:
            return {
                "reward_tensor": reward_tensor,
                "reward_extra_info": reward_extra_info,
            }
        else:
            return reward_tensor
```
The logic is straightforward: for each sample, the prompt and response are decoded back to text, compute_score is called with the decoded response, the ground truth, and any extra_info, and the resulting scalar (or the "score" entry of a returned dict) is written into reward_tensor at the last valid response token, giving a single outcome-level reward per response.
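As a sketch of how a custom scorer plugs in: any callable accepting the keyword arguments data_source, solution_str, ground_truth, and extra_info (the names used in the call above) can be passed as compute_score. The exact-match rule and the names my_compute_score / reward_manager below are illustrative assumptions, not part of verl.

```python
# Minimal sketch of a custom compute_score; the exact-match rule is an assumption
# chosen for illustration. The keyword arguments match what NaiveRewardManager passes.
def my_compute_score(data_source, solution_str, ground_truth, extra_info=None):
    """Return 1.0 if the ground-truth answer appears in the decoded response, else 0.0."""
    reward = 1.0 if str(ground_truth).strip() in solution_str else 0.0
    # Returning a dict also works: "score" is what lands in reward_tensor, and every
    # key (including "score") is appended to reward_extra_info.
    return {"score": reward, "exact_match": reward}


print(my_compute_score("gsm8k", "So the answer is 42.", "42"))
# {'score': 1.0, 'exact_match': 1.0}

# Hypothetical wiring into the manager (tokenizer comes from your own setup):
# reward_manager = NaiveRewardManager(tokenizer=tokenizer, num_examine=1,
#                                     compute_score=my_compute_score)
# reward_tensor = reward_manager(data)  # data: DataProto produced by the rollout
```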