Kafka Source Stream
Overview
Create a source stream to obtain data from Kafka as input data for jobs.
Apache Kafka is a fast, scalable, and fault-tolerant distributed message publishing and subscription system. It delivers high throughput and built-in partitions and provides data replicas and fault tolerance. Apache Kafka is applicable to scenarios of handling massive messages.
Prerequisites
- If the Kafka server listens on the port using hostname, you need to add the mapping between the hostname and IP address of the Kafka Broker node to the CS cluster. Contact the Kafka service deployment personnel to obtain the hostname and IP address of the Kafka Broker node. For details about how to add an IP-domain mapping, see the description of Adding an IP-Domain Mapping in Cluster Management in the Cloud Stream Service User Guide.
- When using offline Kafka clusters, use VPC peering connections to connect CS to Kafka. For details about how to set up the VPC peering connection, see VPC Peering Connection in the Cloud Stream Service User Guide.
Syntax
CREATE SOURCE STREAM kafka_source (name STRING, age INT)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "",
  kafka_group_id = "",
  kafka_topic = "",
  encode = "json",
  json_config = ""
)
(TIMESTAMP BY timeindicator (',' timeindicator)?);

timeindicator:
    PROCTIME '.' PROCTIME
  | ID '.' ROWTIME
Description
| Parameter | Mandatory | Description |
|---|---|---|
| type | Yes | Data source type. The value kafka indicates that the data source is Kafka. |
| kafka_bootstrap_servers | Yes | Kafka broker addresses (host:port) that CS connects to. Use VPC peering connections to connect CS clusters with Kafka clusters. |
| kafka_group_id | Yes | Consumer group ID. |
| kafka_topic | Yes | Kafka topic to be read. |
| encode | Yes | Data encoding format. The value can be json, csv, or blob. |
| json_config | No | If encode is set to json, this parameter specifies the mapping between JSON fields and stream attribute fields, in the format field1=json_field1;field2=json_field2. |
| field_delimiter | No | If encode is set to csv, this parameter specifies the separator between CSV fields. The default is a comma (,). |
| quote | No | Quote symbol in the data. Attribute delimiters between two quote symbols are treated as common characters. NOTE: After this parameter is specified, ensure that each field contains either no quote symbols or an even number of them. Otherwise, parsing will fail. |
| timeindicator | No | Timestamp added to the source stream. The value can be the processing time or the event time. |
| start_time | No | Start time from which Kafka data is ingested. If this parameter is specified, CS reads data from the specified time onward. The value is in the format yyyy-MM-dd HH:mm:ss. Ensure that start_time is not later than the current time. Otherwise, no data will be obtained. |
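The CSV-related parameters above (encode, field_delimiter, and quote) can be combined as in the following sketch. The broker addresses, group ID, and topic name are placeholders, not values from this guide:

```sql
-- Hedged sketch: read CSV records such as  "li,lei",18  from a Kafka topic
CREATE SOURCE STREAM kafka_csv_source (name STRING, age INT)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "ip1:port1,ip2:port2",  -- placeholder broker addresses
  kafka_group_id = "csvgroup1",                     -- placeholder group ID
  kafka_topic = "csv_test",                         -- placeholder topic name
  encode = "csv",
  field_delimiter = ",",
  quote = "\""
);
```

Because quote is set, a comma inside a quoted field is treated as a common character rather than a field separator.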
Precautions
The attribute used as the event timestamp must be of type long or timestamp.
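Following the timeindicator grammar in the Syntax section, a source stream can carry a processing-time column and an event-time column. The sketch below is illustrative: the attribute ts (a long value, per the precaution above) and all connection values are assumed names, not values from this guide:

```sql
-- Hedged sketch: ts holds the event timestamp and must be LONG or TIMESTAMP
CREATE SOURCE STREAM kafka_ts_source (name STRING, ts LONG)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "ip1:port1,ip2:port2",  -- placeholder broker addresses
  kafka_group_id = "tsgroup1",                      -- placeholder group ID
  kafka_topic = "ts_test",                          -- placeholder topic name
  encode = "json"
)
TIMESTAMP BY proctime.proctime, ts.rowtime;
```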
Example
Read data from the Kafka topic test. Sample record: {"attr1": "lilei", "attr2": 18}

CREATE SOURCE STREAM kafka_source (name STRING, age INT)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "ip1:port1,ip2:port2",
  kafka_group_id = "sourcegroup1",
  kafka_topic = "test",
  encode = "json",
  json_config = "name=attr1;age=attr2"
);
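To replay data from a point in time, the start_time parameter described above can be added to the same statement. This is a hedged sketch; the date value is illustrative:

```sql
CREATE SOURCE STREAM kafka_source (name STRING, age INT)
WITH (
  type = "kafka",
  kafka_bootstrap_servers = "ip1:port1,ip2:port2",
  kafka_group_id = "sourcegroup1",
  kafka_topic = "test",
  encode = "json",
  json_config = "name=attr1;age=attr2",
  start_time = "2018-01-01 00:00:00"  -- illustrative; must not be later than the current time
);
```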