异步保存KVCache
功能介绍
将推理计算过程中产生的多个 KVCache 数据块,批量异步保存到 EMS 缓存池。方法调用在提交后会立即返回一个表5句柄,用户可通过该句柄在后续获取保存结果。
接口约束
- 加速卡限制:仅支持华为昇腾加速卡显存拷贝。
- 异常处理:若部分键值保存失败,接口不会直接返回失败,后续读取时将无法命中对应键值。
- 语义约束:需遵循以下两种语义之一,二选一:
- slot_mapping 语义(使用 hashes + offsets + slot_mapping):通过 hashes , offsets来唯一标识 KVCache 数据块;
- chunk_descs 语义(使用 hashes+ chunk_descs):通过 **kwargs 传入 chunk_descs,标识每个层组中每个chunk对应的 start_slot, length。
方法定义
ContextCaching.async_save(slot_mapping, hashes, offsets, option, **kwargs)
请求参数说明
|
参数名称 |
参数类型 |
是否必选 |
描述 |
|---|---|---|---|
|
slot_mapping |
List[int] |
是 |
参数解释: 每个token索引映射到block标识的列表。 约束限制: 不能为空,必须为整数列表,sum(offsets) == len(slot_mapping)。 取值范围: 无。 默认取值: 无。 |
|
hashes |
List[int] |
是 |
参数解释: 预计算的块前缀哈希列表。 约束限制: 需与offsets长度一致。 取值范围: 无。 默认取值: None。 |
|
offsets |
List[int] |
是 |
参数解释: 与hashes对应的每块token数,通常等于block_size。 约束限制: sum(offsets) == len(slot_mapping)。 取值范围: 无。 默认取值: None。 |
|
option |
否 |
参数解释: ContextCaching访问内存池的KV操作选项。 约束限制: 不能为None。 取值范围: 无。 默认取值: 无。 |
|
参数名称 |
参数类型 |
是否必选 |
描述 |
|---|---|---|---|
|
hashes |
List[int] |
是 |
参数解释: 预计算的块前缀哈希列表。 约束限制: len(chunk_descs[i])=len(hashes)。 取值范围: 无。 默认取值: None。 |
|
chunk_descs |
List[List[Tuple[int, int]]] |
是 |
参数解释: 每个层组中每个chunk对应的 start_slot, length。 约束限制: len(chunk_descs)=len(store_layer_group_desc)且len(chunk_descs[i])=len(hashes)。 取值范围: 无。 默认取值: None。 |
|
option |
否 |
参数解释: ContextCaching访问内存池的KV操作选项。 约束限制: 不能为None。 取值范围: 无。 默认取值: 无。 |
|
参数名称 |
参数类型 |
是否必选 |
描述 |
|---|---|---|---|
|
write_rcache |
bool |
可选 |
参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 约束限制: 无。 取值范围: True:将本次写入保存为本地读缓存 False:不将本次写入保存为本地读缓存 默认取值: True |
|
read_local_only |
bool |
可选 |
参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 约束限制: 无。 取值范围: True:只读本地缓存 False:优先读本地缓存,如果本地未命中,则从其他节点读取数据 默认取值: False |
|
timeout |
int |
可选 |
参数解释: 请求超时时间,单位为毫秒。 约束限制: 无。 取值范围: 大于等于0。 默认取值: 5000 |
返回结果说明
|
类型 |
说明 |
|---|---|
|
参数解释: 返回异步执行Future句柄。 取值范围: 无。 |
|
参数名称 |
参数类型 |
描述 |
|---|---|---|
|
success |
int |
参数解释: 请求的批量key读写连续成功的个数。 约束限制: 无。 取值范围: 0~请求key的个数。 默认取值: 无。 |
|
total |
int |
参数解释: 请求的批量key总个数。 约束限制: 无。 取值范围: 请求列表中批量key的个数。 默认取值: 无。 |
代码样例
本示例用于批量多次下发异步保存KVCache的请求,并获取每个请求的最终执行结果。
1、slot_mapping语义下的异步保存,参数:slot_mapping + hashes + offsets
# 设置请求的超时时间
option = CcKvOption(timeout=5000)
block_size = 4
future_list = []
# 多次异步下发异步save请求,增加save并发能力。
try:
slot_mapping = [0, 1, 2, 3, 4, 5]
hashes = [0x1111, 0x2222]
offsets = [4, 4]
future_list.append(cc.async_save(slot_mapping = slot_mapping, hashes = hashes, offsets = offsets, option = option))
slot_mapping = [10, 11, 12, 13, 14, 15, 16, 17]
hashes = [0xAAAA, 0xBBBB]
offsets = [4, 4]
future_list.append(cc.async_save(slot_mapping = slot_mapping, hashes = hashes, offsets = offsets, option = option))
except EmsException as e:
print(f"failed to save, {e}.")
exit(2)
try:
for future in future_list:
result = future.result()
print(f"rsult:{result}")
except EmsException as e:
print(f"failed to get result, {e}.")
2、chunk_descs 语义下的异步保存,参数:hashes + chunk_descs
# 设置请求的超时时间
option = CcKvOption(timeout=5000)
block_size = 4
future_list = []
# 多次异步下发异步save请求,增加save并发能力。
try:
hashes = [0x1111, 0x2222]
chunk_descs = [[(0, 4),(4, 4)]]
future_list.append(cc.async_save(hashes = hashes, chunk_descs = chunk_descs, option = option))
hashes = [0xAAAA, 0xBBBB]
chunk_descs = [[(10, 4),(14, 4)]]
future_list.append(cc.async_save(hashes = hashes, chunk_descs = chunk_descs, option = option))
except EmsException as e:
print(f"failed to save, {e}.")
exit(2)
try:
for future in future_list:
result = future.result()
print(f"result:{result}")
except EmsException as e:
print(f"failed to get result, {e}.")