异步加载KVCache
功能介绍
将推理请求需要的多个KVCache数据块,批量异步加载到EMS缓存中。方法调用在提交后会立即返回一个表5句柄,用户可通过该句柄在后续获取保存结果。
接口约束
- 加速卡限制:仅支持华为昇腾加速卡显存拷贝。
- 异常处理:若部分键值保存失败,接口不会直接返回失败,后续读取时将无法命中对应键值。
- 语义约束:需遵循以下两种语义之一,二选一:
- Hash 语义(使用 hashes + offsets + slot_mapping):通过 hashes , offsets来唯一标识 KVCache 数据块;
- Token 语义(使用 token_ids + mask + slot_mapping):通过 **kwargs 传入 token_ids 与 mask,标识需要拷贝的 token 的位置和数量。
方法定义
ContextCaching.load(slot_mapping, hashes, offsets, option, **kwargs)
请求参数说明
参数名称 |
参数类型 |
是否必选 |
描述 |
---|---|---|---|
slot_mapping |
List[int] |
是 |
参数解释: 每个token索引映射到block标识的列表。 约束限制: 不能为空,必须为整数列表,sum(offsets) == len(slot_mapping)。 取值范围: 无。 默认取值: 无。 |
hashes |
List[int] |
是 |
参数解释: 预计算的块前缀哈希列表。 约束限制: 需与offsets长度一致。 取值范围: 无。 默认取值: None。 |
offsets |
List[int] |
是 |
参数解释: 与hashes对应的每块token数,通常等于block_size。 约束限制: sum(offsets) == len(slot_mapping)。 取值范围: 无。 默认取值: None。 |
option |
否 |
参数解释: Context Caching访问内存池的KV操作选项。 默认取值: 无。 |
参数名称 |
参数类型 |
是否必选 |
描述 |
---|---|---|---|
slot_mapping |
List[int] |
是 |
参数解释: 每个token索引映射到block标识的列表。 约束限制: 不能为空,必须为整数列表,与token_ids、slot_mapping长度一致。 取值范围: 无。 默认取值: 无。 |
token_ids |
List[int] / Tensor |
是 |
参数解释: token序列 约束限制: 与slot_mapping、mask长度一致。 取值范围: 无。 默认取值: None |
mask |
List[int] |
否 |
参数解释: 前缀=0,后缀=1的掩码序列。 约束限制: 与token_ids、slot_mapping长度一致。 取值范围: 0或1。 默认取值: None。 |
option |
否 |
参数解释: Context Caching访问内存池的KV操作选项。 默认取值: 无。 |
参数名称 |
参数类型 |
是否必选 |
描述 |
---|---|---|---|
write_rcache |
bool |
可选 |
参数解释: 是否将本次写入保存为本地读缓存,默认值为True。 约束限制: 无。 取值范围: True:将本次写入保存为本地读缓存。 False:不将本次写入保存为本地读缓存。 默认取值: True。 |
read_local_only |
bool |
可选 |
参数解释: 是否只读本地缓存,如果置为True,则不会从其他节点读取数据,只有读流程生效。 约束限制: 无。 取值范围: True:只读本地缓存。 False:优先读本地缓存,如果本地未命中,则从其他节点读取数据。 默认取值: False。 |
timeout |
int |
可选 |
参数解释: 请求超时时间,单位为毫秒。 约束限制: 无。 取值范围: 大于等于0。 默认取值: 5000。 |
返回结果
类型 |
说明 |
---|---|
表8-23 |
参数解释: 返回异步执行Future句柄。 取值范围: 无。 |
参数名称 |
参数类型 |
描述 |
---|---|---|
success |
int |
参数解释: 请求的批量key读写连续成功的个数。 约束限制: 无。 取值范围: 0~请求key的个数。 默认取值: 无。 |
total |
int |
参数解释: 请求的批量key总个数。 约束限制: 无。 取值范围: 请求列表中批量key的个数。 默认取值: 无。 |
代码样例
本示例用于多次下发异步加载KVCache的请求,并获取每个请求的最终执行结果 。
- hash语义下的异步加载,参数:slot_mapping + hashes + offsets
# 设置请求的超时时间 option = CcKvOption(timeout=5000) block_size = 4 future_list = [] # 多次异步下发异步save请求,增加save并发能力。 try: slot_mapping = [0, 1, 2, 3, 4, 5] hashes = [0x1111, 0x2222] offsets = [4, 4] future_list.append(cc.async_load(slot_mapping = slot_mapping, hashes = hashes, offsets = offsets, option = option)) slot_mapping = [10, 11, 12, 13, 14, 15, 16, 17] hashes = [0xAAAA, 0xBBBB] offsets = [4, 4] future_list.append(cc.async_ load (slot_mapping = slot_mapping, hashes = hashes, offsets = offsets, option = option)) except EmsException as e: print(f"failed to save, {e}.") exit(2) try: for future in future_list: result = future.result() print(f"rsult:{result}") except EmsException as e: print(f"failed to get result, {e}.")
- token语义下的异步加载,参数:slot_mapping + token_ids + mask
# 设置请求的超时时间 option = CcKvOption(timeout=5000) block_size = 4 future_list = [] # 多次异步下发异步save请求,增加save并发能力。 try: for idx in range(2): slot_mapping = [0, 1, 2, 3, 4, 5] tonken_ids = , 102, 103, 104, 105, 106] mask = [0, 0, 0, 0, 1, 1] future_list.append(cc.async_load(slot_mapping = slot_mapping, tonken_ids = tonken_ids, mask = mask, option = option)) slot_mapping = [0, 1, 2, 3, 4, 5, 6, 7, 8] tonken_ids = [201, 202, 203, 204, 205, 206, 207, 208, 209] mask = [0, 0, 0, 0, 0, 0, 1, 1, 1] future_list.append(cc.async_load(slot_mapping = slot_mapping, tonken_ids = tonken_ids, mask = mask, option = option)) except EmsException as e: print(f"failed to save, {e}.") exit(2) try: for future in future_list: result = future.result() print(f"rsult:{result}") except EmsException as e: print(f"failed to get result, {e}.")