feat: Support SSA patch for metadata propagation #136

Merged · 1 commit · Aug 23, 2024
1 change: 1 addition & 0 deletions .golangci.yml
@@ -71,6 +71,7 @@ linters-settings:
- github.com/go-logr/logr
- github.com/coredns/corefile-migration/migration
- github.com/pkg/errors
- github.com/davecgh/go-spew/spew

- k8s.io/api
- k8s.io/apimachinery/pkg
2 changes: 2 additions & 0 deletions controlplane/controllers/const.go
@@ -36,4 +36,6 @@ const (
etcdRemovalRequeueAfter = 30 * time.Second

k3sHookName = "k3s"

kcpManagerName = "capi-kthreescontrolplane"
)
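
The new `kcpManagerName` constant names the field manager under which every Server-Side-Apply (SSA) request from this controller is issued. As a minimal sketch of what that means at the client level (illustrative only; this PR routes applies through its own `ssa.Patch` helper rather than calling the client directly), controller-runtime passes the manager name via `client.FieldOwner`:

```go
// Minimal sketch of an SSA patch under a dedicated field manager, using the
// controller-runtime client. Illustrative only: the repo wraps this call in
// its pkg/util/ssa helpers rather than invoking it directly.
package controllers

import (
	"context"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func applyWithManager(ctx context.Context, c client.Client, obj *unstructured.Unstructured) error {
	// client.Apply sends an application/apply-patch+yaml request. The object
	// must carry apiVersion/kind, which unstructured objects always do.
	// client.FieldOwner records kcpManagerName as the owner of every field
	// set on obj; ForceOwnership takes over fields on conflict.
	return c.Patch(ctx, obj, client.Apply,
		client.FieldOwner(kcpManagerName),
		client.ForceOwnership)
}
```
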
97 changes: 97 additions & 0 deletions controlplane/controllers/kthreescontrolplane_controller.go
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/util"
@@ -53,6 +54,8 @@ import (
"github.com/k3s-io/cluster-api-k3s/pkg/machinefilters"
"github.com/k3s-io/cluster-api-k3s/pkg/secret"
"github.com/k3s-io/cluster-api-k3s/pkg/token"
"github.com/k3s-io/cluster-api-k3s/pkg/util/contract"
"github.com/k3s-io/cluster-api-k3s/pkg/util/ssa"
)

// KThreesControlPlaneReconciler reconciles a KThreesControlPlane object.
@@ -68,6 +71,7 @@ type KThreesControlPlaneReconciler struct {

managementCluster k3s.ManagementCluster
managementClusterUncached k3s.ManagementCluster
ssaCache ssa.Cache
}

// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch
@@ -302,6 +306,7 @@ func (r *KThreesControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
r.Scheme = mgr.GetScheme()
r.controller = c
r.recorder = mgr.GetEventRecorderFor("k3s-control-plane-controller")
r.ssaCache = ssa.NewCache()

if r.managementCluster == nil {
r.managementCluster = &k3s.Management{
@@ -516,6 +521,10 @@ func (r *KThreesControlPlaneReconciler) reconcile(ctx context.Context, cluster *
return reconcile.Result{}, err
}

if err := r.syncMachines(ctx, controlPlane); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to sync Machines")
}

// Aggregate the operational state of all the machines; while aggregating we are adding the
// source ref (reason@machine/name) so the problem can be easily tracked down to its source machine.
conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef(), conditions.WithStepCounterIf(false))
@@ -673,6 +682,94 @@ func (r *KThreesControlPlaneReconciler) reconcileKubeconfig(ctx context.Context,
return reconcile.Result{}, nil
}

// syncMachines updates Machines, InfrastructureMachines and KThreesConfigs to propagate in-place mutable fields from KCP.
// Note: It also cleans up the managed fields of all Machines so that Machines that were
// created/patched before the controller adopted Server-Side-Apply (SSA) (releases <= v0.2.0) can also work with SSA.
// Note: For InfrastructureMachines and KThreesConfigs it also drops ownership of "metadata.labels" and
// "metadata.annotations" from "manager" so that "capi-kthreescontrolplane" can own these fields and can work with SSA.
// Otherwise, fields would be co-owned by our "old" "manager" and "capi-kthreescontrolplane" and then we would not be
// able to e.g. drop labels and annotations.
func (r *KThreesControlPlaneReconciler) syncMachines(ctx context.Context, controlPlane *k3s.ControlPlane) error {
patchHelpers := map[string]*patch.Helper{}
for machineName := range controlPlane.Machines {
m := controlPlane.Machines[machineName]
// If the machine is already being deleted, we don't need to update it.
if !m.DeletionTimestamp.IsZero() {
continue
}

// Cleanup managed fields of all Machines.
// We do this so that Machines that were created/patched before the controller adopted Server-Side-Apply (SSA)
// (<= v0.2.0) can also work with SSA. Otherwise, fields would be co-owned by our "old" "manager" and
// "capi-kthreescontrolplane" and then we would not be able to e.g. drop labels and annotations.
if err := ssa.CleanUpManagedFieldsForSSAAdoption(ctx, r.Client, m, kcpManagerName); err != nil {
return errors.Wrapf(err, "failed to update Machine: failed to adjust the managedFields of the Machine %s", klog.KObj(m))
}
// Update Machine to propagate in-place mutable fields from KCP.
updatedMachine, err := r.updateMachine(ctx, m, controlPlane.KCP, controlPlane.Cluster)
if err != nil {
return errors.Wrapf(err, "failed to update Machine: %s", klog.KObj(m))
}
controlPlane.Machines[machineName] = updatedMachine
// Since the machine is updated, re-create the patch helper so that any subsequent
// Patch calls use the correct base machine object to calculate the diffs.
// Example: reconcileControlPlaneConditions patches the machine objects in a subsequent call
// and it should use the updated machine to calculate the diff.
// Note: If the patchHelpers are not re-computed based on the new updated machines, subsequent
// Patch calls will fail because the patch will be calculated based on an outdated machine and will error
// because of outdated resourceVersion.
// TODO: This should be cleaned up to have a more streamlined way of constructing and using patchHelpers.
patchHelper, err := patch.NewHelper(updatedMachine, r.Client)
if err != nil {
return err
}
patchHelpers[machineName] = patchHelper

labelsAndAnnotationsManagedFieldPaths := []contract.Path{
{"f:metadata", "f:annotations"},
{"f:metadata", "f:labels"},
}
infraMachine, infraMachineFound := controlPlane.InfraResources[machineName]
// Only update the InfraMachine if it is already found, otherwise just skip it.
// This could happen e.g. if the cache is not up-to-date yet.
if infraMachineFound {
// Cleanup managed fields of all InfrastructureMachines to drop ownership of labels and annotations
// from "manager". We do this so that InfrastructureMachines that are created using the Create method
// can also work with SSA. Otherwise, labels and annotations would be co-owned by our "old" "manager"
// and "capi-kthreescontrolplane" and then we would not be able to e.g. drop labels and annotations.
if err := ssa.DropManagedFields(ctx, r.Client, infraMachine, kcpManagerName, labelsAndAnnotationsManagedFieldPaths); err != nil {
return errors.Wrapf(err, "failed to clean up managedFields of InfrastructureMachine %s", klog.KObj(infraMachine))
}
// Update in-place mutating fields on InfrastructureMachine.
if err := r.updateExternalObject(ctx, infraMachine, controlPlane.KCP, controlPlane.Cluster); err != nil {
return errors.Wrapf(err, "failed to update InfrastructureMachine %s", klog.KObj(infraMachine))
}
}

kthreesConfigs, kthreesConfigsFound := controlPlane.KthreesConfigs[machineName]
// Only update the kthreesConfigs if it is already found, otherwise just skip it.
// This could happen e.g. if the cache is not up-to-date yet.
if kthreesConfigsFound {
// Note: Set the GroupVersionKind because updateExternalObject depends on it.
kthreesConfigs.SetGroupVersionKind(m.Spec.Bootstrap.ConfigRef.GroupVersionKind())
// Cleanup managed fields of all KThreesConfigs to drop ownership of labels and annotations
// from "manager". We do this so that KThreesConfigs that are created using the Create method
// can also work with SSA. Otherwise, labels and annotations would be co-owned by our "old" "manager"
// and "capi-kthreescontrolplane" and then we would not be able to e.g. drop labels and annotations.
if err := ssa.DropManagedFields(ctx, r.Client, kthreesConfigs, kcpManagerName, labelsAndAnnotationsManagedFieldPaths); err != nil {
return errors.Wrapf(err, "failed to clean up managedFields of kthreesConfigs %s", klog.KObj(kthreesConfigs))
}
// Update in-place mutating fields on BootstrapConfig.
if err := r.updateExternalObject(ctx, kthreesConfigs, controlPlane.KCP, controlPlane.Cluster); err != nil {
return errors.Wrapf(err, "failed to update KThreesConfigs %s", klog.KObj(kthreesConfigs))
}
}
}
// Update the patch helpers.
controlPlane.SetPatchHelpers(patchHelpers)
return nil
}
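
The adoption helper used above is defined in `pkg/util/ssa`, which this diff does not show. As a hedged sketch of the underlying idea (modeled on the equivalent helper in upstream Cluster API; the repo's implementation may differ in detail), adopting an object for SSA means re-labeling the legacy `manager`/`Update` managedFields entry so the SSA field manager becomes the sole owner:

```go
// Sketch: hand fields owned by the legacy "manager" (operation Update) over
// to the SSA field manager so ownership is not split between the two.
// Assumption: modeled on upstream Cluster API's internal ssa helpers; the
// actual CleanUpManagedFieldsForSSAAdoption may differ.
package ssa

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func adoptManagedFields(obj client.Object, ssaManager string) {
	entries := obj.GetManagedFields()
	for i := range entries {
		if entries[i].Manager == "manager" &&
			entries[i].Operation == metav1.ManagedFieldsOperationUpdate {
			entries[i].Manager = ssaManager
			entries[i].Operation = metav1.ManagedFieldsOperationApply
		}
	}
	obj.SetManagedFields(entries)
	// The caller still has to persist the rewritten managedFields with a
	// patch; mutating the in-memory object alone changes nothing server-side.
}
```
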

// reconcileControlPlaneConditions is responsible for reconciling conditions reporting the status of static pods and
// the status of the etcd cluster.
func (r *KThreesControlPlaneReconciler) reconcileControlPlaneConditions(ctx context.Context, controlPlane *k3s.ControlPlane) error {
157 changes: 124 additions & 33 deletions controlplane/controllers/scale.go
@@ -27,6 +27,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/storage/names"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
@@ -36,10 +37,12 @@ import (
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

bootstrapv1 "github.com/k3s-io/cluster-api-k3s/bootstrap/api/v1beta2"
controlplanev1 "github.com/k3s-io/cluster-api-k3s/controlplane/api/v1beta2"
k3s "github.com/k3s-io/cluster-api-k3s/pkg/k3s"
"github.com/k3s-io/cluster-api-k3s/pkg/util/ssa"
)

var ErrPreConditionFailed = errors.New("precondition check failed")
@@ -253,6 +256,12 @@ func selectMachineForScaleDown(ctx context.Context, controlPlane *k3s.ControlPla
func (r *KThreesControlPlaneReconciler) cloneConfigsAndGenerateMachine(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KThreesControlPlane, bootstrapSpec *bootstrapv1.KThreesConfigSpec, failureDomain *string) error {
var errs []error

// Compute desired Machine
machine, err := r.computeDesiredMachine(kcp, cluster, failureDomain, nil)
if err != nil {
return errors.Wrap(err, "failed to create Machine: failed to compute desired Machine")
}

// Since the cloned resource should eventually have a controller ref for the Machine, we create an
// OwnerReference here without the Controller field set
infraCloneOwner := &metav1.OwnerReference{
@@ -275,6 +284,7 @@
// Safe to return early here since no resources have been created yet.
return fmt.Errorf("failed to clone infrastructure template: %w", err)
}
machine.Spec.InfrastructureRef = *infraRef

// Clone the bootstrap configuration
bootstrapRef, err := r.generateKThreesConfig(ctx, kcp, cluster, bootstrapSpec)
@@ -284,8 +294,9 @@

// Only proceed to generating the Machine if we haven't encountered an error
if len(errs) == 0 {
if err := r.generateMachine(ctx, kcp, cluster, infraRef, bootstrapRef, failureDomain); err != nil {
errs = append(errs, fmt.Errorf("failed to create Machine: %w", err))
machine.Spec.Bootstrap.ConfigRef = bootstrapRef
if err := r.createMachine(ctx, kcp, machine); err != nil {
errs = append(errs, errors.Wrap(err, "failed to create Machine"))
}
}

@@ -355,55 +366,135 @@ func (r *KThreesControlPlaneReconciler) generateKThreesConfig(ctx context.Contex
return bootstrapRef, nil
}

func (r *KThreesControlPlaneReconciler) generateMachine(ctx context.Context, kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster, infraRef, bootstrapRef *corev1.ObjectReference, failureDomain *string) error {
machine := &clusterv1.Machine{
// updateExternalObject updates the external object with the labels and annotations from KCP.
func (r *KThreesControlPlaneReconciler) updateExternalObject(ctx context.Context, obj client.Object, kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster) error {
updatedObject := &unstructured.Unstructured{}
updatedObject.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind())
updatedObject.SetNamespace(obj.GetNamespace())
updatedObject.SetName(obj.GetName())
// Set the UID to ensure that Server-Side-Apply only performs an update
// and does not perform an accidental create.
updatedObject.SetUID(obj.GetUID())

// Update labels
updatedObject.SetLabels(k3s.ControlPlaneLabelsForCluster(cluster.Name, kcp.Spec.MachineTemplate))
// Update annotations
updatedObject.SetAnnotations(kcp.Spec.MachineTemplate.ObjectMeta.Annotations)

if err := ssa.Patch(ctx, r.Client, kcpManagerName, updatedObject, ssa.WithCachingProxy{Cache: r.ssaCache, Original: obj}); err != nil {
return errors.Wrapf(err, "failed to update %s", obj.GetObjectKind().GroupVersionKind().Kind)
}
return nil
}
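
Two details of `updateExternalObject` are worth spelling out. First, an apply patch normally creates the object when it does not exist; pinning the UID read from the cached object turns a would-be create into a conflict error, which is exactly the "no accidental create" guarantee the comment describes. Second, the `ssa.WithCachingProxy` option short-circuits the server round-trip when an identical apply was recently issued for an unchanged object (an assumption about this repo's port of upstream Cluster API's ssa cache). A condensed sketch of the update-only pattern (imports as in the sketch after const.go; illustrative names):

```go
// Sketch: update-only SSA apply. If the target was deleted (or never
// existed), the UID no longer matches anything and the API server rejects
// the request instead of creating a fresh object. Condensed from
// updateExternalObject above; illustrative only.
func applyUpdateOnly(ctx context.Context, c client.Client, obj client.Object) error {
	u := &unstructured.Unstructured{}
	u.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind()) // apply needs apiVersion/kind
	u.SetNamespace(obj.GetNamespace())
	u.SetName(obj.GetName())
	u.SetUID(obj.GetUID()) // the guard: update-only semantics
	return c.Patch(ctx, u, client.Apply, client.FieldOwner(kcpManagerName))
}
```
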

func (r *KThreesControlPlaneReconciler) createMachine(ctx context.Context, kcp *controlplanev1.KThreesControlPlane, machine *clusterv1.Machine) error {
if err := ssa.Patch(ctx, r.Client, kcpManagerName, machine); err != nil {
return errors.Wrap(err, "failed to create Machine")
}
// Remove the annotation tracking that a remediation is in progress (the remediation completed when
// the replacement machine has been created above).
delete(kcp.Annotations, controlplanev1.RemediationInProgressAnnotation)
return nil
}

func (r *KThreesControlPlaneReconciler) updateMachine(ctx context.Context, machine *clusterv1.Machine, kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster) (*clusterv1.Machine, error) {
updatedMachine, err := r.computeDesiredMachine(kcp, cluster, machine.Spec.FailureDomain, machine)
if err != nil {
return nil, errors.Wrap(err, "failed to update Machine: failed to compute desired Machine")
}

err = ssa.Patch(ctx, r.Client, kcpManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: machine})
if err != nil {
return nil, errors.Wrap(err, "failed to update Machine")
}
return updatedMachine, nil
}

// computeDesiredMachine computes the desired Machine.
// This Machine will be used during reconciliation to:
// * create a new Machine
// * update an existing Machine
// Because we are using Server-Side-Apply we always have to calculate the full object.
// There are small differences in how we calculate the Machine depending on whether it
// is a create or update. Example: for a new Machine we have to calculate a new name,
// while for an existing Machine we have to use the name of the existing Machine.
// Also, for an existing Machine, we will not copy its labels, as they are not managed by the KThreesControlPlane controller.
func (r *KThreesControlPlaneReconciler) computeDesiredMachine(kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster, failureDomain *string, existingMachine *clusterv1.Machine) (*clusterv1.Machine, error) {
var machineName string
var machineUID types.UID
var version *string
annotations := map[string]string{}
if existingMachine == nil {
// Creating a new machine
machineName = names.SimpleNameGenerator.GenerateName(kcp.Name + "-")
version = &kcp.Spec.Version

// Machine's bootstrap config may be missing ClusterConfiguration if it is not the first machine in the control plane.
// We store ClusterConfiguration as annotation here to detect any changes in KCP ClusterConfiguration and rollout the machine if any.
serverConfig, err := json.Marshal(kcp.Spec.KThreesConfigSpec.ServerConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal cluster configuration")
}
annotations[controlplanev1.KThreesServerConfigurationAnnotation] = string(serverConfig)

// If this machine is being created as a consequence of a remediation, add an annotation
// tracking the remediation data.
// NOTE: This is required in order to track remediation retries.
if remediationData, ok := kcp.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok {
annotations[controlplanev1.RemediationForAnnotation] = remediationData
}
} else {
// Updating an existing machine
machineName = existingMachine.Name
machineUID = existingMachine.UID
version = existingMachine.Spec.Version

// For existing machine only set the ClusterConfiguration annotation if the machine already has it.
// We should not add the annotation if it was missing in the first place because we do not have enough
// information.
if serverConfig, ok := existingMachine.Annotations[controlplanev1.KThreesServerConfigurationAnnotation]; ok {
annotations[controlplanev1.KThreesServerConfigurationAnnotation] = serverConfig
}

// If the machine already has remediation data then preserve it.
// NOTE: This is required in order to track remediation retries.
if remediationData, ok := existingMachine.Annotations[controlplanev1.RemediationForAnnotation]; ok {
annotations[controlplanev1.RemediationForAnnotation] = remediationData
}
}

// Construct the basic Machine.
desiredMachine := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: names.SimpleNameGenerator.GenerateName(kcp.Name + "-"),
Name: machineName,
Namespace: kcp.Namespace,
UID: machineUID,
Labels: k3s.ControlPlaneLabelsForCluster(cluster.Name, kcp.Spec.MachineTemplate),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(kcp, controlplanev1.GroupVersion.WithKind("KThreesControlPlane")),
},
},
Spec: clusterv1.MachineSpec{
ClusterName: cluster.Name,
Version: &kcp.Spec.Version,
InfrastructureRef: *infraRef,
Bootstrap: clusterv1.Bootstrap{
ConfigRef: bootstrapRef,
},
ClusterName: cluster.Name,
Version: version,
FailureDomain: failureDomain,
NodeDrainTimeout: kcp.Spec.MachineTemplate.NodeDrainTimeout,
NodeVolumeDetachTimeout: kcp.Spec.MachineTemplate.NodeVolumeDetachTimeout,
NodeDeletionTimeout: kcp.Spec.MachineTemplate.NodeDeletionTimeout,
},
}

annotations := map[string]string{}

// Machine's bootstrap config may be missing ClusterConfiguration if it is not the first machine in the control plane.
// We store ClusterConfiguration as annotation here to detect any changes in KCP ClusterConfiguration and rollout the machine if any.
serverConfig, err := json.Marshal(kcp.Spec.KThreesConfigSpec.ServerConfig)
if err != nil {
return fmt.Errorf("failed to marshal cluster configuration: %w", err)
}
annotations[controlplanev1.KThreesServerConfigurationAnnotation] = string(serverConfig)

// In case this machine is being created as a consequence of a remediation, then add an annotation
// tracking remediating data.
// NOTE: This is required in order to track remediation retries.
if remediationData, ok := kcp.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok {
annotations[controlplanev1.RemediationForAnnotation] = remediationData
// Set annotations
for k, v := range kcp.Spec.MachineTemplate.ObjectMeta.Annotations {
annotations[k] = v
}

machine.SetAnnotations(annotations)
desiredMachine.SetAnnotations(annotations)

if err := r.Client.Create(ctx, machine); err != nil {
return fmt.Errorf("failed to create machine: %w", err)
if existingMachine != nil {
desiredMachine.Spec.InfrastructureRef = existingMachine.Spec.InfrastructureRef
desiredMachine.Spec.Bootstrap.ConfigRef = existingMachine.Spec.Bootstrap.ConfigRef
}

// Remove the annotation tracking that a remediation is in progress (the remediation completed when
// the replacement machine has been created above).
delete(kcp.Annotations, controlplanev1.RemediationInProgressAnnotation)
return nil
return desiredMachine, nil
}
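
Taken together, the refactor funnels both machine creation and machine update through `computeDesiredMachine`, which is what SSA requires: the full desired object must be computed every time, because a field omitted from the applied object is removed if this manager owns it. Condensed from the two call sites in this diff (excerpt, not new behavior):

```go
// Create path (cloneConfigsAndGenerateMachine): no existing Machine, so a
// fresh name is generated and version/config come from the KThreesControlPlane.
machine, err := r.computeDesiredMachine(kcp, cluster, failureDomain, nil)

// Update path (updateMachine): name, UID, version and the surviving
// annotations are carried over from the existing Machine, and the apply
// goes through r.ssaCache so unchanged Machines skip the API server.
updatedMachine, err := r.computeDesiredMachine(kcp, cluster, machine.Spec.FailureDomain, machine)
```
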