Refactor node group state tracking
mnaser committed Jun 11, 2024
1 parent e41f610 commit 00cd894
Showing 1 changed file with 82 additions and 62 deletions.
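The refactor keeps Magnum's `<ACTION>_<STATE>` status convention: the action is recovered from the node group's current status and the new status is re-composed from it. A minimal illustration of that convention, using made-up status values rather than anything from this commit:

    # Magnum stores node group status as "<ACTION>_<STATE>", e.g. "UPDATE_IN_PROGRESS".
    # The driver recovers the action by splitting on "_" and rebuilds the new status from it.
    for status in ("CREATE_IN_PROGRESS", "UPDATE_IN_PROGRESS", "DELETE_IN_PROGRESS"):
        action = status.split("_")[0]   # "CREATE" / "UPDATE" / "DELETE"
        print(f"{action}_COMPLETE")     # e.g. "UPDATE" -> "UPDATE_COMPLETE"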
144 changes: 82 additions & 62 deletions magnum_cluster_api/driver.py
@@ -129,6 +129,38 @@ def _get_cluster_status_reason(self, capi_cluster):
capi_ops_cluster_status_reason,
)

def update_cluster_control_plane_status(
self,
context,
cluster: magnum_objects.Cluster,
):
nodegroup = cluster.default_ng_master
action = nodegroup.status.split("_")[0]

kcp = resources.get_kubeadm_control_plane(self.k8s_api, cluster)
if kcp is None:
return nodegroup

generation = kcp.obj.get("status", {}).get("observedGeneration", 1)
if generation > 1:
action = "UPDATE"

ready = kcp.obj["status"].get("ready", False)
failure_message = kcp.obj["status"].get("failureMessage")

updated_replicas = kcp.obj["status"].get("updatedReplicas")
replicas = kcp.obj["status"].get("replicas")

if updated_replicas != replicas:
nodegroup.status = f"{action}_IN_PROGRESS"
elif updated_replicas == replicas and ready:
nodegroup.status = f"{action}_COMPLETE"
nodegroup.status_reason = failure_message

nodegroup.save()

return nodegroup

@cluster_lock_wrapper
def update_cluster_status(
self, context, cluster: magnum_objects.Cluster, use_admin_ctx: bool = False
@@ -137,11 +169,10 @@ def update_cluster_status(
# need to refresh it to make sure we have the latest data.
cluster.refresh()

node_groups = [
self.update_nodegroup_status(context, cluster, node_group)
for node_group in cluster.nodegroups
]
# TODO: watch for topology change instead
node_groups = [
self.update_cluster_control_plane_status(context, cluster)
] + self.update_nodegroups_status(context, cluster)
osc = clients.get_openstack_api(context)

capi_cluster = resources.Cluster(context, self.k8s_api, cluster).get_or_none()
@@ -259,7 +290,7 @@ def resize_cluster(
nodegroup: magnum_objects.NodeGroup = None,
):
"""
Resize cluster.
Resize cluster (primarily add or remove nodes).
The cluster object passed to this method is already not in `UPDATE_IN_PROGRESS`
state and the node group object passed to this method is in `UPDATE_IN_PROGRESS`
@@ -397,57 +428,58 @@ def create_nodegroup(

utils.kube_apply_patch(cluster_resource)

def update_nodegroup_status(
self,
context,
cluster: magnum_objects.Cluster,
nodegroup: magnum_objects.NodeGroup,
):
action = nodegroup.status.split("_")[0]
def update_nodegroups_status(
self, context, cluster: magnum_objects.Cluster
) -> list[magnum_objects.NodeGroup]:
node_groups = []

if nodegroup.role == "master":
kcp = resources.get_kubeadm_control_plane(self.k8s_api, cluster)
if kcp is None:
return nodegroup
for node_group in cluster.nodegroups:
cluster_resource = objects.Cluster.for_magnum_cluster(self.k8s_api, cluster)

generation = kcp.obj.get("status", {}).get("observedGeneration", 1)
if generation > 1:
action = "UPDATE"
md = objects.MachineDeployment.for_node_group_or_none(
self.k8s_api, cluster, node_group
)
md_is_running = (
md is not None and md.obj.get("status", {}).get("phase") == "Running"
)

ready = kcp.obj["status"].get("ready", False)
failure_message = kcp.obj["status"].get("failureMessage")
# NOTE(mnaser): If the node group is in `CREATE_IN_PROGRESS` state, we need to
# wait for the `MachineDeployment` to hit the `Running` phase
# before we can mark the node group as `CREATE_COMPLETE`.
if (
node_group.status == fields.ClusterStatus.CREATE_IN_PROGRESS
and md_is_running
):
node_group.status = fields.ClusterStatus.CREATE_COMPLETE
node_group.save()

updated_replicas = kcp.obj["status"].get("updatedReplicas")
replicas = kcp.obj["status"].get("replicas")
# NOTE(mnaser): If the node group is in `UPDATE_IN_PROGRESS` state, we need to
# wait for the `MachineDeployment` to match the desired state
# from the `Cluster` resource and to be in the `Running`
# phase before we can mark the node group as `UPDATE_COMPLETE`.
if (
node_group.status == fields.ClusterStatus.UPDATE_IN_PROGRESS
and md_is_running
and md.equals_spec(
cluster_resource.get_machine_deployment_spec(node_group.name)
)
):
node_group.status = fields.ClusterStatus.UPDATE_COMPLETE
node_group.save()

if updated_replicas != replicas:
nodegroup.status = f"{action}_IN_PROGRESS"
elif updated_replicas == replicas and ready:
nodegroup.status = f"{action}_COMPLETE"
nodegroup.status_reason = failure_message
else:
md = objects.MachineDeployment.for_node_group_or_none(
self.k8s_api, cluster, nodegroup
)
if md is None:
if action == "DELETE":
nodegroup.status = f"{action}_COMPLETE"
nodegroup.save()
return nodegroup
return nodegroup

phase = md.obj["status"]["phase"]

if phase in ("ScalingUp", "ScalingDown"):
nodegroup.status = f"{action}_IN_PROGRESS"
elif phase == "Running" and action != "DELETE":
nodegroup.status = f"{action}_COMPLETE"
elif phase in ("Failed", "Unknown"):
nodegroup.status = f"{action}_FAILED"
# NOTE(mnaser): If the node group is in `DELETE_IN_PROGRESS` state, we need to
# wait for the `MachineDeployment` to be deleted before we can
# mark the node group as `DELETE_COMPLETE`.
if (
node_group.status == fields.ClusterStatus.DELETE_IN_PROGRESS
and md is None
):
node_group.status = fields.ClusterStatus.DELETE_COMPLETE
node_group.save()

nodegroup.save()
node_groups.append(node_group)

return nodegroup
return node_groups

@cluster_lock_wrapper
def update_nodegroup(
@@ -457,7 +489,7 @@ def update_nodegroup(
nodegroup: magnum_objects.NodeGroup,
):
"""
Update node group.
Update node group (primarily resize it)
This cluster object passed to this method is already in `UPDATE_IN_PROGRESS` state
and the node group object passed to this method is in `UPDATE_IN_PROGRESS` state
@@ -503,18 +535,6 @@ def _update_nodegroup(
cluster_resource.set_machine_deployment_spec(nodegroup.name, target_md_spec)
utils.kube_apply_patch(cluster_resource)

for attempt in Retrying(
retry=retry_if_not_result(lambda md: md.equals_spec(target_md_spec)),
stop=stop_after_delay(10),
wait=wait_fixed(1),
):
with attempt:
md = objects.MachineDeployment.for_node_group(
self.k8s_api, cluster, nodegroup
)
if not attempt.retry_state.outcome.failed:
attempt.retry_state.set_result(md)

@cluster_lock_wrapper
def delete_nodegroup(
self,
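A rough illustration of the new control-plane handling: the helper below is hypothetical and only condenses the branching in update_cluster_control_plane_status, using plain strings in place of Magnum's status constants and a made-up KubeadmControlPlane status payload.

    def control_plane_status(current_status: str, kcp_status: dict) -> str:
        # Mirrors the branching in update_cluster_control_plane_status.
        action = current_status.split("_")[0]
        # An observedGeneration above 1 means the control plane has already been
        # reconciled at least once, so the action is treated as an update.
        if kcp_status.get("observedGeneration", 1) > 1:
            action = "UPDATE"
        if kcp_status.get("updatedReplicas") != kcp_status.get("replicas"):
            return f"{action}_IN_PROGRESS"
        if kcp_status.get("ready", False):
            return f"{action}_COMPLETE"
        return current_status  # replicas match but not ready yet: leave the status as-is

    print(control_plane_status(
        "CREATE_IN_PROGRESS",
        {"observedGeneration": 1, "ready": False, "updatedReplicas": 1, "replicas": 3},
    ))  # CREATE_IN_PROGRESS
    print(control_plane_status(
        "CREATE_IN_PROGRESS",
        {"observedGeneration": 2, "ready": True, "updatedReplicas": 3, "replicas": 3},
    ))  # UPDATE_COMPLETE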

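Similarly, a minimal sketch of the worker node-group transitions introduced in update_nodegroups_status; the helper and its arguments are hypothetical, with plain strings standing in for the fields.ClusterStatus constants, and the MachineDeployment phase check and spec comparison reduced to simple parameters.

    def worker_nodegroup_status(current_status, md_phase, spec_matches_desired):
        # md_phase is the MachineDeployment phase, or None once it has been deleted.
        md_is_running = md_phase == "Running"
        if current_status == "CREATE_IN_PROGRESS" and md_is_running:
            return "CREATE_COMPLETE"
        if current_status == "UPDATE_IN_PROGRESS" and md_is_running and spec_matches_desired:
            return "UPDATE_COMPLETE"
        if current_status == "DELETE_IN_PROGRESS" and md_phase is None:
            return "DELETE_COMPLETE"
        return current_status  # any other combination leaves the status untouched

    print(worker_nodegroup_status("CREATE_IN_PROGRESS", "ScalingUp", False))  # CREATE_IN_PROGRESS
    print(worker_nodegroup_status("UPDATE_IN_PROGRESS", "Running", True))     # UPDATE_COMPLETE
    print(worker_nodegroup_status("DELETE_IN_PROGRESS", None, False))         # DELETE_COMPLETE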