更新时间:2024-02-06 GMT+08:00
分享

实现Controller

Controller实现背景

在创建Kafka实例时,Kafka-Operator需要创建的Kubernetes资源如下:

  • 1个StatefulSet,包含3个Pod分别启动ZooKeeper;
  • 1个Service,用来暴露ZooKeeper访问地址;
  • 1个StatefulSet,包含3个Pod分别启动Hwfka broker;
  • 1个Service,用来暴露Hwfka访问地址;
  • 1个Deployment,包含1个Pod启动KafkaManager;
  • 1个Service,用来暴露KafkaManager访问地址。

在创建API的时候,SDK已自动创建controllers/hwfka_controller.go,为SetupWithManager添加对Service、StatefulSet、Deployment的监听和管理。

import (
	"context"
	"fmt"

	"github.com/go-logr/logr"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	osctestv1 "hwfka-operator/api/v1"
)

func (r *HwfkaReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&osctestv1.Hwfka{}).
		Owns(&corev1.Service{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&appsv1.Deployment{}).
		Complete(r)
}

调度逻辑

对于Kubernetes集群内任何Hwfka或Service、StatefulSet、Deployment的变化,Controller都会监听到,并生成事件,触发Reconcile()方法。需要在Reconcile()方法中实现协调逻辑,创建Service、StatefulSet等资源,并更新应用实例状态,实现的过程中,可以参考:

  1. 查询资源:控制器使用controller-runtime库中的Client实现对Kubernetes资源的增查改删,示例代码参见:example_test.go
  2. 创建资源:使用Go语言调用Kubernetes API创建资源,可参考Kubernetes API Reference
  3. 设置关联:为创建的Kubernetes资源设置ownerReferences,以便其能在CR删除时被级联删除,可参考如下代码。
    // 为指定命名空间和名称的 secret 设置 owner,返回 secret
    func (k *K8sClient) SetSecretOwner(cr *oscv1.Kafka, secretName string) (*corev1.Secret, error) {
    	ctx := context.Background()
    
    	secret, err := k.GetSecretByNamespaceName(cr.Namespace, secretName)
    	if err != nil {
    		return nil, err
    	}
    	if secret.OwnerReferences == nil {
    		patch := client.MergeFrom(secret.DeepCopy())
    		secret.OwnerReferences = []metav1.OwnerReference{
    			*metav1.NewControllerRef(cr, schema.GroupVersionKind{
    				Group:   cr.GroupVersionKind().Group,
    				Version: cr.GroupVersionKind().Version,
    				Kind:    cr.Kind,
    			}),
    		}
    		if err := k.Patch(ctx, secret, patch); err != nil {
    			return nil, err
    		}
    	}
    	return secret, nil
    }
  4. 如果一个动作的处理时间较长,为了避免Reconcile阻塞,需要使请求返回并重新排队,有四种方法:
    // 请求成功,不再排队
    return ctrl.Result{}, nil
    // 请求失败,重新加入队列
    return ctrl.Result{}, err
    // 请求因某种原因需要重新加入队列
    return ctrl.Result{Requeue: true}, nil
    // 请求因某种原因,需要在指定时间后重新加入队列
    return ctrl.Result{RequeueAfter: time.Second*5}, nil
  5. 当删除CR时,如果需要预先清理Kubernetes之外的资源,此时仅凭ownerReferences无法实现,可以利用finalizers特性。利用finalizers延时删除资源的方法请参见Using Finalizers

    示例代码请参见Controller实现

挂载存储

应用实例可以使用本地磁盘存储(HostPath),也可以使用网络存储或云存储(推荐),在Kubernetes中,通过PersistentVolume(PV)方式挂载存储,具体请参见挂载存储

云存储类型

云存储类型主要指标包括:

  • IO读写时延:连续两次进行读写操作所需要的最小时间间隔。
  • IOPS:每秒进行读写的操作次数。
  • 吞吐量:每秒读取和写入的数据量。

RBAC权限管理

Operator使用基于角色的访问控制机制对Kubernetes中的资源进行访问,因此,要保证Operator正确的运行,需要使用管理员权限的账号对Operator进行授权,即预先创建对应的role和rolebinding。

在Reconcile()上添加markers,增加Operator对于Service、StatefulSet、Deployment、Pod的管理权限,SDK即可自动生成对应的role和rolebinding等资源描述文件。

// +kubebuilder:rbac:groups=osctest.huawei.com,resources=hwfkas,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=osctest.huawei.com,resources=hwfkas/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;update

func (r *HwfkaReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    // ...
}

如果Operator为namespace级别的,则需要还添加namespace,如:

// +kubebuilder:rbac:groups=osctest.huawei.com,namespace=example,resources=hwfkas,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=osctest.huawei.com,namespace=example,resources=hwfkas/status,verbs=get;update;patch

相关文档