src: Use jobs not plain pods for auxiliary tasks
A Job should have its security context set properly and automatically.

Signed-off-by: Matej Vasek <[email protected]>
matejvasek committed Jul 11, 2023
1 parent 79c36ee commit 517b38e
Showing 2 changed files with 107 additions and 70 deletions.
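The change below swaps bare Pods for batch/v1 Jobs so that the cluster can apply the pod security context to the Job's pod template automatically, and so the helper pod is cleaned up simply by deleting its Job with foreground propagation. The sketch below is not the commit's exact code; the package, helper function, job name, and image are illustrative, but the create-then-delete pattern with client-go matches the diff:

```go
package example

import (
	"context"

	batchV1 "k8s.io/api/batch/v1"
	coreV1 "k8s.io/api/core/v1"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
)

// runHelperJob creates a short-lived helper Job and removes it (together with
// its pod) when done. Names and the image are illustrative placeholders.
func runHelperJob(ctx context.Context, conf *restclient.Config, namespace string) error {
	client, err := kubernetes.NewForConfig(conf)
	if err != nil {
		return err
	}

	job := &batchV1.Job{
		ObjectMeta: metaV1.ObjectMeta{Name: "in-cluster-dialer-example"},
		Spec: batchV1.JobSpec{
			Template: coreV1.PodTemplateSpec{
				Spec: coreV1.PodSpec{
					Containers: []coreV1.Container{{
						Name:      "container",
						Image:     "alpine/socat", // placeholder for the commit's SocatImage
						Command:   []string{"socat", "-u", "-", "OPEN:/dev/null"},
						Stdin:     true,
						StdinOnce: true,
					}},
					RestartPolicy: coreV1.RestartPolicyNever,
				},
			},
		},
	}

	jobs := client.BatchV1().Jobs(namespace)
	if _, err = jobs.Create(ctx, job, metaV1.CreateOptions{}); err != nil {
		return err
	}

	// Foreground propagation removes the Job's pod along with the Job itself.
	pp := metaV1.DeletePropagationForeground
	return jobs.Delete(ctx, job.Name, metaV1.DeleteOptions{PropagationPolicy: &pp})
}
```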
96 changes: 59 additions & 37 deletions pkg/k8s/dialer.go
@@ -16,13 +16,14 @@ import (
"syscall"
"time"

batchV1 "k8s.io/api/batch/v1"
coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -64,7 +65,9 @@ func NewInClusterDialer(ctx context.Context, clientConfig clientcmd.ClientConfig
type contextDialer struct {
coreV1 v1.CoreV1Interface
clientConfig clientcmd.ClientConfig
batchV1 batchv1.BatchV1Interface
restConf *restclient.Config
jobName string
podName string
namespace string
detachChan chan struct{}
@@ -185,9 +188,13 @@ func (c *contextDialer) Close() error {
close(c.detachChan)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1)
defer cancel()
delOpts := metaV1.DeleteOptions{}

return c.coreV1.Pods(c.namespace).Delete(ctx, c.podName, delOpts)
pp := metaV1.DeletePropagationForeground
delOpts := metaV1.DeleteOptions{
PropagationPolicy: &pp,
}

return c.batchV1.Jobs(c.namespace).Delete(ctx, c.jobName, delOpts)
}

func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
@@ -206,56 +213,66 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
if err != nil {
return
}

c.coreV1 = client.CoreV1()
c.batchV1 = client.BatchV1()

c.namespace, _, err = c.clientConfig.Namespace()
if err != nil {
return
}

pods := client.CoreV1().Pods(c.namespace)

c.podName = "in-cluster-dialer-" + rand.String(5)
jobs := client.BatchV1().Jobs(c.namespace)

defer func() {
if err != nil {
c.Close()
}
}()

pod := &coreV1.Pod{
c.jobName = "in-cluster-dialer-" + rand.String(5)

job := &batchV1.Job{
ObjectMeta: metaV1.ObjectMeta{
Name: c.podName,
Labels: nil,
Annotations: nil,
Name: c.jobName,
},
Spec: coreV1.PodSpec{
SecurityContext: defaultPodSecurityContext(),
Containers: []coreV1.Container{
{
Name: c.podName,
Image: SocatImage,
Stdin: true,
StdinOnce: true,
Command: []string{"socat", "-u", "-", "OPEN:/dev/null"},
SecurityContext: defaultSecurityContext(client),
Spec: batchV1.JobSpec{
Template: coreV1.PodTemplateSpec{
Spec: coreV1.PodSpec{
Containers: []coreV1.Container{
{
Name: "container",
Image: SocatImage,
Stdin: true,
StdinOnce: true,
Command: []string{"socat", "-u", "-", "OPEN:/dev/null"},
},
},
DNSPolicy: coreV1.DNSClusterFirst,
RestartPolicy: coreV1.RestartPolicyNever,
},
},
DNSPolicy: coreV1.DNSClusterFirst,
RestartPolicy: coreV1.RestartPolicyNever,
},
}

creatOpts := metaV1.CreateOptions{}

ready := podReady(ctx, c.coreV1, c.podName, c.namespace)
podChan, err := podReady(ctx, c.coreV1, c.jobName, c.namespace)
if err != nil {
return fmt.Errorf("cannot setup pod watch: %w", err)
}

_, err = pods.Create(ctx, pod, creatOpts)
_, err = jobs.Create(ctx, job, creatOpts)
if err != nil {
return
}

select {
case err = <-ready:
case poe := <-podChan:
if poe.err != nil {
return poe.err
}
c.podName = poe.pod.Name
case <-ctx.Done():
err = ctx.Err()
case <-time.After(time.Minute * 1):
@@ -293,7 +310,7 @@ func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Write
SubResource("exec")
req.VersionedParams(&coreV1.PodExecOptions{
Command: []string{"socat", "-dd", "-", fmt.Sprintf("TCP:%s", hostPort)},
Container: c.podName,
Container: "container",
Stdin: true,
Stdout: true,
Stderr: true,
@@ -320,7 +337,7 @@ func attach(restClient restclient.Interface, restConf *restclient.Config, podNam
Namespace(namespace).
SubResource("attach")
req.VersionedParams(&coreV1.PodAttachOptions{
Container: podName,
Container: "container",
Stdin: true,
Stdout: true,
Stderr: true,
@@ -340,26 +357,30 @@ func attach(restClient restclient.Interface, restConf *restclient.Config, podNam
})
}

func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace string) (errChan <-chan error) {
d := make(chan error)
errChan = d
type podOrError struct {
pod *coreV1.Pod
err error
}

func podReady(ctx context.Context, core v1.CoreV1Interface, jobName, namespace string) (result <-chan podOrError, err error) {
outChan := make(chan podOrError, 1)
result = outChan

pods := core.Pods(namespace)

nameSelector := fields.OneTermEqualSelector("metadata.name", podName).String()
listOpts := metaV1.ListOptions{
Watch: true,
FieldSelector: nameSelector,
LabelSelector: "job-name=" + jobName,
}
watcher, err := pods.Watch(ctx, listOpts)
if err != nil {
return
return nil, err
}

go func() {
defer watcher.Stop()
ch := watcher.ResultChan()
for event := range ch {
watchChan := watcher.ResultChan()
for event := range watchChan {
pod, ok := event.Object.(*coreV1.Pod)
if !ok {
continue
@@ -368,7 +389,7 @@ func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace s
if event.Type == watch.Modified {
for _, status := range pod.Status.ContainerStatuses {
if status.Ready {
d <- nil
outChan <- podOrError{pod: pod}
return
}
if status.State.Waiting != nil {
Expand All @@ -379,9 +400,10 @@ func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace s
"InvalidImageName",
"CrashLoopBackOff",
"ImagePullBackOff":
d <- fmt.Errorf("reason: %v, message: %v",
e := fmt.Errorf("reason: %v, message: %v",
status.State.Waiting.Reason,
status.State.Waiting.Message)
outChan <- podOrError{err: e}
return
default:
continue
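In dialer.go the single container in the Job's pod template now has the fixed name "container", so the exec and attach requests above target that constant instead of a pod-derived name. Below is a hedged sketch of such an exec call using client-go's remotecommand SPDY executor; the package, helper name, and parameters are illustrative, not the commit's exact code:

```go
package example

import (
	"context"
	"io"

	coreV1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/remotecommand"
)

// execInJobPod runs a command in the Job's pod, addressing the container by
// its fixed name rather than by the pod name.
func execInJobPod(ctx context.Context, conf *restclient.Config, namespace, podName string,
	command []string, in io.Reader, out, errOut io.Writer) error {

	client, err := kubernetes.NewForConfig(conf)
	if err != nil {
		return err
	}

	req := client.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec")
	req.VersionedParams(&coreV1.PodExecOptions{
		Container: "container", // fixed name from the Job's pod template
		Command:   command,
		Stdin:     true,
		Stdout:    true,
		Stderr:    true,
	}, scheme.ParameterCodec)

	executor, err := remotecommand.NewSPDYExecutor(conf, "POST", req.URL())
	if err != nil {
		return err
	}
	return executor.StreamWithContext(ctx, remotecommand.StreamOptions{
		Stdin:  in,
		Stdout: out,
		Stderr: errOut,
	})
}
```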
81 changes: 48 additions & 33 deletions pkg/k8s/persistent_volumes.go
@@ -12,6 +12,7 @@ import (
"syscall"
"time"

batchV1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -101,65 +102,79 @@ func runWithVolumeMounted(ctx context.Context, podImage string, podCommand []str
return fmt.Errorf("cannot get namespace: %w", err)
}

podName := "volume-uploader-" + rand.String(5)
jobName := "volume-uploader-" + rand.String(5)

pods := client.CoreV1().Pods(namespace)
jobs := client.BatchV1().Jobs(namespace)

defer func() {
_ = pods.Delete(ctx, podName, metav1.DeleteOptions{})
pp := metav1.DeletePropagationForeground
delOpts := metav1.DeleteOptions{
PropagationPolicy: &pp,
}
_ = jobs.Delete(ctx, jobName, delOpts)
}()

const volumeMntPoint = "/tmp/volume_mnt"
const pVol = "p-vol"
pod := &corev1.Pod{
job := &batchV1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Labels: nil,
Annotations: nil,
Name: jobName,
},
Spec: corev1.PodSpec{
SecurityContext: defaultPodSecurityContext(),
Containers: []corev1.Container{
{
Name: podName,
Image: podImage,
Stdin: true,
StdinOnce: true,
WorkingDir: volumeMntPoint,
Command: podCommand,
VolumeMounts: []corev1.VolumeMount{
Spec: batchV1.JobSpec{

Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: pVol,
MountPath: volumeMntPoint,
Name: "container",
Image: podImage,
Stdin: true,
StdinOnce: true,
WorkingDir: volumeMntPoint,
Command: podCommand,
VolumeMounts: []corev1.VolumeMount{
{
Name: pVol,
MountPath: volumeMntPoint,
},
},
},
},
SecurityContext: defaultSecurityContext(client),
Volumes: []corev1.Volume{{
Name: pVol,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: claimName,
},
},
}},
DNSPolicy: corev1.DNSClusterFirst,
RestartPolicy: corev1.RestartPolicyNever,
},
},
Volumes: []corev1.Volume{{
Name: pVol,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: claimName,
},
},
}},
DNSPolicy: corev1.DNSClusterFirst,
RestartPolicy: corev1.RestartPolicyNever,
},
}

localCtx, cancel := context.WithCancel(ctx)
defer cancel()
ready := podReady(localCtx, client.CoreV1(), podName, namespace)
podChan, err := podReady(localCtx, client.CoreV1(), jobName, namespace)
if err != nil {
return fmt.Errorf("cannot setup pod watch: %w", err)
}

_, err = pods.Create(ctx, pod, metav1.CreateOptions{})
_, err = jobs.Create(ctx, job, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("cannot create pod: %w", err)
}

var podName string
select {
case err = <-ready:
case poe := <-podChan:
if poe.err != nil {
return poe.err
}
podName = poe.pod.Name
case <-ctx.Done():
err = ctx.Err()
case <-time.After(time.Minute * 5):
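Both files now locate the Job's pod through the job-name label that the Job controller puts on the pods it creates, rather than watching a pod with a known fixed name. Below is a simplified, blocking sketch of that watch; the commit's podReady instead returns a channel, runs in a goroutine, and also reports fatal waiting states, so names and structure here are illustrative only:

```go
package example

import (
	"context"
	"fmt"

	coreV1 "k8s.io/api/core/v1"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

// waitForJobPod blocks until a pod belonging to jobName reports a ready container.
func waitForJobPod(ctx context.Context, core v1.CoreV1Interface, namespace, jobName string) (*coreV1.Pod, error) {
	watcher, err := core.Pods(namespace).Watch(ctx, metaV1.ListOptions{
		Watch:         true,
		LabelSelector: "job-name=" + jobName, // label added by the Job controller
	})
	if err != nil {
		return nil, err
	}
	defer watcher.Stop()

	for event := range watcher.ResultChan() {
		pod, ok := event.Object.(*coreV1.Pod)
		if !ok || event.Type != watch.Modified {
			continue
		}
		for _, status := range pod.Status.ContainerStatuses {
			if status.Ready {
				return pod, nil
			}
		}
	}
	return nil, fmt.Errorf("watch for pods of job %q ended before a pod became ready", jobName)
}
```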
