Implement per-cluster maintenance window for Postgres automatic upgrade (#2710)

* implement maintenance window for major version upgrade 
* e2e test: fix major version upgrade test and extend with the time window
* unit test: add iteration to test isInMaintenanceWindow
* UI: show the window and enable edit via UI
idanovinda committed Aug 9, 2024
1 parent ce15d10 commit e6ae9e3
Showing 10 changed files with 216 additions and 21 deletions.
6 changes: 6 additions & 0 deletions docs/reference/cluster_manifest.md
@@ -114,6 +114,12 @@ These parameters are grouped directly under the `spec` key in the manifest.
this parameter. Optional, when empty the load balancer service becomes
inaccessible from outside of the Kubernetes cluster.

* **maintenanceWindows**
a list of time windows during which automatic major version upgrades are
permitted. When set, the operator restricts major version upgrades to these
designated periods. Accepted formats are "01:00-06:00" for a daily window or
"Sat:00:00-04:00" for a window on a specific weekday; all times are in UTC.

* **users**
a map of usernames to user flags for the users that should be created in the
cluster by the operator. User flags are a list, allowed elements are
2 changes: 0 additions & 2 deletions e2e/tests/k8s_api.py
@@ -218,7 +218,6 @@ def wait_for_pod_failover(self, failover_targets, labels, namespace='default'):
pod_phase = 'Failing over'
new_pod_node = ''
pods_with_update_flag = self.count_pods_with_rolling_update_flag(labels, namespace)

while (pod_phase != 'Running') or (new_pod_node not in failover_targets):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
if pods:
@@ -525,7 +524,6 @@ def wait_for_pod_failover(self, failover_targets, labels, namespace='default'):
pod_phase = 'Failing over'
new_pod_node = ''
pods_with_update_flag = self.count_pods_with_rolling_update_flag(labels, namespace)

while (pod_phase != 'Running') or (new_pod_node not in failover_targets):
pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items
if pods:
87 changes: 76 additions & 11 deletions e2e/tests/test_e2e.py
@@ -14,6 +14,7 @@

SPILO_CURRENT = "registry.opensource.zalan.do/acid/spilo-16-e2e:0.1"
SPILO_LAZY = "registry.opensource.zalan.do/acid/spilo-16-e2e:0.2"
SPILO_FULL_IMAGE = "ghcr.io/zalando/spilo-16:3.2-p3"


def to_selector(labels):
@@ -115,6 +116,7 @@ def setUpClass(cls):
configmap = yaml.safe_load(f)
configmap["data"]["workers"] = "1"
configmap["data"]["docker_image"] = SPILO_CURRENT
configmap["data"]["major_version_upgrade_mode"] = "full"

with open("manifests/configmap.yaml", 'w') as f:
yaml.dump(configmap, f, Dumper=yaml.Dumper)
@@ -1181,31 +1183,94 @@ def get_docker_image():
self.eventuallyEqual(lambda: len(k8s.get_patroni_running_members("acid-minimal-cluster-0")), 2, "Postgres status did not enter running")

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
@unittest.skip("Skipping this test until fixed")
def test_major_version_upgrade(self):
"""
Test major version upgrade
"""
def check_version():
p = k8s.patroni_rest("acid-upgrade-test-0", "")
version = p.get("server_version", 0) // 10000
return version

k8s = self.k8s
cluster_label = 'application=spilo,cluster-name=acid-upgrade-test'

with open("manifests/minimal-postgres-manifest-12.yaml", 'r+') as f:
upgrade_manifest = yaml.safe_load(f)
upgrade_manifest["spec"]["dockerImage"] = SPILO_FULL_IMAGE

with open("manifests/minimal-postgres-manifest-12.yaml", 'w') as f:
yaml.dump(upgrade_manifest, f, Dumper=yaml.Dumper)

k8s.create_with_kubectl("manifests/minimal-postgres-manifest-12.yaml")
self.eventuallyEqual(lambda: k8s.count_running_pods(labels=cluster_label), 2, "No 2 pods running")
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
self.eventuallyEqual(check_version, 12, "Version is not correct")

master_nodes, _ = k8s.get_cluster_nodes(cluster_labels=cluster_label)

# should upgrade immediately
pg_patch_version_14 = {
    "spec": {
        "postgresql": {
            "version": "14"
        }
    }
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
    "acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_14)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

# should have finished failover
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 14, "Version should be upgraded from 12 to 14")

# should not upgrade because current time is not in maintenanceWindow
current_time = datetime.now()
maintenance_window_future = f"{(current_time+timedelta(minutes=60)).strftime('%H:%M')}-{(current_time+timedelta(minutes=120)).strftime('%H:%M')}"
pg_patch_version_15 = {
"spec": {
"postgresql": {
"version": "15"
},
"maintenanceWindows": [
maintenance_window_future
]
}
}
k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_15)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

# should have finished failover
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 14, "Version should not be upgraded")

# change the version again to trigger operator sync
maintenance_window_current = f"{(current_time-timedelta(minutes=30)).strftime('%H:%M')}-{(current_time+timedelta(minutes=30)).strftime('%H:%M')}"
pg_patch_version_16 = {
"spec": {
"postgresql": {
"version": "16"
},
"maintenanceWindows": [
maintenance_window_current
]
}
}

k8s.api.custom_objects_api.patch_namespaced_custom_object(
"acid.zalan.do", "v1", "default", "postgresqls", "acid-upgrade-test", pg_patch_version_16)
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")

# should have finished failover
k8s.wait_for_pod_failover(master_nodes, 'spilo-role=replica,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=master,' + cluster_label)
k8s.wait_for_pod_start('spilo-role=replica,' + cluster_label)
self.eventuallyEqual(check_version, 16, "Version should be upgraded from 14 to 16")

@timeout_decorator.timeout(TEST_TIMEOUT_SEC)
def test_persistent_volume_claim_retention_policy(self):
5 changes: 5 additions & 0 deletions pkg/cluster/majorversionupgrade.go
@@ -74,6 +74,11 @@ func (c *Cluster) majorVersionUpgrade() error {
return nil
}

if !c.isInMaintenanceWindow() {
c.logger.Infof("skipping major version upgrade, not in maintenance window")
return nil
}

pods, err := c.listPods()
if err != nil {
return err
21 changes: 21 additions & 0 deletions pkg/cluster/util.go
@@ -662,3 +662,24 @@ func parseResourceRequirements(resourcesRequirement v1.ResourceRequirements) (ac
}
return resources, nil
}

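// isInMaintenanceWindow returns true when no maintenance windows are
// configured, or when the current time falls inside one of the windows.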
func (c *Cluster) isInMaintenanceWindow() bool {
if c.Spec.MaintenanceWindows == nil {
return true
}
now := time.Now()
currentDay := now.Weekday()
currentTime := now.Format("15:04")

for _, window := range c.Spec.MaintenanceWindows {
startTime := window.StartTime.Format("15:04")
endTime := window.EndTime.Format("15:04")

if window.Everyday || window.Weekday == currentDay {
if currentTime >= startTime && currentTime <= endTime {
return true
}
}
}
return false
}
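Comparing the "15:04"-formatted strings is safe because zero-padded HH:MM
strings sort lexicographically in time order. A Python paraphrase of the check
above, a sketch only, with windows as (day, "HH:MM", "HH:MM") tuples instead of
the operator's MaintenanceWindow type:

from datetime import datetime

def is_in_maintenance_window(windows, now=None):
    # No windows configured: upgrades are always allowed.
    if not windows:
        return True
    now = now or datetime.now()
    current_day = now.strftime("%a")      # e.g. "Sat"
    current_time = now.strftime("%H:%M")  # zero-padded, so string order == time order
    for day, start, end in windows:       # day None means the window applies every day
        if day is None or day == current_day:
            if start <= current_time <= end:  # boundaries inclusive, as in the Go code
                return True
    return False

is_in_maintenance_window([])                          # True - no restriction
is_in_maintenance_window([(None, "01:00", "06:00")])  # True only between 01:00 and 06:00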
89 changes: 89 additions & 0 deletions pkg/cluster/util_test.go
@@ -27,6 +27,15 @@ import (

var externalAnnotations = map[string]string{"existing": "annotation"}

func mustParseTime(s string) metav1.Time {
v, err := time.Parse("15:04", s)
if err != nil {
panic(err)
}

return metav1.Time{Time: v.UTC()}
}

func newFakeK8sAnnotationsClient() (k8sutil.KubernetesClient, *k8sFake.Clientset) {
clientSet := k8sFake.NewSimpleClientset()
acidClientSet := fakeacidv1.NewSimpleClientset()
@@ -521,3 +530,83 @@ func Test_trimCronjobName(t *testing.T) {
})
}
}

func TestIsInMaintenanceWindow(t *testing.T) {
client, _ := newFakeK8sStreamClient()

var cluster = New(
Config{
OpConfig: config.Config{
PodManagementPolicy: "ordered_ready",
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
PodRoleLabel: "spilo-role",
},
},
}, client, pg, logger, eventRecorder)

now := time.Now()
futureTimeStart := now.Add(1 * time.Hour)
futureTimeStartFormatted := futureTimeStart.Format("15:04")
futureTimeEnd := now.Add(2 * time.Hour)
futureTimeEndFormatted := futureTimeEnd.Format("15:04")

tests := []struct {
name string
windows []acidv1.MaintenanceWindow
expected bool
}{
{
name: "no maintenance windows",
windows: nil,
expected: true,
},
{
name: "maintenance windows with everyday",
windows: []acidv1.MaintenanceWindow{
{
Everyday: true,
StartTime: mustParseTime("00:00"),
EndTime: mustParseTime("23:59"),
},
},
expected: true,
},
{
name: "maintenance windows with weekday",
windows: []acidv1.MaintenanceWindow{
{
Weekday: now.Weekday(),
StartTime: mustParseTime("00:00"),
EndTime: mustParseTime("23:59"),
},
},
expected: true,
},
{
name: "maintenance windows with future interval time",
windows: []acidv1.MaintenanceWindow{
{
Weekday: now.Weekday(),
StartTime: mustParseTime(futureTimeStartFormatted),
EndTime: mustParseTime(futureTimeEndFormatted),
},
},
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cluster.Spec.MaintenanceWindows = tt.windows
if cluster.isInMaintenanceWindow() != tt.expected {
t.Errorf("Expected isInMaintenanceWindow to return %t", tt.expected)
}
})
}
}
8 changes: 0 additions & 8 deletions pkg/controller/postgresql.go
@@ -384,21 +384,13 @@ func (c *Controller) warnOnDeprecatedPostgreSQLSpecParameters(spec *acidv1.Postg
c.logger.Warningf("parameter %q is deprecated. Consider setting %q instead", deprecated, replacement)
}

noeffect := func(param string, explanation string) {
c.logger.Warningf("parameter %q takes no effect. %s", param, explanation)
}

if spec.UseLoadBalancer != nil {
deprecate("useLoadBalancer", "enableMasterLoadBalancer")
}
if spec.ReplicaLoadBalancer != nil {
deprecate("replicaLoadBalancer", "enableReplicaLoadBalancer")
}

if len(spec.MaintenanceWindows) > 0 {
noeffect("maintenanceWindows", "Not implemented.")
}

if (spec.UseLoadBalancer != nil || spec.ReplicaLoadBalancer != nil) &&
(spec.EnableReplicaLoadBalancer != nil || spec.EnableMasterLoadBalancer != nil) {
c.logger.Warnf("both old and new load balancer parameters are present in the manifest, ignoring old ones")
1 change: 1 addition & 0 deletions ui/app/src/edit.tag.pug
@@ -142,6 +142,7 @@ edit
o.spec.enableReplicaConnectionPooler = i.spec.enableReplicaConnectionPooler || false
o.spec.enableMasterPoolerLoadBalancer = i.spec.enableMasterPoolerLoadBalancer || false
o.spec.enableReplicaPoolerLoadBalancer = i.spec.enableReplicaPoolerLoadBalancer || false
o.spec.maintenanceWindows = i.spec.maintenanceWindows || []

o.spec.volume = {
size: i.spec.volume.size,
12 changes: 12 additions & 0 deletions ui/app/src/new.tag.pug
@@ -594,6 +594,12 @@ new
{{#if enableReplicaPoolerLoadBalancer}}
enableReplicaPoolerLoadBalancer: true
{{/if}}
{{#if maintenanceWindows}}
maintenanceWindows:
{{#each maintenanceWindows}}
- "{{ this }}"
{{/each}}
{{/if}}
volume:
size: "{{ volumeSize }}Gi"{{#if volumeStorageClass}}
storageClass: "{{ volumeStorageClass }}"{{/if}}{{#if iops}}
@@ -651,6 +657,7 @@ new
enableReplicaConnectionPooler: this.enableReplicaConnectionPooler,
enableMasterPoolerLoadBalancer: this.enableMasterPoolerLoadBalancer,
enableReplicaPoolerLoadBalancer: this.enableReplicaPoolerLoadBalancer,
maintenanceWindows: this.maintenanceWindows,
volumeSize: this.volumeSize,
volumeStorageClass: this.volumeStorageClass,
iops: this.iops,
@@ -727,6 +734,10 @@ new
this.enableReplicaPoolerLoadBalancer = !this.enableReplicaPoolerLoadBalancer
}

this.maintenanceWindowsChange = e => {
    this.maintenanceWindows = e.target.value
}

this.volumeChange = e => {
this.volumeSize = +e.target.value
}
@@ -1042,6 +1053,7 @@ new
this.enableReplicaConnectionPooler = false
this.enableMasterPoolerLoadBalancer = false
this.enableReplicaPoolerLoadBalancer = false
this.maintenanceWindows = []

this.postgresqlVersion = this.postgresqlVersion = (
this.config.postgresql_versions[0]
6 changes: 6 additions & 0 deletions ui/operator_ui/main.py
@@ -465,6 +465,7 @@ def get_postgresqls():
'status': status,
'num_elb': spec.get('enableMasterLoadBalancer', 0) + spec.get('enableReplicaLoadBalancer', 0) + \
spec.get('enableMasterPoolerLoadBalancer', 0) + spec.get('enableReplicaPoolerLoadBalancer', 0),
'maintenance_windows': spec.get('maintenanceWindows', []),
}
for cluster in these(
read_postgresqls(
@@ -566,6 +567,11 @@ def update_postgresql(namespace: str, cluster: str):
return fail('allowedSourceRanges invalid')
spec['allowedSourceRanges'] = postgresql['spec']['allowedSourceRanges']

if 'maintenanceWindows' in postgresql['spec']:
if not isinstance(postgresql['spec']['maintenanceWindows'], list):
return fail('maintenanceWindows invalid')
spec['maintenanceWindows'] = postgresql['spec']['maintenanceWindows']

if 'numberOfInstances' in postgresql['spec']:
if not isinstance(postgresql['spec']['numberOfInstances'], int):
return fail('numberOfInstances invalid')
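update_postgresql above only verifies that maintenanceWindows is a list,
leaving format validation to the operator. A stricter pre-check could look
like this hypothetical helper (the pattern and function name are illustrative,
not part of the UI code):

import re

WINDOW_PATTERN = re.compile(
    r'^(?:(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun):)?\d{2}:\d{2}-\d{2}:\d{2}$')

def valid_maintenance_windows(windows):
    # Accept e.g. ["01:00-06:00", "Sat:00:00-04:00"]; reject anything else.
    return isinstance(windows, list) and all(
        isinstance(w, str) and WINDOW_PATTERN.match(w) for w in windows)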
