Updated on 2024-12-26 GMT+08:00

Creating a Custom Training Image (MindSpore + Ascend)

This section describes how to create an Ascend container image from scratch and use the image for training on ModelArts. The AI engine used in the image is MindSpore, and the resources used for training are powered by Ascend in a dedicated resource pool.

Description

Create a container image with the following configurations and use the image to create an Ascend-powered training job on ModelArts:

  • ubuntu-18.04
  • CANN 6.3.RC2 (commercial edition)
  • python-3.7.13
  • mindspore-2.1.1

The examples in this section use CANN 6.3.RC2 and MindSpore 2.1.1. They show how to create an Ascend container image and run it in a dedicated resource pool that has the required Ascend driver and firmware installed.

Constraints

  • This example requires the CANN commercial edition. If you do not have permission to download the CANN commercial edition, see other examples for creating a custom image.
  • Check the version mapping between MindSpore and CANN, and between CANN and the Ascend driver and firmware. Mismatched versions cause training to fail.

Step 1 Creating an OBS Bucket and Folder

Create a bucket and folders in OBS for storing the sample dataset and training code. In this example, create a bucket named test-modelarts and folders listed in Table 1.

For details about how to create an OBS bucket and folder, see Creating a Bucket and Creating a Folder.

Ensure that the OBS directory you use and ModelArts are in the same region.

Table 1 Required OBS folders

  Folder                                               Description
  obs://test-modelarts/ascend/demo-code/               Stores the Ascend training script.
  obs://test-modelarts/ascend/demo-code/run_ascend/    Stores the startup scripts of the Ascend training script.
  obs://test-modelarts/ascend/log/                     Stores training log files.

Step 2 Preparing Script Files and Uploading Them to OBS

  1. Prepare the training script mindspore-verification.py and the five Ascend startup scripts (run_ascend.py, common.py, rank_table.py, manager.py, and fmk.py) required in this example.

    The mindspore-verification.py and run_ascend.py scripts are invoked by the Boot Command parameter during training job creation. For details, see Boot Command.

    The common.py, rank_table.py, manager.py, and fmk.py scripts are invoked when the run_ascend.py script is running.

  2. Upload the training script mindspore-verification.py to obs://test-modelarts/ascend/demo-code/ in the OBS bucket.
  3. Upload the five Ascend startup scripts to the obs://test-modelarts/ascend/demo-code/run_ascend/ folder in the OBS bucket.
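
The upload layout in steps 2 and 3 can be summarized as a mapping from local files to OBS destinations. The following is a minimal sketch that only computes that mapping and uploads nothing. The helper name build_upload_plan and the local directory argument are assumptions made for illustration; the OBS paths are the example values from Table 1.

```python
from pathlib import PurePosixPath

# OBS folders from Table 1 (test-modelarts is the example bucket name).
CODE_DIR = "obs://test-modelarts/ascend/demo-code/"
STARTUP_DIR = "obs://test-modelarts/ascend/demo-code/run_ascend/"

TRAINING_SCRIPT = "mindspore-verification.py"
STARTUP_SCRIPTS = ["run_ascend.py", "common.py", "rank_table.py", "manager.py", "fmk.py"]


def build_upload_plan(local_dir="."):
    """Return {local_path: obs_url} for the six files used in this example.

    Hypothetical helper: it only computes destinations and calls no OBS API.
    """
    base = PurePosixPath(local_dir)
    plan = {str(base / TRAINING_SCRIPT): CODE_DIR + TRAINING_SCRIPT}
    for name in STARTUP_SCRIPTS:
        plan[str(base / name)] = STARTUP_DIR + name
    return plan


if __name__ == "__main__":
    for src, dst in build_upload_plan().items():
        print(f"{src} -> {dst}")
```

The resulting pairs can then be uploaded with whichever OBS tool you normally use.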

Step 3 Creating a Custom Image

The following describes how to create a custom image by writing a Dockerfile.

  1. Obtain a Linux AArch64 server running Ubuntu 18.04. Either an ECS or your local PC will do.

    For details about how to purchase an ECS, see Purchasing and Logging In to a Linux ECS. Because the server must be AArch64, set CPU Architecture to Kunpeng (not x86) and Image to Public image. Ubuntu 18.04 images are recommended.

  2. Install Docker.

    If the docker images command already runs successfully, Docker is installed and you can skip this step. Otherwise, run the following commands to install Docker on Linux AArch64. For more details, see the official Docker documentation.

    curl -fsSL get.docker.com -o get-docker.sh
    sh get-docker.sh

    Start Docker:
    systemctl start docker
  3. Run the following command to check the Docker engine version:
    docker version | grep -A 1 Engine
    The command output is as follows:
     Engine:
      Version:          18.09.0

    Use Docker engine 18.09.0 or later to create a custom image.

  4. Create a folder named context.
    mkdir -p context
  5. Obtain the pip.conf file. This example uses the pip source provided by Huawei Mirrors. The file content is as follows:
    [global]
    index-url = https://repo.huaweicloud.com/repository/pypi/simple
    trusted-host = repo.huaweicloud.com
    timeout = 120

    To obtain pip.conf, go to Huawei Mirrors at https://mirrors.huaweicloud.com/home and search for pypi.

  6. Obtain the APT source file Ubuntu-Ports-bionic.list. In this example, the APT source provided at Huawei Mirrors is used. Run the following command to obtain the APT source file:
    wget -O Ubuntu-Ports-bionic.list https://repo.huaweicloud.com/repository/conf/Ubuntu-Ports-bionic.list

    To obtain the APT source file, go to Huawei Mirrors at https://mirrors.huaweicloud.com/home and search for Ubuntu-Ports.

  7. Download the CANN installation file Ascend-cann-nnae_6.3.RC2_linux-aarch64.run and the MindSpore installation file mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl.

    ModelArts supports only the commercial CANN edition.

  8. Download the Miniconda3 installation file.

    Download Miniconda3-py37-4.10.3 (Python 3.7.10) at https://repo.anaconda.com/miniconda/Miniconda3-py37_4.10.3-Linux-aarch64.sh.

  9. Store the pip source file, APT source file, .run file, .whl file, and Miniconda3 installation file in the context folder. The folder structure is as follows:
    context
    ├── Ascend-cann-nnae_6.3.RC2_linux-aarch64.run
    ├── mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl
    ├── Miniconda3-py37_4.10.3-Linux-aarch64.sh
    ├── pip.conf
    └── Ubuntu-Ports-bionic.list
  10. Write the Dockerfile of the container image.
    Create an empty file named Dockerfile in the context folder and copy the following content to the file:
    # The server on which the container image is created must access the Internet.
    FROM arm64v8/ubuntu:18.04 AS builder
    
    # The default user of the base container image is root.
    # USER root
    
    # Install OS dependencies obtained from Huawei Mirrors.
    COPY Ubuntu-Ports-bionic.list /tmp
    RUN cp -a /etc/apt/sources.list /etc/apt/sources.list.bak && \
        mv /tmp/Ubuntu-Ports-bionic.list /etc/apt/sources.list && \
        echo > /etc/apt/apt.conf.d/00skip-verify-peer.conf "Acquire { https::Verify-Peer false }" && \
        apt-get update && \
        apt-get install -y \
        # utils
        ca-certificates vim curl \
        # CANN 6.3.RC2
        gcc-7 g++ make cmake zlib1g zlib1g-dev openssl libsqlite3-dev libssl-dev libffi-dev unzip pciutils net-tools libblas-dev gfortran libblas3 && \
        apt-get clean && \
        mv /etc/apt/sources.list.bak /etc/apt/sources.list && \
        # Grant the write permission of the parent directory of the CANN 6.3.RC2 installation directory to ma-user.
        chmod o+w /usr/local
    
    RUN useradd -m -d /home/ma-user -s /bin/bash -g 100 -u 1000 ma-user
    
    # Configure the default user and working directory of the container image.
    USER ma-user
    WORKDIR /home/ma-user
    
    # Use the PyPI configuration provided by Huawei Mirrors.
    RUN mkdir -p /home/ma-user/.pip/
    COPY --chown=ma-user:100 pip.conf /home/ma-user/.pip/pip.conf
    
    # Copy the installation files to the /tmp directory in the base container image.
    COPY --chown=ma-user:100 Miniconda3-py37_4.10.3-Linux-aarch64.sh /tmp
    
    # https://conda.io/projects/conda/en/latest/user-guide/install/linux.html#installing-on-linux
    # Install Miniconda3 in the /home/ma-user/miniconda3 directory of the base container image.
    RUN bash /tmp/Miniconda3-py37_4.10.3-Linux-aarch64.sh -b -p /home/ma-user/miniconda3
    
    ENV PATH=$PATH:/home/ma-user/miniconda3/bin
    
    # Install the CANN 6.3.RC2 Python dependency package.
    RUN pip install numpy~=1.14.3 decorator~=4.4.0 sympy~=1.4 cffi~=1.12.3 protobuf~=3.11.3 \
        attrs pyyaml pathlib2 scipy requests psutil absl-py
    
    # Install CANN 6.3.RC2 in /usr/local/Ascend.
    COPY --chown=ma-user:100 Ascend-cann-nnae_6.3.RC2_linux-aarch64.run /tmp
    RUN chmod +x /tmp/Ascend-cann-nnae_6.3.RC2_linux-aarch64.run && \
        /tmp/Ascend-cann-nnae_6.3.RC2_linux-aarch64.run --install --install-path=/usr/local/Ascend
    
    # Install MindSpore 2.1.1.
    COPY --chown=ma-user:100 mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl /tmp
    RUN chmod +x /tmp/mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl && \
        pip install /tmp/mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl
    
    # Create the container image.
    FROM arm64v8/ubuntu:18.04
    
    # Install OS dependencies obtained from Huawei Mirrors.
    COPY Ubuntu-Ports-bionic.list /tmp
    RUN cp -a /etc/apt/sources.list /etc/apt/sources.list.bak && \
        mv /tmp/Ubuntu-Ports-bionic.list /etc/apt/sources.list && \
        echo > /etc/apt/apt.conf.d/00skip-verify-peer.conf "Acquire { https::Verify-Peer false }" && \
        apt-get update && \
        apt-get install -y \
        # utils
        ca-certificates vim curl \
        # CANN 6.3.RC2
        gcc-7 g++ make cmake zlib1g zlib1g-dev openssl libsqlite3-dev libssl-dev libffi-dev unzip pciutils net-tools libblas-dev gfortran libblas3 && \
        apt-get clean && \
        mv /etc/apt/sources.list.bak /etc/apt/sources.list
    
    RUN useradd -m -d /home/ma-user -s /bin/bash -g 100 -u 1000 ma-user
    
    # Copy the directories from the builder stage to the directories with the same name in the current container image.
    COPY --chown=ma-user:100 --from=builder /home/ma-user/miniconda3 /home/ma-user/miniconda3
    COPY --chown=ma-user:100 --from=builder /home/ma-user/Ascend /home/ma-user/Ascend
    COPY --chown=ma-user:100 --from=builder /home/ma-user/var /home/ma-user/var
    COPY --chown=ma-user:100 --from=builder /usr/local/Ascend /usr/local/Ascend
    
    # Configure the preset environment variables of the container image.
    # Configure CANN environment variables.
    # Configure Ascend driver environment variables.
    # Set PYTHONUNBUFFERED to 1 to prevent log loss.
    ENV PATH=$PATH:/usr/local/Ascend/nnae/latest/bin:/usr/local/Ascend/nnae/latest/compiler/ccec_compiler/bin:/home/ma-user/miniconda3/bin \
        LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/Ascend/driver/lib64:/usr/local/Ascend/driver/lib64/common:/usr/local/Ascend/driver/lib64/driver:/usr/local/Ascend/nnae/latest/lib64:/usr/local/Ascend/nnae/latest/lib64/plugin/opskernel:/usr/local/Ascend/nnae/latest/lib64/plugin/nnengine \
        PYTHONPATH=$PYTHONPATH:/usr/local/Ascend/nnae/latest/python/site-packages:/usr/local/Ascend/nnae/latest/opp/built-in/op_impl/ai_core/tbe \
        ASCEND_AICPU_PATH=/usr/local/Ascend/nnae/latest \
        ASCEND_OPP_PATH=/usr/local/Ascend/nnae/latest/opp \
        ASCEND_HOME_PATH=/usr/local/Ascend/nnae/latest \
        PYTHONUNBUFFERED=1
    
    # Configure the default user and working directory of the container image.
    USER ma-user
    WORKDIR /home/ma-user

    For details about how to write a Dockerfile, see the official Docker documentation.

  11. Verify that the Dockerfile has been created. The following shows the context folder:
    context
    ├── Ascend-cann-nnae_6.3.RC2_linux-aarch64.run
    ├── Dockerfile
    ├── mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl
    ├── Miniconda3-py37_4.10.3-Linux-aarch64.sh
    ├── pip.conf
    └── Ubuntu-Ports-bionic.list
  12. Create the container image. Run the following command in the directory where the Dockerfile is stored:
    docker build . -t mindspore:2.1.1-cann6.3.RC2
    
    The following log shows that the image has been created.
    Successfully tagged mindspore:2.1.1-cann6.3.RC2
  13. Upload the created image to SWR. For details, see Step 4 Uploading the Image to SWR.
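
Two of the checks in this step, the Docker engine version in step 3 and the context folder listing in step 11, can be automated before running docker build. The following is a minimal sketch under stated assumptions: the function names are hypothetical, parse_engine_version expects the text printed by the docker version | grep -A 1 Engine command shown in step 3, and the file list is copied from step 11.

```python
import re
from pathlib import Path

# Minimum engine version stated in step 3.
MIN_DOCKER_ENGINE = (18, 9, 0)

# File names from the context folder listing in step 11.
REQUIRED_FILES = [
    "Ascend-cann-nnae_6.3.RC2_linux-aarch64.run",
    "Dockerfile",
    "mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl",
    "Miniconda3-py37_4.10.3-Linux-aarch64.sh",
    "pip.conf",
    "Ubuntu-Ports-bionic.list",
]


def parse_engine_version(text):
    """Extract (major, minor, patch) from a line such as 'Version: 18.09.0'."""
    match = re.search(r"Version:\s*(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError("no engine version found in: %r" % text)
    return tuple(int(part) for part in match.groups())


def engine_is_supported(text):
    """Return True if the reported engine version is 18.09.0 or later."""
    return parse_engine_version(text) >= MIN_DOCKER_ENGINE


def missing_context_files(context_dir):
    """Return the required file names that are absent from context_dir."""
    root = Path(context_dir)
    return [name for name in REQUIRED_FILES if not (root / name).is_file()]


if __name__ == "__main__":
    print("missing files:", missing_context_files("context"))
```

An empty list from missing_context_files means the folder matches the listing in step 11. The check covers file names only, not file contents.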

Step 4 Uploading the Image to SWR

Upload the created image to SWR so that it can be used to create training jobs on ModelArts.

  1. Log in to the SWR console and select the same region as ModelArts. Otherwise, the image cannot be selected in ModelArts.
  2. Click Create Organization in the upper right corner and enter a name of your choice to create an organization. In subsequent commands, replace the example organization name deep-learning with the actual organization name.
  3. Click Generate Login Command in the upper right corner to obtain the login command. In this example, the temporary login command is copied.
  4. Log in to the local environment as user root and enter the copied temporary login command.
  5. Upload the image to SWR.
    1. Run the docker tag command to tag the image for upload:
      # Replace region, domain, and the organization name deep-learning with the actual values.
      sudo docker tag mindspore:2.1.1-cann6.3.RC2 swr.{region}.{domain}/deep-learning/mindspore:2.1.1-cann6.3.RC2
    2. Run the following command to upload the image:
      # Replace region, domain, and the organization name deep-learning with the actual values.
      sudo docker push swr.{region}.{domain}/deep-learning/mindspore:2.1.1-cann6.3.RC2
  6. After the image is uploaded, choose My Images in the navigation pane on the left of the SWR console to view the uploaded custom image.
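
The image address used in the docker tag and docker push commands follows the pattern swr.{region}.{domain}/{organization}/{image}:{tag}. The following is a minimal sketch of composing that address and the two commands. The helper names are hypothetical, and the region and domain in the usage example are placeholders, not real endpoints.

```python
def swr_image_ref(region, domain, organization, name, tag):
    """Compose swr.{region}.{domain}/{organization}/{name}:{tag}.

    Hypothetical helper: rejects empty values and unreplaced {placeholders}.
    """
    fields = (("region", region), ("domain", domain),
              ("organization", organization), ("name", name), ("tag", tag))
    for label, value in fields:
        if not value or "{" in value or "}" in value:
            raise ValueError("replace the %s placeholder with an actual value" % label)
    return "swr.%s.%s/%s/%s:%s" % (region, domain, organization, name, tag)


def tag_and_push_commands(local_image, remote_ref):
    """Return the docker tag and docker push commands as argument lists."""
    return [
        ["docker", "tag", local_image, remote_ref],
        ["docker", "push", remote_ref],
    ]


if __name__ == "__main__":
    # "example-region-1" and "example.com" are placeholders for illustration.
    ref = swr_image_ref("example-region-1", "example.com",
                        "deep-learning", "mindspore", "2.1.1-cann6.3.RC2")
    for command in tag_and_push_commands("mindspore:2.1.1-cann6.3.RC2", ref):
        print(" ".join(command))
```

Building the address in one place keeps the tag and push commands consistent, so the pushed name always matches the tagged one.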

Step 5 Creating and Debugging a Notebook Instance on ModelArts

  1. Register the image uploaded to SWR with ModelArts Image Management.

    Log in to the ModelArts management console. In the navigation pane on the left, choose Image Management. Click Register and register the image. The registered image can be used to create notebook instances.

  2. Use the custom image to create a notebook instance and debug it. After the debugging is successful, save the image.
    1. Create a notebook instance using the custom image.
    2. Save the image.
  3. After the image is debugged, create a training job on ModelArts.

Step 6 Creating a Training Job on ModelArts

  1. Log in to the ModelArts management console. In the navigation pane on the left, choose Model Training > Training Jobs. The training job list is displayed by default.
  2. Open the Create Training Job page, configure the following parameters, and click Submit.
    • Algorithm Type: Custom algorithm
    • Boot Mode: Custom image
    • Image Path:
    • Code Directory: OBS path to the training script and startup scripts, for example, obs://test-modelarts/ascend/demo-code/
    • Boot Command: python ${MA_JOB_DIR}/demo-code/run_ascend/run_ascend.py python ${MA_JOB_DIR}/demo-code/mindspore-verification.py
    • Resource Pool: Dedicated resource pools
    • Resource Type: Ascend with the required driver and firmware version
    • Job Log Path: OBS path to stored training logs, for example, obs://test-modelarts/ascend/log/
  3. Confirm the configurations of the training job and click Submit.
  4. Wait until the training job is created.

    After you submit the job creation request, the system automatically performs backend operations such as downloading the container image and code directory and running the boot command. A training job takes from tens of minutes to several hours to run, depending on the service logic and selected resources. After the training job is executed, logs are displayed.

    Figure 1 Runtime logs of a training job powered by Ascend resources in a dedicated resource pool
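
The Boot Command above chains two commands: the first python invocation starts run_ascend.py, and everything after it is the training command that run_ascend.py reads from sys.argv[1:]. The following is a minimal sketch of that split. The helper name is hypothetical, shell expansion of ${MA_JOB_DIR} is approximated with a plain string replacement, and /path/to/job-dir is a placeholder rather than the actual value set by ModelArts.

```python
import shlex

# Boot Command from the parameter list above.
BOOT_COMMAND = (
    "python ${MA_JOB_DIR}/demo-code/run_ascend/run_ascend.py "
    "python ${MA_JOB_DIR}/demo-code/mindspore-verification.py"
)


def split_boot_command(boot_command, job_dir):
    """Return (launcher_argv, train_command) for a two-part boot command.

    Hypothetical helper: assumes the first two tokens are the interpreter
    and the run_ascend.py path, as in this example.
    """
    argv = shlex.split(boot_command.replace("${MA_JOB_DIR}", job_dir))
    launcher = argv[:2]       # interpreter + run_ascend.py
    train_command = argv[2:]  # what run_ascend.py sees as sys.argv[1:]
    return launcher, train_command


if __name__ == "__main__":
    launcher, train_command = split_boot_command(BOOT_COMMAND, "/path/to/job-dir")
    print("launcher:", launcher)
    print("training command:", train_command)
```

This is why run_ascend.py exits with an error when it receives no arguments: without a training command there is nothing to start.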

Training the mindspore-verification.py File

The content of the mindspore-verification.py file is as follows:

import os
import numpy as np
from mindspore import Tensor
import mindspore.ops as ops
import mindspore.context as context

print('Ascend Envs')
print('------')
print('JOB_ID: ', os.environ['JOB_ID'])
print('RANK_TABLE_FILE: ', os.environ['RANK_TABLE_FILE'])
print('RANK_SIZE: ', os.environ['RANK_SIZE'])
print('ASCEND_DEVICE_ID: ', os.environ['ASCEND_DEVICE_ID'])
print('DEVICE_ID: ', os.environ['DEVICE_ID'])
print('RANK_ID: ', os.environ['RANK_ID'])
print('------')

context.set_context(device_target="Ascend")
x = Tensor(np.ones([1,3,3,4]).astype(np.float32))
y = Tensor(np.ones([1,3,3,4]).astype(np.float32))

print(ops.add(x, y))
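
To know what to look for in the training log, the result of the script can be reproduced without Ascend hardware. The following is a minimal sketch that uses NumPy as a stand-in for ops.add and reads the same environment variables with os.environ.get, so that it does not raise KeyError outside a training job. The function names are hypothetical and the sketch does not import MindSpore.

```python
import os

import numpy as np

# Environment variables printed by mindspore-verification.py.
ASCEND_ENV_NAMES = ["JOB_ID", "RANK_TABLE_FILE", "RANK_SIZE",
                    "ASCEND_DEVICE_ID", "DEVICE_ID", "RANK_ID"]


def describe_ascend_envs(environ=os.environ):
    """Return the variables the script prints, with None for missing ones."""
    return {name: environ.get(name) for name in ASCEND_ENV_NAMES}


def expected_sum():
    """NumPy stand-in for ops.add(x, y) on two all-ones float32 tensors."""
    x = np.ones([1, 3, 3, 4]).astype(np.float32)
    y = np.ones([1, 3, 3, 4]).astype(np.float32)
    return x + y


if __name__ == "__main__":
    for name, value in describe_ascend_envs().items():
        print(name, value)
    print(expected_sum())
```

A successful run on Ascend should therefore print a tensor of shape (1, 3, 3, 4) in which every element is 2.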

Ascend Startup Scripts

  • run_ascend.py
    import sys
    import os
    
    from common import RunAscendLog
    from common import RankTableEnv
    
    from rank_table import RankTable, RankTableTemplate1, RankTableTemplate2
    
    from manager import FMKManager
    
    if __name__ == '__main__':
        log = RunAscendLog.setup_run_ascend_logger()
    
        if len(sys.argv) <= 1:
            log.error('there are not enough args')
            sys.exit(1)
    
        train_command = sys.argv[1:]
        log.info('training command')
        log.info(train_command)
    
        if os.environ.get(RankTableEnv.RANK_TABLE_FILE_V1) is not None:
            # new format rank table file
            rank_table_path = os.environ.get(RankTableEnv.RANK_TABLE_FILE_V1)
            RankTable.wait_for_available(rank_table_path)
            rank_table = RankTableTemplate1(rank_table_path)
        else:
            # old format rank table file
            rank_table_path_origin = RankTableEnv.get_rank_table_template2_file_path()
            RankTable.wait_for_available(rank_table_path_origin)
            rank_table = RankTableTemplate2(rank_table_path_origin)
    
        if rank_table.get_device_num() >= 1:
            log.info('set rank table %s env to %s' % (RankTableEnv.RANK_TABLE_FILE, rank_table.get_rank_table_path()))
            RankTableEnv.set_rank_table_env(rank_table.get_rank_table_path())
        else:
            log.info('device num < 1, unset rank table %s env' % RankTableEnv.RANK_TABLE_FILE)
            RankTableEnv.unset_rank_table_env()
    
        instance = rank_table.get_current_instance()
        server = rank_table.get_server(instance.server_id)
        current_instance = RankTable.convert_server_to_instance(server)
    
        fmk_manager = FMKManager(current_instance)
        fmk_manager.run(rank_table.get_device_num(), train_command)
        return_code = fmk_manager.monitor()
    
        fmk_manager.destroy()
    
        sys.exit(return_code)
    
  • common.py
    import logging
    import os
    
    logo = 'Training'
    
    
    # Rank Table Constants
    class RankTableEnv:
        RANK_TABLE_FILE = 'RANK_TABLE_FILE'
    
        RANK_TABLE_FILE_V1 = 'RANK_TABLE_FILE_V_1_0'
    
        HCCL_CONNECT_TIMEOUT = 'HCCL_CONNECT_TIMEOUT'
    
        # jobstart_hccl.json is provided by the volcano controller of Cloud-Container-Engine(CCE)
        HCCL_JSON_FILE_NAME = 'jobstart_hccl.json'
    
        RANK_TABLE_FILE_DEFAULT_VALUE = '/user/config/%s' % HCCL_JSON_FILE_NAME
    
        @staticmethod
        def get_rank_table_template1_file_dir():
            parent_dir = os.environ[ModelArts.MA_MOUNT_PATH_ENV]
            return os.path.join(parent_dir, 'rank_table')
    
        @staticmethod
        def get_rank_table_template2_file_path():
            rank_table_file_path = os.environ.get(RankTableEnv.RANK_TABLE_FILE)
            if rank_table_file_path is None:
                return RankTableEnv.RANK_TABLE_FILE_DEFAULT_VALUE
    
            return os.path.join(os.path.normpath(rank_table_file_path), RankTableEnv.HCCL_JSON_FILE_NAME)
    
        @staticmethod
        def set_rank_table_env(path):
            os.environ[RankTableEnv.RANK_TABLE_FILE] = path
    
        @staticmethod
        def unset_rank_table_env():
            # pop() avoids a KeyError when the variable is not set
            os.environ.pop(RankTableEnv.RANK_TABLE_FILE, None)
    
    
    class ModelArts:
        MA_MOUNT_PATH_ENV = 'MA_MOUNT_PATH'
        MA_CURRENT_INSTANCE_NAME_ENV = 'MA_CURRENT_INSTANCE_NAME'
        MA_VJ_NAME = 'MA_VJ_NAME'
    
        MA_CURRENT_HOST_IP = 'MA_CURRENT_HOST_IP'
    
        CACHE_DIR = '/cache'
    
        TMP_LOG_DIR = '/tmp/log/'
    
        FMK_WORKSPACE = 'workspace'
    
        @staticmethod
        def get_current_instance_name():
            return os.environ[ModelArts.MA_CURRENT_INSTANCE_NAME_ENV]
    
        @staticmethod
        def get_current_host_ip():
            return os.environ.get(ModelArts.MA_CURRENT_HOST_IP)
    
        @staticmethod
        def get_job_id():
            ma_vj_name = os.environ[ModelArts.MA_VJ_NAME]
            return ma_vj_name.replace('ma-job', 'modelarts-job', 1)
    
        @staticmethod
        def get_parent_working_dir():
            if ModelArts.MA_MOUNT_PATH_ENV in os.environ:
                return os.path.join(os.environ.get(ModelArts.MA_MOUNT_PATH_ENV), ModelArts.FMK_WORKSPACE)
    
            return ModelArts.CACHE_DIR
    
    
    class RunAscendLog:
    
        @staticmethod
        def setup_run_ascend_logger():
            name = logo
            formatter = logging.Formatter(fmt='[run ascend] %(asctime)s - %(levelname)s - %(message)s')
    
            handler = logging.StreamHandler()
            handler.setFormatter(formatter)
    
            logger = logging.getLogger(name)
            logger.setLevel(logging.INFO)
            logger.addHandler(handler)
            logger.propagate = False
            return logger
    
        @staticmethod
        def get_run_ascend_logger():
            return logging.getLogger(logo)
    
  • rank_table.py
    import json
    import time
    import os
    
    from common import ModelArts
    from common import RunAscendLog
    from common import RankTableEnv
    
    log = RunAscendLog.get_run_ascend_logger()
    
    
    class Device:
        def __init__(self, device_id, device_ip, rank_id):
            self.device_id = device_id
            self.device_ip = device_ip
            self.rank_id = rank_id
    
    
    class Instance:
        def __init__(self, pod_name, server_id, devices):
            self.pod_name = pod_name
            self.server_id = server_id
            self.devices = self.parse_devices(devices)
    
        @staticmethod
        def parse_devices(devices):
            if devices is None:
                return []
            device_object_list = []
            for device in devices:
                device_object_list.append(Device(device['device_id'], device['device_ip'], ''))
    
            return device_object_list
    
        def set_devices(self, devices):
            self.devices = devices
    
    
    class Group:
        def __init__(self, group_name, device_count, instance_count, instance_list):
            self.group_name = group_name
            self.device_count = int(device_count)
            self.instance_count = int(instance_count)
            self.instance_list = self.parse_instance_list(instance_list)
    
        @staticmethod
        def parse_instance_list(instance_list):
            instance_object_list = []
            for instance in instance_list:
                instance_object_list.append(
                    Instance(instance['pod_name'], instance['server_id'], instance['devices']))
    
            return instance_object_list
    
    
    class RankTable:
        STATUS_FIELD = 'status'
        COMPLETED_STATUS = 'completed'
    
        def __init__(self):
            self.rank_table_path = ""
            self.rank_table = {}
    
        @staticmethod
        def read_from_file(file_path):
            with open(file_path) as json_file:
                return json.load(json_file)
    
        @staticmethod
        def wait_for_available(rank_table_file, period=1):
            log.info('Wait for Rank table file at %s ready' % rank_table_file)
            complete_flag = False
            while not complete_flag:
                with open(rank_table_file) as json_file:
                    data = json.load(json_file)
                    if data[RankTable.STATUS_FIELD] == RankTable.COMPLETED_STATUS:
                        log.info('Rank table file is ready for read')
                        log.info('\n' + json.dumps(data, indent=4))
                        return True
    
                time.sleep(period)
    
            return False
    
        @staticmethod
        def convert_server_to_instance(server):
            device_list = []
            for device in server['device']:
                device_list.append(
                    Device(device_id=device['device_id'], device_ip=device['device_ip'], rank_id=device['rank_id']))
    
            ins = Instance(pod_name='', server_id=server['server_id'], devices=[])
            ins.set_devices(device_list)
            return ins
    
        def get_rank_table_path(self):
            return self.rank_table_path
    
        def get_server(self, server_id):
            for server in self.rank_table['server_list']:
                if server['server_id'] == server_id:
                    log.info('Current server')
                    log.info('\n' + json.dumps(server, indent=4))
                    return server
    
            log.error('server [%s] is not found' % server_id)
            return None
    
    
    class RankTableTemplate2(RankTable):
    
        def __init__(self, rank_table_template2_path):
            super().__init__()
    
            json_data = self.read_from_file(file_path=rank_table_template2_path)
    
            self.status = json_data[RankTableTemplate2.STATUS_FIELD]
            if self.status != RankTableTemplate2.COMPLETED_STATUS:
                return
    
            # sorted instance list by the index of instance
            # assert there is only one group
            json_data["group_list"][0]["instance_list"] = sorted(json_data["group_list"][0]["instance_list"],
                                                                 key=RankTableTemplate2.get_index)
    
            self.group_count = int(json_data['group_count'])
            self.group_list = self.parse_group_list(json_data['group_list'])
    
            self.rank_table_path, self.rank_table = self.convert_template2_to_template1_format_file()
    
        @staticmethod
        def parse_group_list(group_list):
            group_object_list = []
            for group in group_list:
                group_object_list.append(
                    Group(group['group_name'], group['device_count'], group['instance_count'], group['instance_list']))
    
            return group_object_list
    
        @staticmethod
        def get_index(instance):
            # pod_name example: job94dc1dbf-job-bj4-yolov4-15
            pod_name = instance["pod_name"]
            return int(pod_name[pod_name.rfind("-") + 1:])
    
        def get_current_instance(self):
            """
            get instance by pod name
            specially, return the first instance when the pod name is None
            :return:
            """
            pod_name = ModelArts.get_current_instance_name()
            if pod_name is None:
                if len(self.group_list) > 0:
                    if len(self.group_list[0].instance_list) > 0:
                        return self.group_list[0].instance_list[0]
    
                return None
    
            for group in self.group_list:
                for instance in group.instance_list:
                    if instance.pod_name == pod_name:
                        return instance
            return None
    
        def convert_template2_to_template1_format_file(self):
            rank_table_template1_file = {
                'status': 'completed',
                'version': '1.0',
                'server_count': '0',
                'server_list': []
            }
    
            logic_index = 0
            server_map = {}
            # collect all devices in all groups
            for group in self.group_list:
                if group.device_count == 0:
                    continue
                for instance in group.instance_list:
                    if instance.server_id not in server_map:
                        server_map[instance.server_id] = []
    
                    for device in instance.devices:
                        template1_device = {
                            'device_id': device.device_id,
                            'device_ip': device.device_ip,
                            'rank_id': str(logic_index)
                        }
                        logic_index += 1
                        server_map[instance.server_id].append(template1_device)
    
            server_count = 0
            for server_id in server_map:
                rank_table_template1_file['server_list'].append({
                    'server_id': server_id,
                    'device': server_map[server_id]
                })
                server_count += 1
    
            rank_table_template1_file['server_count'] = str(server_count)
    
            log.info('Rank table file (Template1)')
            log.info('\n' + json.dumps(rank_table_template1_file, indent=4))
    
            if not os.path.exists(RankTableEnv.get_rank_table_template1_file_dir()):
                os.makedirs(RankTableEnv.get_rank_table_template1_file_dir())
    
            path = os.path.join(RankTableEnv.get_rank_table_template1_file_dir(), RankTableEnv.HCCL_JSON_FILE_NAME)
            with open(path, 'w') as f:
                f.write(json.dumps(rank_table_template1_file))
                log.info('Rank table file (Template1) is generated at %s', path)
    
            return path, rank_table_template1_file
    
        def get_device_num(self):
            total_device_num = 0
            for group in self.group_list:
                total_device_num += group.device_count
            return total_device_num
    
    
    class RankTableTemplate1(RankTable):
        def __init__(self, rank_table_template1_path):
            super().__init__()
            self.rank_table_path = rank_table_template1_path
            self.rank_table = self.read_from_file(file_path=rank_table_template1_path)
    
        def get_current_instance(self):
            current_server = None
            server_list = self.rank_table['server_list']
            if len(server_list) == 1:
                current_server = server_list[0]
            elif len(server_list) > 1:
                host_ip = ModelArts.get_current_host_ip()
                if host_ip is not None:
                    for server in server_list:
                        if server['server_id'] == host_ip:
                            current_server = server
                            break
                else:
                    current_server = server_list[0]
    
            if current_server is None:
                log.error('server is not found')
                return None
            return self.convert_server_to_instance(current_server)
    
        def get_device_num(self):
            server_list = self.rank_table['server_list']
            device_num = 0
            for server in server_list:
                device_num += len(server['device'])
            return device_num
    
  • manager.py
    import time
    import os
    import os.path
    import signal
    
    from common import RunAscendLog
    from fmk import FMK
    
    
    log = RunAscendLog.get_run_ascend_logger()
    
    
    class FMKManager:
        # max destroy time: ~20 (15 + 5)
        # ~ 15 (1 + 2 + 4 + 8)
        MAX_TEST_PROC_CNT = 4
    
        def __init__(self, instance):
            self.instance = instance
            self.fmk = []
            self.fmk_processes = []
            self.get_sigterm = False
            self.max_test_proc_cnt = FMKManager.MAX_TEST_PROC_CNT
    
        # break the monitor and destroy processes when get terminate signal
        def term_handle(func):
            def receive_term(signum, stack):
                log.info('Received terminate signal %d, try to destroy all processes' % signum)
                stack.f_locals['self'].get_sigterm = True
    
            def handle_func(self, *args, **kwargs):
                origin_handle = signal.getsignal(signal.SIGTERM)
                signal.signal(signal.SIGTERM, receive_term)
                res = func(self, *args, **kwargs)
                signal.signal(signal.SIGTERM, origin_handle)
                return res
    
            return handle_func
    
        def run(self, rank_size, command):
            for index, device in enumerate(self.instance.devices):
                fmk_instance = FMK(index, device)
                self.fmk.append(fmk_instance)
    
                self.fmk_processes.append(fmk_instance.run(rank_size, command))
    
        @term_handle
        def monitor(self, period=1):
            # Poll until every fmk process has exited with code 0,
            # or until one process exits with a non-zero code.
    
            fmk_cnt = len(self.fmk_processes)
            zero_ret_cnt = 0
            while zero_ret_cnt != fmk_cnt:
                zero_ret_cnt = 0
                for index in range(fmk_cnt):
                    fmk = self.fmk[index]
                    fmk_process = self.fmk_processes[index]
                    if fmk_process.poll() is not None:
                        if fmk_process.returncode != 0:
                            log.error('proc-rank-%s-device-%s (pid: %d) has exited with non-zero code: %d'
                                      % (fmk.rank_id, fmk.device_id, fmk_process.pid, fmk_process.returncode))
                            return fmk_process.returncode
    
                        zero_ret_cnt += 1
                if self.get_sigterm:
                    break
                time.sleep(period)
    
            return 0
    
        def destroy(self, base_period=1):
            log.info('Begin destroy training processes')
            self.send_sigterm_to_fmk_process()
            self.wait_fmk_process_end(base_period)
            log.info('End destroy training processes')
    
        def send_sigterm_to_fmk_process(self):
            # send SIGTERM to fmk processes (and process group)
            for r_index in range(len(self.fmk_processes) - 1, -1, -1):
                fmk = self.fmk[r_index]
                fmk_process = self.fmk_processes[r_index]
                if fmk_process.poll() is not None:
                    log.info('proc-rank-%s-device-%s (pid: %d) has exited before receiving the term signal',
                             fmk.rank_id, fmk.device_id, fmk_process.pid)
                    del self.fmk_processes[r_index]
                    del self.fmk[r_index]
    
                try:
                    os.killpg(fmk_process.pid, signal.SIGTERM)
                except ProcessLookupError:
                    pass
    
        def wait_fmk_process_end(self, base_period):
            test_cnt = 0
            period = base_period
            while len(self.fmk_processes) > 0 and test_cnt < self.max_test_proc_cnt:
                for r_index in range(len(self.fmk_processes) - 1, -1, -1):
                    fmk = self.fmk[r_index]
                    fmk_process = self.fmk_processes[r_index]
                    if fmk_process.poll() is not None:
                        log.info('proc-rank-%s-device-%s (pid: %d) has exited',
                                 fmk.rank_id, fmk.device_id, fmk_process.pid)
                        del self.fmk_processes[r_index]
                        del self.fmk[r_index]
                if not self.fmk_processes:
                    break
    
                time.sleep(period)
                period *= 2
                test_cnt += 1
    
            if len(self.fmk_processes) > 0:
                for r_index in range(len(self.fmk_processes) - 1, -1, -1):
                    fmk = self.fmk[r_index]
                    fmk_process = self.fmk_processes[r_index]
                    if fmk_process.poll() is None:
                        log.warning('proc-rank-%s-device-%s (pid: %d) has not exited within the max waiting time, '
                                    'send kill signal',
                                    fmk.rank_id, fmk.device_id, fmk_process.pid)
                        try:
                            os.killpg(fmk_process.pid, signal.SIGKILL)
                        except ProcessLookupError:
                            # the process group exited between poll() and killpg()
                            pass
    
  • fmk.py
    import os
    import subprocess
    import pathlib
    from contextlib import contextmanager
    
    from common import RunAscendLog
    from common import RankTableEnv
    from common import ModelArts
    
    log = RunAscendLog.get_run_ascend_logger()
    
    
    class FMK:
    
        def __init__(self, index, device):
            self.job_id = ModelArts.get_job_id()
            self.rank_id = device.rank_id
            self.device_id = str(index)
    
        def gen_env_for_fmk(self, rank_size):
            current_envs = os.environ.copy()
    
            current_envs['JOB_ID'] = self.job_id
    
            current_envs['ASCEND_DEVICE_ID'] = self.device_id
            current_envs['DEVICE_ID'] = self.device_id
    
            current_envs['RANK_ID'] = self.rank_id
            current_envs['RANK_SIZE'] = str(rank_size)
    
            FMK.set_env_if_not_exist(current_envs, RankTableEnv.HCCL_CONNECT_TIMEOUT, str(1800))
    
            log_dir = FMK.get_log_dir()
            process_log_path = os.path.join(log_dir, self.job_id, 'ascend', 'process_log', 'rank_' + self.rank_id)
            FMK.set_env_if_not_exist(current_envs, 'ASCEND_PROCESS_LOG_PATH', process_log_path)
            pathlib.Path(current_envs['ASCEND_PROCESS_LOG_PATH']).mkdir(parents=True, exist_ok=True)
    
            return current_envs
    
        @contextmanager
        def switch_directory(self, directory):
            owd = os.getcwd()
            try:
                os.chdir(directory)
                yield directory
            finally:
                os.chdir(owd)
    
        def get_working_dir(self):
            fmk_workspace_prefix = ModelArts.get_parent_working_dir()
            return os.path.join(os.path.normpath(fmk_workspace_prefix), 'device%s' % self.device_id)
    
        @staticmethod
        def get_log_dir():
            parent_path = os.getenv(ModelArts.MA_MOUNT_PATH_ENV)
            if parent_path:
                log_path = os.path.join(parent_path, 'log')
                if os.path.exists(log_path):
                    return log_path
    
            return ModelArts.TMP_LOG_DIR
    
        @staticmethod
        def set_env_if_not_exist(envs, env_name, env_value):
            if env_name in os.environ:
                log.info('env already exists, keep it. env_name: %s, env_value: %s' % (env_name, os.environ[env_name]))
                return
    
            envs[env_name] = env_value
    
        def run(self, rank_size, command):
            envs = self.gen_env_for_fmk(rank_size)
            log.info('bootstrap proc-rank-%s-device-%s' % (self.rank_id, self.device_id))
    
            log_dir = FMK.get_log_dir()
            if not os.path.exists(log_dir):
                os.makedirs(log_dir)
    
            log_file = '%s-proc-rank-%s-device-%s.txt' % (self.job_id, self.rank_id, self.device_id)
            log_file_path = os.path.join(log_dir, log_file)
    
            working_dir = self.get_working_dir()
            if not os.path.exists(working_dir):
                os.makedirs(working_dir)
    
            with self.switch_directory(working_dir):
                # os.setsid: start the forked child in a new session so that it leads its own
                # process group; os.killpg(pid, ...) can then signal the whole group
                training_proc = subprocess.Popen(command, env=envs, preexec_fn=os.setsid,
                                                 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    
                log.info('proc-rank-%s-device-%s (pid: %d)', self.rank_id, self.device_id, training_proc.pid)
    
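                # tee drains the pipe into the log file; without a reader, waiting on a
                # process that writes to a PIPE can deadlock, see
                # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait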
                subprocess.Popen(['tee', log_file_path], stdin=training_proc.stdout)
    
                return training_proc
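
The shutdown sequence in manager.py (send SIGTERM to each process group, wait with a doubling sleep period, then send SIGKILL to whatever is still alive) can be tried outside ModelArts with a minimal, self-contained sketch. The helper names backoff_periods and stop_process_group are hypothetical and are not part of the startup scripts; the sketch assumes a POSIX system and uses start_new_session=True, which has the same effect as preexec_fn=os.setsid in fmk.py.

```python
import os
import signal
import subprocess
import sys
import time


def backoff_periods(base_period=1, max_tries=4):
    """Sleep intervals between liveness checks: base, 2*base, 4*base, ..."""
    return [base_period * 2 ** i for i in range(max_tries)]


def stop_process_group(proc, base_period=1, max_tries=4):
    """Send SIGTERM to the group led by proc; send SIGKILL if it outlives the backoff.

    Hypothetical helper for illustration only. proc must have been started
    in its own session so that its pid is also its process group id.
    """
    try:
        os.killpg(proc.pid, signal.SIGTERM)
    except ProcessLookupError:
        pass  # the group is already gone

    for period in backoff_periods(base_period, max_tries):
        if proc.poll() is not None:
            return proc.returncode
        time.sleep(period)

    if proc.poll() is None:
        try:
            os.killpg(proc.pid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # exited between poll() and killpg()
    return proc.wait()


if __name__ == '__main__':
    # With the defaults used in manager.py the sleeps add up to 15 seconds.
    print(backoff_periods(), sum(backoff_periods()))

    child = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(60)'],
                             start_new_session=True)
    print('child exit code:', stop_process_group(child, base_period=0.1))
```

With base_period=1 and four tries, the waits are 1, 2, 4, and 8 seconds, which matches the "1 + 2 + 4 + 8" note in FMKManager. A negative exit code from the child means that it was terminated by the corresponding signal.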