异步加载KVCache
功能介绍
将推理请求需要的多个KVCache数据块从EMS缓存池读到加速卡显存中,提交后立即返回Future句柄,用户后续可以获取执行结果。
接口约束
- 仅支持华为昇腾加速卡显存拷贝。
- 只返回键列表中连续命中的个数。
方法定义
ContextCaching.load(option, key_list, value_list)
请求参数说明
参数名称 |
参数类型 |
是否必选 |
描述 |
---|---|---|---|
option |
必选 |
参数解释: ContextCaching访问内存池的KV操作选项。 默认取值: 无 |
|
key_list |
List[str] |
必选 |
参数解释: ContextCaching访问内存池的键名列表。 约束限制: 所有键名必须唯一。 取值范围: 单个key的长度小于128,且保证全局唯一。 默认取值: 无 |
value_list |
List[List[KvBufferWrapper]] |
必选 |
参数解释: ContextCaching访问内存池的值列表。 约束限制: 值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。 默认取值: 无。 |
参数名称 |
参数类型 |
是否必选 |
描述 |
---|---|---|---|
write_rcache |
bool |
可选 |
参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 约束限制: 无 取值范围: True:将本次写入保存为本地读缓存 False:不将本次写入保存为本地读缓存 默认取值: True |
read_local_only |
bool |
可选 |
参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 约束限制: 无 取值范围: True:只读本地缓存 False:优先读本地缓存,如果本地未命中,则从其他节点读取数据 默认取值: False |
timeout |
int |
可选 |
参数解释: 请求超时时间,单位为毫秒。 约束限制: 无 取值范围: 大于等于0。 默认取值: 5000 |
参数名称 |
参数类型 |
是否必选 |
描述 |
---|---|---|---|
data_ptr |
int |
必选 |
参数解释: 加速卡计算产生的KVCache连续显存起始地址。 约束限制: 只支持NPU显存地址。 取值范围: 大于0。 默认取值: 无 |
length |
int |
必选 |
参数解释: 加速卡计算产生的KVCache连续显存长度。 约束限制: 无 取值范围: 大于0。 默认取值: 无 |
返回结果
类型 |
说明 |
---|---|
参数解释: 返回异步执行Future句柄。 取值范围: 无 |
方法名称 |
参数 |
返回结果 |
描述 |
---|---|---|---|
result |
无 |
参数解释: 获取异步执行的结果。 取值范围: 无 |
参数名称 |
参数类型 |
描述 |
---|---|---|
success |
int |
参数解释: 请求的批量key读写连续成功的个数。 约束限制: 无 取值范围: 0~请求key的个数。 默认取值: 无 |
total |
int |
参数解释: 请求的批量key总个数。 约束限制: 无 取值范围: 请求列表中批量key的个数。 默认取值: 无 |
代码样例
本示例用于多次下发异步加载KVCache的请求,并获取每个请求的最终执行结果 。
import os, torch, torch_npu from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper # 初始化cc配置 cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b") # 初始化Ems config = EmsConfig(cc_config=cc_config) try: Ems.init(config) except EmsException as e: print(f"exception: {e}.") exit(1) # 获取context caching对象 cc = Ems.get_cc() if cc is None: print("cc is None.") exit(1) # 设置save请求的超时时间 option = CcKvOption(timeout=5000) # 组成键值列表 key_list = ["123", "66"] key_list2 = ["hello_world"] # 仅支持npu设备的tensor tensor1 = torch.ones(2, device="npu:1") tensor2 = torch.ones(6, device="npu:1") tensor3 = torch.ones(1, 4, 2, device="npu:1") len1 = tensor1.numel() * tensor1.element_size() len2 = tensor2.numel() * tensor2.element_size() val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]] len3 = tensor2.numel() * tensor3.element_size() val_list2 = [[KvBufferWrapper(tensor1.data_ptr, len3)]] future_list = [] # 多次异步下发异步load请求,增加load并发能力。 try: for idx in range(2): future_list.append(cc.async_load(option, key_list, val_list)) future_list.append(cc.async_load(option, key_list2, val_list2)) except EmsException as e: print(f"failed to load, {e}.") exit(2) try: for future in future_list: result = future.result() print(f"rsult:{result}") except EmsException as e: print(f"failed to get result, {e}.")