Kafka Source Stream

Overview

Create a source stream to obtain data from Kafka as input data for jobs.

Apache Kafka is a fast, scalable, and fault-tolerant distributed publish-subscribe messaging system. It provides high throughput, built-in partitioning, and data replication for fault tolerance, which makes it well suited to processing large volumes of messages.

Prerequisites

  • If the Kafka server listens on its port by hostname, add the mapping between the hostname and IP address of the Kafka Broker node to the CS cluster. Contact the Kafka service deployment personnel to obtain the hostname and IP address of the Kafka Broker node. For details about how to add an IP-domain mapping, see Adding an IP-Domain Mapping in Cluster Management in the Cloud Stream Service User Guide.
  • When using offline Kafka clusters, use VPC peering connections to connect CS to Kafka.

    For details about how to set up the VPC peering connection, see VPC Peering Connection in the Cloud Stream Service User Guide.

Syntax

CREATE SOURCE STREAM kafka_source (name STRING, age int)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "",
  kafka_group_id = "",
  kafka_topic = "",
  encode = "json",
  json_config = ""
)
(TIMESTAMP BY timeindicator (',' timeindicator)?);

timeindicator:
  PROCTIME '.' PROCTIME
  | ID '.' ROWTIME

Description

Table 1 Syntax description

Parameter: type
Mandatory: Yes
Description: Data source type. The value kafka indicates that the data source is Kafka.

Parameter: kafka_bootstrap_servers
Mandatory: Yes
Description: Addresses of the Kafka brokers that CS connects to, in the format ip:port, with multiple addresses separated by commas (,). Use VPC peering connections to connect CS clusters with Kafka clusters.

Parameter: kafka_group_id
Mandatory: Yes
Description: Kafka consumer group ID.

Parameter: kafka_topic
Mandatory: Yes
Description: Kafka topic to be read.

Parameter: encode
Mandatory: Yes
Description: Data encoding format. The value can be json, csv, or blob.

  • field_delimiter must be specified if this parameter is set to csv.
  • json_config must be specified if this parameter is set to json.
  • If this parameter is set to blob, the received data is not parsed, only one stream attribute exists, and the stream attribute is of the Array[TINYINT] type.

Parameter: json_config
Mandatory: No
Description: If encode is set to json, you can use this parameter to specify the mapping between JSON fields and stream attribute fields. The format is field1=json_field1;field2=json_field2.

Parameter: field_delimiter
Mandatory: No
Description: If encode is set to csv, you can use this parameter to specify the separator between CSV fields. By default, the comma (,) is used.

Parameter: quote
Mandatory: No
Description: Quote character used in the data. Attribute delimiters that appear between two quote characters are treated as ordinary characters.

  • If double quotation marks are used as the quote character, set this parameter to "\u005c\u0022" for character conversion.
  • If a single quotation mark is used as the quote character, set this parameter to a single quotation mark (').

NOTE:
After this parameter is specified, ensure that each field contains either no quote characters or an even number of quote characters. Otherwise, parsing will fail.

Parameter: timeindicator
Mandatory: No
Description: Timestamp added in the source stream. The value can be processing time or event time.

NOTE:
  • If this parameter is set to processing time, the format is proctime.proctime.

    In this case, an attribute named proctime is added to the original attributes. For example, if the stream originally has three attributes, four attributes are exported after processing time is set. However, the number of attributes remains unchanged if the rowtime attribute is specified.

  • If this parameter is set to event time, you can select an attribute in the stream as the timestamp. The format is attr_name.rowtime.
  • Processing time and event time can be specified at the same time.

Parameter: start_time
Mandatory: No
Description: Start time from which Kafka data is ingested.

If this parameter is specified, CS reads data from the specified time onward. The parameter value is in the format of yyyy-MM-dd HH:mm:ss. Ensure that the value of start_time is not later than the current time. Otherwise, no data will be obtained.
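As a rough illustration of how encode and field_delimiter fit together, the following sketch reads CSV records such as lilei|18. The stream name, topic name, broker addresses, group ID, and the choice of | as the delimiter are all placeholders chosen for this example, not values taken from a real deployment.

```sql
-- Sketch only: kafka_csv_source, test_csv, and the connection values are hypothetical.
-- Assumes each Kafka record is one CSV line such as: lilei|18
CREATE SOURCE STREAM kafka_csv_source (name STRING, age int)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "ip1:port1,ip2:port2",
  kafka_group_id = "sourcegroup1",
  kafka_topic = "test_csv",
  encode = "csv",
  field_delimiter = "|"
);
```

If field_delimiter were omitted here, the default comma would be expected as the separator, so the sample record above would presumably not split into two attributes.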

Precautions

The attribute used as the timestamp must be of the long or timestamp type.
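A minimal sketch of a stream that declares both time indicators may help here. It assumes a hypothetical attribute event_ts of type LONG carrying the event time, mapped from an assumed JSON field attr3; the connection values are placeholders.

```sql
-- Sketch only: event_ts and attr3 are hypothetical names used for illustration.
-- event_ts is declared as LONG so that it can serve as the rowtime attribute.
CREATE SOURCE STREAM kafka_event_source (name STRING, age int, event_ts LONG)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "ip1:port1,ip2:port2",
  kafka_group_id = "sourcegroup1",
  kafka_topic = "test",
  encode = "json",
  json_config = "name=attr1;age=attr2;event_ts=attr3"
)
TIMESTAMP BY proctime.proctime, event_ts.rowtime;
```

With proctime.proctime declared, the stream would expose an additional proctime attribute alongside the three declared ones, while event_ts.rowtime reuses an existing attribute and adds none.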

Example

Read data from the Kafka topic test. Sample data: {"attr1": "lilei", "attr2": 18}

CREATE SOURCE STREAM kafka_source (name STRING, age int)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "ip1:port1,ip2:port2", 
  kafka_group_id = "sourcegroup1", 
  kafka_topic = "test",
  encode = "json",
  json_config = "name=attr1;age=attr2"
);
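The same stream could also be limited to data ingested from a given point in time by adding start_time. The sketch below is a variant under assumptions: the stream name and the timestamp are placeholders, and the timestamp would need to be no later than the current time when the job runs.

```sql
-- Sketch only: the stream name and the start_time value are placeholders.
-- start_time uses the yyyy-MM-dd HH:mm:ss format.
CREATE SOURCE STREAM kafka_source_from_time (name STRING, age int)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "ip1:port1,ip2:port2",
  kafka_group_id = "sourcegroup1",
  kafka_topic = "test",
  encode = "json",
  json_config = "name=attr1;age=attr2",
  start_time = "2018-08-25 00:00:00"
);
```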