共计 6848 个字符,预计需要花费 18 分钟才能阅读完成。
导读 | 这篇文章主要为大家介绍了 Kubernetes controller manager 运行机制源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪 |
Run
确立目标
理解 kube-controller-manager 的运行机制
从主函数找到 run 函数,代码较长,这里精简了一下
func Run(c *config.CompletedConfig, stopCh | |
StartControllers | |
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error { | |
// 关键性的循环,启动每个 controllers,key 为控制器名字,value 为初始化函数 | |
for controllerName, initFn := range controllers { | |
// 是否允许启动 | |
if !ctx.IsControllerEnabled(controllerName) {klog.Warningf("%q is disabled", controllerName) | |
continue | |
} | |
time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) | |
klog.V(1).Infof("Starting %q", controllerName) | |
// 调用 init 函数进行启动 | |
debugHandler, started, err := initFn(ctx) | |
if err != nil {klog.Errorf("Error starting %q", controllerName) | |
return err | |
} | |
if !started {klog.Warningf("Skipping %q", controllerName) | |
continue | |
} | |
// 注册对应 controller 到 debug 的 url 中 | |
if debugHandler != nil && unsecuredMux != nil { | |
basePath := "/debug/controllers/" + controllerName | |
unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler)) | |
unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler)) | |
} | |
klog.Infof("Started %q", controllerName) | |
} | |
return nil | |
} | |
// 我们再去传入 controller 的函数去看看,对应的 controller 有哪些,这里有我们很多常见的概念,不一一细讲 | |
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {controllers := map[string]InitFunc{} | |
controllers["endpoint"] = startEndpointController | |
controllers["endpointslice"] = startEndpointSliceController | |
controllers["endpointslicemirroring"] = startEndpointSliceMirroringController | |
controllers["replicationcontroller"] = startReplicationController | |
controllers["podgc"] = startPodGCController | |
controllers["resourcequota"] = startResourceQuotaController | |
controllers["namespace"] = startNamespaceController | |
controllers["serviceaccount"] = startServiceAccountController | |
controllers["garbagecollector"] = startGarbageCollectorController | |
controllers["daemonset"] = startDaemonSetController | |
controllers["job"] = startJobController | |
controllers["deployment"] = startDeploymentController | |
controllers["replicaset"] = startReplicaSetController | |
controllers["horizontalpodautoscaling"] = startHPAController | |
controllers["disruption"] = startDisruptionController | |
controllers["statefulset"] = startStatefulSetController | |
controllers["cronjob"] = startCronJobController | |
controllers["csrsigning"] = startCSRSigningController | |
controllers["csrapproving"] = startCSRApprovingController | |
controllers["csrcleaner"] = startCSRCleanerController | |
controllers["ttl"] = startTTLController | |
controllers["bootstrapsigner"] = startBootstrapSignerController | |
controllers["tokencleaner"] = startTokenCleanerController | |
controllers["nodeipam"] = startNodeIpamController | |
controllers["nodelifecycle"] = startNodeLifecycleController | |
if loopMode == IncludeCloudLoops {controllers["service"] = startServiceController | |
controllers["route"] = startRouteController | |
controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController | |
} | |
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController | |
controllers["attachdetach"] = startAttachDetachController | |
controllers["persistentvolume-expander"] = startVolumeExpandController | |
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController | |
controllers["pvc-protection"] = startPVCProtectionController | |
controllers["pv-protection"] = startPVProtectionController | |
controllers["ttl-after-finished"] = startTTLAfterFinishedController | |
controllers["root-ca-cert-publisher"] = startRootCACertPublisher | |
controllers["ephemeral-volume"] = startEphemeralVolumeController | |
return controllers | |
} | |
ReplicaSet | |
由于我们的示例是创建一个 nginx 的 pod,涉及到 kube-controller-manager 的内容很少。 | |
但是,为了加深大家对 kube-controller-manager 的认识,我们引入一个新的概念 - ReplicaSet,下面是官方说明: | |
A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods. | |
ReplicaSet 的目的是维护一组在任何时候都处于运行状态的 Pod 副本的稳定集合。因此,它通常用来保证给定数量的、完全相同的 Pod 的可用性。 | |
简单来说,ReplicaSet 就是用来生成指定个数的 Pod | |
代码在 pkg/controller/replica_set.go | |
ReplicaSetController | |
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {return nil, false, nil} | |
// 用 goroutine 异步运行,包含了 ReplicaSet 和 Pod 的两个 Informer | |
// 这一点很好理解:我们是要控制 ReplicaSet 声明的数量和运行的 Pod 数量一致,需要同时观察者两种资源 | |
go replicaset.NewReplicaSetController(ctx.InformerFactory.Apps().V1().ReplicaSets(), | |
ctx.InformerFactory.Core().V1().Pods(), | |
ctx.ClientBuilder.ClientOrDie("replicaset-controller"), | |
replicaset.BurstReplicas, | |
).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop) | |
return nil, true, nil | |
} | |
// 运行函数 | |
func (rsc *ReplicaSetController) Run(workers int, stopCh | |
syncReplicaSet | |
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {startTime := time.Now() | |
defer func() {klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime)) | |
}() | |
// 从 key 中拆分出 namespace 和 name | |
namespace, name, err := cache.SplitMetaNamespaceKey(key) | |
if err != nil {return err} | |
// 根据 name,从 Lister 获取对应的 ReplicaSets 信息 | |
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name) | |
if errors.IsNotFound(err) {klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key) | |
rsc.expectations.DeleteExpectations(key) | |
return nil | |
} | |
if err != nil {return err} | |
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key) | |
// 获取 selector (k8s 是根据 selector 中的 label 来匹配 ReplicaSets 和 Pod 的) | |
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) | |
if err != nil {utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err)) | |
return nil | |
} | |
// 根据 namespace 和 labels 获取所有的 pod | |
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything()) | |
if err != nil {return err} | |
// 过滤无效的 pod | |
filteredPods := controller.FilterActivePods(allPods) | |
// 根据 selector 再过滤 pod | |
filteredPods, err = rsc.claimPods(rs, selector, filteredPods) | |
if err != nil {return err} | |
var manageReplicasErr error | |
if rsNeedsSync && rs.DeletionTimestamp == nil { | |
// 管理 ReplicaSet,下面详细分析 | |
manageReplicasErr = rsc.manageReplicas(filteredPods, rs) | |
} | |
rs = rs.DeepCopy() | |
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr) | |
// 更新状态 | |
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) | |
if err != nil {return err} | |
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 && | |
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) && | |
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second) | |
} | |
return manageReplicasErr | |
} | |
// 我们再一起看看,当 Pod 数量和 ReplicaSet 中声明的不同时,是怎么工作的 | |
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { | |
// diff = 当前 pod 数 - 期望 pod 数 | |
diff := len(filteredPods) - int(*(rs.Spec.Replicas)) | |
rsKey, err := controller.KeyFunc(rs) | |
if err != nil {utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) | |
return nil | |
} | |
// diff 小于 0,表示需要扩容,即新增 Pod | |
if diff 0 { } | |
return nil | |
} | |
站在前人的肩膀上,向前辈致敬,Respect! | |
Summary | |
kube-controller-manager 的核心思想是:根据期望状态和当前状态,管理 Kubernetes 中的资源。以 ReplicaSet 为例,它对比了定义声明的 Pod 数和当前集群中满足条件的 Pod 数,进行相对应的扩缩容。 | |
阿里云 2 核 2G 服务器 3M 带宽 61 元 1 年,有高配 | |
腾讯云新客低至 82 元 / 年,老客户 99 元 / 年 | |
代金券:在阿里云专用满减优惠券 | |
正文完
星哥玩云-微信公众号
