异步保存KVCache
功能介绍
将加速卡在推理计算中产生的多个KVCache数据块,批量异步保存在EMS缓存池,提交后立即返回表5 CcFuture句柄,用户后续通过该句柄可以获取执行结果。
接口约束
- 仅支持华为昇腾加速卡显存拷贝。
- 部分键值保存失败,接口不会返回失败,后续读取无法命中该键值。
方法定义
ContextCaching.async_save(option, key_list, value_list)
请求参数说明
参数名称 |
参数类型 |
是否必选 |
描述 |
---|---|---|---|
option |
必选 |
参数解释: ContextCaching访问内存池的KV操作选项。 约束限制: 无 取值范围: 无 默认取值: 无 |
|
key_list |
List[string] |
必选 |
参数解释: ContextCaching访问内存池的键名列表。 约束限制: 所有键名必须唯一。 取值范围: 单个key的长度小于128,且保证全局唯一。 默认取值: 无 |
value_list |
List[List[KvBufferWrapper]] |
必选 |
参数解释: ContextCaching访问内存池的值列表。 约束限制: 值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。 取值范围: 无 默认取值: 无。 |
参数名称 |
参数类型 |
是否必选 |
描述 |
---|---|---|---|
write_rcache |
bool |
可选 |
参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 约束限制: 无 取值范围: True:将本次写入保存为本地读缓存 False:不将本次写入保存为本地读缓存 默认取值: True |
read_local_only |
bool |
可选 |
参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 约束限制: 无 取值范围: True:只读本地缓存 False:优先读本地缓存,如果本地未命中,则从其他节点读取数据 默认取值: False |
timeout |
int |
可选 |
参数解释: 请求超时时间,单位为毫秒。 约束限制: 无 取值范围: 大于等于0。 默认取值: 5000 |
返回结果说明
类型 |
说明 |
---|---|
参数解释: 返回异步执行Future句柄。 取值范围: 无 |
参数名称 |
参数类型 |
描述 |
---|---|---|
success |
int |
参数解释: 请求的批量key读写连续成功的个数。 约束限制: 无 取值范围: 0~请求key的个数。 默认取值: 无 |
total |
int |
参数解释: 请求的批量key总个数。 约束限制: 无 取值范围: 请求列表中批量key的个数。 默认取值: 无 |
代码样例
本示例用于批量多次下发异步保存KVCache的请求,并获取每个请求的最终执行结果。
import os import torch, torch_npu from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper # 初始化cc配置 cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b") # 初始化Ems config = EmsConfig(cc_config=cc_config) try: Ems.init(config) except EmsException as e: print(f"exception: {e}.") exit(1) # 获取context caching对象 cc = Ems.get_cc() if cc is None: print("cc is None.") exit(1) # 设置save请求的超时时间 option = CcKvOption(timeout=5000) # 组成键值列表 key_list = ["123", "66"] key_list2 = ["hello_world"] # 仅支持npu设备的tensor tensor1 = torch.ones(2, device="npu:1") tensor2 = torch.ones(6, device="npu:1") tensor3 = torch.ones(1, 4, 2, device="npu:1") len1 = tensor1.numel() * tensor1.element_size() len2 = tensor2.numel() * tensor2.element_size() val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]] len3 = tensor2.numel() * tensor3.element_size() val_list2 = [[KvBufferWrapper(tensor1.data_ptr, len3)]] future_list = [] # 多次异步下发异步save请求,增加save并发能力。 try: for idx in range(2): future_list.append(cc.async_save(option, key_list, val_list)) future_list.append(cc.async_save(option, key_list2, val_list2)) except EmsException as e: print(f"failed to save, {e}.") exit(2) try: for future in future_list: result = future.result() print(f"rsult:{result}") except EmsException as e: print(f"failed to get result, {e}.")