应用与数据集成平台 ROMA Connect
应用与数据集成平台 ROMA Connect
- 最新动态
- 功能总览
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- 开始使用ROMA Connect
- 实例管理
- 集成应用管理
-
数据源管理
- ROMA Connect支持的数据源
- 接入API数据源
- 接入ActiveMQ数据源
- 接入ArtemisMQ数据源
- 接入DB2数据源
- 接入DIS数据源
- 接入DWS数据源
- 接入DM数据源
- 接入Gauss100数据源
- 接入FTP数据源
- 接入HL7数据源
- 接入HANA数据源
- 接入HIVE数据源
- 接入LDAP数据源
- 接入IBM MQ数据源
- 接入Kafka数据源
- 接入MySQL数据源
- 接入MongoDB数据源
- 接入MQS数据源
- 接入MRS Hive数据源
- 接入MRS HDFS数据源
- 接入MRS HBase数据源
- 接入MRS Kafka数据源
- 接入OBS数据源
- 接入Oracle数据源
- 接入PostgreSQL数据源
- 接入Redis数据源
- 接入RabbitMQ数据源
- 接入RocketMQ数据源
- 接入SAP数据源
- 接入SNMP数据源
- 接入SQL Server数据源
- 接入GaussDB(for MySQL)数据源
- 接入WebSocket数据源
- 接入自定义数据源
- 数据集成指导
- 服务集成指导
- 服务集成指导(旧版界面)
- 消息集成指导
- 设备集成指导
- 应用业务模型使用指导
- 扩大资源配额
- 查看审计日志
- 查看监控指标
- 权限管理
- 用户指南(新版)
- 最佳实践
-
开发指南
- 数据集成开发指导
-
服务集成开发指导
- 开发说明
- API调用认证开发(APP认证)
- API调用认证开发(IAM认证)
-
自定义后端开发(函数后端)
- 函数后端脚本开发说明
- AesUtils类说明
- APIConnectResponse类说明
- Base64Utils类说明
- CacheUtils类说明
- CipherUtils类说明
- ConnectionConfig类说明
- DataSourceClient类说明
- DataSourceConfig类说明
- ExchangeConfig类说明
- HttpClient类说明
- HttpConfig类说明
- JedisConfig类说明
- JSON2XMLHelper类说明
- JSONHelper类说明
- JsonUtils类说明
- JWTUtils类说明
- KafkaConsumer类说明
- KafkaProducer类说明
- KafkaConfig类说明
- MD5Encoder类说明
- Md5Utils类说明
- QueueConfig类说明
- RabbitMqConfig类说明
- RabbitMqProducer类说明
- RedisClient类说明
- RomaWebConfig类说明
- RSAUtils类说明
- SapRfcClient类说明
- SapRfcConfig类说明
- SoapClient类说明
- SoapConfig类说明
- StringUtils类说明
- TextUtils类说明
- XmlUtils类说明
- 自定义后端开发(数据后端)
- 后端服务签名校验开发
- 消息集成开发指导
- 设备集成开发指导
-
API参考
- 使用前必读
- API概览
- 如何调用API
- 公共资源API
- 数据集成API
- 服务集成API
- 消息集成API
- 设备集成API
- 应用示例
- 权限和授权项
- 附录
- 历史API
- 修订记录
- SDK参考
-
常见问题
- 实例管理
-
数据集成
-
数据集成普通任务
- FDI各类数据库支持哪些数据类型?
- 跟踪号是什么,能跟踪到数据吗?
- FDI任务是否支持清空目标表?
- FDI任务只能采集单张表到单张表吗?
- 用户创建的FDI任务,同一账号的其他用户可见吗?
- FDI通过公网对接其他租户的MRS HIVE如何配置?
- 从OBS解析文件到RDS数据库,采集过一次后,后面采集会进行更新吗?
- OBS源端的CSV文件解析到关系型数据库时,列的值不对怎么办?
- MRS Hive目标字段和源端字段数据类型不匹配时,数据是否能集成到目标端?
- MRS Hive、MRS HBase和MongoDB的Mapping映射手动输入时,是否区分大小写?
- MRS Hive是否支持分区?
- 源端API类型数据源自定义周期如何设置?
- SAP是否支持分页读取视图?
- 数据集成组合任务
-
数据集成普通任务
- 服务集成
- 消息集成
- 设备集成
-
故障排除
-
数据集成任务
- MRS Hive目标端写入时出现数据乱码
- MRS Hive写入时数据全部写在第一个字段里
- 目标端任务报任务运行超时
- MySQL到MRS Hive时目标端报“could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation”错误
- Mysql到Mysql时源端报“Illegal mix of collations for operation 'UNION'”错误
- 源端Mysql增量采集每小时执行一次时部分数据丢失
- API到MySQL时源端报“401 unauthorized”错误
- Kafka集到Mysql目标端报“cannot find record mapping field”错误
- API到MySQL的定时任务时会出现源端报“connect timeout”错误
- Kafka到Mysql的实时任务时,MQS中的Topic下有数据,但是FDI任务没有采集到数据。
- Mysql到Mysql的定时任务,源端有类型为tinyint(1),值为2的字段,但是采集到目标端值就变成了1
- 目标端数据源为公网Kafka时,定时任务目标端报“The task executes failed.Writer data to kafka failed”错误
- 数据集成组合任务
- 数据源
- 服务集成
- 设备集成
-
数据集成任务
- 视频帮助
- 文档下载
- 通用参考
本文导读
链接复制成功!
Go客户端使用说明
操作场景
本文以Linux CentOS环境为例,介绍Go版本的Kafka客户端连接指导,包括Kafka客户端安装,以及生产、消费消息。
引入Kafka客户端
MQS基于Kafka社区版本1.1.0、2.7,您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。Go开源客户端的版本使用请参见客户端版本使用建议。
执行以下命令,安装对应版本的Go Kafka客户端。
go get github.com/confluentinc/confluent-kafka-go/kafka
生产消息
- SASL认证方式
package main import ( "bufio" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" topics = "topic_name" user = "username" password = "password" caFile = "phy_ca.crt" ) func main() { log.Println("Starting a new kafka producer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, "security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN", "sasl.username": user, "sasl.password": password, "ssl.ca.location": caFile, } producer, err := kafka.NewProducer(config) if err != nil { log.Panicf("producer error, err: %v", err) return } go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { log.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { log.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() // Produce messages to topic (asynchronously) fmt.Println("please enter message:") go func() { for { err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny}, Value: GetInput(), }, nil) if err != nil { log.Panicf("send message fail, err: %v", err) return } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } // Wait for message deliveries before shutting down producer.Flush(15 * 1000) producer.Close() } func GetInput() []byte { reader := bufio.NewReader(os.Stdin) data, _, _ := reader.ReadLine() return data }
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- brokers:MQS连接地址和端口。
- topics:要生产消息的Topic名称。
- user和password:开启SASL_SSL认证时所使用的用户名和密码。
- caFile:开启SASL_SSL认证时所使用的客户端证书。
- 非SASL认证方式
package main import ( "bufio" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" topics = "topic_name" ) func main() { log.Println("Starting a new kafka producer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, } producer, err := kafka.NewProducer(config) if err != nil { log.Panicf("producer error, err: %v", err) return } go func() { for e := range producer.Events() { switch ev := e.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { log.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { log.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() // Produce messages to topic (asynchronously) fmt.Println("please enter message:") go func() { for { err := producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny}, Value: GetInput(), }, nil) if err != nil { log.Panicf("send message fail, err: %v", err) return } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } // Wait for message deliveries before shutting down producer.Flush(15 * 1000) producer.Close() } func GetInput() []byte { reader := bufio.NewReader(os.Stdin) data, _, _ := reader.ReadLine() return data }
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- brokers:MQS连接地址和端口。
- topics:要生产消息的Topic名称。
消费消息
- SASL认证方式
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" group = "group_id" topics = "topic_name" user = "username" password = "password" caFile = "phy_ca.crt" ) func main() { log.Println("Starting a new kafka consumer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, "group.id": group, "auto.offset.reset": "earliest", "security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN", "sasl.username": user, "sasl.password": password, "ssl.ca.location": caFile, } consumer, err := kafka.NewConsumer(config) if err != nil { log.Panicf("Error creating consumer: %v", err) return } err = consumer.SubscribeTopics([]string{topics}, nil) if err != nil { log.Panicf("Error subscribe consumer: %v", err) return } go func() { for { msg, err := consumer.ReadMessage(-1) if err != nil { log.Printf("Consumer error: %v (%v)", err, msg) } else { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } if err = consumer.Close(); err != nil { log.Panicf("Error closing consumer: %v", err) } }
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- brokers:MQS连接地址和端口。
- group:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
- topics:要消费消息的Topic名称。
- user和password:开启SASL_SSL认证时所使用的用户名和密码。
- caFile:开启SASL_SSL认证时所使用的客户端证书。
- 非SASL认证方式
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "log" "os" "os/signal" "syscall" ) var ( brokers = "ip1:port1,ip2:port2,ip3:port3" group = "group_id" topics = "topic_name" ) func main() { log.Println("Starting a new kafka consumer") config := &kafka.ConfigMap{ "bootstrap.servers": brokers, "group.id": group, "auto.offset.reset": "earliest", } consumer, err := kafka.NewConsumer(config) if err != nil { log.Panicf("Error creating consumer: %v", err) return } err = consumer.SubscribeTopics([]string{topics}, nil) if err != nil { log.Panicf("Error subscribe consumer: %v", err) return } go func() { for { msg, err := consumer.ReadMessage(-1) if err != nil { log.Printf("Consumer error: %v (%v)", err, msg) } else { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } } }() sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) select { case <-sigterm: log.Println("terminating: via signal") } if err = consumer.Close(); err != nil { log.Panicf("Error closing consumer: %v", err) } }
示例代码中的参数说明,可参考获取MQS连接信息获取参数值。
- brokers:MQS连接地址和端口。
- group:消费组名称。根据业务需要,自定义消费组名称,如果设置的消费组不存在,系统会自动创建。
- topics:要消费消息的Topic名称。
父主题: MQS连接开发(开源客户端)