Skip to content

Commit ca2d4f8

Browse files
joe4devclaude
andcommitted
feat(init): add runtime exit error reporting via supervisor and events API
Ports the supervisor and events API from PR #41 to enable proper error reporting when a Lambda runtime process exits unexpectedly (e.g. sys.exit() or missing wrapper script), instead of LocalStack timing out with a generic error. - Add LocalStackSupervisor: wraps ProcessSupervisor, detects unexpected runtime-* process exits and emits SendFault(RuntimeExit) events - Add LocalStackEventsAPI: wraps StandaloneEventsAPI, overrides SendFault to forward errors to LocalStack via SendStatus(error, ...) - Wire both into SandboxBuilder via SetEventsAPI / SetSupervisor - Refactor NewCustomInteropServer to accept a pre-created *LocalStackAdapter shared with the events API - Improve SendInitErrorResponse: properly deserialises the payload, includes RequestId, and sends asynchronously (non-blocking) Enables test_lambda_runtime_exit and test_lambda_runtime_wrapper_not_found. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 136f735 commit ca2d4f8

4 files changed

Lines changed: 234 additions & 4 deletions

File tree

cmd/localstack/custom_interop.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,44 @@ func (c *CustomInteropServer) SendErrorResponse(invokeID string, resp *interop.E
204204
return c.delegate.SendErrorResponse(invokeID, resp)
205205
}
206206

207-
// SendInitErrorResponse writes error response during init to a shared memory and sends GIRD FAULT.
207+
// SendInitErrorResponse forwards the init error to LocalStack and then propagates it to the delegate.
208208
func (c *CustomInteropServer) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error {
209209
log.Traceln("SendInitErrorResponse called")
210-
if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil {
211-
log.Fatalln("Failed to send init error to LocalStack " + err.Error() + ". Exiting.")
210+
211+
// Deserialize the raw payload so we can include the requestId and structured fields.
212+
var parsed struct {
213+
ErrorMessage string `json:"errorMessage"`
214+
ErrorType string `json:"errorType"`
215+
StackTrace []string `json:"stackTrace,omitempty"`
216+
}
217+
if err := json.Unmarshal(resp.Payload, &parsed); err != nil {
218+
log.WithError(err).Warn("Failed to parse init error payload; forwarding raw payload")
219+
if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil {
220+
log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
221+
Error("Failed to send init error to LocalStack")
222+
}
223+
return c.delegate.SendInitErrorResponse(resp)
224+
}
225+
226+
adaptedResp := lsapi.ErrorResponse{
227+
ErrorMessage: parsed.ErrorMessage,
228+
ErrorType: parsed.ErrorType,
229+
RequestId: c.delegate.GetCurrentInvokeID(),
230+
StackTrace: parsed.StackTrace,
231+
}
232+
body, err := json.Marshal(adaptedResp)
233+
if err != nil {
234+
log.WithError(err).Error("Failed to marshal adapted init error response")
235+
body = resp.Payload
212236
}
237+
238+
go func() {
239+
if err := c.localStackAdapter.SendStatus(Error, body); err != nil {
240+
log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId).
241+
Error("Failed to send init error to LocalStack")
242+
}
243+
}()
244+
213245
return c.delegate.SendInitErrorResponse(resp)
214246
}
215247

cmd/localstack/events.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
9+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore/standalone/telemetry"
10+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lsapi"
11+
)
12+
13+
// LocalStackEventsAPI intercepts fault events and forwards them to LocalStack as error status callbacks.
14+
type LocalStackEventsAPI struct {
15+
*telemetry.StandaloneEventsAPI
16+
adapter *LocalStackAdapter
17+
requestID string
18+
mu sync.RWMutex
19+
}
20+
21+
func NewLocalStackEventsAPI(adapter *LocalStackAdapter) *LocalStackEventsAPI {
22+
return &LocalStackEventsAPI{
23+
adapter: adapter,
24+
StandaloneEventsAPI: new(telemetry.StandaloneEventsAPI),
25+
}
26+
}
27+
28+
func (ev *LocalStackEventsAPI) SendFault(data interop.FaultData) error {
29+
_ = ev.StandaloneEventsAPI.SendFault(data)
30+
31+
requestID := string(data.RequestID)
32+
if data.RequestID == "" {
33+
ev.mu.RLock()
34+
requestID = ev.requestID
35+
ev.mu.RUnlock()
36+
}
37+
38+
resp := lsapi.ErrorResponse{
39+
ErrorMessage: fmt.Sprintf("RequestId: %s Error: %s", requestID, data.ErrorMessage),
40+
ErrorType: string(data.ErrorType),
41+
}
42+
43+
payload, err := json.Marshal(resp)
44+
if err != nil {
45+
return err
46+
}
47+
48+
return ev.adapter.SendStatus(Error, payload)
49+
}
50+
51+
func (ev *LocalStackEventsAPI) SetCurrentRequestID(id interop.RequestID) {
52+
ev.mu.Lock()
53+
defer ev.mu.Unlock()
54+
ev.requestID = string(id)
55+
ev.StandaloneEventsAPI.SetCurrentRequestID(id)
56+
}

cmd/localstack/main.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,18 +179,36 @@ func main() {
179179
localStackLogsEgressApi := NewLocalStackLogsEgressAPI(logCollector)
180180
tracer := NewLocalStackTracer()
181181

182+
// Create LocalStack adapter upfront so it can be shared with the events API and interop server
183+
lsAdapter := &LocalStackAdapter{
184+
UpstreamEndpoint: lsOpts.RuntimeEndpoint,
185+
RuntimeId: lsOpts.RuntimeId,
186+
}
187+
188+
// Events API forwards runtime fault events (unexpected exits) to LocalStack as error callbacks
189+
lsEventsAPI := NewLocalStackEventsAPI(lsAdapter)
190+
191+
// Supervisor intercepts runtime process terminations and emits fault events via the events API
192+
supervisorCtx, cancelSupervisor := context.WithCancel(context.Background())
193+
194+
localStackSupv := NewLocalStackSupervisor(supervisorCtx, lsEventsAPI)
195+
182196
// build sandbox
183197
sandbox := rapidcore.
184198
NewSandboxBuilder().
185199
//SetTracer(tracer).
186200
AddShutdownFunc(func() {
187201
log.Debugln("Stopping file watcher")
188202
cancelFileWatcher()
203+
log.Debugln("Stopping supervisor")
204+
cancelSupervisor()
189205
}).
190206
SetExtensionsFlag(true).
191207
SetInitCachingFlag(true).
192208
SetLogsEgressAPI(localStackLogsEgressApi).
193-
SetTracer(tracer)
209+
SetTracer(tracer).
210+
SetEventsAPI(lsEventsAPI).
211+
SetSupervisor(localStackSupv)
194212

195213
// Corresponds to the 'AWS_LAMBDA_RUNTIME_API' environment variable.
196214
// We need to ensure the runtime server is up before the INIT phase,

cmd/localstack/supervisor.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"sync/atomic"
8+
9+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/fatalerror"
10+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
11+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/supervisor"
12+
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/supervisor/model"
13+
"github.com/google/uuid"
14+
log "github.com/sirupsen/logrus"
15+
)
16+
17+
// LocalStackSupervisor wraps a ProcessSupervisor and intercepts runtime process termination events.
18+
// When a runtime process exits unexpectedly it sends a fault event via the EventsAPI so LocalStack
19+
// receives a proper error instead of timing out.
20+
type LocalStackSupervisor struct {
21+
model.ProcessSupervisor
22+
eventsChan chan model.Event
23+
eventsAPI interop.EventsAPI
24+
25+
isShuttingDown *atomic.Bool
26+
}
27+
28+
func NewLocalStackSupervisor(ctx context.Context, evs interop.EventsAPI) *LocalStackSupervisor {
29+
var isShuttingDown atomic.Bool
30+
ls := &LocalStackSupervisor{
31+
ProcessSupervisor: supervisor.NewLocalSupervisor(),
32+
eventsAPI: evs,
33+
eventsChan: make(chan model.Event),
34+
isShuttingDown: &isShuttingDown,
35+
}
36+
37+
go ls.loop(ctx)
38+
39+
return ls
40+
}
41+
42+
func (ls *LocalStackSupervisor) loop(ctx context.Context) {
43+
inCh, err := ls.ProcessSupervisor.Events(ctx, nil)
44+
if err != nil {
45+
panic(err)
46+
}
47+
defer close(ls.eventsChan)
48+
for {
49+
select {
50+
case event, ok := <-inCh:
51+
if !ok {
52+
return
53+
}
54+
55+
select {
56+
case ls.eventsChan <- event:
57+
case <-ctx.Done():
58+
return
59+
}
60+
61+
if ls.isShuttingDown.Load() {
62+
continue
63+
}
64+
65+
termination := event.Event.ProcessTerminated()
66+
if termination == nil {
67+
continue
68+
}
69+
70+
if !strings.Contains(*termination.Name, "runtime-") {
71+
log.Debugf("Ignoring non-runtime process termination: %s", *termination.Name)
72+
continue
73+
}
74+
75+
if termination.Signaled() != nil {
76+
log.Debugf("Runtime process signalled: %d", *termination.Signo)
77+
}
78+
79+
faultData := interop.FaultData{
80+
RequestID: interop.RequestID(uuid.NewString()),
81+
ErrorMessage: fmt.Errorf("Runtime exited without providing a reason"),
82+
ErrorType: fatalerror.RuntimeExit,
83+
}
84+
if !termination.Success() {
85+
faultData.ErrorMessage = fmt.Errorf("Runtime exited with error: %s", termination.String())
86+
}
87+
88+
if err := ls.eventsAPI.SendFault(faultData); err != nil {
89+
log.WithError(err).Error("Failed to send runtime fault event")
90+
}
91+
case <-ctx.Done():
92+
return
93+
}
94+
}
95+
}
96+
97+
func (ls *LocalStackSupervisor) Exec(ctx context.Context, request *model.ExecRequest) error {
98+
if request.Domain == "runtime" {
99+
ls.isShuttingDown.Store(false)
100+
}
101+
return ls.ProcessSupervisor.Exec(ctx, request)
102+
}
103+
104+
func (ls *LocalStackSupervisor) Terminate(ctx context.Context, request *model.TerminateRequest) error {
105+
defer func() {
106+
if request.Domain == "runtime" && strings.HasPrefix(request.Name, "runtime-") {
107+
ls.isShuttingDown.Store(true)
108+
}
109+
}()
110+
return ls.ProcessSupervisor.Terminate(ctx, request)
111+
}
112+
113+
func (ls *LocalStackSupervisor) Kill(ctx context.Context, request *model.KillRequest) error {
114+
defer func() {
115+
if request.Domain == "runtime" && strings.HasPrefix(request.Name, "runtime-") {
116+
ls.isShuttingDown.Store(true)
117+
}
118+
}()
119+
return ls.ProcessSupervisor.Kill(ctx, request)
120+
}
121+
122+
func (ls *LocalStackSupervisor) Events(ctx context.Context, _ *model.EventsRequest) (<-chan model.Event, error) {
123+
return ls.eventsChan, nil
124+
}

0 commit comments

Comments
 (0)