更新时间:2024-03-28 GMT+08:00

GDS-Kafka数据接入

GDS-Kafka的工作方式是从kafka中消费数据并缓存,当达到设置好的时间或数据量之后,通过COPY写入DWS临时表,再从临时表进行插入或更新操作。

  1. Kafka的消息生产端必须按照一定的格式要求进行数据生产,其中消息格式由“kafka.source.event.type”配置参数指定。当前支持的消息格式详见 GDS-Kafka支持的消息格式
  2. GDS-Kafka支持直接insert(仅限无主键表)和merge覆盖更新两种入库模式,您可以根据DWS目标表的类型进行灵活配置,直接insert模式由于不涉及更新在性能上要更优一些。其中入库模式由“app.insert.directly”配置参数和有无主键共同决定,详见GDS-Kafka入库模式
  • GDS-kafka只支持目标表表名和字段全小写。
  • GDS-Kafka的删除是根据扩展字段中的pos进行历史删除,如果入库数据中有delete操作,则必须使用扩展字段。

GDS-Kafka支持的消息格式

表1 GDS-Kafka支持的消息格式

kafka.source.event.type

格式示例

格式说明

cdc.drs.avro

华为云DRS的内部格式,DRS生产至Kafka的avro格式,GDS-Kafka可直接对接进行解析入库。

drs.cdc

使用drs.cdc的avro格式需要在Kafka上游的业务程序中引入GDS-Kafka-common和GDS-Kafka-source的maven依赖,然后在代码中创建并填充Record对象,一个Record对象表示一条表记录,最后将Record对象序列化为byte[]数组生产至Kafka供下游的GDS-Kafka使用。

如下示例所示,目标表为public模式下的person表;person表由id,name,age 3个字段组成;op_type为U表示是一条更新操作;将id为0的记录的name字段由a改为b;将age字段由18改为20:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Record record = new Record();
// 设置目标表schema和table名称
record.setTableName("public.person");
// 设置字段列表
List<Field> fields = new ArrayList<>();
fields.add(new Field("id", 0));
fields.add(new Field("name", 1));
fields.add(new Field("age", 2));
record.setFields(fields);
// 设置表记录更新前的字段值列表
List<Object> before = new ArrayList<>();
before.add(new Integer(0, "0"));
before.add(new Character("utf-8", ByteBuffer.wrap("a".getBytes(StandardCharsets.UTF_8))));
before.add(new Integer(0, "18"));
record.setBeforeImages(before);
// 设置表记录更新后的字段值列表
List<Object> after = new ArrayList<>();
after.add(new Integer(0, "0"));
after.add(new Character("utf-8", ByteBuffer.wrap("b".getBytes(StandardCharsets.UTF_8))));
after.add(new Integer(0, "20"));
record.setAfterImages(after);
// 设置操作类型
record.setOperation("U");
// 设置操作时间
record.setUpdateTimestamp(325943905);
// Record对象序列化为byte[]数组
byte[] msg = Record.getEncoder().encode(record).array();

标准avro格式:

  • tableName字段用于描述当前记录所属的目标表名和schema名称。【必需】
  • operation字段用于描述当前记录是何种类型的操作:I表示insert操作,U表示update操作,D表示delete操作。【必需】
  • updateTimestamp表示源端操作发生的时间。【非必需】
  • beforeImages列表只有在operation为U或D时需要,用于描述当前记录在更新或删除之前的信息,before body体中的字段对应目标表中的字段;【U/D必需】
  • afterImages列表只有在op_type为U或I时需要,用于描述当前记录更新后的信息或新插入的信息;【U/D必需】
  • fields列表用于描述当前表记录的字段列表,字段的index值必须与beforeImage和afterImage中的顺序一致;【必需】

cdc.json

如下示例所示,目标表为public模式下的person表;person表由id,name,age 3个字段组成;op_type为U表示是一条更新操作;将id为1的记录的name字段由a改为b;将age字段由18改为20:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
{
"table": "public.person",
"op_type": "U",
"op_ts": "1668426344",
"current_ts": "1668426344",
"before": {
"id":"1",
"name":"a",
"age": 18
},
"after": {
"id":"1",
"name":"b",
"age": 20
}
}

标准json格式:

  • table字段用于描述当前记录所属的目标表名和schema名称;【必需】
  • op_type字段用于描述当前记录是何种类型的操作:I表示insert操作,U表示update操作,D表示delete操作;【必需】
  • op_ts表示源端操作发生的时间;【非必需】
  • current_ts表示该消息入Kafka的时间;【非必需】
  • before对象只有在op_type为U或D时需要,用于描述当前记录在更新或删除之前的信息,before body体中的字段对应目标表中的字段;【U/D必需】
  • after对象只有在op_type为U或I时需要,用于描述当前记录更新后的信息或新插入的信息;【U/D必需】

industrial.iot.json

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
{
"header": {
"thing_id":"a0001",
"instance_id":"1",
"thing_model_name":"computer",
"timestamp":"1668426344"
},
"body": {
"status":"Normal",
"temperature":"10",
"working_time":"10000"
},
}

IoT数据格式:

  • header中的thing_model_name表示表名。【必需】
  • header中的thing_id, instance_id, timestamp和body中的内容一起构成当前记录的字段内容。【必需】
  • IoT数据为时序数据,不会存在修改和删除场景,只有insert。

industrial.iot.recursion.json

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
{
"header": {
"thing_id":"a0001",
"instance_id":"1",
"thing_model_name":"computer",
"timestamp":"1668426344"
},
"body": {
"status":"Normal",
"temperature":"10",
"property":{
  "key1":"1",
  "key2":2
},
"working_time":"10000"
},
}

IoT数据格式:

  • header中的thing_model_name表示表名。【必需】
  • header中的thing_id, instance_id, timestamp和body中的内容一起构成当前记录的字段内容。【必需】
  • IoT数据为时序数据,不会存在修改和删除场景,只有insert。
  • 该数据格式会对body属性拆分,将其key、value分别添加到新样式的property、value中,生成多条新数据,完成行转列。

industrial.iot.event.json.independent.table

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
{
"event_id":"1",
"event_name":"test",
"start_time":"1970-1-1T00:00:00.000Z",
"end_time":"1970-1-1T00:00:00.000Z",
"fields":{
    "field1":"value1",
    "field2":2
    }
}

IoT事件流数据格式:

  • event_name表示表名。【必需】
  • event_id, start_time, end_time和fields中的内容一起构成当前记录的字段内容。【必需】
  • IoT事件流数据为时序数据,不会存在修改和删除场景,只有insert。

industrial.iot.json.multi.events

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
{
"event_id":"1",
"event_name":"test",
"start_time":"1970-1-1T00:00:00.000Z",
"end_time":"1970-1-1T00:00:00.000Z",
"fields":{
    "field1":"value1",
    "field2":2,
    "field3":{
       "key1":"1",
       "key2":2
       }
    }
}

IoT事件流数据格式:

  • event_name表示表名。【必需】
  • event_id, start_time, end_time和fields中的内容一起构成当前记录的字段内容。【必需】
  • IoT事件流数据为时序数据,不会存在修改和删除场景,只有insert。
  • 该数据格式会对fields属性拆分,将其key、value分别添加到新样式的field_name、field_value中,生成多条新数据,完成行转列。

GDS-Kafka入库模式

GDS-Kafka的数据入库都是先将数据copy至临时表,然后再根据客户的使用场景以及目标表有无主键进行merge或者insert,详见下表:

表2 GDS-Kafka入库模式

入库操作

app.insert.directly

是否主键表

入库模式

insert

true(仅支持无主键表)

使用insert select从临时表写入到目标表。

false

根据主键从临时表merge到目标表。

使用insert select从临时表写入到目标表。

delete

true(仅支持无主键表)

使用insert select从临时表写入到目标表。

false

说明:

delete操作支持标记删除,通过配置app.del.flag参数可以指定删除标记字段,如果配置了标记删除字段,则会通过将删除字段设置为1来标记删除的记录。

  • 如果设置了delflag字段,则会根据主键进行匹配merge,如果匹配到主键并且目标表中记录的pos小于临时表记录的pos,则会将delflag字段置为1,否则将插入一条新的记录。
  • 如果没有设置delflag字段,则会根据主键进行匹配,如果匹配到记录并且目标表中记录的pos小于临时表记录的pos,则会将目标表中匹配到的记录删除。

  • 如果设置了delflag字段,则会使用临时表中记录的所有字段与目标表进行匹配merge,如果匹配到记录并且目标表中记录的pos小于临时表记录的pos,则会将delflag字段值置为1,否则将插入一条新的记录。
  • 如果没有设置delflag字段,则会使用临时表中记录的所有字段与目标表进行匹配,如果匹配到记录并且目标表中记录的pos小于临时表记录的pos,则会将目标表中匹配到的记录删除。

update

true(仅支持无主键表)

使用insert select从临时表写入到目标表。

false

说明:

update操作会被拆分,将before或者beforeImage中的消息拆分为delete操作,将after或者afterImage中的消息拆分为insert操作,然后再按照insert和delete的行为进行入库处理。

相当于有主键表的insert+delete操作。

相当于无主键表的insert+delete操作。