Skip to content

Commit

Permalink
support ssa patch
Browse files Browse the repository at this point in the history
Signed-off-by: nasusoba <[email protected]>

finished implementation

Signed-off-by: nasusoba <[email protected]>

add test

Signed-off-by: nasusoba <[email protected]>

fix ssaCache init

Signed-off-by: nasusoba <[email protected]>

increase control plane replicas for test

Signed-off-by: nasusoba <[email protected]>

fix typo

Signed-off-by: nasusoba <[email protected]>

fix typo
  • Loading branch information
nasusoba committed Aug 20, 2024
1 parent 030a7bc commit baeded4
Show file tree
Hide file tree
Showing 18 changed files with 1,491 additions and 46 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ linters-settings:
- github.com/go-logr/logr
- github.com/coredns/corefile-migration/migration
- github.com/pkg/errors
- github.com/davecgh/go-spew/spew

- k8s.io/api
- k8s.io/apimachinery/pkg
Expand Down
2 changes: 2 additions & 0 deletions controlplane/controllers/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ const (
etcdRemovalRequeueAfter = 30 * time.Second

k3sHookName = "k3s"

kcpManagerName = "capi-kthreescontrolplane"
)
97 changes: 97 additions & 0 deletions controlplane/controllers/kthreescontrolplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/util"
Expand All @@ -53,6 +54,8 @@ import (
"github.com/k3s-io/cluster-api-k3s/pkg/machinefilters"
"github.com/k3s-io/cluster-api-k3s/pkg/secret"
"github.com/k3s-io/cluster-api-k3s/pkg/token"
"github.com/k3s-io/cluster-api-k3s/pkg/util/contract"
"github.com/k3s-io/cluster-api-k3s/pkg/util/ssa"
)

// KThreesControlPlaneReconciler reconciles a KThreesControlPlane object.
Expand All @@ -68,6 +71,7 @@ type KThreesControlPlaneReconciler struct {

managementCluster k3s.ManagementCluster
managementClusterUncached k3s.ManagementCluster
ssaCache ssa.Cache
}

// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch
Expand Down Expand Up @@ -302,6 +306,7 @@ func (r *KThreesControlPlaneReconciler) SetupWithManager(ctx context.Context, mg
r.Scheme = mgr.GetScheme()
r.controller = c
r.recorder = mgr.GetEventRecorderFor("k3s-control-plane-controller")
r.ssaCache = ssa.NewCache()

if r.managementCluster == nil {
r.managementCluster = &k3s.Management{
Expand Down Expand Up @@ -516,6 +521,10 @@ func (r *KThreesControlPlaneReconciler) reconcile(ctx context.Context, cluster *
return reconcile.Result{}, err
}

if err := r.syncMachines(ctx, controlPlane); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to sync Machines")
}

// Aggregate the operational state of all the machines; while aggregating we are adding the
// source ref (reason@machine/name) so the problem can be easily tracked down to its source machine.
conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef(), conditions.WithStepCounterIf(false))
Expand Down Expand Up @@ -673,6 +682,94 @@ func (r *KThreesControlPlaneReconciler) reconcileKubeconfig(ctx context.Context,
return reconcile.Result{}, nil
}

// syncMachines updates Machines, InfrastructureMachines and KThreesConfigs to propagate in-place mutable fields from KCP.
// Note: It also cleans up managed fields of all Machines so that Machines that were
// created/patched before (<= v0.2.0) the controller adopted Server-Side-Apply (SSA) can also work with SSA.
// Note: For InfrastructureMachines and KThreesConfigs it also drops ownership of "metadata.labels" and
// "metadata.annotations" from "manager" so that "capi-kthreescontrolplane" can own these fields and can work with SSA.
// Otherwise, fields would be co-owned by our "old" "manager" and "capi-kthreescontrolplane" and then we would not be
// able to e.g. drop labels and annotations.
func (r *KThreesControlPlaneReconciler) syncMachines(ctx context.Context, controlPlane *k3s.ControlPlane) error {
patchHelpers := map[string]*patch.Helper{}
for machineName := range controlPlane.Machines {
m := controlPlane.Machines[machineName]
// If the machine is already being deleted, we don't need to update it.
if !m.DeletionTimestamp.IsZero() {
continue
}

// Cleanup managed fields of all Machines.
// We do this so that Machines that were created/patched before the controller adopted Server-Side-Apply (SSA)
// (<= v0.2.0) can also work with SSA. Otherwise, fields would be co-owned by our "old" "manager" and
// "capi-kthreescontrolplane" and then we would not be able to e.g. drop labels and annotations.
if err := ssa.CleanUpManagedFieldsForSSAAdoption(ctx, r.Client, m, kcpManagerName); err != nil {
return errors.Wrapf(err, "failed to update Machine: failed to adjust the managedFields of the Machine %s", klog.KObj(m))
}
// Update Machine to propagate in-place mutable fields from KCP.
updatedMachine, err := r.updateMachine(ctx, m, controlPlane.KCP, controlPlane.Cluster)
if err != nil {
return errors.Wrapf(err, "failed to update Machine: %s", klog.KObj(m))
}
controlPlane.Machines[machineName] = updatedMachine
// Since the machine is updated, re-create the patch helper so that any subsequent
// Patch calls use the correct base machine object to calculate the diffs.
// Example: reconcileControlPlaneConditions patches the machine objects in a subsequent call
// and, it should use the updated machine to calculate the diff.
// Note: If the patchHelpers are not re-computed based on the new updated machines, subsequent
// Patch calls will fail because the patch will be calculated based on an outdated machine and will error
// because of outdated resourceVersion.
// TODO: This should be cleaned-up to have a more streamline way of constructing and using patchHelpers.
patchHelper, err := patch.NewHelper(updatedMachine, r.Client)
if err != nil {
return err
}
patchHelpers[machineName] = patchHelper

labelsAndAnnotationsManagedFieldPaths := []contract.Path{
{"f:metadata", "f:annotations"},
{"f:metadata", "f:labels"},
}
infraMachine, infraMachineFound := controlPlane.InfraResources[machineName]
// Only update the InfraMachine if it is already found, otherwise just skip it.
// This could happen e.g. if the cache is not up-to-date yet.
if infraMachineFound {
// Cleanup managed fields of all InfrastructureMachines to drop ownership of labels and annotations
// from "manager". We do this so that InfrastructureMachines that are created using the Create method
// can also work with SSA. Otherwise, labels and annotations would be co-owned by our "old" "manager"
// and "capi-kthreescontrolplane" and then we would not be able to e.g. drop labels and annotations.
if err := ssa.DropManagedFields(ctx, r.Client, infraMachine, kcpManagerName, labelsAndAnnotationsManagedFieldPaths); err != nil {
return errors.Wrapf(err, "failed to clean up managedFields of InfrastructureMachine %s", klog.KObj(infraMachine))
}
// Update in-place mutating fields on InfrastructureMachine.
if err := r.updateExternalObject(ctx, infraMachine, controlPlane.KCP, controlPlane.Cluster); err != nil {
return errors.Wrapf(err, "failed to update InfrastructureMachine %s", klog.KObj(infraMachine))
}
}

kthreesConfigs, kthreesConfigsFound := controlPlane.KthreesConfigs[machineName]
// Only update the kthreesConfigs if it is already found, otherwise just skip it.
// This could happen e.g. if the cache is not up-to-date yet.
if kthreesConfigsFound {
// Note: Set the GroupVersionKind because updateExternalObject depends on it.
kthreesConfigs.SetGroupVersionKind(m.Spec.Bootstrap.ConfigRef.GroupVersionKind())
// Cleanup managed fields of all KThreesConfigs to drop ownership of labels and annotations
// from "manager". We do this so that KThreesConfigs that are created using the Create method
// can also work with SSA. Otherwise, labels and annotations would be co-owned by our "old" "manager"
// and "capi-kthreescontrolplane" and then we would not be able to e.g. drop labels and annotations.
if err := ssa.DropManagedFields(ctx, r.Client, kthreesConfigs, kcpManagerName, labelsAndAnnotationsManagedFieldPaths); err != nil {
return errors.Wrapf(err, "failed to clean up managedFields of kthreesConfigs %s", klog.KObj(kthreesConfigs))
}
// Update in-place mutating fields on BootstrapConfig.
if err := r.updateExternalObject(ctx, kthreesConfigs, controlPlane.KCP, controlPlane.Cluster); err != nil {
return errors.Wrapf(err, "failed to update KThreesConfigs %s", klog.KObj(kthreesConfigs))
}
}
}
// Update the patch helpers.
controlPlane.SetPatchHelpers(patchHelpers)
return nil
}

// reconcileControlPlaneConditions is responsible of reconciling conditions reporting the status of static pods and
// the status of the etcd cluster.
func (r *KThreesControlPlaneReconciler) reconcileControlPlaneConditions(ctx context.Context, controlPlane *k3s.ControlPlane) error {
Expand Down
157 changes: 124 additions & 33 deletions controlplane/controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/storage/names"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
Expand All @@ -36,10 +37,12 @@ import (
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

bootstrapv1 "github.com/k3s-io/cluster-api-k3s/bootstrap/api/v1beta2"
controlplanev1 "github.com/k3s-io/cluster-api-k3s/controlplane/api/v1beta2"
k3s "github.com/k3s-io/cluster-api-k3s/pkg/k3s"
"github.com/k3s-io/cluster-api-k3s/pkg/util/ssa"
)

var ErrPreConditionFailed = errors.New("precondition check failed")
Expand Down Expand Up @@ -253,6 +256,12 @@ func selectMachineForScaleDown(ctx context.Context, controlPlane *k3s.ControlPla
func (r *KThreesControlPlaneReconciler) cloneConfigsAndGenerateMachine(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KThreesControlPlane, bootstrapSpec *bootstrapv1.KThreesConfigSpec, failureDomain *string) error {
var errs []error

// Compute desired Machine
machine, err := r.computeDesiredMachine(kcp, cluster, failureDomain, nil)
if err != nil {
return errors.Wrap(err, "failed to create Machine: failed to compute desired Machine")
}

// Since the cloned resource should eventually have a controller ref for the Machine, we create an
// OwnerReference here without the Controller field set
infraCloneOwner := &metav1.OwnerReference{
Expand All @@ -275,6 +284,7 @@ func (r *KThreesControlPlaneReconciler) cloneConfigsAndGenerateMachine(ctx conte
// Safe to return early here since no resources have been created yet.
return fmt.Errorf("failed to clone infrastructure template: %w", err)
}
machine.Spec.InfrastructureRef = *infraRef

// Clone the bootstrap configuration
bootstrapRef, err := r.generateKThreesConfig(ctx, kcp, cluster, bootstrapSpec)
Expand All @@ -284,8 +294,9 @@ func (r *KThreesControlPlaneReconciler) cloneConfigsAndGenerateMachine(ctx conte

// Only proceed to generating the Machine if we haven't encountered an error
if len(errs) == 0 {
if err := r.generateMachine(ctx, kcp, cluster, infraRef, bootstrapRef, failureDomain); err != nil {
errs = append(errs, fmt.Errorf("failed to create Machine: %w", err))
machine.Spec.Bootstrap.ConfigRef = bootstrapRef
if err := r.createMachine(ctx, kcp, machine); err != nil {
errs = append(errs, errors.Wrap(err, "failed to create Machine"))
}
}

Expand Down Expand Up @@ -355,55 +366,135 @@ func (r *KThreesControlPlaneReconciler) generateKThreesConfig(ctx context.Contex
return bootstrapRef, nil
}

func (r *KThreesControlPlaneReconciler) generateMachine(ctx context.Context, kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster, infraRef, bootstrapRef *corev1.ObjectReference, failureDomain *string) error {
machine := &clusterv1.Machine{
// updateExternalObject updates the external object with the labels and annotations from KCP.
func (r *KThreesControlPlaneReconciler) updateExternalObject(ctx context.Context, obj client.Object, kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster) error {
updatedObject := &unstructured.Unstructured{}
updatedObject.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind())
updatedObject.SetNamespace(obj.GetNamespace())
updatedObject.SetName(obj.GetName())
// Set the UID to ensure that Server-Side-Apply only performs an update
// and does not perform an accidental create.
updatedObject.SetUID(obj.GetUID())

// Update labels
updatedObject.SetLabels(k3s.ControlPlaneLabelsForCluster(cluster.Name, kcp.Spec.MachineTemplate))
// Update annotations
updatedObject.SetAnnotations(kcp.Spec.MachineTemplate.ObjectMeta.Annotations)

if err := ssa.Patch(ctx, r.Client, kcpManagerName, updatedObject, ssa.WithCachingProxy{Cache: r.ssaCache, Original: obj}); err != nil {
return errors.Wrapf(err, "failed to update %s", obj.GetObjectKind().GroupVersionKind().Kind)
}
return nil
}

func (r *KThreesControlPlaneReconciler) createMachine(ctx context.Context, kcp *controlplanev1.KThreesControlPlane, machine *clusterv1.Machine) error {
if err := ssa.Patch(ctx, r.Client, kcpManagerName, machine); err != nil {
return errors.Wrap(err, "failed to create Machine")
}
// Remove the annotation tracking that a remediation is in progress (the remediation completed when
// the replacement machine has been created above).
delete(kcp.Annotations, controlplanev1.RemediationInProgressAnnotation)
return nil
}

func (r *KThreesControlPlaneReconciler) updateMachine(ctx context.Context, machine *clusterv1.Machine, kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster) (*clusterv1.Machine, error) {
updatedMachine, err := r.computeDesiredMachine(kcp, cluster, machine.Spec.FailureDomain, machine)
if err != nil {
return nil, errors.Wrap(err, "failed to update Machine: failed to compute desired Machine")
}

err = ssa.Patch(ctx, r.Client, kcpManagerName, updatedMachine, ssa.WithCachingProxy{Cache: r.ssaCache, Original: machine})
if err != nil {
return nil, errors.Wrap(err, "failed to update Machine")
}
return updatedMachine, nil
}

// computeDesiredMachine computes the desired Machine.
// This Machine will be used during reconciliation to:
// * create a new Machine
// * update an existing Machine
// Because we are using Server-Side-Apply we always have to calculate the full object.
// There are small differences in how we calculate the Machine depending on if it
// is a create or update. Example: for a new Machine we have to calculate a new name,
// while for an existing Machine we have to use the name of the existing Machine.
// Also, for an existing Machine, we will not copy its labels, as they are not managed by the KThreesControlPlane controller.
func (r *KThreesControlPlaneReconciler) computeDesiredMachine(kcp *controlplanev1.KThreesControlPlane, cluster *clusterv1.Cluster, failureDomain *string, existingMachine *clusterv1.Machine) (*clusterv1.Machine, error) {
var machineName string
var machineUID types.UID
var version *string
annotations := map[string]string{}
if existingMachine == nil {
// Creating a new machine
machineName = names.SimpleNameGenerator.GenerateName(kcp.Name + "-")
version = &kcp.Spec.Version

// Machine's bootstrap config may be missing ClusterConfiguration if it is not the first machine in the control plane.
// We store ClusterConfiguration as annotation here to detect any changes in KCP ClusterConfiguration and rollout the machine if any.
serverConfig, err := json.Marshal(kcp.Spec.KThreesConfigSpec.ServerConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal cluster configuration")
}
annotations[controlplanev1.KThreesServerConfigurationAnnotation] = string(serverConfig)

// In case this machine is being created as a consequence of a remediation, then add an annotation
// tracking remediating data.
// NOTE: This is required in order to track remediation retries.
if remediationData, ok := kcp.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok {
annotations[controlplanev1.RemediationForAnnotation] = remediationData
}
} else {
// Updating an existing machine
machineName = existingMachine.Name
machineUID = existingMachine.UID
version = existingMachine.Spec.Version

// For existing machine only set the ClusterConfiguration annotation if the machine already has it.
// We should not add the annotation if it was missing in the first place because we do not have enough
// information.
if serverConfig, ok := existingMachine.Annotations[controlplanev1.KThreesServerConfigurationAnnotation]; ok {
annotations[controlplanev1.KThreesServerConfigurationAnnotation] = serverConfig
}

// If the machine already has remediation data then preserve it.
// NOTE: This is required in order to track remediation retries.
if remediationData, ok := existingMachine.Annotations[controlplanev1.RemediationForAnnotation]; ok {
annotations[controlplanev1.RemediationForAnnotation] = remediationData
}
}

// Construct the basic Machine.
desiredMachine := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: names.SimpleNameGenerator.GenerateName(kcp.Name + "-"),
Name: machineName,
Namespace: kcp.Namespace,
UID: machineUID,
Labels: k3s.ControlPlaneLabelsForCluster(cluster.Name, kcp.Spec.MachineTemplate),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(kcp, controlplanev1.GroupVersion.WithKind("KThreesControlPlane")),
},
},
Spec: clusterv1.MachineSpec{
ClusterName: cluster.Name,
Version: &kcp.Spec.Version,
InfrastructureRef: *infraRef,
Bootstrap: clusterv1.Bootstrap{
ConfigRef: bootstrapRef,
},
ClusterName: cluster.Name,
Version: version,
FailureDomain: failureDomain,
NodeDrainTimeout: kcp.Spec.MachineTemplate.NodeDrainTimeout,
NodeVolumeDetachTimeout: kcp.Spec.MachineTemplate.NodeVolumeDetachTimeout,
NodeDeletionTimeout: kcp.Spec.MachineTemplate.NodeDeletionTimeout,
},
}

annotations := map[string]string{}

// Machine's bootstrap config may be missing ClusterConfiguration if it is not the first machine in the control plane.
// We store ClusterConfiguration as annotation here to detect any changes in KCP ClusterConfiguration and rollout the machine if any.
serverConfig, err := json.Marshal(kcp.Spec.KThreesConfigSpec.ServerConfig)
if err != nil {
return fmt.Errorf("failed to marshal cluster configuration: %w", err)
}
annotations[controlplanev1.KThreesServerConfigurationAnnotation] = string(serverConfig)

// In case this machine is being created as a consequence of a remediation, then add an annotation
// tracking remediating data.
// NOTE: This is required in order to track remediation retries.
if remediationData, ok := kcp.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok {
annotations[controlplanev1.RemediationForAnnotation] = remediationData
// Set annotations
for k, v := range kcp.Spec.MachineTemplate.ObjectMeta.Annotations {
annotations[k] = v
}

machine.SetAnnotations(annotations)
desiredMachine.SetAnnotations(annotations)

if err := r.Client.Create(ctx, machine); err != nil {
return fmt.Errorf("failed to create machine: %w", err)
if existingMachine != nil {
desiredMachine.Spec.InfrastructureRef = existingMachine.Spec.InfrastructureRef
desiredMachine.Spec.Bootstrap.ConfigRef = existingMachine.Spec.Bootstrap.ConfigRef
}

// Remove the annotation tracking that a remediation is in progress (the remediation completed when
// the replacement machine has been created above).
delete(kcp.Annotations, controlplanev1.RemediationInProgressAnnotation)
return nil
return desiredMachine, nil
}
Loading

0 comments on commit baeded4

Please sign in to comment.