This repository has been archived by the owner on May 10, 2024. It is now read-only.

k8s-scheduler: use a periodic batching implementation with a count limit
Signed-off-by: Lalith Suresh <[email protected]>
lalithsuresh committed Feb 2, 2022
1 parent 4acf278 commit 750b87d
Showing 4 changed files with 47 additions and 17 deletions.
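
This commit batches pod-event writes to the database: instead of executing one jOOQ batch per pod event, PodEventsToDatabase now funnels each event's queries into an RxJava 2 stream and flushes them every 100 ms, or as soon as 10 events accumulate, whichever comes first. A minimal, self-contained sketch of the buffer(timespan, unit, count) operator the change relies on (the class and the values below are illustrative, not part of the commit):

    import io.reactivex.subjects.PublishSubject;
    import java.util.concurrent.TimeUnit;

    public class BufferSketch {
        public static void main(String[] args) throws InterruptedException {
            final PublishSubject<Integer> events = PublishSubject.create();
            // Emits a list every 100 ms, or earlier once 10 items accumulate.
            events.buffer(100, TimeUnit.MILLISECONDS, 10)
                  .subscribe(batch -> {
                      if (!batch.isEmpty()) { // time-triggered buffers may be empty
                          System.out.println("flushing batch of " + batch.size());
                      }
                  });
            for (int i = 0; i < 25; i++) {
                events.onNext(i); // the count limit flushes two batches of 10 here
            }
            Thread.sleep(200);    // the 100 ms timer flushes the remaining 5
        }
    }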
gradle.properties (1 addition, 0 deletions)
@@ -39,6 +39,7 @@ hikariCPVersion = 3.4.2
 fabricK8sClientVersion = 5.5.0
 metricsVersion = 4.1.0-rc3
 commonsCliVersion = 1.4
+rxJavaVersion = 2.2.21
 
 # benchmark sub-project
 shadowJarPluginVersion = 6.0.0
k8s-scheduler/build.gradle (1 addition, 0 deletions)
@@ -32,6 +32,7 @@ dependencies {
     implementation "commons-cli:commons-cli:${commonsCliVersion}"
     implementation "commons-io:commons-io:${commonsIoVersion}"
     implementation "com.h2database:h2:${h2Version}"
+    implementation "io.reactivex.rxjava2:rxjava:${rxJavaVersion}"
     implementation "ddlog:ddlogapi:0.1"
     implementation ("ddlog:sql:1.0-SNAPSHOT") {
         // The required dependencies: apache-calcite-core, the presto parser, and jooq, are available here,
k8s-scheduler/src/main/java/com/vmware/dcm/PodEventsToDatabase.java (44 additions, 16 deletions)
@@ -8,6 +8,7 @@
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.SettableFuture;
 import com.vmware.dcm.k8s.generated.Tables;
 import com.vmware.dcm.k8s.generated.tables.MatchExpressions;
 import com.vmware.dcm.k8s.generated.tables.PodInfo;
@@ -28,6 +29,7 @@
 import io.fabric8.kubernetes.api.model.Quantity;
 import io.fabric8.kubernetes.api.model.ResourceRequirements;
 import io.fabric8.kubernetes.api.model.Toleration;
+import io.reactivex.subjects.PublishSubject;
 import org.jooq.DSLContext;
 import org.jooq.Insert;
 import org.jooq.InsertOnDuplicateStep;
@@ -45,6 +47,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -58,14 +61,16 @@
  * Reflects pod events from the Kubernetes API into the database.
  */
 class PodEventsToDatabase {
+    private static final int BATCH_COUNT = 10;
+    private static final int BATCH_INTERVAL_IN_MS = 100;
     private static final Logger LOG = LoggerFactory.getLogger(PodEventsToDatabase.class);
     private static final long NEVER_REQUEUED = 0;
     private final IConnectionPool dbConnectionPool;
     private final Cache<String, Boolean> deletedUids = CacheBuilder.newBuilder()
             .expireAfterWrite(5, TimeUnit.MINUTES)
             .build();
     private final AtomicLong expressionIds = new AtomicLong();
-
+    private final PublishSubject<BatchedTask> eventStream = PublishSubject.create();
     private enum Operators {
         In,
         Exists,
@@ -75,31 +80,54 @@ private enum Operators {
 
     PodEventsToDatabase(final IConnectionPool dbConnectionPool) {
         this.dbConnectionPool = dbConnectionPool;
+        eventStream.buffer(BATCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS, BATCH_COUNT).subscribe(podEvents -> {
+            if (podEvents.isEmpty()) {
+                return;
+            }
+            final List<Query> queries = new ArrayList<>();
+            for (final BatchedTask task: podEvents) {
+                queries.addAll(task.queries());
+            }
+            LOG.info("Inserting {} queries from a batch of {} events", queries.size(), podEvents.size());
+            dbConnectionPool.getConnectionToDb().batch(queries).execute();
+            for (final BatchedTask task: podEvents) {
+                task.future().set(true);
+            }
+        });
     }
 
+    record BatchedTask (List<Query> queries, SettableFuture<Boolean> future) {}
+
     PodEvent handle(final PodEvent event) {
-        switch (event.action()) {
+        final List<Query> queries = switch (event.action()) {
             case ADDED -> addPod(event.pod());
             case UPDATED -> updatePod(event.pod(), Objects.requireNonNull(event.oldPod()));
             case DELETED -> deletePod(event.pod());
             default -> throw new IllegalArgumentException(event.toString());
         };
+        final SettableFuture<Boolean> future = SettableFuture.create();
+        eventStream.onNext(new BatchedTask(queries, future));
+        try {
+            future.get();
+        } catch (final InterruptedException | ExecutionException e) {
+            LOG.error("future.get() failed with exception: ", e);
+            throw new RuntimeException(e);
+        }
         return event;
     }
 
-    private void addPod(final Pod pod) {
+    private List<Query> addPod(final Pod pod) {
         LOG.info("Adding pod {} (uid: {}, resourceVersion: {})",
                 pod.getMetadata().getName(), pod.getMetadata().getUid(), pod.getMetadata().getResourceVersion());
         if (pod.getMetadata().getUid() != null &&
                 deletedUids.getIfPresent(pod.getMetadata().getUid()) != null) {
             LOG.trace("Received stale event for pod that we already deleted: {} (uid: {}, resourceVersion {}). " +
                     "Ignoring", pod.getMetadata().getName(), pod.getMetadata().getUid(),
                     pod.getMetadata().getResourceVersion());
-            return;
+            return Collections.emptyList();
         }
         final long start = System.nanoTime();
+        final List<Query> inserts = new ArrayList<>();
         try (final DSLContext conn = dbConnectionPool.getConnectionToDb()) {
-            final List<Query> inserts = new ArrayList<>();
             inserts.addAll(insertPodRecord(pod, conn));
             inserts.addAll(updateContainerInfoForPod(pod, conn));
             inserts.addAll(updatePodLabels(pod, conn));
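
The hunk above is the heart of the change: handle() stays synchronous for its callers. It hands its queries plus a SettableFuture to the stream, then blocks on future.get() until the buffered subscriber has written the whole batch and completes every task's future. A standalone sketch of that handshake, under the same assumptions (Guava's SettableFuture and RxJava 2; the Task record and names are illustrative):

    import com.google.common.util.concurrent.SettableFuture;
    import io.reactivex.subjects.PublishSubject;
    import java.util.concurrent.TimeUnit;

    public class HandshakeSketch {
        record Task(String name, SettableFuture<Boolean> future) { }

        public static void main(String[] args) throws Exception {
            final PublishSubject<Task> stream = PublishSubject.create();
            stream.buffer(100, TimeUnit.MILLISECONDS, 10)
                  .subscribe(batch -> {
                      // ... a real subscriber would write the whole batch to the DB here ...
                      for (final Task t : batch) {
                          t.future().set(true); // wake up every blocked producer
                      }
                  });
            final SettableFuture<Boolean> done = SettableFuture.create();
            stream.onNext(new Task("pod-1", done));
            done.get(); // returns once the batch containing "pod-1" has been flushed
        }
    }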
@@ -109,13 +137,13 @@ private void addPod(final Pod pod) {
             inserts.addAll(updateResourceRequests(pod, conn));
             inserts.addAll(updatePodTopologySpread(pod, conn));
             inserts.add(tick(conn));
-            conn.batch(inserts).execute();
         }
         final long end = System.nanoTime();
         LOG.info("{} pod added in {}ns", pod.getMetadata().getName(), (end - start));
+        return inserts;
     }
 
-    private void deletePod(final Pod pod) {
+    private List<Query> deletePod(final Pod pod) {
         LOG.trace("Deleting pod {} (uid: {}, resourceVersion: {})",
                 pod.getMetadata().getName(), pod.getMetadata().getUid(), pod.getMetadata().getResourceVersion());
         // The assumption here is that all foreign key references to pod_info.pod_name will be deleted using
@@ -124,8 +152,8 @@ private void deletePod(final Pod pod) {
         // a delete cascade, as configured in our schema
         if (pod.getMetadata().getUid() != null &&
                 deletedUids.getIfPresent(pod.getMetadata().getUid()) == null) {
             deletedUids.put(pod.getMetadata().getUid(), true);
         }
+        final List<Query> deletes = new ArrayList<>();
         try (final DSLContext conn = dbConnectionPool.getConnectionToDb()) {
-            final List<Query> deletes = new ArrayList<>();
             deletes.add(conn.deleteFrom(Tables.POD_INFO)
                     .where(DSL.field(Tables.POD_INFO.UID.getUnqualifiedName()).eq(pod.getMetadata().getUid())));
             deletes.add(deletePodLabels(pod, conn));
@@ -135,39 +163,39 @@ private void deletePod(final Pod pod) {
             deletes.add(deletePodTolerations(pod, conn));
             deletes.addAll(deletePodAffinity(pod, conn));
             deletes.add(tick(conn));
-            conn.batch(deletes).execute();
         }
+        return deletes;
     }
 
-    private void updatePod(final Pod pod, final Pod oldPod) {
+    private List<Query> updatePod(final Pod pod, final Pod oldPod) {
         try (final DSLContext conn = dbConnectionPool.getConnectionToDb()) {
             final PodInfoRecord existingPodInfoRecord = conn.selectFrom(Tables.POD_INFO)
                     .where(DSL.field(Tables.POD_INFO.UID.getUnqualifiedName()).eq(pod.getMetadata().getUid()))
                     .fetchOne();
             if (existingPodInfoRecord == null) {
                 LOG.trace("Pod {} (uid: {}) does not exist. Skipping",
                         pod.getMetadata().getName(), pod.getMetadata().getUid());
-                return;
+                return Collections.emptyList();
             }
             final long incomingResourceVersion = Long.parseLong(pod.getMetadata().getResourceVersion());
             if (existingPodInfoRecord.getResourceversion() >= incomingResourceVersion) {
                 LOG.trace("Received a stale pod event {} (uid: {}, resourceVersion: {}). Ignoring",
                         pod.getMetadata().getName(), pod.getMetadata().getUid(),
                         pod.getMetadata().getResourceVersion());
-                return;
+                return Collections.emptyList();
             }
             if (pod.getSpec().getNodeName() == null &&
                     existingPodInfoRecord.getNodeName() != null) {
                 LOG.trace("Received a duplicate event for a node that we have already scheduled (old: {}, new:{}). " +
                         "Ignoring.", existingPodInfoRecord.getNodeName(), pod.getSpec().getNodeName());
-                return;
+                return Collections.emptyList();
             }
             if (pod.getMetadata().getUid() != null &&
                     deletedUids.getIfPresent(pod.getMetadata().getUid()) != null) {
                 LOG.trace("Received stale event for pod that we already deleted: {} (uid: {}, resourceVersion: {}). " +
                         "Ignoring", pod.getMetadata().getName(), pod.getMetadata().getUid(),
                         pod.getMetadata().getResourceVersion());
-                return;
+                return Collections.emptyList();
             }
             LOG.trace("Updating pod {} (uid: {}, resourceVersion: {})", pod.getMetadata().getName(),
                     pod.getMetadata().getUid(), pod.getMetadata().getResourceVersion());
@@ -194,7 +222,7 @@ private void updatePod(final Pod pod, final Pod oldPod) {
                 insertOrUpdate.addAll(updatePodTopologySpread(pod, conn));
             }
             insertOrUpdate.add(tick(conn));
-            conn.batch(insertOrUpdate).execute();
+            return insertOrUpdate;
         }
     }
 
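Net effect of the hunks above: addPod(), deletePod(), and updatePod() no longer call conn.batch(...).execute() once per event; they return their queries so the subscriber can merge many events into a single jOOQ batch, one database round trip per flush. Assuming dsl is an org.jooq.DSLContext and queries is the merged List<Query> (names illustrative), the flush amounts to:

    // One JDBC batch (one round trip) per flush instead of one per pod event.
    final int[] rowCounts = dsl.batch(queries).execute();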
@@ -19,7 +19,7 @@ public void runTest() throws Exception {
     @Test
     public void runTestScope() throws Exception {
         final String[] args =
-                {"-n", "50", "-f", "test-data.txt", "-c", "100", "-m", "200", "-t", "100", "-s", "100", "-S"};
+                {"-n", "50", "-f", "v2-cropped.txt", "-c", "100", "-m", "200", "-t", "100", "-s", "100", "-S"};
         EmulatedCluster.main(args);
     }
 }
