更新时间:2026-05-25 GMT+08:00
分享

同步保存KVCache

功能介绍

将推理计算过程中加速卡产生的多个 KVCache 数据块,批量同步保存到 EMS 缓存池,并在保存完成后返回结果表8

接口约束

  • 同步接口:接口会休眠阻塞。
  • 加速卡限制:仅支持华为昇腾加速卡显存拷贝。
  • 异常处理:若部分键值保存失败,接口不会直接返回失败,后续读取时将无法命中对应键值。
  • V1语义约束:需遵循以下两种语义之一,二选一:
    • slot_mapping 语义(使用 hashes + offsets + slot_mapping):通过 hashes , offsets来唯一标识 KVCache 数据块;
    • chunk_descs 语义(使用 hashes+ chunk_descs):通过 **kwargs 传入 chunk_descs,标识每个层组中每个chunk对应的 start_slot, length。

方法定义

V0版本:

ContextCaching.save(option, key_list, value_list)

V1版本:

ContextCaching.save(slot_mapping, hashes, offsets, option, **kwargs)

请求参数说明

表1 V0请求参数列表

参数名称

参数类型

是否必选

描述

option

表6

参数解释:

ContextCaching访问内存池的KV操作选项。

约束限制:

不能为None。

取值范围:

无。

默认取值:

无。

key_list

List[PrimaryKey]

参数解释:

访问内存池的键名列表。

约束限制:

所有键名必须唯一。

取值范围:

单个key的长度小于128,且保证全局唯一。

默认取值:

None。

value_list

List[List[KvBufferWrapper]]

参数解释:

ContextCaching访问内存池的值列表。

约束限制:

值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。

取值范围:

无。

默认取值:

None。

表2 PrimaryKey

参数名称

参数类型

是否必选

描述

distribute_key

string

必选

参数解释:

分布式key,DHT基于该key进行分布式路由。

约束限制:

只支持NPU显存地址。

取值范围:

大于0。

默认取值:

attribute_key

string

必选

参数解释:

属性key,相同的distribute_key下的所有attribute_key都会存储在同一个节点。

约束限制:

取值范围:

大于0。

默认取值:

表3 KvBufferWrapper

参数名称

参数类型

是否必选

描述

data_ptr

int

必选

参数解释:

加速卡计算产生的KVCache连续显存起始地址。

约束限制:

只支持NPU显存地址。

取值范围:

大于0。

默认取值:

length

int

必选

参数解释:

加速卡计算产生的KVCache连续显存长度。

约束限制:

取值范围:

大于0。

默认取值:

表4 V1请求参数列表(slot_mapping语义)

参数名称

参数类型

是否必选

描述

slot_mapping

List[int]

参数解释:

每个token索引映射到block标识的列表。

约束限制:

不能为空,必须为整数列表,sum(offsets) == len(slot_mapping)。

取值范围:

无。

默认取值:

无。

hashes

List[int]

参数解释:

预计算的块前缀哈希列表。

约束限制:

需与offsets长度一致。

取值范围:

无。

默认取值:

None。

offsets

List[int]

参数解释:

与hashes对应的每块token数,通常等于block_size。

约束限制:

sum(offsets) == len(slot_mapping)。

取值范围:

无。

默认取值:

None。

option

表6

参数解释:

ContextCaching访问内存池的KV操作选项。

约束限制:

不能为None。

取值范围:

无。

默认取值:

无。

表5 V1请求参数列表(chunk_descs语义)

参数名称

参数类型

是否必选

描述

hashes

List[int]

参数解释:

预计算的块前缀哈希列表。

约束限制:

len(chunk_descs[i])=len(hashes)。

取值范围:

无。

默认取值:

None。

chunk_descs

List[List[Tuple[int, int]]]

参数解释:

每个层组中每个chunk对应的 start_slot, length。

约束限制:

len(chunk_descs)=len(store_layer_group_desc)且len(chunk_descs[i])=len(hashes)。

取值范围:

无。

默认取值:

None。

option

表6

参数解释:

ContextCaching访问内存池的KV操作选项。

约束限制:

不能为None。

取值范围:

无。

默认取值:

无。

表6 CcKvOption

参数名称

参数类型

是否必选

描述

write_rcache

bool

可选

参数解释:

是否将本次写入保存为本地读缓存,默认值为True。

约束限制:

无。

取值范围:

True:将本次写入保存为本地读缓存

False:不将本次写入保存为本地读缓存

默认取值:

True

read_local_only

bool

可选

参数解释:

是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。

约束限制:

无。

取值范围:

True:只读本地缓存

False:优先读本地缓存,如果本地未命中,则从其他节点读取数据

默认取值:

False

timeout

int

可选

参数解释:

请求超时时间,单位为毫秒。

约束限制:

无。

取值范围:

大于等于0。

默认取值:

5000

返回结果说明

表7 返回结果

类型

说明

表8

参数解释:

Context Caching访问内存池的执行结果。

取值范围:

无。

表8 CcResult

参数名称

参数类型

描述

success

int

参数解释:

请求的批量key读写连续成功的个数。

约束限制:

无。

取值范围:

0~请求key的个数。

默认取值:

无。

total

int

参数解释:

请求的批量key总个数。

约束限制:

无。

取值范围:

请求列表中批量key的个数。

默认取值:

无。

代码样例

下面为同步保存KVCache到EMS中,同时对异常进行容错处理。

  1. V0版本同步保存
    import torch, torch_npu
    from ems import CcKvOption, PrimaryKey, KvBufferWrapper
    option = CcKvOption(timeout=5000)
    # 组成键值列表
    key_list = [PrimaryKey("1", "123"), PrimaryKey("1", "66")]
    tensor1 = torch.ones(2, device="npu:1")
    tensor2 = torch.ones(6, device="npu:1")
    len1 = tensor1.numel() * tensor1.element_size()
    len2 = tensor2.numel() * tensor2.element_size()
    val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]]
    # 可以根据不同异常,采取不同处理方式,例如超时错误可以重试。
    try:
        cc_result = cc.save(option, key_list, val_list)
    except EmsException as e:
        print(f"failed to save, {e}.")
        exit(2)
    print(cc_result)
  2. V1版本 slot_mapping语义下的同步保存,参数:slot_mapping + hashes + offsets
    # 设置请求的超时时间 
    option = CcKvOption(timeout=5000)
    block_size = 4
    # 组成参数列表 
    slot_mapping = [0,1,2,3,4,5,6,7]  
    hashes = [0xABCD, 0x1234] 
    offsets = [4, 4] 
    # 可以根据不同异常,采取不同处理方式,例如超时错误可以重试。 
    try: 
        cc_result = cc.save(slot_mapping = slot_mapping, hashes = hashes, offsets = offsets, option = option) 
    except EmsException as e: 
        print(f"failed to save, {e}.") 
        exit(2) 
    print(cc_result) 
  3. V1版本 chunk_descs 语义下的同步保存,参数:hashes + chunk_descs
    # 设置请求的超时时间 
    option = CcKvOption(timeout=5000)
    block_size = 4
    # 组成参数列表
    hashes = [0xABCD, 0x1234]
    chunk_descs = [[(0, 4),(4, 4)]] 
    # 可以根据不同异常,采取不同处理方式,例如超时错误可以重试。 
    try: 
        cc_result = cc.save(hashes = hashes, chunk_descs = chunk_descs, option = option) 
    except EmsException as e: 
        print(f"failed to save, {e}.") 
        exit(2) 
    print(cc_result)

相关文档