This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Merge branch 'delta' into delta
askiad committed Nov 18, 2021
2 parents 7cca30b + 0d2b066 commit ce5d658
Showing 9 changed files with 209 additions and 134 deletions.
83 changes: 53 additions & 30 deletions k8s-scheduler/src/main/java/com/vmware/dcm/DDlogDBViews.java
@@ -113,9 +113,33 @@ static List<String> getSchema() {
*/
private static void allPendingPods(final ViewStatements viewStatements) {
final String name = "PODS_TO_ASSIGN_NO_LIMIT";
-        final String query = "SELECT DISTINCT pod_info.*, node_name AS controllable__node_name " +
-                "FROM pod_info " +
-                "WHERE status = 'Pending' AND node_name IS NULL AND scheduler_name = 'dcm-scheduler'";
+        final String query = """
+                SELECT DISTINCT
+                pod_info.uid, pod_info.pod_name,
+                pod_info.status,
+                pod_info.node_name,
+                pod_info.namespace,
+                pod_info.owner_name,
+                pod_info.creation_timestamp,
+                pod_info.priority,
+                pod_info.scheduler_name,
+                pod_info.has_node_selector_labels,
+                pod_info.has_pod_affinity_requirements,
+                pod_info.has_pod_anti_affinity_requirements,
+                pod_info.has_node_port_requirements,
+                pod_info.has_topology_spread_constraints,
+                pod_info.equivalence_class,
+                pod_info.qos_class,
+                pod_info.resourceversion,
+                pod_info.last_requeue,
+                pod_info.node_name as controllable__node_name
+                FROM pod_info
+                JOIN timer_t
+                  ON last_requeue < tick
+                 AND status = 'Pending'
+                 AND node_name IS NULL
+                 AND scheduler_name = 'dcm-scheduler'
+                """;
viewStatements.addQuery(name, query);
}
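
The rewritten view replaces pod_info.* with an explicit column list and joins against a one-row timer_t table: a pod only enters PODS_TO_ASSIGN_NO_LIMIT once its last_requeue timestamp falls behind the scheduler's tick. A minimal sketch of that gate (not code from this commit), assuming a jOOQ DSLContext conn from the pool and a hypothetical pod UID podUid, with the timer_t(tick_id, tick) table and tick() helper introduced later in this commit:

    // A requeued pod is invisible to the view until the tick passes it.
    final long requeuedAt = System.currentTimeMillis();
    conn.execute("update pod_info set last_requeue = " + requeuedAt
            + " where uid = '" + podUid + "'");
    // tick() stamps timer_t with (now - 1000), so this pod stays out of
    // PODS_TO_ASSIGN_NO_LIMIT until a tick more than ~1s after the requeue:
    conn.execute("update timer_t set tick = " + (System.currentTimeMillis() - 1000)
            + " where tick_id = 1");
    // The view's join condition last_requeue < tick then admits the pod again.
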

@@ -343,13 +367,12 @@ private static void spareCapacityPerNode(final ViewStatements viewStatements) {
FROM node_info
JOIN node_resources
ON node_info.uid = node_resources.uid
-                LEFT JOIN (SELECT pod_info.node_name,
+                LEFT JOIN (SELECT DISTINCT pod_info.node_name,
                            pod_resource_demands.resource,
-                           sum(pod_resource_demands.demand) AS total_demand
+                           pod_resource_demands.demand AS total_demand
                     FROM pod_info
                     JOIN pod_resource_demands
-                      ON pod_resource_demands.uid = pod_info.uid
-                    GROUP BY pod_info.node_name, pod_resource_demands.resource) A
+                      ON pod_resource_demands.uid = pod_info.uid) A
ON A.node_name = node_info.name AND A.resource = node_resources.resource
WHERE unschedulable = false AND
memory_pressure = false AND
@@ -390,7 +413,7 @@ private static void podsThatTolerateNodeTaints(final ViewStatements viewStatements) {
                "JOIN (SELECT DISTINCT *, COUNT(*) OVER (PARTITION BY node_name) AS num_taints " +
                "  FROM node_taints) AS A " +
                "  ON pod_tolerations.tolerations_key = A.taint_key " +
-               "  AND (pod_tolerations.tolerations_effect = null " +
+               "  AND ( pod_tolerations.tolerations_effect IS NULL " +
" OR pod_tolerations.tolerations_effect = A.taint_effect) " +
" AND (pod_tolerations.tolerations_operator = 'Exists' " +
" OR pod_tolerations.tolerations_value = A.taint_value) " +
@@ -432,28 +455,28 @@ private static void interPodAffinityAndAntiAffinitySimple(final ViewStatements viewStatements) {
// The format string parameterizes the <pending/fixed> pods and whether we are producing the
// result for <affinity/anti-affinity>
        final String formatString =
-                "SELECT DISTINCT" +
-                "  pods_to_assign.uid as pod_uid, " +
-                "  ARRAY_AGG(matching_pods.pod_uid) OVER (PARTITION BY pods_to_assign.uid) AS pod_matches, " +
-                "  ARRAY_AGG(other_pods.node_name) OVER (PARTITION BY pods_to_assign.uid) AS node_matches " +
-                "FROM " +
-                "  %2$s AS pods_to_assign " +
-                "  JOIN pod_%1$s_match_expressions ON " +
-                "     pods_to_assign.uid " +
-                "= pod_%1$s_match_expressions.pod_uid " +
-                "  JOIN matching_pods " +
-                "      ON array_contains(pod_%1$s_match_expressions.match_expressions, matching_pods.expr_id) " +
-                "  JOIN %3$s as other_pods ON " +
-                "      matching_pods.pod_uid = other_pods.uid AND pods_to_assign.uid != other_pods.uid " +
-                " WHERE pods_to_assign.has_pod_%1$s_requirements = true " +
-                "GROUP BY " +
-                "  pods_to_assign.uid, " +
-                "  matching_pods.pod_uid, " +
-                "  label_selector, " +
-                "  topology_key, " +
-                "  match_expressions, " +
-                "  other_pods.node_name " +
-                "HAVING array_length(match_expressions) = COUNT(DISTINCT matching_pods.expr_id)";
+                "SELECT DISTINCT" +
+                "  pods_to_assign.uid as pod_uid, " +
+                "  ARRAY_AGG(matching_pods.pod_uid) OVER (PARTITION BY pods_to_assign.uid) AS pod_matches, " +
+                "  ARRAY_AGG(other_pods.node_name) OVER (PARTITION BY pods_to_assign.uid) AS node_matches " +
+                "FROM " +
+                "  %2$s AS pods_to_assign " +
+                "  JOIN pod_%1$s_match_expressions ON " +
+                "     pods_to_assign.uid " +
+                "= pod_%1$s_match_expressions.pod_uid " +
+                "  JOIN matching_pods " +
+                "      ON array_contains(pod_%1$s_match_expressions.%1$s_match_expressions, matching_pods.expr_id) " +
+                "  JOIN %3$s as other_pods ON " +
+                "      matching_pods.pod_uid = other_pods.uid AND pods_to_assign.uid != other_pods.uid " +
+                " WHERE pods_to_assign.has_pod_%1$s_requirements = true " +
+                "GROUP BY " +
+                "  pods_to_assign.uid, " +
+                "  matching_pods.pod_uid, " +
+                "  label_selector, " +
+                "  topology_key, " +
+                "  %1$s_match_expressions, " +
+                "  other_pods.node_name " +
+                "HAVING array_length(%1$s_match_expressions) = COUNT(DISTINCT matching_pods.expr_id)";
for (final String type: List.of("affinity", "anti_affinity")) {
final String pendingQuery = String.format(formatString, type, viewStatements.unfixedPods,
viewStatements.unfixedPods);
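
To make the parameterization concrete: for type = "affinity" against pending pods, the %1$s/%2$s/%3$s slots expand roughly as sketched below. The real table name comes from viewStatements.unfixedPods at runtime; "pods_to_assign" is assumed here for illustration:

    final String expanded = String.format(formatString, "affinity",
            "pods_to_assign", "pods_to_assign");
    // ... JOIN pod_affinity_match_expressions ON pods_to_assign.uid = pod_affinity_match_expressions.pod_uid
    // ... ON array_contains(pod_affinity_match_expressions.affinity_match_expressions, matching_pods.expr_id)
    // ... HAVING array_length(affinity_match_expressions) = COUNT(DISTINCT matching_pods.expr_id)
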
@@ -14,6 +14,7 @@
import org.jooq.Delete;
import org.jooq.Record;
import org.jooq.Update;
+import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -43,8 +44,8 @@ public Update<PodInfoRecord> bindOne(final String namespace, final String podName
// Mimic a binding notification
try (final DSLContext conn = dbConnectionPool.getConnectionToDb()) {
return conn.update(Tables.POD_INFO)
-                    .set(Tables.POD_INFO.STATUS, "Running")
-                    .where(Tables.POD_INFO.UID.eq(podUid));
+                    .set(DSL.field(Tables.POD_INFO.STATUS.getUnqualifiedName()), "Running")
+                    .where(DSL.field(Tables.POD_INFO.UID.getUnqualifiedName()).eq(podUid));
}
}
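
Why swap the generated field references for DSL.field(...getUnqualifiedName())? jOOQ renders generated fields with table qualification, which the ddlog-emulated backend presumably cannot resolve; the unqualified variant renders as the bare column name. A hypothetical way to compare the two renderings (LOG being this class's logger):

    try (final DSLContext conn = dbConnectionPool.getConnectionToDb()) {
        // Renders roughly as "pod_info"."status" (qualified):
        LOG.info(conn.render(Tables.POD_INFO.STATUS));
        // Renders as just "status":
        LOG.info(conn.render(DSL.field(Tables.POD_INFO.STATUS.getUnqualifiedName())));
    }
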

@@ -18,6 +18,7 @@
import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.NodeCondition;
import io.fabric8.kubernetes.api.model.NodeStatus;
+import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import org.jooq.DSLContext;
@@ -28,6 +29,7 @@

import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -217,13 +219,28 @@ private void deleteNode(final Node node, final DSLContext conn) {
LOG.info("Node {} deleted", node.getMetadata().getName());
}

-    private List<Insert<NodeResourcesRecord>> addNodeCapacities(final DSLContext conn, final Node node) {
+    private List<Query> addNodeCapacities(final DSLContext conn, final Node node) {
+        final List<Query> inserts = new ArrayList<>();
final Map<String, Quantity> allocatable = node.getStatus().getAllocatable();
LOG.info("Allocatable for node {} is {}", node.getMetadata().getName(), allocatable);
-        return allocatable.entrySet().stream().map(
+        /*
+         * XXX: We add dummy pods per node that have 0 demands for the resources available on the node.
+         * This is a workaround to make sure the outer joins in spare_capacity_per_node
+         * can be evaluated using regular joins. Remove this when ddlog supports outer joins.
+         */
+        final Pod pod = Utils.newPod(node.getMetadata().getName() + "-resource-dummy");
+        final Map<String, Quantity> resourceRequests = new HashMap<>();
+        allocatable.forEach((k, v) -> resourceRequests.put(k, new Quantity("0")));
+        pod.getSpec().getContainers().get(0).getResources().setRequests(resourceRequests);
+        pod.getStatus().setPhase("Running");
+        pod.getSpec().setNodeName(node.getMetadata().getName());
+        inserts.addAll(PodEventsToDatabase.updatePodRecord(pod, conn));
+        inserts.addAll(PodEventsToDatabase.updateResourceRequests(pod, conn));
+        allocatable.entrySet().stream().map(
                (es) -> conn.insertInto(Tables.NODE_RESOURCES)
                        .values(node.getMetadata().getUid(), es.getKey(), convertUnit(es.getValue(), es.getKey()))
-        ).collect(Collectors.toList());
+        ).forEach(inserts::add);
+        return inserts;
}

private List<Insert<NodeLabelsRecord>> addNodeLabels(final DSLContext conn, final Node node) {
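
A worked micro-example (hypothetical numbers) of why the zero-demand dummy pod added above is harmless: it keeps a pod-less node visible to a plain join against per-pod demands, matching the DISTINCT per-pod subquery now used in spare_capacity_per_node, without perturbing the aggregate:

    // Node n1 allocates 4 cpu; real pods p1 and p2 demand 1 and 2, and the
    // "n1-resource-dummy" pod demands 0. Without the dummy row, a node with
    // no pods at all would simply vanish from a regular (inner) join.
    final long capacity = 4;
    final long[] demands = {1, 2, 0};   // p1, p2, n1-resource-dummy
    final long spare = capacity - java.util.Arrays.stream(demands).sum();
    // spare == 1, exactly what it would be without the dummy row
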
@@ -185,7 +185,7 @@ private void updatePod(final Pod pod, final Pod oldPod) {
}
}

-    private List<Query> updatePodRecord(final Pod pod, final DSLContext conn) {
+    static List<Query> updatePodRecord(final Pod pod, final DSLContext conn) {
final List<Query> inserts = new ArrayList<>();
final List<ResourceRequirements> resourceRequirements = pod.getSpec().getContainers().stream()
.map(Container::getResources)
@@ -363,7 +363,7 @@ private List<Query> updatePodRecord(final Pod pod, final DSLContext conn) {
return inserts;
}

-    private List<Insert<?>> updateResourceRequests(final Pod pod, final DSLContext conn) {
+    static List<Insert<?>> updateResourceRequests(final Pod pod, final DSLContext conn) {
conn.deleteFrom(Tables.POD_RESOURCE_DEMANDS)
.where(DSL.field(Tables.POD_RESOURCE_DEMANDS.UID.getUnqualifiedName())
.eq(pod.getMetadata().getUid())).execute();
@@ -390,7 +390,7 @@ private List<Insert<?>> updateResourceRequests(final Pod pod, final DSLContext conn) {
return inserts;
}

-    private boolean hasNodeSelector(final Pod pod) {
+    private static boolean hasNodeSelector(final Pod pod) {
final PodSpec podSpec = pod.getSpec();
return (podSpec.getNodeSelector() != null && podSpec.getNodeSelector().size() > 0)
|| (podSpec.getAffinity() != null
@@ -553,7 +553,7 @@ private List<Insert<?>> insertPodAffinityTerms(final Table<?> table, final Pod pod
final List<Insert<?>> inserts = new ArrayList<>();
int termNumber = 0;
for (final PodAffinityTerm term: terms) {
-            final Object[] matchExpressions = term.getLabelSelector().getMatchExpressions().stream()
+            final Long[] matchExpressions = term.getLabelSelector().getMatchExpressions().stream()
.map(e -> toMatchExpressionId(conn, e.getKey(), e.getOperator(), e.getValues()))
.toList().toArray(new Long[0]);
inserts.add(conn.insertInto(table)
@@ -609,7 +609,7 @@ private long toMatchExpressionId(final DSLContext conn, final String key, final
}
}

-    private long equivalenceClassHash(final Pod pod) {
+    private static long equivalenceClassHash(final Pod pod) {
return Objects.hash(pod.getMetadata().getNamespace(),
pod.getMetadata().getLabels(),
pod.getSpec().getAffinity(),
@@ -625,7 +625,7 @@ private long equivalenceClassHash(final Pod pod) {
* BestEffort -> no requests nor limits for any containers
* Burstable -> requests and limits do not match
*/
-    private QosClass getQosClass(final List<ResourceRequirements> resourceRequirements) {
+    private static QosClass getQosClass(final List<ResourceRequirements> resourceRequirements) {
final List<String> supportedResources = List.of("cpu", "memory");
boolean isGuaranteed = true;
boolean bestEffort = true;
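
As a quick illustration of the QoS rules in the comment above (hypothetical values, not code from this commit): requests equal to limits for every container means Guaranteed, neither set means BestEffort, and anything else is Burstable:

    final ResourceRequirements guaranteed = new ResourceRequirements();
    guaranteed.setRequests(Map.of("cpu", new Quantity("1"), "memory", new Quantity("1Gi")));
    guaranteed.setLimits(Map.of("cpu", new Quantity("1"), "memory", new Quantity("1Gi")));
    // getQosClass(List.of(guaranteed))  -> Guaranteed (requests == limits)

    final ResourceRequirements bestEffort = new ResourceRequirements();
    // no requests, no limits
    // getQosClass(List.of(bestEffort))  -> BestEffort

    final ResourceRequirements burstable = new ResourceRequirements();
    burstable.setRequests(Map.of("cpu", new Quantity("1")));
    // requests without matching limits
    // getQosClass(List.of(burstable))   -> Burstable
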
25 changes: 21 additions & 4 deletions k8s-scheduler/src/main/java/com/vmware/dcm/Scheduler.java
@@ -26,6 +26,7 @@
import org.jooq.Result;
import org.jooq.Table;
import org.jooq.Update;
+import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -281,11 +282,25 @@ private Scheduler(final IConnectionPool dbConnectionPool, final Model initialPla
}

void handlePodEvent(final PodEvent podEvent) {
+        tick();
podEventsToDatabase.handle(podEvent);
notificationQueue.add(true);
}

+    void tick() {
+        try {
+            this.dbConnectionPool.getConnectionToDb()
+                    .execute(String.format("insert into timer_t values (1, %s)",
+                            System.currentTimeMillis() - 1000));
+        } catch (final Exception e) {
+            this.dbConnectionPool.getConnectionToDb()
+                    .execute(String.format("update timer_t set tick = %s where tick_id = 1",
+                            System.currentTimeMillis() - 1000));
+        }
+    }

void handlePodEventNoNotify(final PodEvent podEvent) {
+        tick();
podEventsToDatabase.handle(podEvent);
// Skip adding pending pod in notification queue
}
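
The new tick() method upserts the single timer_t row by attempting the insert and falling back to an update when the key already exists. Note that the tick is deliberately stamped one second in the past, so a pod requeued just now stays gated for roughly a second. On an H2-backed emulation, the same upsert could hypothetically be a single statement:

    // Hypothetical single-statement equivalent, assuming H2's MERGE ... KEY syntax:
    this.dbConnectionPool.getConnectionToDb()
            .execute(String.format("merge into timer_t key (tick_id) values (1, %s)",
                    System.currentTimeMillis() - 1000));
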
@@ -313,6 +328,7 @@ void startScheduler(final IPodToNodeBinder binder) {
}

void scheduleAllPendingPods(final IPodToNodeBinder binder) {
+        tick();
final IntSupplier numPending =
() -> {
final List<Integer> res = dbConnectionPool.getConnectionToDb()
@@ -397,11 +413,12 @@ private void handleAssignment(final Result<? extends Record> assignedPods, final
final List<Update<?>> updates = new ArrayList<>();
assignedPods.forEach(r -> {
final String podName = r.get("POD_NAME", String.class);
+            final String podUid = r.get("UID", String.class);
final String newNodeName = r.get("CONTROLLABLE__NODE_NAME", String.class);
updates.add(
conn.update(Tables.POD_INFO)
-                            .set(Tables.POD_INFO.NODE_NAME, newNodeName)
-                            .where(Tables.POD_INFO.POD_NAME.eq(podName))
+                            .set(DSL.field(Tables.POD_INFO.NODE_NAME.getUnqualifiedName()), newNodeName)
+                            .where(DSL.field(Tables.POD_INFO.UID.getUnqualifiedName()).eq(podUid))
);
LOG.info("Scheduling decision for pod {} as part of batch {} made in time: {}",
podName, batch, totalTime);
@@ -422,8 +439,8 @@ private void requeue(final Result<? extends Record> unassignedPods) {
final long requeueTime = System.currentTimeMillis();
updates.add(
conn.update(Tables.POD_INFO)
-                            .set(Tables.POD_INFO.LAST_REQUEUE, requeueTime)
-                            .where(Tables.POD_INFO.POD_NAME.eq(podName))
+                            .set(DSL.field(Tables.POD_INFO.LAST_REQUEUE.getUnqualifiedName()), requeueTime)
+                            .where(DSL.field(Tables.POD_INFO.POD_NAME.getUnqualifiedName()).eq(podName))
);
LOG.info("Re-queuing pod {} at time: {}", podName, requeueTime);
});
60 changes: 60 additions & 0 deletions k8s-scheduler/src/main/java/com/vmware/dcm/Utils.java
@@ -7,7 +7,19 @@
package com.vmware.dcm;

import edu.umd.cs.findbugs.annotations.Nullable;
+import io.fabric8.kubernetes.api.model.Affinity;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.NodeAffinity;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;

+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;

class Utils {

@@ -34,4 +46,52 @@ static void compileDDlog(@Nullable String ddlogFile) {
.setScopedInitialPlacement(true).build();
dbConnectionPool.buildDDlog(true, true);
}

+    /*
+     * XXX: The below methods are only used by NodeResourceEventHandler as a temporary workaround. Remove
+     * when the workaround is no longer needed.
+     */
+    static Pod newPod(final String name) {
+        return newPod(name, "Pending");
+    }
+
+    static Pod newPod(final String name, final String status) {
+        return newPod(name, UUID.randomUUID(), status, Collections.emptyMap(), Collections.emptyMap());
+    }
+
+    static Pod newPod(final String podName, final UUID uid, final String phase,
+                      final Map<String, String> selectorLabels, final Map<String, String> labels) {
+        final Pod pod = new Pod();
+        final ObjectMeta meta = new ObjectMeta();
+        meta.setUid(uid.toString());
+        meta.setName(podName);
+        meta.setLabels(labels);
+        meta.setCreationTimestamp("1");
+        meta.setNamespace("default");
+        meta.setResourceVersion("0");
+        final PodSpec spec = new PodSpec();
+        spec.setSchedulerName(Scheduler.SCHEDULER_NAME);
+        spec.setPriority(0);
+        spec.setNodeSelector(selectorLabels);
+
+        final Container container = new Container();
+        container.setName("ignore");
+        container.setImage("ignore");
+
+        final ResourceRequirements resourceRequirements = new ResourceRequirements();
+        resourceRequirements.setRequests(Collections.emptyMap());
+        container.setResources(resourceRequirements);
+        spec.getContainers().add(container);
+
+        final Affinity affinity = new Affinity();
+        final NodeAffinity nodeAffinity = new NodeAffinity();
+        affinity.setNodeAffinity(nodeAffinity);
+        spec.setAffinity(affinity);
+        final PodStatus status = new PodStatus();
+        status.setPhase(phase);
+        pod.setMetadata(meta);
+        pod.setSpec(spec);
+        pod.setStatus(status);
+        return pod;
+    }
}
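
For reference, a sketch of how these helpers are consumed by the NodeResourceEventHandler workaround earlier in this commit (hypothetical node name):

    // newPod(name) defaults to phase "Pending", a random UID, and empty labels.
    final Pod dummy = Utils.newPod("node-1-resource-dummy");
    dummy.getStatus().setPhase("Running");   // dummy pods are not schedulable work
    dummy.getSpec().setNodeName("node-1");   // pin the dummy to its node
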

