Merge pull request #220 from clobrano/e1879-use-pod-ip-port-0
Use Pod IP for peer communication
openshift-merge-bot[bot] authored Jul 9, 2024
2 parents 4c8286e + cda2f3f commit 22336d0
Showing 2 changed files with 89 additions and 63 deletions.
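At a glance, the change replaces node addresses with Pod IPs for peer-to-peer health checks: the peer list type moves from [][]v1.NodeAddress to []corev1.PodIP, and the gRPC peer-health client now dials the agent pod's IP. The following minimal Go sketch is not part of the commit; it only illustrates how a dial target is built from a Pod IP, and the helper name buildPeerTarget and the port value are made up for the example.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// buildPeerTarget is a hypothetical helper mirroring what getHealthStatusFromPeer
// does after this change: combine a peer's Pod IP with the configured peer health
// port into a gRPC dial target.
func buildPeerTarget(ip corev1.PodIP, port int) string {
	return fmt.Sprintf("%v:%v", ip.IP, port)
}

func main() {
	peer := corev1.PodIP{IP: "10.128.2.15"}   // example value
	fmt.Println(buildPeerTarget(peer, 30001)) // prints "10.128.2.15:30001"
}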
71 changes: 36 additions & 35 deletions pkg/apicheck/check.go
@@ -10,7 +10,7 @@ import (
"github.com/go-logr/logr"
"google.golang.org/grpc/credentials"

v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
@@ -128,21 +128,21 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response {
}

c.config.Log.Info("Error count exceeds threshold, trying to ask other nodes if I'm healthy")
nodesToAsk := c.config.Peers.GetPeersAddresses(peers.Worker)
if nodesToAsk == nil || len(nodesToAsk) == 0 {
peersToAsk := c.config.Peers.GetPeersAddresses(peers.Worker)
if peersToAsk == nil || len(peersToAsk) == 0 {
c.config.Log.Info("Peers list is empty and / or couldn't be retrieved from server, nothing we can do, so consider the node being healthy")
//todo maybe we need to check if this happens too much and reboot
// TODO: maybe we need to check if this happens too much and reboot
return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseNoPeersWereFound}
}

apiErrorsResponsesSum := 0
nrAllNodes := len(nodesToAsk)
// nodesToAsk is being reduced in every iteration, iterate until no nodes left to ask
for i := 0; len(nodesToAsk) > 0; i++ {
nrAllPeers := len(peersToAsk)
// peersToAsk is being reduced at every iteration, iterate until no peers left to ask
for i := 0; len(peersToAsk) > 0; i++ {

batchSize := utils.GetNextBatchSize(nrAllNodes, len(nodesToAsk))
chosenNodesAddresses := c.popNodes(&nodesToAsk, batchSize)
healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenNodesAddresses)
batchSize := utils.GetNextBatchSize(nrAllPeers, len(peersToAsk))
chosenPeersIPs := c.popPeerIPs(&peersToAsk, batchSize)
healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenPeersIPs)
if healthyResponses+unhealthyResponses+apiErrorsResponses > 0 {
c.timeOfLastPeerResponse = time.Now()
}
@@ -161,9 +161,9 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response {
if apiErrorsResponses > 0 {
c.config.Log.Info("Peer can't access the api-server")
apiErrorsResponsesSum += apiErrorsResponses
//todo consider using [m|n]hc.spec.maxUnhealthy instead of 50%
if apiErrorsResponsesSum > nrAllNodes/2 { //already reached more than 50% of the nodes and all of them returned api error
//assuming this is a control plane failure as others can't access api-server as well
// TODO: consider using [m|n]hc.spec.maxUnhealthy instead of 50%
if apiErrorsResponsesSum > nrAllPeers/2 { // already reached more than 50% of the peers and all of them returned api error
// assuming this is a control plane failure as others can't access api-server as well
c.config.Log.Info("More than 50% of the nodes couldn't access the api-server, assuming this is a control plane failure")
return peers.Response{IsHealthy: true, Reason: peers.HealthyBecauseMostPeersCantAccessAPIServer}
}
@@ -185,47 +185,48 @@ func (c *ApiConnectivityCheck) getWorkerPeersResponse() peers.Response {
}

func (c *ApiConnectivityCheck) canOtherControlPlanesBeReached() bool {
nodesToAsk := c.config.Peers.GetPeersAddresses(peers.ControlPlane)
numOfControlPlanePeers := len(nodesToAsk)
peersToAsk := c.config.Peers.GetPeersAddresses(peers.ControlPlane)
numOfControlPlanePeers := len(peersToAsk)
if numOfControlPlanePeers == 0 {
c.config.Log.Info("Peers list is empty and / or couldn't be retrieved from server, other control planes can't be reached")
return false
}

chosenNodesAddresses := c.popNodes(&nodesToAsk, numOfControlPlanePeers)
healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenNodesAddresses)
chosenPeersIPs := c.popPeerIPs(&peersToAsk, numOfControlPlanePeers)
healthyResponses, unhealthyResponses, apiErrorsResponses, _ := c.getHealthStatusFromPeers(chosenPeersIPs)

// Any response is an indication of communication with a peer
return (healthyResponses + unhealthyResponses + apiErrorsResponses) > 0
}

func (c *ApiConnectivityCheck) popNodes(nodes *[][]v1.NodeAddress, count int) []string {
nrOfNodes := len(*nodes)
if nrOfNodes == 0 {
return []string{}
func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]corev1.PodIP, count int) []corev1.PodIP {
nrOfPeers := len(*peersIPs)
if nrOfPeers == 0 {
return []corev1.PodIP{}
}

if count > nrOfNodes {
count = nrOfNodes
if count > nrOfPeers {
count = nrOfPeers
}

//todo maybe we should pick nodes randomly rather than relying on the order returned from api-server
addresses := make([]string, count)
// TODO: maybe we should pick nodes randomly rather than relying on the order returned from api-server
selectedIPs := make([]corev1.PodIP, count)
for i := 0; i < count; i++ {
nodeAddresses := (*nodes)[i]
if len(nodeAddresses) == 0 || nodeAddresses[0].Address == "" {
c.config.Log.Info("ignoring node without IP address")
ip := (*peersIPs)[i]
if ip.IP == "" {
// This should not happen, but keeping it for good measure.
c.config.Log.Info("ignoring peers without IP address")
continue
}
addresses[i] = nodeAddresses[0].Address //todo node might have multiple addresses or none
selectedIPs[i] = ip
}

*nodes = (*nodes)[count:] //remove popped nodes from the list
*peersIPs = (*peersIPs)[count:] //remove popped nodes from the list

return addresses
return selectedIPs
}

func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []string) (int, int, int, int) {
func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []corev1.PodIP) (int, int, int, int) {
nrAddresses := len(addresses)
responsesChan := make(chan selfNodeRemediation.HealthCheckResponseCode, nrAddresses)

@@ -237,9 +238,9 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []string) (int
}

// getHealthStatusFromPeer issues a GET request to the specified IP and returns the result from the peer into the given channel
func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, results chan<- selfNodeRemediation.HealthCheckResponseCode) {
func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) {

logger := c.config.Log.WithValues("IP", endpointIp)
logger := c.config.Log.WithValues("IP", endpointIp.IP)
logger.Info("getting health status from peer")

if err := c.initClientCreds(); err != nil {
@@ -249,7 +250,7 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, result
}

// TODO does this work with IPv6?
phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds)
phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp.IP, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds)
if err != nil {
logger.Error(err, "failed to init grpc client")
results <- selfNodeRemediation.RequestFailed
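To see the batching behaviour of the new popPeerIPs helper in isolation, here is a standalone, hand-written sketch (not part of the commit). It mirrors the pop-and-shrink pattern used by getWorkerPeersResponse; the fixed batch size of 2 stands in for utils.GetNextBatchSize, and unlike the original it drops empty entries instead of leaving zero values in the returned slice.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// popPeerIPs takes up to count IPs from the front of the slice, skips entries
// with an empty IP, and shrinks the caller's slice so that the next iteration
// asks a fresh batch of peers.
func popPeerIPs(peersIPs *[]corev1.PodIP, count int) []corev1.PodIP {
	nrOfPeers := len(*peersIPs)
	if nrOfPeers == 0 {
		return []corev1.PodIP{}
	}
	if count > nrOfPeers {
		count = nrOfPeers
	}
	selected := make([]corev1.PodIP, 0, count)
	for i := 0; i < count; i++ {
		ip := (*peersIPs)[i]
		if ip.IP == "" {
			continue // should not happen, kept for good measure
		}
		selected = append(selected, ip)
	}
	*peersIPs = (*peersIPs)[count:] // remove popped peers from the list
	return selected
}

func main() {
	peers := []corev1.PodIP{{IP: "10.0.0.1"}, {IP: "10.0.0.2"}, {IP: "10.0.0.3"}}
	for len(peers) > 0 {
		batch := popPeerIPs(&peers, 2) // fixed batch size stands in for utils.GetNextBatchSize
		fmt.Println("asking peers:", batch)
	}
}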
81 changes: 53 additions & 28 deletions pkg/peers/peers.go
@@ -8,6 +8,7 @@ import (

"github.com/go-logr/logr"
commonlabels "github.com/medik8s/common/pkg/labels"
pkgerrors "github.com/pkg/errors"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -36,7 +37,7 @@ type Peers struct {
myNodeName string
mutex sync.Mutex
apiServerTimeout time.Duration
workerPeersAddresses, controlPlanePeersAddresses [][]v1.NodeAddress
workerPeersAddresses, controlPlanePeersAddresses []v1.PodIP
}

func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Reader, log logr.Logger, apiServerTimeout time.Duration) *Peers {
@@ -47,8 +48,8 @@ func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Read
myNodeName: myNodeName,
mutex: sync.Mutex{},
apiServerTimeout: apiServerTimeout,
workerPeersAddresses: [][]v1.NodeAddress{},
controlPlanePeersAddresses: [][]v1.NodeAddress{},
workerPeersAddresses: []v1.PodIP{},
controlPlanePeersAddresses: []v1.PodIP{},
}
}

@@ -76,30 +77,37 @@ func (p *Peers) Start(ctx context.Context) error {
p.controlPlanePeerSelector = createSelector(hostname, getControlPlaneLabel(myNode))
}

go wait.UntilWithContext(ctx, func(ctx context.Context) {
p.updateWorkerPeers(ctx)
p.updateControlPlanePeers(ctx)
}, p.peerUpdateInterval)
var updatePeersError error
cancellableCtx, cancel := context.WithCancel(ctx)

p.log.Info("peers started")
p.log.Info("peer starting", "name", p.myNodeName)
wait.UntilWithContext(cancellableCtx, func(ctx context.Context) {
updatePeersError = p.updateWorkerPeers(ctx)
if updatePeersError != nil {
cancel()
}
updatePeersError = p.updateControlPlanePeers(ctx)
if updatePeersError != nil {
cancel()
}
}, p.peerUpdateInterval)

<-ctx.Done()
return nil
return updatePeersError
}

func (p *Peers) updateWorkerPeers(ctx context.Context) {
setterFunc := func(addresses [][]v1.NodeAddress) { p.workerPeersAddresses = addresses }
func (p *Peers) updateWorkerPeers(ctx context.Context) error {
setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses }
selectorGetter := func() labels.Selector { return p.workerPeerSelector }
p.updatePeers(ctx, selectorGetter, setterFunc)
return p.updatePeers(ctx, selectorGetter, setterFunc)
}

func (p *Peers) updateControlPlanePeers(ctx context.Context) {
setterFunc := func(addresses [][]v1.NodeAddress) { p.controlPlanePeersAddresses = addresses }
func (p *Peers) updateControlPlanePeers(ctx context.Context) error {
setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses }
selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector }
p.updatePeers(ctx, selectorGetter, setterFunc)
return p.updatePeers(ctx, selectorGetter, setterFunc)
}

func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses [][]v1.NodeAddress)) {
func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) error {
p.mutex.Lock()
defer p.mutex.Unlock()

@@ -111,37 +119,54 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
if err := p.List(readerCtx, &nodes, client.MatchingLabelsSelector{Selector: getSelector()}); err != nil {
if errors.IsNotFound(err) {
// we are the only node at the moment... reset peerList
p.workerPeersAddresses = [][]v1.NodeAddress{}
p.workerPeersAddresses = []v1.PodIP{}
}
p.log.Error(err, "failed to update peer list")
return
return pkgerrors.Wrap(err, "failed to update peer list")
}

pods := v1.PodList{}
listOptions := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
"app.kubernetes.io/name": "self-node-remediation",
"app.kubernetes.io/component": "agent",
}),
}
if err := p.List(readerCtx, &pods, listOptions); err != nil {
p.log.Error(err, "could not get pods")
return pkgerrors.Wrap(err, "could not get pods")
}

nodesCount := len(nodes.Items)
addresses := make([][]v1.NodeAddress, nodesCount)
addresses := make([]v1.PodIP, nodesCount)
for i, node := range nodes.Items {
addresses[i] = node.Status.Addresses
for _, pod := range pods.Items {
if pod.Spec.NodeName == node.Name {
if pod.Status.PodIPs == nil || len(pod.Status.PodIPs) == 0 {
return pkgerrors.New(fmt.Sprintf("empty Pod IP for Pod %s on Node %s", pod.Name, node.Name))
}
addresses[i] = pod.Status.PodIPs[0]
}
}
}
setAddresses(addresses)
return nil
}

func (p *Peers) GetPeersAddresses(role Role) [][]v1.NodeAddress {
func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP {
p.mutex.Lock()
defer p.mutex.Unlock()

var addresses [][]v1.NodeAddress
var addresses []v1.PodIP
if role == Worker {
addresses = p.workerPeersAddresses
} else {
addresses = p.controlPlanePeersAddresses
}
//we don't want the caller to be able to change the addresses
//so we create a deep copy and return it
addressesCopy := make([][]v1.NodeAddress, len(addresses))
for i := range addressesCopy {
addressesCopy[i] = make([]v1.NodeAddress, len(addresses[i]))
copy(addressesCopy, addresses)
}
addressesCopy := make([]v1.PodIP, len(addresses))
copy(addressesCopy, addresses)

return addressesCopy
}
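The other half of the change is in peers.go: instead of storing node addresses, updatePeers now lists the self-node-remediation agent pods and records, for each peer node, the first IP of the pod scheduled on it. The sketch below is not part of the commit; it reproduces only the matching step on plain slices (the function name peerPodIPs is made up, and the label-selector based pod listing from the diff is omitted).

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// peerPodIPs mirrors the matching loop added to updatePeers: for every peer node,
// find the agent pod scheduled on it and keep that pod's first IP. It errors out
// when a matching pod has no IP yet, as the commit does.
func peerPodIPs(nodes []v1.Node, agentPods []v1.Pod) ([]v1.PodIP, error) {
	addresses := make([]v1.PodIP, len(nodes))
	for i, node := range nodes {
		for _, pod := range agentPods {
			if pod.Spec.NodeName != node.Name {
				continue
			}
			if len(pod.Status.PodIPs) == 0 {
				return nil, fmt.Errorf("empty Pod IP for Pod %s on Node %s", pod.Name, node.Name)
			}
			addresses[i] = pod.Status.PodIPs[0]
		}
	}
	return addresses, nil
}

func main() {
	nodes := []v1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "worker-1"}}}
	pods := []v1.Pod{{
		ObjectMeta: metav1.ObjectMeta{Name: "self-node-remediation-ds-abcde"},
		Spec:       v1.PodSpec{NodeName: "worker-1"},
		Status:     v1.PodStatus{PodIPs: []v1.PodIP{{IP: "10.128.2.15"}}},
	}}

	ips, err := peerPodIPs(nodes, pods)
	fmt.Println(ips, err) // [{10.128.2.15}] <nil>
}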
