文档首页/
AI开发平台ModelArts/
最佳实践/
LLM大语言模型训练/
主流开源大模型基于Lite Cluster适配AscendFactory NPU训练解决方案/
训练服务配置说明/
VeRL数据处理样例脚本
更新时间:2025-11-19 GMT+08:00
VeRL数据处理样例脚本
VeRL框架中的样例数据处理脚本分为大语言模型、多模态模型和多轮对话Agent训练三类,样例脚本如下,请根据模型类型和训练场景选择对应的脚本:
大语言模型gsm8k数据处理
import argparse
import os
import re
import datasets
from verl.utils.hdfs_io import copy, makedirs
def extract_solution(solution_str):
    """Extract the final answer that follows the ``#### `` marker.

    Args:
        solution_str: Reference solution text containing ``#### <number>``,
            e.g. ``"... so she sold 72 clips.\n#### 72"``.

    Returns:
        The first matched number as a string, with every comma removed
        (``"1,234"`` becomes ``"1234"``).

    Raises:
        AssertionError: If ``solution_str`` contains no ``#### <number>`` marker.
    """
    match = re.search(r"#### (\-?[0-9\.\,]+)", solution_str)
    if match is None:
        # Raised explicitly instead of via `assert` so the check is not
        # stripped under `python -O`; the exception type is kept unchanged.
        raise AssertionError("no '#### <answer>' marker found in solution string")
    # Group 1 is the number itself, without the leading "#### " marker.
    return match.group(1).replace(",", "")
if __name__ == "__main__":
    # Preprocess the GSM8K dataset into train/test parquet files.
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_dir", default="~/data/gsm8k")
    parser.add_argument("--hdfs_dir", default=None)

    args = parser.parse_args()

    data_source = "openai/gsm8k"

    # "xxx/xxx/xxx" is a placeholder: replace it with the dataset path.
    # It must be a string literal; a bare xxx/xxx/xxx raises NameError.
    dataset = datasets.load_dataset("xxx/xxx/xxx", "main")

    train_dataset = dataset["train"]
    test_dataset = dataset["test"]

    instruction_following = 'Let\'s think step by step and output the final answer after "####".'

    # add a row to each data item that represents a unique id
    def make_map_fn(split):
        """Return a per-example mapper that tags each record with ``split``."""

        def process_fn(example, idx):
            # The prompt is the raw question followed by the answer-format instruction.
            question_raw = example.pop("question")
            question = question_raw + " " + instruction_following

            # The ground truth is the number after "####" in the reference answer.
            answer_raw = example.pop("answer")
            solution = extract_solution(answer_raw)

            data = {
                "data_source": data_source,
                "prompt": [
                    {
                        "role": "user",
                        "content": question,
                    }
                ],
                "ability": "math",
                "reward_model": {"style": "rule", "ground_truth": solution},
                "extra_info": {
                    "split": split,
                    "index": idx,
                    "answer": answer_raw,
                    "question": question_raw,
                },
            }
            return data

        return process_fn

    train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True)
    test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True)

    # Expand "~" explicitly: os.path.join does not, and the same path is
    # later handed to copy(). Create the directory so the writes cannot fail
    # on a missing parent.
    local_dir = os.path.expanduser(args.local_dir)
    os.makedirs(local_dir, exist_ok=True)
    hdfs_dir = args.hdfs_dir

    train_dataset.to_parquet(os.path.join(local_dir, "train.parquet"))
    test_dataset.to_parquet(os.path.join(local_dir, "test.parquet"))

    # Optionally copy the generated files to the requested HDFS directory.
    if hdfs_dir is not None:
        makedirs(hdfs_dir)

        copy(src=local_dir, dst=hdfs_dir)
多模态模型geometry3k数据处理
import argparse
import os
import datasets
from verl.utils.hdfs_io import copy, makedirs
if __name__ == "__main__":
    # Preprocess the geometry3k multimodal dataset into train/test parquet files.
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_dir", default="~/data/geo3k")
    parser.add_argument("--hdfs_dir", default=None)

    args = parser.parse_args()

    data_source = "hiyouga/geometry3k"

    # "xxx/xxx/xxx" is a placeholder: replace it with the dataset path.
    # It must be a string literal; a bare xxx/xxx/xxx raises NameError.
    dataset = datasets.load_dataset("xxx/xxx/xxx")

    train_dataset = dataset["train"]
    test_dataset = dataset["test"]

    instruction_following = (
        r"You FIRST think about the reasoning process as an internal monologue and then provide the final answer. "
        r"The reasoning process MUST BE enclosed within <think> </think> tags. The final answer MUST BE put in \boxed{}."
    )

    # add a row to each data item that represents a unique id
    def make_map_fn(split):
        """Return a per-example mapper that tags each record with ``split``."""

        def process_fn(example, idx):
            # The prompt is the raw problem followed by the answer-format instruction.
            problem = example.pop("problem")
            prompt = problem + " " + instruction_following
            answer = example.pop("answer")
            # The example's images are passed through unchanged.
            images = example.pop("images")

            data = {
                "data_source": data_source,
                "prompt": [
                    {
                        "role": "user",
                        "content": prompt,
                    }
                ],
                "images": images,
                "ability": "math",
                "reward_model": {"style": "rule", "ground_truth": answer},
                "extra_info": {
                    "split": split,
                    "index": idx,
                    "answer": answer,
                    "question": problem,
                },
            }
            return data

        return process_fn

    train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True, num_proc=8)
    test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True, num_proc=8)

    # Expand "~" explicitly: os.path.join does not, and the same path is
    # later handed to copy(). Create the directory so the writes cannot fail
    # on a missing parent.
    local_dir = os.path.expanduser(args.local_dir)
    os.makedirs(local_dir, exist_ok=True)
    hdfs_dir = args.hdfs_dir

    train_dataset.to_parquet(os.path.join(local_dir, "train.parquet"))
    test_dataset.to_parquet(os.path.join(local_dir, "test.parquet"))

    # Optionally copy the generated files to the requested HDFS directory.
    if hdfs_dir is not None:
        makedirs(hdfs_dir)
        copy(src=local_dir, dst=hdfs_dir)
多轮对话Agent训练gsm8k数据处理
import argparse
import os
import re
import datasets
from verl.utils.hdfs_io import copy, makedirs
def extract_solution(solution_str):
    """Return the numeric answer found after ``#### `` in a reference solution.

    Args:
        solution_str: Solution text that should contain ``#### <number>``.

    Returns:
        The first matched number as a string with all commas stripped,
        e.g. ``"#### 2,500"`` yields ``"2500"``.

    Raises:
        AssertionError: If no ``#### <number>`` marker is present.
    """
    found = re.search(r"#### (\-?[0-9\.\,]+)", solution_str)
    if found is None:
        # An explicit raise survives `python -O`, unlike an `assert`
        # statement; the exception type is kept unchanged.
        raise AssertionError("no '#### <answer>' marker found in solution string")
    # Capture group 1 holds only the number, so no split on the marker is needed.
    number_text = found.group(1)
    return number_text.replace(",", "")
if __name__ == "__main__":
    # Preprocess GSM8K into train/test parquet files for multi-turn agent
    # training with tool calls.
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_dir", default=None, help="The save directory for the preprocessed dataset.")
    parser.add_argument("--hdfs_dir", default=None)
    parser.add_argument("--local_dataset_path", default=None, help="The local path to the raw dataset, if it exists.")
    parser.add_argument(
        "--local_save_dir", default="~/data/gsm8k", help="The save directory for the preprocessed dataset."
    )

    args = parser.parse_args()
    local_dataset_path = args.local_dataset_path

    data_source = "openai/gsm8k"

    # Load from the local copy when one is given, otherwise by dataset name.
    if local_dataset_path is not None:
        # Expand "~" so a path such as "~/datasets/gsm8k" resolves to a real directory.
        dataset = datasets.load_dataset(os.path.expanduser(local_dataset_path), "main")
    else:
        dataset = datasets.load_dataset(data_source, "main")

    train_dataset = dataset["train"]
    test_dataset = dataset["test"]

    instruction_following = "Let's think step by step and output the final answer after `####`."

    # add a row to each data item that represents a unique id
    def make_map_fn(split):
        """Return a per-example mapper that tags each record with ``split``."""

        def process_fn(example, idx):
            # The user message is the raw question followed by the answer-format instruction.
            question_raw = example.pop("question")
            question = question_raw + " " + instruction_following

            # The ground truth is the number after "####" in the reference answer.
            answer_raw = example.pop("answer")
            solution = extract_solution(answer_raw)
            data = {
                "data_source": data_source,
                "agent_name": "tool_agent",
                "prompt": [
                    {
                        "role": "system",
                        "content": (
                            "You are a math expert. You are given a question and you need to solve it step by step. "
                            "Reasoning step by step before any tool call. "
                            "You should use the `calc_gsm8k_reward` tool after step by step solving the question, "
                            "before generate final answer at least once and refine your answer if necessary. "
                            "Put your final answer in the format of `#### <answer>`."
                        ),
                    },
                    {
                        "role": "user",
                        "content": question,
                    },
                ],
                "ability": "math",
                "reward_model": {"style": "rule", "ground_truth": solution},
                "extra_info": {
                    "split": split,
                    "index": idx,
                    "answer": answer_raw,
                    "question": question_raw,
                    "need_tools_kwargs": True,
                    "tools_kwargs": {
                        "calc_gsm8k_reward": {
                            "create_kwargs": {"ground_truth": solution},
                            # "execute_kwargs": {},
                            # "calc_reward_kwargs": {},
                            # "release_kwargs": {},
                        },
                    },
                    "interaction_kwargs": {
                        "query": question,
                        "ground_truth": solution,
                    },
                },
            }
            return data

        return process_fn

    train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True)
    test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True)

    hdfs_dir = args.hdfs_dir

    # The deprecated --local_dir wins when given; otherwise use --local_save_dir.
    local_save_dir = args.local_dir
    if local_save_dir is not None:
        print("Warning: Argument 'local_dir' is deprecated. Please use 'local_save_dir' instead.")
    else:
        local_save_dir = args.local_save_dir

    # Expand "~" explicitly: os.path.join does not, and the same path is
    # later handed to copy(). Create the directory so the writes cannot fail
    # on a missing parent.
    local_save_dir = os.path.expanduser(local_save_dir)
    os.makedirs(local_save_dir, exist_ok=True)

    train_dataset.to_parquet(os.path.join(local_save_dir, "train.parquet"))
    test_dataset.to_parquet(os.path.join(local_save_dir, "test.parquet"))

    # Optionally copy the generated files to the requested HDFS directory.
    if hdfs_dir is not None:
        makedirs(hdfs_dir)
        copy(src=local_save_dir, dst=hdfs_dir)
父主题: 训练服务配置说明