Skip to content

Commit 5c2b57b

Browse files
committed
resolve Christoph's comments
fix linter issue
1 parent f929711 commit 5c2b57b

7 files changed

Lines changed: 212 additions & 120 deletions

File tree

hack/tools.checksums

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ etcd|GOARCH=arm64;GOOS=linux|cc8c645e5a8df0f35f2a5c51d9b9383037eef0cf0167c52e648
66
gimps|GOARCH=amd64;GOOS=linux|b597efc7e2c72097a44c001b41a06ccca97610963e1f1aec74c3d99c0e0b6c11
77
gimps|GOARCH=arm64;GOOS=linux|2588daec997b4f4b3a8d8875f780fd6faf3c39c933519e7899e19a686476c8e4
88
golangci-lint|GOARCH=amd64;GOOS=linux|8a01a08dad47a14824d7d0f14af07c7144105fc079386c9c31fbe85f08f91643
9+
golangci-lint|GOARCH=arm64;GOOS=darwin|5fd0b6a09353eb0101d3ae81d5e3cf4707b77210c66fb92ae152d7280d959419
910
golangci-lint|GOARCH=arm64;GOOS=linux|2ed9cf2ad070dabc7947ba34cdc5142910be830306f063719898bc8fb44a7074
1011
kube-apiserver|GOARCH=amd64;GOOS=linux|ca822082ec39e54a25836a4011ddb66e482e317a7a4f1a1f73882bbd2cf5a2a1
1112
kube-apiserver|GOARCH=arm64;GOOS=linux|6ade6c2646e2c01fde1095407452afc2b65e89d6da16da29ee39f6223ccaf63b

internal/controller/sync/controller.go

Lines changed: 156 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636

3737
corev1 "k8s.io/api/core/v1"
3838
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
39+
"k8s.io/apimachinery/pkg/api/meta"
3940
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4041
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
4142
"k8s.io/apimachinery/pkg/labels"
@@ -164,11 +165,31 @@ func Create(
164165
return nil, fmt.Errorf("failed to setup local-side watch: %w", err)
165166
}
166167

167-
// Watch origin:kcp related resources so that changes to them trigger reconciliation
168-
// of the owning primary object. Only related resources with a Watch config are covered.
169-
watchedGVKs := sets.New[schema.GroupVersionKind]()
168+
if err := setupRelatedResourceWatches(c, localManager, remoteManager, pubRes, localDummy, remoteDummy, log); err != nil {
169+
return nil, err
170+
}
171+
172+
log.Info("Done setting up unmanaged controller.")
173+
174+
return c, nil
175+
}
176+
177+
// setupRelatedResourceWatches sets up watches for all related resources that have a Watch
178+
// config, on their respective origin side, so that changes trigger primary reconciliation.
179+
func setupRelatedResourceWatches(
180+
c mccontroller.Controller,
181+
localManager manager.Manager,
182+
remoteManager mcmanager.Manager,
183+
pubRes *syncagentv1alpha1.PublishedResource,
184+
localDummy, remoteDummy *unstructured.Unstructured,
185+
log *zap.SugaredLogger,
186+
) error {
187+
// Deduplication is per-origin to allow the same GVK on both sides.
188+
watchedKcpGVKs := sets.New[schema.GroupVersionKind]()
189+
watchedServiceGVKs := sets.New[schema.GroupVersionKind]()
190+
170191
for _, relRes := range pubRes.Spec.Related {
171-
if relRes.Origin != syncagentv1alpha1.RelatedResourceOriginKcp || relRes.Watch == nil {
192+
if relRes.Watch == nil {
172193
continue
173194
}
174195

@@ -178,62 +199,155 @@ func Create(
178199
Resource: relRes.Resource,
179200
}
180201

181-
// Use the local REST mapper to determine the Kind.
182-
gvk, err := localManager.GetRESTMapper().KindFor(gvr)
183-
if err != nil {
184-
log.Warnw("Failed to determine Kind for origin:kcp related resource, skipping watch", "gvr", gvr, "error", err)
185-
continue
202+
// Use the REST mapper of the origin side: related resources may have projected GVKs
203+
// that differ between kcp and the service cluster, so we must resolve using the
204+
// mapper that actually knows about the GVR on that side.
205+
var originRESTMapper meta.RESTMapper
206+
if relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp {
207+
originRESTMapper = remoteManager.GetLocalManager().GetRESTMapper()
208+
} else {
209+
originRESTMapper = localManager.GetRESTMapper()
186210
}
187211

188-
// Deduplicate: only set up one watch per GVK.
189-
if watchedGVKs.Has(gvk) {
190-
continue
212+
gvk, err := originRESTMapper.KindFor(gvr)
213+
if err != nil {
214+
return fmt.Errorf("failed to determine Kind for related resource %v (origin: %s): %w", gvr, relRes.Origin, err)
191215
}
192-
watchedGVKs.Insert(gvk)
193216

194217
relatedDummy := &unstructured.Unstructured{}
195218
relatedDummy.SetGroupVersionKind(gvk)
196219

197-
var enqueueForRelated mchandler.TypedEventHandlerFunc[*unstructured.Unstructured, mcreconcile.Request]
220+
if relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp {
221+
if watchedKcpGVKs.Has(gvk) {
222+
continue
223+
}
224+
watchedKcpGVKs.Insert(gvk)
198225

199-
switch {
200-
case relRes.Watch.ByOwner != nil:
201-
ownerKind := relRes.Watch.ByOwner.Kind
202-
enqueueForRelated = func(clusterName string, _ cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
203-
return &byOwnerEventHandler{
204-
clusterName: clusterName,
205-
ownerKind: ownerKind,
206-
}
226+
enqueueForRelated, err := buildKcpRelatedHandler(relRes.Watch, gvk, remoteDummy, log)
227+
if err != nil {
228+
return err
207229
}
208230

209-
case relRes.Watch.ByLabel != nil:
210-
labelTemplates := relRes.Watch.ByLabel
211-
primaryDummy := remoteDummy.DeepCopy()
212-
enqueueForRelated = func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
213-
return &byLabelEventHandler{
214-
clusterName: clusterName,
215-
client: cl.GetClient(),
216-
primaryDummy: primaryDummy,
217-
labelTemplates: labelTemplates,
218-
log: log,
219-
}
231+
if err := c.MultiClusterWatch(mcsource.TypedKind(relatedDummy, enqueueForRelated)); err != nil {
232+
return fmt.Errorf("failed to setup watch for kcp-origin related resource %v: %w", gvk, err)
233+
}
234+
} else {
235+
if watchedServiceGVKs.Has(gvk) {
236+
continue
220237
}
238+
watchedServiceGVKs.Insert(gvk)
221239

222-
default:
223-
log.Warnw("origin:kcp related resource has Watch set but neither byOwner nor byLabel configured, skipping", "gvk", gvk)
224-
continue
225-
}
240+
enqueueForRelated, err := buildServiceRelatedHandler(relRes.Watch, gvk, localDummy, localManager, log)
241+
if err != nil {
242+
return err
243+
}
226244

227-
if err := c.MultiClusterWatch(mcsource.TypedKind(relatedDummy, enqueueForRelated)); err != nil {
228-
return nil, fmt.Errorf("failed to setup watch for origin:kcp related resource %v: %w", gvk, err)
245+
if err := c.Watch(source.TypedKind(localManager.GetCache(), relatedDummy, enqueueForRelated)); err != nil {
246+
return fmt.Errorf("failed to setup watch for service-origin related resource %v: %w", gvk, err)
247+
}
229248
}
230249

231-
log.Infow("Set up watch for origin:kcp related resource", "gvk", gvk)
250+
log.Infow("Set up watch for related resource", "gvk", gvk, "origin", relRes.Origin)
232251
}
233252

234-
log.Info("Done setting up unmanaged controller.")
253+
return nil
254+
}
235255

236-
return c, nil
256+
// buildKcpRelatedHandler constructs the per-cluster event handler for a kcp-origin related resource.
257+
func buildKcpRelatedHandler(
258+
watch *syncagentv1alpha1.RelatedResourceWatch,
259+
gvk schema.GroupVersionKind,
260+
remoteDummy *unstructured.Unstructured,
261+
log *zap.SugaredLogger,
262+
) (mchandler.TypedEventHandlerFunc[*unstructured.Unstructured, mcreconcile.Request], error) {
263+
switch {
264+
case watch.ByOwner != nil:
265+
ownerGVK := remoteDummy.GroupVersionKind()
266+
return func(clusterName string, _ cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
267+
return &byOwnerEventHandler{
268+
clusterName: clusterName,
269+
ownerGVK: ownerGVK,
270+
}
271+
}, nil
272+
273+
case watch.BySelector != nil:
274+
labelSelector := watch.BySelector
275+
primaryDummy := remoteDummy.DeepCopy()
276+
return func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
277+
return &bySelectorEventHandler{
278+
clusterName: clusterName,
279+
client: cl.GetClient(),
280+
primaryDummy: primaryDummy,
281+
labelSelector: labelSelector,
282+
log: log,
283+
}
284+
}, nil
285+
286+
default:
287+
return nil, fmt.Errorf("related resource %v (origin: kcp) has Watch set but neither byOwner nor bySelector configured", gvk)
288+
}
289+
}
290+
291+
// buildServiceRelatedHandler constructs the event handler for a service-cluster-origin related resource.
292+
// It maps the changed related resource back to the remote (kcp) primary via sync metadata on the local primary.
293+
func buildServiceRelatedHandler(
294+
watch *syncagentv1alpha1.RelatedResourceWatch,
295+
gvk schema.GroupVersionKind,
296+
localDummy *unstructured.Unstructured,
297+
localManager manager.Manager,
298+
log *zap.SugaredLogger,
299+
) (handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request], error) {
300+
localClient := localManager.GetClient()
301+
302+
switch {
303+
case watch.ByOwner != nil:
304+
ownerGVK := localDummy.GroupVersionKind()
305+
primaryDummy := localDummy.DeepCopy()
306+
return handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, obj *unstructured.Unstructured) []mcreconcile.Request {
307+
for _, ref := range obj.GetOwnerReferences() {
308+
refGV, err := schema.ParseGroupVersion(ref.APIVersion)
309+
if err != nil || refGV.Group != ownerGVK.Group || refGV.Version != ownerGVK.Version || ref.Kind != ownerGVK.Kind {
310+
continue
311+
}
312+
localPrimary := primaryDummy.DeepCopy()
313+
if err := localClient.Get(ctx, types.NamespacedName{Namespace: obj.GetNamespace(), Name: ref.Name}, localPrimary); err != nil {
314+
log.Warnw("Failed to fetch local primary for byOwner watch", "owner", ref.Name, "error", err)
315+
return nil
316+
}
317+
if req := sync.RemoteNameForLocalObject(localPrimary); req != nil {
318+
return []mcreconcile.Request{*req}
319+
}
320+
return nil
321+
}
322+
return nil
323+
}), nil
324+
325+
case watch.BySelector != nil:
326+
selector, err := metav1.LabelSelectorAsSelector(watch.BySelector)
327+
if err != nil {
328+
return nil, fmt.Errorf("failed to convert bySelector for service-origin related resource %v: %w", gvk, err)
329+
}
330+
primaryDummy := localDummy.DeepCopy()
331+
return handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, _ *unstructured.Unstructured) []mcreconcile.Request {
332+
primaryList := &unstructured.UnstructuredList{}
333+
primaryList.SetAPIVersion(primaryDummy.GetAPIVersion())
334+
primaryList.SetKind(primaryDummy.GetKind() + "List")
335+
if err := localClient.List(ctx, primaryList, &ctrlruntimeclient.ListOptions{LabelSelector: selector}); err != nil {
336+
log.Warnw("Failed to list local primary objects for bySelector watch", "selector", selector.String(), "error", err)
337+
return nil
338+
}
339+
var reqs []mcreconcile.Request
340+
for i := range primaryList.Items {
341+
if req := sync.RemoteNameForLocalObject(&primaryList.Items[i]); req != nil {
342+
reqs = append(reqs, *req)
343+
}
344+
}
345+
return reqs
346+
}), nil
347+
348+
default:
349+
return nil, fmt.Errorf("related resource %v (origin: service) has Watch set but neither byOwner nor bySelector configured", gvk)
350+
}
237351
}
238352

239353
func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request) (reconcile.Result, error) {

internal/controller/sync/related_handlers.go

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ package sync
1818

1919
import (
2020
"context"
21+
"fmt"
2122

2223
"go.uber.org/zap"
2324

24-
"github.com/kcp-dev/api-syncagent/internal/sync/templating"
25-
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27+
"k8s.io/apimachinery/pkg/runtime/schema"
2728
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/client-go/util/workqueue"
2930
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -33,10 +34,10 @@ import (
3334
)
3435

3536
// byOwnerEventHandler enqueues the primary object by inspecting the OwnerReferences
36-
// of the changed related object and finding one with the configured Kind.
37+
// of the changed related object and finding one matching the configured GVK.
3738
type byOwnerEventHandler struct {
3839
clusterName string
39-
ownerKind string
40+
ownerGVK schema.GroupVersionKind
4041
}
4142

4243
func (h *byOwnerEventHandler) Create(_ context.Context, evt event.TypedCreateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
@@ -57,7 +58,11 @@ func (h *byOwnerEventHandler) Generic(_ context.Context, evt event.TypedGenericE
5758

5859
func (h *byOwnerEventHandler) enqueue(obj *unstructured.Unstructured, q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
5960
for _, ref := range obj.GetOwnerReferences() {
60-
if ref.Kind == h.ownerKind {
61+
refGV, err := schema.ParseGroupVersion(ref.APIVersion)
62+
if err != nil {
63+
continue
64+
}
65+
if refGV.Group == h.ownerGVK.Group && refGV.Version == h.ownerGVK.Version && ref.Kind == h.ownerGVK.Kind {
6166
q.Add(mcreconcile.Request{
6267
ClusterName: h.clusterName,
6368
Request: reconcile.Request{
@@ -72,60 +77,46 @@ func (h *byOwnerEventHandler) enqueue(obj *unstructured.Unstructured, q workqueu
7277
}
7378
}
7479

75-
// byLabelEventHandler enqueues primary objects by evaluating label templates against
76-
// the changed related object and listing primaries matching the resulting label selector.
77-
type byLabelEventHandler struct {
78-
clusterName string
79-
client ctrlruntimeclient.Client
80-
primaryDummy *unstructured.Unstructured
81-
labelTemplates map[string]string
82-
log *zap.SugaredLogger
80+
// bySelectorEventHandler enqueues primary objects by listing primaries matching the configured
81+
// label selector whenever a related object changes.
82+
type bySelectorEventHandler struct {
83+
clusterName string
84+
client ctrlruntimeclient.Client
85+
primaryDummy *unstructured.Unstructured
86+
labelSelector *metav1.LabelSelector
87+
log *zap.SugaredLogger
8388
}
8489

85-
func (h *byLabelEventHandler) Create(ctx context.Context, evt event.TypedCreateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
90+
func (h *bySelectorEventHandler) Create(ctx context.Context, evt event.TypedCreateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
8691
h.enqueue(ctx, evt.Object, q)
8792
}
8893

89-
func (h *byLabelEventHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
94+
func (h *bySelectorEventHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
9095
h.enqueue(ctx, evt.ObjectNew, q)
9196
}
9297

93-
func (h *byLabelEventHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
98+
func (h *bySelectorEventHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
9499
h.enqueue(ctx, evt.Object, q)
95100
}
96101

97-
func (h *byLabelEventHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
102+
func (h *bySelectorEventHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
98103
h.enqueue(ctx, evt.Object, q)
99104
}
100105

101-
func (h *byLabelEventHandler) enqueue(ctx context.Context, obj *unstructured.Unstructured, q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
102-
// Build the template context using the changed related object.
103-
data := map[string]any{
104-
"watchObject": map[string]any{
105-
"name": obj.GetName(),
106-
"namespace": obj.GetNamespace(),
107-
"labels": obj.GetLabels(),
108-
},
109-
}
110-
111-
// Evaluate each label template to build the selector.
112-
matchingLabels := ctrlruntimeclient.MatchingLabels{}
113-
for key, tpl := range h.labelTemplates {
114-
value, err := templating.Render(tpl, data)
115-
if err != nil {
116-
h.log.Warnw("Failed to evaluate byLabel template", "key", key, "template", tpl, "error", err)
117-
return
118-
}
119-
matchingLabels[key] = value
106+
func (h *bySelectorEventHandler) enqueue(ctx context.Context, _ *unstructured.Unstructured, q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
107+
selector, err := metav1.LabelSelectorAsSelector(h.labelSelector)
108+
if err != nil {
109+
h.log.Warnw("Failed to convert bySelector selector", "error", err)
110+
return
120111
}
121112

122-
// List primary objects matching the derived label selector.
113+
// List primary objects matching the label selector.
123114
primaryList := &unstructured.UnstructuredList{}
124115
primaryList.SetAPIVersion(h.primaryDummy.GetAPIVersion())
125116
primaryList.SetKind(h.primaryDummy.GetKind() + "List")
126117

127-
if err := h.client.List(ctx, primaryList, matchingLabels); err != nil {
128-
h.log.Warnw("Failed to list primary objects for byLabel watch", "selector", matchingLabels, "error", err)
118+
if err := h.client.List(ctx, primaryList, &ctrlruntimeclient.ListOptions{LabelSelector: selector}); err != nil {
119+
h.log.Warnw("Failed to list primary objects for bySelector watch", "selector", fmt.Sprintf("%v", selector), "error", err)
129120
return
130121
}
131122

0 commit comments

Comments
 (0)