更新时间:2024-10-30 GMT+08:00
分享

数据脱敏作业

作业输入输出规范

用户完成自定义脱敏算子创建,运行作业容器时Octopus平台向其中注入以下环境变量:

  • input_file:待脱敏的文件路径
  • raw_dir:抽取的image,gnss,lidar数据存放路径
  • desensitized_dir:脱敏后的image,gnss,lidar数据存放路径
  • output_dir:脱敏后的文件存放路径

用户根据需要可以自定义环境变量,以rosbag文件为例,可以定义如下环境变量:

  • rosbag_version:robag版本
  • image_topics:图像数据topic列表
  • gnss_topic:gnss数据topic
  • lidar_topics:lidar数据topic列表

用户作业容器需要将input_file中的image,gnss,lidar数据抽取到raw_dir,待系统内置算子脱敏完成,并将脱敏后的数据存放到desensitized_dir后,用户算子根据input_file和desensitized_dir中的脱敏数据生成新的数据文件存放到output_dir。

文件结构如下:

-raw
--image
---topic0
----timestamp1.jpg
----timestamp2.jpg
---topic1
----timestamp1.jpg
--gnss
---gnss_topic.json
--lidar
---pcd_topic0
----timestamp0.pcd
----timestamp1.pcd
--SUCCESS
-desensitized
--image
---topic0
----timestamp1.jpg
----timestamp2.jpg
---topic1
----timestamp1.jpg
--gnss
---gnss_topic.json
--lidar
---pcd_topic0
----timestamp0.pcd
----timestamp1.pcd
--SUCCESS
-output
--test.bag
--SUCCESS

其中,“SUCCESS”文件为标识文件,标识所在阶段的任务结束。

示例代码

下面是rosbag脱敏的算子示例:

# mask.py
import json
import logging
import multiprocessing as mp
import os
import shutil
import time
from pathlib import Path
from typing import cast

import av
import numpy as np
import open3d
from rosbags.highlevel import AnyReader
from rosbags.interfaces import ConnectionExtRosbag1, ConnectionExtRosbag2
from rosbags.rosbag1 import Writer as Writer1
from rosbags.rosbag2 import Writer as Writer2
from rosbags.serde import cdr_to_ros1, serialize_cdr
from rosbags.typesys import get_types_from_msg, register_types
from rosbags.typesys.types import builtin_interfaces__msg__Time as Time
from rosbags.typesys.types import \
    sensor_msgs__msg__CompressedImage as CompressedImage
from rosbags.typesys.types import std_msgs__msg__Header as Header

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
LOG = logging.getLogger(__file__)

# Octopus数据服务拉起镜像时灌入的环境变量
# 获取环境变量
input_path = os.getenv('input_path', 'data/hangyan-move.bag.bak')
raw_dir = os.getenv('raw_dir', 'empty_dir/raw')  # 抽取的文件存放目录
desens_dir = os.getenv('desensitized_dir', 'empty_dir/desens')  # 脱敏后的文件存放目录
output_dir = os.getenv('output_dir', 'empty_dir/output')
lidar_process_num = os.getenv('lidar_process_num', 5)  # lidar数据进程数

# 用户自定义环境变量
rosbag_version = os.getenv('rosbag_version', '1')  # rosbag版本,取值为'1'或'2'
image_topics = [
    x.strip(' ')
    for x in os.getenv('image_topics', '/camera_encoded_1').split(',')
]  # 图像数据的topic列表
gnss_topic = os.getenv('gnss_topic',
                       '/inspvax')  # gnss数据的topic,gnss数据只能有一个topic
lidar_topics = [
    x.strip(' ') for x in os.getenv('lidar_topic', '/pandar').split(',')
]  # 点云数据的topic列表

# 注册自定义消息类型
Video_encoded_data_text = Path('msgs/Video_encoded_data.msg').read_text()
NovatelMessageHeader_text = Path('msgs/NovatelMessageHeader.msg').read_text()
NovatelExtendedSolutionStatus_text = Path(
    'msgs/NovatelExtendedSolutionStatus.msg').read_text()
NovatelReceiverStatus_text = Path('msgs/NovatelReceiverStatus.msg').read_text()
Inspvax_text = Path('msgs/Inspvax.msg').read_text()
add_types = {}
add_types.update(
    get_types_from_msg(
        Video_encoded_data_text,
        'kyber_msgs/msg/Video_encoded_data',
    ))
add_types.update(
    get_types_from_msg(
        NovatelMessageHeader_text,
        'novatel_gps_msgs/msg/NovatelMessageHeader',
    ))
add_types.update(
    get_types_from_msg(
        NovatelExtendedSolutionStatus_text,
        'novatel_gps_msgs/msg/NovatelExtendedSolutionStatus',
    ))
add_types.update(
    get_types_from_msg(
        NovatelReceiverStatus_text,
        'novatel_gps_msgs/msg/NovatelReceiverStatus',
    ))
add_types.update(
    get_types_from_msg(
        Inspvax_text,
        'novatel_gps_msgs/msg/Inspvax',
    ))
register_types(add_types)

# create gnss file
gnss_file_path = Path(raw_dir, 'gnss') / f'{gnss_topic}.json'.strip('/')
Path.mkdir(gnss_file_path.parent, parents=True, exist_ok=True)


def extract_image(input_rosbag):
    '''从原始rosbag中抽取图像数据.'''
    LOG.info('Start extracting image.')
    codec_ctx = av.codec.Codec('hevc', 'r')
    h265_code = codec_ctx.create()
    with AnyReader([Path(input_rosbag)]) as reader:
        for connection, timestamp, data in reader.messages():
            topic = connection.topic
            if topic in image_topics:
                deserialized_data = reader.deserialize(data,
                                                       connection.msgtype)
                try:
                    data = deserialized_data.raw_data
                    packet = av.packet.Packet(data)
                    out = h265_code.decode(packet)
                    img = None
                    for frame in out:
                        if frame.format.name != 'rgb24':
                            frame = frame.reformat(format='rgb24')
                        img = frame.to_image()
                    # 图像存放路径
                    file_name = f'{timestamp}.jpg'
                    f_path = Path(raw_dir, 'image') / topic.strip('/')
                    tmp_path = Path(raw_dir, 'tmp_image') / topic.strip('/')
                    Path.mkdir(tmp_path, parents=True, exist_ok=True)
                    tmp_file = tmp_path / file_name
                    file = f_path / file_name
                    # 当未建立目录时,先基于topic名称建立目录
                    Path.mkdir(file.parent, parents=True, exist_ok=True)
                    img.save(tmp_file)
                    os.chmod(tmp_file, 0o777)
                    os.chmod(file.parent, 0o777)
                    shutil.move(tmp_file, file)
                except Exception as e:
                    LOG.info("%s frame can not trans to jpg, message: %s",
                             timestamp, str(e))
    LOG.info('Finish extracting image.')


def extract_lidar(task_id, task_num, input_rosbag):
    '''从原始rosbag中抽取点云数据.'''
    LOG.info('Start extracting pcd.')
    with AnyReader([Path(input_rosbag)]) as reader:
        for i, (connection, timestamp, data) in enumerate(reader.messages()):
            if i % task_num != task_id:
                continue
            topic = connection.topic
            if topic in lidar_topics:
                deserialized_data = reader.deserialize(data,
                                                       connection.msgtype)
                pcd = open3d.geometry.PointCloud()
                reshaped = deserialized_data.data.reshape(
                    int(len(deserialized_data.data) / 3), 3)
                pcd.points = open3d.utility.Vector3dVector(reshaped)

                file_name = f'{timestamp}.pcd'
                f_path = Path(raw_dir, 'lidar') / topic.strip('/')
                tmp_path = Path(raw_dir, 'tmp_lidar') / topic.strip('/')
                Path.mkdir(tmp_path, parents=True, exist_ok=True)
                tmp_file = tmp_path / file_name
                file = f_path / file_name
                # 当未建立目录时,先基于topic名称建立目录
                Path.mkdir(file.parent, parents=True, exist_ok=True)
                open3d.io.write_point_cloud(str(tmp_file), pcd)
                os.chmod(tmp_file, 0o777)
                os.chmod(file.parent, 0o777)
                shutil.move(tmp_file, file)
    LOG.info('Finish extracting pcd.')


def extract_gnss(input_rosbag):
    '''从原始rosbag中抽取gnss数据.'''
    LOG.info('Start extracting rosbag.')
    gnss_file = open(gnss_file_path, 'w')
    gnss = dict()
    with AnyReader([Path(input_rosbag)]) as reader:
        for connection, timestamp, data in reader.messages():
            topic = connection.topic
            if topic == gnss_topic:
                deserialized_data = reader.deserialize(data,
                                                       connection.msgtype)
                # 这里以msgytpe为NavSatFix为例
                latitude = deserialized_data.latitude
                longitude = deserialized_data.longitude
                altitude = deserialized_data.altitude
                gnss[timestamp] = {
                    'latitude': latitude,
                    'longitude': longitude,
                    'altitude': altitude
                }
    gnss_file.write(json.dumps(gnss))
    gnss_file.close()
    LOG.info('Finish extracting gnss.')


def _get_masked_image(topic, timestamp):
    '''从脱敏后的图像数据中获取目标图像数据.'''
    file = Path(desens_dir, 'image') / topic.strip('/') / f'{timestamp}.jpg'
    if file.is_file():
        return np.fromfile(file, dtype='uint8')
    else:
        return None


def _get_conn_map(rosbag_version: int, reader, writer):
    '''构建connection的索引.'''
    conn_map = {}
    if rosbag_version == '1':
        for conn in reader.connections:
            if conn.topic in image_topics:
                conn_map[conn.id] = writer.add_connection(
                    '/image',
                    CompressedImage.__msgtype__,
                )
            else:
                ext = cast(ConnectionExtRosbag1, conn.ext)
                conn_map[conn.id] = writer.add_connection(
                    conn.topic,
                    conn.msgtype,
                    conn.msgdef,
                    conn.md5sum,
                    ext.callerid,
                    ext.latching,
                )
    elif rosbag_version == '2':
        for conn in reader.connections:
            if conn.topic in image_topics:
                conn_map[conn.id] = writer.add_connection(
                    '/image',
                    CompressedImage.__msgtype__,
                )
            else:
                ext = cast(ConnectionExtRosbag2, conn.ext)
                conn_map[conn.id] = writer.add_connection(
                    conn.topic,
                    conn.msgtype,
                    ext.serialization_format,
                    ext.offered_qos_profiles,
                )
    return conn_map


def _serialize_data(rosbag_version, data, msgtype):
    '''对数据进行序列化.'''
    if rosbag_version == '1':
        return cdr_to_ros1(serialize_cdr(data, msgtype), msgtype)
    elif rosbag_version == '2':
        return serialize_cdr(data, msgtype)


def generate_rosbag(input_rosbag, output_rosbag):
    '''生成脱敏后rosbag.'''
    LOG.info('Start generating rosbag.')
    gnss_file = open(
        Path(desens_dir, 'gnss') / f'{gnss_topic}.json'.strip('/'), 'r')
    gnss_data = json.load(gnss_file)
    gnss_file.close()
    Writer = Writer1 if rosbag_version == '1' else Writer2
    with AnyReader([Path(input_rosbag)
                    ]) as reader, Writer(Path(output_rosbag)) as writer:
        conn_map = _get_conn_map(rosbag_version, reader, writer)
        for connection, timestamp, data in reader.messages():
            topic = connection.topic
            # 当topic为图像数据的topic时,读取脱敏后图像数据
            if topic in image_topics:
                masked_data = _get_masked_image(topic, timestamp)
                if masked_data is None:  # 没有解析出图像文件时,不要该帧了
                    continue
                deserialized_data = CompressedImage(
                    Header(
                        stamp=Time(
                            sec=int(timestamp // 10**9),
                            nanosec=int(timestamp % 10**9),
                        ),
                        frame_id='0',
                    ),
                    format='jpg',
                    data=masked_data,
                )
                data = _serialize_data(
                    rosbag_version,
                    deserialized_data,
                    CompressedImage.__msgtype__,
                )
            # 当topic为gnss数据时,读取脱敏后gnss数据
            elif topic == gnss_topic:
                deserialized_data = reader.deserialize(data,
                                                       connection.msgtype)
                deserialized_data.latitude = gnss_data.get(
                    str(timestamp)).get('latitude')
                deserialized_data.longitude = gnss_data.get(
                    str(timestamp)).get('longitude')
                deserialized_data.altitude = gnss_data.get(
                    str(timestamp)).get('altitude')
                data = _serialize_data(
                    rosbag_version,
                    deserialized_data,
                    connection.msgtype,
                )
            # 当topic为点云数据时,读取脱敏后点云数据
            elif topic in lidar_topics:
                deserialized_data = reader.deserialize(
                    data,
                    connection.msgtype,
                )
                file = Path(
                    desens_dir,
                    'lidar',
                ) / topic.strip('/') / f'{timestamp}.pcd'
                point_cloud = open3d.io.read_point_cloud(str(file))
                deserialized_data.data = np.asarray(
                    point_cloud.points).flatten()
            writer.write(conn_map[connection.id], timestamp, data)
    # 生成_SUCCESS文件标识完成数据抽取
    Path(output_dir, '_SUCCESS').touch()
    LOG.info('Finish generating rosbag.')


if __name__ == "__main__":
    LOG.info('Start user operator.')
    process_image = mp.Process(target=extract_image, args=(input_path, ))
    pool_lidar = mp.Pool(processes=lidar_process_num)
    for i in range(lidar_process_num):
        pool_lidar.apply_async(extract_lidar,
                               args=(i, lidar_process_num, input_path))
    process_gnss = mp.Process(target=extract_gnss, args=(input_path, ))
    # 启动子进程
    process_image.start()
    pool_lidar.close()
    process_gnss.start()
    process_image.join()
    pool_lidar.join()
    process_gnss.join()
    LOG.info('Child processes exit.')
    # 生成_SUCCESS文件标识完成数据抽取
    Path(raw_dir, '_SUCCESS').touch()

    # 后面输出的rosbag文件与输入的rosbag文件保持同名
    output_rosbag_file = Path(output_dir, Path(input_path).name)
    # 如果输出文件夹不存在,先创建文件夹
    Path.mkdir(output_rosbag_file.parent, parents=True, exist_ok=True)
    # 检测到脱敏任务结束后,生成新的rosbag文件
    while time.sleep(1) is None:
        if Path(desens_dir).joinpath('_SUCCESS').is_file():
            generate_rosbag(Path(input_path), output_rosbag_file)
            break

构建镜像

Dockerfile示例

FROM python:3.10
COPY mask.py /home/main/
WORKDIR /home/main/
RUN pip install rosbags requests numpy
USER root

构建镜像

sudo docker build --no-cache -f Dockerfile -t rosbagmask:0.1 .

本地调试

准备一个待处理的rosbag包。

sudo docker run -v ${HOME}/data/test.bag:/home/main/test.bag --env input_file=/home/main/test.bag –env raw_dir=/home/main/raw_dir –env desensitized_dir=/home/main/desensitized_dir –env output_dir=/home/main/output_dir -it rosbagmask:0.1 /bin/sh -c "python mask.py"

相关文档