更新时间:2022-04-20 GMT+08:00
分享

Controller实现

修改controllers/hwfka_controller.go文件为如下内容:
// Reconcile brings the cluster in line with a single Hwfka custom resource.
//
// It proceeds in order and returns early whenever a step asks to stop:
//  1. fetch the Hwfka CR named by req (a not-found error is ignored),
//  2. ensure a Service exists for each entry of components,
//  3. ensure the ZooKeeper StatefulSet exists and wait until more than half
//     of its replicas are ready,
//  4. ensure the Kafka broker StatefulSet exists,
//  5. fill in Status.Phase and Status.Server if either is still empty.
func (r *HwfkaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := r.Log.WithValues("hwfka", req.NamespacedName)

	// Load the Hwfka CR that this request refers to.
	kafka := &osctestv1.Hwfka{}
	if err := r.Get(ctx, req.NamespacedName, kafka); err != nil {
		logger.Error(err, "Failed to get Kafka")
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Ensure the Service of every component.
	for i := range components {
		res, err, stop := r.checkAndCreateResource(kafka, components[i], ResourceService)
		if stop {
			return res, err
		}
	}

	// Ensure the ZooKeeper StatefulSet.
	{
		res, err, stop := r.checkAndCreateResource(kafka, ComponentZookeeper, ResourceStatefulSet)
		if stop {
			return res, err
		}
	}

	// Read the ZooKeeper StatefulSet back to inspect its readiness.
	zkKey := types.NamespacedName{
		Name:      fmt.Sprintf("%v-%v", kafka.Name, ComponentZookeeper),
		Namespace: kafka.Namespace,
	}
	zk := &appsv1.StatefulSet{}
	if err := r.Get(ctx, zkKey, zk); err != nil {
		logger.Error(err, "Failed to get resource", "component", ComponentZookeeper, "resource", ResourceStatefulSet)
		return ctrl.Result{}, err
	}
	// Brokers are only created once more than half of the ZooKeeper replicas are ready.
	needed := zk.Status.Replicas/2 + 1
	if zk.Status.ReadyReplicas < needed {
		return ctrl.Result{Requeue: true}, nil
	}

	// Ensure the Kafka broker StatefulSet.
	{
		res, err, stop := r.checkAndCreateResource(kafka, ComponentBroker, ResourceStatefulSet)
		if stop {
			return res, err
		}
	}

	// Nothing left to do when the status has already been published.
	if len(kafka.Status.Phase) != 0 && len(kafka.Status.Server) != 0 {
		return ctrl.Result{}, nil
	}

	// Publish the phase and the bootstrap server address.
	kafka.Status.Phase = Available
	kafka.Status.Server = fmt.Sprintf("%v-%v.%v.svc.cluster.local:%v", kafka.Name, ComponentBroker, kafka.Namespace, BootstrapServerPort)
	if err := r.Status().Update(ctx, kafka); err != nil {
		logger.Error(err, "Failed to update instance status")
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}

// checkAndCreateResource makes sure the resource of kind `resource` named
// "<instance.Name>-<component>" exists in the instance's namespace. When the
// lookup reports not-found, the resource is built, given the instance as its
// controller owner, and created.
//
// The boolean result tells the caller whether to stop the current reconcile
// and return the accompanying (ctrl.Result, error):
//   - false: the resource already exists; continue reconciling.
//   - true with Requeue set and a nil error: the resource was just created.
//   - true with a non-nil error: a lookup, build, owner-reference or create
//     step failed.
//
// Resource kinds without a builder in this function (ResourceDeployment and
// any unknown value) yield an error rather than a nil-object panic.
func (r *HwfkaReconciler) checkAndCreateResource(instance *osctestv1.Hwfka, component string, resource string) (ctrl.Result, error, bool) {
	// NOTE(review): the signature carries no context, so the API calls below
	// cannot observe cancellation of the caller's reconcile context.
	ctx := context.Background()
	log := r.Log.WithValues("namespace", instance.Namespace, "instance name", instance.Name)

	// Pick an empty object of the requested kind to receive the lookup result.
	var rs client.Object
	switch resource {
	case ResourceService:
		rs = &corev1.Service{}
	case ResourceStatefulSet:
		rs = &appsv1.StatefulSet{}
	case ResourceDeployment:
		rs = &appsv1.Deployment{}
	default:
		err := fmt.Errorf("unsupported resource kind %q", resource)
		log.Error(err, "Failed to get resource", "component", component, "resource", resource)
		return ctrl.Result{}, err, true
	}

	// Look the resource up; an existing one needs no further work.
	key := types.NamespacedName{Name: fmt.Sprintf("%v-%v", instance.Name, component), Namespace: instance.Namespace}
	err := r.Get(ctx, key, rs)
	if err == nil {
		return ctrl.Result{}, nil, false
	}
	if !errors.IsNotFound(err) {
		log.Error(err, "Failed to get resource", "component", component, "resource", resource)
		return ctrl.Result{}, err, true
	}

	// The resource is missing: build the desired object.
	var obj metav1.Object
	switch resource {
	case ResourceService:
		obj = NewServiceForComponent(instance, component)
	case ResourceStatefulSet:
		if component == ComponentZookeeper {
			obj = NewStatefulSetForZooKeeper(instance)
		} else {
			obj = NewStatefulSetForKafka(instance)
		}
	default:
		// No builder exists here for this kind (e.g. Deployment); fail
		// explicitly instead of passing a nil object on.
		err = fmt.Errorf("no builder for resource kind %q of component %q", resource, component)
		log.Error(err, "Failed to create new resource", "component", component, "resource", resource)
		return ctrl.Result{}, err, true
	}

	// Make the instance CR the controller owner of the new resource.
	if err = ctrl.SetControllerReference(instance, obj, r.Scheme); err != nil {
		log.Error(err, "Failed to set resource owner to Instance", "component", component, "resource", resource)
		return ctrl.Result{}, err, true
	}

	desired, ok := obj.(client.Object)
	if !ok {
		err = fmt.Errorf("built object of type %T is not a client.Object", obj)
		log.Error(err, "Failed to create new resource", "component", component, "resource", resource)
		return ctrl.Result{}, err, true
	}

	log.Info("Creating a new resource", "component", component, "resource", resource)
	if err = r.Create(ctx, desired); err != nil {
		log.Error(err, "Failed to create new resource", "component", component, "resource", resource)
		return ctrl.Result{}, err, true
	}
	// Requeue so the next pass observes the freshly created resource.
	return ctrl.Result{Requeue: true}, nil, true
}

创建controllers/constants.go文件,用来统一管理常量。

package controllers

// Constants shared by the controller and the resource builders.
const (
	// Label keys applied to generated resources.
	LabelProvider  = "osctest.huawei.com/service_provider"
	LabelInstance  = "osctest.huawei.com/instance"
	LabelComponent = "osctest.huawei.com/component"

	// Provider and component names.
	ProviderKafka         = "hwfka"
	ComponentZookeeper    = "zookeeper"
	ComponentBroker       = "broker"
	ComponentKafkaManager = "hwfkamanager"

	// Resource kind identifiers.
	ResourceService     = "service"
	ResourceStatefulSet = "statefulset"
	ResourceDeployment  = "deployment"

	// Service port names and numbers.
	ClientPortName          = "client"
	ZkClientPort            = 2181
	ServerPortName          = "server"
	ZkServerPort            = 2888
	LeaderElectionPortName  = "leader-election"
	ZkLeaderElectionPort    = 3888
	BootstrapServerPortName = "bootstrapserver"
	BootstrapServerPort     = 9092
	ExporterPortName        = "exporter"
	ExporterPort            = 9404

	// Environment variable names.
	EnvNamespace      = "NAMESPACE"
	EnvInstance       = "INSTANCE"
	EnvPodUid         = "POD_UID"
	EnvPodName        = "POD_NAME"
	EnvPodIp          = "POD_IP"
	EnvZkReplicas     = "ZK_REPLICAS"
	EnvZkClientPort   = "ZK_CLIENT_PORT"
	EnvZkServerPort   = "ZK_SERVER_PORT"
	EnvZkElectionPort = "ZK_ELECTION_PORT"

	// Pod field paths used as FieldRef sources for the POD_* variables.
	PodUidRef  = "metadata.uid"
	PodNameRef = "metadata.name"
	PodIpRef   = "status.podIP"

	// Health-check settings.
	InitialDelaySeconds = 10

	// Storage.
	DataVolumeName  = "datadir"
	ZkDataPath      = "/var/lib/zookeeper"
	KafkaDataPath   = "/opt/kafka/data"
	StorageClassEVS = "csi-disk"
	StorageClassSFS = "csi-nas"
	EvsAnnotation   = "everest.io/disk-volume-type"
	RegionLabel     = "failure-domain.beta.kubernetes.io/region"
	ZoneLabel       = "failure-domain.beta.kubernetes.io/zone"

	// Instance installation phases.
	Installing    string = "Installing"
	InstallFailed string = "InstallFailed"
	Available     string = "Available"
	Abnormal      string = "Abnormal"
	Deleting      string = "Deleting"
)

在controllers目录下创建resources.go文件,用来生成所需要的资源。在创建StatefulSet的时候,使用volumeClaimTemplates自动创建PVC。

package controllers

import (
	"fmt"
	"strconv"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"

	oscv1 "hwfka-operator/api/v1"
)

// components lists the component names that make up an instance:
// ZooKeeper, the Kafka broker, and the Kafka manager.
var components = []string{
	ComponentZookeeper,
	ComponentBroker,
	ComponentKafkaManager,
}

// NewServiceForComponent builds the headless Service (ClusterIP "None") for
// one component of the given CR. The Service is named "<cr.Name>-<component>",
// lives in the CR's namespace, and both carries and selects on the labels
// produced by GenerateLabels(cr.Name, component).
//
// Ports are filled in per component:
//   - ComponentZookeeper: client, server and leader-election ports.
//   - ComponentBroker: bootstrap-server and exporter ports, matching the
//     container ports declared for the broker.
//
// Any other component is returned with an empty port list and a notice is
// printed to standard output.
func NewServiceForComponent(cr *oscv1.Hwfka, component string) metav1.Object {
	labels := GenerateLabels(cr.Name, component)
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v-%v", cr.Name, component),
			Namespace: cr.Namespace,
			Labels:    labels,
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: "None",
			Ports:     []corev1.ServicePort{},
			// The headless Service selects Pods by the same labels.
			Selector: labels,
		},
	}

	switch component {
	case ComponentZookeeper:
		svc.Spec.Ports = []corev1.ServicePort{{
			Port: ZkClientPort,
			Name: ClientPortName,
		}, {
			Port: ZkServerPort,
			Name: ServerPortName,
		}, {
			Port: ZkLeaderElectionPort,
			Name: LeaderElectionPortName,
		}}
	case ComponentBroker:
		svc.Spec.Ports = []corev1.ServicePort{{
			Port: BootstrapServerPort,
			Name: BootstrapServerPortName,
		}, {
			Port: ExporterPort,
			Name: ExporterPortName,
		}}
	default:
		fmt.Printf("unsupported component: %s\n", component)
	}
	return svc
}

// NewStatefulSetForZooKeeper builds the ZooKeeper StatefulSet for the given CR.
// The StatefulSet and its governing Service name are both
// "<cr.Name>-zookeeper"; the replica count points at cr.Spec.Size, pods are
// managed in parallel with a rolling update strategy, and one volume claim
// template from volumeClaimTemplates is attached.
func NewStatefulSetForZooKeeper(cr *oscv1.Hwfka) metav1.Object {
	zkLabels := GenerateLabels(cr.Name, ComponentZookeeper)

	// Pod template: labelled so that the selector below matches it.
	podTemplate := corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{Labels: zkLabels},
		Spec:       corev1.PodSpec{Containers: getZooKeeperContainer(cr)},
	}

	spec := appsv1.StatefulSetSpec{
		ServiceName:         fmt.Sprintf("%v-%v", cr.Name, ComponentZookeeper),
		Replicas:            &(cr.Spec.Size),
		PodManagementPolicy: appsv1.ParallelPodManagement,
		UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
			Type: appsv1.RollingUpdateStatefulSetStrategyType,
		},
		Selector:             &metav1.LabelSelector{MatchLabels: zkLabels},
		Template:             podTemplate,
		VolumeClaimTemplates: []corev1.PersistentVolumeClaim{volumeClaimTemplates(cr)},
	}

	return &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v-%v", cr.Name, ComponentZookeeper),
			Namespace: cr.Namespace,
			Labels:    zkLabels,
		},
		Spec: spec,
	}
}

// getZooKeeperContainer returns the single-container list for a ZooKeeper pod.
// The container uses the CR's image (always pulled), generates its config with
// bin/zkGenConfig.sh before starting the server, exposes the client, server
// and leader-election ports, and mounts the data volume at ZkDataPath.
//
// Liveness/readiness probes are currently disabled; the previously drafted
// probe was an exec of `sh -c "netstat -lnt | grep -q 2181"` with
// InitialDelaySeconds.
func getZooKeeperContainer(cr *oscv1.Hwfka) []corev1.Container {
	startCmd := []string{
		"sh", "-c", "bin/zkGenConfig.sh && bin/zookeeper-server-start.sh config/zookeeper.properties",
	}

	ports := []corev1.ContainerPort{
		{ContainerPort: ZkClientPort, Name: ClientPortName},
		{ContainerPort: ZkServerPort, Name: ServerPortName},
		{ContainerPort: ZkLeaderElectionPort, Name: LeaderElectionPortName},
	}

	mounts := []corev1.VolumeMount{
		{Name: DataVolumeName, MountPath: ZkDataPath},
	}

	zk := corev1.Container{
		Name:            ComponentZookeeper,
		Image:           cr.Spec.Image,
		ImagePullPolicy: corev1.PullAlways,
		Env:             getZooKeeperEnv(cr),
		Command:         startCmd,
		Ports:           ports,
		VolumeMounts:    mounts,
	}
	return []corev1.Container{zk}
}

// getZooKeeperEnv returns the environment for the ZooKeeper container: the
// replica count and the three ZooKeeper ports as decimal strings, the CR's
// namespace and name, and the pod's UID, name and IP taken from pod fields.
func getZooKeeperEnv(cr *oscv1.Hwfka) []corev1.EnvVar {
	// plain builds a variable with a literal value.
	plain := func(name, value string) corev1.EnvVar {
		return corev1.EnvVar{Name: name, Value: value}
	}
	// fromField builds a variable whose value is read from a pod field.
	fromField := func(name, path string) corev1.EnvVar {
		return corev1.EnvVar{
			Name: name,
			ValueFrom: &corev1.EnvVarSource{
				FieldRef: &corev1.ObjectFieldSelector{FieldPath: path},
			},
		}
	}

	return []corev1.EnvVar{
		plain(EnvZkReplicas, strconv.Itoa(int(cr.Spec.Size))),
		plain(EnvZkClientPort, strconv.Itoa(ZkClientPort)),
		plain(EnvZkServerPort, strconv.Itoa(ZkServerPort)),
		plain(EnvZkElectionPort, strconv.Itoa(ZkLeaderElectionPort)),
		plain(EnvNamespace, cr.Namespace),
		plain(EnvInstance, cr.Name),
		fromField(EnvPodUid, PodUidRef),
		fromField(EnvPodName, PodNameRef),
		fromField(EnvPodIp, PodIpRef),
	}
}

// volumeClaimTemplates builds the PersistentVolumeClaim template named
// DataVolumeName from cr.Spec.Storage (class, access mode and requested size).
// The claim lists the CR as its controller owner.
//
// When the storage class is StorageClassEVS, the disk type is recorded under
// EvsAnnotation, and the region and zone labels are added if both are set.
//
// NOTE(review): the owner reference takes its group/version/kind from the CR's
// own type metadata; confirm that is populated on the object passed in.
func volumeClaimTemplates(cr *oscv1.Hwfka) corev1.PersistentVolumeClaim {
	annotations := map[string]string{}
	labels := map[string]string{}

	// EVS-backed claims carry the disk type and, optionally, placement labels.
	if cr.Spec.Storage.Class == StorageClassEVS {
		annotations[EvsAnnotation] = cr.Spec.Storage.DiskType
		region, zone := cr.Spec.Storage.Region, cr.Spec.Storage.Zone
		if region != "" && zone != "" {
			labels[RegionLabel] = region
			labels[ZoneLabel] = zone
		}
	}

	gvk := cr.GroupVersionKind()
	owner := metav1.NewControllerRef(cr, schema.GroupVersionKind{
		Group:   gvk.Group,
		Version: gvk.Version,
		Kind:    cr.Kind,
	})

	meta := metav1.ObjectMeta{
		Namespace:       cr.Namespace,
		Name:            DataVolumeName,
		Annotations:     annotations,
		Labels:          labels,
		OwnerReferences: []metav1.OwnerReference{*owner},
	}

	spec := corev1.PersistentVolumeClaimSpec{
		StorageClassName: &cr.Spec.Storage.Class,
		AccessModes: []corev1.PersistentVolumeAccessMode{
			cr.Spec.Storage.AccessModes,
		},
		Resources: corev1.ResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceStorage: cr.Spec.Storage.Size,
			},
		},
	}

	return corev1.PersistentVolumeClaim{ObjectMeta: meta, Spec: spec}
}

// NewStatefulSetForKafka builds the Kafka broker StatefulSet for the given CR.
// The StatefulSet and its governing Service name are both "<cr.Name>-broker";
// the replica count points at cr.Spec.Size, pods are managed in parallel with
// a rolling update strategy, and one volume claim template from
// volumeClaimTemplates is attached.
func NewStatefulSetForKafka(cr *oscv1.Hwfka) metav1.Object {
	brokerLabels := GenerateLabels(cr.Name, ComponentBroker)

	// Pod template: labelled so that the StatefulSet selector matches it.
	podTemplate := corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{Labels: brokerLabels},
		Spec:       corev1.PodSpec{Containers: getKafkaContainer(cr)},
	}

	spec := appsv1.StatefulSetSpec{
		ServiceName:         fmt.Sprintf("%v-%v", cr.Name, ComponentBroker),
		Replicas:            &(cr.Spec.Size),
		PodManagementPolicy: appsv1.ParallelPodManagement,
		UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
			Type: appsv1.RollingUpdateStatefulSetStrategyType,
		},
		Selector:             &metav1.LabelSelector{MatchLabels: brokerLabels},
		Template:             podTemplate,
		VolumeClaimTemplates: []corev1.PersistentVolumeClaim{volumeClaimTemplates(cr)},
	}

	return &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v-%v", cr.Name, ComponentBroker),
			Namespace: cr.Namespace,
			Labels:    brokerLabels,
		},
		Spec: spec,
	}
}

// getKafkaContainer returns the single-container list for a Kafka broker pod.
//
// The container uses the CR's image (always pulled), exposes the bootstrap
// server and exporter ports, and mounts the data volume at KafkaDataPath. Its
// start command overrides three broker settings:
//   - broker.id: the last "-"-separated field of $HOSTNAME,
//   - zookeeper.connect: "<cr.Name>-zookeeper:<ZkClientPort>",
//   - log.dir: "<KafkaDataPath>/kafka/<namespace>/<name>/$POD_NAME".
//
// The log directory is derived from KafkaDataPath so that it always stays
// under the mounted data volume.
func getKafkaContainer(cr *oscv1.Hwfka) []corev1.Container {
	zookeeperConnect := fmt.Sprintf("%v-%v:%v", cr.Name, ComponentZookeeper, ZkClientPort)
	// Keep the log directory under the volume mount path instead of repeating
	// the path literal, so the two cannot drift apart.
	logDir := fmt.Sprintf("%v/kafka/%v/%v/", KafkaDataPath, cr.Namespace, cr.Name)
	return []corev1.Container{{
		Name:            ComponentBroker,
		Image:           cr.Spec.Image,
		ImagePullPolicy: corev1.PullAlways,
		Ports: []corev1.ContainerPort{
			{
				Name:          BootstrapServerPortName,
				ContainerPort: BootstrapServerPort,
			},
			{
				Name:          ExporterPortName,
				ContainerPort: ExporterPort,
			},
		},
		VolumeMounts: []corev1.VolumeMount{
			{
				Name:      DataVolumeName,
				MountPath: KafkaDataPath,
			},
		},
		Command: []string{
			"sh", "-c",
			// $HOSTNAME and $POD_NAME are expanded by the shell inside the container.
			fmt.Sprintf("bin/kafka-server-start.sh config/server.properties --override broker.id=$(echo $HOSTNAME | awk -F '-' '{print $NF}') --override zookeeper.connect=%v --override log.dir=%v$POD_NAME", zookeeperConnect, logDir),
		},
		Env: getKafkaEnv(cr),
	}}
}

// getKafkaEnv returns the environment for the Kafka broker container: the
// pod's UID, name and IP taken from pod fields, followed by the CR's namespace
// and name as literal values.
func getKafkaEnv(cr *oscv1.Hwfka) []corev1.EnvVar {
	// fromField builds a variable whose value is read from a pod field.
	fromField := func(name, path string) corev1.EnvVar {
		return corev1.EnvVar{
			Name: name,
			ValueFrom: &corev1.EnvVarSource{
				FieldRef: &corev1.ObjectFieldSelector{FieldPath: path},
			},
		}
	}

	return []corev1.EnvVar{
		fromField(EnvPodUid, PodUidRef),
		fromField(EnvPodName, PodNameRef),
		fromField(EnvPodIp, PodIpRef),
		{Name: EnvNamespace, Value: cr.Namespace},
		{Name: EnvInstance, Value: cr.Name},
	}
}

// GenerateLabels assembles the label set for a resource from the CR name and
// the component name. The provider and instance labels are always present;
// the component label is added only when component is non-empty.
func GenerateLabels(name, component string) map[string]string {
	result := make(map[string]string)
	result[LabelProvider] = ProviderKafka
	result[LabelInstance] = name

	if component == "" {
		return result
	}
	result[LabelComponent] = component
	return result
}

相关文档