更新时间:2022-04-20 GMT+08:00
Controller实现
修改controllers/hwfka_controller.go文件为如下内容:
func (r *HwfkaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.Log.WithValues("hwfka", req.NamespacedName) // 获取 Hwfka CR instance := &osctestv1.Hwfka{} if err := r.Get(ctx, req.NamespacedName, instance); err != nil { log.Error(err, "Failed to get Kafka") return ctrl.Result{}, client.IgnoreNotFound(err) } for _, component := range components { if result, err, requeue := r.checkAndCreateResource(instance, component, ResourceService); requeue { return result, err } } // 检查并创建 ZooKeeper StatefulSet 资源 if result, err, requeue := r.checkAndCreateResource(instance, ComponentZookeeper, ResourceStatefulSet); requeue { return result, err } // 检查 ZooKeeper StatefulSet 是否 Ready zkSts := &appsv1.StatefulSet{} if err := r.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%v-%v", instance.Name, ComponentZookeeper), Namespace: instance.Namespace}, zkSts); err != nil { log.Error(err, "Failed to get resource", "component", ComponentZookeeper, "resource", ResourceStatefulSet) return ctrl.Result{}, err } if zkSts.Status.ReadyReplicas < zkSts.Status.Replicas/2+1 { return ctrl.Result{Requeue: true}, nil } // 检查并创建 Kafka StatefulSet 资源 if result, err, requeue := r.checkAndCreateResource(instance, ComponentBroker, ResourceStatefulSet); requeue { return result, err } // 更新实例 Status if len(instance.Status.Phase) == 0 || len(instance.Status.Server) == 0 { instance.Status.Phase = Available instance.Status.Server = fmt.Sprintf("%v-%v.%v.svc.cluster.local:%v", instance.Name, ComponentBroker, instance.Namespace, BootstrapServerPort) err := r.Status().Update(ctx, instance) if err != nil { log.Error(err, "Failed to update instance status") return ctrl.Result{}, err } } return ctrl.Result{}, nil } func (r *HwfkaReconciler) checkAndCreateResource(instance *osctestv1.Hwfka, component string, resource string) (ctrl.Result, error, bool) { ctx := context.Background() log := r.Log.WithValues("namespace", instance.Namespace, "instance name", instance.Name) var rs client.Object switch resource { case ResourceService: rs = &corev1.Service{} case ResourceStatefulSet: rs = &appsv1.StatefulSet{} case ResourceDeployment: rs = &appsv1.Deployment{} } // 检查并创建资源 if err := r.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%v-%v", instance.Name, component), Namespace: instance.Namespace}, rs); err != nil { if errors.IsNotFound(err) { // 创建资源 var obj metav1.Object switch resource { case ResourceService: obj = NewServiceForComponent(instance, component) case ResourceStatefulSet: if component == ComponentZookeeper { obj = NewStatefulSetForZooKeeper(instance) } else { obj = NewStatefulSetForKafka(instance) } case ResourceDeployment: rs = &appsv1.Deployment{} } // 设置其 owner 为实例 CR if err = ctrl.SetControllerReference(instance, obj, r.Scheme); err != nil { log.Error(err, "Failed to set resource owner to Instance", "component", component, "resource", resource) return ctrl.Result{}, err, true } log.Info("Creating a new resource", "component", component, "resource", resource) if err = r.Create(ctx, obj.(client.Object)); err != nil { log.Error(err, "Failed to create new resource", "component", component, "resource", resource) return ctrl.Result{}, err, true } return ctrl.Result{Requeue: true}, nil, true } log.Error(err, "Failed to get resource", "component", component, "resource", resource) return ctrl.Result{}, err, true } return ctrl.Result{}, nil, false }
创建controllers/constants.go文件,用来统一管理常量。
package controllers const ( // 配置 label LabelProvider = "osctest.huawei.com/service_provider" LabelInstance = "osctest.huawei.com/instance" LabelComponent = "osctest.huawei.com/component" ProviderKafka = "hwfka" ComponentZookeeper = "zookeeper" ComponentBroker = "broker" ComponentKafkaManager = "hwfkamanager" ResourceService = "service" ResourceStatefulSet = "statefulset" ResourceDeployment = "deployment" // 配置 service 的端口 ClientPortName = "client" ZkClientPort = 2181 ServerPortName = "server" ZkServerPort = 2888 LeaderElectionPortName = "leader-election" ZkLeaderElectionPort = 3888 BootstrapServerPortName = "bootstrapserver" BootstrapServerPort = 9092 ExporterPortName = "exporter" ExporterPort = 9404 // 环境变量 EnvNamespace = "NAMESPACE" EnvInstance = "INSTANCE" EnvPodUid = "POD_UID" EnvPodName = "POD_NAME" EnvPodIp = "POD_IP" EnvZkReplicas = "ZK_REPLICAS" EnvZkClientPort = "ZK_CLIENT_PORT" EnvZkServerPort = "ZK_SERVER_PORT" EnvZkElectionPort = "ZK_ELECTION_PORT" PodUidRef = "metadata.uid" PodNameRef = "metadata.name" PodIpRef = "status.podIP" // 配置健康检查 InitialDelaySeconds = 10 // 存储 DataVolumeName = "datadir" ZkDataPath = "/var/lib/zookeeper" KafkaDataPath = "/opt/kafka/data" StorageClassEVS = "csi-disk" StorageClassSFS = "csi-nas" EvsAnnotation = "everest.io/disk-volume-type" RegionLabel = "failure-domain.beta.kubernetes.io/region" ZoneLabel = "failure-domain.beta.kubernetes.io/zone" // 实例安装状态 Installing string = "Installing" InstallFailed string = "InstallFailed" Available string = "Available" Abnormal string = "Abnormal" Deleting string = "Deleting" )
在controllers下创建controllers/resources.go文件,用来生成所需要的资源。在创建StatefulSet的时候,使用volumeClaimTemplates自动创建PVC。
package controllers import ( "fmt" "strconv" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" oscv1 "hwfka-operator/api/v1" ) var components = []string{ ComponentZookeeper, ComponentBroker, ComponentKafkaManager, } // 以 CR 的 name-zookeeper/namespace 创建 HeadlessService func NewServiceForComponent(cr *oscv1.Hwfka, component string) metav1.Object { labels := GenerateLabels(cr.Name, component) svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%v-%v", cr.Name, component), Namespace: cr.Namespace, Labels: labels, }, Spec: corev1.ServiceSpec{ ClusterIP: "None", Ports: []corev1.ServicePort{}, // HeadlessService match Pod 的 Label Selector: labels, }, } switch component { case ComponentZookeeper: svc.Spec.Ports = []corev1.ServicePort{{ Port: ZkClientPort, Name: ClientPortName, }, { Port: ZkServerPort, Name: ServerPortName, }, { Port: ZkLeaderElectionPort, Name: LeaderElectionPortName, }} case "linux": fmt.Println("Linux.") default: fmt.Printf("unsupported component: %s\n", component) } return svc } func NewStatefulSetForZooKeeper(cr *oscv1.Hwfka) metav1.Object { labels := GenerateLabels(cr.Name, ComponentZookeeper) sts := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%v-%v", cr.Name, ComponentZookeeper), Namespace: cr.Namespace, Labels: labels, }, Spec: appsv1.StatefulSetSpec{ ServiceName: fmt.Sprintf("%v-%v", cr.Name, ComponentZookeeper), Replicas: &(cr.Spec.Size), PodManagementPolicy: appsv1.ParallelPodManagement, UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ Type: appsv1.RollingUpdateStatefulSetStrategyType, }, Selector: &metav1.LabelSelector{ MatchLabels: labels, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: labels, }, Spec: corev1.PodSpec{ Containers: getZooKeeperContainer(cr), }, }, VolumeClaimTemplates: []corev1.PersistentVolumeClaim{ volumeClaimTemplates(cr), }, }, } return sts } func getZooKeeperContainer(cr *oscv1.Hwfka) []corev1.Container { //probe := []string{"sh", "-c", "netstat -lnt | grep -q 2181"} return []corev1.Container{{ Name: ComponentZookeeper, Image: cr.Spec.Image, //ImagePullPolicy: corev1.PullIfNotPresent, ImagePullPolicy: corev1.PullAlways, Env: getZooKeeperEnv(cr), Command: []string{ "sh", "-c", "bin/zkGenConfig.sh && bin/zookeeper-server-start.sh config/zookeeper.properties", }, Ports: []corev1.ContainerPort{ { ContainerPort: ZkClientPort, Name: ClientPortName, }, { ContainerPort: ZkServerPort, Name: ServerPortName, }, { ContainerPort: ZkLeaderElectionPort, Name: LeaderElectionPortName, }, }, VolumeMounts: []corev1.VolumeMount{ { Name: DataVolumeName, MountPath: ZkDataPath, }, }, /*LivenessProbe: &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{Command: probe}, }, InitialDelaySeconds: InitialDelaySeconds, }, ReadinessProbe: &corev1.Probe{ Handler: corev1.Handler{ Exec: &corev1.ExecAction{Command: probe}, }, InitialDelaySeconds: InitialDelaySeconds, },*/ }} } func getZooKeeperEnv(cr *oscv1.Hwfka) []corev1.EnvVar { envVar := []corev1.EnvVar{ { Name: EnvZkReplicas, Value: strconv.FormatInt(int64(cr.Spec.Size), 10), }, { Name: EnvZkClientPort, Value: strconv.FormatInt(int64(ZkClientPort), 10), }, { Name: EnvZkServerPort, Value: strconv.FormatInt(int64(ZkServerPort), 10), }, { Name: EnvZkElectionPort, Value: strconv.FormatInt(int64(ZkLeaderElectionPort), 10), }, { Name: EnvNamespace, Value: cr.Namespace, }, { Name: EnvInstance, Value: cr.Name, }, { Name: EnvPodUid, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ FieldPath: PodUidRef, }, }, }, { Name: EnvPodName, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ FieldPath: PodNameRef, }, }, }, { Name: EnvPodIp, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ FieldPath: PodIpRef, }, }, }, } return envVar } func volumeClaimTemplates(cr *oscv1.Hwfka) corev1.PersistentVolumeClaim { pvc := corev1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Namespace: cr.Namespace, Name: DataVolumeName, Annotations: map[string]string{}, Labels: map[string]string{}, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(cr, schema.GroupVersionKind{ Group: cr.GroupVersionKind().Group, Version: cr.GroupVersionKind().Version, Kind: cr.Kind, }), }, }, Spec: corev1.PersistentVolumeClaimSpec{ StorageClassName: &cr.Spec.Storage.Class, AccessModes: []corev1.PersistentVolumeAccessMode{ cr.Spec.Storage.AccessModes, }, Resources: corev1.ResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceStorage: cr.Spec.Storage.Size, }, }, }, } if cr.Spec.Storage.Class == StorageClassEVS { pvc.ObjectMeta.Annotations[EvsAnnotation] = cr.Spec.Storage.DiskType if len(cr.Spec.Storage.Region) != 0 && len(cr.Spec.Storage.Zone) != 0 { pvc.ObjectMeta.Labels[RegionLabel] = cr.Spec.Storage.Region pvc.ObjectMeta.Labels[ZoneLabel] = cr.Spec.Storage.Zone } } return pvc } func NewStatefulSetForKafka(cr *oscv1.Hwfka) metav1.Object { labels := GenerateLabels(cr.Name, ComponentBroker) sts := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%v-%v", cr.Name, ComponentBroker), Namespace: cr.Namespace, Labels: labels, }, Spec: appsv1.StatefulSetSpec{ ServiceName: fmt.Sprintf("%v-%v", cr.Name, ComponentBroker), Replicas: &(cr.Spec.Size), PodManagementPolicy: appsv1.ParallelPodManagement, UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ Type: appsv1.RollingUpdateStatefulSetStrategyType, }, // StatefulSet match Pod 的 Label Selector: &metav1.LabelSelector{ MatchLabels: labels, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: labels, }, Spec: corev1.PodSpec{ Containers: getKafkaContainer(cr), }, }, VolumeClaimTemplates: []corev1.PersistentVolumeClaim{ volumeClaimTemplates(cr), }, }, } return sts } func getKafkaContainer(cr *oscv1.Hwfka) []corev1.Container { zookeeperConnect := fmt.Sprintf("%v-%v:%v", cr.Name, ComponentZookeeper, ZkClientPort) logDir := fmt.Sprintf("/opt/kafka/data/kafka/%v/%v/", cr.Namespace, cr.Name) return []corev1.Container{{ Name: ComponentBroker, Image: cr.Spec.Image, ImagePullPolicy: corev1.PullAlways, Ports: []corev1.ContainerPort{ { Name: BootstrapServerPortName, ContainerPort: BootstrapServerPort, }, { Name: ExporterPortName, ContainerPort: ExporterPort, }, }, VolumeMounts: []corev1.VolumeMount{ { Name: DataVolumeName, MountPath: KafkaDataPath, }, }, Command: []string{ "sh", "-c", fmt.Sprintf("bin/kafka-server-start.sh config/server.properties --override broker.id=$(echo $HOSTNAME | awk -F '-' '{print $NF}') --override zookeeper.connect=%v --override log.dir=%v$POD_NAME", zookeeperConnect, logDir), }, Env: getKafkaEnv(cr), }} } func getKafkaEnv(cr *oscv1.Hwfka) []corev1.EnvVar { envVar := []corev1.EnvVar{ { Name: EnvPodUid, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ FieldPath: PodUidRef, }, }, }, { Name: EnvPodName, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ FieldPath: PodNameRef, }, }, }, { Name: EnvPodIp, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ FieldPath: PodIpRef, }, }, }, { Name: EnvNamespace, Value: cr.Namespace, }, { Name: EnvInstance, Value: cr.Name, }, } return envVar } // 以 CR 的 Name, 组件名称 组装 Label func GenerateLabels(name, component string) map[string]string { labels := map[string]string{ LabelProvider: ProviderKafka, LabelInstance: name, } if len(component) != 0 { labels[LabelComponent] = component } return labels }
父主题: Operator代码示例