更新时间:2024-07-10 GMT+08:00
分享

实例管理

实例管理概述

数据迁移功能采用独立集群的方式为用户提供安全可靠的数据迁移服务,各集群之间相互隔离,不可互相访问。其中实例管理通过购买GDS-Kafka实例帮助用户创建、管理集群。GDS-Kafka的工作方式是从Kafka中消费数据并缓存,当达到设置好的时间或数据量之后,通过COPY写入GaussDB(DWS)临时表,再从临时表进行插入或更新操作。

  1. Kafka的消息生产端必须按照一定的格式要求进行数据生产,其中消息格式由“kafka.source.event.type”配置参数指定。当前支持的消息格式详见GDS-Kafka支持的消息格式
  2. GDS-Kafka支持直接insert(仅限无主键表)和merge覆盖更新两种入库模式,您可以根据DWS目标表的类型进行灵活配置,直接insert模式由于不涉及更新在性能上要更优一些。其中入库模式由“app.insert.directly”配置参数和有无主键共同决定,详见GDS-Kafka入库模式
  • GDS-kafka只支持目标表表名和字段全小写。
  • GDS-Kafka的删除是根据扩展字段中的pos进行历史删除,如果入库数据中有delete操作,则必须使用扩展字段。

购买GDS-Kafka实例

使用数据迁移功能,首先需要购买一个GDS-kafka实例(集群)。集群实例可以为用户提供安全可靠的数据迁移服务,各集群之间相互隔离。

约束与限制

  • 目前仅支持单机集群。
  • 目前仅支持按需计费模式。

操作步骤

  1. 登录GaussDB(DWS)控制台。
  2. 在左侧导航栏选择“数据 > 数据集成 > 实例管理”,进入实例管理页面。
  3. 单击页面右上角“购买GDS-Kafka实例”,进入购买页面配置集群参数。

    表1 参数说明

    参数名

    参数解释

    样例值

    CPU架构

    CPU架构有:

    • X86
    • 鲲鹏
    说明:

    X86和鲲鹏只是底层架构不一致,应用层不感知,sql语法一致,如果创建集群时提示X86售罄,可以选择鲲鹏架构。

    x86

    规格

    请根据业务需求合理选择节点规格。

    -

    存储

    单个节点可用存储容量。

    -

    当前规格

    显示集群当前规格。

    -

    名称

    设置数据仓库集群的名称。

    集群名称长度为4到64个字符,必须以字母开头,可以包含字母、数字、中划线或者下划线,不能包含其他的特殊字符。字母不区分大小写。

    -

    版本

    显示集群中安装的数据库实例版本。图片仅供参考,请以实际显示版本号为准。

    8.2.1.300

    虚拟私有云

    指定集群节点使用的虚拟专用网络,实现不同业务的网络隔离。

    首次创建数据仓库集群时,如果未配置过虚拟私有云,可以单击“查看虚拟私有云”进入虚拟私有云管理控制台,新创建一个满足需求的虚拟私有云。

    -

    子网

    指定虚拟私有云的一个子网。

    集群使用子网实现与其他网络的隔离,并独享所有网络资源,以提高网络安全。

    -

    安全组

    指定虚拟私有云的安全组。

    安全组限制安全访问规则,加强集群与其它服务间的安全访问。

    -

    公网访问

    指定用户是否可以在互联网上使用客户端连接集群数据库。支持如下方式:

    • “暂不使用”:暂不使用弹性IP。如DWS使用于生产环境,则需绑定ELB,则不选择使用弹性IP,通过绑定ELB后,再从ELB页面进行弹性IP绑定。
    • “自动分配”:用户指定弹性IP的带宽,系统将自动为集群分配独享带宽的弹性IP,通过弹性IP可以从互联网对集群进行访问。自动分配的弹性IP的带宽名称都是以集群名称开头的。
    • “使用已有”:为集群绑定指定的弹性IP。如果下拉框中没有可用的弹性IP,可以单击“创建弹性IP”进入弹性公网IP页面创建一个满足需要的弹性IP。带宽可根据用户需要设置。

    -

    企业项目

    配置集群所属的企业项目。已开通企业项目管理服务的用户才可以配置该参数。默认值为default。

    default

  4. 确认无误后,单击“提交”进行创建。

查看实例详情

在实例详情页面用户可以查看集群的详细信息,用户可以在此查看集群的基本信息、网络信息等。

操作步骤

  1. 登录GaussDB(DWS)控制台。
  2. 在左侧导航栏选择“数据 > 数据集成 > 实例管理”,进入实例管理页面。
  3. 单击指定实例名称,进入实例详情页面。

    图1 查看实例详情

GDS-Kafka支持的消息格式

表2 GDS-Kafka支持的消息格式

kafka.source.event.type

格式示例

格式说明

cdc.drs.avro

华为云DRS的内部格式,DRS生产至Kafka的avro格式,GDS-Kafka可直接对接进行解析入库。

drs.cdc

使用drs.cdc的avro格式需要在Kafka上游的业务程序中引入GDS-Kafka-common和GDS-Kafka-source的maven依赖,然后在代码中创建并填充Record对象,一个Record对象表示一条表记录,最后将Record对象序列化为byte[]数组生产至Kafka供下游的GDS-Kafka使用。

如下示例所示,目标表为public模式下的person表;person表由id,name,age 3个字段组成;op_type为U表示是一条更新操作;将id为0的记录的name字段由a改为b;将age字段由18改为20:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Record record = new Record();
// 设置目标表schema和table名称
record.setTableName("public.person");
// 设置字段列表
List<Field> fields = new ArrayList<>();
fields.add(new Field("id", 0));
fields.add(new Field("name", 1));
fields.add(new Field("age", 2));
record.setFields(fields);
// 设置表记录更新前的字段值列表
List<Object> before = new ArrayList<>();
before.add(new Integer(0, "0"));
before.add(new Character("utf-8", ByteBuffer.wrap("a".getBytes(StandardCharsets.UTF_8))));
before.add(new Integer(0, "18"));
record.setBeforeImages(before);
// 设置表记录更新后的字段值列表
List<Object> after = new ArrayList<>();
after.add(new Integer(0, "0"));
after.add(new Character("utf-8", ByteBuffer.wrap("b".getBytes(StandardCharsets.UTF_8))));
after.add(new Integer(0, "20"));
record.setAfterImages(after);
// 设置操作类型
record.setOperation("U");
// 设置操作时间
record.setUpdateTimestamp(325943905);
// Record对象序列化为byte[]数组
byte[] msg = Record.getEncoder().encode(record).array();

标准avro格式:

  • tableName字段用于描述当前记录所属的目标表名和schema名称。【必需】
  • operation字段用于描述当前记录是何种类型的操作:I表示insert操作,U表示update操作,D表示delete操作。【必需】
  • updateTimestamp表示源端操作发生的时间。【非必需】
  • beforeImages列表只有在operation为U或D时需要,用于描述当前记录在更新或删除之前的信息,before body体中的字段对应目标表中的字段;【U/D必需】
  • afterImages列表只有在op_type为U或I时需要,用于描述当前记录更新后的信息或新插入的信息;【U/D必需】
  • fields列表用于描述当前表记录的字段列表,字段的index值必须与beforeImage和afterImage中的顺序一致;【必需】

cdc.json

如下示例所示,目标表为public模式下的person表;person表由id,name,age 3个字段组成;op_type为U表示是一条更新操作;将id为1的记录的name字段由a改为b;将age字段由18改为20:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
{
"table": "public.person",
"op_type": "U",
"op_ts": "1668426344",
"current_ts": "1668426344",
"before": {
"id":"1",
"name":"a",
"age": 18
},
"after": {
"id":"1",
"name":"b",
"age": 20
}
}

标准json格式:

  • table字段用于描述当前记录所属的目标表名和schema名称;【必需】
  • op_type字段用于描述当前记录是何种类型的操作:I表示insert操作,U表示update操作,D表示delete操作;【必需】
  • op_ts表示源端操作发生的时间;【非必需】
  • current_ts表示该消息入Kafka的时间;【非必需】
  • before对象只有在op_type为U或D时需要,用于描述当前记录在更新或删除之前的信息,before body体中的字段对应目标表中的字段;【U/D必需】
  • after对象只有在op_type为U或I时需要,用于描述当前记录更新后的信息或新插入的信息;【U/D必需】

industrial.iot.json

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
{
"header": {
"thing_id":"a0001",
"instance_id":"1",
"thing_model_name":"computer",
"timestamp":"1668426344"
},
"body": {
"status":"Normal",
"temperature":"10",
"working_time":"10000"
},
}

IOT数据格式:

  • header中的thing_model_name表示表名【必需】
  • header中的thing_id, instance_id, timestamp和body中的内容一起构成当前记录的字段内容【必需】
  • IOT数据为时序数据,不会存在修改和删除场景,只有insert。

industrial.iot.recursion.json

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
{
"header": {
"thing_id":"a0001",
"instance_id":"1",
"thing_model_name":"computer",
"timestamp":"1668426344"
},
"body": {
"status":"Normal",
"temperature":"10",
"property":{
  "key1":"1",
  "key2":2
},
"working_time":"10000"
},
}

IOT数据格式:

  • header中的thing_model_name表示表名【必需】
  • header中的thing_id, instance_id, timestamp和body中的内容一起构成当前记录的字段内容【必需】
  • IOT数据为时序数据,不会存在修改和删除场景,只有insert
  • 该数据格式会对body属性拆分,将其key、value分别添加到新样式的property、value中,生成多条新数据,完成行转列。

industrial.iot.event.json.independent.table

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
{
"event_id":"1",
"event_name":"test",
"start_time":"1970-1-1T00:00:00.000Z",
"end_time":"1970-1-1T00:00:00.000Z",
"fields":{
    "field1":"value1",
    "field2":2
    }
}

IOT事件流数据格式:

  • event_name表示表名【必需】
  • event_id, start_time, end_time和fields中的内容一起构成当前记录的字段内容【必需】
  • IOT事件流数据为时序数据,不会存在修改和删除场景,只有insert。

industrial.iot.json.multi.events

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
"event_id":"1",
"event_name":"test",
"start_time":"1970-1-1T00:00:00.000Z",
"end_time":"1970-1-1T00:00:00.000Z",
"fields":{
    "field1":"value1",
    "field2":2,
    "field3":{
       "key1":"1",
       "key2":2
       }
    }
}

IOT事件流数据格式:

  • event_name表示表名【必需】
  • event_id, start_time, end_time和fields中的内容一起构成当前记录的字段内容【必需】
  • IOT事件流数据为时序数据,不会存在修改和删除场景,只有insert
  • 该数据格式会对fields属性拆分,将其key、value分别添加到新样式的field_name、field_value中,生成多条新数据,完成行转列。

GDS-Kafka入库模式

GDS-Kafka的数据入库都是先将数据copy至临时表,然后再根据客户的使用场景以及目标表有无主键进行merge或者insert,详见下表:

表3 GDS-Kafka入库模式

入库操作

app.insert.directly

是否主键表

入库模式

insert

true(仅支持无主键表)

使用insert select从临时表写入到目标表。

false

根据主键从临时表merge到目标表。

使用insert select从临时表写入到目标表。

delete

true(仅支持无主键表)

使用insert select从临时表写入到目标表。

false

说明:

delete操作支持标记删除,通过配置app.del.flag参数可以指定删除标记字段,如果配置了标记删除字段,则会通过将删除字段设置为1来标记删除的记录。

  • 如果设置了delflag字段,则会根据主键进行匹配merge,如果匹配到主键并且目标表中记录的pos小于临时表记录的pos,则会将delflag字段置为1,否则将插入一条新的记录。
  • 如果没有设置delflag字段,则会根据主键进行匹配,如果匹配到记录并且目标表中记录的pos小于临时表记录的pos,则会将目标表中匹配到的记录删除。

  • 如果设置了delflag字段,则会使用临时表中记录的所有字段与目标表进行匹配merge,如果匹配到记录并且目标表中记录的pos小于临时表记录的pos,则会将delflag字段值置为1,否则将插入一条新的记录。
  • 如果没有设置delflag字段,则会使用临时表中记录的所有字段与目标表进行匹配,如果匹配到记录并且目标表中记录的pos小于临时表记录的pos,则会将目标表中匹配到的记录删除。

update

true(仅支持无主键表)

使用insert select从临时表写入到目标表。

false

说明:

update操作会被拆分,将before或者beforeImage中的消息拆分为delete操作,将after或者afterImage中的消息拆分为insert操作,然后再按照insert和delete的行为进行入库处理。

相当于有主键表的insert+delete操作。

相当于无主键表的insert+delete操作。

分享:

    相关文档

    相关产品