Updated on 2024-03-05 GMT+08:00

Controlling Access with ACL

After ACL is enabled for an instance, user authentication information must be added to both the producer and consumer configurations.

Adding User Authentication Information to the Producer

  • For normal, ordered, and scheduled messages, add the following code. Replace the information in bold with the actual values.
    import (
    	"context"
    	"fmt"
    	"os"
    
    	"github.com/apache/rocketmq-client-go/v2"
    	"github.com/apache/rocketmq-client-go/v2/primitive"
    	"github.com/apache/rocketmq-client-go/v2/producer"
    )
    
    func main() {
    	p, err := rocketmq.NewProducer(
    		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
    		producer.WithRetry(2),
    		producer.WithCredentials(primitive.Credentials{
    			AccessKey: os.Getenv("ROCKETMQ_AK"), //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable.
    			SecretKey: os.Getenv("ROCKETMQ_SK"), 
    		}),
    	)
    
    	if err != nil {
    		fmt.Println("init producer error: " + err.Error())
    		os.Exit(0)
    	}
    
    	err = p.Start()
    	if err != nil {
    		fmt.Printf("start producer error: %s", err.Error())
    		os.Exit(1)
    	}
    	res, err := p.SendSync(context.Background(), primitive.NewMessage("test",
    		[]byte("Hello RocketMQ Go Client!")))
    
    	if err != nil {
    		fmt.Printf("send message error: %s\n", err)
    	} else {
    		fmt.Printf("send message success: result=%s\n", res.String())
    	}
    	err = p.Shutdown()
    	if err != nil {
    		fmt.Printf("shutdown producer error: %s", err.Error())
    	}
    }

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • 192.168.0.1:8100: instance address and port.
    • AccessKey: username. For details about how to create a user, see Creating a User.
    • SecretKey: secret key of the user.
    • test: topic name.
  • For transactional messages, add the following code. Replace the information in bold with the actual values.
    package main
    
    import (
    	"context"
    	"fmt"
    	"os"
    	"strconv"
    	"sync"
    	"sync/atomic"
    	"time"
    
    	"github.com/apache/rocketmq-client-go/v2"
    	"github.com/apache/rocketmq-client-go/v2/primitive"
    	"github.com/apache/rocketmq-client-go/v2/producer"
    )
    
    type DemoListener struct {
    	localTrans       *sync.Map
    	transactionIndex int32
    }
    
    func NewDemoListener() *DemoListener {
    	return &DemoListener{
    		localTrans: new(sync.Map),
    	}
    }
    
    func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
    	nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
    	fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
    	status := nextIndex % 3
    	dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))
    
    	fmt.Printf("dl")
    	return primitive.UnknowState
    }
    
    func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
    	fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
    	v, existed := dl.localTrans.Load(msg.TransactionId)
    	if !existed {
    		fmt.Printf("unknow msg: %v, return Commit", msg)
    		return primitive.CommitMessageState
    	}
    	state := v.(primitive.LocalTransactionState)
    	switch state {
    	case 1:
    		fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg)
    		return primitive.CommitMessageState
    	case 2:
    		fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
    		return primitive.RollbackMessageState
    	case 3:
    		fmt.Printf("checkLocalTransaction unknow: %v\n", msg)
    		return primitive.UnknowState
    	default:
    		fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
    		return primitive.CommitMessageState
    	}
    }
    
    func main() {
    	p, _ := rocketmq.NewTransactionProducer(
    		NewDemoListener(),
    		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
    		producer.WithRetry(2),
    		producer.WithCredentials(primitive.Credentials{
    			AccessKey: os.Getenv("ROCKETMQ_AK"), //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable.
    			SecretKey: os.Getenv("ROCKETMQ_SK"),
    		}),
    	)
    	err := p.Start()
    	if err != nil {
    		fmt.Printf("start producer error: %s\n", err.Error())
    		os.Exit(1)
    	}
    
    	for i := 0; i < 10; i++ {
    		res, err := p.SendMessageInTransaction(context.Background(),
    			primitive.NewMessage("topic1", []byte("Hello RocketMQ again "+strconv.Itoa(i))))
    
    		if err != nil {
    			fmt.Printf("send message error: %s\n", err)
    		} else {
    			fmt.Printf("send message success: result=%s\n", res.String())
    		}
    	}
    	time.Sleep(5 * time.Minute)
    	err = p.Shutdown()
    	if err != nil {
    		fmt.Printf("shutdown producer error: %s", err.Error())
    	}
    }

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • 192.168.0.1:8100: instance address and port.
    • AccessKey: username. For details about how to create a user, see Creating a User.
    • SecretKey: secret key of the user.
    • topic1: topic name.

Adding User Authentication Information to the Consumer

Add the following code for normal, ordered, scheduled, and transactional messages. Replace the information in bold with the actual values.

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: os.Getenv("ROCKETMQ_AK"), //Hard-coded or plaintext username and key are risky. You are advised to store them in ciphertext in a configuration file or an environment variable.
			SecretKey: os.Getenv("ROCKETMQ_SK"),
		}),
	)
	if err != nil {
		fmt.Println("init consumer error: " + err.Error())
		os.Exit(0)
	}

	err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		fmt.Printf("subscribe callback: %v \n", msgs)
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("Shutdown Consumer error: %s", err.Error())
	}
}

The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

  • testGroup: consumer group name.
  • 192.168.0.1:8100: instance address and port.
  • AccessKey: username. For details about how to create a user, see Creating a User.
  • SecretKey: secret key of the user.
  • test: topic name.