Skip to content

Commit

Permalink
backup
Browse files Browse the repository at this point in the history
  • Loading branch information
Chiwency committed Aug 19, 2024
1 parent 8cd636c commit 8cf336b
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 50 deletions.
6 changes: 6 additions & 0 deletions apis/dataprotection/v1alpha1/actionset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ type RestoreActionSpec struct {
//
// +optional
PostReady []ActionSpec `json:"postReady,omitempty"`

// Determines if a base backup is required during restoration.
//
// +optional
// +kubebuilder:default=true
BaseBackupRequired *bool `json:"baseBackupRequired,omitempty"`
}

// ActionSpec defines an action that should be executed. Only one of the fields may be set.
Expand Down
4 changes: 2 additions & 2 deletions cmd/dataprotection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func main() {
userAgent string
)

flag.String(metricsAddrFlagKey.String(), ":8080", "The address the metric endpoint binds to.")
flag.String(metricsAddrFlagKey.String(), ":8083", "The address the metric endpoint binds to.")
flag.String(probeAddrFlagKey.String(), ":8081", "The address the probe endpoint binds to.")
flag.Bool(leaderElectFlagKey.String(), false,
"Enable leader election for controller manager. "+
Expand Down Expand Up @@ -184,7 +184,7 @@ func main() {
viper.OnConfigChange(func(e fsnotify.Event) {
setupLog.Info(fmt.Sprintf("config file changed: %s", e.Name))
})
viper.WatchConfig()
// viper.WatchConfig()

metricsAddr = viper.GetString(metricsAddrFlagKey.viperName())
probeAddr = viper.GetString(probeAddrFlagKey.viperName())
Expand Down
4 changes: 2 additions & 2 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (r flagName) viperName() string {

func setupFlags() {
flag.String(metricsAddrFlagKey.String(), ":8080", "The address the metric endpoint binds to.")
flag.String(probeAddrFlagKey.String(), ":8081", "The address the probe endpoint binds to.")
flag.String(probeAddrFlagKey.String(), ":8082", "The address the probe endpoint binds to.")
flag.Bool(leaderElectFlagKey.String(), false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
Expand Down Expand Up @@ -288,7 +288,7 @@ func main() {
viper.OnConfigChange(func(e fsnotify.Event) {
setupLog.Info(fmt.Sprintf("config file changed: %s", e.Name))
})
viper.WatchConfig()
// viper.WatchConfig()

setupLog.Info(fmt.Sprintf("config settings: %v", viper.AllSettings()))
if err := validateRequiredToParseConfigs(); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/dataprotection.kubeblocks.io_actionsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ spec:
restore:
description: Specifies the restore action.
properties:
baseBackupRequired:
default: true
description: Determines if a base backup is required during restoration.
type: boolean
postReady:
description: Specifies the actions that should be executed after
the data has been prepared and is ready for restoration.
Expand Down
55 changes: 25 additions & 30 deletions controllers/dataprotection/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/utils/pointer"
"reflect"
"strings"
"time"
Expand Down Expand Up @@ -650,13 +651,21 @@ func (r *BackupReconciler) checkIsCompletedDuringRunning(reqCtx intctrlutil.Requ
}
patch := client.MergeFrom(backup.DeepCopy())
backup.Status.Phase = dpv1alpha1.BackupPhaseCompleted

backup.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now().UTC()}
_ = dpbackup.SetExpirationByCreationTime(backup)
if !backup.Status.StartTimestamp.IsZero() {
// round the duration to a multiple of seconds.
duration := backup.Status.CompletionTimestamp.Sub(backup.Status.StartTimestamp.Time).Round(time.Second)
backup.Status.Duration = &metav1.Duration{Duration: duration}
}

for i, act := range backup.Status.Actions {
act.Phase = dpv1alpha1.ActionPhaseCompleted
act.AvailableReplicas = pointer.Int32(int32(0))
act.CompletionTimestamp = backup.Status.CompletionTimestamp
backup.Status.Actions[i] = act
}
return true, r.Client.Status().Patch(reqCtx.Ctx, backup, patch)
}

Expand Down Expand Up @@ -694,15 +703,6 @@ func (r *BackupReconciler) updateStatusIfFailed(
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}

// deleteExternalJobs deletes the external jobs.
func (r *BackupReconciler) deleteExternalJobs(reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error {
labels := dpbackup.BuildBackupWorkloadLabels(backup)
if err := deleteRelatedJobs(reqCtx, r.Client, backup.Namespace, labels); err != nil {
return err
}
return deleteRelatedJobs(reqCtx, r.Client, viper.GetString(constant.CfgKeyCtrlrMgrNS), labels)
}

func (r *BackupReconciler) deleteVolumeSnapshots(reqCtx intctrlutil.RequestCtx,
backup *dpv1alpha1.Backup) error {
deleter := &dpbackup.Deleter{
Expand All @@ -712,34 +712,29 @@ func (r *BackupReconciler) deleteVolumeSnapshots(reqCtx intctrlutil.RequestCtx,
return deleter.DeleteVolumeSnapshots(backup)
}

// deleteExternalStatefulSet deletes the external statefulSet.
func (r *BackupReconciler) deleteExternalStatefulSet(reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error {
key := client.ObjectKey{
Namespace: backup.Namespace,
Name: backup.Name,
}
sts := &appsv1.StatefulSet{}
if err := r.Client.Get(reqCtx.Ctx, key, sts); err != nil {
return client.IgnoreNotFound(err)
// deleteExternalResources deletes the external workloads that execute backup.
// Currently, it only supports two types of workloads: job, statefulSet
func (r *BackupReconciler) deleteExternalResources(
reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error {
labels := dpbackup.BuildBackupWorkloadLabels(backup)

// use map to avoid duplicate deletion of the same namespace.
namespaces := map[string]bool{
backup.Namespace: true,
viper.GetString(constant.CfgKeyCtrlrMgrNS): true,
}

patch := client.MergeFrom(sts.DeepCopy())
controllerutil.RemoveFinalizer(sts, dptypes.DataProtectionFinalizerName)
if err := r.Client.Patch(reqCtx.Ctx, sts, patch); err != nil {
// delete the external jobs.
if err := deleteRelatedObjectList(reqCtx, r.Client, &batchv1.JobList{}, namespaces, labels); err != nil {
return err
}
reqCtx.Log.V(1).Info("delete statefulSet", "statefulSet", sts)
return intctrlutil.BackgroundDeleteObject(r.Client, reqCtx.Ctx, sts)
}

// deleteExternalResources deletes the external workloads that execute backup.
// Currently, it only supports two types of workloads: job.
func (r *BackupReconciler) deleteExternalResources(
reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error {
if err := r.deleteExternalJobs(reqCtx, backup); err != nil {
// delete the external statefulSets.
if err := deleteRelatedObjectList(reqCtx, r.Client, &appsv1.StatefulSetList{}, namespaces, labels); err != nil {
return err
}
return r.deleteExternalStatefulSet(reqCtx, backup)

return nil
}

// PatchBackupObjectMeta patches backup object metaObject include cluster snapshot.
Expand Down
10 changes: 7 additions & 3 deletions controllers/dataprotection/restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,14 @@ func (r *RestoreReconciler) parseRestoreJob(ctx context.Context, object client.O

func (r *RestoreReconciler) deleteExternalResources(reqCtx intctrlutil.RequestCtx, restore *dpv1alpha1.Restore) error {
labels := map[string]string{dprestore.DataProtectionRestoreLabelKey: restore.Name}
if err := deleteRelatedJobs(reqCtx, r.Client, restore.Namespace, labels); err != nil {
return err

// use map to avoid duplicate deletion of the same namespace.
namespaces := map[string]bool{
restore.Namespace: true,
viper.GetString(constant.CfgKeyCtrlrMgrNS): true,
}
return deleteRelatedJobs(reqCtx, r.Client, viper.GetString(constant.CfgKeyCtrlrMgrNS), labels)

return deleteRelatedObjectList(reqCtx, r.Client, &batchv1.JobList{}, namespaces, labels)
}

func CheckBackupRepoForRestore(reqCtx intctrlutil.RequestCtx, cli client.Client, restore *dpv1alpha1.Restore) (string, error) {
Expand Down
47 changes: 35 additions & 12 deletions controllers/dataprotection/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"encoding/json"
"fmt"
appsv1 "k8s.io/api/apps/v1"
"reflect"
"strings"
"sync"
Expand Down Expand Up @@ -270,22 +271,44 @@ func getDefaultBackupRepo(ctx context.Context, cli client.Client) (*dpv1alpha1.B
return defaultRepo, nil
}

func deleteRelatedJobs(reqCtx intctrlutil.RequestCtx, cli client.Client, namespace string, labels map[string]string) error {
if labels == nil || namespace == "" {
type objectList interface {
*appsv1.StatefulSetList | *batchv1.JobList
client.ObjectList
}

func deleteRelatedObjectList[T objectList](reqCtx intctrlutil.RequestCtx, cli client.Client, list T, namespaces map[string]bool, labels map[string]string) error {
if labels == nil || len(namespaces) == 0 {
return nil
}
jobs := &batchv1.JobList{}
if err := cli.List(reqCtx.Ctx, jobs,
client.MatchingLabels(labels)); err != nil {
return client.IgnoreNotFound(err)

items := reflect.ValueOf(list).Elem().FieldByName("Items")
if !items.IsValid() {
return fmt.Errorf("ObjectList has no Items field: %s", list.GetObjectKind().GroupVersionKind().String())
}
for i := range jobs.Items {
job := &jobs.Items[i]
if err := dputils.RemoveDataProtectionFinalizer(reqCtx.Ctx, cli, job); err != nil {
return err

toBeDeleted := reflect.MakeSlice(items.Type(), 0, 0)
for ns := range namespaces {
if err := cli.List(reqCtx.Ctx, list, client.InNamespace(ns),
client.MatchingLabels(labels)); err != nil {
return client.IgnoreNotFound(err)
}
if err := intctrlutil.BackgroundDeleteObject(cli, reqCtx.Ctx, job); err != nil {
return err
objs := reflect.ValueOf(list).Elem().FieldByName("Items")
if !objs.IsZero() {
for i := 0; i < objs.Len(); i++ {
toBeDeleted = reflect.Append(toBeDeleted, objs.Index(i))
}
}
}

if !toBeDeleted.IsZero() {
for i := 0; i < toBeDeleted.Len(); i++ {
item := toBeDeleted.Index(i).Addr().Interface().(client.Object)
if err := dputils.RemoveDataProtectionFinalizer(reqCtx.Ctx, cli, item); err != nil {
return err
}
if err := intctrlutil.BackgroundDeleteObject(cli, reqCtx.Ctx, item); err != nil {
return err
}
}
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions deploy/helm/crds/dataprotection.kubeblocks.io_actionsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ spec:
restore:
description: Specifies the restore action.
properties:
baseBackupRequired:
default: true
description: Determines if a base backup is required during restoration.
type: boolean
postReady:
description: Specifies the actions that should be executed after
the data has been prepared and is ready for restoration.
Expand Down
2 changes: 1 addition & 1 deletion pkg/dataprotection/backup/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (r *Request) buildBackupDataAction(targetPod *corev1.Pod, name string) (act
Name: name,
ObjectMeta: metav1.ObjectMeta{
Namespace: r.Namespace,
Name: r.Name,
Name: GenerateBackupStatefulSetName(r.Backup, r.Target.Name, BackupDataJobNamePrefix),
Labels: BuildBackupWorkloadLabels(r.Backup),
},
Replicas: pointer.Int32(int32(1)),
Expand Down
7 changes: 7 additions & 0 deletions pkg/dataprotection/backup/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ func GenerateBackupJobName(backup *dpv1alpha1.Backup, prefix string) string {
return name
}

func GenerateBackupStatefulSetName(backup *dpv1alpha1.Backup, targetName, prefix string) string {
name := strings.ReplaceAll(fmt.Sprintf("%s-%s-%s", prefix, targetName, backup.Name), "--", "-")
// statefulset name cannot exceed 52 characters for label name limit as the statefulset controller will
// add a 10-length suffix to the name to construct the label "controller-revision-hash": "<statefulset_name>-<hash>"
return strings.TrimSuffix(name[:min(len(name), 52)], "-")
}

func generateBaseCRNameByBackupSchedule(uniqueNameWithBackupSchedule, backupScheduleNS, method string) string {
name := fmt.Sprintf("%s-%s", uniqueNameWithBackupSchedule, backupScheduleNS)
if len(name) > 30 {
Expand Down
6 changes: 6 additions & 0 deletions pkg/dataprotection/restore/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (r *RestoreManager) BuildContinuousRestoreManager(reqCtx intctrlutil.Reques
if err := checkRestoreTime(); err != nil {
return err
}

if baseBackupRequired := continuousBackupSet.ActionSet.Spec.Restore.BaseBackupRequired; baseBackupRequired != nil && !*baseBackupRequired {
r.SetBackupSets(continuousBackupSet)
return nil
}

fullBackupSet, err := r.getFullBackupActionSetForContinuous(reqCtx, cli, continuousBackup, metav1.NewTime(restoreTime))
if err != nil || fullBackupSet == nil {
return err
Expand Down

0 comments on commit 8cf336b

Please sign in to comment.