更新时间:2026-04-20 GMT+08:00
分享

异步加载KVCache

功能介绍

将推理请求需要的多个KVCache数据块,批量异步加载到EMS缓存中。方法调用在提交后会立即返回一个表5句柄,用户可通过该句柄在后续获取保存结果。

接口约束

  • 加速卡限制:仅支持华为昇腾加速卡显存拷贝。
  • 异常处理:若部分键值保存失败,接口不会直接返回失败,后续读取时将无法命中对应键值。
  • 语义约束:需遵循以下两种语义之一,二选一:
    • slot_mapping 语义(使用 hashes + offsets + slot_mapping):通过 hashes , offsets来唯一标识 KVCache 数据块;
    • chunk_descs 语义(使用 hashes+ chunk_descs):通过 **kwargs 传入 chunk_descs,标识每个层组中每个chunk对应的 start_slot, length。

方法定义

ContextCaching.load(slot_mapping, hashes, offsets, option, **kwargs)

请求参数说明

表1 请求参数列表(slot_mapping语义)

参数名称

参数类型

是否必选

描述

slot_mapping

List[int]

参数解释:

每个token索引映射到block标识的列表。

约束限制:

不能为空,必须为整数列表,sum(offsets) == len(slot_mapping)。

取值范围:

无。

默认取值:

无。

hashes

List[int]

参数解释:

预计算的块前缀哈希列表。

约束限制:

需与offsets长度一致。

取值范围:

无。

默认取值:

None。

offsets

List[int]

参数解释:

与hashes对应的每块token数,通常等于block_size。

约束限制:

sum(offsets) == len(slot_mapping)。

取值范围:

无。

默认取值:

None。

option

表3

参数解释:

ContextCaching访问内存池的KV操作选项。

约束限制:

不能为None。

取值范围:

无。

默认取值:

无。

表2 请求参数列表(chunk_descs语义)

参数名称

参数类型

是否必选

描述

hashes

List[int]

参数解释:

预计算的块前缀哈希列表。

约束限制:

len(chunk_descs[i])=len(hashes)。

取值范围:

无。

默认取值:

None。

chunk_descs

List[List[Tuple[int, int]]]

参数解释:

每个层组中每个chunk对应的 start_slot, length。

约束限制:

len(chunk_descs)=len(store_layer_group_desc)且len(chunk_descs[i])=len(hashes)。

取值范围:

无。

默认取值:

None。

option

表3

参数解释:

ContextCaching访问内存池的KV操作选项。

约束限制:

不能为None。

取值范围:

无。

默认取值:

无。

表3 CcKvOption

参数名称

参数类型

是否必选

描述

write_rcache

bool

可选

参数解释:

是否将本次写入保存为本地读缓存,默认值为True。

约束限制:

无。

取值范围:

True:将本次写入保存为本地读缓存。

False:不将本次写入保存为本地读缓存。

默认取值:

True。

read_local_only

bool

可选

参数解释:

是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。

约束限制:

无。

取值范围:

True:只读本地缓存。

False:优先读本地缓存,如果本地未命中,则从其他节点读取数据。

默认取值:

False。

timeout

int

可选

参数解释:

请求超时时间,单位为毫秒。

约束限制:

无。

取值范围:

大于等于0。

默认取值:

5000。

返回结果

表4 返回结果

类型

说明

表8-23

参数解释:

返回异步执行Future句柄。

取值范围:

无。

表5 CcFuture

方法名称

参数

返回结果

描述

result

表8-24

参数解释:

获取异步执行的结果。

取值范围:

无。

表6 CcResult

参数名称

参数类型

描述

success

int

参数解释:

请求的批量key读写连续成功的个数。

约束限制:

无。

取值范围:

0~请求key的个数。

默认取值:

无。

total

int

参数解释:

请求的批量key总个数。

约束限制:

无。

取值范围:

请求列表中批量key的个数。

默认取值:

无。

代码样例

本示例用于多次下发异步加载KVCache的请求,并获取每个请求的最终执行结果 。

  1. slot_mapping语义下的异步加载,参数:slot_mapping + hashes + offsets
    # 设置请求的超时时间 
    option = CcKvOption(timeout=5000)
    block_size = 4
    future_list = [] 
    # 多次异步下发异步load请求,增加load并发能力。 
    try: 
        slot_mapping = [0, 1, 2, 3, 4, 5] 
        hashes = [0x1111, 0x2222]
        offsets = [4, 4]
        future_list.append(cc.async_load(slot_mapping = slot_mapping, hashes = hashes, offsets = offsets, option = option))
        slot_mapping = [10, 11, 12, 13, 14, 15, 16, 17] 
        hashes = [0xAAAA, 0xBBBB]
        offsets = [4, 4]
        future_list.append(cc.async_ load (slot_mapping = slot_mapping, hashes = hashes, offsets = offsets, option = option))
    except EmsException as e: 
        print(f"failed to save, {e}.") 
        exit(2) 
    try: 
        for future in future_list: 
            result = future.result() 
            print(f"rsult:{result}") 
    except EmsException as e: 
        print(f"failed to get result, {e}.")
  2. chunk_descs 语义下的异步加载,参数:hashes + chunk_descs
    # 设置请求的超时时间 
    option = CcKvOption(timeout=5000)
    block_size = 4
    future_list = [] 
    # 多次异步下发异步load请求,增加load并发能力。 
    try: 
        for idx in range(2): 
            hashes = [0x1111, 0x2222]
            chunk_descs = [[(0, 4),(4, 4)]] 
            future_list.append(cc.async_load(hashes = hashes, chunk_descs = chunk_descs, option = option))
            hashes = [0xAAAA, 0xBBBB]
            chunk_descs = [[(10, 4),(14, 4)]] 
            future_list.append(cc.async_load(hashes = hashes, chunk_descs = chunk_descs, option = option))
    except EmsException as e: 
        print(f"failed to load, {e}.") 
        exit(2) 
    try: 
        for future in future_list: 
            result = future.result() 
            print(f"result:{result}") 
    except EmsException as e: 
        print(f"failed to get result, {e}.")

相关文档