From c2d0f4a05ee8ef29e7854436939c2169a7e108d3 Mon Sep 17 00:00:00 2001 From: Lalith Suresh Date: Tue, 1 Feb 2022 16:27:33 -0800 Subject: [PATCH] k8s-scheduler: run observer on a single thread Signed-off-by: Lalith Suresh --- .../com/vmware/dcm/PodEventsToDatabase.java | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/k8s-scheduler/src/main/java/com/vmware/dcm/PodEventsToDatabase.java b/k8s-scheduler/src/main/java/com/vmware/dcm/PodEventsToDatabase.java index a56d3776..55e93d0c 100644 --- a/k8s-scheduler/src/main/java/com/vmware/dcm/PodEventsToDatabase.java +++ b/k8s-scheduler/src/main/java/com/vmware/dcm/PodEventsToDatabase.java @@ -29,6 +29,7 @@ import io.fabric8.kubernetes.api.model.Quantity; import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.api.model.Toleration; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; import org.jooq.DSLContext; import org.jooq.Insert; @@ -80,19 +81,23 @@ private enum Operators { PodEventsToDatabase(final IConnectionPool dbConnectionPool) { this.dbConnectionPool = dbConnectionPool; - eventStream.buffer(BATCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS, BATCH_COUNT).subscribe(podEvents -> { - if (podEvents.isEmpty()) { - return; - } - final List queries = new ArrayList<>(); - for (final BatchedTask task: podEvents) { - queries.addAll(task.queries()); - } - LOG.info("Inserting {} queries from a batch of {} events", queries.size(), podEvents.size()); - dbConnectionPool.getConnectionToDb().batch(queries).execute(); - for (final BatchedTask task: podEvents) { - task.future().set(true); - } + eventStream.subscribeOn(Schedulers.single()) + .buffer(BATCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS, BATCH_COUNT) + .subscribe(podEvents -> { + if (podEvents.isEmpty()) { + return; + } + final List queries = new ArrayList<>(); + for (final BatchedTask task: podEvents) { + queries.addAll(task.queries()); + } + final long now = System.nanoTime(); + dbConnectionPool.getConnectionToDb().batch(queries).execute(); + LOG.info("Inserted {} queries from a batch of {} events in time {}", queries.size(), podEvents.size(), + System.nanoTime() - now); + for (final BatchedTask task: podEvents) { + task.future().set(true); + } }); }