From 32d48b26714fc3a173de9c4d7ee623ec309a6250 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Fri, 6 Sep 2024 17:16:24 -0400 Subject: [PATCH] feat: Introduce new Dump function to run pods with two containers In order to separate data dump generation by database tools and export by `kando`, a pod performing database dump can have two separate containers and set up the pipe over file instead of anonymous pipe --- docs/functions.rst | 44 ++- docs_new/functions.md | 64 ++++ pkg/function/dump.go | 347 ++++++++++++++++++ pkg/function/dump_test.go | 132 +++++++ pkg/kube/pod.go | 6 +- pkg/kube/pod_test.go | 2 +- .../notes/dump-function-d488516c0f3b22c6.yaml | 2 + 7 files changed, 583 insertions(+), 14 deletions(-) create mode 100644 pkg/function/dump.go create mode 100644 pkg/function/dump_test.go create mode 100644 releasenotes/notes/dump-function-d488516c0f3b22c6.yaml diff --git a/docs/functions.rst b/docs/functions.rst index 42f92e4a02..b56688b226 100644 --- a/docs/functions.rst +++ b/docs/functions.rst @@ -108,11 +108,15 @@ Example: - | echo "Example" -KubeTask +Dump -------- -KubeTask spins up a new container and executes a command via a Pod. -This allows you to run a new Pod from a Blueprint. +Dump spins up a new pod with two containers connected via shared emptyDir volume. +It's similar to KubeTask, but allows using multiple images to move backup data. +"dump" container is one responsible for generating data, while "export" container +should export it to destination. +The main difference between them is that phase outputs can only be generated from the +"export" container outputs. .. csv-table:: :header: "Argument", "Required", "Type", "Description" @@ -120,11 +124,16 @@ This allows you to run a new Pod from a Blueprint. 
:widths: 5,5,5,15 `namespace`, No, `string`, namespace in which to execute (the pod will be created in controller's namespace if not specified) - `image`, Yes, `string`, image to be used for executing the task - `command`, Yes, `[]string`, command list to execute + `dumpImage`, Yes, `string`, image to be used in "dump" container + `dumpCommand`, Yes, `[]string`, command list to execute in "dump" container + `exportImage`, Yes, `string`, image to be used in "export" container + `exportCommand`, Yes, `[]string`, command list to execute in "export" container `podOverride`, No, `map[string]interface{}`, specs to override default pod specs with `podAnnotations`, No, `map[string]string`, custom annotations for the temporary pod that gets created `podLabels`, No, `map[string]string`, custom labels for the temporary pod that gets created + `sharedStorageMedium`, No, `string`, medium setting for shared volume, see https://kubernetes.io/docs/concepts/storage/volumes/#emptydir + `sharedStorageSize`, No, `string`, sizeLimit setting for shared volume + `sharedStorageDir`, No, `string`, directory to mount shared volume, defaults to `/tmp` Example: @@ -135,20 +144,35 @@ Example: name: examplePhase args: namespace: "{{ .Deployment.Namespace }}" - image: busybox podOverride: containers: - - name: container + - name: export imagePullPolicy: IfNotPresent podAnnotations: annKey: annValue podLabels: labelKey: labelValue - command: - - sh + sharedStorageMedium: Memory + sharedStorageSize: 1Gi + sharedStorageDir: /tmp/ + dumpImage: ubuntu + dumpCommand: + - bash - -c - | - echo "Example" + mkfifo /tmp/pipe-file + for i in {1..10} + do + echo $i + sleep 0.1 + done > /tmp/pipe-file + exportImage: ubuntu + exportCommand: + - bash + - -c + - | + while [ ! 
-e /tmp/pipe-file ]; do sleep 1; done + cat /tmp/pipe-file ScaleWorkload ------------- diff --git a/docs_new/functions.md b/docs_new/functions.md index 07016e3b66..c0be808a58 100644 --- a/docs_new/functions.md +++ b/docs_new/functions.md @@ -129,6 +129,70 @@ Example: echo "Example" ``` +### Dump + +Dump spins up a new pod with two containers connected via shared emptyDir volume. +It's similar to KubeTask, but allows using multiple images to move backup data. +"dump" container is one responsible for generating data, while "export" container +should export it to destination. +The main difference between them is that phase outputs can only be generated from the +"export" container outputs. + + + | Argument | Required | Type | Description | + | ----------- | :------: | ----------------------- | ----------- | + | namespace | No | string | namespace in which to execute (the pod will be created in controller's namespace if not specified) | + | dumpImage | Yes | string | image to be used in "dump" container | + | dumpCommand | Yes | []string | command list to execute in "dump" container | + | exportImage | Yes | string | image to be used in "export" container | + | exportCommand | Yes | []string | command list to execute in "export" container | + | podOverride | No | map[string]interface{} | specs to override default pod specs with | + | podAnnotations | No | map[string]string | custom annotations for the temporary pod that gets created | + | podLabels | No | map[string]string | custom labels for the temporary pod that gets created | + | sharedStorageMedium | No | string | medium setting for shared volume, see https://kubernetes.io/docs/concepts/storage/volumes/#emptydir | + | sharedStorageSize | No | string | sizeLimit setting for shared volume | + | sharedStorageDir | No | string | directory to mount shared volume, defaults to `/tmp` | + + +Example: + +``` yaml +- func: Dump + name: examplePhase + args: + namespace: "{{ .Deployment.Namespace }}" + podOverride: + 
containers: + - name: export + imagePullPolicy: IfNotPresent + podAnnotations: + annKey: annValue + podLabels: + labelKey: labelValue + sharedStorageMedium: Memory + sharedStorageSize: 1Gi + sharedStorageDir: /tmp/ + dumpImage: ubuntu + dumpCommand: + - bash + - -c + - | + mkfifo /tmp/pipe-file + for i in {1..10} + do + echo $i + sleep 0.1 + done > /tmp/pipe-file + exportImage: ubuntu + exportCommand: + - bash + - -c + - | + while [ ! -e /tmp/pipe-file ]; do sleep 1; done + cat /tmp/pipe-file +``` + + ### ScaleWorkload ScaleWorkload is used to scale up or scale down a Kubernetes workload. diff --git a/pkg/function/dump.go b/pkg/function/dump.go new file mode 100644 index 0000000000..14700694ef --- /dev/null +++ b/pkg/function/dump.go @@ -0,0 +1,347 @@ +// Copyright 2019 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package function + +import ( + "context" + "sort" + "time" + + "github.com/kanisterio/errkit" + "github.com/pkg/errors" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + kanister "github.com/kanisterio/kanister/pkg" + crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1" + "github.com/kanisterio/kanister/pkg/consts" + "github.com/kanisterio/kanister/pkg/field" + "github.com/kanisterio/kanister/pkg/kube" + "github.com/kanisterio/kanister/pkg/log" + "github.com/kanisterio/kanister/pkg/output" + "github.com/kanisterio/kanister/pkg/param" + "github.com/kanisterio/kanister/pkg/progress" + "github.com/kanisterio/kanister/pkg/utils" +) + +const ( + dumpPodPrefix = "kanister-job-" + + // DumpFuncName gives the function name + DumpFuncName = "Dump" + DumpNamespaceArg = "namespace" + DumpImageArg = "dumpImage" + DumpCommandArg = "dumpCommand" + DumpExportImageArg = "exportImage" + DumpExportCommandArg = "exportCommand" + DumpStorageMediumArg = "sharedStorageMedium" + DumpStorageSizeArg = "sharedStorageSize" + DumpStorageDirArg = "sharedStorageDir" + DumpPodOverrideArg = "podOverride" +) + +const ( + defaultContainerAnn = "kubectl.kubernetes.io/default-container" + containerExport = "export" + containerDump = "dump" + sharedVolumeName = "shared" + defaultSharedDir = "/tmp/" +) + +func init() { + _ = kanister.Register(&dumpFunc{}) +} + +var _ kanister.Func = (*dumpFunc)(nil) + +type dumpFunc struct { + progressPercent string +} + +func (*dumpFunc) Name() string { + return DumpFuncName +} + +func dump( + ctx context.Context, + cli kubernetes.Interface, + namespace, + dumpImage string, + dumpCommand []string, + exportImage string, + exportCommand []string, + storageDir string, + storageMedium v1.StorageMedium, + storageSize *resource.Quantity, + podOverride crv1alpha1.JSONMap, + labels, + annotations map[string]string, +) (map[string]interface{}, error) { + podSpec 
:= v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Volumes: []v1.Volume{ + { + Name: sharedVolumeName, + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{ + Medium: storageMedium, + SizeLimit: storageSize, + }, + }, + }, + }, + Containers: []v1.Container{ + { + Name: containerExport, + Image: exportImage, + Command: exportCommand, + VolumeMounts: []v1.VolumeMount{ + { + Name: sharedVolumeName, + MountPath: storageDir, + }, + }, + }, + { + Name: containerDump, + Image: dumpImage, + Command: dumpCommand, + VolumeMounts: []v1.VolumeMount{ + { + Name: sharedVolumeName, + MountPath: storageDir, + }, + }, + }, + }, + } + + log.Info().Print("Pod override: ", field.M{"override": podOverride}) + podSpec, err := kube.PatchDefaultPodSpecs(podSpec, podOverride) + if err != nil { + return nil, errkit.Wrap(err, "Unable to apply podOverride") + } + + // Put the export container the first + sort.Slice(podSpec.Containers, func(i, j int) bool { + return podSpec.Containers[i].Name == containerExport + }) + + if labels == nil { + labels = make(map[string]string) + } + labels[consts.LabelKeyCreatedBy] = consts.LabelValueKanister + + if annotations == nil { + annotations = make(map[string]string) + } + // FIXME: this doesn't work with pod controller, make sure PC respect this?? 
+ annotations[defaultContainerAnn] = containerExport + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: dumpPodPrefix, + Namespace: namespace, + Labels: labels, + Annotations: annotations, + }, + Spec: podSpec, + } + + log.Info().Print("POD: ", field.M{"pod": pod}) + + pod, err = cli.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + return nil, errkit.Wrap(err, "Failed to create pod") + } + pc, err := kube.NewPodControllerForExistingPod(cli, pod) + if err != nil { + return nil, err + } + + ctx = field.Context(ctx, consts.PodNameKey, pod.Name) + ctx = field.Context(ctx, consts.ContainerNameKey, pod.Spec.Containers[0].Name) + go func() { + <-ctx.Done() + err := pc.StopPod(context.Background(), kube.PodControllerInfiniteStopTime, int64(0)) + if err != nil { + log.WithError(err).Print("Failed to delete pod", field.M{"PodName": pod.Name}) + } + }() + + podFunc := dumpPodFunc() + + return podFunc(ctx, pc) +} + +// This function is identical to kubeTaskPodFunc +// nolint:dupl +func dumpPodFunc() func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + return func(ctx context.Context, pc kube.PodController) (map[string]interface{}, error) { + if err := pc.WaitForPodReady(ctx); err != nil { + return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pc.PodName()) + } + ctx = field.Context(ctx, consts.LogKindKey, consts.LogKindDatapath) + // Fetch logs from the pod + r, err := pc.StreamPodLogs(ctx) + if err != nil { + return nil, errors.Wrapf(err, "Failed to fetch logs from the pod") + } + out, err := output.LogAndParse(ctx, r) + if err != nil { + return nil, err + } + // Wait for pod completion + if err := pc.WaitForPodCompletion(ctx); err != nil { + return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pc.PodName()) + } + return out, err + } +} + +func (ktf *dumpFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) 
(map[string]interface{}, error) { + // Set progress percent + ktf.progressPercent = progress.StartedPercent + defer func() { ktf.progressPercent = progress.CompletedPercent }() + + var namespace, dumpImage, exportImage, storageDir string + var storageMedium v1.StorageMedium + var storageSize *resource.Quantity + var storageSizeString string + var dumpCommand, exportCommand []string + var bpAnnotations, bpLabels map[string]string + var err error + if err = Arg(args, DumpImageArg, &dumpImage); err != nil { + return nil, err + } + if err = Arg(args, DumpExportImageArg, &exportImage); err != nil { + return nil, err + } + if err = Arg(args, DumpCommandArg, &dumpCommand); err != nil { + return nil, err + } + if err = Arg(args, DumpExportCommandArg, &exportCommand); err != nil { + return nil, err + } + if err = OptArg(args, DumpNamespaceArg, &namespace, ""); err != nil { + return nil, err + } + if err = OptArg(args, DumpStorageMediumArg, &storageMedium, ""); err != nil { + return nil, err + } + if err = OptArg(args, DumpStorageSizeArg, &storageSizeString, ""); err != nil { + return nil, err + } + if storageSizeString != "" { + size, err := resource.ParseQuantity(storageSizeString) + if err != nil { + return nil, errors.Wrapf(err, "Failed to parse sharedStorageSize arg") + } + storageSize = &size + } + if err = OptArg(args, DumpStorageDirArg, &storageDir, defaultSharedDir); err != nil { + return nil, err + } + if err = OptArg(args, PodAnnotationsArg, &bpAnnotations, nil); err != nil { + return nil, err + } + if err = OptArg(args, PodLabelsArg, &bpLabels, nil); err != nil { + return nil, err + } + + podOverride, err := GetPodSpecOverride(tp, args, DumpPodOverrideArg) + if err != nil { + return nil, err + } + + if tp.PodAnnotations != nil { + // merge the actionset annotations with blueprint annotations + var actionSetAnn ActionSetAnnotations = tp.PodAnnotations + bpAnnotations = actionSetAnn.MergeBPAnnotations(bpAnnotations) + } + + if tp.PodLabels != nil { + // merge the 
actionset labels with blueprint labels + var actionSetLabels ActionSetLabels = tp.PodLabels + bpLabels = actionSetLabels.MergeBPLabels(bpLabels) + } + + cli, err := kube.NewClient() + if err != nil { + return nil, errors.Wrapf(err, "Failed to create Kubernetes client") + } + return dump( + ctx, + cli, + namespace, + dumpImage, + dumpCommand, + exportImage, + exportCommand, + storageDir, + storageMedium, + storageSize, + podOverride, + bpLabels, + bpAnnotations, + ) +} + +func (*dumpFunc) RequiredArgs() []string { + return []string{ + DumpImageArg, + DumpCommandArg, + DumpExportImageArg, + DumpExportCommandArg, + } +} + +func (*dumpFunc) Arguments() []string { + return []string{ + DumpNamespaceArg, + DumpImageArg, + DumpCommandArg, + DumpExportImageArg, + DumpExportCommandArg, + DumpStorageMediumArg, + DumpStorageSizeArg, + DumpStorageDirArg, + DumpPodOverrideArg, + PodLabelsArg, + PodAnnotationsArg, + } +} + +func (ktf *dumpFunc) Validate(args map[string]any) error { + if err := ValidatePodLabelsAndAnnotations(ktf.Name(), args); err != nil { + return err + } + + if err := utils.CheckSupportedArgs(ktf.Arguments(), args); err != nil { + return err + } + + return utils.CheckRequiredArgs(ktf.RequiredArgs(), args) +} + +func (k *dumpFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) { + metav1Time := metav1.NewTime(time.Now()) + return crv1alpha1.PhaseProgress{ + ProgressPercent: k.progressPercent, + LastTransitionTime: &metav1Time, + }, nil +} diff --git a/pkg/function/dump_test.go b/pkg/function/dump_test.go new file mode 100644 index 0000000000..06e94a4c5b --- /dev/null +++ b/pkg/function/dump_test.go @@ -0,0 +1,132 @@ +// Copyright 2019 The Kanister Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package function + +import ( + "context" + "os" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + kanister "github.com/kanisterio/kanister/pkg" + crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1" + "github.com/kanisterio/kanister/pkg/consts" + "github.com/kanisterio/kanister/pkg/kube" + "github.com/kanisterio/kanister/pkg/param" + + . "gopkg.in/check.v1" +) + +var _ = Suite(&DumpSuite{}) + +type DumpSuite struct { + cli kubernetes.Interface + namespace string +} + +func (s *DumpSuite) SetUpSuite(c *C) { + cli, err := kube.NewClient() + c.Assert(err, IsNil) + s.cli = cli + + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "kanisterdumptest-", + }, + } + cns, err := s.cli.CoreV1().Namespaces().Create(context.TODO(), ns, metav1.CreateOptions{}) + c.Assert(err, IsNil) + s.namespace = cns.Name + err = os.Setenv("POD_NAMESPACE", cns.Name) + c.Assert(err, IsNil) + err = os.Setenv("POD_SERVICE_ACCOUNT", "default") + c.Assert(err, IsNil) +} + +func (s *DumpSuite) TearDownSuite(c *C) { + if s.namespace != "" { + _ = s.cli.CoreV1().Namespaces().Delete(context.TODO(), s.namespace, metav1.DeleteOptions{}) + } +} + +func dumpPhase(namespace string) crv1alpha1.BlueprintPhase { + return crv1alpha1.BlueprintPhase{ + Name: "testDump", + Func: DumpFuncName, + Args: map[string]interface{}{ + DumpNamespaceArg: namespace, + DumpImageArg: consts.LatestKanisterToolsImage, + DumpCommandArg: []string{ + "sh", + "-c", + "echo foo > /tmp/file", + }, + 
DumpExportImageArg: consts.LatestKanisterToolsImage, + DumpExportCommandArg: []string{ + "sh", + "-c", + "while [ ! -e /tmp/file ]; do sleep 1; done; kando output value $(cat /tmp/file)", + }, + }, + } +} + +func (s *DumpSuite) TestDump(c *C) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + tp := param.TemplateParams{ + StatefulSet: &param.StatefulSetParams{ + Namespace: s.namespace, + }, + PodOverride: crv1alpha1.JSONMap{ + "containers": []map[string]interface{}{ + { + "name": "dump", + "imagePullPolicy": "Always", + }, + { + "name": "export", + "imagePullPolicy": "Always", + }, + }, + }, + } + action := "test" + for _, tc := range []struct { + bp *crv1alpha1.Blueprint + outs []map[string]interface{} + }{ + { + bp: newTaskBlueprint(dumpPhase(s.namespace)), + outs: []map[string]interface{}{ + { + "value": "foo", + }, + }, + }, + } { + phases, err := kanister.GetPhases(*tc.bp, action, kanister.DefaultVersion, tp) + c.Assert(err, IsNil) + c.Assert(phases, HasLen, len(tc.outs)) + for i, p := range phases { + out, err := p.Exec(ctx, *tc.bp, action, tp) + c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name())) + c.Assert(out, DeepEquals, tc.outs[i]) + } + } +} diff --git a/pkg/kube/pod.go b/pkg/kube/pod.go index 339189110a..5819d02a00 100644 --- a/pkg/kube/pod.go +++ b/pkg/kube/pod.go @@ -172,7 +172,7 @@ func GetPodObjectFromPodOptions(ctx context.Context, cli kubernetes.Interface, o } // Patch default Pod Specs if needed - patchedSpecs, err := patchDefaultPodSpecs(defaultSpecs, opts.PodOverride) + patchedSpecs, err := PatchDefaultPodSpecs(defaultSpecs, opts.PodOverride) if err != nil { return nil, errkit.Wrap(err, "Failed to create pod. 
Failed to override pod specs.", "namespace", opts.Namespace, "nameFmt", opts.GenerateName) } @@ -515,8 +515,8 @@ func WaitForPodCompletion(ctx context.Context, cli kubernetes.Interface, namespa return errkit.Wrap(err, errorMessage) } -// use Strategic Merge to patch default pod specs with the passed specs -func patchDefaultPodSpecs(defaultPodSpecs corev1.PodSpec, override crv1alpha1.JSONMap) (corev1.PodSpec, error) { +// PatchDefaultPodSpecs patches default pod specs with the passed override using Strategic Merge. +func PatchDefaultPodSpecs(defaultPodSpecs corev1.PodSpec, override crv1alpha1.JSONMap) (corev1.PodSpec, error) { // Merge default specs and override specs with StrategicMergePatch mergedPatch, err := strategicMergeJSONPatch(defaultPodSpecs, override) if err != nil { diff --git a/pkg/kube/pod_test.go b/pkg/kube/pod_test.go index 3033f03a20..2426a88ea8 100644 --- a/pkg/kube/pod_test.go +++ b/pkg/kube/pod_test.go @@ -845,7 +845,7 @@ func (s *PodSuite) TestPatchDefaultPodSpecs(c *C) { for _, test := range tests { override, err := CreateAndMergeJSONPatch(test.BlueprintPodSpecs, test.ActionsetPodSpecs) c.Assert(err, IsNil) - podSpec, err := patchDefaultPodSpecs(defaultSpecs, override) + podSpec, err := PatchDefaultPodSpecs(defaultSpecs, override) c.Assert(err, IsNil) c.Assert(podSpec, DeepEquals, test.Expected) } diff --git a/releasenotes/notes/dump-function-d488516c0f3b22c6.yaml b/releasenotes/notes/dump-function-d488516c0f3b22c6.yaml new file mode 100644 index 0000000000..c130eef2c8 --- /dev/null +++ b/releasenotes/notes/dump-function-d488516c0f3b22c6.yaml @@ -0,0 +1,2 @@ +--- +features: Introduce new Dump function to run pods with two containers connected by shared volume