From 517b38ee4dd30ba83375e53f6edbedb690bf572b Mon Sep 17 00:00:00 2001 From: Matej Vasek Date: Tue, 11 Jul 2023 19:45:06 +0200 Subject: [PATCH] src: Use jobs not plain pods for auxiliary tasks Job should have security context set properly automatically. Signed-off-by: Matej Vasek --- pkg/k8s/dialer.go | 96 +++++++++++++++++++++-------------- pkg/k8s/persistent_volumes.go | 81 +++++++++++++++++------------ 2 files changed, 107 insertions(+), 70 deletions(-) diff --git a/pkg/k8s/dialer.go b/pkg/k8s/dialer.go index d45c7386ed..694f4edfa9 100644 --- a/pkg/k8s/dialer.go +++ b/pkg/k8s/dialer.go @@ -16,13 +16,14 @@ import ( "syscall" "time" + batchV1 "k8s.io/api/batch/v1" coreV1 "k8s.io/api/core/v1" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" + batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1" v1 "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -64,7 +65,9 @@ func NewInClusterDialer(ctx context.Context, clientConfig clientcmd.ClientConfig type contextDialer struct { coreV1 v1.CoreV1Interface clientConfig clientcmd.ClientConfig + batchV1 batchv1.BatchV1Interface restConf *restclient.Config + jobName string podName string namespace string detachChan chan struct{} @@ -185,9 +188,13 @@ func (c *contextDialer) Close() error { close(c.detachChan) ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1) defer cancel() - delOpts := metaV1.DeleteOptions{} - return c.coreV1.Pods(c.namespace).Delete(ctx, c.podName, delOpts) + pp := metaV1.DeletePropagationForeground + delOpts := metaV1.DeleteOptions{ + PropagationPolicy: &pp, + } + + return c.batchV1.Jobs(c.namespace).Delete(ctx, c.jobName, delOpts) } func (c *contextDialer) startDialerPod(ctx context.Context) (err error) { @@ -206,16 +213,16 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) { if err != nil { return } + c.coreV1 = client.CoreV1() + c.batchV1 = client.BatchV1() c.namespace, _, err = c.clientConfig.Namespace() if err != nil { return } - pods := client.CoreV1().Pods(c.namespace) - - c.podName = "in-cluster-dialer-" + rand.String(5) + jobs := client.BatchV1().Jobs(c.namespace) defer func() { if err != nil { @@ -223,39 +230,49 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) { } }() - pod := &coreV1.Pod{ + c.jobName = "in-cluster-dialer-" + rand.String(5) + + job := &batchV1.Job{ ObjectMeta: metaV1.ObjectMeta{ - Name: c.podName, - Labels: nil, - Annotations: nil, + Name: c.jobName, }, - Spec: coreV1.PodSpec{ - SecurityContext: defaultPodSecurityContext(), - Containers: []coreV1.Container{ - { - Name: c.podName, - Image: SocatImage, - Stdin: true, - StdinOnce: true, - Command: []string{"socat", "-u", "-", "OPEN:/dev/null"}, - SecurityContext: defaultSecurityContext(client), + Spec: batchV1.JobSpec{ + Template: coreV1.PodTemplateSpec{ + Spec: coreV1.PodSpec{ + Containers: []coreV1.Container{ + { + Name: "container", + Image: SocatImage, + Stdin: true, + StdinOnce: true, + Command: []string{"socat", "-u", "-", "OPEN:/dev/null"}, + }, + }, + DNSPolicy: coreV1.DNSClusterFirst, + RestartPolicy: coreV1.RestartPolicyNever, }, }, - DNSPolicy: coreV1.DNSClusterFirst, - RestartPolicy: coreV1.RestartPolicyNever, }, } + creatOpts := metaV1.CreateOptions{} - ready := podReady(ctx, c.coreV1, c.podName, c.namespace) + podChan, err := podReady(ctx, c.coreV1, c.jobName, c.namespace) + if err != nil { + return fmt.Errorf("cannot setup pod watch: %w", err) + } - _, err = pods.Create(ctx, pod, creatOpts) + _, err = jobs.Create(ctx, job, creatOpts) if err != nil { return } select { - case err = <-ready: + case poe := <-podChan: + if poe.err != nil { + return poe.err + } + c.podName = poe.pod.Name case <-ctx.Done(): err = ctx.Err() case <-time.After(time.Minute * 1): @@ -293,7 +310,7 @@ func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Write SubResource("exec") req.VersionedParams(&coreV1.PodExecOptions{ Command: []string{"socat", "-dd", "-", fmt.Sprintf("TCP:%s", hostPort)}, - Container: c.podName, + Container: "container", Stdin: true, Stdout: true, Stderr: true, @@ -320,7 +337,7 @@ func attach(restClient restclient.Interface, restConf *restclient.Config, podNam Namespace(namespace). SubResource("attach") req.VersionedParams(&coreV1.PodAttachOptions{ - Container: podName, + Container: "container", Stdin: true, Stdout: true, Stderr: true, @@ -340,26 +357,30 @@ func attach(restClient restclient.Interface, restConf *restclient.Config, podNam }) } -func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace string) (errChan <-chan error) { - d := make(chan error) - errChan = d +type podOrError struct { + pod *coreV1.Pod + err error +} + +func podReady(ctx context.Context, core v1.CoreV1Interface, jobName, namespace string) (result <-chan podOrError, err error) { + outChan := make(chan podOrError, 1) + result = outChan pods := core.Pods(namespace) - nameSelector := fields.OneTermEqualSelector("metadata.name", podName).String() listOpts := metaV1.ListOptions{ Watch: true, - FieldSelector: nameSelector, + LabelSelector: "job-name=" + jobName, } watcher, err := pods.Watch(ctx, listOpts) if err != nil { - return + return nil, err } go func() { defer watcher.Stop() - ch := watcher.ResultChan() - for event := range ch { + watchChan := watcher.ResultChan() + for event := range watchChan { pod, ok := event.Object.(*coreV1.Pod) if !ok { continue @@ -368,7 +389,7 @@ func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace s if event.Type == watch.Modified { for _, status := range pod.Status.ContainerStatuses { if status.Ready { - d <- nil + outChan <- podOrError{pod: pod} return } if status.State.Waiting != nil { @@ -379,9 +400,10 @@ func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace s "InvalidImageName", "CrashLoopBackOff", "ImagePullBackOff": - d <- fmt.Errorf("reason: %v, message: %v", + e := fmt.Errorf("reason: %v, message: %v", status.State.Waiting.Reason, status.State.Waiting.Message) + outChan <- podOrError{err: e} return default: continue diff --git a/pkg/k8s/persistent_volumes.go b/pkg/k8s/persistent_volumes.go index 92bdccd393..a826298e3f 100644 --- a/pkg/k8s/persistent_volumes.go +++ b/pkg/k8s/persistent_volumes.go @@ -12,6 +12,7 @@ import ( "syscall" "time" + batchV1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -101,65 +102,79 @@ func runWithVolumeMounted(ctx context.Context, podImage string, podCommand []str return fmt.Errorf("cannot get namespace: %w", err) } - podName := "volume-uploader-" + rand.String(5) + jobName := "volume-uploader-" + rand.String(5) pods := client.CoreV1().Pods(namespace) + jobs := client.BatchV1().Jobs(namespace) defer func() { - _ = pods.Delete(ctx, podName, metav1.DeleteOptions{}) + pp := metav1.DeletePropagationForeground + delOpts := metav1.DeleteOptions{ + PropagationPolicy: &pp, + } + _ = jobs.Delete(ctx, jobName, delOpts) }() const volumeMntPoint = "/tmp/volume_mnt" const pVol = "p-vol" - pod := &corev1.Pod{ + job := &batchV1.Job{ ObjectMeta: metav1.ObjectMeta{ - Name: podName, - Labels: nil, - Annotations: nil, + Name: jobName, }, - Spec: corev1.PodSpec{ - SecurityContext: defaultPodSecurityContext(), - Containers: []corev1.Container{ - { - Name: podName, - Image: podImage, - Stdin: true, - StdinOnce: true, - WorkingDir: volumeMntPoint, - Command: podCommand, - VolumeMounts: []corev1.VolumeMount{ + Spec: batchV1.JobSpec{ + + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ { - Name: pVol, - MountPath: volumeMntPoint, + Name: "container", + Image: podImage, + Stdin: true, + StdinOnce: true, + WorkingDir: volumeMntPoint, + Command: podCommand, + VolumeMounts: []corev1.VolumeMount{ + { + Name: pVol, + MountPath: volumeMntPoint, + }, + }, }, }, - SecurityContext: defaultSecurityContext(client), + Volumes: []corev1.Volume{{ + Name: pVol, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: claimName, + }, + }, + }}, + DNSPolicy: corev1.DNSClusterFirst, + RestartPolicy: corev1.RestartPolicyNever, }, }, - Volumes: []corev1.Volume{{ - Name: pVol, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: claimName, - }, - }, - }}, - DNSPolicy: corev1.DNSClusterFirst, - RestartPolicy: corev1.RestartPolicyNever, }, } localCtx, cancel := context.WithCancel(ctx) defer cancel() - ready := podReady(localCtx, client.CoreV1(), podName, namespace) + podChan, err := podReady(localCtx, client.CoreV1(), jobName, namespace) + if err != nil { + return fmt.Errorf("cannot setup pod watch: %w", err) + } - _, err = pods.Create(ctx, pod, metav1.CreateOptions{}) + _, err = jobs.Create(ctx, job, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("cannot create pod: %w", err) } + var podName string select { - case err = <-ready: + case poe := <-podChan: + if poe.err != nil { + return poe.err + } + podName = poe.pod.Name case <-ctx.Done(): err = ctx.Err() case <-time.After(time.Minute * 5):