更新时间:2024-03-05 GMT+08:00

使用ACL权限访问

实例开启ACL访问控制后,消息生产者和消费者都需要增加用户认证信息。

生产者增加用户认证信息

  • 普通消息、顺序消息和定时消息,参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
    import (
    	"context"
    	"fmt"
    	"os"
    
    	"github.com/apache/rocketmq-client-go/v2"
    	"github.com/apache/rocketmq-client-go/v2/primitive"
    	"github.com/apache/rocketmq-client-go/v2/producer"
    )
    
    func main() {
    	p, err := rocketmq.NewProducer(
    		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
    		producer.WithRetry(2),
    		producer.WithCredentials(primitive.Credentials{
    			AccessKey: os.Getenv("ROCKETMQ_AK"), //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
    			SecretKey: os.Getenv("ROCKETMQ_SK"), 
    		}),
    	)
    
    	if err != nil {
    		fmt.Println("init producer error: " + err.Error())
    		os.Exit(0)
    	}
    
    	err = p.Start()
    	if err != nil {
    		fmt.Printf("start producer error: %s", err.Error())
    		os.Exit(1)
    	}
    	res, err := p.SendSync(context.Background(), primitive.NewMessage("test",
    		[]byte("Hello RocketMQ Go Client!")))
    
    	if err != nil {
    		fmt.Printf("send message error: %s\n", err)
    	} else {
    		fmt.Printf("send message success: result=%s\n", res.String())
    	}
    	err = p.Shutdown()
    	if err != nil {
    		fmt.Printf("shutdown producer error: %s", err.Error())
    	}
    }

    示例代码中的参数说明如下,请参考收集连接信息获取参数值。

    • 192.168.0.1:8100:表示实例连接地址和端口。
    • AccessKey:表示用户名。创建用户的步骤,请参见创建用户
    • SecretKey:表示用户的密钥。
    • test:表示Topic名称。
  • 事务消息,参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。
    package main
    
    import (
    	"context"
    	"fmt"
    	"os"
    	"strconv"
    	"sync"
    	"sync/atomic"
    	"time"
    
    	"github.com/apache/rocketmq-client-go/v2"
    	"github.com/apache/rocketmq-client-go/v2/primitive"
    	"github.com/apache/rocketmq-client-go/v2/producer"
    )
    
    type DemoListener struct {
    	localTrans       *sync.Map
    	transactionIndex int32
    }
    
    func NewDemoListener() *DemoListener {
    	return &DemoListener{
    		localTrans: new(sync.Map),
    	}
    }
    
    func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
    	nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
    	fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
    	status := nextIndex % 3
    	dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))
    
    	fmt.Printf("dl")
    	return primitive.UnknowState
    }
    
    func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
    	fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
    	v, existed := dl.localTrans.Load(msg.TransactionId)
    	if !existed {
    		fmt.Printf("unknow msg: %v, return Commit", msg)
    		return primitive.CommitMessageState
    	}
    	state := v.(primitive.LocalTransactionState)
    	switch state {
    	case 1:
    		fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg)
    		return primitive.CommitMessageState
    	case 2:
    		fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
    		return primitive.RollbackMessageState
    	case 3:
    		fmt.Printf("checkLocalTransaction unknow: %v\n", msg)
    		return primitive.UnknowState
    	default:
    		fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
    		return primitive.CommitMessageState
    	}
    }
    
    func main() {
    	p, _ := rocketmq.NewTransactionProducer(
    		NewDemoListener(),
    		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
    		producer.WithRetry(2),
    		producer.WithCredentials(primitive.Credentials{
    			AccessKey: os.Getenv("ROCKETMQ_AK"), //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
    			SecretKey: os.Getenv("ROCKETMQ_SK"),
    		}),
    	)
    	err := p.Start()
    	if err != nil {
    		fmt.Printf("start producer error: %s\n", err.Error())
    		os.Exit(1)
    	}
    
    	for i := 0; i < 10; i++ {
    		res, err := p.SendMessageInTransaction(context.Background(),
    			primitive.NewMessage("topic1", []byte("Hello RocketMQ again "+strconv.Itoa(i))))
    
    		if err != nil {
    			fmt.Printf("send message error: %s\n", err)
    		} else {
    			fmt.Printf("send message success: result=%s\n", res.String())
    		}
    	}
    	time.Sleep(5 * time.Minute)
    	err = p.Shutdown()
    	if err != nil {
    		fmt.Printf("shutdown producer error: %s", err.Error())
    	}
    }

    示例代码中的参数说明如下,请参考收集连接信息获取参数值。

    • 192.168.0.1:8100:表示实例连接地址和端口。
    • AccessKey:表示用户名。创建用户的步骤,请参见创建用户
    • SecretKey:表示用户的密钥。
    • topic1:表示Topic名称。

消费者增加用户认证信息

无论是普通消息、顺序消息、定时消息,还是事务消息,都参考如下代码。以下加粗内容需要替换为实例自有信息,请根据实际情况替换。

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: os.Getenv("ROCKETMQ_AK"), //用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。
			SecretKey: os.Getenv("ROCKETMQ_SK"),
		}),
	)
	if err != nil {
		fmt.Println("init consumer error: " + err.Error())
		os.Exit(0)
	}

	err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		fmt.Printf("subscribe callback: %v \n", msgs)
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("Shutdown Consumer error: %s", err.Error())
	}
}

示例代码中的参数说明如下,请参考收集连接信息获取参数值。

  • testGroup:表示消费组名称。
  • 192.168.0.1:8100:表示实例连接地址和端口。
  • AccessKey:表示用户名。创建用户的步骤,请参见创建用户
  • SecretKey:表示用户的密钥。
  • test:表示Topic名称。