Controlling Access with ACL
After ACL is enabled for an instance, user authentication information must be added to both the producer and consumer configurations.
Adding User Authentication Information to the Producer
- For normal, ordered, and scheduled messages, add the following code. Replace the information in bold with the actual values.
import ( "context" "fmt" "os" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) func main() { p, err := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), producer.WithCredentials(primitive.Credentials{ AccessKey: os.Getenv("ROCKETMQ_AK"), //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable. SecretKey: os.Getenv("ROCKETMQ_SK"), }), ) if err != nil { fmt.Println("init producer error: " + err.Error()) os.Exit(0) } err = p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } res, err := p.SendSync(context.Background(), primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!"))) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.
- 192.168.0.1:8100: instance address and port.
- AccessKey: username. For details about how to create a user, see Creating a User.
- SecretKey: secret key of the user.
- test: topic name.
- For transactional messages, add the following code. Replace the information in bold with the actual values.
package main import ( "context" "fmt" "os" "strconv" "sync" "sync/atomic" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) type DemoListener struct { localTrans *sync.Map transactionIndex int32 } func NewDemoListener() *DemoListener { return &DemoListener{ localTrans: new(sync.Map), } } func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState { nextIndex := atomic.AddInt32(&dl.transactionIndex, 1) fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId) status := nextIndex % 3 dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1)) fmt.Printf("dl") return primitive.UnknowState } func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState { fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId) v, existed := dl.localTrans.Load(msg.TransactionId) if !existed { fmt.Printf("unknow msg: %v, return Commit", msg) return primitive.CommitMessageState } state := v.(primitive.LocalTransactionState) switch state { case 1: fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg) return primitive.CommitMessageState case 2: fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg) return primitive.RollbackMessageState case 3: fmt.Printf("checkLocalTransaction unknow: %v\n", msg) return primitive.UnknowState default: fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg) return primitive.CommitMessageState } } func main() { p, _ := rocketmq.NewTransactionProducer( NewDemoListener(), producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), producer.WithCredentials(primitive.Credentials{ AccessKey: os.Getenv("ROCKETMQ_AK"), //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable. SecretKey: os.Getenv("ROCKETMQ_SK"), }), ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s\n", err.Error()) os.Exit(1) } for i := 0; i < 10; i++ { res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("topic1", []byte("Hello RocketMQ again "+strconv.Itoa(i)))) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } } time.Sleep(5 * time.Minute) err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } }
The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.
- 192.168.0.1:8100: instance address and port.
- AccessKey: username. For details about how to create a user, see Creating a User.
- SecretKey: secret key of the user.
- topic1: topic name.
Adding User Authentication Information to the Consumer
Add the following code for normal, ordered, scheduled, and transactional messages. Replace the information in bold with the actual values.
package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, err := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), consumer.WithCredentials(primitive.Credentials{ AccessKey: os.Getenv("ROCKETMQ_AK"), //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable. SecretKey: os.Getenv("ROCKETMQ_SK"), }), ) if err != nil { fmt.Println("init consumer error: " + err.Error()) os.Exit(0) } err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { fmt.Printf("subscribe callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("Shutdown Consumer error: %s", err.Error()) } }
The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.
- testGroup: consumer group name.
- 192.168.0.1:8100: instance address and port.
- AccessKey: username. For details about how to create a user, see Creating a User.
- SecretKey: secret key of the user.
- test: topic name.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot