更新时间:2024-05-15 GMT+08:00
使用ACL权限访问
实例开启ACL访问控制后,消息生产者和消费者都需要增加用户认证信息。
生产者增加用户认证信息
- 普通消息、顺序消息和定时消息,参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
import ( "context" "fmt" "os" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) func main() { p, err := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), producer.WithCredentials(primitive.Credentials{ AccessKey: os.Getenv("ROCKETMQ_AK"), //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 SecretKey: os.Getenv("ROCKETMQ_SK"), }), ) if err != nil { fmt.Println("init producer error: " + err.Error()) os.Exit(0) } err = p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } res, err := p.SendSync(context.Background(), primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!"))) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- 192.168.0.1:8100:表示实例连接地址和端口。
- AccessKey:表示用户名。创建用户的步骤,请参见创建用户。
- SecretKey:表示用户的密钥。
- test:表示Topic名称。
- 事务消息,参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
package main import ( "context" "fmt" "os" "strconv" "sync" "sync/atomic" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) type DemoListener struct { localTrans *sync.Map transactionIndex int32 } func NewDemoListener() *DemoListener { return &DemoListener{ localTrans: new(sync.Map), } } func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState { nextIndex := atomic.AddInt32(&dl.transactionIndex, 1) fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId) status := nextIndex % 3 dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1)) fmt.Printf("dl") return primitive.UnknowState } func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState { fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId) v, existed := dl.localTrans.Load(msg.TransactionId) if !existed { fmt.Printf("unknow msg: %v, return Commit", msg) return primitive.CommitMessageState } state := v.(primitive.LocalTransactionState) switch state { case 1: fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg) return primitive.CommitMessageState case 2: fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg) return primitive.RollbackMessageState case 3: fmt.Printf("checkLocalTransaction unknow: %v\n", msg) return primitive.UnknowState default: fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg) return primitive.CommitMessageState } } func main() { p, _ := rocketmq.NewTransactionProducer( NewDemoListener(), producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), producer.WithCredentials(primitive.Credentials{ AccessKey: os.Getenv("ROCKETMQ_AK"), //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 SecretKey: os.Getenv("ROCKETMQ_SK"), }), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s\n", err.Error()) os.Exit(1) } for i := 0; i < 10; i++ { res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("topic1", []byte("Hello RocketMQ again "+strconv.Itoa(i)))) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } } time.Sleep(5 * time.Minute) err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- 192.168.0.1:8100:表示实例连接地址和端口。
- AccessKey:表示用户名。创建用户的步骤,请参见创建用户。
- SecretKey:表示用户的密钥。
- topic1:表示Topic名称。
消费者增加用户认证信息
无论是普通消息、顺序消息、定时消息,还是事务消息,都参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, err := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), consumer.WithCredentials(primitive.Credentials{ AccessKey: os.Getenv("ROCKETMQ_AK"), //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 SecretKey: os.Getenv("ROCKETMQ_SK"), }), ) if err != nil { fmt.Println("init consumer error: " + err.Error()) os.Exit(0) } err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { fmt.Printf("subscribe callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("Shutdown Consumer error: %s", err.Error()) } }
示例代码中的参数说明如下,请参考收集连接信息获取参数值。
- testGroup:表示消费组名称。
- 192.168.0.1:8100:表示实例连接地址和端口。
- AccessKey:表示用户名。创建用户的步骤,请参见创建用户。
- SecretKey:表示用户的密钥。
- test:表示Topic名称。
父主题: Go(TCP协议)