更新时间:2025-06-19 GMT+08:00
分享

异步加载KVCache

功能介绍

将推理请求需要的多个KVCache数据块从EMS缓存池读到加速卡显存中,提交后立即返回Future句柄,用户后续可以获取执行结果。

接口约束

  • 仅支持华为昇腾加速卡显存拷贝。
  • 只返回键列表中连续命中的个数。

方法定义

ContextCaching.load(option, key_list, value_list)

请求参数说明

表1 请求参数列表

参数名称

参数类型

是否必选

描述

option

CcKvOption

必选

参数解释

ContextCaching访问内存池的KV操作选项。

默认取值:

key_list

List[str]

必选

参数解释:

ContextCaching访问内存池的键名列表。

约束限制:

所有键名必须唯一。

取值范围:

单个key的长度小于128,且保证全局唯一。

默认取值:

value_list

List[List[KvBufferWrapper]]

必选

参数解释

ContextCaching访问内存池的值列表。

约束限制:

值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。

默认取值:

无。

表2 CcKvOption

参数名称

参数类型

是否必选

描述

write_rcache

bool

可选

参数解释:

是否将本次写入保存为本地读缓存,默认值为True。

约束限制:

取值范围:

True:将本次写入保存为本地读缓存

False:不将本次写入保存为本地读缓存

默认取值:

True

read_local_only

bool

可选

参数解释:

是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。

约束限制:

取值范围:

True:只读本地缓存

False:优先读本地缓存,如果本地未命中,则从其他节点读取数据

默认取值:

False

timeout

int

可选

参数解释:

请求超时时间,单位为毫秒。

约束限制:

取值范围:

大于等于0。

默认取值:

5000

表3 KvBufferWrapper

参数名称

参数类型

是否必选

描述

data_ptr

int

必选

参数解释:

加速卡计算产生的KVCache连续显存起始地址。

约束限制:

只支持NPU显存地址。

取值范围:

大于0。

默认取值:

length

int

必选

参数解释:

加速卡计算产生的KVCache连续显存长度。

约束限制:

取值范围:

大于0。

默认取值:

返回结果

表4 返回结果

类型

说明

CcFuture

参数解释:

返回异步执行Future句柄。

取值范围:

表5 CcFuture

方法名称

参数

返回结果

描述

result

CcResult

参数解释:

获取异步执行的结果。

取值范围:

表6 CcResult

参数名称

参数类型

描述

success

int

参数解释:

请求的批量key读写连续成功的个数。

约束限制:

取值范围:

0~请求key的个数。

默认取值:

total

int

参数解释:

请求的批量key总个数。

约束限制:

取值范围:

请求列表中批量key的个数。

默认取值:

代码样例

本示例用于多次下发异步加载KVCache的请求,并获取每个请求的最终执行结果 。

import os, torch, torch_npu
from ems import Ems, EmsConfig, EmsException, CcConfig, CcKvOption, KvBufferWrapper
# 初始化cc配置
cc_config = CcConfig(rank_id=8, device_id=0, model_id="llama2-13b")
# 初始化Ems
config = EmsConfig(cc_config=cc_config)
try:
    Ems.init(config)
except EmsException as e:
    print(f"exception: {e}.")
    exit(1)
# 获取context caching对象
cc = Ems.get_cc()
if cc is None:
    print("cc is None.")
    exit(1)
# 设置save请求的超时时间
option = CcKvOption(timeout=5000)
# 组成键值列表
key_list = ["123", "66"]
key_list2 = ["hello_world"]
# 仅支持npu设备的tensor
tensor1 = torch.ones(2, device="npu:1")
tensor2 = torch.ones(6, device="npu:1")
tensor3 = torch.ones(1, 4, 2, device="npu:1")
len1 = tensor1.numel() * tensor1.element_size()
len2 = tensor2.numel() * tensor2.element_size()
val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]]
len3 = tensor2.numel() * tensor3.element_size()
val_list2 = [[KvBufferWrapper(tensor1.data_ptr, len3)]]
future_list = []
# 多次异步下发异步load请求,增加load并发能力。
try:
    for idx in range(2):
        future_list.append(cc.async_load(option, key_list, val_list))
        future_list.append(cc.async_load(option, key_list2, val_list2))
except EmsException as e:
    print(f"failed to load, {e}.")
    exit(2)
try:
    for future in future_list:
        result = future.result()
        print(f"rsult:{result}")
except EmsException as e:
    print(f"failed to get result, {e}.")

相关文档