Skip to content

Commit

Permalink
separated sync and delete logic for Patroni resources
Browse files Browse the repository at this point in the history
  • Loading branch information
FxKu committed Aug 6, 2024
1 parent 0482af4 commit 74bc89b
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 80 deletions.
5 changes: 1 addition & 4 deletions e2e/tests/k8s_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,7 @@ def count_services_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_service(namespace, label_selector=labels).items)

def count_endpoints_with_label(self, labels, namespace='default'):
eps = self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items
for ep in eps:
print("found endpoint: {}".format(ep.metadata.name))
return len(eps)
return len(self.api.core_v1.list_namespaced_endpoints(namespace, label_selector=labels).items)

def count_secrets_with_label(self, labels, namespace='default'):
return len(self.api.core_v1.list_namespaced_secret(namespace, label_selector=labels).items)
Expand Down
56 changes: 22 additions & 34 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apipolicyv1 "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -62,7 +61,8 @@ type Config struct {
type kubeResources struct {
Services map[PostgresRole]*v1.Service
Endpoints map[PostgresRole]*v1.Endpoints
ConfigMaps map[string]*v1.ConfigMap
PatroniEndpoints map[string]*v1.Endpoints
PatroniConfigMaps map[string]*v1.ConfigMap
Secrets map[types.UID]*v1.Secret
Statefulset *appsv1.StatefulSet
PodDisruptionBudget *policyv1.PodDisruptionBudget
Expand Down Expand Up @@ -135,11 +135,12 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
systemUsers: make(map[string]spec.PgUser),
podSubscribers: make(map[spec.NamespacedName]chan PodEvent),
kubeResources: kubeResources{
Secrets: make(map[types.UID]*v1.Secret),
Services: make(map[PostgresRole]*v1.Service),
Endpoints: make(map[PostgresRole]*v1.Endpoints),
ConfigMaps: make(map[string]*v1.ConfigMap),
Streams: make(map[string]*zalandov1.FabricEventStream)},
Secrets: make(map[types.UID]*v1.Secret),
Services: make(map[PostgresRole]*v1.Service),
Endpoints: make(map[PostgresRole]*v1.Endpoints),
PatroniEndpoints: make(map[string]*v1.Endpoints),
PatroniConfigMaps: make(map[string]*v1.ConfigMap),
Streams: make(map[string]*zalandov1.FabricEventStream)},
userSyncStrategy: users.DefaultUserSyncStrategy{
PasswordEncryption: passwordEncryption,
RoleDeletionSuffix: cfg.OpConfig.RoleDeletionSuffix,
Expand Down Expand Up @@ -363,17 +364,8 @@ func (c *Cluster) Create() (err error) {
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "StatefulSet", "Pods are ready")

// sync resources created by Patroni
if c.patroniKubernetesUseConfigMaps() {
if err = c.syncConfigMaps(); err != nil {
c.logger.Warnf("Patroni configmaps not yet synced: %v", err)
}
} else {
if err = c.syncEndpoint(Patroni); err != nil {
err = fmt.Errorf("%s endpoint not yet synced: %v", Patroni, err)
}
}
if err = c.syncService(Patroni); err != nil {
err = fmt.Errorf("%s servic not yet synced: %v", Patroni, err)
if err = c.syncPatroniResources(); err != nil {
c.logger.Warnf("Patroni resources not yet synced: %v", err)
}

// create database objects unless we are running without pods or disabled
Expand Down Expand Up @@ -866,7 +858,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
return true, ""
}

func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
//TODO: improve comparison
if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
return false, "new PDB spec does not match the current one"
Expand Down Expand Up @@ -1201,32 +1193,28 @@ func (c *Cluster) Delete() error {
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err)
}

for _, role := range []PostgresRole{Master, Replica, Patroni} {
if err := c.deleteService(role); err != nil {
anyErrors = true
c.logger.Warningf("could not delete %s service: %v", role, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err)
}

for _, role := range []PostgresRole{Master, Replica} {
if !c.patroniKubernetesUseConfigMaps() {
if err := c.deleteEndpoint(role); err != nil {
anyErrors = true
c.logger.Warningf("could not delete %s endpoint: %v", role, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s endpoint: %v", role, err)
}
}
}

if c.patroniKubernetesUseConfigMaps() {
for _, suffix := range []string{"leader", "config", "sync", "failover"} {
if err := c.deletePatroniConfigMap(suffix); err != nil {
anyErrors = true
c.logger.Warningf("could not delete %s config map: %v", suffix, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s config map: %v", suffix, err)
}
if err := c.deleteService(role); err != nil {
anyErrors = true
c.logger.Warningf("could not delete %s service: %v", role, err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err)
}
}

if err := c.deletePatroniResources(); err != nil {
anyErrors = true
c.logger.Warningf("could not delete all Patroni resources: %v", err)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete all Patroni resources: %v", err)
}

// Delete connection pooler objects anyway, even if it's not mentioned in the
// manifest, just to not keep orphaned components in case if something went
// wrong
Expand Down
88 changes: 71 additions & 17 deletions pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,24 @@ func (c *Cluster) listResources() error {
c.logger.Infof("found secret: %q (uid: %q) namespace: %s", util.NameFromMeta(secret.ObjectMeta), secret.UID, secret.ObjectMeta.Namespace)
}

for role, service := range c.Services {
c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID)
}

for role, endpoint := range c.Endpoints {
c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
}

if c.patroniKubernetesUseConfigMaps() {
for suffix, configmap := range c.ConfigMaps {
c.logger.Infof("found %s config map: %q (uid: %q)", suffix, util.NameFromMeta(configmap.ObjectMeta), configmap.UID)
for suffix, configmap := range c.PatroniConfigMaps {
c.logger.Infof("found %s Patroni config map: %q (uid: %q)", suffix, util.NameFromMeta(configmap.ObjectMeta), configmap.UID)
}
} else {
for role, endpoint := range c.Endpoints {
c.logger.Infof("found %s endpoint: %q (uid: %q)", role, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
for suffix, endpoint := range c.PatroniEndpoints {
c.logger.Infof("found %s Patroni endpoint: %q (uid: %q)", suffix, util.NameFromMeta(endpoint.ObjectMeta), endpoint.UID)
}
}

for role, service := range c.Services {
c.logger.Infof("found %s service: %q (uid: %q)", role, util.NameFromMeta(service.ObjectMeta), service.UID)
}

pods, err := c.listPods()
if err != nil {
return fmt.Errorf("could not get the list of pods: %v", err)
Expand Down Expand Up @@ -510,23 +514,73 @@ func (c *Cluster) deleteEndpoint(role PostgresRole) error {
return nil
}

func (c *Cluster) deletePatroniResources() error {
c.setProcessName("deleting Patroni resources")
errors := make([]string, 0)

if err := c.deleteService(Patroni); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}

for _, suffix := range patroniObjectSuffixes {
if c.patroniKubernetesUseConfigMaps() {
if err := c.deletePatroniConfigMap(suffix); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
} else {
if err := c.deletePatroniEndpoint(suffix); err != nil {
errors = append(errors, fmt.Sprintf("%v", err))
}
}
}

if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}

return nil
}

func (c *Cluster) deletePatroniConfigMap(suffix string) error {
c.setProcessName("deleting config map")
c.logger.Debugln("deleting config map")
if c.ConfigMaps[suffix] == nil {
c.logger.Debugf("there is no %s config map in the cluster", suffix)
c.setProcessName("deleting Patroni config map")
c.logger.Debugln("deleting Patroni config map")
cm := c.PatroniConfigMaps[suffix]
if cm == nil {
c.logger.Debugf("there is no %s Patroni config map in the cluster", suffix)
return nil
}

if err := c.KubeClient.ConfigMaps(cm.Namespace).Delete(context.TODO(), cm.Name, c.deleteOptions); err != nil {
if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not delete %s Patroni config map %q: %v", suffix, cm.Name, err)
}
c.logger.Debugf("%s Patroni config map has already been deleted", suffix)
}

c.logger.Infof("%s Patroni config map %q has been deleted", suffix, util.NameFromMeta(cm.ObjectMeta))
delete(c.PatroniConfigMaps, suffix)

return nil
}

func (c *Cluster) deletePatroniEndpoint(suffix string) error {
c.setProcessName("deleting Patroni endpoint")
c.logger.Debugln("deleting Patroni endpoint")
ep := c.PatroniEndpoints[suffix]
if ep == nil {
c.logger.Debugf("there is no %s Patroni endpoint in the cluster", suffix)
return nil
}

if err := c.KubeClient.ConfigMaps(c.ConfigMaps[suffix].Namespace).Delete(context.TODO(), c.ConfigMaps[suffix].Name, c.deleteOptions); err != nil {
if err := c.KubeClient.Endpoints(ep.Namespace).Delete(context.TODO(), ep.Name, c.deleteOptions); err != nil {
if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not delete %s config map %q: %v", suffix, c.ConfigMaps[suffix].Name, err)
return fmt.Errorf("could not delete %s Patroni endpoint %q: %v", suffix, ep.Name, err)
}
c.logger.Debugf("%s config map has already been deleted", suffix)
c.logger.Debugf("%s Patroni endpoint has already been deleted", suffix)
}

c.logger.Infof("%s config map %q has been deleted", suffix, util.NameFromMeta(c.ConfigMaps[suffix].ObjectMeta))
delete(c.ConfigMaps, suffix)
c.logger.Infof("%s Patroni endpoint %q has been deleted", suffix, util.NameFromMeta(ep.ObjectMeta))
delete(c.PatroniEndpoints, suffix)

return nil
}
Expand Down
73 changes: 48 additions & 25 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
return err
}

if c.patroniKubernetesUseConfigMaps() {
if err = c.syncConfigMaps(); err != nil {
c.logger.Errorf("could not sync config maps: %v", err)
}
if err = c.syncPatroniResources(); err != nil {
c.logger.Errorf("could not sync Patroni resources: %v", err)
}

// sync volume may already transition volumes to gp3, if iops/throughput or type is specified
Expand Down Expand Up @@ -179,40 +177,74 @@ func (c *Cluster) syncFinalizer() error {
return nil
}

func (c *Cluster) syncConfigMaps() error {
for _, suffix := range []string{"leader", "config", "sync", "failover"} {
if err := c.syncConfigMap(suffix); err != nil {
return fmt.Errorf("could not sync %s config map: %v", suffix, err)
func (c *Cluster) syncPatroniResources() error {
errors := make([]string, 0)

if err := c.syncService(Patroni); err != nil {
errors = append(errors, fmt.Sprintf("could not sync %s service: %v", Patroni, err))
}

for _, suffix := range patroniObjectSuffixes {
if c.patroniKubernetesUseConfigMaps() {
if err := c.syncPatroniConfigMap(suffix); err != nil {
errors = append(errors, fmt.Sprintf("could not sync %s Patroni config map: %v", suffix, err))
}
} else {
if err := c.syncPatroniEndpoint(suffix); err != nil {
errors = append(errors, fmt.Sprintf("could not sync %s Patroni endpoint: %v", suffix, err))
}
}
}

if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}

return nil
}

func (c *Cluster) syncConfigMap(suffix string) error {
func (c *Cluster) syncPatroniConfigMap(suffix string) error {
var (
cm *v1.ConfigMap
err error
)
name := fmt.Sprintf("%s-%s", c.Name, suffix)
c.logger.Debugf("syncing %s config map", name)
c.setProcessName("syncing %s config map", name)
c.logger.Debugf("syncing %s Patroni config map", name)
c.setProcessName("syncing %s Patroni config map", name)

if cm, err = c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil {
c.ConfigMaps[suffix] = cm
c.PatroniConfigMaps[suffix] = cm
return nil
}
if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not get %s config map: %v", suffix, err)
return fmt.Errorf("could not get %s Patroni config map: %v", suffix, err)
}

return nil
}

func (c *Cluster) syncPatroniEndpoint(suffix string) error {
var (
ep *v1.Endpoints
err error
)
name := fmt.Sprintf("%s-%s", c.Name, suffix)
c.logger.Debugf("syncing %s Patroni endpoint", name)
c.setProcessName("syncing %s Patroni endpoint", name)

if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), name, metav1.GetOptions{}); err == nil {
c.PatroniEndpoints[suffix] = ep
return nil
}
if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not get %s Patroni endpoint: %v", suffix, err)
}
// no existing config map, Patroni will handle it
c.ConfigMaps[suffix] = nil

return nil
}

func (c *Cluster) syncServices() error {
for _, role := range []PostgresRole{Master, Replica, Patroni} {
for _, role := range []PostgresRole{Master, Replica} {
c.logger.Debugf("syncing %s service", role)

if !c.patroniKubernetesUseConfigMaps() {
Expand Down Expand Up @@ -284,11 +316,6 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
c.setProcessName("syncing %s endpoint", role)

if ep, err = c.KubeClient.Endpoints(c.Namespace).Get(context.TODO(), c.serviceName(role), metav1.GetOptions{}); err == nil {
c.Endpoints[role] = ep
// do not touch config endpoint managed by Patroni
if role == Patroni {
return nil
}
desiredEp := c.generateEndpoint(role, ep.Subsets)
if changed, _ := c.compareAnnotations(ep.Annotations, desiredEp.Annotations); changed {
patchData, err := metaAnnotationsPatch(desiredEp.Annotations)
Expand All @@ -306,10 +333,6 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error {
if !k8sutil.ResourceNotFound(err) {
return fmt.Errorf("could not get %s endpoint: %v", role, err)
}
// if config endpoint does not exist Patroni will create it
if role == Patroni {
return nil
}
// no existing endpoint, create new one
c.Endpoints[role] = nil
c.logger.Infof("could not find the cluster's %s endpoint", role)
Expand Down

0 comments on commit 74bc89b

Please sign in to comment.