Best Practices for High-Performance EMS KV Cache in CCE Distributed Inference
This section describes how to deploy the Kimi-K2.5 model in a CCE cluster using the vLLM-ascend inference engine and the Kthena framework. The architecture employs a 1P1D (1 Prefill + 1 Decode) separation pattern, which physically isolates the prefill and decode phases. Combined with Elastic Memory Storage (EMS), this design enables efficient KV Cache transmission, significantly improving resource utilization and inference performance.
Context
Kthena is a cloud-native, high-performance routing, orchestration, and scheduling system for large language model (LLM) inference, designed specifically for Kubernetes environments. As a sub-project of Volcano, a cloud-native batch computing initiative, Kthena addresses the operational complexity enterprises face when deploying, managing, and running large-scale model inference services in production. Its core capabilities include:
- Declarative orchestration: Users define, deploy, and scale complex LLM inference clusters through configuration files. A single inference task can be decomposed into distinct roles and distributed across multiple pods, with collaborative components orchestrated in a unified manner.
- Intelligent routing: Unlike traditional load balancers, the Kthena Router monitors the real-time status and load of underlying inference engines to implement context-aware request routing.
- Auto scaling: Integrated with the Kthena Autoscaler, the system supports fine-grained elastic scaling, for example, adjusting replica counts for different roles during high-concurrency scenarios, while enabling rolling updates without service interruption.
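To illustrate what load-aware routing means in contrast to round-robin balancing, the following is a minimal sketch. It is not Kthena's actual routing algorithm: the Backend fields and the pick_backend function are hypothetical names chosen for illustration, and the selection rule (lowest KV cache usage, then fewest running requests) is an assumption.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Backend:
    # Hypothetical view of one inference engine instance as seen by a router.
    name: str
    healthy: bool
    running_requests: int
    kv_cache_usage: float  # fraction of KV cache in use, 0.0 to 1.0


def pick_backend(backends: List[Backend]) -> Optional[Backend]:
    """Return the healthy backend with the lowest KV cache usage.

    Ties are broken by the number of running requests. Returns None
    when no healthy backend is available.
    """
    candidates = [b for b in backends if b.healthy]
    if not candidates:
        return None
    return min(candidates, key=lambda b: (b.kv_cache_usage, b.running_requests))


backends = [
    Backend("a", healthy=True, running_requests=5, kv_cache_usage=0.9),
    Backend("b", healthy=True, running_requests=7, kv_cache_usage=0.4),
    Backend("c", healthy=False, running_requests=0, kv_cache_usage=0.1),
]
chosen = pick_backend(backends)
print(chosen.name if chosen else "no healthy backend")
```

A round-robin balancer would ignore both the health flag and the cache pressure; the sketch only shows why engine-level metrics change the routing decision.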
Although Kthena effectively handles macro-orchestration and request routing for inference clusters at the control plane, the physical separation of prefill and decode phases on the data plane requires massive amounts of intermediate state data, specifically the key-value (KV) cache, to be transferred efficiently between nodes. Huawei Cloud EMS addresses this requirement by providing high-performance, low-latency memory-tier channels for cross-node KV cache transmission.
With the explosive growth of LLM applications, online inference has become a primary driver of AI compute consumption. Inference workloads such as multi-turn dialogue, information retrieval, code assistance, and text generation exhibit high concurrency and strict latency requirements. However, traditional deployment architectures face a significant accelerator memory bottleneck.
- Compute resource waste and throughput bottleneck
During Transformer model inference, intermediate activation data (the KV cache) must reside in AI servers' accelerator memory to avoid redundant recomputation. Because the accelerator memory capacity of a single server is typically limited to several tens of gigabytes, the remaining space after loading model parameters is minimal. Consequently, a single node can process only a small number of concurrent requests. Forcibly scaling compute resources to handle higher concurrency causes inference costs to rise disproportionately.
- Context fragmentation due to memory constraints
Limited accelerator memory forces the system to discard the KV cache of historical conversations frequently to free space. During long-context interactions, this causes the model to lose coherence by effectively forgetting early content, leading to fragmented context and degraded user experience.
- High time to first token (TTFT)
When a previously discarded session or a request with a common prefix is reactivated, the system must expend substantial GPU compute to recompute the KV cache from scratch. This redundant computation not only increases inference cost but also extends the time from input submission to first token generation, significantly degrading response latency.
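To make the memory pressure described in the first item concrete, the following sketch estimates the KV cache footprint of a request for a standard multi-head or grouped-query attention layout. All model dimensions and the free-memory figure are hypothetical placeholders, not values for any specific model or accelerator. Models that use MLA store a compressed cache and need a different formula.

```python
def kv_cache_bytes_per_token(num_layers: int, num_kv_heads: int,
                             head_dim: int, dtype_bytes: int = 2) -> int:
    """Bytes of KV cache one token occupies across all layers.

    The factor 2 accounts for storing both the key and the value tensor.
    """
    return 2 * num_layers * num_kv_heads * head_dim * dtype_bytes


def max_concurrent_requests(free_bytes: int, tokens_per_request: int,
                            bytes_per_token: int) -> int:
    """How many requests of a given length fit in the remaining memory."""
    return free_bytes // (tokens_per_request * bytes_per_token)


# Hypothetical model: 60 layers, 8 KV heads, head dimension 128, 16-bit cache.
per_token = kv_cache_bytes_per_token(num_layers=60, num_kv_heads=8, head_dim=128)

# Hypothetical budget: 16 GiB left after loading weights, 32K-token contexts.
free_bytes = 16 * 1024**3
concurrency = max_concurrent_requests(free_bytes, 32768, per_token)

print(f"{per_token / 1024:.0f} KiB per token, {concurrency} concurrent requests")
```

Under these assumed numbers each token costs 240 KiB, so a single 32K-token request consumes 7.5 GiB and only two such requests fit at once.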
To address these challenges, Huawei Cloud has launched EMS. EMS is a distributed memory pool management technology that uses DRAM as its primary storage medium. It provides high-performance caching and acceleration for LLM inference, substantially reducing compute costs while significantly improving overall throughput.
Features
The EMS architecture comprises three layers: a domain-specific service SDK, a distributed memory pool, and a management plane.

- The domain-specific service SDK provides a suite of add-ons and API service SDKs tailored for AI application scenarios. It delivers functions including service system integration, data layout optimization, and near-data processing to accelerate memory access for service requests. The SDK primarily targets LLM inference, leveraging the distributed memory pool to improve processing efficiency and reduce costs.
- The distributed memory pool manages memory space across nodes and balances data loads. It enables shared access to cached data through space pooling. In converged deployment mode, DRAM on AI servers is pooled for distributed sharing, with affinity scheduling and access optimization.
- The management plane provides EMS service deployment, monitoring, upgrades, and operations based on the cloud-native infrastructure of Huawei Cloud.
Core Competitiveness
- CachedAttention: To eliminate redundant computation in multi-turn dialogue and common-prefix scenarios, EMS implements the CachedAttention mechanism. The system asynchronously persists the KV cache generated by historical sessions to the EMS distributed memory pool. When a session is reactivated, the system loads and reuses the KV cache directly from EMS, avoiding expensive GPU recomputation. This mechanism significantly reduces TTFT and substantially improves throughput during the inference prefill phase.
- Multi-tier cache, breaking the accelerator memory bottleneck: EMS has built an innovative three-tier cache system comprising accelerator memory, DRAM, and persistent storage. Serving as a high-performance memory cache layer between the compute and storage tiers, EMS overcomes the physical memory limitation of a single AI server and enables dynamic extension of accelerator memory space. This architecture improves data access speed at the persistence layer while ensuring responsive request handling under large-scale concurrency.
- Converged deployment and cost optimization: To address low DRAM utilization in AI scenarios, the EMS data plane adopts a semi-hosted converged deployment mode that directly coordinates and pools idle DRAM resources across AI servers for reuse. Users achieve high resource elasticity and cost optimization without purchasing dedicated cache hardware.
- Distributed sharing and high cache hit ratio: The EMS distributed memory pool eliminates the silo effect of single-node deployments and enables efficient cross-node sharing. Through automatic awareness of scheduler task queues and affinity scheduling policies, the system significantly improves the global cross-node cache hit ratio, meeting the stringent demands of large-scale distributed inference.
- Comprehensive ecosystem integration: EMS is not an isolated infrastructure component. It integrates with Huawei Cloud AI development platform ModelArts, CCE, SFS Turbo, and OBS to help enterprises rapidly build a one-stop, high-performance, serverless full-stack foundation model inference pipeline.
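The reuse idea behind CachedAttention can be sketched with a toy in-memory pool. This is an illustration only and does not use the EMS SDK: ToyKvPool, block_hashes, and the block size are hypothetical, and a Python dictionary stands in for the distributed memory pool. Each block hash is chained to the previous one, so a hit on block N implies the whole prefix up to N matches.

```python
import hashlib
from typing import Dict, List

BLOCK_SIZE = 4  # tokens per KV block (hypothetical value)


def block_hashes(token_ids: List[int], block_size: int = BLOCK_SIZE) -> List[str]:
    """Chain-hash every full block so each hash identifies its entire prefix."""
    hashes: List[str] = []
    prev = ""
    for start in range(0, len(token_ids) - block_size + 1, block_size):
        block = token_ids[start:start + block_size]
        payload = prev + "|" + ",".join(map(str, block))
        prev = hashlib.sha256(payload.encode()).hexdigest()
        hashes.append(prev)
    return hashes


class ToyKvPool:
    """Stand-in for a shared KV cache pool keyed by block hash."""

    def __init__(self) -> None:
        self._store: Dict[str, bytes] = {}

    def save(self, hashes: List[str]) -> None:
        for h in hashes:
            self._store[h] = b"kv-block"  # placeholder for real KV tensors

    def matched_prefix_blocks(self, hashes: List[str]) -> int:
        """Count leading blocks already cached; stop at the first miss."""
        count = 0
        for h in hashes:
            if h not in self._store:
                break
            count += 1
        return count


pool = ToyKvPool()
turn1 = list(range(10))                      # first turn: 10 tokens, 2 full blocks
pool.save(block_hashes(turn1))

turn2 = turn1 + [100, 101, 102, 103, 104, 105]  # second turn extends the history
matched = pool.matched_prefix_blocks(block_hashes(turn2))
reused_tokens = matched * BLOCK_SIZE
print(f"reuse {reused_tokens} tokens, recompute {len(turn2) - reused_tokens}")
```

Only the blocks after the matched prefix need prefill computation, which is where the TTFT reduction comes from.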
Prerequisites
- A CCE Turbo cluster of v1.29 or later is available, with NodeLocal DNSCache and Volcano Scheduler v1.22.1 or later installed.
- At least five Snt9b or Snt9b23 compute nodes have been accepted or created in the cluster. Do not mix nodes of different flavors.
Notes and Constraints
This function is in initial rollout. Check the console for currently supported regions.
Procedure
- Enable EMS and node labeling.
If the EMS configuration item does not exist, submit a service ticket to contact customer service.
- Log in to the CCE console and click the cluster name to access the cluster console. In the navigation pane, choose Settings. In the right pane, click the Heterogeneous Resources tab.
- Enable EMS and label the target Snt9b or Snt9b23 nodes.

- In the displayed dialog box, click Save.
- (Optional) Configure security group rules.
If the node uses hostNetwork, you must manually configure a security group to allow communication with EMS components.
- Search for the resource stack using the EMS cluster ID, access it, and verify that the deployment completed successfully.
- Obtain the ID of the default security group.
- Switch to the Dashboard page of the target cluster on the CCE console, locate the Networking Configuration area, and click the link next to Default Node Security Group.

- On the Summary tab page, click the copy icon next to the ID to copy the security group ID.
- Configure EMS security group rules.
- Click the icon above the Summary tab to return to the security group list.
- On the Access Control > Security Groups page, search for {EMS-cluster-ID} in the search box and click ems-zookeeper-{EMS-cluster-ID}.

- On the Inbound Rules tab, click Add Rule.
- In the Add Inbound Rule panel, set Protocol & Port to 10000, choose Security group from the Source drop-down list, and enter $.{node's-default-security-group-ID} in the text box below.

- Click Add Rule in the lower part. Set Protocol & Port to 11000, choose Security group from the Source drop-down list, enter $.{node's-default-security-group-ID} in the text box below, and click OK.

- Click
- Restart the EMS controller.
- On the Workloads page of the target CCE cluster, click the Deployments tab, find the namespace ems-{ems_cluster_id}, and click the name of the EMS controller workload.

- In the Pods area, select all pods and click Delete. In the displayed dialog box, click Yes.

- Repeat steps 1 and 2 to perform the same operations on the ems-server workload.
- On the Heterogeneous Resources tab of the Settings page, wait until the component is loaded.
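If you configured the optional security group rules, a quick connectivity probe from a node can confirm that ports 10000 and 11000 are reachable once the EMS components are running again. The sketch below assumes the rules were added for TCP; the target address is a placeholder that you must replace with an address of your EMS component, and port_reachable and check_ports are illustrative helper names.

```python
import socket
from typing import Dict, Iterable

EMS_PORTS = (10000, 11000)  # ports opened in the inbound rules above


def port_reachable(host: str, port: int, timeout_s: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable networks.
        return False


def check_ports(host: str, ports: Iterable[int] = EMS_PORTS) -> Dict[int, bool]:
    """Probe each port and report whether it accepted a connection."""
    return {port: port_reachable(host, port) for port in ports}


# Example usage (replace the placeholder with a real EMS component address):
#   print(check_ports("<ems-component-ip>"))
```

A False result for either port usually points to a missing or mistyped inbound rule rather than an EMS fault, so recheck the source security group ID first.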
- Build an inference image.
- On the Settings page of the target CCE cluster, click the Scheduling tab and set the default scheduler to Volcano.
- Check the RoCE network status. For details, see Prefill-Decode Disaggregation (Deepseek).
- Prepare a local foundation model. You can download the model from ModelScope. This example uses the Kimi-K2.5-W4A8 model. Ensure that you download the model to /mnt/pass/models on the target node.
- Prepare an Ascend-compatible engine image.
- Download the OpenSSL and Cyrus SASL installation packages.
https://github.com/openssl/openssl/releases/download/OpenSSL_1_1_1w/openssl-1.1.1w.tar.gz
https://github.com/cyrusimap/cyrus-sasl/releases/download/cyrus-sasl-2.1.28/cyrus-sasl-2.1.28.tar.gz
- Go to the CCE cluster. In the navigation pane, choose Settings. On the right side of the page, click the Heterogeneous Resources tab. In EMS Settings, download the EMS SDK installation package and place it in the same directory as the Dockerfile.

- Create the ems_store folder.
mkdir ems_store
Create the __init__.py, ems_adapter.py, ems_env.py, and ems_store_connector.py files in the folder.
- __init__.py
vi __init__.py
This file is empty. Press Esc and enter :wq to save the file.
- ems_adapter.py
vi ems_adapter.py
File content:
from typing import List, Tuple, Dict, Set
import threading
import time
import os
from collections import deque

import torch
import numpy as np
from vllm.config import VllmConfig
from vllm.distributed import get_tp_group, get_pp_group
from vllm.logger import logger
from ems import Ems, CcKvOption, EmsException, EmsErrorCode, EmsConfig, CcConfig_v1
from ems.cc_v1.cc_config import KVCacheType
from vllm_ascend.distributed.ems_store.ems_env import EmsEnv


class EmsAdapter:
    def __init__(self, vllm_config: VllmConfig):
        self.vllm_config = vllm_config
        self.is_mla = vllm_config.model_config.use_mla
        self.model_name = "_".join(os.path.normpath(vllm_config.model_config.model).split("/")[-2:])
        logger.info(f"[EMS] EmsAdapter init with vllm_config: {vllm_config}, "
                    f"is_mla: {self.is_mla}, model_name: {self.model_name}")
        self.rank = vllm_config.parallel_config.rank
        self.world_size = vllm_config.parallel_config.world_size
        self.block_size = vllm_config.cache_config.block_size
        self.load_futures = {}
        self.save_futures = {}
        self.failed_block_ids: set[int] = set()
        self._inited = False
        self._need_reinit = False
        self._registered = False
        self._pending_kv_caches = None
        self.ems_cfg = self.get_ems_cfg()
        self.context_caching = None
        self.init_ems()
        self.task_manager = None
        self.init_task_manager()

    def get_ems_cfg(self) -> EmsConfig:
        tp_group = get_tp_group()
        pp_group = get_pp_group()
        cc_config_v1 = CcConfig_v1(
            rank_id=tp_group.rank,
            device_id=tp_group.local_rank,
            model_id=EmsEnv.model_id,
            tp_world_size=tp_group.world_size,
            pp_world_size=pp_group.world_size,
            rank_in_tp_group=tp_group.rank_in_group,
            rank_in_pp_group=pp_group.rank_in_group,
            llm_engine=f"{EmsEnv.llm_engine}@{self.model_name}"
        )
        if self.is_mla:
            cc_config_v1.kvcache_type = KVCacheType.MLA
        ems_cfg = EmsConfig(
            access_id=EmsEnv.access_id,
            access_key=EmsEnv.access_key,
            cc_config_v1=cc_config_v1
        )
        return ems_cfg

    def init_ems(self) -> None:
        try:
            Ems.init(self.ems_cfg)
            self.context_caching = Ems.get_cc()
            self._inited = True
            logger.info(f"[EMS][Init] EmsConnector init succeed, EMS ready.")
        except EmsException as e:
            if e.status_code() == EmsErrorCode.EMS_RECOVERD_ERROR:
                self._need_reinit = True
            logger.error(f"[EMS][Init] EmsConnector init fail, error: {e}.")
        if not self._inited and self.context_caching:
            logger.warning(f"[EMS][Init][degraded] EmsConnector init failed but get cc, reason=EMS not ready.")

    def init_task_manager(self) -> None:
        try:
            self.task_manager = PeriodicTaskManager(check_fn=self._check_health)
            logger.info(f"[EMS] init periodic task manager succeeded.")
        except Exception as e:
            logger.error(f"[EMS] init periodic task manager failed, error: {e}.")
            raise

    def register_kv_caches(self, kv_caches: dict[str, Tuple[torch.Tensor]]) -> None:
        if not self._inited:
            self._pending_kv_caches = kv_caches
            logger.warning("[EMS][Register] EMS not ready, skip register_kvcache.")
            return
        kv_caches_list: List[List[torch.Tensor]] = []
        for _, kv_caches_layer in kv_caches.items():
            num_kv = len(kv_caches_layer)
            kv_tensor_list: List[torch.Tensor] = [kv_caches_layer[idx] for idx in range(num_kv)]
            kv_caches_list.append(kv_tensor_list)
        try:
            self.context_caching.register_kvcache(kv_caches_list)
            self._registered = True
            self._pending_kv_caches = None
            # shape of kv_caches_list: [layers, [k_v_index, [gpu_blocks, block_size, heads, head_size]]]
            ds_layers = len(kv_caches_list)
            ds_k_v_index = len(kv_caches_list[0])
            ds_gpu_blocks, ds_block_size, ds_heads, ds_head_size = list(kv_caches_list[0][0].shape)
            logger.info(f"[EMS][Register] layer_num={ds_layers}, block_size={ds_block_size}, "
                        f"is_mla={self.is_mla}, kvcache_dim={ds_k_v_index}")
        except EmsException as e:
            logger.error(f"[EMS][Register] register kvcache error: {e}.")

    def _cal_block_offsets(self, block_ids: List[int], block_size: int) -> List[int]:
        return [block_size] * len(block_ids)

    def _cal_block_slot_mapping(self, block_ids: List[int], block_size: int) -> List[int]:
        block_ids_np = np.array(block_ids)
        range_arr = np.arange(block_size)
        result_matrix = block_ids_np[:, np.newaxis] * block_size + range_arr
        flattened_result = result_matrix.ravel()
        return flattened_result.tolist()

    def exists_block_num(self, req_id: str, block_hashes: list[int]) -> int:
        if not self._inited or not self._registered:
            logger.warning(f"[EMS][exists][degraded] req_id={req_id} reason=EMS not ready.")
            return 0
        if not self.task_manager.get_status():
            logger.warning(f"[EMS][exists][skip] req_id={req_id} reason=Unhealthy EMS")
            return 0
        option = CcKvOption(
            write_rcache=EmsEnv.ems_enable_write_rcache,
            read_local_only=EmsEnv.ems_enable_read_local_only,
            timeout=EmsEnv.ems_timeout
        )
        try:
            cc_result = self.context_caching.exists(hashes=block_hashes, option=option)
            return cc_result.success
        except EmsException as e:
            logger.debug(f"[EMS][Exist] fallback to len(hashes) due to: {e}")
            return len(block_hashes)

    def async_load(self, req_id: str, block_hashes: List[int], block_ids: List[int],
                   num_computed_blocks: int) -> None:
        future = None
        submit_time = time.perf_counter()
        if not self._check_params(req_id, block_hashes, block_ids, "AsyncLoad"):
            self.load_futures[req_id] = (future, submit_time, num_computed_blocks, block_ids)
            return
        option = CcKvOption(
            write_rcache=EmsEnv.ems_enable_write_rcache,
            read_local_only=EmsEnv.ems_enable_read_local_only,
            timeout=EmsEnv.ems_timeout
        )
        offsets = self._cal_block_offsets(block_ids, self.block_size)
        slot_mapping = self._cal_block_slot_mapping(block_ids, self.block_size)
        try:
            future = self.context_caching.async_load(
                slot_mapping=slot_mapping,
                hashes=block_hashes,
                offsets=offsets,
                option=option
            )
            logger.info(f"[EMS][AsyncLoad][start] req_id={req_id} timeout_ms={EmsEnv.ems_timeout} "
                        f"planned_blocks={len(block_ids)} first_block_hash={block_hashes[0]}, future=submitted")
        except EmsException as e:
            self._process_exception(e)
            logger.error(f"[EMS][AsyncLoad][error] req_id={req_id} timeout_ms={EmsEnv.ems_timeout} ..., err={e}")
        self.load_futures[req_id] = (future, submit_time, num_computed_blocks, block_ids)

    def async_save(self, req_id: str, block_hashes: List[int], block_ids: List[int]) -> None:
        future = None
        submit_time = time.perf_counter()
        if not self._check_params(req_id, block_hashes, block_ids, "AsyncSave"):
            self.save_futures[req_id] = (future, submit_time)
            return
        option = CcKvOption(
            write_rcache=EmsEnv.ems_enable_write_rcache,
            read_local_only=EmsEnv.ems_enable_read_local_only,
            timeout=EmsEnv.ems_timeout
        )
        offsets = self._cal_block_offsets(block_ids, self.block_size)
        slot_mapping = self._cal_block_slot_mapping(block_ids, self.block_size)
        try:
            future = self.context_caching.async_save(
                slot_mapping=slot_mapping,
                hashes=block_hashes,
                offsets=offsets,
                option=option
            )
            logger.info(f"[EMS][AsyncSave][start] req_id={req_id} timeout_ms={EmsEnv.ems_timeout} "
                        f"planned_blocks={len(block_ids)} first_block_hash={block_hashes[0]}, future=submitted")
        except EmsException as e:
            self._process_exception(e)
            logger.error(f"[EMS][AsyncSave][error] req_id={req_id} timeout_ms={EmsEnv.ems_timeout} ..., err={e}")
        self.save_futures[req_id] = (future, submit_time)

    def get_finished_load_reqs(self) -> Set[str]:
        finished_reqs = set()
        for req_id, (future, submit_time, num_computed_blocks, block_ids) in list(self.load_futures.items()):
            if future and not self.context_caching.is_ready(future):
                continue
            try:
                if future:
                    result = self.context_caching.get_result(future)
                    cost_ms = 1e3 * (time.perf_counter() - submit_time)
                    logger.info(f"[EMS][GetResult] Req {req_id} async load done, success_blocks={result.success}, "
                                f"total_blocks_num={result.total}, status=SUCCESS, cost_ms={cost_ms:.2f}")
                    self.task_manager.update_req_stat("LOAD", result.success, cost_ms)
                    self.task_manager.update_hit_stat(result.success, result.total)
                    if result.success < len(block_ids):
                        failed_part = block_ids[result.success:]
                        self.failed_block_ids.update(failed_part)
                else:
                    cost_ms = 1e3 * (time.perf_counter() - submit_time)
                    logger.info(f"[EMS][GetResult] Req {req_id} async load done, success_blocks={0}, "
                                f"total_blocks_num={0}, status=EMS_INTERNAL_ERROR, cost_ms={cost_ms:.2f}")
                    self.task_manager.update_req_stat("LOAD", 0, cost_ms)
                    self.failed_block_ids.update(block_ids)
            except EmsException as e:
                cost_ms = 1e3 * (time.perf_counter() - submit_time)
                self.failed_block_ids.update(block_ids)
                self._process_exception(e)
                logger.info(f"[EMS][GetResult] Req {req_id} async load done, success_blocks={0}, "
                            f"total_blocks_num={0}, status={e.status_code().name}, cost_ms={cost_ms:.2f}")
            finished_reqs.add(req_id)
            self.load_futures.pop(req_id)
        return finished_reqs

    def get_block_ids_with_load_errors(self) -> Set[int]:
        failed_block_ids = self.failed_block_ids
        self.failed_block_ids = set()
        return failed_block_ids

    def get_finished_save_reqs(self) -> Set[str]:
        finished_reqs = set()
        for req_id, (future, submit_time) in list(self.save_futures.items()):
            if future and not self.context_caching.is_ready(future):
                continue
            try:
                if future:
                    result = self.context_caching.get_result(future)
                    cost_ms = 1e3 * (time.perf_counter() - submit_time)
                    logger.info(f"[EMS][GetResult] Req {req_id} async save done, success_blocks={result.success}, "
                                f"total_blocks_num={result.total}, status=SUCCESS, cost_ms={cost_ms:.2f}")
                    self.task_manager.update_req_stat("SAVE", result.success, cost_ms)
                else:
                    cost_ms = 1e3 * (time.perf_counter() - submit_time)
                    logger.info(f"[EMS][GetResult] Req {req_id} async save done, success_blocks={0}, "
                                f"total_blocks_num={0}, status=EMS_INTERNAL_ERROR, cost_ms={cost_ms:.2f}")
                    self.task_manager.update_req_stat("SAVE", 0, cost_ms)
            except EmsException as e:
                cost_ms = 1e3 * (time.perf_counter() - submit_time)
                self._process_exception(e)
                logger.info(f"[EMS][GetResult] Req {req_id} async save done, success_blocks={0}, "
                            f"total_blocks_num={0}, status={e.status_code().name}, cost_ms={cost_ms:.2f}")
            finished_reqs.add(req_id)
            self.save_futures.pop(req_id)
        return finished_reqs

    def sync_save_reqs(self) -> None:
        for req_id, (future, submit_time) in self.save_futures.items():
            if future is None:
                continue
            try:
                result = self.context_caching.get_result(future)
                cost_ms = 1e3 * (time.perf_counter() - submit_time)
                logger.info(f"[EMS][GetResult] Req {req_id} async save done, success_blocks={result.success}, "
                            f"total_blocks_num={result.total}, status=SUCCESS, cost_ms={cost_ms:.2f}")
                self.task_manager.update_req_stat("SAVE", result.success, cost_ms)
            except EmsException as e:
                cost_ms = 1e3 * (time.perf_counter() - submit_time)
                self._process_exception(e)
                logger.info(f"[EMS][GetResult] Req {req_id} async save done, success_blocks={0}, "
                            f"total_blocks_num={0}, status={e.status_code().name}, cost_ms={cost_ms:.2f}")
        self.save_futures.clear()

    def _check_health(self) -> bool:
        is_health = Ems.check_health()
        if is_health:
            logger.info("[EMS] EMS health status is ok.")
        else:
            logger.info("[EMS] EMS health status is abnormal.")
        if not self._inited and is_health and self._need_reinit:
            logger.info(f"[EMS][Init] re-init during health check.")
            self.init_ems()
            if self._inited:
                logger.info("[EMS][Init] re-init succeed during health check.")
                if (not self._registered) and (self._pending_kv_caches is not None):
                    self.register_kv_caches(self._pending_kv_caches)
                    logger.info(f"[EMS][Register] re-register pending kvcache succeed.")
        return is_health

    def _check_params(self, req_id: str, block_hashes: List[int], block_ids: List[int], called_at: str) -> bool:
        if not self._inited or not self._registered:
            logger.warning(f"[EMS][{called_at}][degraded] req_id={req_id} reason=EMS not ready.")
            return False
        if not self.task_manager.get_status():
            logger.warning(f"[EMS][{called_at}][skip] req_id={req_id} reason=Unhealthy EMS")
            return False
        if (len(block_hashes) == 0 or len(block_ids) == 0) or (len(block_hashes) != len(block_ids)):
            logger.error(f"[EMS] req {req_id} has invalid block_hashes or block_ids: "
                         f"len(block_hashes) == {len(block_hashes)}, len(block_ids) == {len(block_ids)}.")
            return False
        return True

    def _process_exception(self, e: EmsException) -> None:
        if e.status_code() == EmsErrorCode.EMS_INVALID_ARGUMENT:
            return
        self.task_manager.reset_status()


class PeriodicTaskManager:
    HEALTH_CHECK_INTERVAL = 10  # Health check interval, in seconds
    FLAPPING_WINDOW = 60  # Flapping detection window, in seconds
    FLAPPING_LIMIT = 3  # Maximum number of status changes allowed within the window
    SUCCESS_THRESHOLD = 3  # Number of consecutive successful checks required for recovery
    PRINT_INTERVAL = 30  # Check interval, in seconds

    def __init__(self, check_fn):
        self.check_fn = check_fn
        self._ems_ok = False
        self._consecutive_success_count = 0
        # Status changes recorded by flapping detection within a sliding window
        self._change_history: deque = deque()
        self.last_log_time = time.perf_counter()
        self.stat = {
            # [sum, min, max], init min with a large number: 2**30 (int) or 1e9 (float)
            "LOAD": {"count": 0, "block_nums": [0, 2**30, 0], "cost_times": [0.0, 1e9, 0.0]},
            "SAVE": {"count": 0, "block_nums": [0, 2**30, 0], "cost_times": [0.0, 1e9, 0.0]},
            "HIT": {"num_hit_blocks": 0, "num_total_blocks": 0},
        }
        self.thread_lock = threading.Lock()
        self.start_loop()

    def get_status(self) -> bool:
        return self._ems_ok

    def reset_status(self) -> None:
        if not self._ems_ok:
            return
        logger.info(f"[EMS] Ems health status reset to False")
        self._ems_ok = False

    def check_health_status(self) -> None:
        is_healthy = False
        try:
            is_healthy = self.check_fn()
        except Exception as e:
            logger.error(f"[EMS] EMS health check failed, error: {e}.")
        self._process_check_result(is_healthy)

    def _process_check_result(self, is_healthy: bool):
        if is_healthy:
            self._handle_success()
        else:
            self._handle_failure()

    def _handle_success(self):
        self._consecutive_success_count += 1
        if not self._ems_ok:
            if self._consecutive_success_count >= self.SUCCESS_THRESHOLD:
                # An attempt to transition the health status to True is intercepted by flapping detection.
                if self._try_switch_status(new_status=True):
                    pass
                else:
                    # The attempt is blocked by flapping detection, and the status change counter is reset.
                    self._consecutive_success_count = 0
        else:
            self._consecutive_success_count = min(self._consecutive_success_count, self.SUCCESS_THRESHOLD)

    def _handle_failure(self):
        self._consecutive_success_count = 0
        if self._ems_ok:
            # An attempt to transition the health status to False bypasses flapping detection and triggers an immediate failover.
            self._try_switch_status(new_status=False)

    def _try_switch_status(self, new_status: bool) -> bool:
        current_time = time.monotonic()
        # 1. Clear the sliding window.
        self._clean_flapping_history(current_time)
        # 2. Evaluate flapping conditions.
        is_flapping = len(self._change_history) >= self.FLAPPING_LIMIT
        if is_flapping:
            if new_status is True:
                # An attempt to restore service is blocked while flapping is active.
                logger.warning(
                    f"[EMS] EMS status flapping detected ({len(self._change_history)} changes in {self.FLAPPING_WINDOW}s). "
                    f"Blocking recovery (switch to True)."
                )
                return False
            else:
                logger.info(
                    f"[EMS] EMS status flapping detected, but forcing status to False (Fail Fast strategy)."
                )
        # 3. Execute the switchover.
        logger.info(f"[EMS] EMS health status changing: {self._ems_ok} -> {new_status}")
        self._ems_ok = new_status
        # 4. Record the state change.
        self._change_history.append(current_time)
        return True

    def _clean_flapping_history(self, current_time: float):
        threshold_time = current_time - self.FLAPPING_WINDOW
        while self._change_history and self._change_history[0] < threshold_time:
            self._change_history.popleft()

    def update_req_stat(self, event: str, block_num: int, cost_time: float) -> None:
        with self.thread_lock:
            self.stat[event]["count"] += 1
            self.stat[event]["block_nums"][0] += block_num
            self.stat[event]["block_nums"][1] = min(block_num, self.stat[event]["block_nums"][1])
            self.stat[event]["block_nums"][2] = max(block_num, self.stat[event]["block_nums"][2])
            self.stat[event]["cost_times"][0] += cost_time
            self.stat[event]["cost_times"][1] = min(cost_time, self.stat[event]["cost_times"][1])
            self.stat[event]["cost_times"][2] = max(cost_time, self.stat[event]["cost_times"][2])

    def update_hit_stat(self, num_hit_blocks: int, num_total_blocks: int) -> None:
        with self.thread_lock:
            self.stat["HIT"]["num_hit_blocks"] += num_hit_blocks
            self.stat["HIT"]["num_total_blocks"] += num_total_blocks

    def print_stat(self) -> None:
        with self.thread_lock:
            stat = self.stat
            self.stat = {
                # [sum, min, max], init min with a large number: 2**30 (int) or 1e9 (float)
                "LOAD": {"count": 0, "block_nums": [0, 2**30, 0], "cost_times": [0.0, 1e9, 0.0]},
                "SAVE": {"count": 0, "block_nums": [0, 2**30, 0], "cost_times": [0.0, 1e9, 0.0]},
                "HIT": {"num_hit_blocks": 0, "num_total_blocks": 0},
            }
        load = stat["LOAD"]["count"]
        avg_load_block_num = stat["LOAD"]["block_nums"][0] / load if load else 0.0
        min_load_block_num = stat["LOAD"]["block_nums"][1] if load else 0
        max_load_block_num = stat["LOAD"]["block_nums"][2] if load else 0
        avg_load_cost_time = stat["LOAD"]["cost_times"][0] / load if load else 0.0
        min_load_cost_time = stat["LOAD"]["cost_times"][1] if load else 0.0
        max_load_cost_time = stat["LOAD"]["cost_times"][2] if load else 0.0
        save = stat["SAVE"]["count"]
        avg_save_block_num = stat["SAVE"]["block_nums"][0] / save if save else 0.0
        min_save_block_num = stat["SAVE"]["block_nums"][1] if save else 0
        max_save_block_num = stat["SAVE"]["block_nums"][2] if save else 0
        avg_save_cost_time = stat["SAVE"]["cost_times"][0] / save if save else 0.0
        min_save_cost_time = stat["SAVE"]["cost_times"][1] if save else 0.0
        max_save_cost_time = stat["SAVE"]["cost_times"][2] if save else 0.0
        num_hit_blocks = stat["HIT"]["num_hit_blocks"]
        num_total_blocks = stat["HIT"]["num_total_blocks"]
        hit_rate = 100 * (num_hit_blocks / num_total_blocks) if num_total_blocks else 0.0
        logger.info(f"[EMS][{self.PRINT_INTERVAL}Sec Summary] req(load={load},save={save}) "
                    f"LOAD(avg_block_num={avg_load_block_num:.1f} [min:{min_load_block_num}, max:{max_load_block_num}]; "
                    f"avg_cost_ms={avg_load_cost_time:.1f} [min:{min_load_cost_time:.1f}, max:{max_load_cost_time:.1f}]) "
                    f"SAVE(avg_block_num={avg_save_block_num:.1f} [min:{min_save_block_num}, max:{max_save_block_num}]; "
                    f"avg_cost_ms={avg_save_cost_time:.1f} [min:{min_save_cost_time:.1f}, max:{max_save_cost_time:.1f}]) "
                    f"HIT(hit_rate={hit_rate:.1f} [hit:{num_hit_blocks}, total:{num_total_blocks}])")

    def task_loop(self) -> None:
        while True:
            time.sleep(self.HEALTH_CHECK_INTERVAL)
            self.check_health_status()
            cur_time = time.perf_counter()
            if cur_time - self.last_log_time > self.PRINT_INTERVAL:
                self.last_log_time = cur_time
                self.print_stat()

    def start_loop(self) -> None:
        logger.info("[EMS] EMS start periodic tasks.")
        self.check_health_status()
        self.periodic_task_thread = threading.Thread(target=self.task_loop, daemon=True, name="ems_task_loop")
        self.periodic_task_thread.start()
        logger.info(f"[EMS] periodic tasks subthread \"ems_task_loop\" started.")
- ems_env.py
vi ems_env.py
File content:
# Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved
import os


class EmsEnv:
    llm_engine = os.environ.get("LLM_ENGINE", "vllm")
    model_id = os.environ.get("MODEL_ID", "cc_kvstore@_@ds_default_ns_001")
    service_name = os.environ.get("SERVICE_NAME", "deepseek")
    access_id = os.environ.get("ACCELERATE_ID", "cc_kvstore@_@ds_default_ns_001")
    access_key = os.environ.get("ACCELERATE_KEY", "")
    ems_timeout: int = int(os.environ.get("EMS_TIMEOUT", "5000"))
    ems_enable_write_rcache: bool = os.environ.get("EMS_ENABLE_WRITE_RCACHE", "1") == "1"
    ems_enable_read_local_only: bool = os.environ.get("EMS_ENABLE_READ_LOCAL_ONLY", "0") == "1"
    ems_num_min_reuse_tokens: int = int(os.environ.get("EMS_NUM_MIN_REUSE_TOKENS", "2048"))
    ems_num_min_load_blocks: int = int(os.environ.get("EMS_NUM_MIN_LOAD_BLOCKS", "1"))
    ems_lookup_key_server_ip = os.environ.get("EMS_LOOKUP_KEY_SERVER_IP", "127.0.0.1")
    ems_lookup_key_server_base_port = os.environ.get("EMS_LOOKUP_KEY_SERVER_BASE_PORT", "50005")
- ems_store_connector.py
vi ems_store_connector.py
File content:
import os
import threading
from dataclasses import dataclass
from typing import Any, Optional, Tuple, List, Set, Dict

import torch
import zmq
from vllm.distributed import get_tp_group
from vllm.v1.attention.backend import AttentionMetadata
from vllm.config import VllmConfig
from vllm.distributed.kv_transfer.kv_connector.v1.base import KVConnectorBase_V1, KVConnectorMetadata, KVConnectorRole
from vllm.forward_context import ForwardContext
from vllm.logger import logger
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.request import Request
from vllm_ascend.distributed.ems_store.ems_adapter import EmsAdapter
from vllm_ascend.distributed.ems_store.ems_env import EmsEnv


@dataclass
class CcReqMeta:
    req_id: str
    num_computed_blocks: int
    num_total_blocks: int
    block_hashes: List[int]
    block_ids: Optional[List[int]]
    operation: str


@dataclass
class CcConnectorMetadata(KVConnectorMetadata):
    def __init__(self):
        self.requests: List[CcReqMeta] = []

    def add_requests(self, req_meta: CcReqMeta):
        self.requests.append(req_meta)


class EmsStoreConnector(KVConnectorBase_V1):
    def __init__(self,
                 vllm_config: VllmConfig,
                 role: KVConnectorRole,
                 kv_cache_config=None
                 ):
        super().__init__(
            vllm_config=vllm_config,
            role=role,
            kv_cache_config=kv_cache_config
        )
        if role == KVConnectorRole.SCHEDULER:
            self.connector_scheduler = CcConnectorScheduler(vllm_config)
        if role == KVConnectorRole.WORKER:
            self.connector_worker = CcConnectorWorker(vllm_config)
            if get_tp_group().rank_in_group == 0:
                self.lookup_server = LookupKeyServer(vllm_config.parallel_config.data_parallel_index,
                                                     self.connector_worker)

    ############################################################
    # Scheduler Side Methods
    ############################################################
    def get_num_new_matched_tokens(self, request: Request, num_computed_tokens: int) -> Tuple[int, bool]:
        return self.connector_scheduler.get_num_new_matched_tokens(request, num_computed_tokens)

    def update_state_after_alloc(self, request: Request, blocks: KVCacheBlocks, num_external_tokens: int):
        self.connector_scheduler.update_state_after_alloc(request, blocks, num_external_tokens)

    def build_connector_meta(self, scheduler_output: SchedulerOutput) -> KVConnectorMetadata:
        return self.connector_scheduler.build_connector_meta(scheduler_output)

    def request_finished(self, request: Request, block_ids: List[int]) -> Tuple[bool, Dict[str, Any] | None]:
        return self.connector_scheduler.request_finished(request, block_ids)

    ############################################################
    # Worker Side Methods
    ############################################################
    def register_kv_caches(self, kv_caches: Dict[str, torch.Tensor]):
        self.connector_worker.register_kv_caches(kv_caches)

    def start_load_kv(self, forward_context: ForwardContext, **kwargs):
        self.connector_worker.start_load_kv(self._get_connector_metadata())

    def wait_for_layer_load(self, layer_name: str):
        pass

    def save_kv_layer(self, layer_name: str, kv_layer: torch.Tensor, attn_metadata: "AttentionMetadata", **kwargs):
        pass

    def wait_for_save(self):
        self.connector_worker.wait_for_save(self._get_connector_metadata())

    def get_finished(self, finished_req_ids: Set[str]) -> Tuple[Set[str] | None, Set[str] | None]:
        return self.connector_worker.get_finished(finished_req_ids)

    def get_block_ids_with_load_errors(self) -> Set[int]:
        return self.connector_worker.get_block_ids_with_load_errors()


class CcConnectorScheduler:
    def __init__(self, vllm_config: VllmConfig):
        logger.info(f"[EMS] CcConnectorScheduler init.")
        self.block_size = vllm_config.cache_config.block_size
        self.lookup_client = LookupKeyClient(vllm_config.parallel_config.data_parallel_index)
        self.processed_requests: Set[str] = set()
        self.meta_load_reqs: Dict[str, CcReqMeta] = {}
        self.need_save_req_ids: Set[str] = set()

    def get_num_new_matched_tokens(self, request: Request, num_computed_tokens: int) -> Tuple[int, bool]:
        if request.request_id in self.processed_requests:
logger.info(f"[EMS][Scheduler] req {request.request_id} already in processed requests.") return 0, False num_computed_blocks = num_computed_tokens // self.block_size num_total_blocks = (len(request.prompt_token_ids) - 1) // self.block_size if not self._need_load(num_computed_blocks, num_total_blocks): logger.info(f"[EMS][Scheduler] req {request.request_id} no need to load, num_computed_blocks: {num_computed_blocks}, " f"num_total_blocks: {num_total_blocks}.") return 0, False block_hashes = self._cal_block_hashes( request.prompt_token_ids, self.block_size )[num_computed_blocks:num_total_blocks] num_exist_blocks = self.lookup_client.lookup(request.request_id, block_hashes) num_total_blocks = num_computed_blocks + num_exist_blocks block_hashes = block_hashes[:num_exist_blocks] if not self._need_load(num_computed_blocks, num_total_blocks): logger.info(f"[EMS][Scheduler] req {request.request_id} num_total_blocks is updated,still no need to load, num_computed_blocks: {num_computed_blocks}, " f"num_computed_blocks + num_exist_blocks: {num_total_blocks}.") return 0, False req_meta = CcReqMeta( req_id=request.request_id, num_computed_blocks=num_computed_blocks, num_total_blocks=num_total_blocks, block_hashes=block_hashes, block_ids=None, operation="no-op" ) self.meta_load_reqs[request.request_id] = req_meta logger.debug(f"[EMS][Scheduler] req {request.request_id} meta: {req_meta}.") num_new_matched_tokens = (num_total_blocks - num_computed_blocks) * self.block_size logger.info(f"[EMS][Scheduler] matched result, req_id={request.request_id}, " f"num_computed_blocks={num_computed_blocks}, " f"num_exist_blocks={num_exist_blocks}, new num_computed_blocks={num_total_blocks}, " f"num_new_matched_tokens={num_new_matched_tokens}, prompt_len={len(request.prompt_token_ids)}") return num_new_matched_tokens, True def update_state_after_alloc(self, request: Request, blocks: KVCacheBlocks, num_external_tokens: int): if request.request_id not in self.meta_load_reqs: 
logger.debug(f"[EMS][Scheduler] req {request.request_id} not in meta_load_reqs.") return if num_external_tokens == 0: logger.debug(f"[EMS][Scheduler] req {request.request_id} num_external_tokens is 0, no need to update block ids.") self.meta_load_reqs[request.request_id].operation = "no-op" return req_meta = self.meta_load_reqs[request.request_id] all_block_ids = blocks.get_block_ids()[0] req_meta.num_total_blocks = min(req_meta.num_total_blocks, len(all_block_ids)) req_meta.block_ids = all_block_ids[req_meta.num_computed_blocks:req_meta.num_total_blocks] req_meta.block_hashes = req_meta.block_hashes[:len(req_meta.block_ids)] req_meta.operation = "load" self.processed_requests.add(request.request_id) logger.info(f"[EMS][Scheduler] req {request.request_id} update block ids, meta: {req_meta}.") def build_connector_meta(self, scheduler_output: SchedulerOutput) -> KVConnectorMetadata: connector_meta = CcConnectorMetadata() for req_id, req_meta in self.meta_load_reqs.items(): if req_meta.operation != "load": continue connector_meta.add_requests(req_meta) self.meta_load_reqs.clear() for new_req in scheduler_output.scheduled_new_reqs: num_total_tokens = scheduler_output.num_scheduled_tokens[new_req.req_id] + new_req.num_computed_tokens num_computed_blocks = new_req.num_computed_tokens // self.block_size num_total_blocks = num_total_tokens // self.block_size if num_computed_blocks < num_total_blocks: block_hashes = self._cal_block_hashes(new_req.prompt_token_ids, self.block_size)[ num_computed_blocks:num_total_blocks] block_ids = new_req.block_ids[0][num_computed_blocks:num_total_blocks] req_meta = CcReqMeta( req_id=new_req.req_id, num_computed_blocks=num_computed_blocks, num_total_blocks=num_total_blocks, block_hashes=block_hashes, block_ids=block_ids, operation="save" ) logger.debug(f"[EMS] req {new_req.req_id} need save, meta: {req_meta}.") connector_meta.add_requests(req_meta) self.need_save_req_ids.add(new_req.req_id) return connector_meta def request_finished(self, 
request: Request, block_ids: List[int], ) -> Tuple[bool, Optional[Dict[str, Any]]]: if request.request_id in self.processed_requests: self.processed_requests.remove(request.request_id) need_save = request.request_id in self.need_save_req_ids self.need_save_req_ids.discard(request.request_id) logger.debug(f"[EMS][Scheduler] request finished, req {request.request_id}, " f"need_save={need_save}") return need_save, None def _need_load(self, num_computed_blocks: int, num_total_blocks: int) -> bool: # Blocks are not loaded if their length is less than ems_num_min_reuse_tokens. if num_total_blocks * self.block_size < EmsEnv.ems_num_min_reuse_tokens: return False # Blocks are not loaded if the number of load blocks is less than ems_num_min_load_blocks. if num_total_blocks - num_computed_blocks <= EmsEnv.ems_num_min_load_blocks: return False return True def _cal_block_hashes(self, token_ids: List[int], block_size) -> List[int]: result: List[int] = [] prev_block_hash = 0 num_blocks = len(token_ids) // block_size for block_id in range(num_blocks): block_hash = self._cal_block_hash(token_ids[block_id * block_size:(block_id + 1) * block_size], prev_block_hash) result.append(block_hash) prev_block_hash = block_hash return result def _cal_block_hash(self, block_token_ids: List[int], prev_block_hash: int) -> int: return hash((prev_block_hash, *block_token_ids)) class CcConnectorWorker: def __init__(self, vllm_config: VllmConfig): self.ems_adapter = EmsAdapter(vllm_config) self.finished_req_ids = set() self.finished_save_req_ids = set() def register_kv_caches(self, kv_caches: Dict[str, Tuple[torch.Tensor]]): return self.ems_adapter.register_kv_caches(kv_caches) def start_load_kv(self, metadata: CcConnectorMetadata): for request in metadata.requests: if request.operation != "load": continue self.ems_adapter.async_load( request.req_id, request.block_hashes, request.block_ids, request.num_computed_blocks ) def wait_for_save(self, metadata: CcConnectorMetadata): for request in 
metadata.requests: if request.operation != "save": continue import torch_npu torch_npu.npu.synchronize() self.ems_adapter.async_save( request.req_id, request.block_hashes, request.block_ids ) def get_finished(self, finished_req_ids: Set[str]) -> Tuple[Set[str], Set[str]]: self.finished_req_ids.update(finished_req_ids) self.finished_save_req_ids.update(self.ems_adapter.get_finished_save_reqs()) finished_save_req_ids = self.finished_req_ids & self.finished_save_req_ids self.finished_req_ids -= finished_save_req_ids self.finished_save_req_ids -= finished_save_req_ids finished_load_req_ids = self.ems_adapter.get_finished_load_reqs() if finished_save_req_ids or finished_load_req_ids: logger.info(f"[EMS][Worker] finished_save_req_ids: {finished_save_req_ids}, " f"finished_load_req_ids: {finished_load_req_ids}.") return finished_save_req_ids, finished_load_req_ids def get_block_ids_with_load_errors(self) -> Set[int]: block_ids_with_load_errors = self.ems_adapter.get_block_ids_with_load_errors() if block_ids_with_load_errors: logger.info(f"[EMS][Worker] block_ids_with_load_errors: {block_ids_with_load_errors}") return block_ids_with_load_errors def _lookup_block_hashes(self, req_id: str, block_hashes: List[int]) -> int: return self.ems_adapter.exists_block_num(req_id, block_hashes) class LookupKeyServer: def __init__(self, data_parallel_rank, worker_connector): self.worker_connector = worker_connector self.zmq_ctx = zmq.Context() self.socket = self.zmq_ctx.socket(zmq.REP) port = int(EmsEnv.ems_lookup_key_server_base_port) + data_parallel_rank self.addr = f"tcp://{EmsEnv.ems_lookup_key_server_ip}:{port}" self.socket.bind(self.addr) def process_lookup(): while True: req_id, block_hashes = self.socket.recv_pyobj() num_exist_blocks = self.worker_connector._lookup_block_hashes(req_id, block_hashes) self.socket.send_pyobj(num_exist_blocks) self.thread = threading.Thread(target=process_lookup, daemon=True, name="ems_exist_proxy") self.thread.start() class LookupKeyClient: def 
__init__(self, data_parallel_rank): self.zmq_ctx = zmq.Context() self.socket = self.zmq_ctx.socket(zmq.REQ) port = int(EmsEnv.ems_lookup_key_server_base_port) + data_parallel_rank self.addr = f"tcp://{EmsEnv.ems_lookup_key_server_ip}:{port}" self.socket.connect(self.addr) def lookup(self, req_id: str, block_hashes: List[int]) -> int: self.socket.send_pyobj([req_id, block_hashes]) num_exist_blocks = self.socket.recv_pyobj() return num_exist_blocks
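The connector above identifies each KV cache block by a hash that also covers the hash of the preceding block, so two prompts produce identical hashes only for the blocks of their shared prefix. The following is a minimal, standalone sketch of that idea. The names chained_block_hashes and count_cached_prefix and the in-memory set used as a store are hypothetical stand-ins for illustration. They are not part of vllm-ascend or the EMS SDK.

```python
from typing import List, Set


def chained_block_hashes(token_ids: List[int], block_size: int) -> List[int]:
    """Hash every full block together with the hash of the block before it."""
    hashes: List[int] = []
    prev_hash = 0
    # Only full blocks are hashed; a trailing partial block is ignored.
    for start in range(0, len(token_ids) - block_size + 1, block_size):
        block = token_ids[start:start + block_size]
        prev_hash = hash((prev_hash, *block))
        hashes.append(prev_hash)
    return hashes


def count_cached_prefix(block_hashes: List[int], store: Set[int]) -> int:
    """Count the leading blocks whose hashes are present in the store.

    The set is a hypothetical stand-in for the external KV cache lookup.
    """
    count = 0
    for block_hash in block_hashes:
        if block_hash not in store:
            break
        count += 1
    return count


block_size = 4
prompt_a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
prompt_b = [1, 2, 3, 4, 5, 6, 7, 8, 99, 98, 97, 96, 95]

hashes_a = chained_block_hashes(prompt_a, block_size)
hashes_b = chained_block_hashes(prompt_b, block_size)

# Pretend the blocks of prompt_a were saved earlier by the prefill side.
store = set(hashes_a)
reusable_blocks = count_cached_prefix(hashes_b, store)
print(reusable_blocks, reusable_blocks * block_size)
```

In this sketch the two prompts share their first eight tokens, so the first two blocks of prompt_b are found in the store and the third is not. Because each hash depends on all earlier blocks, a match at position i implies that the whole prefix up to block i matches.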
- __init__.py
- Create the kv_transfer folder. Then, create __init__.py inside it.
mkdir kv_transfer
vi kv_transfer/__init__.py
__init__.py content:
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# This file is a part of the vllm-ascend project.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from vllm.distributed.kv_transfer.kv_connector.factory import KVConnectorFactory


def register_connector():
    # override multi_connector as ascend_multi_connector
    if "MultiConnector" in KVConnectorFactory._registry:
        KVConnectorFactory._registry.pop("MultiConnector")
    KVConnectorFactory.register_connector(
        "MultiConnector", "vllm_ascend.distributed.kv_transfer.ascend_multi_connector", "AscendMultiConnector"
    )

    KVConnectorFactory.register_connector(
        "MooncakeConnectorV1", "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector", "MooncakeConnector"
    )

    KVConnectorFactory.register_connector(
        "MooncakeConnectorStoreV1",
        "vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.ascend_store_connector",
        "AscendStoreConnector",
    )

    KVConnectorFactory.register_connector(
        "AscendStoreConnector",
        "vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.ascend_store_connector",
        "AscendStoreConnector",
    )

    KVConnectorFactory.register_connector(
        "MooncakeLayerwiseConnector",
        "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector",
        "MooncakeLayerwiseConnector",
    )

    KVConnectorFactory.register_connector(
        "UCMConnector", "vllm_ascend.distributed.kv_transfer.kv_pool.ucm_connector", "UCMConnectorV1"
    )

    KVConnectorFactory.register_connector(
        "LMCacheAscendConnector",
        "vllm_ascend.distributed.kv_transfer.kv_pool.lmcache_ascend_connector",
        "LMCacheConnectorV1",
    )

    KVConnectorFactory.register_connector(
        "EmsStoreConnector",
        "vllm_ascend.distributed.ems_store.ems_store_connector",
        "EmsStoreConnector",
    )
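The registration calls above map a connector name to a module path and a class name, which is the name later given as kv_connector in --kv-transfer-config. The sketch below illustrates this kind of lazy, name-based registry with a small hypothetical class, LazyConnectorRegistry. It is not the vLLM KVConnectorFactory implementation. It assumes that duplicate names are rejected, which would explain why the code above removes the existing MultiConnector entry before registering a replacement.

```python
import importlib
from typing import Callable, Dict


class LazyConnectorRegistry:
    """Hypothetical registry mapping a name to (module path, class name)."""

    def __init__(self) -> None:
        self._registry: Dict[str, Callable[[], type]] = {}

    def register(self, name: str, module_path: str, class_name: str) -> None:
        # Assumption for this sketch: registering the same name twice is an error.
        if name in self._registry:
            raise ValueError(f"Connector '{name}' is already registered.")

        def loader() -> type:
            # The module is imported only when the connector is requested.
            module = importlib.import_module(module_path)
            return getattr(module, class_name)

        self._registry[name] = loader

    def resolve(self, name: str) -> type:
        return self._registry[name]()


registry = LazyConnectorRegistry()
# A standard-library class stands in for a real connector class.
registry.register("DemoConnector", "collections", "OrderedDict")
connector_cls = registry.resolve("DemoConnector")
print(connector_cls.__name__)
```

Deferring the import until resolve is called means that registering a connector does not require its dependencies to be importable at registration time.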
- Download the OpenSSL and Cyrus SASL installation packages.
- Create an image.
- Create a Dockerfile.
FROM quay.io/ascend/vllm-ascend:v0.18.0-a3

# Install OpenSSL 1.1.1w from source
COPY openssl-1.1.1w.tar.gz .
RUN tar xzvf openssl-1.1.1w.tar.gz \
    && cd openssl-1.1.1w \
    && ./config \
    && make \
    && make install \
    && ln -sf /usr/local/lib/libssl.so.1.1 /usr/lib/aarch64-linux-gnu/libssl.so.1.1 \
    && ln -sf /usr/local/lib/libcrypto.so.1.1 /usr/lib/aarch64-linux-gnu/libcrypto.so.1.1 \
    && cd .. \
    && rm -rf openssl-1.1.1w openssl-1.1.1w.tar.gz

# Install cyrus-sasl 2.1.28 from source
COPY cyrus-sasl-2.1.28.tar.gz .
RUN tar xzvf cyrus-sasl-2.1.28.tar.gz \
    && cd cyrus-sasl-2.1.28 \
    && ./configure \
    && make \
    && make install \
    && ln -sf /usr/local/lib/libsasl2.so.3 /usr/lib/aarch64-linux-gnu/libsasl2.so.3 \
    && ln -sf /usr/local/lib/sasl2 /usr/lib/aarch64-linux-gnu/sasl2 \
    && cd .. \
    && rm -rf cyrus-sasl-2.1.28 cyrus-sasl-2.1.28.tar.gz

RUN mkdir -p /vllm-workspace/vllm-ascend/vllm_ascend/distributed/ems_store/
COPY ./ems_store/__init__.py /vllm-workspace/vllm-ascend/vllm_ascend/distributed/ems_store/
COPY ./ems_store/ems_store_connector.py /vllm-workspace/vllm-ascend/vllm_ascend/distributed/ems_store/
COPY ./ems_store/ems_env.py /vllm-workspace/vllm-ascend/vllm_ascend/distributed/ems_store/
COPY ./ems_store/ems_adapter.py /vllm-workspace/vllm-ascend/vllm_ascend/distributed/ems_store/
COPY ./kv_transfer/__init__.py /vllm-workspace/vllm-ascend/vllm_ascend/distributed/kv_transfer/

COPY ems-26.3.0-cp311-cp311-linux_aarch64.whl .
RUN pip install --no-deps ems-26.3.0-cp311-cp311-linux_aarch64.whl \
    && rm ems-26.3.0-cp311-cp311-linux_aarch64.whl
- Build the image.
docker build -t vllm-ascend-ems:v0.18.0-a3 .
- Create a ConfigMap containing the startup script. Adjust startup script parameters based on the selected model. For details, see vllm-ascend.
kubectl apply -f config.yaml
The following sample deployment template sets nic_name to the name of the node's network interface. To obtain this value, run ip route | grep default and use the interface name that follows dev in the output.
kind: ConfigMap
apiVersion: v1
metadata:
  name: kimi-k25-pd-cm
data:
  prefill.sh: |
    nic_name="enp23s0f3" # network card name
    local_ip=$POD_IP
    export HCCL_IF_IP=$local_ip
    export GLOO_SOCKET_IFNAME=$nic_name
    export TP_SOCKET_IFNAME=$nic_name
    export HCCL_SOCKET_IFNAME=$nic_name
    export LD_PRELOAD=/usr/lib/aarch64-linux-gnu/libjemalloc.so.2:$LD_PRELOAD
    sysctl -w vm.swappiness=0
    sysctl -w kernel.numa_balancing=0
    sysctl kernel.sched_migration_cost_ns=50000
    export VLLM_RPC_TIMEOUT=3600000
    export VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS=30000
    export HCCL_OP_EXPANSION_MODE="AIV"
    export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True
    export OMP_PROC_BIND=false
    export OMP_NUM_THREADS=1
    export TASK_QUEUE_ENABLE=1
    export ASCEND_BUFFER_POOL=4:8
    export HCCL_BUFFSIZE=256
    export VLLM_ASCEND_ENABLE_FLASHCOMM1=1

    # --- EMS Configuration Exports ---
    export EMS_NUM_MIN_REUSE_TOKENS=0
    export EMS_NUM_MIN_LOAD_BLOCKS=0
    export EMS_BLOCK_GROUP_SIZE=2
    export LD_LIBRARY_PATH=/usr/local/python3.11.14/lib/python3.11/site-packages/ems/lib:$LD_LIBRARY_PATH
    export EMS_LOOKUP_KEY_SERVER_BASE_PORT=60687

    vllm serve $MODEL_LOCATION \
      --host $POD_IP \
      --port "7100" \
      --data-parallel-size 2 \
      --data-parallel-address $POD_IP \
      --data-parallel-rpc-port 12321 \
      --tensor-parallel-size 8 \
      --enable-expert-parallel \
      --seed 1024 \
      --quantization ascend \
      --served-model-name kimi_k25 \
      --trust-remote-code \
      --max-num-seqs 8 \
      --max-model-len 32768 \
      --max-num-batched-tokens 16384 \
      --disable-hybrid-kv-cache-manager \
      --no-enable-prefix-caching \
      --gpu-memory-utilization 0.8 \
      --enforce-eager \
      --speculative-config '{"method": "eagle3", "model":"/models/kimi-k2.5-eagle3", "num_speculative_tokens": 3}' \
      --additional-config '{"recompute_scheduler_enable":true}' \
      --mm-encoder-tp-mode data \
      --kv-transfer-config \
      '{
        "kv_connector": "EmsStoreConnector",
        "kv_role": "kv_producer"
      }'
  decode.sh: |
    nic_name="enp23s0f3" # network card name
    local_ip=$POD_IP
    export HCCL_IF_IP=$local_ip
    export GLOO_SOCKET_IFNAME=$nic_name
    export TP_SOCKET_IFNAME=$nic_name
    export HCCL_SOCKET_IFNAME=$nic_name
    export LD_PRELOAD=/usr/lib/aarch64-linux-gnu/libjemalloc.so.2:$LD_PRELOAD
    sysctl -w vm.swappiness=0
    sysctl -w kernel.numa_balancing=0
    sysctl kernel.sched_migration_cost_ns=50000
    export VLLM_RPC_TIMEOUT=3600000
    export VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS=30000
    export HCCL_OP_EXPANSION_MODE="AIV"
    export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True
    export OMP_PROC_BIND=false
    export OMP_NUM_THREADS=1
    export TASK_QUEUE_ENABLE=1
    export ASCEND_BUFFER_POOL=4:8
    export HCCL_BUFFSIZE=2200
    export VLLM_ASCEND_ENABLE_MLAPO=0

    # --- EMS Configuration Exports ---
    export EMS_NUM_MIN_REUSE_TOKENS=0
    export EMS_NUM_MIN_LOAD_BLOCKS=0
    export EMS_BLOCK_GROUP_SIZE=2
    export LD_LIBRARY_PATH=/usr/local/python3.11.14/lib/python3.11/site-packages/ems/lib:$LD_LIBRARY_PATH
    export EMS_LOOKUP_KEY_SERVER_BASE_PORT=60687

    vllm serve $MODEL_LOCATION \
      --host $POD_IP \
      --port "7100" \
      --data-parallel-size 16 \
      --data-parallel-address $POD_IP \
      --data-parallel-rpc-port 12322 \
      --tensor-parallel-size 1 \
      --enable-expert-parallel \
      --seed 1024 \
      --quantization ascend \
      --served-model-name kimi_k25 \
      --trust-remote-code \
      --max-num-seqs 48 \
      --max-model-len 32768 \
      --max-num-batched-tokens 256 \
      --disable-hybrid-kv-cache-manager \
      --no-enable-prefix-caching \
      --gpu-memory-utilization 0.95 \
      --compilation-config '{"cudagraph_mode": "FULL_DECODE_ONLY", "cudagraph_capture_sizes":[4,8,16,32,48,64,80,96,112,128,144,160]}' \
      --additional-config '{"recompute_scheduler_enable":true,"multistream_overlap_shared_expert": false}' \
      --speculative-config '{"method": "eagle3", "model":"/models/kimi-k2.5-eagle3", "num_speculative_tokens": 3}' \
      --kv-transfer-config \
      '{
        "kv_connector": "EmsStoreConnector",
        "kv_role": "kv_consumer"
      }'
- Deploy the ModelServing resource.
kubectl apply -f kimi-k25-serv.yaml
The following shows a sample deployment template:
apiVersion: workload.serving.volcano.sh/v1alpha1
kind: ModelServing
metadata:
  name: kimi-k25-pd
  namespace: default
spec:
  schedulerName: volcano
  replicas: 1
  plugins:
    - name: pod-discovery
  recoveryPolicy: ServingGroupRecreate
  template:
    restartGracePeriodSeconds: 60
    roles:
      - name: prefill
        replicas: 1
        workerReplicas: 0
        entryTemplate:
          spec:
            hostNetwork: true
            containers:
              - name: prefill
                image: docker.io/library/vllm-ascend-ems:v0.18.0-a3
                command:
                  - /bin/bash
                args:
                  - '-c'
                  - cd /workspace && ./prefill.sh
                env:
                  - name: ROLE
                    value: "prefill"
                  - name: GROUP_NAME
                    valueFrom:
                      fieldRef:
                        fieldPath: metadata.labels['modelserving.volcano.sh/group-name']
                  - name: ROLE_ID
                    valueFrom:
                      fieldRef:
                        fieldPath: metadata.labels['modelserving.volcano.sh/role-id']
                  - name: POD_IP
                    valueFrom:
                      fieldRef:
                        fieldPath: status.podIP
                  - name: NODE_IP
                    valueFrom:
                      fieldRef:
                        fieldPath: status.hostIP
                  - name: MODEL_LOCATION
                    value: /models/Kimi-K2.5-w4a8
                  - name: TP_SIZE
                    value: "8"
                  - name: DP_SIZE
                    value: "2"
                readinessProbe:
                  httpGet:
                    path: /health
                    port: 7100
                    scheme: HTTP
                  initialDelaySeconds: 60
                  periodSeconds: 10
                  timeoutSeconds: 2
                  failureThreshold: 3
                resources:
                  limits:
                    cpu: '188'
                    huawei.com/ascend-1980: '16'
                    memory: 1800Gi
                  requests:
                    cpu: '64'
                    huawei.com/ascend-1980: '16'
                    memory: 700Gi
                ports:
                  - containerPort: 7100
                    name: server
                volumeMounts:
                  - name: model
                    mountPath: /models
                  - name: dshm
                    mountPath: /dev/shm
                  - name: ems-shm
                    mountPath: /dev/shm/ems
                  - name: hccn-conf
                    mountPath: /etc/hccn.conf
                  - name: hccn-tool
                    mountPath: /usr/local/Ascend/driver/tools/hccn_tool
                  - name: ascend-install-info
                    mountPath: /etc/ascend_install.info
                  - name: config
                    mountPath: /workspace/prefill.sh
                    subPath: prefill.sh
            volumes:
              - name: model
                hostPath:
                  path: /mnt/paas/models
                  type: Directory
              - name: dshm
                emptyDir:
                  medium: Memory
              - name: ems-shm
                hostPath:
                  path: /mnt/paas/kubernetes/kubelet/ems
                  type: DirectoryOrCreate
              - name: hccn-conf
                hostPath:
                  path: /etc/hccn.conf
              - name: hccn-tool
                hostPath:
                  path: /usr/local/Ascend/driver/tools/hccn_tool
              - name: ascend-install-info
                hostPath:
                  path: /etc/ascend_install.info
              - name: config
                configMap:
                  name: kimi-k25-pd-cm
                  defaultMode: 0777
      - name: decode
        replicas: 1
        workerReplicas: 0
        entryTemplate:
          spec:
            hostNetwork: true
            containers:
              - name: decode
                image: docker.io/library/vllm-ascend-ems:v0.18.0-a3
                command:
                  - /bin/bash
                args:
                  - '-c'
                  - cd /workspace && ./decode.sh
                env:
                  - name: ROLE
                    value: "decode"
                  - name: ENGINE_ID
                    valueFrom:
                      fieldRef:
                        fieldPath: metadata.name
                  - name: POD_IP
                    valueFrom:
                      fieldRef:
                        fieldPath: status.podIP
                  - name: NODE_IP
                    valueFrom:
                      fieldRef:
                        fieldPath: status.hostIP
                  - name: GROUP_NAME
                    valueFrom:
                      fieldRef:
                        fieldPath: metadata.labels['modelserving.volcano.sh/group-name']
                  - name: ROLE_ID
                    valueFrom:
                      fieldRef:
                        fieldPath: metadata.labels['modelserving.volcano.sh/role-id']
                  - name: MODEL_LOCATION
                    value: /models/Kimi-K2.5-w4a8
                  - name: TP_SIZE
                    value: "1"
                  - name: DP_SIZE
                    value: "16"
                readinessProbe:
                  httpGet:
                    path: /health
                    port: 7100
                    scheme: HTTP
                  initialDelaySeconds: 60
                  periodSeconds: 10
                  timeoutSeconds: 2
                  failureThreshold: 3
                ports:
                  - containerPort: 7100
                    name: server
                resources:
                  limits:
                    cpu: '188'
                    huawei.com/ascend-1980: '16'
                    memory: 1800Gi
                  requests:
                    cpu: '64'
                    huawei.com/ascend-1980: '16'
                    memory: 700Gi
                volumeMounts:
                  - name: model
                    mountPath: /models
                  - name: dshm
                    mountPath: /dev/shm
                  - name: ems-shm
                    mountPath: /dev/shm/ems
                  - name: hccn-conf
                    mountPath: /etc/hccn.conf
                  - name: hccn-tool
                    mountPath: /usr/local/Ascend/driver/tools/hccn_tool
                  - name: ascend-install-info
                    mountPath: /etc/ascend_install.info
                  - name: config
                    mountPath: /workspace/decode.sh
                    subPath: decode.sh
            volumes:
              - name: model
                hostPath:
                  path: /mnt/paas/models
                  type: Directory
              - name: dshm
                emptyDir:
                  medium: Memory
              - name: ems-shm
                hostPath:
                  path: /mnt/paas/kubernetes/kubelet/ems
                  type: DirectoryOrCreate
              - name: hccn-conf
                hostPath:
                  path: /etc/hccn.conf
              - name: hccn-tool
                hostPath:
                  path: /usr/local/Ascend/driver/tools/hccn_tool
              - name: ascend-install-info
                hostPath:
                  path: /etc/ascend_install.info
              - name: config
                configMap:
                  name: kimi-k25-pd-cm
                  defaultMode: 0777
- Configure a proxy.
- Download the proxy script.
wget https://raw.githubusercontent.com/vllm-project/vllm-ascend/main/examples/disaggregated_prefill_v1/load_balance_proxy_server_example.py
- Obtain the pod IP address.
kubectl get pods -owide
The following is an example of the returned result:
NAME                        READY   STATUS    RESTARTS   AGE     IP              NODE            NOMINATED NODE   READINESS GATES
kimi-k25-pd-0-decode-0-0    1/1     Running   0          9m57s   192.168.0.156   192.168.0.156   <none>           <none>
kimi-k25-pd-0-prefill-0-0   1/1     Running   0          9m57s   192.168.0.163   192.168.0.163   <none>           <none>
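The pod IP addresses in this output are the values passed to the proxy in the next step. As a minimal sketch, the mapping from pod name to proxy argument could be automated as follows. The helper names pod_ips_by_role and proxy_arguments are hypothetical, and the sketch assumes one pod per role whose name contains -prefill- or -decode-, as in the sample output.

```python
from typing import Dict, List, Sequence

# Sample text copied from the kubectl output shown above.
SAMPLE_OUTPUT = """\
NAME                        READY   STATUS    RESTARTS   AGE     IP              NODE            NOMINATED NODE   READINESS GATES
kimi-k25-pd-0-decode-0-0    1/1     Running   0          9m57s   192.168.0.156   192.168.0.156   <none>           <none>
kimi-k25-pd-0-prefill-0-0   1/1     Running   0          9m57s   192.168.0.163   192.168.0.163   <none>           <none>
"""


def pod_ips_by_role(kubectl_output: str,
                    roles: Sequence[str] = ("prefill", "decode")) -> Dict[str, str]:
    """Return {role: pod IP} parsed from wide-format pod listing text."""
    lines = kubectl_output.strip().splitlines()
    # The IP column comes before any multi-word header, so its index is stable.
    ip_column = lines[0].split().index("IP")
    ips: Dict[str, str] = {}
    for line in lines[1:]:
        fields = line.split()
        pod_name = fields[0]
        for role in roles:
            if f"-{role}-" in pod_name:
                ips[role] = fields[ip_column]
    return ips


def proxy_arguments(ips: Dict[str, str], port: int = 7100) -> List[str]:
    """Build the host and port arguments for the proxy script."""
    return [
        "--prefiller-hosts", ips["prefill"],
        "--prefiller-ports", str(port),
        "--decoder-hosts", ips["decode"],
        "--decoder-ports", str(port),
    ]


ips = pod_ips_by_role(SAMPLE_OUTPUT)
print(" ".join(proxy_arguments(ips)))
```

If several pods exist for the same role, this sketch keeps only the last one it sees, so it would need to collect lists of addresses instead.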
- Start the proxy.
python3 load_balance_proxy_server_example.py \
    --port 8181 \
    --host 0.0.0.0 \
    --prefiller-hosts 192.168.0.163 \
    --prefiller-ports 7100 \
    --decoder-hosts 192.168.0.156 \
    --decoder-ports 7100
- Verify the deployment by sending a standard Chat API request via curl to confirm that the inference endpoint is functional.
- Send the request.
curl -X POST http://192.168.0.163:8181/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "kimi_k25",
    "messages": [
      {
        "role": "user",
        "content": "Hello, how are you?"
      }
    ],
    "max_tokens": 100
  }'
- Check the returned result.
If the deployment is successful, the response returns JSON data containing the choices and usage fields.
{
  "id": "chatcmpl-fd56a4a3-829f-49c7-9ffa-f890e8******",
  "object": "chat.completion",
  "created": 1779713507,
  "model": "kimi_k25",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": " The user is greeting me and asking how I am. This is a standard conversational opening. I should respond politely, let them know I'm doing well (as an AI, I don't have feelings, but it's social convention to respond positively), and invite them to continue the conversation or ask how I can help them today.\n\nI should keep it friendly, concise, and open-ended so they can tell me what they need help with. </think> Hello! I'm doing well, thank you for asking. How can I",
        "refusal": null,
        "annotations": null,
        "audio": null,
        "function_call": null,
        "tool_calls": [],
        "reasoning": null
      },
      "logprobs": null,
      "finish_reason": "length",
      "stop_reason": null,
      "token_ids": null
    }
  ],
  "service_tier": null,
  "system_fingerprint": null,
  "usage": {
    "prompt_tokens": 14,
    "total_tokens": 114,
    "completion_tokens": 100,
    "prompt_tokens_details": null,
    "completion_tokens_details": null
  },
  "prompt_logprobs": null,
  "prompt_token_ids": null,
  "kv_transfer_params": null
}
- Confirm logs on the proxy.
On the terminal where the proxy script is running, verify that the request is received and forwarded successfully.
INFO:     Started server process [303949]
INFO:     Waiting for application startup.
Initialized 1 prefill clients and 1 decode clients.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8181 (Press CTRL+C to quit)
INFO:     192.168.0.163:37794 - "POST /v1/chat/completions HTTP/1.1" 200 OK
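The check on the returned result described above (a JSON body containing the choices and usage fields) can also be scripted. The following is a minimal sketch that uses only the Python standard library. The function name check_chat_response is hypothetical, and the sample body is abridged from the response shown earlier.

```python
import json

# Abridged from the sample response above; only the fields used here are kept.
SAMPLE_BODY = """
{
  "model": "kimi_k25",
  "choices": [
    {
      "index": 0,
      "message": {"role": "assistant", "content": "Hello! I'm doing well."},
      "finish_reason": "length"
    }
  ],
  "usage": {"prompt_tokens": 14, "total_tokens": 114, "completion_tokens": 100}
}
"""


def check_chat_response(body: str) -> dict:
    """Raise ValueError if the body does not look like a chat completion."""
    data = json.loads(body)
    missing = [key for key in ("choices", "usage") if key not in data]
    if missing:
        raise ValueError(f"response is missing fields: {missing}")
    if not data["choices"]:
        raise ValueError("response contains no choices")
    usage = data["usage"]
    if usage["prompt_tokens"] + usage["completion_tokens"] != usage["total_tokens"]:
        raise ValueError("token counts in usage are inconsistent")
    first = data["choices"][0]
    return {
        "model": data.get("model"),
        "finish_reason": first.get("finish_reason"),
        "total_tokens": usage["total_tokens"],
    }


summary = check_chat_response(SAMPLE_BODY)
print(summary)
```

A finish_reason of length, as in the sample, indicates that generation stopped at the max_tokens limit of the request, not that the request failed.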
