Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 controller/machine: use unstructured caching client #8896

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions controllers/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,22 @@ func (r *ClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag

// MachineReconciler reconciles a Machine object.
type MachineReconciler struct {
Client client.Client
APIReader client.Reader
Tracker *remote.ClusterCacheTracker
Client client.Client
UnstructuredCachingClient client.Client
APIReader client.Reader
Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
}

func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
return (&machinecontroller.Reconciler{
Client: r.Client,
APIReader: r.APIReader,
Tracker: r.Tracker,
WatchFilterValue: r.WatchFilterValue,
Client: r.Client,
UnstructuredCachingClient: r.UnstructuredCachingClient,
APIReader: r.APIReader,
Tracker: r.Tracker,
WatchFilterValue: r.WatchFilterValue,
}).SetupWithManager(ctx, mgr, options)
}

Expand Down
7 changes: 4 additions & 3 deletions internal/controllers/cluster/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ func TestMain(m *testing.M) {
panic(fmt.Sprintf("Failed to start ClusterReconciler: %v", err))
}
if err := (&machinecontroller.Reconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
Client: mgr.GetClient(),
UnstructuredCachingClient: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
}).SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil {
panic(fmt.Sprintf("Failed to start MachineReconciler: %v", err))
}
Expand Down
9 changes: 5 additions & 4 deletions internal/controllers/machine/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ var (

// Reconciler reconciles a Machine object.
type Reconciler struct {
Client client.Client
APIReader client.Reader
Tracker *remote.ClusterCacheTracker
Client client.Client
UnstructuredCachingClient client.Client
APIReader client.Reader
Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
Expand Down Expand Up @@ -755,7 +756,7 @@ func (r *Reconciler) reconcileDeleteExternal(ctx context.Context, m *clusterv1.M
}

// get the external object
obj, err := external.Get(ctx, r.Client, ref, m.Namespace)
obj, err := external.Get(ctx, r.UnstructuredCachingClient, ref, m.Namespace)
if err != nil && !apierrors.IsNotFound(errors.Cause(err)) {
return nil, errors.Wrapf(err, "failed to get %s %q for Machine %q in namespace %q",
ref.GroupVersionKind(), ref.Name, m.Name, m.Namespace)
Expand Down
10 changes: 7 additions & 3 deletions internal/controllers/machine/machine_controller_noderef_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ func TestGetNode(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())

r := &Reconciler{
Tracker: tracker,
Client: env,
Tracker: tracker,
Client: env,
UnstructuredCachingClient: env,
}

w, err := ctrl.NewControllerManagedBy(env.Manager).For(&corev1.Node{}).Build(r)
Expand Down Expand Up @@ -740,7 +741,10 @@ func TestPatchNode(t *testing.T) {
},
}

r := Reconciler{Client: env}
r := Reconciler{
Client: env,
UnstructuredCachingClient: env,
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
g := NewWithT(t)
Expand Down
2 changes: 1 addition & 1 deletion internal/controllers/machine/machine_controller_phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (r *Reconciler) reconcileExternal(ctx context.Context, cluster *clusterv1.C
return external.ReconcileOutput{}, err
}

obj, err := external.Get(ctx, r.Client, ref, m.Namespace)
obj, err := external.Get(ctx, r.UnstructuredCachingClient, ref, m.Namespace)
if err != nil {
if apierrors.IsNotFound(errors.Cause(err)) {
log.Info("could not find external ref, requeuing", ref.Kind, klog.KRef(ref.Namespace, ref.Name))
Expand Down
28 changes: 16 additions & 12 deletions internal/controllers/machine/machine_controller_phases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,13 +900,15 @@ func TestReconcileBootstrap(t *testing.T) {
}

bootstrapConfig := &unstructured.Unstructured{Object: tc.bootstrapConfig}
c := fake.NewClientBuilder().
WithObjects(tc.machine,
builder.GenericBootstrapConfigCRD.DeepCopy(),
builder.GenericInfrastructureMachineCRD.DeepCopy(),
bootstrapConfig,
).Build()
r := &Reconciler{
Client: fake.NewClientBuilder().
WithObjects(tc.machine,
builder.GenericBootstrapConfigCRD.DeepCopy(),
builder.GenericInfrastructureMachineCRD.DeepCopy(),
bootstrapConfig,
).Build(),
Client: c,
UnstructuredCachingClient: c,
}

s := &scope{cluster: defaultCluster, machine: tc.machine}
Expand Down Expand Up @@ -1111,13 +1113,15 @@ func TestReconcileInfrastructure(t *testing.T) {
}

infraConfig := &unstructured.Unstructured{Object: tc.infraConfig}
c := fake.NewClientBuilder().
WithObjects(tc.machine,
builder.GenericBootstrapConfigCRD.DeepCopy(),
builder.GenericInfrastructureMachineCRD.DeepCopy(),
infraConfig,
).Build()
r := &Reconciler{
Client: fake.NewClientBuilder().
WithObjects(tc.machine,
builder.GenericBootstrapConfigCRD.DeepCopy(),
builder.GenericInfrastructureMachineCRD.DeepCopy(),
infraConfig,
).Build(),
Client: c,
UnstructuredCachingClient: c,
}
s := &scope{cluster: defaultCluster, machine: tc.machine}
result, err := r.reconcileInfrastructure(ctx, s)
Expand Down
79 changes: 48 additions & 31 deletions internal/controllers/machine/machine_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,14 @@ func TestMachineFinalizer(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
g := NewWithT(t)

c := fake.NewClientBuilder().WithObjects(
clusterCorrectMeta,
machineValidCluster,
machineWithFinalizer,
).Build()
mr := &Reconciler{
Client: fake.NewClientBuilder().WithObjects(
clusterCorrectMeta,
machineValidCluster,
machineWithFinalizer,
).Build(),
Client: c,
UnstructuredCachingClient: c,
}

_, _ = mr.Reconcile(ctx, tc.request)
Expand Down Expand Up @@ -556,8 +558,9 @@ func TestMachineOwnerReference(t *testing.T) {
machineValidControlled,
).WithStatusSubresource(&clusterv1.Machine{}).Build()
mr := &Reconciler{
Client: c,
APIReader: c,
Client: c,
UnstructuredCachingClient: c,
APIReader: c,
}

key := client.ObjectKey{Namespace: tc.m.Namespace, Name: tc.m.Name}
Expand Down Expand Up @@ -727,9 +730,10 @@ func TestReconcileRequest(t *testing.T) {
).WithStatusSubresource(&clusterv1.Machine{}).WithIndex(&corev1.Node{}, index.NodeProviderIDField, index.NodeByProviderID).Build()

r := &Reconciler{
Client: clientFake,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
ssaCache: ssa.NewCache(),
Client: clientFake,
UnstructuredCachingClient: clientFake,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
ssaCache: ssa.NewCache(),
}

result, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: util.ObjectKey(&tc.machine)})
Expand Down Expand Up @@ -975,9 +979,10 @@ func TestMachineConditions(t *testing.T) {
Build()

r := &Reconciler{
Client: clientFake,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
ssaCache: ssa.NewCache(),
Client: clientFake,
UnstructuredCachingClient: clientFake,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), clientFake, scheme.Scheme, client.ObjectKey{Name: testCluster.Name, Namespace: testCluster.Namespace}),
ssaCache: ssa.NewCache(),
}

_, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: util.ObjectKey(&machine)})
Expand Down Expand Up @@ -1064,8 +1069,10 @@ func TestReconcileDeleteExternal(t *testing.T) {
objs = append(objs, bootstrapConfig)
}

c := fake.NewClientBuilder().WithObjects(objs...).Build()
r := &Reconciler{
Client: fake.NewClientBuilder().WithObjects(objs...).Build(),
Client: c,
UnstructuredCachingClient: c,
}

obj, err := r.reconcileDeleteExternal(ctx, machine, machine.Spec.Bootstrap.ConfigRef)
Expand Down Expand Up @@ -1106,8 +1113,10 @@ func TestRemoveMachineFinalizerAfterDeleteReconcile(t *testing.T) {
},
}
key := client.ObjectKey{Namespace: m.Namespace, Name: m.Name}
c := fake.NewClientBuilder().WithObjects(testCluster, m).WithStatusSubresource(&clusterv1.Machine{}).Build()
mr := &Reconciler{
Client: fake.NewClientBuilder().WithObjects(testCluster, m).WithStatusSubresource(&clusterv1.Machine{}).Build(),
Client: c,
UnstructuredCachingClient: c,
}
_, err := mr.Reconcile(ctx, reconcile.Request{NamespacedName: key})
g.Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -1232,8 +1241,10 @@ func TestIsNodeDrainedAllowed(t *testing.T) {
var objs []client.Object
objs = append(objs, testCluster, tt.machine)

c := fake.NewClientBuilder().WithObjects(objs...).Build()
r := &Reconciler{
Client: fake.NewClientBuilder().WithObjects(objs...).Build(),
Client: c,
UnstructuredCachingClient: c,
}

got := r.isNodeDrainAllowed(tt.machine)
Expand Down Expand Up @@ -1357,8 +1368,10 @@ func TestIsNodeVolumeDetachingAllowed(t *testing.T) {
var objs []client.Object
objs = append(objs, testCluster, tt.machine)

c := fake.NewClientBuilder().WithObjects(objs...).Build()
r := &Reconciler{
Client: fake.NewClientBuilder().WithObjects(objs...).Build(),
Client: c,
UnstructuredCachingClient: c,
}

got := r.isNodeVolumeDetachingAllowed(tt.machine)
Expand Down Expand Up @@ -1709,16 +1722,18 @@ func TestIsDeleteNodeAllowed(t *testing.T) {
m2.Labels[clusterv1.MachineControlPlaneLabel] = ""
}

c := fake.NewClientBuilder().WithObjects(
tc.cluster,
tc.machine,
m1,
m2,
emp,
mcpBeingDeleted,
empBeingDeleted,
).Build()
mr := &Reconciler{
Client: fake.NewClientBuilder().WithObjects(
tc.cluster,
tc.machine,
m1,
m2,
emp,
mcpBeingDeleted,
empBeingDeleted,
).Build(),
Client: c,
UnstructuredCachingClient: c,
}

err := mr.isDeleteNodeAllowed(ctx, tc.cluster, tc.machine)
Expand Down Expand Up @@ -1981,7 +1996,8 @@ func TestNodeToMachine(t *testing.T) {
}

r := &Reconciler{
Client: env,
Client: env,
UnstructuredCachingClient: env,
}
for _, node := range fakeNodes {
request := r.nodeToMachine(ctx, node)
Expand Down Expand Up @@ -2159,10 +2175,11 @@ func TestNodeDeletion(t *testing.T) {
tracker := remote.NewTestClusterCacheTracker(ctrl.Log, fakeClient, fakeScheme, client.ObjectKeyFromObject(&testCluster))

r := &Reconciler{
Client: fakeClient,
Tracker: tracker,
recorder: record.NewFakeRecorder(10),
nodeDeletionRetryTimeout: 10 * time.Millisecond,
Client: fakeClient,
UnstructuredCachingClient: fakeClient,
Tracker: tracker,
recorder: record.NewFakeRecorder(10),
nodeDeletionRetryTimeout: 10 * time.Millisecond,
}

cluster := testCluster.DeepCopy()
Expand Down
7 changes: 4 additions & 3 deletions internal/controllers/machine/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ func TestMain(m *testing.M) {
panic(fmt.Sprintf("Failed to start ClusterCacheReconciler: %v", err))
}
if err := (&Reconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
Client: mgr.GetClient(),
UnstructuredCachingClient: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
}).SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil {
panic(fmt.Sprintf("Failed to start MachineReconciler: %v", err))
}
Expand Down
7 changes: 4 additions & 3 deletions internal/controllers/machinehealthcheck/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ func TestMain(m *testing.M) {
panic(fmt.Sprintf("Failed to start Reconciler : %v", err))
}
if err := (&machinecontroller.Reconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
Client: mgr.GetClient(),
UnstructuredCachingClient: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
}).SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil {
panic(fmt.Sprintf("Failed to start MachineReconciler: %v", err))
}
Expand Down
7 changes: 4 additions & 3 deletions internal/controllers/machineset/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ func TestMain(m *testing.M) {
panic(fmt.Sprintf("Failed to start MMachineSetReconciler: %v", err))
}
if err := (&machinecontroller.Reconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
Client: mgr.GetClient(),
UnstructuredCachingClient: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
}).SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil {
panic(fmt.Sprintf("Failed to start MachineReconciler: %v", err))
}
Expand Down
24 changes: 13 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,14 +360,15 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
})
}

unstructuredCachingClient, err := client.New(mgr.GetConfig(), client.Options{
HTTPClient: mgr.GetHTTPClient(),
Cache: &client.CacheOptions{
Reader: mgr.GetCache(),
Unstructured: true,
},
})

if feature.Gates.Enabled(feature.ClusterTopology) {
unstructuredCachingClient, err := client.New(mgr.GetConfig(), client.Options{
HTTPClient: mgr.GetHTTPClient(),
Cache: &client.CacheOptions{
Reader: mgr.GetCache(),
Unstructured: true,
},
})
if err != nil {
setupLog.Error(err, "unable to create unstructured caching client", "controller", "ClusterTopology")
os.Exit(1)
Expand Down Expand Up @@ -435,10 +436,11 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
os.Exit(1)
}
if err := (&controllers.MachineReconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
Client: mgr.GetClient(),
UnstructuredCachingClient: unstructuredCachingClient,
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
}).SetupWithManager(ctx, mgr, concurrency(machineConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Machine")
os.Exit(1)
Expand Down