From 74bc89b5837c169b3bb70e74f6a607aecd35f65e Mon Sep 17 00:00:00 2001 From: Felix Kunde Date: Tue, 6 Aug 2024 17:54:50 +0200 Subject: [PATCH] separated sync and delete logic for Patroni resources --- e2e/tests/k8s_api.py | 5 +-- pkg/cluster/cluster.go | 56 ++++++++++--------------- pkg/cluster/resources.go | 88 ++++++++++++++++++++++++++++++++-------- pkg/cluster/sync.go | 73 +++++++++++++++++++++------------ 4 files changed, 142 insertions(+), 80 deletions(-) diff --git a/e2e/tests/k8s_api.py b/e2e/tests/k8s_api.py index 2b5aa289c..276ddfa25 100644 --- a/e2e/tests/k8s_api.py +++ b/e2e/tests/k8s_api.py @@ -188,10 +188,7 @@ def count_services_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items) def count_endpoints_with_label(self, labels, namespace='default'): - eps = self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items - for ep in eps: - print("found endpoint: {}".format(ep.metadata.name)) - return len(eps) + return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items) def count_secrets_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index f08fdc342..b131b59a5 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -30,7 +30,6 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" - apipolicyv1 "k8s.io/api/policy/v1" policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -62,7 +61,8 @@ type Config struct { type kubeResources struct { Services map[PostgresRole]*v1.Service Endpoints map[PostgresRole]*v1.Endpoints - ConfigMaps map[string]*v1.ConfigMap + PatroniEndpoints map[string]*v1.Endpoints + PatroniConfigMaps map[string]*v1.ConfigMap Secrets map[types.UID]*v1.Secret Statefulset *appsv1.StatefulSet PodDisruptionBudget *policyv1.PodDisruptionBudget @@ -135,11 +135,12 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres systemUsers: make(map[string]spec.PgUser), podSubscribers: make(map[spec.NamespacedName]chan PodEvent), kubeResources: kubeResources{ - Secrets: make(map[types.UID]*v1.Secret), - Services: make(map[PostgresRole]*v1.Service), - Endpoints: make(map[PostgresRole]*v1.Endpoints), - ConfigMaps: make(map[string]*v1.ConfigMap), - Streams: make(map[string]*zalandov1.FabricEventStream)}, + Secrets: make(map[types.UID]*v1.Secret), + Services: make(map[PostgresRole]*v1.Service), + Endpoints: make(map[PostgresRole]*v1.Endpoints), + PatroniEndpoints: make(map[string]*v1.Endpoints), + PatroniConfigMaps: make(map[string]*v1.ConfigMap), + Streams: make(map[string]*zalandov1.FabricEventStream)}, userSyncStrategy: users.DefaultUserSyncStrategy{ PasswordEncryption: passwordEncryption, RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix, @@ -363,17 +364,8 @@ func (c *Cluster) Create() (err error) { c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready") // sync resources created by Patroni - if c.patroniKubernetesUseConfigMaps() { - if err = c.syncConfigMaps(); err != nil { - c.logger.Warnf("Patroni configmaps not yet synced: %v", err) - } - } else { - if err = c.syncEndpoint(Patroni); err != nil { - err = fmt.Errorf("%s endpoint not yet synced: %v", Patroni, err) - } - } - if err = c.syncService(Patroni); err != nil { - err = 
fmt.Errorf("%s service not yet synced: %v", Patroni, err)
+	if err = c.syncPatroniResources(); err != nil {
+		c.logger.Warnf("Patroni resources not yet synced: %v", err)
 	}
 
 	// create database objects unless we are running without pods or disabled
@@ -866,7 +858,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
 	return true, ""
 }
 
-func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
+func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
 	//TODO: improve comparison
 	if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
 		return false, "new PDB spec does not match the current one"
@@ -1201,13 +1193,7 @@ func (c *Cluster) Delete() error {
 		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err)
 	}
 
-	for _, role := range []PostgresRole{Master, Replica, Patroni} {
-		if err := c.deleteService(role); err != nil {
-			anyErrors = true
-			c.logger.Warningf("could not delete %s service: %v", role, err)
-			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err)
-		}
-
+	for _, role := range []PostgresRole{Master, Replica} {
 		if !c.patroniKubernetesUseConfigMaps() {
 			if err := c.deleteEndpoint(role); err != nil {
 				anyErrors = true
@@ -1215,18 +1201,20 @@ func (c *Cluster) Delete() error {
 				c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s endpoint: %v", role, err)
 			}
 		}
-	}
 
-	if c.patroniKubernetesUseConfigMaps() {
-		for _, suffix := range []string{"leader", "config", "sync", "failover"} {
-			if err := c.deletePatroniConfigMap(suffix); err != nil {
-				anyErrors = true
-				c.logger.Warningf("could not delete %s config map: %v", suffix, err)
-				c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s config map: %v", suffix, err)
-			}
+		if err := c.deleteService(role); err != nil {
+			anyErrors = true
+			c.logger.Warningf("could not delete %s service: %v", role, err)
+			c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err)
 		}
 	}
 
+	if err := c.deletePatroniResources(); err != nil {
+		anyErrors = true
+		c.logger.Warningf("could not delete all Patroni resources: %v", err)
+		c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete all Patroni resources: %v", err)
+	}
+
 	// Delete connection pooler objects anyway, even if it's not mentioned in the
 	// manifest, just to not keep orphaned components in case if something went
 	// wrong
diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go
index 93c44c28b..30a97169e 100644
--- a/pkg/cluster/resources.go
+++ b/pkg/cluster/resources.go
@@ -43,20 +43,24 @@ func (c *Cluster) listResources() error {
 		c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID, secret.ObjectMeta.Namespace)
 	}
 
+	for role, service := range c.Services {
+		c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID)
+	}
+
+	for role, endpoint := range c.Endpoints {
+		c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
+	}
+
 	if c.patroniKubernetesUseConfigMaps() {
-		for suffix, configmap := range c.ConfigMaps {
-			c.logger.Infof("found %s config map: %q (uid: %q)", suffix, util.NameFromMeta(configmap.ObjectMeta), configmap.UID)
+ for suffix, configmap := range c.PatroniConfigMaps { + c.logger.Infof("found %s Patroni config map: %q (uid: %q)", suffix, util.NameFromMeta(configmap.ObjectMeta), configmap.UID) } } else { - for role, endpoint := range c.Endpoints { - c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID) + for suffix, endpoint := range c.PatroniEndpoints { + c.logger.Infof("found %s Patroni endpoint: %q (uid: %q)", suffix, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID) } } - for role, service := range c.Services { - c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID) - } - pods, err := c.listPods() if err != nil { return fmt.Errorf("could not get the list of pods: %v", err) @@ -510,23 +514,73 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error { return nil } +func (c *Cluster) deletePatroniResources() error { + c.setProcessName("deleting Patroni resources") + errors := make([]string, 0) + + if err := c.deleteService(Patroni); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + for _, suffix := range patroniObjectSuffixes { + if c.patroniKubernetesUseConfigMaps() { + if err := c.deletePatroniConfigMap(suffix); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + } else { + if err := c.deletePatroniEndpoint(suffix); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + } + } + + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + + return nil +} + func (c *Cluster) deletePatroniConfigMap(suffix string) error { - c.setProcessName("deleting config map") - c.logger.Debugln("deleting config map") - if c.ConfigMaps[suffix] == nil { - c.logger.Debugf("there is no %s config map in the cluster", suffix) + c.setProcessName("deleting Patroni config map") + c.logger.Debugln("deleting Patroni config map") + cm := c.PatroniConfigMaps[suffix] + if cm == nil { + c.logger.Debugf("there is no %s Patroni config map in the cluster", suffix) + return nil + } + + if err := c.KubeClient.ConfigMaps(cm.Namespace).Delete(context.TODO(), cm.Name, c.deleteOptions); err != nil { + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not delete %s Patroni config map %q: %v", suffix, cm.Name, err) + } + c.logger.Debugf("%s Patroni config map has already been deleted", suffix) + } + + c.logger.Infof("%s Patroni config map %q has been deleted", suffix, util.NameFromMeta(cm.ObjectMeta)) + delete(c.PatroniConfigMaps, suffix) + + return nil +} + +func (c *Cluster) deletePatroniEndpoint(suffix string) error { + c.setProcessName("deleting Patroni endpoint") + c.logger.Debugln("deleting Patroni endpoint") + ep := c.PatroniEndpoints[suffix] + if ep == nil { + c.logger.Debugf("there is no %s Patroni endpoint in the cluster", suffix) return nil } - if err := c.KubeClient.ConfigMaps(c.ConfigMaps[suffix].Namespace).Delete(context.TODO(), c.ConfigMaps[suffix].Name, c.deleteOptions); err != nil { + if err := c.KubeClient.Endpoints(ep.Namespace).Delete(context.TODO(), ep.Name, c.deleteOptions); err != nil { if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("could not delete %s config map %q: %v", suffix, c.ConfigMaps[suffix].Name, err) + return fmt.Errorf("could not delete %s Patroni endpoint %q: %v", suffix, ep.Name, err) } - c.logger.Debugf("%s config map has already been deleted", suffix) + c.logger.Debugf("%s Patroni endpoint has already been deleted", suffix) } - c.logger.Infof("%s config map %q has been deleted", suffix, 
util.NameFromMeta(c.ConfigMaps[suffix].ObjectMeta)) - delete(c.ConfigMaps, suffix) + c.logger.Infof("%s Patroni endpoint %q has been deleted", suffix, util.NameFromMeta(ep.ObjectMeta)) + delete(c.PatroniEndpoints, suffix) return nil } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 24e469d43..fb95eda1b 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -80,10 +80,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { return err } - if c.patroniKubernetesUseConfigMaps() { - if err = c.syncConfigMaps(); err != nil { - c.logger.Errorf("could not sync config maps: %v", err) - } + if err = c.syncPatroniResources(); err != nil { + c.logger.Errorf("could not sync Patroni resources: %v", err) } // sync volume may already transition volumes to gp3, if iops/throughput or type is specified @@ -179,40 +177,74 @@ func (c *Cluster) syncFinalizer() error { return nil } -func (c *Cluster) syncConfigMaps() error { - for _, suffix := range []string{"leader", "config", "sync", "failover"} { - if err := c.syncConfigMap(suffix); err != nil { - return fmt.Errorf("could not sync %s config map: %v", suffix, err) +func (c *Cluster) syncPatroniResources() error { + errors := make([]string, 0) + + if err := c.syncService(Patroni); err != nil { + errors = append(errors, fmt.Sprintf("could not sync %s service: %v", Patroni, err)) + } + + for _, suffix := range patroniObjectSuffixes { + if c.patroniKubernetesUseConfigMaps() { + if err := c.syncPatroniConfigMap(suffix); err != nil { + errors = append(errors, fmt.Sprintf("could not sync %s Patroni config map: %v", suffix, err)) + } + } else { + if err := c.syncPatroniEndpoint(suffix); err != nil { + errors = append(errors, fmt.Sprintf("could not sync %s Patroni endpoint: %v", suffix, err)) + } } } + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil } -func (c *Cluster) syncConfigMap(suffix string) error { +func (c *Cluster) syncPatroniConfigMap(suffix string) error { var ( cm *v1.ConfigMap err error ) name := fmt.Sprintf("%s-%s", c.Name, suffix) - c.logger.Debugf("syncing %s config map", name) - c.setProcessName("syncing %s config map", name) + c.logger.Debugf("syncing %s Patroni config map", name) + c.setProcessName("syncing %s Patroni config map", name) if cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { - c.ConfigMaps[suffix] = cm + c.PatroniConfigMaps[suffix] = cm return nil } if !k8sutil.ResourceNotFound(err) { - return fmt.Errorf("could not get %s config map: %v", suffix, err) + return fmt.Errorf("could not get %s Patroni config map: %v", suffix, err) + } + + return nil +} + +func (c *Cluster) syncPatroniEndpoint(suffix string) error { + var ( + ep *v1.Endpoints + err error + ) + name := fmt.Sprintf("%s-%s", c.Name, suffix) + c.logger.Debugf("syncing %s Patroni endpoint", name) + c.setProcessName("syncing %s Patroni endpoint", name) + + if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil { + c.PatroniEndpoints[suffix] = ep + return nil + } + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get %s Patroni endpoint: %v", suffix, err) } - // no existing config map, Patroni will handle it - c.ConfigMaps[suffix] = nil return nil } func (c *Cluster) syncServices() error { - for _, role := range []PostgresRole{Master, Replica, Patroni} { + for _, role := range []PostgresRole{Master, Replica} { c.logger.Debugf("syncing %s service", role) if 
!c.patroniKubernetesUseConfigMaps() { @@ -284,11 +316,6 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { c.setProcessName("syncing %s endpoint", role) if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil { - c.Endpoints[role] = ep - // do not touch config endpoint managed by Patroni - if role == Patroni { - return nil - } desiredEp := c.generateEndpoint(role, ep.Subsets) if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed { patchData, err := metaAnnotationsPatch(desiredEp.Annotations) @@ -306,10 +333,6 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { if !k8sutil.ResourceNotFound(err) { return fmt.Errorf("could not get %s endpoint: %v", role, err) } - // if config endpoint does not exist Patroni will create it - if role == Patroni { - return nil - } // no existing endpoint, create new one c.Endpoints[role] = nil c.logger.Infof("could not find the cluster's %s endpoint", role)
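
Note: both new loops (in deletePatroniResources and syncPatroniResources) range over patroniObjectSuffixes, which is not defined in any hunk above. Judging from the literal lists removed from Delete() and syncConfigMaps(), it is presumably declared elsewhere in the package along these lines (a sketch inferred from this patch, not part of the diff):

    // patroniObjectSuffixes names the per-cluster endpoints/config maps that
    // Patroni itself creates and that the operator only tracks and cleans up.
    var patroniObjectSuffixes = []string{"leader", "config", "sync", "failover"}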