GDS-Kafka数据接入
GDS-Kafka的工作方式是从kafka中消费数据并缓存,当达到设置好的时间或数据量之后,通过COPY写入DWS临时表,再从临时表进行插入或更新操作。
- Kafka的消息生产端必须按照一定的格式要求进行数据生产,其中消息格式由“kafka.source.event.type”配置参数指定。当前支持的消息格式详见 GDS-Kafka支持的消息格式 。
- GDS-Kafka支持直接insert(仅限无主键表)和merge覆盖更新两种入库模式,您可以根据DWS目标表的类型进行灵活配置,直接insert模式由于不涉及更新在性能上要更优一些。其中入库模式由“app.insert.directly”配置参数和有无主键共同决定,详见GDS-Kafka入库模式 。
- GDS-kafka只支持目标表表名和字段全小写。
- GDS-Kafka的删除是根据扩展字段中的pos进行历史删除,如果入库数据中有delete操作,则必须使用扩展字段。
GDS-Kafka支持的消息格式
kafka.source.event.type |
格式示例 |
格式说明 |
||
---|---|---|---|---|
cdc.drs.avro |
华为云DRS的内部格式,DRS生产至Kafka的avro格式,GDS-Kafka可直接对接进行解析入库。 |
无 |
||
drs.cdc |
使用drs.cdc的avro格式需要在Kafka上游的业务程序中引入GDS-Kafka-common和GDS-Kafka-source的maven依赖,然后在代码中创建并填充Record对象,一个Record对象表示一条表记录,最后将Record对象序列化为byte[]数组生产至Kafka供下游的GDS-Kafka使用。 如下示例所示,目标表为public模式下的person表;person表由id,name,age 3个字段组成;op_type为U表示是一条更新操作;将id为0的记录的name字段由a改为b;将age字段由18改为20:
|
标准avro格式:
|
||
cdc.json |
如下示例所示,目标表为public模式下的person表;person表由id,name,age 3个字段组成;op_type为U表示是一条更新操作;将id为1的记录的name字段由a改为b;将age字段由18改为20:
|
标准json格式:
|
||
industrial.iot.json |
|
IoT数据格式:
|
||
industrial.iot.recursion.json |
|
IoT数据格式:
|
||
industrial.iot.event.json.independent.table |
|
IoT事件流数据格式:
|
||
industrial.iot.json.multi.events |
|
IoT事件流数据格式:
|
GDS-Kafka入库模式
GDS-Kafka的数据入库都是先将数据copy至临时表,然后再根据客户的使用场景以及目标表有无主键进行merge或者insert,详见下表:
入库操作 |
app.insert.directly |
是否主键表 |
入库模式 |
---|---|---|---|
insert |
true(仅支持无主键表) |
否 |
使用insert select从临时表写入到目标表。 |
false |
是 |
根据主键从临时表merge到目标表。 |
|
否 |
使用insert select从临时表写入到目标表。 |
||
delete |
true(仅支持无主键表) |
否 |
使用insert select从临时表写入到目标表。 |
false
说明:
delete操作支持标记删除,通过配置app.del.flag参数可以指定删除标记字段,如果配置了标记删除字段,则会通过将删除字段设置为1来标记删除的记录。 |
是 |
|
|
否 |
|
||
update |
true(仅支持无主键表) |
否 |
使用insert select从临时表写入到目标表。 |
false
说明:
update操作会被拆分,将before或者beforeImage中的消息拆分为delete操作,将after或者afterImage中的消息拆分为insert操作,然后再按照insert和delete的行为进行入库处理。 |
是 |
相当于有主键表的insert+delete操作。 |
|
否 |
相当于无主键表的insert+delete操作。 |