同步保存KVCache
功能介绍
将推理计算过程中加速卡产生的多个 KVCache 数据块,批量同步保存到 EMS 缓存池,并在保存完成后返回结果表8。
接口约束
- 同步接口:接口会休眠阻塞。
- 加速卡限制:仅支持华为昇腾加速卡显存拷贝。
- 异常处理:若部分键值保存失败,接口不会直接返回失败,后续读取时将无法命中对应键值。
- V1语义约束:需遵循以下两种语义之一,二选一:
- slot_mapping 语义(使用 hashes + offsets + slot_mapping):通过 hashes , offsets来唯一标识 KVCache 数据块;
- chunk_descs 语义(使用 hashes+ chunk_descs):通过 **kwargs 传入 chunk_descs,标识每个层组中每个chunk对应的 start_slot, length。
方法定义
V0版本:
ContextCaching.save(option, key_list, value_list)
V1版本:
ContextCaching.save(slot_mapping, hashes, offsets, option, **kwargs)
请求参数说明
| 参数名称 | 参数类型 | 是否必选 | 描述 |
|---|---|---|---|
| option | 是 | 参数解释: ContextCaching访问内存池的KV操作选项。 约束限制: 不能为None。 取值范围: 无。 默认取值: 无。 | |
| key_list | List[PrimaryKey] | 是 | 参数解释: 访问内存池的键名列表。 约束限制: 所有键名必须唯一。 取值范围: 单个key的长度小于128,且保证全局唯一。 默认取值: None。 |
| value_list | List[List[KvBufferWrapper]] | 是 | 参数解释: ContextCaching访问内存池的值列表。 约束限制: 值列表的数目必须跟键列表中的数目相同,形成一一对应的键值对。 取值范围: 无。 默认取值: None。 |
| 参数名称 | 参数类型 | 是否必选 | 描述 |
|---|---|---|---|
| distribute_key | string | 必选 | 参数解释: 分布式key,DHT基于该key进行分布式路由。 约束限制: 只支持NPU显存地址。 取值范围: 大于0。 默认取值: 无 |
| attribute_key | string | 必选 | 参数解释: 属性key,相同的distribute_key下的所有attribute_key都会存储在同一个节点。 约束限制: 无 取值范围: 大于0。 默认取值: 无 |
| 参数名称 | 参数类型 | 是否必选 | 描述 |
|---|---|---|---|
| data_ptr | int | 必选 | 参数解释: 加速卡计算产生的KVCache连续显存起始地址。 约束限制: 只支持NPU显存地址。 取值范围: 大于0。 默认取值: 无 |
| length | int | 必选 | 参数解释: 加速卡计算产生的KVCache连续显存长度。 约束限制: 无 取值范围: 大于0。 默认取值: 无 |
| 参数名称 | 参数类型 | 是否必选 | 描述 |
|---|---|---|---|
| slot_mapping | List[int] | 是 | 参数解释: 每个token索引映射到block标识的列表。 约束限制: 不能为空,必须为整数列表,sum(offsets) == len(slot_mapping)。 取值范围: 无。 默认取值: 无。 |
| hashes | List[int] | 是 | 参数解释: 预计算的块前缀哈希列表。 约束限制: 需与offsets长度一致。 取值范围: 无。 默认取值: None。 |
| offsets | List[int] | 是 | 参数解释: 与hashes对应的每块token数,通常等于block_size。 约束限制: sum(offsets) == len(slot_mapping)。 取值范围: 无。 默认取值: None。 |
| option | 否 | 参数解释: ContextCaching访问内存池的KV操作选项。 约束限制: 不能为None。 取值范围: 无。 默认取值: 无。 |
| 参数名称 | 参数类型 | 是否必选 | 描述 |
|---|---|---|---|
| hashes | List[int] | 是 | 参数解释: 预计算的块前缀哈希列表。 约束限制: len(chunk_descs[i])=len(hashes)。 取值范围: 无。 默认取值: None。 |
| chunk_descs | List[List[Tuple[int, int]]] | 是 | 参数解释: 每个层组中每个chunk对应的 start_slot, length。 约束限制: len(chunk_descs)=len(store_layer_group_desc)且len(chunk_descs[i])=len(hashes)。 取值范围: 无。 默认取值: None。 |
| option | 否 | 参数解释: ContextCaching访问内存池的KV操作选项。 约束限制: 不能为None。 取值范围: 无。 默认取值: 无。 |
| 参数名称 | 参数类型 | 是否必选 | 描述 |
|---|---|---|---|
| write_rcache | bool | 可选 | 参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 约束限制: 无。 取值范围: True:将本次写入保存为本地读缓存 False:不将本次写入保存为本地读缓存 默认取值: True |
| read_local_only | bool | 可选 | 参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 约束限制: 无。 取值范围: True:只读本地缓存 False:优先读本地缓存,如果本地未命中,则从其他节点读取数据 默认取值: False |
| timeout | int | 可选 | 参数解释: 请求超时时间,单位为毫秒。 约束限制: 无。 取值范围: 大于等于0。 默认取值: 5000 |
返回结果说明
| 类型 | 说明 |
|---|---|
| 参数解释: Context Caching访问内存池的执行结果。 取值范围: 无。 |
代码样例
下面为同步保存KVCache到EMS中,同时对异常进行容错处理。
- V0版本同步保存
import torch, torch_npu from ems import CcKvOption, PrimaryKey, KvBufferWrapper option = CcKvOption(timeout=5000) # 组成键值列表 key_list = [PrimaryKey("1", "123"), PrimaryKey("1", "66")] tensor1 = torch.ones(2, device="npu:1") tensor2 = torch.ones(6, device="npu:1") len1 = tensor1.numel() * tensor1.element_size() len2 = tensor2.numel() * tensor2.element_size() val_list = [[KvBufferWrapper(tensor1.data_ptr, len1)], [KvBufferWrapper(tensor2.data_ptr, len2)]] # 可以根据不同异常,采取不同处理方式,例如超时错误可以重试。 try: cc_result = cc.save(option, key_list, val_list) except EmsException as e: print(f"failed to save, {e}.") exit(2) print(cc_result) - V1版本 slot_mapping语义下的同步保存,参数:slot_mapping + hashes + offsets
# 设置请求的超时时间 option = CcKvOption(timeout=5000) block_size = 4 # 组成参数列表 slot_mapping = [0,1,2,3,4,5,6,7] hashes = [0xABCD, 0x1234] offsets = [4, 4] # 可以根据不同异常,采取不同处理方式,例如超时错误可以重试。 try: cc_result = cc.save(slot_mapping = slot_mapping, hashes = hashes, offsets = offsets, option = option) except EmsException as e: print(f"failed to save, {e}.") exit(2) print(cc_result) - V1版本 chunk_descs 语义下的同步保存,参数:hashes + chunk_descs
# 设置请求的超时时间 option = CcKvOption(timeout=5000) block_size = 4 # 组成参数列表 hashes = [0xABCD, 0x1234] chunk_descs = [[(0, 4),(4, 4)]] # 可以根据不同异常,采取不同处理方式,例如超时错误可以重试。 try: cc_result = cc.save(hashes = hashes, chunk_descs = chunk_descs, option = option) except EmsException as e: print(f"failed to save, {e}.") exit(2) print(cc_result)