更新时间:2024-02-07 GMT+08:00

CloudTable OpenTSDB输出流

功能描述

DLI将Flink作业的输出数据输出到CloudTable的OpenTSDB中。OpenTSDB是基于HBase的分布式的,可伸缩的时间序列数据库。它存储的是时间序列数据,时间序列数据是指在不同时间点上收集到的数据,这类数据反映了一个对象随时间的变化状态或程度。支持秒级别数据的采集监控,进行永久存储,索引和查询,可用于系统监控和测量、物联网数据、金融数据和科学实验结果数据的收集监控。

表格存储服务(CloudTable),是基于Apache HBase提供的分布式、可伸缩、全托管的KeyValue数据存储服务,为DLI提供了高性能的随机读写能力,适用于海量结构化数据、半结构化数据以及时序数据的存储和查询应用,适用于物联网IOT应用和通用海量KeyValue数据存储与查询等场景。CloudTable的更多信息,请参见《表格存储服务用户指南》。

前提条件

  • 该场景作业需要运行在DLI的独享队列上,因此要与CloudTable HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

    如何建立增强型跨源连接,请参见《数据湖探索用户指南》增强型跨源连接章节。

    如何设置安全组规则,请参见《虚拟私有云用户指南》“安全组”章节。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "opentsdb",
    region = "",
    cluster_id = "",
    tsdb_metrics = "",
    tsdb_timestamps = "",
    tsdb_values = "",
    tsdb_tags = "",
    batch_insert_data_num = ""
  )

关键字

表1 关键字说明

参数

是否必选

说明

type

输出通道类型,“opentsdb”表示输出到表格存储服务CloudTable(OpenTSDB)。

region

表格存储服务所在区域。

cluster_id

待插入数据所属集群的id,该参数与tsdb_link_address必须指定其中一个。

tsdb_metrics

数据点的metric,支持参数化。

tsdb_timestamps

数据点的timestamp,数据类型支持LONG、INT、SHORT和STRING,仅支持指定动态列。

tsdb_values

数据点的value,数据类型支持SHORT、INT、LONG、FLOAT、DOUBLE和STRING,支持指定动态列或者常数值。

tsdb_tags

数据点的tags,每个tags里面至少一个标签值,最多8个标签值,支持参数化。

batch_insert_data_num

表示一次性批量写入的数据量(即数据条数),值必须为正整数,上限为65536,默认值为8。

tsdb_link_address

待插入数据所属集群的OpenTsdb链接地址,使用该参数时,作业需要运行在独享DLI队列,且DLI队列需要与CloudTable集群建立增强型跨源,该参数与cluster_id必须指定其中一个。

说明:

如何建立增强型跨源连接,请参见《数据湖探索用户指南》增强型跨源连接章节。

注意事项

当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。

示例

将流weather_out的数据输出到表格存储服务CloudTable的OpenTSDB中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
CREATE SINK STREAM weather_out (
  timestamp_value LONG, /* 时间 */
  temperature FLOAT, /* 温度值 */
  humidity FLOAT, /* 湿度值 */
  location STRING /* 地点 */
)
  WITH (
    type = "opentsdb",
    region = "xxx",
    cluster_id = "e05649d6-00e2-44b4-b0ff-7194adaeab3f",
    tsdb_metrics = "weather",
    tsdb_timestamps = "${timestamp_value}",
    tsdb_values = "${temperature}; ${humidity}",
    tsdb_tags = "location:${location},signify:temperature; location:${location},signify:humidity",
    batch_insert_data_num = "10"
);