Controller实现
func (r *HwfkaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithValues("hwfka", req.NamespacedName)
// 获取 Hwfka CR
instance := &osctestv1.Hwfka{}
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
log.Error(err, "Failed to get Kafka")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
for _, component := range components {
if result, err, requeue := r.checkAndCreateResource(instance, component, ResourceService); requeue {
return result, err
}
}
// 检查并创建 ZooKeeper StatefulSet 资源
if result, err, requeue := r.checkAndCreateResource(instance, ComponentZookeeper, ResourceStatefulSet); requeue {
return result, err
}
// 检查 ZooKeeper StatefulSet 是否 Ready
zkSts := &appsv1.StatefulSet{}
if err := r.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%v-%v", instance.Name, ComponentZookeeper), Namespace: instance.Namespace}, zkSts); err != nil {
log.Error(err, "Failed to get resource", "component", ComponentZookeeper, "resource", ResourceStatefulSet)
return ctrl.Result{}, err
}
if zkSts.Status.ReadyReplicas < zkSts.Status.Replicas/2+1 {
return ctrl.Result{Requeue: true}, nil
}
// 检查并创建 Kafka StatefulSet 资源
if result, err, requeue := r.checkAndCreateResource(instance, ComponentBroker, ResourceStatefulSet); requeue {
return result, err
}
// 更新实例 Status
if len(instance.Status.Phase) == 0 || len(instance.Status.Server) == 0 {
instance.Status.Phase = Available
instance.Status.Server = fmt.Sprintf("%v-%v.%v.svc.cluster.local:%v", instance.Name, ComponentBroker, instance.Namespace, BootstrapServerPort)
err := r.Status().Update(ctx, instance)
if err != nil {
log.Error(err, "Failed to update instance status")
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}
func (r *HwfkaReconciler) checkAndCreateResource(instance *osctestv1.Hwfka, component string, resource string) (ctrl.Result, error, bool) {
ctx := context.Background()
log := r.Log.WithValues("namespace", instance.Namespace, "instance name", instance.Name)
var rs client.Object
switch resource {
case ResourceService:
rs = &corev1.Service{}
case ResourceStatefulSet:
rs = &appsv1.StatefulSet{}
case ResourceDeployment:
rs = &appsv1.Deployment{}
}
// 检查并创建资源
if err := r.Get(ctx, types.NamespacedName{Name: fmt.Sprintf("%v-%v", instance.Name, component), Namespace: instance.Namespace}, rs); err != nil {
if errors.IsNotFound(err) {
// 创建资源
var obj metav1.Object
switch resource {
case ResourceService:
obj = NewServiceForComponent(instance, component)
case ResourceStatefulSet:
if component == ComponentZookeeper {
obj = NewStatefulSetForZooKeeper(instance)
} else {
obj = NewStatefulSetForKafka(instance)
}
case ResourceDeployment:
rs = &appsv1.Deployment{}
}
// 设置其 owner 为实例 CR
if err = ctrl.SetControllerReference(instance, obj, r.Scheme); err != nil {
log.Error(err, "Failed to set resource owner to Instance", "component", component, "resource", resource)
return ctrl.Result{}, err, true
}
log.Info("Creating a new resource", "component", component, "resource", resource)
if err = r.Create(ctx, obj.(client.Object)); err != nil {
log.Error(err, "Failed to create new resource", "component", component, "resource", resource)
return ctrl.Result{}, err, true
}
return ctrl.Result{Requeue: true}, nil, true
}
log.Error(err, "Failed to get resource", "component", component, "resource", resource)
return ctrl.Result{}, err, true
}
return ctrl.Result{}, nil, false
}
创建controllers/constants.go文件,用来统一管理常量。
package controllers const ( // 配置 label LabelProvider = "osctest.huawei.com/service_provider" LabelInstance = "osctest.huawei.com/instance" LabelComponent = "osctest.huawei.com/component" ProviderKafka = "hwfka" ComponentZookeeper = "zookeeper" ComponentBroker = "broker" ComponentKafkaManager = "hwfkamanager" ResourceService = "service" ResourceStatefulSet = "statefulset" ResourceDeployment = "deployment" // 配置 service 的端口 ClientPortName = "client" ZkClientPort = 2181 ServerPortName = "server" ZkServerPort = 2888 LeaderElectionPortName = "leader-election" ZkLeaderElectionPort = 3888 BootstrapServerPortName = "bootstrapserver" BootstrapServerPort = 9092 ExporterPortName = "exporter" ExporterPort = 9404 // 环境变量 EnvNamespace = "NAMESPACE" EnvInstance = "INSTANCE" EnvPodUid = "POD_UID" EnvPodName = "POD_NAME" EnvPodIp = "POD_IP" EnvZkReplicas = "ZK_REPLICAS" EnvZkClientPort = "ZK_CLIENT_PORT" EnvZkServerPort = "ZK_SERVER_PORT" EnvZkElectionPort = "ZK_ELECTION_PORT" PodUidRef = "metadata.uid" PodNameRef = "metadata.name" PodIpRef = "status.podIP" // 配置健康检查 InitialDelaySeconds = 10 // 存储 DataVolumeName = "datadir" ZkDataPath = "/var/lib/zookeeper" KafkaDataPath = "/opt/kafka/data" StorageClassEVS = "csi-disk" StorageClassSFS = "csi-nas" EvsAnnotation = "everest.io/disk-volume-type" RegionLabel = "failure-domain.beta.kubernetes.io/region" ZoneLabel = "failure-domain.beta.kubernetes.io/zone" // 实例安装状态 Installing string = "Installing" InstallFailed string = "InstallFailed" Available string = "Available" Abnormal string = "Abnormal" Deleting string = "Deleting" )
在controllers下创建controllers/resources.go文件,用来生成所需要的资源。在创建StatefulSet的时候,使用volumeClaimTemplates自动创建PVC。
package controllers
import (
"fmt"
"strconv"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
oscv1 "hwfka-operator/api/v1"
)
var components = []string{
ComponentZookeeper,
ComponentBroker,
ComponentKafkaManager,
}
// 以 CR 的 name-zookeeper/namespace 创建 HeadlessService
func NewServiceForComponent(cr *oscv1.Hwfka, component string) metav1.Object {
labels := GenerateLabels(cr.Name, component)
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v-%v", cr.Name, component),
Namespace: cr.Namespace,
Labels: labels,
},
Spec: corev1.ServiceSpec{
ClusterIP: "None",
Ports: []corev1.ServicePort{},
// HeadlessService match Pod 的 Label
Selector: labels,
},
}
switch component {
case ComponentZookeeper:
svc.Spec.Ports = []corev1.ServicePort{{
Port: ZkClientPort,
Name: ClientPortName,
}, {
Port: ZkServerPort,
Name: ServerPortName,
}, {
Port: ZkLeaderElectionPort,
Name: LeaderElectionPortName,
}}
case "linux":
fmt.Println("Linux.")
default:
fmt.Printf("unsupported component: %s\n", component)
}
return svc
}
func NewStatefulSetForZooKeeper(cr *oscv1.Hwfka) metav1.Object {
labels := GenerateLabels(cr.Name, ComponentZookeeper)
sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v-%v", cr.Name, ComponentZookeeper),
Namespace: cr.Namespace,
Labels: labels,
},
Spec: appsv1.StatefulSetSpec{
ServiceName: fmt.Sprintf("%v-%v", cr.Name, ComponentZookeeper),
Replicas: &(cr.Spec.Size),
PodManagementPolicy: appsv1.ParallelPodManagement,
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
},
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: getZooKeeperContainer(cr),
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
volumeClaimTemplates(cr),
},
},
}
return sts
}
func getZooKeeperContainer(cr *oscv1.Hwfka) []corev1.Container {
//probe := []string{"sh", "-c", "netstat -lnt | grep -q 2181"}
return []corev1.Container{{
Name: ComponentZookeeper,
Image: cr.Spec.Image,
//ImagePullPolicy: corev1.PullIfNotPresent,
ImagePullPolicy: corev1.PullAlways,
Env: getZooKeeperEnv(cr),
Command: []string{
"sh", "-c", "bin/zkGenConfig.sh && bin/zookeeper-server-start.sh config/zookeeper.properties",
},
Ports: []corev1.ContainerPort{
{
ContainerPort: ZkClientPort,
Name: ClientPortName,
},
{
ContainerPort: ZkServerPort,
Name: ServerPortName,
},
{
ContainerPort: ZkLeaderElectionPort,
Name: LeaderElectionPortName,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: DataVolumeName,
MountPath: ZkDataPath,
},
},
/*LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
Exec: &corev1.ExecAction{Command: probe},
},
InitialDelaySeconds: InitialDelaySeconds,
},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
Exec: &corev1.ExecAction{Command: probe},
},
InitialDelaySeconds: InitialDelaySeconds,
},*/
}}
}
func getZooKeeperEnv(cr *oscv1.Hwfka) []corev1.EnvVar {
envVar := []corev1.EnvVar{
{
Name: EnvZkReplicas,
Value: strconv.FormatInt(int64(cr.Spec.Size), 10),
},
{
Name: EnvZkClientPort,
Value: strconv.FormatInt(int64(ZkClientPort), 10),
},
{
Name: EnvZkServerPort,
Value: strconv.FormatInt(int64(ZkServerPort), 10),
},
{
Name: EnvZkElectionPort,
Value: strconv.FormatInt(int64(ZkLeaderElectionPort), 10),
},
{
Name: EnvNamespace,
Value: cr.Namespace,
},
{
Name: EnvInstance,
Value: cr.Name,
},
{
Name: EnvPodUid,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: PodUidRef,
},
},
},
{
Name: EnvPodName,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: PodNameRef,
},
},
},
{
Name: EnvPodIp,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: PodIpRef,
},
},
},
}
return envVar
}
func volumeClaimTemplates(cr *oscv1.Hwfka) corev1.PersistentVolumeClaim {
pvc := corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: cr.Namespace,
Name: DataVolumeName,
Annotations: map[string]string{},
Labels: map[string]string{},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(cr, schema.GroupVersionKind{
Group: cr.GroupVersionKind().Group,
Version: cr.GroupVersionKind().Version,
Kind: cr.Kind,
}),
},
},
Spec: corev1.PersistentVolumeClaimSpec{
StorageClassName: &cr.Spec.Storage.Class,
AccessModes: []corev1.PersistentVolumeAccessMode{
cr.Spec.Storage.AccessModes,
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: cr.Spec.Storage.Size,
},
},
},
}
if cr.Spec.Storage.Class == StorageClassEVS {
pvc.ObjectMeta.Annotations[EvsAnnotation] = cr.Spec.Storage.DiskType
if len(cr.Spec.Storage.Region) != 0 && len(cr.Spec.Storage.Zone) != 0 {
pvc.ObjectMeta.Labels[RegionLabel] = cr.Spec.Storage.Region
pvc.ObjectMeta.Labels[ZoneLabel] = cr.Spec.Storage.Zone
}
}
return pvc
}
func NewStatefulSetForKafka(cr *oscv1.Hwfka) metav1.Object {
labels := GenerateLabels(cr.Name, ComponentBroker)
sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v-%v", cr.Name, ComponentBroker),
Namespace: cr.Namespace,
Labels: labels,
},
Spec: appsv1.StatefulSetSpec{
ServiceName: fmt.Sprintf("%v-%v", cr.Name, ComponentBroker),
Replicas: &(cr.Spec.Size),
PodManagementPolicy: appsv1.ParallelPodManagement,
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
},
// StatefulSet match Pod 的 Label
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: getKafkaContainer(cr),
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
volumeClaimTemplates(cr),
},
},
}
return sts
}
func getKafkaContainer(cr *oscv1.Hwfka) []corev1.Container {
zookeeperConnect := fmt.Sprintf("%v-%v:%v", cr.Name, ComponentZookeeper, ZkClientPort)
logDir := fmt.Sprintf("/opt/kafka/data/kafka/%v/%v/", cr.Namespace, cr.Name)
return []corev1.Container{{
Name: ComponentBroker,
Image: cr.Spec.Image,
ImagePullPolicy: corev1.PullAlways,
Ports: []corev1.ContainerPort{
{
Name: BootstrapServerPortName,
ContainerPort: BootstrapServerPort,
},
{
Name: ExporterPortName,
ContainerPort: ExporterPort,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: DataVolumeName,
MountPath: KafkaDataPath,
},
},
Command: []string{
"sh", "-c",
fmt.Sprintf("bin/kafka-server-start.sh config/server.properties --override broker.id=$(echo $HOSTNAME | awk -F '-' '{print $NF}') --override zookeeper.connect=%v --override log.dir=%v$POD_NAME", zookeeperConnect, logDir),
},
Env: getKafkaEnv(cr),
}}
}
func getKafkaEnv(cr *oscv1.Hwfka) []corev1.EnvVar {
envVar := []corev1.EnvVar{
{
Name: EnvPodUid,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: PodUidRef,
},
},
},
{
Name: EnvPodName,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: PodNameRef,
},
},
},
{
Name: EnvPodIp,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: PodIpRef,
},
},
},
{
Name: EnvNamespace,
Value: cr.Namespace,
},
{
Name: EnvInstance,
Value: cr.Name,
},
}
return envVar
}
// 以 CR 的 Name, 组件名称 组装 Label
func GenerateLabels(name, component string) map[string]string {
labels := map[string]string{
LabelProvider: ProviderKafka,
LabelInstance: name,
}
if len(component) != 0 {
labels[LabelComponent] = component
}
return labels
}