CloudTable OpenTSDB输出流
功能描述
DLI将Flink作业的输出数据输出到CloudTable的OpenTSDB中。OpenTSDB是基于HBase的分布式的,可伸缩的时间序列数据库。它存储的是时间序列数据,时间序列数据是指在不同时间点上收集到的数据,这类数据反映了一个对象随时间的变化状态或程度。支持秒级别数据的采集监控,进行永久存储,索引和查询,可用于系统监控和测量、物联网数据、金融数据和科学实验结果数据的收集监控。
表格存储服务(CloudTable),是基于Apache HBase提供的分布式、可伸缩、全托管的KeyValue数据存储服务,为DLI提供了高性能的随机读写能力,适用于海量结构化数据、半结构化数据以及时序数据的存储和查询应用,适用于物联网IOT应用和通用海量KeyValue数据存储与查询等场景。CloudTable的更多信息,请参见《表格存储服务用户指南》。
前提条件
- 确保CloudTable的集群已经开启了OpenTSDB服务。
- 该场景作业需要运行在DLI的独享队列上,因此要与CloudTable HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
语法格式
1 2 3 4 5 6 7 8 9 10 11 |
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "opentsdb", region = "", cluster_id = "", tsdb_metrics = "", tsdb_timestamps = "", tsdb_values = "", tsdb_tags = "", batch_insert_data_num = "" ) |
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
输出通道类型,“opentsdb”表示输出到表格存储服务CloudTable(OpenTSDB)。 |
region |
是 |
表格存储服务所在区域。 |
cluster_id |
否 |
待插入数据所属集群的id,该参数与tsdb_link_address必须指定其中一个。 |
tsdb_metrics |
是 |
数据点的metric,支持参数化。 |
tsdb_timestamps |
是 |
数据点的timestamp,数据类型支持LONG、INT、SHORT和STRING,仅支持指定动态列。 |
tsdb_values |
是 |
数据点的value,数据类型支持SHORT、INT、LONG、FLOAT、DOUBLE和STRING,支持指定动态列或者常数值。 |
tsdb_tags |
是 |
数据点的tags,每个tags里面至少一个标签值,最多8个标签值,支持参数化。 |
batch_insert_data_num |
否 |
表示一次性批量写入的数据量(即数据条数),值必须为正整数,上限为65536,默认值为8。 |
tsdb_link_address |
否 |
待插入数据所属集群的OpenTsdb链接地址,使用该参数时,作业需要运行在独享DLI队列,且DLI队列需要与CloudTable集群建立增强型跨源,该参数与cluster_id必须指定其中一个。
说明:
如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 |
注意事项
当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。
示例
将流weather_out的数据输出到表格存储服务CloudTable的OpenTSDB中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
CREATE SINK STREAM weather_out ( timestamp_value LONG, /* 时间 */ temperature FLOAT, /* 温度值 */ humidity FLOAT, /* 湿度值 */ location STRING /* 地点 */ ) WITH ( type = "opentsdb", region = "xxx", cluster_id = "e05649d6-00e2-44b4-b0ff-7194adaeab3f", tsdb_metrics = "weather", tsdb_timestamps = "${timestamp_value}", tsdb_values = "${temperature}; ${humidity}", tsdb_tags = "location:${location},signify:temperature; location:${location},signify:humidity", batch_insert_data_num = "10" ); |