Skip to content

Commit

Permalink
kubevirt: Move neighbors logic go ovn-node
Browse files Browse the repository at this point in the history
Signed-off-by: Enrique Llorente <[email protected]>
  • Loading branch information
qinqon committed Jun 4, 2024
1 parent 2a464c9 commit 207ac71
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 38 deletions.
148 changes: 148 additions & 0 deletions go-controller/pkg/kubevirt/neighbors_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package kubevirt

import (
"fmt"
"net"
"reflect"
"time"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/controller"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)

type NeighborsController struct {
podController controller.Controller
watchFactory *factory.WatchFactory
nodeName string
nadName string
v4 bool
v6 bool
}

func NewNeighborsController(watchFactory *factory.WatchFactory, v4, v6 bool, nodeName, nadName string) *NeighborsController {
c := &NeighborsController{
watchFactory: watchFactory,
nodeName: nodeName,
nadName: nadName,
v4: v4,
v6: v6,
}
c.initControllers()
return c
}

func (c *NeighborsController) initControllers() {
podControllerConfig := &controller.Config[corev1.Pod]{
RateLimiter: workqueue.NewItemFastSlowRateLimiter(time.Second, 5*time.Second, 5),
Informer: c.watchFactory.PodCoreInformer().Informer(),
Lister: c.watchFactory.PodCoreInformer().Lister().List,
ObjNeedsUpdate: c.podNeedsUpdate,
Reconcile: c.reconcilePod,
Threadiness: 1,
}
c.podController = controller.NewController[corev1.Pod]("kubevirt-neighbors-pod-controller", podControllerConfig)
}

func (c *NeighborsController) podNeedsUpdate(oldObj, newObj *corev1.Pod) bool {
if newObj == nil || oldObj == nil {
return false
}
isMigratedSourcePodStale, err := IsMigratedSourcePodStale(c.watchFactory, newObj)
if err != nil {
klog.Errorf("Failed checking IsMigratedSourcePodStale: %v", err)
return false
}
if util.PodWantsHostNetwork(newObj) || !IsPodLiveMigratable(newObj) || isMigratedSourcePodStale {
return false
}
return !reflect.DeepEqual(oldObj.Labels, newObj.Labels) ||
!reflect.DeepEqual(oldObj.Annotations, newObj.Annotations)
}

func (c *NeighborsController) reconcilePod(key string) error {
klog.Infof("Reconciling pods at kubevirt neighbors controller, key=%s", key)

// Split the key in namespace and name of the corresponding object.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.Errorf("reconcilePod failed to split meta namespace cache key %s for pod: %v", key, err)
return nil
}

pod, err := c.watchFactory.GetPod(namespace, name)
if err != nil {
if kerrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to fetch pod %s in namespace %s", name, namespace)
}

podAnnotation, err := util.UnmarshalPodAnnotation(pod.Annotations, c.nadName)
if err != nil {
return fmt.Errorf("failed reading remote pod annotation: %v", err)
}

nodeOwningSubnet, err := findNodeOwningSubnet(c.watchFactory, podAnnotation, c.nadName)
if err != nil {
return err
}

currentNodeOwnsSubnet := nodeOwningSubnet.Name == c.nodeName
vmRunningAtCurrentNode := c.nodeName == pod.Spec.NodeName

if vmRunningAtCurrentNode && !currentNodeOwnsSubnet {
ipsToNotify, err := findRunningPodsIPsFromPodSubnet(c.watchFactory, podAnnotation, c.nadName)
if err != nil {
return fmt.Errorf("failed discovering pod IPs within VM's subnet to update neighbors VM: %w", err)
}
if c.v4 {
if err := notifyARPProxyMACForIPs(ipsToNotify, podAnnotation.MAC); err != nil {
return fmt.Errorf("failed sending GARP to VM after live migration: %w", err)
}
}
if c.v6 {
if err := notifyUnsolicitedNeighborAdvertisementForIPs(ipsToNotify, podAnnotation.IPs); err != nil {
return fmt.Errorf("failed sending unsolicited na to VM after live migration: %w", err)
}
}
} else if !vmRunningAtCurrentNode && currentNodeOwnsSubnet {
if c.v4 {
if err := notifyARPProxyMACForIPs(podAnnotation.IPs, broadcastMAC); err != nil {
return err
}
}
if c.v6 {
if err := notifyUnsolicitedNeighborAdvertisementForIPs(podAnnotation.IPs, []*net.IPNet{{IP: net.IPv6linklocalallnodes}}); err != nil {
return err
}
}
}

return nil
}

func (c *NeighborsController) syncPods() error {
klog.Infof("Syncing pods at kubevirt neighbors controller")
// TODO
return nil
}

func (c *NeighborsController) Start() error {
klog.Infof("Starting kubevirt neighbors controller")
if err := controller.StartControllersWithInitialSync(c.syncPods, c.podController); err != nil {
return fmt.Errorf("unable to start pod controller: %w", err)
}
return nil
}

func (c *NeighborsController) Stop() {
klog.Infof("Stopping kubevirt neighbors controller")
controller.StopControllers(c.podController)
}
50 changes: 12 additions & 38 deletions go-controller/pkg/kubevirt/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,25 +159,14 @@ func EnsureLocalZonePodAddressesToNodeRoute(watchFactory *factory.WatchFactory,

}

// TODO: Implement neighbours update for non IC
if config.OVNKubernetesFeature.EnableInterconnect {
currentNodeOwnsSubnet, err := nodeContainsPodSubnet(watchFactory, os.Getenv("K8S_NODE"), podAnnotation, nadName)
if err != nil {
currentNodeOwnsSubnet, err := nodeContainsPodSubnet(watchFactory, os.Getenv("K8S_NODE"), podAnnotation, nadName)
if err != nil {
return err
}
if !vmRunningAtNodeOwningSubnet && currentNodeOwnsSubnet {
if err := deleteStaleLogicalSwitchPorts(watchFactory, nbClient, pod); err != nil {
return err
}
if !currentNodeOwnsSubnet && pod.Spec.NodeName == os.Getenv("K8S_NODE") {
ipsToNotify, err := findRunningPodsIPsFromPodSubnet(watchFactory, podAnnotation, nadName)
if err != nil {
return fmt.Errorf("failed discovering pod IPs within VM's subnet to update neighbors VM: %w", err)
}
if err := notifyARPProxyMACForIPs(ipsToNotify, podAnnotation.MAC); err != nil {
return fmt.Errorf("failed sending GARP to VM after live migration: %w", err)
}
// TODO: Check if there is IPv6
if err := notifyUnsolicitedNeighborAdvertisementForIPs(ipsToNotify, podAnnotation.IPs); err != nil {
return fmt.Errorf("failed sending unsolicited na to VM after live migration: %w", err)
}
}
}
return nil
}
Expand Down Expand Up @@ -258,29 +247,14 @@ func EnsureRemoteZonePodAddressesToNodeRoute(controllerName string, watchFactory

}

//TODO: Implement neighbour update for non IC
if config.OVNKubernetesFeature.EnableInterconnect {
currentNodeOwnsSubnet, err := nodeContainsPodSubnet(watchFactory, os.Getenv("K8S_NODE"), podAnnotation, nadName)
if err != nil {
currentNodeOwnsSubnet, err := nodeContainsPodSubnet(watchFactory, os.Getenv("K8S_NODE"), podAnnotation, nadName)
if err != nil {
return err
}
if !vmRunningAtNodeOwningSubnet && currentNodeOwnsSubnet {
if err := deleteStaleLogicalSwitchPorts(watchFactory, nbClient, pod); err != nil {
return err
}
if !vmRunningAtNodeOwningSubnet && currentNodeOwnsSubnet {
if err := deleteStaleLogicalSwitchPorts(watchFactory, nbClient, pod); err != nil {
return err
}
if err := notifyARPProxyMACForIPs(podAnnotation.IPs, broadcastMAC); err != nil {
return err
}
//TODO check if we are at ipv6
ipsToNotify, err := findRunningPodsIPsFromPodSubnet(watchFactory, podAnnotation, nadName)
if err != nil {
return fmt.Errorf("failed discovering pod IPs within VM's subnet to update neighbors at VM: %w", err)
}
//TODO can we use some kind of multicast so we don't have to send one NA per ip ?
if err := notifyUnsolicitedNeighborAdvertisementForIPs(podAnnotation.IPs, ipsToNotify); err != nil {
return err
}
}
}

return nil
Expand Down
9 changes: 9 additions & 0 deletions go-controller/pkg/node/default_node_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/informer"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kubevirt"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/controllers/egressip"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/controllers/egressservice"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/linkmanager"
Expand Down Expand Up @@ -106,6 +107,7 @@ type DefaultNodeNetworkController struct {
retryEndpointSlices *retry.RetryFramework

apbExternalRouteNodeController *apbroute.ExternalGatewayNodeController
kubevirtNeighborsController *kubevirt.NeighborsController
}

func newDefaultNodeNetworkController(cnnci *CommonNodeNetworkControllerInfo, stopChan chan struct{},
Expand Down Expand Up @@ -147,6 +149,8 @@ func NewDefaultNodeNetworkController(cnnci *CommonNodeNetworkControllerInfo) (*D
return nil, err
}

nc.kubevirtNeighborsController = kubevirt.NewNeighborsController(nc.watchFactory.(*factory.WatchFactory), config.IPv4Mode, config.IPv6Mode, nc.name, types.DefaultNetworkName)

nc.initRetryFrameworkForNode()

return nc, nil
Expand Down Expand Up @@ -1119,6 +1123,10 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error {

linkManager.Run(nc.stopChan, nc.wg)

if err := nc.kubevirtNeighborsController.Start(); err != nil {
return fmt.Errorf("failed starting kubevirt neighbors controller: %v", err)
}

nc.wg.Add(1)
go func() {
defer nc.wg.Done()
Expand All @@ -1132,6 +1140,7 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error {
// Stop gracefully stops the controller
// deleteLogicalEntities will never be true for default network
func (nc *DefaultNodeNetworkController) Stop() {
nc.kubevirtNeighborsController.Stop()
close(nc.stopChan)
nc.wg.Wait()
}
Expand Down

0 comments on commit 207ac71

Please sign in to comment.