实现Controller
Controller实现背景
在创建Kafka实例时,Kafka-Operator需要创建的Kubernetes资源如下:
- 1个StatefulSet,包含3个Pod分别启动ZooKeeper;
- 1个Service,用来暴露ZooKeeper访问地址;
- 1个StatefulSet,包含3个Pod分别启动Hwfka broker;
- 1个Service,用来暴露Hwfka访问地址;
- 1个Deployment,包含1个Pod启动KafkaManager;
- 1个Service,用来暴露KafkaManager访问地址。
在创建API的时候,SDK已自动创建controllers/hwfka_controller.go,为SetupWithManager添加对Service、StatefulSet、Deployment的监听和管理。
import ( "context" "fmt" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" osctestv1 "hwfka-operator/api/v1" ) func (r *HwfkaReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&osctestv1.Hwfka{}). Owns(&corev1.Service{}). Owns(&appsv1.StatefulSet{}). Owns(&appsv1.Deployment{}). Complete(r) }
调度逻辑
对于Kubernetes集群内任何Hwfka或Service、StatefulSet、Deployment的变化,Controller都会监听到,并生成事件,触发Reconcile()方法。需要在Reconcile()方法中实现协调逻辑,创建Service、StatefulSet等资源,并更新应用实例状态,实现的过程中,可以参考:
- 查询资源:控制器使用controller-runtime库中的Client实现对Kubernetes资源的增查改删,示例代码参见:example_test.go。
- 创建资源:使用Go语言调用Kubernetes API创建资源,可参考Kubernetes API Reference。
- 设置关联:为创建的Kubernetes资源设置ownerReferences,以便其能在CR删除时被级联删除,可参考如下代码。
// 为指定命名空间和名称的 secret 设置 owner,返回 secret func (k *K8sClient) SetSecretOwner(cr *oscv1.Kafka, secretName string) (*corev1.Secret, error) { ctx := context.Background() secret, err := k.GetSecretByNamespaceName(cr.Namespace, secretName) if err != nil { return nil, err } if secret.OwnerReferences == nil { patch := client.MergeFrom(secret.DeepCopy()) secret.OwnerReferences = []metav1.OwnerReference{ *metav1.NewControllerRef(cr, schema.GroupVersionKind{ Group: cr.GroupVersionKind().Group, Version: cr.GroupVersionKind().Version, Kind: cr.Kind, }), } if err := k.Patch(ctx, secret, patch); err != nil { return nil, err } } return secret, nil }
- 如果一个动作的处理时间较长,为了避免Reconcile阻塞,需要使请求返回并重新排队,有四种方法:
// 请求成功,不再排队 return ctrl.Result{}, nil // 请求失败,重新加入队列 return ctrl.Result{}, err // 请求因某种原因需要重新加入队列 return ctrl.Result{Requeue: true}, nil // 请求因某种原因,需要在指定时间后重新加入队列 return ctrl.Result{RequeueAfter: time.Second*5}, nil
- 当删除CR时,如果需要预先清理Kubernetes之外的资源,此时仅凭ownerReferences无法实现,可以利用finalizers特性。利用finalizers延时删除资源的方法请参见Using Finalizers。
示例代码请参见Controller实现。
挂载存储
应用实例可以使用本地磁盘存储(HostPath),也可以使用网络存储或云存储(推荐),在Kubernetes中,通过PersistentVolume(PV)方式挂载存储,具体请参见挂载存储 。
云存储类型
云存储类型主要指标包括:
- IO读写时延:连续两次进行读写操作所需要的最小时间间隔。
- IOPS:每秒进行读写的操作次数。
- 吞吐量:每秒读取和写入的数据量。
- 更多信息请参见云硬盘EVS:磁盘类型及性能介绍,云磁盘计费方式请参见云硬盘计费说明。
- 通过在集群中创建PVC的方式创建存储,创建方式请参见创建存储。
RBAC权限管理
Operator使用基于角色的访问控制机制对Kubernetes中的资源进行访问,因此,要保证Operator正确的运行,需要使用管理员权限的账号对Operator进行授权,即预先创建对应的role和rolebinding。
在Reconcile()上添加markers,增加Operator对于Service、StatefulSet、Deployment、Pod的管理权限,SDK即可自动生成对应的role和rolebinding等资源描述文件。
// +kubebuilder:rbac:groups=osctest.huawei.com,resources=hwfkas,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=osctest.huawei.com,resources=hwfkas/status,verbs=get;update;patch // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch // +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;update func (r *HwfkaReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { // ... }
如果Operator为namespace级别的,则需要还添加namespace,如:
// +kubebuilder:rbac:groups=osctest.huawei.com,namespace=example,resources=hwfkas,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=osctest.huawei.com,namespace=example,resources=hwfkas/status,verbs=get;update;patch