// Run starts the kubelet's background components and then hands control to
// syncLoop, which consumes pod updates from the updates channel.
//
// Startup order:
//  1. Install a default log file server for /var/log/ if none is configured.
//  2. Start the cloud resource sync manager, if present.
//  3. Initialize internal modules; a failure is recorded as a
//     KubeletSetupFailed warning event and then reported via klog.Fatal.
//  4. Start the volume manager, node status/lease syncing (only when an API
//     client is configured), the runtime-up check, the pod killer, the
//     status and probe managers, the RuntimeClass manager and the PLEG.
//  5. Enter kl.syncLoop.
//
// Every background goroutine is started with wait.NeverStop.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		// Without an API client the node status and lease loops below are skipped.
		klog.Warning("No api server defined - no node status update will be sent.")
	}

	// Start the cloud provider sync manager.
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	if err := kl.initializeModules(); err != nil {
		// Pass the error text as an argument instead of using it as the format
		// string, so that a '%' inside the message is not treated as a verb.
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, "%s", err.Error())
		klog.Fatal(err)
	}

	// Start volume manager.
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()

		// Start syncing lease.
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Set up iptables util rules.
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	kl.pleg.Start()

	// The kubelet itself is passed as the second argument to syncLoop.
	kl.syncLoop(updates, kl)
}
// reconciliationLoopFunc returns the function that performs one iteration of
// the reconciler loop: a reconcile pass, followed by a one-time state sync
// once the populator has added pods and the states have not been synced yet.
func (rc *reconciler) reconciliationLoopFunc() func() {
	iteration := func() {
		rc.reconcile()

		// Only sync the state with reality after all existing pods have been
		// added to the desired state from all sources. Syncing earlier could
		// let the reconstruct process clean up volumes that are still in use,
		// because the desired state of world would not yet list every pod.
		if !rc.populatorHasAddedPods() || rc.StatesHasBeenSynced() {
			return
		}
		klog.Infof("Reconciler: start to sync state")
		rc.sync()
	}
	return iteration
}
// reconcile performs a single reconciliation pass by running the unmount,
// mount/attach and unmount-device/detach steps, strictly in that order.
func (rc *reconciler) reconcile() {
	steps := [...]func(){
		// Unmounts come first so that a volume referenced by a deleted pod and
		// now referenced by another pod is unmounted from the first pod before
		// it is mounted to the new one.
		rc.unmountVolumes,
		// Mount required volumes. This step may also trigger attach when the
		// kubelet is responsible for attaching volumes, and handles resizing
		// when the underlying PVC was resized while in use.
		rc.mountAttachVolumes,
		// Ensure devices that should be detached/unmounted are detached/unmounted.
		rc.unmountDetachDevices,
	}
	for _, step := range steps {
		step()
	}
}
// unmountVolumes starts an unmount operation for every volume that the actual
// state of world reports as mounted for a pod, but that the desired state of
// world no longer associates with that pod.
//
// Errors reported by nestedpendingoperations.IsAlreadyExists and
// exponentialbackoff.IsExponentialBackoff are expected and ignored; all other
// errors are logged. Nothing is returned to the caller.
func (rc *reconciler) unmountVolumes() {
	for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
		if rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
			// Still wanted by this pod; leave it mounted.
			continue
		}

		// Volume is mounted but no longer desired, unmount it.
		//
		// The generated messages are logged with Info/Error rather than
		// Infof/Errorf: they are data, not format strings, and may contain '%'.
		klog.V(5).Info(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
		err := rc.operationExecutor.UnmountVolume(
			mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
		if err != nil &&
			!nestedpendingoperations.IsAlreadyExists(err) &&
			!exponentialbackoff.IsExponentialBackoff(err) {
			// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
			// Log all other errors.
			klog.Error(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
		}
		if err == nil {
			klog.Info(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
		}
	}
}
// UnmountVolume generates the unmount (filesystem volume) or unmap (block
// volume) operation for volumeToUnmount and submits it to the pending
// operations queue, keyed by the volume name and the pod's unique name.
//
// It returns the error from the volume mode check, from operation generation,
// or from submitting the operation.
func (oe *operationExecutor) UnmountVolume(
	volumeToUnmount MountedVolume,
	actualStateOfWorld ActualStateOfWorldMounterUpdater,
	podsDir string) error {
	isFilesystem, err := util.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec)
	if err != nil {
		return err
	}

	var ops volumetypes.GeneratedOperations
	switch {
	case isFilesystem:
		// Filesystem volume: unmount it if it is mounted.
		ops, err = oe.operationGenerator.GenerateUnmountVolumeFunc(
			volumeToUnmount, actualStateOfWorld, podsDir)
	default:
		// Block volume: unmap it if it is mapped.
		ops, err = oe.operationGenerator.GenerateUnmapVolumeFunc(
			volumeToUnmount, actualStateOfWorld)
	}
	if err != nil {
		return err
	}

	// All volume plugins can execute unmount/unmap for multiple pods
	// referencing the same volume in parallel, so the operation is keyed by
	// pod as well as by volume.
	return oe.pendingOperations.Run(
		volumeToUnmount.VolumeName,
		volumetypes.UniquePodName(volumeToUnmount.PodUID),
		"", /* nodeName */
		ops)
}
// teardownDefault clears any quota on dir and then removes dir and everything
// beneath it. A failure to clear the quota is only logged as a warning; the
// error from removing the directory, if any, is returned.
func (ed *emptyDir) teardownDefault(dir string) error {
	// Remove any quota; this is best-effort and does not stop the teardown.
	if quotaErr := fsquota.ClearQuota(ed.mounter, dir); quotaErr != nil {
		klog.Warningf("Warning: Failed to clear quota on %s: %v", dir, quotaErr)
	}

	// Renaming the directory first is no longer required because the operation
	// executor now handles duplicate operations on the same volume.
	return os.RemoveAll(dir)
}
msg="get state for xxx" error="context deadline exceeded: unknown"
关联代码
// /Users/kangxiaoning/workspace/containerd/integration/client/daemon.go
// waitForStart polls the daemon at d.addr every 500ms until a client can be
// created and reports that the service is serving, or until ctx is done.
//
// On success it returns the connected client. When ctx ends first it returns
// a nil client and an error wrapping the last connection/serving error; if no
// attempt had produced an error yet, ctx.Err() is wrapped instead so the
// result never wraps a nil error.
func (d *daemon) waitForStart(ctx context.Context) (*Client, error) {
	var (
		client  *Client
		serving bool
		err     error
		ticker  = time.NewTicker(500 * time.Millisecond)
	)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			client, err = New(d.addr)
			if err != nil {
				continue
			}
			serving, err = client.IsServing(ctx)
			if !serving {
				// Drop this connection; a fresh client is created on the next tick.
				client.Close()
				if err == nil {
					err = errors.New("connection was successful but service is not available")
				}
				continue
			}
			return client, err
		case <-ctx.Done():
			if err == nil {
				// ctx ended before any attempt recorded an error; wrapping a nil
				// error with %w would render as "%!w(<nil>)".
				err = ctx.Err()
			}
			return nil, fmt.Errorf("context deadline exceeded: %w", err)
		}
	}
}
分析结论
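无明确线索。关联代码 waitForStart 位于 integration/client/daemon.go,其作用是轮询等待 containerd 守护进程进入可服务状态;其中的 "context deadline exceeded" 错误是在等待守护进程启动超时时返回的,而不是在容器运行或停止过程中产生的,因此与当前场景不匹配。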
3.2 Failed to handle backOff event
日志信息
msg="Failed to handle backOff event &TaskExit{ContainerID:xxx,ID:xxx,Pid:xxx,ExitStatus:255,ExitedAt:xxx,XXX_unrecognized:[],} for xxx" error="failed to handle container TaskExit event: failed to stop container: context deadline exceeded: unknown"
关联代码
// /Users/kangxiaoning/workspace/containerd/pkg/cri/server/events.go
// start starts the event monitor which monitors and handles all subscribed events.
// It returns an error channel for the caller to wait for stop errors from the
// event monitor. The returned channel is closed when the monitor goroutine
// exits; a non-nil error from the event stream is sent on it first.
//
// start panics if em.ch or em.errCh is nil.
//
// NOTE:
// 1. start must be called after subscribe.
// 2. The task exit event has been handled in individual startSandboxExitMonitor
// or startContainerExitMonitor goroutine at the first. If the goroutine fails,
// it puts the event into backoff retry queue and event monitor will handle
// it later.
func (em *eventMonitor) start() <-chan error {
errCh := make(chan error)
if em.ch == nil || em.errCh == nil {
panic("event channel is nil")
}
backOffCheckCh := em.backOff.start()
go func() {
defer close(errCh)
for {
select {
case e := <-em.ch:
// NOTE: every `break` in this case leaves the select statement only;
// the surrounding for loop keeps running and waits for the next event.
log.L.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
if e.Namespace != constants.K8sContainerdNamespace {
log.L.Debugf("Ignoring events in namespace - %q", e.Namespace)
break
}
id, evt, err := convertEvent(e.Event)
if err != nil {
log.L.WithError(err).Errorf("Failed to convert event %+v", e)
break
}
// If this id is already in backoff, queue the new event behind the
// pending ones instead of handling it now.
if em.backOff.isInBackOff(id) {
log.L.Infof("Events for %q is in backoff, enqueue event %+v", id, evt)
em.backOff.enBackOff(id, evt)
break
}
// A failed event is put into backoff so it is retried later.
if err := em.handleEvent(evt); err != nil {
log.L.WithError(err).Errorf("Failed to handle event %+v for %s", evt, id)
em.backOff.enBackOff(id, evt)
}
case err := <-em.errCh:
// Close errCh in defer directly if there is no error.
if err != nil {
log.L.WithError(err).Error("Failed to handle event stream")
// errCh is unbuffered, so this send waits until the caller receives.
errCh <- err
}
return
case <-backOffCheckCh:
// Retry the queued events of every id whose backoff has expired.
ids := em.backOff.getExpiredIDs()
for _, id := range ids {
queue := em.backOff.deBackOff(id)
for i, evt := range queue.events {
if err := em.handleEvent(evt); err != nil {
log.L.WithError(err).Errorf("Failed to handle backOff event %+v for %s", evt, id)
// Re-queue the failed event together with all events after it
// (queue.events[i:]), passing along the queue's current duration,
// then stop processing this id. This break exits only the inner loop.
em.backOff.reBackOff(id, queue.events[i:], queue.duration)
break
}
}
}
}
}
}()
return errCh
}
// handleEvent handles a single containerd event.
//
// The event is processed under a namespaced context that is cancelled after
// handleEventTimeout. Supported event types:
//   - TaskExit: looked up by e.ID first in the container store, then in the
//     sandbox store, and dispatched to handleContainerExit or
//     handleSandboxExit. An ID found in neither store is ignored.
//   - SandboxExit: dispatched to handleSandboxExit; an unknown sandbox is ignored.
//   - TaskOOM: sets the owning container's status reason to oomExitReason;
//     an unknown container is ignored.
//   - ImageCreate / ImageUpdate / ImageDelete: refresh the image via UpdateImage.
//
// Any other event type is ignored. A non-nil error is returned when a lookup
// fails with something other than NotFound or when the handler itself fails.
func (em *eventMonitor) handleEvent(any interface{}) error {
	ctx := ctrdutil.NamespacedContext()
	ctx, cancel := context.WithTimeout(ctx, handleEventTimeout)
	defer cancel()

	switch e := any.(type) {
	case *eventtypes.TaskExit:
		log.L.Infof("TaskExit event %+v", e)
		// Use ID instead of ContainerID to rule out TaskExit event for exec.
		cntr, err := em.c.containerStore.Get(e.ID)
		if err == nil {
			if err := handleContainerExit(ctx, e, cntr, cntr.SandboxID, em.c); err != nil {
				return fmt.Errorf("failed to handle container TaskExit event: %w", err)
			}
			return nil
		} else if !errdefs.IsNotFound(err) {
			return fmt.Errorf("can't find container for TaskExit event: %w", err)
		}
		// Not a container: the exit may belong to a sandbox.
		sb, err := em.c.sandboxStore.Get(e.ID)
		if err == nil {
			if err := handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil {
				return fmt.Errorf("failed to handle sandbox TaskExit event: %w", err)
			}
			return nil
		} else if !errdefs.IsNotFound(err) {
			return fmt.Errorf("can't find sandbox for TaskExit event: %w", err)
		}
		return nil
	case *eventtypes.SandboxExit:
		log.L.Infof("SandboxExit event %+v", e)
		sb, err := em.c.sandboxStore.Get(e.GetSandboxID())
		if err == nil {
			if err := handleSandboxExit(ctx, sb, e.ExitStatus, e.ExitedAt.AsTime(), em.c); err != nil {
				// Name the event that actually failed so logs are not
				// confused with the TaskExit path above.
				return fmt.Errorf("failed to handle sandbox SandboxExit event: %w", err)
			}
			return nil
		} else if !errdefs.IsNotFound(err) {
			return fmt.Errorf("can't find sandbox for SandboxExit event: %w", err)
		}
		return nil
	case *eventtypes.TaskOOM:
		log.L.Infof("TaskOOM event %+v", e)
		// For TaskOOM, we only care which container it belongs to.
		cntr, err := em.c.containerStore.Get(e.ContainerID)
		if err != nil {
			if !errdefs.IsNotFound(err) {
				return fmt.Errorf("can't find container for TaskOOM event: %w", err)
			}
			return nil
		}
		err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
			status.Reason = oomExitReason
			return status, nil
		})
		if err != nil {
			return fmt.Errorf("failed to update container status for TaskOOM event: %w", err)
		}
	// TODO: ImageService should handle the image events below directly.
	case *eventtypes.ImageCreate:
		log.L.Infof("ImageCreate event %+v", e)
		return em.c.UpdateImage(ctx, e.Name)
	case *eventtypes.ImageUpdate:
		log.L.Infof("ImageUpdate event %+v", e)
		return em.c.UpdateImage(ctx, e.Name)
	case *eventtypes.ImageDelete:
		log.L.Infof("ImageDelete event %+v", e)
		return em.c.UpdateImage(ctx, e.Name)
	}

	return nil
}
// handleContainerExit handles TaskExit event for container.
//
// It loads the container's task and deletes it (killing the process), makes
// sure the task is also removed from the task service when either call
// reported NotFound, records the exit in the container status, signals the
// stop through cntr.Stop and finally sends a CONTAINER_STOPPED event.
//
// A non-nil error is returned when loading the task fails with something
// other than NotFound/Unavailable, when deleting the task fails with
// something other than NotFound, when the task-service cleanup fails with
// something other than NotFound, or when the status update fails.
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container, sandboxID string, c *criService) error {
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Task(ctx,
func(*containerdio.FIFOSet) (containerdio.IO, error) {
// We can't directly return cntr.IO here, because
// even if cntr.IO is nil, the cio.IO interface
// is not.
// See https://tour.golang.org/methods/12:
// Note that an interface value that holds a nil
// concrete value is itself non-nil.
if cntr.IO != nil {
return cntr.IO, nil
}
return nil, nil
},
)
if err != nil {
// NotFound and Unavailable are tolerated here; processing continues
// below with err still set.
if !errdefs.IsNotFound(err) && !errdefs.IsUnavailable(err) {
return fmt.Errorf("failed to load task for container: %w", err)
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
// The result is assigned to the outer err (plain `=`), so the NotFound
// check further down also sees the outcome of task.Delete.
if _, err = task.Delete(ctx, c.nri.WithContainerExit(&cntr), containerd.WithProcessKill); err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop container: %w", err)
}
// Move on to make sure container status is updated.
}
}
// NOTE: Both the sb.Container.Task and the task.Delete interfaces check
// the status of the target task. However, when these interfaces return
// ErrNotFound, it doesn't mean that the shim instance doesn't exist.
//
// There are two caches for task in containerd:
//
// 1. io.containerd.service.v1.tasks-service
// 2. io.containerd.runtime.v2.task
//
// First one is to maintain the shim connection and shutdown the shim
// in Delete API. And the second one is to maintain the lifecycle of
// task in shim server.
//
// So, if the shim instance is running and task has been deleted in shim
// server, the sb.Container.Task and task.Delete will receive the
// ErrNotFound. If we don't delete the shim instance in io.containerd.service.v1.tasks-service,
// the shim will leak.
//
// Based on containerd/containerd#7496 issue, when host is under IO
// pressure, the umount2 syscall will take more than 10 seconds so that
// the CRI plugin will cancel this task.Delete call. However, the shim
// server isn't aware about this. After return from umount2 syscall, the
// shim server continues to delete the task record. And then CRI plugin
// retries to delete task and retrieves ErrNotFound and marks it as
// stopped. Therefore, the shim leaks.
//
// It's hard to handle the connection lost or request canceled cases in
// shim server. We should call Delete API to io.containerd.service.v1.tasks-service
// to ensure that shim instance is shutdown.
//
// REF:
// 1. https://github.com/containerd/containerd/issues/7496#issuecomment-1671100968
// 2. https://github.com/containerd/containerd/issues/8931
if errdefs.IsNotFound(err) {
_, err = c.client.TaskService().Delete(ctx, &apitasks.DeleteTaskRequest{ContainerID: cntr.Container.ID()})
if err != nil {
// Convert the gRPC error so that errdefs.IsNotFound can classify it.
err = errdefs.FromGRPC(err)
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to cleanup container %s in task-service: %w", cntr.Container.ID(), err)
}
}
log.L.Infof("Ensure that container %s in task-service has been cleanup successfully", cntr.Container.ID())
}
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
// Only record the exit details if no finish time has been recorded yet.
if status.FinishedAt == 0 {
status.Pid = 0
status.FinishedAt = protobuf.FromTimestamp(e.ExitedAt).UnixNano()
status.ExitCode = int32(e.ExitStatus)
}
// Unknown state can only transit to EXITED state, so we need
// to handle unknown state here.
if status.Unknown {
log.L.Debugf("Container %q transited from UNKNOWN to EXITED", cntr.ID)
status.Unknown = false
}
return status, nil
})
if err != nil {
return fmt.Errorf("failed to update container state: %w", err)
}
// Using channel to propagate the information of container stop
cntr.Stop()
c.generateAndSendContainerEvent(ctx, cntr.ID, sandboxID, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT)
return nil
}
// /Users/kangxiaoning/workspace/containerd/client/task.go
// Delete removes the task together with its runtime state.
//
// The supplied delete options run first; the first one that fails aborts the
// call. The task must then be in a deletable state (see below), otherwise an
// error wrapping errdefs.ErrFailedPrecondition is returned. On success the
// exit status reported by the task service is returned.
func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStatus, error) {
	for _, opt := range opts {
		if optErr := opt(ctx, t); optErr != nil {
			return nil, optErr
		}
	}

	// Only a NotFound error from Status stops the deletion here; with any
	// other error the (zero-valued) status is evaluated below.
	status, err := t.Status(ctx)
	if err != nil && errdefs.IsNotFound(err) {
		return nil, err
	}

	deletable := false
	switch status.Status {
	case Stopped, Unknown, "":
		deletable = true
	case Created:
		// On windows Created is akin to Stopped. Elsewhere a created task may
		// only be deleted when its PID is 0
		// (https://github.com/containerd/containerd/issues/7357).
		deletable = t.client.runtime == plugins.RuntimePlugin.String()+".windows" || t.pid == 0
	}
	if !deletable {
		return nil, fmt.Errorf("task must be stopped before deletion: %s: %w", status.Status, errdefs.ErrFailedPrecondition)
	}

	if t.io != nil {
		// io.Wait locks for restored tasks on Windows unless we call
		// io.Close first (https://github.com/containerd/containerd/issues/5621);
		// in other cases, preserve the contract and let IO finish before closing.
		if t.client.runtime == plugins.RuntimePlugin.String()+".windows" {
			t.io.Close()
		}
		// io.Cancel is used to cancel the io goroutine while it is in
		// fifo-opening state. It does not stop the pipes since these
		// should be closed on the shim's side, otherwise we might lose
		// data from the container!
		t.io.Cancel()
		t.io.Wait()
	}

	resp, err := t.client.TaskService().Delete(ctx, &tasks.DeleteTaskRequest{
		ContainerID: t.id,
	})
	if err != nil {
		return nil, errdefs.FromGRPC(err)
	}

	// Only cleanup the IO after a successful Delete.
	if t.io != nil {
		t.io.Close()
	}

	exit := &ExitStatus{
		code:     resp.ExitStatus,
		exitedAt: protobuf.FromTimestamp(resp.ExitedAt),
	}
	return exit, nil
}