Skip to content

Commit

Permalink
feat!: add support to StatefulSet (#263)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostbean authored Oct 3, 2024
1 parent bb62eab commit a1632d5
Show file tree
Hide file tree
Showing 13 changed files with 244 additions and 110 deletions.
9 changes: 9 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
linters:
enable:
- exhaustruct
- exportloopref
- gomnd
- staticcheck
- exhaustive
max-issues-per-linter: 0
sort-results: true
2 changes: 1 addition & 1 deletion ci/obd-demo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ spec:

---
apiVersion: apps/v1
kind: Deployment
kind: StatefulSet
metadata:
name: postgres-v1
labels:
Expand Down
96 changes: 50 additions & 46 deletions kardinal-cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"kardinal.cli/multi_os_cmd_executor"

"github.com/kurtosis-tech/stacktrace"
"github.com/samber/lo"
"github.com/segmentio/analytics-go/v3"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -119,7 +120,7 @@ var deployCmd = &cobra.Command{
Short: "Deploy services",
Args: cobra.ExactArgs(0),
Run: func(cmd *cobra.Command, args []string) {
serviceConfigs, ingressConfigs, gatewayConfigs, routeConfigs, namespace, err := parseKubernetesManifestFile(kubernetesManifestFile)
services, deployments, statefulSets, ingresses, gateways, routes, namespace, err := parseKubernetesManifestFile(kubernetesManifestFile)
if err != nil {
log.Fatalf("Error loading k8s manifest file: %v", err)
}
Expand All @@ -128,7 +129,7 @@ var deployCmd = &cobra.Command{
log.Fatal("Error getting or creating user tenant UUID", err)
}

deploy(tenantUuid.String(), serviceConfigs, ingressConfigs, gatewayConfigs, routeConfigs, namespace)
deploy(tenantUuid.String(), services, deployments, statefulSets, ingresses, gateways, routes, namespace)
},
}

Expand All @@ -144,7 +145,7 @@ var templateCreateCmd = &cobra.Command{
// A valid template only modifies services
// A valid template has metadata.name
// A valid template modifies at least one service
serviceConfigs, _, _, _, _, err := parseKubernetesManifestFile(templateYamlFile)
serviceConfigs, _, _, _, _, _, _, err := parseKubernetesManifestFile(templateYamlFile)
if err != nil {
log.Fatalf("Error loading template file: %v", err)
}
Expand Down Expand Up @@ -678,19 +679,28 @@ func parsePairs(pairs []string) map[string]string {
return pairsMap
}

func parseKubernetesManifestFile(kubernetesManifestFile string) ([]api_types.ServiceConfig, []api_types.IngressConfig, []api_types.GatewayConfig, []api_types.RouteConfig, string, error) {
func parseKubernetesManifestFile(kubernetesManifestFile string) (
[]api_types.ServiceConfig,
[]api_types.DeploymentConfig,
[]api_types.StatefulSetConfig,
[]api_types.IngressConfig,
[]api_types.GatewayConfig,
[]api_types.RouteConfig, string, error,
) {
fileBytes, err := loadKubernetesManifestFile(kubernetesManifestFile)
if err != nil {
log.Fatalf("Error loading kubernetest manifest file: %v", err)
return nil, nil, nil, nil, "", err
return nil, nil, nil, nil, nil, nil, "", err
}

manifest := string(fileBytes)
var namespace string
serviceConfigs := map[string]*api_types.ServiceConfig{}
ingressConfigs := map[string]*api_types.IngressConfig{}
gatewayConfigs := map[string]*api_types.GatewayConfig{}
routeConfigs := map[string]*api_types.RouteConfig{}
serviceConfigs := map[string]api_types.ServiceConfig{}
deploymentConfigs := map[string]api_types.DeploymentConfig{}
statefulSetConfigs := map[string]api_types.StatefulSetConfig{}
ingressConfigs := map[string]api_types.IngressConfig{}
gatewayConfigs := map[string]api_types.GatewayConfig{}
routeConfigs := map[string]api_types.RouteConfig{}

// Register the gateway scheme to parse the Gateway CRD
gatewayscheme.AddToScheme(scheme.Scheme)
Expand All @@ -701,73 +711,64 @@ func parseKubernetesManifestFile(kubernetesManifestFile string) ([]api_types.Ser
}
obj, _, err := decode([]byte(spec), nil, nil)
if err != nil {
return nil, nil, nil, nil, "", stacktrace.Propagate(err, "An error occurred parsing the spec: %s", spec)
return nil, nil, nil, nil, nil, nil, "", stacktrace.Propagate(err, "An error occurred parsing the spec: %s", spec)
}
switch obj := obj.(type) {
case *corev1.Service:
service := obj
serviceName := getObjectName(service.GetObjectMeta().(*metav1.ObjectMeta))
_, ok := serviceConfigs[serviceName]
if !ok {
serviceConfigs[serviceName] = &api_types.ServiceConfig{
serviceConfigs[serviceName] = api_types.ServiceConfig{
Service: *service,
}
} else {
serviceConfigs[serviceName].Service = *service
logrus.Warnf("Service %s already exists, skipping it", serviceName)
}
case *appv1.Deployment:
deployment := obj
deploymentName := getObjectName(deployment.GetObjectMeta().(*metav1.ObjectMeta))
_, ok := serviceConfigs[deploymentName]
_, ok := deploymentConfigs[deploymentName]
if !ok {
serviceConfigs[deploymentName] = &api_types.ServiceConfig{
deploymentConfigs[deploymentName] = api_types.DeploymentConfig{
Deployment: *deployment,
}
} else {
serviceConfigs[deploymentName].Deployment = *deployment
logrus.Warnf("Deployment %s already exists, skipping it", deploymentName)
}
case *appv1.StatefulSet:
statefulset := obj
statefulsetName := getObjectName(statefulset.GetObjectMeta().(*metav1.ObjectMeta))
_, ok := statefulSetConfigs[statefulsetName]
if !ok {
statefulSetConfigs[statefulsetName] = api_types.StatefulSetConfig{
StatefulSet: *statefulset,
}
} else {
logrus.Warnf("StatefulSet %s already exists, skipping it", statefulsetName)
}
case *k8snet.Ingress:
ingress := obj
ingressName := getObjectName(ingress.GetObjectMeta().(*metav1.ObjectMeta))
ingressConfigs[ingressName] = &api_types.IngressConfig{Ingress: *ingress}
ingressConfigs[ingressName] = api_types.IngressConfig{Ingress: *ingress}
case *corev1.Namespace:
namespaceObj := obj
namespaceName := getObjectName(namespaceObj.GetObjectMeta().(*metav1.ObjectMeta))
namespace = namespaceName
case *gateway.Gateway:
gatewayObj := obj
gatewayName := getObjectName(gatewayObj.GetObjectMeta().(*metav1.ObjectMeta))
gatewayConfigs[gatewayName] = &api_types.GatewayConfig{Gateway: *gatewayObj}
gatewayConfigs[gatewayName] = api_types.GatewayConfig{Gateway: *gatewayObj}
case *gateway.HTTPRoute:
routeObj := obj
routeName := getObjectName(routeObj.GetObjectMeta().(*metav1.ObjectMeta))
routeConfigs[routeName] = &api_types.RouteConfig{HttpRoute: *routeObj}
routeConfigs[routeName] = api_types.RouteConfig{HttpRoute: *routeObj}
default:
return nil, nil, nil, nil, "", stacktrace.NewError("An error occurred parsing the manifest because of an unsupported kubernetes type")
return nil, nil, nil, nil, nil, nil, "", stacktrace.NewError("An error occurred parsing the manifest because of an unsupported kubernetes type")
}
}

finalServiceConfigs := []api_types.ServiceConfig{}
for _, serviceConfig := range serviceConfigs {
finalServiceConfigs = append(finalServiceConfigs, *serviceConfig)
}

finalIngressConfigs := []api_types.IngressConfig{}
for _, ingressConfig := range ingressConfigs {
finalIngressConfigs = append(finalIngressConfigs, *ingressConfig)
}

finalGatewayConfigs := []api_types.GatewayConfig{}
for _, gatewayConfig := range gatewayConfigs {
finalGatewayConfigs = append(finalGatewayConfigs, *gatewayConfig)
}

finalRouteConfigs := []api_types.RouteConfig{}
for _, routeConfig := range routeConfigs {
finalRouteConfigs = append(finalRouteConfigs, *routeConfig)
}

return finalServiceConfigs, finalIngressConfigs, finalGatewayConfigs, finalRouteConfigs, namespace, nil
return lo.Values(serviceConfigs), lo.Values(deploymentConfigs), lo.Values(statefulSetConfigs), lo.Values(ingressConfigs), lo.Values(gatewayConfigs), lo.Values(routeConfigs), namespace, nil
}

func parseTemplateArgs(filepathOrJson string) (map[string]interface{}, error) {
Expand Down Expand Up @@ -817,7 +818,6 @@ func listDevFlow(tenantUuid api_types.Uuid) {
}

printFlowTable(flows)
return
}

func getTenantUuidFlows(tenantUuid api_types.Uuid) ([]api_types.Flow, error) {
Expand Down Expand Up @@ -890,6 +890,8 @@ func createDevFlow(tenantUuid api_types.Uuid, pairsMap map[string]string, templa
func deploy(
tenantUuid api_types.Uuid,
serviceConfigs []api_types.ServiceConfig,
deploymentConfigs []api_types.DeploymentConfig,
statefulSetConfigs []api_types.StatefulSetConfig,
ingressConfigs []api_types.IngressConfig,
gatewayConfigs []api_types.GatewayConfig,
routeConfigs []api_types.RouteConfig,
Expand All @@ -898,11 +900,13 @@ func deploy(
ctx := context.Background()

body := api_types.PostTenantUuidDeployJSONRequestBody{
ServiceConfigs: &serviceConfigs,
IngressConfigs: &ingressConfigs,
GatewayConfigs: &gatewayConfigs,
RouteConfigs: &routeConfigs,
Namespace: &namespace,
ServiceConfigs: &serviceConfigs,
DeploymentConfigs: &deploymentConfigs,
StatefulSetConfigs: &statefulSetConfigs,
IngressConfigs: &ingressConfigs,
GatewayConfigs: &gatewayConfigs,
RouteConfigs: &routeConfigs,
Namespace: &namespace,
}
client := getKontrolServiceClient()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package cluster_manager
import (
"context"
"encoding/json"
"k8s.io/apimachinery/pkg/labels"
"strings"

"k8s.io/apimachinery/pkg/labels"

"github.com/kurtosis-tech/kardinal/libs/manager-kontrol-api/api/golang/types"
"github.com/kurtosis-tech/stacktrace"
"github.com/samber/lo"
Expand Down Expand Up @@ -206,6 +207,7 @@ func (manager *ClusterManager) ApplyClusterResources(ctx context.Context, cluste
allNSs := [][]string{
lo.Uniq(lo.Map(*clusterResources.Services, func(item corev1.Service, _ int) string { return item.Namespace })),
lo.Uniq(lo.Map(*clusterResources.Deployments, func(item appsv1.Deployment, _ int) string { return item.Namespace })),
lo.Uniq(lo.Map(*clusterResources.StatefulSets, func(item appsv1.StatefulSet, _ int) string { return item.Namespace })),
lo.Uniq(lo.Map(*clusterResources.VirtualServices, func(item v1alpha3.VirtualService, _ int) string { return item.Namespace })),
lo.Uniq(lo.Map(*clusterResources.DestinationRules, func(item v1alpha3.DestinationRule, _ int) string { return item.Namespace })),
lo.Uniq(lo.Map(*clusterResources.Gateways, func(item gateway.Gateway, _ int) string { return item.Namespace })),
Expand Down Expand Up @@ -242,6 +244,12 @@ func (manager *ClusterManager) ApplyClusterResources(ctx context.Context, cluste
}
}

for _, statefulSet := range *clusterResources.StatefulSets {
if err := manager.createOrUpdateStatefulSet(ctx, &statefulSet); err != nil {
return stacktrace.Propagate(err, "An error occurred while creating or updating statefulSet '%s'", statefulSet.GetName())
}
}

for _, virtualService := range *clusterResources.VirtualServices {
if err := manager.createOrUpdateVirtualService(ctx, &virtualService); err != nil {
return stacktrace.Propagate(err, "An error occurred while creating or updating virtual service '%s'", virtualService.GetName())
Expand Down Expand Up @@ -394,6 +402,14 @@ func (manager *ClusterManager) CleanUpClusterResources(ctx context.Context, clus
}
}

// Clean up deployments
statefulSetsByNS := lo.GroupBy(*clusterResources.StatefulSets, func(item appsv1.StatefulSet) string { return item.Namespace })
for namespace, statefulSets := range statefulSetsByNS {
if err := manager.cleanUpStatefulSetsInNamespace(ctx, namespace, statefulSets); err != nil {
return stacktrace.Propagate(err, "An error occurred cleaning up statefulSets '%+v' in namespace '%s'", statefulSets, namespace)
}
}

// Cleanup authorization policies
if clusterResources.AuthorizationPolicies != nil {
authorizationPoliciesByNS := lo.GroupBy(*clusterResources.AuthorizationPolicies, func(item securityv1beta1.AuthorizationPolicy) string {
Expand Down Expand Up @@ -479,7 +495,6 @@ func (manager *ClusterManager) removeNamespace(ctx context.Context, namespace *c
}

func (manager *ClusterManager) ensureNamespace(ctx context.Context, name string) error {

if name == istioSystemNamespace {
// Some resources might be under the istio system namespace but we don't want to alter
// this namespace because it is managed by Istio
Expand Down Expand Up @@ -561,6 +576,27 @@ func (manager *ClusterManager) createOrUpdateDeployment(ctx context.Context, dep
return nil
}

func (manager *ClusterManager) createOrUpdateStatefulSet(ctx context.Context, statefulSet *appsv1.StatefulSet) error {
statefulSetClient := manager.kubernetesClient.clientSet.AppsV1().StatefulSets(statefulSet.Namespace)
existingStatefulSet, err := statefulSetClient.Get(ctx, statefulSet.Name, metav1.GetOptions{})
if err != nil {
_, err = statefulSetClient.Create(ctx, statefulSet, globalCreateOptions)
if err != nil {
return stacktrace.Propagate(err, "Failed to create statefulSet: %s", statefulSet.GetName())
}
} else {
if !deepCheckEqual(existingStatefulSet.Spec, statefulSet.Spec) {
updateStatefulSetWithRelevantValuesFromCurrentDeployment(statefulSet, existingStatefulSet)
_, err = statefulSetClient.Update(ctx, statefulSet, globalUpdateOptions)
if err != nil {
return stacktrace.Propagate(err, "Failed to update statefulSet: %s", statefulSet.GetName())
}
}
}

return nil
}

func updateDeploymentWithRelevantValuesFromCurrentDeployment(newDeployment *appsv1.Deployment, currentDeployment *appsv1.Deployment) {
newDeployment.ResourceVersion = currentDeployment.ResourceVersion
// merge annotations
Expand All @@ -577,6 +613,22 @@ func updateDeploymentWithRelevantValuesFromCurrentDeployment(newDeployment *apps
newDeployment.Spec.Template.Annotations = newAnnotations
}

func updateStatefulSetWithRelevantValuesFromCurrentDeployment(newStatefulSet *appsv1.StatefulSet, currentStatefulSet *appsv1.StatefulSet) {
newStatefulSet.ResourceVersion = currentStatefulSet.ResourceVersion
// merge annotations
newAnnotations := newStatefulSet.Spec.Template.GetAnnotations()
currentAnnotations := currentStatefulSet.Spec.Template.GetAnnotations()

for annotationKey, annotationValue := range currentAnnotations {
if annotationKey == telepresenceRestartedAtAnnotation {
// This key is necessary for Kardinal/Telepresence (https://www.telepresence.io/) integration
// keeping this annotation because otherwise the telepresence traffic-agent container will be removed from the pod
newAnnotations[annotationKey] = annotationValue
}
}
newStatefulSet.Spec.Template.Annotations = newAnnotations
}

func (manager *ClusterManager) createOrUpdateVirtualService(ctx context.Context, virtualService *v1alpha3.VirtualService) error {
virtServiceClient := manager.istioClient.clientSet.NetworkingV1alpha3().VirtualServices(virtualService.GetNamespace())

Expand Down Expand Up @@ -770,6 +822,24 @@ func (manager *ClusterManager) cleanUpDeploymentsInNamespace(ctx context.Context
return nil
}

func (manager *ClusterManager) cleanUpStatefulSetsInNamespace(ctx context.Context, namespace string, statefulSetsToKeep []appsv1.StatefulSet) error {
statefulSetClient := manager.kubernetesClient.clientSet.AppsV1().StatefulSets(namespace)
allstatefulSets, err := statefulSetClient.List(ctx, globalListOptions)
if err != nil {
return stacktrace.Propagate(err, "Failed to list statefulSets in namespace %s", namespace)
}
for _, statefulSet := range allstatefulSets.Items {
_, exists := lo.Find(statefulSetsToKeep, func(item appsv1.StatefulSet) bool { return item.Name == statefulSet.Name })
if !exists {
err = statefulSetClient.Delete(ctx, statefulSet.Name, globalDeleteOptions)
if err != nil {
return stacktrace.Propagate(err, "Failed to delete statefulSet %s", statefulSet.GetName())
}
}
}
return nil
}

func (manager *ClusterManager) cleanUpVirtualServicesInNamespace(ctx context.Context, namespace string, virtualServicesToKeep []v1alpha3.VirtualService) error {
virtServiceClient := manager.istioClient.clientSet.NetworkingV1alpha3().VirtualServices(namespace)
allVirtServices, err := virtServiceClient.List(ctx, globalListOptions)
Expand Down
Loading

0 comments on commit a1632d5

Please sign in to comment.