Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions configmap/informer/informed_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package informer
import (
"errors"
"fmt"
"log"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -171,6 +172,10 @@ func (i *InformedWatcher) Start(stopCh <-chan struct{}) error {
return errors.New("error waiting for ConfigMap informer to sync")
}

// Check which ConfigMaps exist and mark missing ones (without defaults) as done in the synced callback
// so we don't wait for them indefinitely
i.markMissingConfigMapsAsDone(s)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behaviour should probably be opt-in for now using an option. Someone might be relying checkObservedResourcesExist to return the error and we don't want to break that.


if err := i.checkObservedResourcesExist(); err != nil {
return err
}
Expand Down Expand Up @@ -199,14 +204,39 @@ func (i *InformedWatcher) registerCallbackAndStartInformer(addConfigMapEvent fun
return nil
}

func (i *InformedWatcher) markMissingConfigMapsAsDone(s *syncedCallback) {
i.RLock()
defer i.RUnlock()
// Mark ConfigMaps that don't exist and don't have defaults as done
// so we don't wait for them indefinitely
i.ForEach(func(k string, _ []configmap.Observer) error {
if _, err := i.informer.Lister().ConfigMaps(i.Namespace).Get(k); err != nil {
if k8serrors.IsNotFound(err) {
if _, ok := i.defaults[k]; !ok {
// ConfigMap doesn't exist and is not defaulted. Mark it as done
// so we don't wait for it. The watcher will pick it up if it's created later.
s.MarkKeyAsDone(k)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should probably return an error if Get returns something besides IsNotFound no?

}
return nil
})
}

func (i *InformedWatcher) checkObservedResourcesExist() error {
i.RLock()
defer i.RUnlock()
// Check that all objects with Observers exist in our informers.
return i.ForEach(func(k string, _ []configmap.Observer) error {
if _, err := i.informer.Lister().ConfigMaps(i.Namespace).Get(k); err != nil {
if _, ok := i.defaults[k]; ok && k8serrors.IsNotFound(err) {
// It is defaulted, so it is OK that it doesn't exist.
if k8serrors.IsNotFound(err) {
if _, ok := i.defaults[k]; ok {
// It is defaulted, so it is OK that it doesn't exist.
return nil
}
// ConfigMap doesn't exist and is not defaulted. Log a warning but don't fail.
// The watcher will pick up the ConfigMap if it's created later.
log.Printf("WARNING: ConfigMap %q in namespace %q not found, using defaults and watching for creation", k, i.Namespace)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't use log package. we use a zap logger we'll probably want a new constructor that accepts a logger if we want these errors

return nil
}
return err
Expand Down
68 changes: 59 additions & 9 deletions configmap/informer/informed_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func TestFilterConfigByLabelExists(t *testing.T) {
}
}

func TestWatchMissingFailsOnStart(t *testing.T) {
func TestWatchMissingDoesNotFailOnStart(t *testing.T) {
const (
labelKey = "test/label"
labelVal = "test"
Expand All @@ -232,18 +232,15 @@ func TestWatchMissingFailsOnStart(t *testing.T) {
testCases := map[string]struct {
initialObj []runtime.Object
watchNames []string
expectErr string
labelReq string
}{
"ConfigMap does not exist": {
initialObj: nil,
watchNames: []string{"foo"},
expectErr: `configmap "foo" not found`,
},
"ConfigMap is missing required label": {
initialObj: []runtime.Object{cmWithLabel, cmWithoutLabel},
watchNames: []string{"with-label", "without-label"},
expectErr: `configmap "without-label" not found`,
labelReq: labelKey,
},
}
Expand All @@ -269,11 +266,9 @@ func TestWatchMissingFailsOnStart(t *testing.T) {
defer close(stopCh)
err := cmw.Start(stopCh)

switch {
case err == nil:
t.Fatal("Failed to start InformedWatcher =", err)
case err.Error() != tc.expectErr:
t.Fatal("Unexpected error =", err)
// Missing ConfigMaps should not cause Start to fail
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this - we shouldn't change the default behaviour

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a separate test for the new behaviour

t.Fatal("Start should succeed even with missing ConfigMaps, got error =", err)
}
})
}
Expand Down Expand Up @@ -624,3 +619,58 @@ func TestDeleteConfigMapEventWithInvalidObject(t *testing.T) {
t.Fatalf("foo1.count = %v, want %d (should not change)", got, want)
}
}

func TestMissingConfigMapDoesNotCrash(t *testing.T) {
// Test that watching a non-existent ConfigMap does not cause a crash
// and that the ConfigMap is observed when created later.
kc := fakekubeclientset.NewSimpleClientset()
cmw := NewInformedWatcher(kc, "default")

foo1 := &counter{
name: "foo1",
wg: &sync.WaitGroup{},
}
// Watch a ConfigMap that doesn't exist - this should not crash
cmw.Watch("missing-cm", foo1.callback)

stopCh := make(chan struct{})
defer close(stopCh)

// Start should succeed even though the ConfigMap doesn't exist
if err := cmw.Start(stopCh); err != nil {
t.Fatalf("cmw.Start() failed with missing ConfigMap: %v", err)
}

// Initially, the callback should not have been called since the ConfigMap doesn't exist
if got, want := foo1.count(), 0; got != want {
t.Fatalf("Initial foo1.count = %d, want %d", got, want)
}

// Now create the ConfigMap - it should be observed
foo1.wg.Add(1)
createdCM := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "missing-cm",
},
Data: map[string]string{
"key": "value",
},
}
if _, err := kc.CoreV1().ConfigMaps("default").Create(context.Background(), createdCM, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create ConfigMap: %v", err)
}

// Wait for the callback to be invoked
foo1.wg.Wait()

// The callback should have been called once when the ConfigMap was created
if got, want := foo1.count(), 1; got != want {
t.Fatalf("After creation foo1.count = %d, want %d", got, want)
}

// Verify the ConfigMap data
if got, want := foo1.cfg[0].Data["key"], "value"; got != want {
t.Errorf("ConfigMap data = %q, want %q", got, want)
}
}
6 changes: 6 additions & 0 deletions configmap/informer/synced_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func (s *syncedCallback) Call(obj interface{}, key string) {
s.namedWaitGroup.Done(key)
}

// MarkKeyAsDone marks a key as done without calling the callback. This is useful for
// keys that don't exist and don't have defaults, so we don't wait for them indefinitely.
func (s *syncedCallback) MarkKeyAsDone(key string) {
s.namedWaitGroup.Done(key)
}

// WaitForAllKeys will block until s.Call has been called for all the keys we are tracking or the stop signal is
// received.
func (s *syncedCallback) WaitForAllKeys(stopCh <-chan struct{}) error {
Expand Down
19 changes: 14 additions & 5 deletions injection/sharedmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,16 @@ func SetupConfigMapWatchOrDie(ctx context.Context, logger *zap.SugaredLogger) *c
// calling log.Fatalw. Note, if the config does not exist, it will be defaulted
// and this method will not die.
func WatchLoggingConfigOrDie(ctx context.Context, cmw *cminformer.InformedWatcher, logger *zap.SugaredLogger, atomicLevel zap.AtomicLevel, component string) {
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, logging.ConfigMapName(),
cmName := logging.ConfigMapName()
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, cmName,
metav1.GetOptions{}); err == nil {
cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
} else if !apierrors.IsNotFound(err) {
logger.Fatalw("Error reading ConfigMap "+logging.ConfigMapName(), zap.Error(err))
cmw.Watch(cmName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
} else if apierrors.IsNotFound(err) {
// ConfigMap doesn't exist, but we still register a watcher so updates are picked up if it's created later.
logger.Warnw("ConfigMap "+cmName+" not found, using defaults and watching for creation", zap.String("configmap", cmName))
cmw.Watch(cmName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
} else {
logger.Fatalw("Error reading ConfigMap "+cmName, zap.Error(err))
}
Comment on lines +490 to 499
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we can simply call Watch now? and not have these if clauses here

}

Expand All @@ -513,7 +518,11 @@ func WatchObservabilityConfigOrDie(

if _, err := client.Get(ctx, cmName, metav1.GetOptions{}); err == nil {
cmw.Watch(cmName, observers...)
} else if !apierrors.IsNotFound(err) {
} else if apierrors.IsNotFound(err) {
// ConfigMap doesn't exist, but we still register a watcher so updates are picked up if it's created later.
logger.Warnw("ConfigMap "+cmName+" not found, using defaults and watching for creation", zap.String("configmap", cmName))
cmw.Watch(cmName, observers...)
} else {
Comment on lines +521 to +525
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise we can simplify this and just call Watch and not worry about fetching the config maps?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also one thing we could do for the observability config map is maybe kill the main context thus triggering a shutdown of the component. Then Kubernetes would restart the pod.

logger.Fatalw("Error reading ConfigMap "+cmName, zap.Error(err))
}
}
Expand Down
159 changes: 159 additions & 0 deletions injection/sharedmain/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,29 @@ package sharedmain

import (
"context"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"

kubeclient "knative.dev/pkg/client/injection/kube/client"
fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake"
cminformer "knative.dev/pkg/configmap/informer"
"knative.dev/pkg/injection"
"knative.dev/pkg/leaderelection"
"knative.dev/pkg/logging"
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/observability"
"knative.dev/pkg/system"
_ "knative.dev/pkg/system/testing"
)

func TestEnabledControllers(t *testing.T) {
Expand Down Expand Up @@ -121,3 +135,148 @@ func TestWithObservabilityConfig(t *testing.T) {
t.Errorf("(-got, +want) = %s", diff)
}
}

func TestMissingConfigMapsUseDefaults(t *testing.T) {
// Create a fake client with no ConfigMaps
ctx := logtesting.TestContextWithLogger(t)
ctx, _ = fakekubeclient.With(ctx)

// Set up injection context
ctx, _ = injection.Fake.SetupInformers(ctx, &rest.Config{})

// Test that GetLoggingConfig returns defaults when ConfigMap is missing
loggingConfig, err := GetLoggingConfig(ctx)
if err != nil {
t.Fatalf("GetLoggingConfig() should not fail with missing ConfigMap, got error: %v", err)
}
// Verify it's the default (empty LoggingLevel map)
defaultLoggingConfig, _ := logging.NewConfigFromMap(nil)
if diff := cmp.Diff(loggingConfig, defaultLoggingConfig); diff != "" {
t.Errorf("GetLoggingConfig() with missing ConfigMap should return defaults, (-got, +want) = %s", diff)
}

// Test that GetObservabilityConfig returns defaults when ConfigMap is missing
observabilityConfig, err := GetObservabilityConfig(ctx)
if err != nil {
t.Fatalf("GetObservabilityConfig() should not fail with missing ConfigMap, got error: %v", err)
}
// Verify it's the default config
defaultObservabilityConfig := observability.DefaultConfig()
if diff := cmp.Diff(observabilityConfig, defaultObservabilityConfig); diff != "" {
t.Errorf("GetObservabilityConfig() with missing ConfigMap should return defaults, (-got, +want) = %s", diff)
}

// Test that GetLeaderElectionConfig returns defaults when ConfigMap is missing
leaderElectionConfig, err := GetLeaderElectionConfig(ctx)
if err != nil {
t.Fatalf("GetLeaderElectionConfig() should not fail with missing ConfigMap, got error: %v", err)
}
// Verify it's the default (created from nil ConfigMap)
defaultLeaderElectionConfig, _ := leaderelection.NewConfigFromConfigMap(nil)
if diff := cmp.Diff(leaderElectionConfig, defaultLeaderElectionConfig); diff != "" {
t.Errorf("GetLeaderElectionConfig() with missing ConfigMap should return defaults, (-got, +want) = %s", diff)
}

// Test that SetupConfigMapWatchOrDie doesn't panic
logger := logtesting.TestLogger(t)
cmw := SetupConfigMapWatchOrDie(ctx, logger)
if cmw == nil {
t.Fatal("SetupConfigMapWatchOrDie() should return a watcher")
}

// Test that WatchLoggingConfigOrDie doesn't panic with missing ConfigMap
atomicLevel := zap.NewAtomicLevel()
WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, "test-component")

// Test that WatchObservabilityConfigOrDie doesn't panic with missing ConfigMap
// We need a pprof server for this, but we can use nil or create a minimal one
// For now, let's skip the pprof part and just verify the watcher setup doesn't crash
// Actually, we need to import the pprof package, let's check if we can create a minimal test
}

func TestConfigMapWatcherObservesLaterCreation(t *testing.T) {
// Use direct fake client (like in configmap/informer tests) to ensure informer events work
kc := fakekubeclientset.NewSimpleClientset()
ctx := logtesting.TestContextWithLogger(t)
// Set up context with the fake client using the same key as the injection system
ctx = context.WithValue(ctx, kubeclient.Key{}, kc)

// Create watcher directly (similar to SetupConfigMapWatchOrDie but with direct client)
cmw := cminformer.NewInformedWatcher(kc, system.Namespace())

// Track if the update handler was invoked
var handlerInvoked sync.WaitGroup
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be simpler to just have a channel that returns ConfigMaps

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then you could drop handlerMutex as well

handlerInvoked.Add(1)

var receivedConfigMap *corev1.ConfigMap
var handlerMutex sync.Mutex
updateHandler := func(cm *corev1.ConfigMap) {
handlerMutex.Lock()
defer handlerMutex.Unlock()
receivedConfigMap = cm
handlerInvoked.Done()
}

// Watch a ConfigMap that doesn't exist yet
cmName := "test-configmap"
cmw.Watch(cmName, updateHandler)

// Start the watcher
stopCh := make(chan struct{})
defer close(stopCh)

if err := cmw.Start(stopCh); err != nil {
t.Fatalf("cmw.Start() should succeed even with missing ConfigMap, got error: %v", err)
}

// Give the watcher time to fully start and sync
time.Sleep(500 * time.Millisecond)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably use the config map watchers Wait method here


// Now create the ConfigMap
testCM := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cmName,
Namespace: system.Namespace(),
},
Data: map[string]string{
"key": "value",
},
}

createdCM, err := kc.CoreV1().ConfigMaps(system.Namespace()).Create(ctx, testCM, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create ConfigMap: %v", err)
}

// Wait for the handler to be invoked (with timeout)
done := make(chan struct{})
go func() {
handlerInvoked.Wait()
close(done)
}()

select {
case <-done:
// Handler was invoked, verify the ConfigMap
handlerMutex.Lock()
defer handlerMutex.Unlock()
if receivedConfigMap == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use cmp.Diff to make this simpler - see other examples in the codebase

t.Fatal("Update handler was invoked but receivedConfigMap is nil")
}
if receivedConfigMap.Name != cmName {
t.Errorf("Received ConfigMap name = %q, want %q", receivedConfigMap.Name, cmName)
}
if receivedConfigMap.Namespace != system.Namespace() {
t.Errorf("Received ConfigMap namespace = %q, want %q", receivedConfigMap.Namespace, system.Namespace())
}
if got, want := receivedConfigMap.Data["key"], "value"; got != want {
t.Errorf("Received ConfigMap data[key] = %q, want %q", got, want)
}
// Verify it's the same object (or at least equivalent)
if receivedConfigMap.UID != createdCM.UID {
t.Errorf("Received ConfigMap UID = %q, want %q", receivedConfigMap.UID, createdCM.UID)
}
case <-time.After(5 * time.Second):
t.Fatal("Update handler was not invoked within 5 seconds after ConfigMap creation")
}
}
Loading