-
Notifications
You must be signed in to change notification settings - Fork 135
Add: Checkpoint/restore for pods #1399
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -282,6 +282,8 @@ func (s *GenericPodService) run(ctx context.Context, authInfo *auth.AuthInfo, st | |
| fmt.Sprintf("STUB_ID=%s", stub.ExternalId), | ||
| fmt.Sprintf("STUB_TYPE=%s", stub.Type), | ||
| fmt.Sprintf("KEEP_WARM_SECONDS=%d", stubConfig.KeepWarmSeconds), | ||
| fmt.Sprintf("CHECKPOINT_ENABLED=%t", stubConfig.CheckpointEnabled), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. CHECKPOINT_ENABLED env var reflects stubConfig instead of the computed effective value, causing mismatches when checkpointing is disabled (e.g., multi-GPU). Prompt for AI agents |
||
| fmt.Sprintf("CHECKPOINT_CONDITION=%s", stubConfig.CheckpointCondition), | ||
| }...) | ||
|
|
||
| gpuRequest := types.GpuTypesToStrings(stubConfig.Runtime.Gpus) | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -4,6 +4,7 @@ import ( | |||||
| "context" | ||||||
| _ "embed" | ||||||
| "encoding/json" | ||||||
| "errors" | ||||||
| "fmt" | ||||||
| "io" | ||||||
| "log/slog" | ||||||
|
|
@@ -84,14 +85,14 @@ func InitializeCRIUManager(ctx context.Context, config types.CRIUConfig) (CRIUMa | |||||
| return criuManager, nil | ||||||
| } | ||||||
|
|
||||||
| func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.ContainerRequest, outputLogger *slog.Logger, outputWriter io.Writer, startedChan chan int, checkpointPIDChan chan int, configPath string) (int, string, error) { | ||||||
| func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types.ContainerRequest, outputLogger *slog.Logger, outputWriter io.Writer, startedChan chan int, checkpointPIDChan chan int, configPath string, exposeNetwork func() error) (int, string, error) { | ||||||
| state, createCheckpoint := s.shouldCreateCheckpoint(request) | ||||||
|
|
||||||
| // If checkpointing is enabled, attempt to create a checkpoint | ||||||
| if createCheckpoint { | ||||||
| outputLogger.Info("Attempting to create container checkpoint...") | ||||||
|
|
||||||
| exitCode, err := s.createCheckpoint(ctx, request, outputWriter, outputLogger, startedChan, checkpointPIDChan, configPath) | ||||||
| exitCode, err := s.createCheckpoint(ctx, request, outputWriter, outputLogger, startedChan, checkpointPIDChan, configPath, exposeNetwork) | ||||||
| if err != nil { | ||||||
| return -1, "", err | ||||||
| } | ||||||
|
|
@@ -113,6 +114,11 @@ func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types. | |||||
| } | ||||||
| defer f.Close() | ||||||
|
|
||||||
| err = exposeNetwork() | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Network is exposed before a successful restore, potentially allowing premature connections; expose after a successful restore instead. (Based on the PR description stating the worker only exposes ports after checkpoint or restore.) Prompt for AI agents |
||||||
| if err != nil { | ||||||
| return -1, "", fmt.Errorf("failed to expose network: %v", err) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use %w to wrap underlying error for exposeNetwork failure to preserve error chain. Prompt for AI agents
Suggested change
|
||||||
| } | ||||||
|
|
||||||
| exitCode, err := s.criuManager.RestoreCheckpoint(ctx, &RestoreOpts{ | ||||||
| request: request, | ||||||
| state: state, | ||||||
|
|
@@ -123,31 +129,30 @@ func (s *Worker) attemptCheckpointOrRestore(ctx context.Context, request *types. | |||||
| configPath: configPath, | ||||||
| }) | ||||||
| if err != nil { | ||||||
| updateStateErr := s.updateCheckpointState(request, types.CheckpointStatusRestoreFailed) | ||||||
| if updateStateErr != nil { | ||||||
| log.Error().Str("container_id", request.ContainerId).Msgf("failed to update checkpoint state: %v", updateStateErr) | ||||||
| var e *runc.ExitError | ||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [TODO]: This logic is not complete. @luke-lombardi I want to chat with you about stop and error conditions related to a running RestoreCheckpoint process. A SIGKILL from container stop event from user would put the checkpoint into a |
||||||
| if errors.As(err, &e) { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Restore failures that are not runc.ExitError won't update checkpoint state or be logged, leaving stale/incorrect state. Prompt for AI agents |
||||||
| code := e.Status | ||||||
|
|
||||||
| if code != 137 { | ||||||
| log.Error().Str("container_id", request.ContainerId).Msgf("failed to restore checkpoint: %v", err) | ||||||
| updateStateErr := s.updateCheckpointState(request, types.CheckpointStatusRestoreFailed) | ||||||
| if updateStateErr != nil { | ||||||
| log.Error().Str("container_id", request.ContainerId).Msgf("failed to update checkpoint state: %v", updateStateErr) | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| return exitCode, "", err | ||||||
| } | ||||||
|
|
||||||
| outputLogger.Info("Checkpoint found and restored") | ||||||
| return exitCode, request.ContainerId, nil | ||||||
| } | ||||||
|
|
||||||
| // If a checkpoint exists but is not available (previously failed), run the container normally | ||||||
| bundlePath := filepath.Dir(configPath) | ||||||
|
|
||||||
| exitCode, err := s.runcHandle.Run(s.ctx, request.ContainerId, bundlePath, &runc.CreateOpts{ | ||||||
| OutputWriter: outputWriter, | ||||||
| Started: startedChan, | ||||||
| }) | ||||||
|
|
||||||
| return exitCode, request.ContainerId, err | ||||||
| return -1, "", fmt.Errorf("checkpoint not found") | ||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The parent function already handles this exact intent of running the code without checkpoints as a fallback. This is just duplicated.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Missing fallback to normal run when checkpoint is not found; this now returns an error instead of starting the container normally. (Based on the PR description that the worker falls back to a normal run if no checkpoint is found.) Prompt for AI agents |
||||||
| } | ||||||
|
|
||||||
| // Waits for the container to be ready to checkpoint at the desired point in execution, ie. | ||||||
| // after all processes within a container have reached a checkpointable state | ||||||
| func (s *Worker) createCheckpoint(ctx context.Context, request *types.ContainerRequest, outputWriter io.Writer, outputLogger *slog.Logger, startedChan chan int, checkpointPIDChan chan int, configPath string) (int, error) { | ||||||
| func (s *Worker) createCheckpoint(ctx context.Context, request *types.ContainerRequest, outputWriter io.Writer, outputLogger *slog.Logger, startedChan chan int, checkpointPIDChan chan int, configPath string, exposeNetwork func() error) (int, error) { | ||||||
| bundlePath := filepath.Dir(configPath) | ||||||
|
|
||||||
| go func() { | ||||||
|
|
@@ -214,6 +219,11 @@ func (s *Worker) createCheckpoint(ctx context.Context, request *types.ContainerR | |||||
| if updateStateErr != nil { | ||||||
| log.Error().Str("container_id", request.ContainerId).Msgf("failed to update checkpoint state: %v", updateStateErr) | ||||||
| } | ||||||
|
|
||||||
| err = exposeNetwork() | ||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When we are checkpointing, we do not want requests to taint the checkpoint. It should only checkpoint the ready state of the pod and not any running requests. In the case of an API server, the running request would be a connection. |
||||||
| if err != nil { | ||||||
| log.Error().Str("container_id", request.ContainerId).Msgf("failed to expose network: %v", err) | ||||||
| } | ||||||
| }() | ||||||
|
|
||||||
| exitCode, err := s.criuManager.Run(ctx, request, bundlePath, &runc.CreateOpts{ | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding CHECKPOINT_ENABLED to request.Env duplicates the worker-controlled flag and can override it due to merge order; it also ignores the computed effective value (GPU>1), risking inconsistent checkpoint behavior.
Prompt for AI agents