// Run starts the kubelet's long-running components and then blocks in the
// main sync loop, consuming pod updates from the updates channel.
//
// Components are started in the order written below. If initializeModules
// fails, a warning event is recorded and the process exits with status 1.
// Node status and lease reporting are only started when an API client
// (kl.kubeClient) is configured.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		klog.InfoS("No API server defined - no node status update will be sent")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	if err := kl.initializeModules(); err != nil {
		// Pass the error as an argument, never as the format string: a '%'
		// in the error text would otherwise be parsed as a formatting verb
		// and garble the recorded event message.
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, "%v", err)
		klog.ErrorS(err, "Failed to initialize internal modules")
		os.Exit(1)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Introduce some small jittering to ensure that over time the requests won't start
		// accumulating at approximately the same time from the set of nodes due to priority and
		// fairness effect.
		go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}

	// Re-check the runtime status every five seconds.
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Start component sync loops.
	kl.statusManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	kl.pleg.Start()

	// Blocks here: Run returns only when syncLoop returns.
	kl.syncLoop(updates, kl)
}
// BackoffUntil invokes f repeatedly until stopCh is closed, waiting between
// invocations on the timer handed out by the BackoffManager.
//
// With sliding set, the timer is requested after f returns, so the wait is
// measured from the end of f. Without it, the timer is requested before f
// runs, so the time spent inside f counts toward the wait.
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
	// guarded runs f with runtime.HandleCrash deferred around it.
	guarded := func() {
		defer runtime.HandleCrash()
		f()
	}

	var timer clock.Timer
	for {
		// Go's select has no priority between ready cases, so the wait at the
		// bottom of the loop may pick the timer even though stopCh is already
		// closed. Checking stopCh first prevents an extra execution of f().
		select {
		case <-stopCh:
			return
		default:
		}

		if sliding {
			guarded()
			timer = backoff.Backoff()
		} else {
			timer = backoff.Backoff()
			guarded()
		}

		select {
		case <-timer.C():
		case <-stopCh:
			// Drain the timer channel if the timer could not be stopped.
			if !timer.Stop() {
				<-timer.C()
			}
			return
		}
	}
}
// relist queries the container runtime for list of pods/containers, compare
// with the internal pods/containers, and generates events accordingly.
//
// Outline of one relist cycle, as implemented below:
//  1. Observe the interval since the previous relist and time this one.
//  2. Fetch all pods from the runtime; on failure return early WITHOUT
//     advancing the relist time (the value Healthy() later inspects).
//  3. Diff old vs. current pod records and collect events per pod.
//  4. For every pod with events, refresh the cache (when enabled), commit the
//     pod record and publish the events on the event channel.
//  5. Retry cache updates for pods that failed inspection in the previous
//     cycle, then stamp the cache with this cycle's timestamp.
func (g *GenericPLEG) relist() {
klog.V(5).InfoS("GenericPLEG: Relisting")
// Report how long it has been since the previous relist, if there was one.
if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
}
timestamp := g.clock.Now()
// Record the duration of this relist on every exit path.
defer func() {
metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
}()
// Get all the pods.
podList, err := g.runtime.GetPods(true)
if err != nil {
klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
return
}
// Only a successful GetPods call advances the relist time.
g.updateRelistTime(timestamp)
pods := kubecontainer.Pods(podList)
// update running pod and container count
updateRunningPodAndContainerMetrics(pods)
g.podRecords.setCurrent(pods)
// Compare the old and the current pods, and generate events.
eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
for pid := range g.podRecords {
oldPod := g.podRecords.getOld(pid)
pod := g.podRecords.getCurrent(pid)
// Get all containers in the old and the new pod.
allContainers := getContainersFromPods(oldPod, pod)
for _, container := range allContainers {
events := computeEvents(oldPod, pod, &container.ID)
for _, e := range events {
updateEvents(eventsByPodID, e)
}
}
}
// needsReinspection collects pods whose cache update failed in this cycle;
// it is only allocated (and only written to) when the cache is enabled.
var needsReinspection map[types.UID]*kubecontainer.Pod
if g.cacheEnabled() {
needsReinspection = make(map[types.UID]*kubecontainer.Pod)
}
// If there are events associated with a pod, we should update the
// podCache.
for pid, events := range eventsByPodID {
pod := g.podRecords.getCurrent(pid)
if g.cacheEnabled() {
// updateCache() will inspect the pod and update the cache. If an
// error occurs during the inspection, we want PLEG to retry again
// in the next relist. To achieve this, we do not update the
// associated podRecord of the pod, so that the change will be
// detect again in the next relist.
// TODO: If many pods changed during the same relist period,
// inspecting the pod and getting the PodStatus to update the cache
// serially may take a while. We should be aware of this and
// parallelize if needed.
if err := g.updateCache(pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
// make sure we try to reinspect the pod during the next relisting
needsReinspection[pid] = pod
continue
} else {
// this pod was in the list to reinspect and we did so because it had events, so remove it
// from the list (we don't want the reinspection code below to inspect it a second time in
// this relist execution)
delete(g.podsToReinspect, pid)
}
}
// Update the internal storage and send out the events.
g.podRecords.update(pid)
// Map from containerId to exit code; used as a temporary cache for lookup
containerExitCode := make(map[string]int)
for i := range events {
// Filter out events that are not reliable and no other components use yet.
if events[i].Type == ContainerChanged {
continue
}
// Non-blocking send: if the channel is full the event is dropped and
// counted, so relist never blocks on a slow consumer.
select {
case g.eventChannel <- events[i]:
default:
metrics.PLEGDiscardEvents.Inc()
klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
}
// Log exit code of containers when they finished in a particular event
if events[i].Type == ContainerDied {
// Fill up containerExitCode map for ContainerDied event when first time appeared
if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
// Get updated podStatus
status, err := g.cache.Get(pod.ID)
if err == nil {
for _, containerStatus := range status.ContainerStatuses {
containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
}
}
}
// The exit code is only logged when the event data is a string that
// matches a container ID found in the cached status.
if containerID, ok := events[i].Data.(string); ok {
if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
}
}
}
}
}
if g.cacheEnabled() {
// reinspect any pods that failed inspection during the previous relist
if len(g.podsToReinspect) > 0 {
klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
for pid, pod := range g.podsToReinspect {
if err := g.updateCache(pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
needsReinspection[pid] = pod
}
}
}
// Update the cache timestamp. This needs to happen *after*
// all pods have been properly updated in the cache.
g.cache.UpdateTime(timestamp)
}
// make sure we retain the list of pods that need reinspecting the next time relist is called
g.podsToReinspect = needsReinspection
}
// GetPods lists the sandboxes and containers known to the runtime and returns
// them grouped into pods, keyed internally by pod UID. The all flag is
// forwarded to both listing calls and selects whether exited and dead
// containers are included as well (used for garbage collection).
//
// Sandboxes or containers without metadata, or that fail conversion, are
// logged at verbosity 4 and skipped. An error is returned only when one of
// the two listing calls fails.
func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
	podsByUID := make(map[kubetypes.UID]*kubecontainer.Pod)

	sandboxes, err := m.getKubeletSandboxes(all)
	if err != nil {
		return nil, err
	}
	for _, s := range sandboxes {
		if s.Metadata == nil {
			klog.V(4).InfoS("Sandbox does not have metadata", "sandbox", s)
			continue
		}
		podUID := kubetypes.UID(s.Metadata.Uid)
		owner, known := podsByUID[podUID]
		if !known {
			// The pod entry is registered before conversion, so it exists
			// even if the sandbox itself cannot be converted.
			owner = &kubecontainer.Pod{
				ID:        podUID,
				Name:      s.Metadata.Name,
				Namespace: s.Metadata.Namespace,
			}
			podsByUID[podUID] = owner
		}
		kubeSandbox, convErr := m.sandboxToKubeContainer(s)
		if convErr != nil {
			klog.V(4).InfoS("Convert sandbox of pod failed", "runtimeName", m.runtimeName, "sandbox", s, "podUID", podUID, "err", convErr)
			continue
		}
		owner.Sandboxes = append(owner.Sandboxes, kubeSandbox)
	}

	containers, err := m.getKubeletContainers(all)
	if err != nil {
		return nil, err
	}
	for _, c := range containers {
		if c.Metadata == nil {
			klog.V(4).InfoS("Container does not have metadata", "container", c)
			continue
		}
		info := getContainerInfoFromLabels(c.Labels)
		owner, known := podsByUID[info.PodUID]
		if !known {
			// No sandbox introduced this pod; build it from the container labels.
			owner = &kubecontainer.Pod{
				ID:        info.PodUID,
				Name:      info.PodName,
				Namespace: info.PodNamespace,
			}
			podsByUID[info.PodUID] = owner
		}
		kubeContainer, convErr := m.toKubeContainer(c)
		if convErr != nil {
			klog.V(4).InfoS("Convert container of pod failed", "runtimeName", m.runtimeName, "container", c, "podUID", info.PodUID, "err", convErr)
			continue
		}
		owner.Containers = append(owner.Containers, kubeContainer)
	}

	// Flatten the map into a list (stays nil when there are no pods).
	var list []*kubecontainer.Pod
	for _, p := range podsByUID {
		list = append(list, p)
	}
	return list, nil
}
GetPods() 首先调用 getKubeletSandboxes() 获取沙箱列表,代码如下。
// getKubeletSandboxes lists all (or just the running) sandboxes managed by kubelet.
//
// When all is false the request is restricted to sandboxes in the
// SANDBOX_READY state; otherwise no filter is sent. A listing failure is
// logged and returned to the caller.
func (m *kubeGenericRuntimeManager) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) {
	var filter *runtimeapi.PodSandboxFilter
	if !all {
		filter = &runtimeapi.PodSandboxFilter{
			State: &runtimeapi.PodSandboxStateValue{
				State: runtimeapi.PodSandboxState_SANDBOX_READY,
			},
		}
	}

	sandboxes, err := m.runtimeService.ListPodSandbox(filter)
	if err != nil {
		klog.ErrorS(err, "Failed to list pod sandboxes")
		return nil, err
	}
	return sandboxes, nil
}
// ListPodSandbox issues the /runtime.v1.RuntimeService/ListPodSandbox unary
// RPC over the client connection and returns the decoded response, or nil
// and the error reported by Invoke.
func (c *runtimeServiceClient) ListPodSandbox(ctx context.Context, in *ListPodSandboxRequest, opts ...grpc.CallOption) (*ListPodSandboxResponse, error) {
	resp := &ListPodSandboxResponse{}
	if err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/ListPodSandbox", in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}
最终来到 gRPC 调用:ListPodSandbox 通过 ClientConn.Invoke 向容器运行时发出 RPC 请求。
// Invoke sends the RPC request on the wire and returns after the response is
// received. This is typically called by generated code.
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
	// Merge the dial-time default call options with the per-call ones so
	// that an interceptor sees every applicable option.
	opts = combine(cc.dopts.callOptions, opts)

	interceptor := cc.dopts.unaryInt
	if interceptor == nil {
		return invoke(ctx, method, args, reply, cc, opts...)
	}
	return interceptor(ctx, method, args, reply, cc, invoke, opts...)
}
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, it runs a sync against desired state and running state.
// If no changes are seen to the configuration, it synchronizes the last known
// desired state every sync-frequency seconds.
//
// While runtimeState.runtimeErrors() reports an error, pod synchronization is
// skipped and the loop sleeps with exponential backoff (100ms doubling up to
// 5s). The loop ends only when syncLoopIteration returns false.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.InfoS("Starting kubelet main sync loop")

	// The syncTicker wakes up kubelet to check if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()

	const (
		backoffBase   = 100 * time.Millisecond
		backoffCap    = 5 * time.Second
		backoffFactor = 2
	)
	wait := backoffBase

	// Responsible for checking limits in resolv.conf
	// The limits do not have anything to do with individual pods
	// Since this is called in syncLoop, we don't need to call it anywhere else
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}

	for {
		runtimeErr := kl.runtimeState.runtimeErrors()
		if runtimeErr != nil {
			klog.ErrorS(runtimeErr, "Skipping pod synchronization")
			// exponential backoff, capped at backoffCap
			time.Sleep(wait)
			wait = time.Duration(math.Min(float64(backoffCap), backoffFactor*float64(wait)))
			continue
		}

		// The runtime is healthy again: reset the backoff.
		wait = backoffBase

		// Stamp the monitor before and after each iteration.
		kl.syncLoopMonitor.Store(kl.clock.Now())
		keepGoing := kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
		if !keepGoing {
			return
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
在 runtimeState.runtimeErrors() 中会执行健康检查(health check):如果健康检查失败(hc.fn() 返回 false),则会记录 “PLEG is not healthy”。
// runtimeErrors collects every runtime-related problem currently known to the
// runtime state and returns them as one aggregate error.
//
// Under the read lock it checks, in order: whether a base runtime sync has
// ever been recorded and whether the last one is still within
// baseRuntimeSyncThreshold; every registered health check (a failing check
// contributes "<name> is not healthy: <err>"); and the stored runtimeError.
func (s *runtimeState) runtimeErrors() error {
	s.RLock()
	defer s.RUnlock()

	errs := make([]error, 0)

	switch {
	case s.lastBaseRuntimeSync.IsZero():
		errs = append(errs, errors.New("container runtime status check may not have completed yet"))
	case !s.lastBaseRuntimeSync.Add(s.baseRuntimeSyncThreshold).After(time.Now()):
		errs = append(errs, errors.New("container runtime is down"))
	}

	for _, hc := range s.healthChecks {
		healthy, err := hc.fn()
		if healthy {
			continue
		}
		errs = append(errs, fmt.Errorf("%s is not healthy: %v", hc.name, err))
	}

	if s.runtimeError != nil {
		errs = append(errs, s.runtimeError)
	}

	return utilerrors.NewAggregate(errs)
}
// Healthy reports whether PLEG is working properly.
//
// It is unhealthy when no relist time has been recorded yet, or when the
// time elapsed since the last recorded relist exceeds relistThreshold, the
// maximum allowed interval between two relists. In both cases the returned
// error explains why.
func (g *GenericPLEG) Healthy() (bool, error) {
	lastRelist := g.getRelistTime()
	if lastRelist.IsZero() {
		return false, fmt.Errorf("pleg has yet to be successful")
	}

	// Expose as metric so you can alert on `time()-pleg_last_seen_seconds > nn`
	metrics.PLEGLastSeen.Set(float64(lastRelist.Unix()))

	if sinceRelist := g.clock.Since(lastRelist); sinceRelist > relistThreshold {
		return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", sinceRelist, relistThreshold)
	}
	return true, nil
}
综上可知,Kubelet 启动后会持续执行健康检查:如果距离上一次成功 relist 的时间超过阈值 relistThreshold(3 分钟),健康检查就会失败,进而报错 “PLEG is not healthy”。
// defaultNodeStatusFuncs returns the ordered list of setter functions that
// fill in a Node's status: addresses, machine info, versions, daemon
// endpoints, images, Go runtime, volume limits, the pressure and ready
// conditions, volumes in use, and finally the schedulable-event recorder.
// The setters are returned in exactly that order.
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
	// if cloud is not nil, we expect the cloud resource sync manager to exist
	var addressesFn func() ([]v1.NodeAddress, error)
	if kl.cloud != nil {
		addressesFn = kl.cloudResourceSyncManager.NodeAddresses
	}

	// Stays nil when no AppArmor validator is configured.
	var hostValidator func() error
	if kl.appArmorValidator != nil {
		hostValidator = kl.appArmorValidator.ValidateHost
	}

	return []func(n *v1.Node) error{
		nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, addressesFn),
		nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
			kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
		nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
		nodestatus.DaemonEndpoints(kl.daemonEndpoints),
		nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
		nodestatus.GoRuntime(),

		// Volume limits
		nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits),

		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, hostValidator, kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent),
		nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),

		// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
		// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
		// these side-effects by decoupling the decisions to send events and partial status recording
		// from the Node setters.
		kl.recordNodeSchedulableEvent,
	}
}
// tryUpdateNodeStatus makes one attempt at refreshing this node's status on
// the API server.
//
// It fetches the Node (from the apiserver cache on the first try, i.e. when
// tryNumber is 0), applies any pod CIDR change, recomputes the status via
// setNodeStatus, and patches the result back. The patch is skipped when the
// last report is more recent than nodeStatusReportFrequency and neither the
// pod CIDR nor the status changed. It returns an error when the Node cannot
// be fetched or the patch fails.
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
	// In large clusters, GET and PUT operations on Node objects coming
	// from here are the majority of load on apiserver and etcd.
	// To reduce the load on etcd, we are serving GET operations from
	// apiserver cache (the data might be slightly delayed but it doesn't
	// seem to cause more conflict - the delays are pretty small).
	// If it result in a conflict, all retries are served directly from etcd.
	opts := metav1.GetOptions{}
	if tryNumber == 0 {
		util.FromApiserverCache(&opts)
	}
	node, err := kl.heartbeatClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), opts)
	if err != nil {
		// Wrap with %w so callers can still inspect the underlying API error.
		return fmt.Errorf("error getting node %q: %w", kl.nodeName, err)
	}

	originalNode := node.DeepCopy()
	if originalNode == nil {
		return fmt.Errorf("nil %q node object", kl.nodeName)
	}

	podCIDRChanged := false
	if len(node.Spec.PodCIDRs) != 0 {
		// Pod CIDR could have been updated before, so we cannot rely on
		// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
		// actually changed.
		podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
		if podCIDRChanged, err = kl.updatePodCIDR(podCIDRs); err != nil {
			// Log the error as a value, not as a format string: a '%' in the
			// error text must not be interpreted as a formatting verb. A CIDR
			// update failure is not fatal for the status update.
			klog.ErrorS(err, "Error updating pod CIDR")
		}
	}

	kl.setNodeStatus(node)

	now := kl.clock.Now()
	if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
		if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
			// We must mark the volumes as ReportedInUse in volume manager's dsw even
			// if no changes were made to the node status (no volumes were added or removed
			// from the VolumesInUse list).
			//
			// The reason is that on a kubelet restart, the volume manager's dsw is
			// repopulated and the volume ReportedInUse is initialized to false, while the
			// VolumesInUse list from the Node object still contains the state from the
			// previous kubelet instantiation.
			//
			// Once the volumes are added to the dsw, the ReportedInUse field needs to be
			// synced from the VolumesInUse list in the Node.Status.
			//
			// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
			// because it does not have access to the Node object.
			// This also cannot be populated on node status manager init because the volume
			// may not have been added to dsw at that time.
			kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
			return nil
		}
	}

	// Patch the current status on the API server
	updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
	if err != nil {
		return err
	}
	kl.lastStatusReportTime = now
	kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
	// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
	// those volumes are already updated in the node's status
	kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
	return nil
}
// ReadyCondition returns a Setter that updates the v1.NodeReady condition on the node.
//
// The condition is True ("KubeletReady") unless any of the runtime, network,
// storage or shutdown-manager error functions reports an error, or a required
// capacity is missing from node.Status.Capacity; in that case it is False
// ("KubeletNotReady") with the aggregated error text as its message. An
// existing NodeReady condition is replaced in place, otherwise the new one is
// appended. An event is recorded only when the status of an existing
// condition flips.
func ReadyCondition(
	nowFunc func() time.Time, // typically Kubelet.clock.Now
	runtimeErrorsFunc func() error, // typically Kubelet.runtimeState.runtimeErrors
	networkErrorsFunc func() error, // typically Kubelet.runtimeState.networkErrors
	storageErrorsFunc func() error, // typically Kubelet.runtimeState.storageErrors
	appArmorValidateHostFunc func() error, // typically Kubelet.appArmorValidator.ValidateHost, might be nil depending on whether there was an appArmorValidator
	cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status
	nodeShutdownManagerErrorsFunc func() error, // typically kubelet.shutdownManager.errors.
	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
) Setter {
	return func(node *v1.Node) error {
		// NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions.
		// This is due to an issue with version skewed kubelet and master components.
		// ref: https://github.com/kubernetes/kubernetes/issues/16961
		now := metav1.NewTime(nowFunc())

		// Start from the optimistic "ready" condition and downgrade it below
		// if anything is wrong.
		cond := v1.NodeCondition{
			Type:              v1.NodeReady,
			Status:            v1.ConditionTrue,
			Reason:            "KubeletReady",
			Message:           "kubelet is posting ready status",
			LastHeartbeatTime: now,
		}

		errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc(), nodeShutdownManagerErrorsFunc()}

		required := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
		if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
			required = append(required, v1.ResourceEphemeralStorage)
		}
		missing := []string{}
		for _, name := range required {
			if _, present := node.Status.Capacity[name]; present {
				continue
			}
			missing = append(missing, string(name))
		}
		if len(missing) > 0 {
			errs = append(errs, fmt.Errorf("missing node capacity for resources: %s", strings.Join(missing, ", ")))
		}

		if aggregatedErr := errors.NewAggregate(errs); aggregatedErr != nil {
			// Type and heartbeat time stay as set above; only these differ.
			cond.Status = v1.ConditionFalse
			cond.Reason = "KubeletNotReady"
			cond.Message = aggregatedErr.Error()
		}

		// Append AppArmor status if it's enabled.
		// TODO(tallclair): This is a temporary message until node feature reporting is added.
		if appArmorValidateHostFunc != nil && cond.Status == v1.ConditionTrue {
			if appArmorValidateHostFunc() == nil {
				cond.Message = fmt.Sprintf("%s. AppArmor enabled", cond.Message)
			}
		}

		// Record any soft requirements that were not met in the container manager.
		if status := cmStatusFunc(); status.SoftRequirements != nil {
			cond.Message = fmt.Sprintf("%s. WARNING: %s", cond.Message, status.SoftRequirements.Error())
		}

		// Locate an existing NodeReady condition, if any.
		existingIdx := -1
		for i := range node.Status.Conditions {
			if node.Status.Conditions[i].Type == v1.NodeReady {
				existingIdx = i
				break
			}
		}

		statusFlipped := false
		if existingIdx < 0 {
			cond.LastTransitionTime = now
			node.Status.Conditions = append(node.Status.Conditions, cond)
		} else {
			existing := &node.Status.Conditions[existingIdx]
			if existing.Status == cond.Status {
				// No transition: carry the previous transition time forward.
				cond.LastTransitionTime = existing.LastTransitionTime
			} else {
				cond.LastTransitionTime = now
				statusFlipped = true
			}
			*existing = cond
		}

		if !statusFlipped {
			return nil
		}
		if cond.Status == v1.ConditionTrue {
			recordEventFunc(v1.EventTypeNormal, events.NodeReady)
			return nil
		}
		recordEventFunc(v1.EventTypeNormal, events.NodeNotReady)
		klog.Infof("Node became not ready: %+v", cond)
		return nil
	}
}
如下是PLEG相关的两个参数,可见事件多少也会影响PLEG是否健康。
// Capacity of the channel for receiving pod lifecycle events. This number
// is a bit arbitrary and may be adjusted in the future.
// NOTE(review): presumably this sizes the event channel that relist() writes
// to; relist() uses a non-blocking send, so once that channel is full further
// events are dropped and counted in PLEGDiscardEvents — confirm at the
// channel's construction site.
plegChannelCapacity = 1000
// Generic PLEG relies on relisting for discovering container events.
// A longer period means that kubelet will take longer to detect container
// changes and to update pod status. On the other hand, a shorter period
// will cause more frequent relisting (e.g., container runtime operations),
// leading to higher cpu usage.
// Note that even though we set the period to 1s, the relisting itself can
// take more than 1s to finish if the container runtime responds slowly
// and/or when there are many container changes in one cycle.
plegRelistPeriod = time.Second * 1
从前面分析可知,当出现PLEG is not healthy时,kubelet会主动将节点设置为NotReady并上报给控制面。出现这种情况可能有如下原因。