diff --git a/pkg/daemon/podprobe/pod_probe_controller.go b/pkg/daemon/podprobe/pod_probe_controller.go index 064a450b79..b2da3d0001 100644 --- a/pkg/daemon/podprobe/pod_probe_controller.go +++ b/pkg/daemon/podprobe/pod_probe_controller.go @@ -44,6 +44,8 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -88,6 +90,8 @@ type Controller struct { nodeName string // kruise daemon start time start time.Time + // runtime client + runtimeClient runtimeclient.Client } // NewController returns the controller for pod probe @@ -126,6 +130,7 @@ func NewController(opts daemonoptions.Options) (*Controller, error) { nodeName: nodeName, eventRecorder: recorder, start: time.Now(), + runtimeClient: opts.RuntimeClient, } c.prober = newProber(c.runtimeFactory.GetRuntimeService()) informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -407,6 +412,25 @@ func (c *Controller) fetchLatestPodContainer(podUID, name string) (*runtimeapi.C return containerStatus, err } +func (c *Controller) fetchPodIp(podUID, name, namespace string) (string, error) { + + podList := &corev1.PodList{} + err := c.runtimeClient.List(context.TODO(), podList, runtimeclient.MatchingFields{"metadata.name": name, "metadata.namespace": namespace}) + + if err != nil { + klog.Errorf("Failed to fetch pod list in namespace %s: %v", namespace, err) + return "", err + } + + for _, pod := range podList.Items { + if string(pod.UID) == podUID { + klog.Info("pod ip is ", pod.Status.PodIP) + return pod.Status.PodIP, nil + } + } + return "", fmt.Errorf("pod %s with Uid %s not found in namespace %s", name, podUID, namespace) +} + func updateNodePodProbeStatus(update Update, newStatus *appsv1alpha1.NodePodProbeStatus) { var probeStatus *appsv1alpha1.PodProbeStatus for i := range newStatus.PodProbeStatuses { diff --git a/pkg/daemon/podprobe/prober.go b/pkg/daemon/podprobe/prober.go index 70f9482717..962c4d5da5 100644 --- a/pkg/daemon/podprobe/prober.go +++ b/pkg/daemon/podprobe/prober.go @@ -20,6 +20,8 @@ import ( "bytes" "fmt" "io" + "net/http" + "net/url" "time" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" @@ -28,6 +30,8 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/probe" execprobe "k8s.io/kubernetes/pkg/probe/exec" + httpprobe "k8s.io/kubernetes/pkg/probe/http" + tcpprobe "k8s.io/kubernetes/pkg/probe/tcp" "k8s.io/utils/exec" ) @@ -36,6 +40,8 @@ const maxProbeMessageLength = 1024 // Prober helps to check the probe(exec, http, tcp) of a container. type prober struct { exec execprobe.Prober + tcp tcpprobe.Prober + http httpprobe.Prober runtimeService criapi.RuntimeService } @@ -44,13 +50,15 @@ type prober struct { func newProber(runtimeService criapi.RuntimeService) *prober { return &prober{ exec: execprobe.New(), + tcp: tcpprobe.New(), + http: httpprobe.New(false), runtimeService: runtimeService, } } // probe probes the container. -func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (appsv1alpha1.ProbeState, string, error) { - result, msg, err := pb.runProbe(p, container, containerID) +func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string, hostIP string) (appsv1alpha1.ProbeState, string, error) { + result, msg, err := pb.runProbe(p, container, containerID, hostIP) if bytes.Count([]byte(msg), nil)-1 > maxProbeMessageLength { msg = msg[:maxProbeMessageLength] } @@ -60,17 +68,40 @@ func (pb *prober) probe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeap return appsv1alpha1.ProbeSucceeded, msg, nil } -func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string) (probe.Result, string, error) { +func (pb *prober) runProbe(p *appsv1alpha1.ContainerProbeSpec, container *runtimeapi.ContainerStatus, containerID string, hostIP string) (probe.Result, string, error) { timeSecond := p.TimeoutSeconds if timeSecond <= 0 { timeSecond = 1 } timeout := time.Duration(timeSecond) * time.Second - // current only support exec - // todo: http, tcp + // for probing using exec method if p.Exec != nil { return pb.exec.Probe(pb.newExecInContainer(containerID, p.Exec.Command, timeout)) } + + // for probing using tcp method + if p.TCPSocket.Port.IntVal != 0 { + if p.TCPSocket.Host != "" { + return pb.tcp.Probe(p.TCPSocket.Host, p.TCPSocket.Port.IntValue(), timeout) + } else { + return pb.tcp.Probe(hostIP, p.TCPSocket.Port.IntValue(), timeout) + } + } + + // for probing using http method + if p.HTTPGet.Path != "" { + var u url.URL + var header http.Header + u.Scheme = "http" + u.Path = p.HTTPGet.Path + if p.HTTPGet.Host != "" { + u.Host = p.HTTPGet.Host + } else { + u.Host = hostIP + } + return pb.http.Probe(&u, header, timeout) + } + klog.InfoS("Failed to find probe builder for container", "containerName", container.Metadata.Name) return probe.Unknown, "", fmt.Errorf("missing probe handler for %s", container.Metadata.Name) } diff --git a/pkg/daemon/podprobe/worker.go b/pkg/daemon/podprobe/worker.go index 246258dbcf..56f200706c 100644 --- a/pkg/daemon/podprobe/worker.go +++ b/pkg/daemon/podprobe/worker.go @@ -119,6 +119,12 @@ func (w *worker) doProbe() (keepGoing bool) { defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging) defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true }) + podIp, err := w.probeController.fetchPodIp(w.key.podUID, w.key.podName, w.key.podNs) + if err != nil { + klog.Errorf("Pod(%s/%s) fetchPodIP failed: %s", w.key.podNs, w.key.podName, err.Error()) + return true + } + container, _ := w.probeController.fetchLatestPodContainer(w.key.podUID, w.key.containerName) if container == nil { klog.V(5).Infof("Pod(%s/%s) container(%s) Not Found", w.key.podNs, w.key.podName, w.key.containerName) @@ -152,7 +158,12 @@ func (w *worker) doProbe() (keepGoing bool) { // the full container environment here, OR we must make a call to the CRI in order to get those environment // values from the running container. - result, msg, err := w.probeController.prober.probe(w.spec, container, w.containerID) + fmt.Println("===========================calling pod probecontroller.prober.probe===============================") + result, msg, err := w.probeController.prober.probe(w.spec, container, w.containerID, podIp) //Main Line + fmt.Printf("\n===========================Result: %s===============================", result) + fmt.Printf("\n===========================Msg: %s===============================", msg) + fmt.Printf("\n===========================Error: %s===============================", err.Error()) + if err != nil { klog.Errorf("Pod(%s/%s) do container(%s) probe(%s) spec(%s) failed: %s", w.key.podNs, w.key.podName, w.key.containerName, w.key.probeName, util.DumpJSON(w.spec), err.Error()) diff --git a/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go b/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go index c01cfda564..7aad64484c 100644 --- a/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go +++ b/pkg/webhook/podprobemarker/validating/probe_create_update_handler.go @@ -187,9 +187,13 @@ func validateHandler(handler *corev1.Handler, fldPath *field.Path) field.ErrorLi numHandlers++ allErrors = append(allErrors, validateExecAction(handler.Exec, fldPath.Child("exec"))...) } - if handler.HTTPGet != nil || handler.TCPSocket != nil { + if handler.HTTPGet != nil { numHandlers++ - allErrors = append(allErrors, field.Forbidden(fldPath.Child("probe"), "current only support exec probe")) + allErrors = append(allErrors, validateHTTPGetAction(handler.HTTPGet, fldPath.Child("httpGet"))...) + } + if handler.TCPSocket != nil { + numHandlers++ + allErrors = append(allErrors, validateTCPSocketAction(handler.TCPSocket, fldPath.Child("tcpSocket"))...) } if numHandlers == 0 { allErrors = append(allErrors, field.Required(fldPath, "must specify a handler type")) @@ -205,6 +209,25 @@ func validateExecAction(exec *corev1.ExecAction, fldPath *field.Path) field.Erro return allErrors } +func validateHTTPGetAction(httpGet *corev1.HTTPGetAction, fldPath *field.Path) field.ErrorList { + allErrors := field.ErrorList{} + if httpGet.Port.IntValue() == 0 { + allErrors = append(allErrors, field.Required(fldPath.Child("port"), "")) + } + if httpGet.Path == "" { + allErrors = append(allErrors, field.Required(fldPath.Child("path"), "")) + } + return allErrors +} + +func validateTCPSocketAction(tcpSocket *corev1.TCPSocketAction, fldPath *field.Path) field.ErrorList { + allErrors := field.ErrorList{} + if tcpSocket.Port.IntValue() == 0 { + allErrors = append(allErrors, field.Required(fldPath.Child("port"), "")) + } + return allErrors +} + func validateProbeMarkerPolicy(policy *appsv1alpha1.ProbeMarkerPolicy, fldPath *field.Path) field.ErrorList { allErrors := field.ErrorList{} if policy.State != appsv1alpha1.ProbeSucceeded && policy.State != appsv1alpha1.ProbeFailed { diff --git a/vendor/k8s.io/component-base/version/.gitattributes b/vendor/k8s.io/component-base/version/.gitattributes new file mode 100644 index 0000000000..7e349eff60 --- /dev/null +++ b/vendor/k8s.io/component-base/version/.gitattributes @@ -0,0 +1 @@ +base.go export-subst diff --git a/vendor/k8s.io/component-base/version/base.go b/vendor/k8s.io/component-base/version/base.go new file mode 100644 index 0000000000..b753b7d191 --- /dev/null +++ b/vendor/k8s.io/component-base/version/base.go @@ -0,0 +1,63 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package version + +// Base version information. +// +// This is the fallback data used when version information from git is not +// provided via go ldflags. It provides an approximation of the Kubernetes +// version for ad-hoc builds (e.g. `go build`) that cannot get the version +// information from git. +// +// If you are looking at these fields in the git tree, they look +// strange. They are modified on the fly by the build process. The +// in-tree values are dummy values used for "git archive", which also +// works for GitHub tar downloads. +// +// When releasing a new Kubernetes version, this file is updated by +// build/mark_new_version.sh to reflect the new version, and then a +// git annotated tag (using format vX.Y where X == Major version and Y +// == Minor version) is created to point to the commit that updates +// component-base/version/base.go +var ( + // TODO: Deprecate gitMajor and gitMinor, use only gitVersion + // instead. First step in deprecation, keep the fields but make + // them irrelevant. (Next we'll take it out, which may muck with + // scripts consuming the kubectl version output - but most of + // these should be looking at gitVersion already anyways.) + gitMajor string // major version, always numeric + gitMinor string // minor version, numeric possibly followed by "+" + + // semantic version, derived by build scripts (see + // https://github.com/kubernetes/community/blob/master/contributors/design-proposals/release/versioning.md + // for a detailed discussion of this field) + // + // TODO: This field is still called "gitVersion" for legacy + // reasons. For prerelease versions, the build metadata on the + // semantic version is a git hash, but the version itself is no + // longer the direct output of "git describe", but a slight + // translation to be semver compliant. + + // NOTE: The $Format strings are replaced during 'git archive' thanks to the + // companion .gitattributes file containing 'export-subst' in this same + // directory. See also https://git-scm.com/docs/gitattributes + gitVersion = "v0.0.0-master+$Format:%H$" + gitCommit = "$Format:%H$" // sha1 from git, output of $(git rev-parse HEAD) + gitTreeState = "" // state of git tree, either "clean" or "dirty" + + buildDate = "1970-01-01T00:00:00Z" // build date in ISO8601 format, output of $(date -u +'%Y-%m-%dT%H:%M:%SZ') +) diff --git a/vendor/k8s.io/component-base/version/version.go b/vendor/k8s.io/component-base/version/version.go new file mode 100644 index 0000000000..d1e76dc00e --- /dev/null +++ b/vendor/k8s.io/component-base/version/version.go @@ -0,0 +1,42 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package version + +import ( + "fmt" + "runtime" + + apimachineryversion "k8s.io/apimachinery/pkg/version" +) + +// Get returns the overall codebase version. It's for detecting +// what code a binary was built from. +func Get() apimachineryversion.Info { + // These variables typically come from -ldflags settings and in + // their absence fallback to the settings in ./base.go + return apimachineryversion.Info{ + Major: gitMajor, + Minor: gitMinor, + GitVersion: gitVersion, + GitCommit: gitCommit, + GitTreeState: gitTreeState, + BuildDate: buildDate, + GoVersion: runtime.Version(), + Compiler: runtime.Compiler, + Platform: fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH), + } +} diff --git a/vendor/k8s.io/kubernetes/pkg/probe/http/http.go b/vendor/k8s.io/kubernetes/pkg/probe/http/http.go new file mode 100644 index 0000000000..ad034ee3fa --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/probe/http/http.go @@ -0,0 +1,156 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package http + +import ( + "crypto/tls" + "errors" + "fmt" + "net/http" + "net/url" + "time" + + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/component-base/version" + "k8s.io/kubernetes/pkg/probe" + + "k8s.io/klog/v2" + utilio "k8s.io/utils/io" +) + +const ( + maxRespBodyLength = 10 * 1 << 10 // 10KB +) + +// New creates Prober that will skip TLS verification while probing. +// followNonLocalRedirects configures whether the prober should follow redirects to a different hostname. +// If disabled, redirects to other hosts will trigger a warning result. +func New(followNonLocalRedirects bool) Prober { + tlsConfig := &tls.Config{InsecureSkipVerify: true} + return NewWithTLSConfig(tlsConfig, followNonLocalRedirects) +} + +// NewWithTLSConfig takes tls config as parameter. +// followNonLocalRedirects configures whether the prober should follow redirects to a different hostname. +// If disabled, redirects to other hosts will trigger a warning result. +func NewWithTLSConfig(config *tls.Config, followNonLocalRedirects bool) Prober { + // We do not want the probe use node's local proxy set. + transport := utilnet.SetTransportDefaults( + &http.Transport{ + TLSClientConfig: config, + DisableKeepAlives: true, + Proxy: http.ProxyURL(nil), + }) + return httpProber{transport, followNonLocalRedirects} +} + +// Prober is an interface that defines the Probe function for doing HTTP readiness/liveness checks. +type Prober interface { + Probe(url *url.URL, headers http.Header, timeout time.Duration) (probe.Result, string, error) +} + +type httpProber struct { + transport *http.Transport + followNonLocalRedirects bool +} + +// Probe returns a ProbeRunner capable of running an HTTP check. +func (pr httpProber) Probe(url *url.URL, headers http.Header, timeout time.Duration) (probe.Result, string, error) { + pr.transport.DisableCompression = true // removes Accept-Encoding header + client := &http.Client{ + Timeout: timeout, + Transport: pr.transport, + CheckRedirect: redirectChecker(pr.followNonLocalRedirects), + } + return DoHTTPProbe(url, headers, client) +} + +// GetHTTPInterface is an interface for making HTTP requests, that returns a response and error. +type GetHTTPInterface interface { + Do(req *http.Request) (*http.Response, error) +} + +// DoHTTPProbe checks if a GET request to the url succeeds. +// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Success. +// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Failure. +// This is exported because some other packages may want to do direct HTTP probes. +func DoHTTPProbe(url *url.URL, headers http.Header, client GetHTTPInterface) (probe.Result, string, error) { + req, err := http.NewRequest("GET", url.String(), nil) + if err != nil { + // Convert errors into failures to catch timeouts. + return probe.Failure, err.Error(), nil + } + if headers == nil { + headers = http.Header{} + } + if _, ok := headers["User-Agent"]; !ok { + // explicitly set User-Agent so it's not set to default Go value + v := version.Get() + headers.Set("User-Agent", fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor)) + } + if _, ok := headers["Accept"]; !ok { + // Accept header was not defined. accept all + headers.Set("Accept", "*/*") + } else if headers.Get("Accept") == "" { + // Accept header was overridden but is empty. removing + headers.Del("Accept") + } + req.Header = headers + req.Host = headers.Get("Host") + res, err := client.Do(req) + if err != nil { + // Convert errors into failures to catch timeouts. + return probe.Failure, err.Error(), nil + } + defer res.Body.Close() + b, err := utilio.ReadAtMost(res.Body, maxRespBodyLength) + if err != nil { + if err == utilio.ErrLimitReached { + klog.V(4).Infof("Non fatal body truncation for %s, Response: %v", url.String(), *res) + } else { + return probe.Failure, "", err + } + } + body := string(b) + if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest { + if res.StatusCode >= http.StatusMultipleChoices { // Redirect + klog.V(4).Infof("Probe terminated redirects for %s, Response: %v", url.String(), *res) + return probe.Warning, body, nil + } + klog.V(4).Infof("Probe succeeded for %s, Response: %v", url.String(), *res) + return probe.Success, body, nil + } + klog.V(4).Infof("Probe failed for %s with request headers %v, response body: %v", url.String(), headers, body) + return probe.Failure, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode), nil +} + +func redirectChecker(followNonLocalRedirects bool) func(*http.Request, []*http.Request) error { + if followNonLocalRedirects { + return nil // Use the default http client checker. + } + + return func(req *http.Request, via []*http.Request) error { + if req.URL.Hostname() != via[0].URL.Hostname() { + return http.ErrUseLastResponse + } + // Default behavior: stop after 10 redirects. + if len(via) >= 10 { + return errors.New("stopped after 10 redirects") + } + return nil + } +} diff --git a/vendor/k8s.io/kubernetes/pkg/probe/tcp/tcp.go b/vendor/k8s.io/kubernetes/pkg/probe/tcp/tcp.go new file mode 100644 index 0000000000..771bc04771 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/probe/tcp/tcp.go @@ -0,0 +1,61 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tcp + +import ( + "net" + "strconv" + "time" + + "k8s.io/kubernetes/pkg/probe" + + "k8s.io/klog/v2" +) + +// New creates Prober. +func New() Prober { + return tcpProber{} +} + +// Prober is an interface that defines the Probe function for doing TCP readiness/liveness checks. +type Prober interface { + Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) +} + +type tcpProber struct{} + +// Probe returns a ProbeRunner capable of running an TCP check. +func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) { + return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout) +} + +// DoTCPProbe checks that a TCP socket to the address can be opened. +// If the socket can be opened, it returns Success +// If the socket fails to open, it returns Failure. +// This is exported because some other packages may want to do direct TCP probes. +func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) { + conn, err := net.DialTimeout("tcp", addr, timeout) + if err != nil { + // Convert errors to failures to handle timeouts. + return probe.Failure, err.Error(), nil + } + err = conn.Close() + if err != nil { + klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err) + } + return probe.Success, "", nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 904c4ced13..f7ba5cbe2a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1112,6 +1112,7 @@ k8s.io/component-base/config/v1alpha1 k8s.io/component-base/config/validation k8s.io/component-base/featuregate k8s.io/component-base/logs/logreduction +k8s.io/component-base/version # k8s.io/component-helpers v0.22.6 => k8s.io/component-helpers v0.22.6 ## explicit; go 1.16 k8s.io/component-helpers/scheduling/corev1 @@ -1195,6 +1196,8 @@ k8s.io/kubernetes/pkg/kubelet/util/format k8s.io/kubernetes/pkg/kubelet/util/ioutils k8s.io/kubernetes/pkg/probe k8s.io/kubernetes/pkg/probe/exec +k8s.io/kubernetes/pkg/probe/http +k8s.io/kubernetes/pkg/probe/tcp k8s.io/kubernetes/pkg/proxy/util k8s.io/kubernetes/pkg/scheduler/apis/config k8s.io/kubernetes/pkg/scheduler/apis/config/v1beta1