链接复制成功!
异步加载KVCache
功能介绍
将推理请求需要的多个KVCache数据块从EMS缓存池读到加速卡显存中,提交后立即返回Future句柄,用户后续可以获取执行结果。
接口约束
- 仅支持华为昇腾加速卡显存拷贝。
- 只返回键列表中连续命中的个数。
方法定义
ContextCaching.load(option, key_list, value_list)
请求参数说明
参数名称 |
参数类型 |
是否必选 |
描述 |
---|---|---|---|
option |
必选 |
参数解释: ContextCaching访问内存池的KV操作选项。 默认取值: 无 |
|
key_list |
List[str] |
必选 |
参数解释: ContextCaching访问内存池的键名列表。 约束限制: 所有键名必须唯一。 取值范围: 单个key的长度小于128,且保证全局唯一。 默认取值: 无 |
value_list |
List[List[KvBufferWrapper]] |
必选 |
参数解释: ContextCaching访问内存池的值列表。 约束限制: 值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。 默认取值: 无。 |
参数名称 |
参数类型 |
是否必选 |
描述 |
---|---|---|---|
write_rcache |
bool |
可选 |
参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 约束限制: 无 取值范围: True:将本次写入保存为本地读缓存 False:不将本次写入保存为本地读缓存 默认取值: True |
read_local_only |
bool |
可选 |
参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 约束限制: 无 取值范围: True:只读本地缓存 False:优先读本地缓存,如果本地未命中,则从其他节点读取数据 默认取值: False |
timeout |
int |
可选 |
参数解释: 请求超时时间,单位为毫秒。 约束限制: 无 取值范围: 大于等于0。 默认取值: 5000 |
参数名称 |
参数类型 |
是否必选 |
描述 |
---|---|---|---|
data_ptr |
int |
必选 |
参数解释: 加速卡计算产生的KVCache连续显存起始地址。 约束限制: 只支持NPU显存地址。 取值范围: 大于0。 默认取值: 无 |
length |
int |
必选 |
参数解释: 加速卡计算产生的KVCache连续显存长度。 约束限制: 无 取值范围: 大于0。 默认取值: 无 |
返回结果
类型 |
说明 |
---|---|
参数解释: 返回异步执行Future句柄。 取值范围: 无 |
方法名称 |
参数 |
返回结果 |
描述 |
---|---|---|---|
result |
无 |
参数解释: 获取异步执行的结果。 取值范围: 无 |
参数名称 |
参数类型 |
描述 |
---|---|---|
success |
int |
参数解释: 请求的批量key读写连续成功的个数。 约束限制: 无 取值范围: 0~请求key的个数。 默认取值: 无 |
total |
int |
参数解释: 请求的批量key总个数。 约束限制: 无 取值范围: 请求列表中批量key的个数。 默认取值: 无 |
代码样例
本示例用于多次下发异步加载KVCache的请求,并获取每个请求的最终执行结果 。
import os, torch, torch_npu
from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper
# 初始化cc配置
cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b")
# 初始化Ems
config = EmsConfig(cc_config=cc_config)
try:
Ems.init(config)
except EmsException as e:
print(f"exception: {e}.")
exit(1)
# 获取context caching对象
cc = Ems.get_cc()
if cc is None:
print("cc is None.")
exit(1)
# 设置save请求的超时时间
option = CcKvOption(timeout=5000)
# 组成键值列表
key_list = ["123", "66"]
key_list2 = ["hello_world"]
# 仅支持npu设备的tensor
tensor1 = torch.ones(2, device="npu:1")
tensor2 = torch.ones(6, device="npu:1")
tensor3 = torch.ones(1, 4, 2, device="npu:1")
len1 = tensor1.numel() * tensor1.element_size()
len2 = tensor2.numel() * tensor2.element_size()
val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]]
len3 = tensor2.numel() * tensor3.element_size()
val_list2 = [[KvBufferWrapper(tensor1.data_ptr, len3)]]
future_list = []
# 多次异步下发异步load请求,增加load并发能力。
try:
for idx in range(2):
future_list.append(cc.async_load(option, key_list, val_list))
future_list.append(cc.async_load(option, key_list2, val_list2))
except EmsException as e:
print(f"failed to load, {e}.")
exit(2)
try:
for future in future_list:
result = future.result()
print(f"rsult:{result}")
except EmsException as e:
print(f"failed to get result, {e}.")