Skip to content

Commit

Permalink
initial design
Browse files Browse the repository at this point in the history
Signed-off-by: ntishchauhan0022 <[email protected]>
  • Loading branch information
nitishchauhan0022 committed Jun 25, 2023
1 parent ffcf189 commit 263a0de
Show file tree
Hide file tree
Showing 10 changed files with 423 additions and 8 deletions.
24 changes: 24 additions & 0 deletions pkg/daemon/podprobe/pod_probe_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"

"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -88,6 +90,8 @@ type Controller struct {
nodeName string
// kruise daemon start time
start time.Time
// runtime client
runtimeClient runtimeclient.Client
}

// NewController returns the controller for pod probe
Expand Down Expand Up @@ -126,6 +130,7 @@ func NewController(opts daemonoptions.Options) (*Controller, error) {
nodeName: nodeName,
eventRecorder: recorder,
start: time.Now(),
runtimeClient: opts.RuntimeClient,
}
c.prober = newProber(c.runtimeFactory.GetRuntimeService())
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -407,6 +412,25 @@ func (c *Controller) fetchLatestPodContainer(podUID, name string) (*runtimeapi.C
return containerStatus, err
}

func (c *Controller) fetchPodIp(podUID, name, namespace string) (string, error) {

podList := &corev1.PodList{}
err := c.runtimeClient.List(context.TODO(), podList, runtimeclient.MatchingFields{"metadata.name": name, "metadata.namespace": namespace})

if err != nil {
klog.Errorf("Failed to fetch pod list in namespace %s: %v", namespace, err)
return "", err
}

for _, pod := range podList.Items {
if string(pod.UID) == podUID {
klog.Info("pod ip is ", pod.Status.PodIP)
return pod.Status.PodIP, nil
}
}
return "", fmt.Errorf("pod %s with Uid %s not found in namespace %s", name, podUID, namespace)
}

func updateNodePodProbeStatus(update Update, newStatus *appsv1alpha1.NodePodProbeStatus) {
var probeStatus *appsv1alpha1.PodProbeStatus
for i := range newStatus.PodProbeStatuses {
Expand Down
41 changes: 36 additions & 5 deletions pkg/daemon/podprobe/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"bytes"
"fmt"
"io"
"net/http"
"net/url"
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
Expand All @@ -28,6 +30,8 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/probe"
execprobe "k8s.io/kubernetes/pkg/probe/exec"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
tcpprobe "k8s.io/kubernetes/pkg/probe/tcp"
"k8s.io/utils/exec"
)

Expand All @@ -36,6 +40,8 @@ const maxProbeMessageLength = 1024
// Prober helps to check the probe(exec, http, tcp) of a container.
type prober struct {
exec execprobe.Prober
tcp tcpprobe.Prober
http httpprobe.Prober
runtimeService criapi.RuntimeService
}

Expand All @@ -44,13 +50,15 @@ type prober struct {
func newProber(runtimeService criapi.RuntimeService) *prober {
return &prober{
exec: execprobe.New(),
tcp: tcpprobe.New(),
http: httpprobe.New(false),
runtimeService: runtimeService,
}
}

// probe probes the container.
func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (appsv1alpha1.ProbeState, string, error) {
result, msg, err := pb.runProbe(p, container, containerID)
func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string, hostIP string) (appsv1alpha1.ProbeState, string, error) {
result, msg, err := pb.runProbe(p, container, containerID, hostIP)
if bytes.Count([]byte(msg), nil)-1 > maxProbeMessageLength {
msg = msg[:maxProbeMessageLength]
}
Expand All @@ -60,17 +68,40 @@ func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeap
return appsv1alpha1.ProbeSucceeded, msg, nil
}

func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (probe.Result, string, error) {
func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string, hostIP string) (probe.Result, string, error) {
timeSecond := p.TimeoutSeconds
if timeSecond <= 0 {
timeSecond = 1
}
timeout := time.Duration(timeSecond) * time.Second
// current only support exec
// todo: http, tcp
// for probing using exec method
if p.Exec != nil {
return pb.exec.Probe(pb.newExecInContainer(containerID, p.Exec.Command, timeout))
}

// for probing using tcp method
if p.TCPSocket.Port.IntVal != 0 {
if p.TCPSocket.Host != "" {
return pb.tcp.Probe(p.TCPSocket.Host, p.TCPSocket.Port.IntValue(), timeout)
} else {
return pb.tcp.Probe(hostIP, p.TCPSocket.Port.IntValue(), timeout)
}
}

// for probing using http method
if p.HTTPGet.Path != "" {
var u url.URL
var header http.Header
u.Scheme = "http"
u.Path = p.HTTPGet.Path
if p.HTTPGet.Host != "" {
u.Host = p.HTTPGet.Host
} else {
u.Host = hostIP
}
return pb.http.Probe(&u, header, timeout)
}

klog.InfoS("Failed to find probe builder for container", "containerName", container.Metadata.Name)
return probe.Unknown, "", fmt.Errorf("missing probe handler for %s", container.Metadata.Name)
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/daemon/podprobe/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ func (w *worker) doProbe() (keepGoing bool) {
defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

podIp, err := w.probeController.fetchPodIp(w.key.podUID, w.key.podName, w.key.podNs)
if err != nil {
klog.Errorf("Pod(%s/%s) fetchPodIP failed: %s", w.key.podNs, w.key.podName, err.Error())
return true
}

container, _ := w.probeController.fetchLatestPodContainer(w.key.podUID, w.key.containerName)
if container == nil {
klog.V(5).Infof("Pod(%s/%s) container(%s) Not Found", w.key.podNs, w.key.podName, w.key.containerName)
Expand Down Expand Up @@ -152,7 +158,12 @@ func (w *worker) doProbe() (keepGoing bool) {

// the full container environment here, OR we must make a call to the CRI in order to get those environment
// values from the running container.
result, msg, err := w.probeController.prober.probe(w.spec, container, w.containerID)
fmt.Println("===========================calling pod probecontroller.prober.probe===============================")
result, msg, err := w.probeController.prober.probe(w.spec, container, w.containerID, podIp) //Main Line
fmt.Printf("\n===========================Result: %s===============================", result)
fmt.Printf("\n===========================Msg: %s===============================", msg)
fmt.Printf("\n===========================Error: %s===============================", err.Error())

if err != nil {
klog.Errorf("Pod(%s/%s) do container(%s) probe(%s) spec(%s) failed: %s",
w.key.podNs, w.key.podName, w.key.containerName, w.key.probeName, util.DumpJSON(w.spec), err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,13 @@ func validateHandler(handler *corev1.Handler, fldPath *field.Path) field.ErrorLi
numHandlers++
allErrors = append(allErrors, validateExecAction(handler.Exec, fldPath.Child("exec"))...)
}
if handler.HTTPGet != nil || handler.TCPSocket != nil {
if handler.HTTPGet != nil {
numHandlers++
allErrors = append(allErrors, field.Forbidden(fldPath.Child("probe"), "current only support exec probe"))
allErrors = append(allErrors, validateHTTPGetAction(handler.HTTPGet, fldPath.Child("httpGet"))...)
}
if handler.TCPSocket != nil {
numHandlers++
allErrors = append(allErrors, validateTCPSocketAction(handler.TCPSocket, fldPath.Child("tcpSocket"))...)
}
if numHandlers == 0 {
allErrors = append(allErrors, field.Required(fldPath, "must specify a handler type"))
Expand All @@ -205,6 +209,25 @@ func validateExecAction(exec *corev1.ExecAction, fldPath *field.Path) field.Erro
return allErrors
}

func validateHTTPGetAction(httpGet *corev1.HTTPGetAction, fldPath *field.Path) field.ErrorList {
allErrors := field.ErrorList{}
if httpGet.Port.IntValue() == 0 {
allErrors = append(allErrors, field.Required(fldPath.Child("port"), ""))
}
if httpGet.Path == "" {
allErrors = append(allErrors, field.Required(fldPath.Child("path"), ""))
}
return allErrors
}

func validateTCPSocketAction(tcpSocket *corev1.TCPSocketAction, fldPath *field.Path) field.ErrorList {
allErrors := field.ErrorList{}
if tcpSocket.Port.IntValue() == 0 {
allErrors = append(allErrors, field.Required(fldPath.Child("port"), ""))
}
return allErrors
}

func validateProbeMarkerPolicy(policy *appsv1alpha1.ProbeMarkerPolicy, fldPath *field.Path) field.ErrorList {
allErrors := field.ErrorList{}
if policy.State != appsv1alpha1.ProbeSucceeded && policy.State != appsv1alpha1.ProbeFailed {
Expand Down
1 change: 1 addition & 0 deletions vendor/k8s.io/component-base/version/.gitattributes

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions vendor/k8s.io/component-base/version/base.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions vendor/k8s.io/component-base/version/version.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 263a0de

Please sign in to comment.