Shutdown kafka buffer (opensearch-project#3464)
* Add shutdown method to buffer API

Signed-off-by: Chase Engelbrecht <[email protected]>

* Remove POC code

Signed-off-by: Chase Engelbrecht <[email protected]>

* Revert acknowledgments default

Signed-off-by: Chase Engelbrecht <[email protected]>

* Add unit tests

Signed-off-by: Chase Engelbrecht <[email protected]>

* Add test for coverage

Signed-off-by: Chase Engelbrecht <[email protected]>

* Remove unused import

Signed-off-by: Chase Engelbrecht <[email protected]>

---------

Signed-off-by: Chase Engelbrecht <[email protected]>
engechas authored Oct 10, 2023
1 parent 4232439 commit a79cc54
Showing 10 changed files with 113 additions and 2 deletions.
@@ -58,4 +58,7 @@ public interface Buffer<T extends Record<?>> {
    default Duration getDrainTimeout() {
        return Duration.ZERO;
    }

    default void shutdown() {
    }
}
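The new method is a default no-op, so existing Buffer implementations keep compiling unchanged while buffers that own background resources can override it. Below is a minimal, hypothetical sketch of such an override; the class name and executor field are illustrative and not part of this change, BlockingBuffer is used only because it is an existing concrete Buffer, and the import paths follow the Data Prepper layout but may need adjusting.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;

// Hypothetical plugin buffer that owns a background worker and releases it via the new hook.
public class ExampleShutdownAwareBuffer<T extends Record<?>> extends BlockingBuffer<T> {
    private final ExecutorService backgroundWorker = Executors.newSingleThreadExecutor();

    public ExampleShutdownAwareBuffer(final String pipelineName) {
        super(pipelineName);
    }

    @Override
    public void shutdown() {
        // Called by the pipeline after processors have drained; stop work owned by this buffer.
        backgroundWorker.shutdownNow();
    }
}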
@@ -22,4 +22,10 @@ public void testGetDrainTimeout() {

        Assert.assertEquals(Duration.ZERO, buffer.getDrainTimeout());
    }

    @Test
    public void testShutdown() {
        final Buffer<Record<Event>> buffer = spy(Buffer.class);
        buffer.shutdown();
    }
}
@@ -77,4 +77,9 @@ public boolean isEmpty() {
    public Duration getDrainTimeout() {
        return buffer.getDrainTimeout();
    }

    @Override
    public void shutdown() {
        buffer.shutdown();
    }
}
@@ -280,6 +280,8 @@ public synchronized void shutdown() {
        shutdownExecutorService(processorExecutorService, buffer.getDrainTimeout().toMillis() + processorShutdownTimeout.toMillis(), "processor");

        processorSets.forEach(processorSet -> processorSet.forEach(Processor::shutdown));
        buffer.shutdown();

        sinks.stream()
                .map(DataFlowComponent::getComponent)
                .forEach(Sink::shutdown);
@@ -64,4 +64,10 @@ public Duration getDrainTimeout() {
                .map(Buffer::getDrainTimeout)
                .reduce(Duration.ZERO, Duration::plus);
    }

    @Override
    public void shutdown() {
        primaryBuffer.shutdown();
        secondaryBuffers.forEach(Buffer::shutdown);
    }
}
@@ -75,6 +75,12 @@ void getDrainTimeout_returns_buffer_drain_timeout() {
        assertThat(result, equalTo(duration));
    }

    @Test
    void shutdown_calls_buffer_shutdown() {
        createObjectUnderTest().shutdown();
        verify(buffer).shutdown();
    }

    @Nested
    class NoCircuitBreakerChecks {
        @AfterEach
@@ -107,7 +107,8 @@ void testPipelineState() {
        final TestSink testSink = new TestSink();
        final DataFlowComponent<Sink> sinkDataFlowComponent = mock(DataFlowComponent.class);
        when(sinkDataFlowComponent.getComponent()).thenReturn(testSink);
        final Pipeline testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, new BlockingBuffer(TEST_PIPELINE_NAME),
        final Buffer buffer = spy(new BlockingBuffer(TEST_PIPELINE_NAME));
        final Pipeline testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, buffer,
                Collections.emptyList(), Collections.singletonList(sinkDataFlowComponent), router, eventFactory,
                acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT,
                processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout);
@@ -119,6 +120,7 @@ void testPipelineState() {
        testPipeline.shutdown();
        assertThat("Pipeline isStopRequested is expected to be true", testPipeline.isStopRequested(), is(true));
        assertThat("Sink shutdown should be called", testSink.isShutdown, is(true));
        verify(buffer).shutdown();
    }

    @Test
@@ -201,6 +201,26 @@ void getDrainTimeout_MultipleSecondaryBuffers_ReturnsSumOfDurations() {
        verify(secondaryBuffer, times(2)).getDrainTimeout();
    }

    @Test
    void shutdown_NoSecondaryBuffers_CallsPrimaryBufferShutdown() {
        createObjectUnderTest(0).shutdown();
        verify(primaryBuffer).shutdown();
    }

    @Test
    void shutdown_OneSecondaryBuffers_CallsPrimaryAndSecondaryBufferShutdown() {
        createObjectUnderTest(1).shutdown();
        verify(primaryBuffer).shutdown();
        verify(secondaryBuffer).shutdown();
    }

    @Test
    void shutdown_MultipleSecondaryBuffers_CallsAllBuffersShutdown() {
        createObjectUnderTest(2).shutdown();
        verify(primaryBuffer).shutdown();
        verify(secondaryBuffer, times(2)).shutdown();
    }

    private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) {
        final List<Buffer> secondaryBuffers = IntStream.range(0, secondaryBufferCount)
                .mapToObj(i -> secondaryBuffer)
@@ -27,19 +27,22 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

@DataPrepperPlugin(name = "kafka", pluginType = Buffer.class, pluginConfigurationType = KafkaBufferConfig.class)
public class KafkaBuffer<T extends Record<?>> extends AbstractBuffer<T> {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaBuffer.class);
    static final long EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = 30L;
    public static final int INNER_BUFFER_CAPACITY = 1000000;
    public static final int INNER_BUFFER_BATCH_SIZE = 250000;
    private final KafkaCustomProducer producer;
    private final AbstractBuffer innerBuffer;
    private final ExecutorService executorService;
    private final Duration drainTimeout;
    private AtomicBoolean shutdownInProgress;

    @DataPrepperPluginConstructor
    public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory,
@@ -51,8 +54,9 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
        producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null);
        final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier);
        innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
        this.shutdownInProgress = new AtomicBoolean(false);
        final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
                innerBuffer, pluginMetrics, acknowledgementSetManager, new AtomicBoolean(false));
                innerBuffer, pluginMetrics, acknowledgementSetManager, shutdownInProgress);
        this.executorService = Executors.newFixedThreadPool(consumers.size());
        consumers.forEach(this.executorService::submit);

@@ -101,4 +105,24 @@ public boolean isEmpty() {
    public Duration getDrainTimeout() {
        return drainTimeout;
    }

    @Override
    public void shutdown() {
        shutdownInProgress.set(true);
        executorService.shutdown();

        try {
            if (executorService.awaitTermination(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
                LOG.info("Successfully waited for consumer task to terminate");
            } else {
                LOG.warn("Consumer task did not terminate in time, forcing termination");
                executorService.shutdownNow();
            }
        } catch (final InterruptedException e) {
            LOG.error("Interrupted while waiting for consumer task to terminate", e);
            executorService.shutdownNow();
        }

        innerBuffer.shutdown();
    }
}
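Note how the constructor now passes the shared shutdownInProgress flag to the consumers created by KafkaCustomConsumerFactory: shutdown() first flips that flag so consumer tasks can stop polling, then drains the executor, and finally shuts down the inner buffer. The KafkaCustomConsumer loop itself is outside this diff, so the following standalone sketch only illustrates that cooperative pattern; every name in it is hypothetical.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for a polling task that honors a shared shutdown flag.
class ExamplePollingTask implements Runnable {
    private final AtomicBoolean shutdownInProgress;

    ExamplePollingTask(final AtomicBoolean shutdownInProgress) {
        this.shutdownInProgress = shutdownInProgress;
    }

    @Override
    public void run() {
        while (!shutdownInProgress.get()) {
            // poll the source and write records into the inner buffer
        }
        // flush and commit offsets here so awaitTermination can succeed promptly
    }
}

class ExampleShutdownWiring {
    public static void main(String[] args) throws InterruptedException {
        final AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
        final ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.submit(new ExamplePollingTask(shutdownInProgress));

        // Mirrors the shutdown sequence above: signal the tasks, then drain the executor.
        shutdownInProgress.set(true);
        executorService.shutdown();
        if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
            executorService.shutdownNow();
        }
    }
}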
@@ -39,6 +39,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
@@ -47,13 +48,16 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBuffer.EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@@ -236,4 +240,37 @@ void test_kafkaBuffer_getDrainTimeout() {

        verify(bufferConfig).getDrainTimeout();
    }

    @Test
    public void testShutdown_Successful() throws InterruptedException {
        kafkaBuffer = createObjectUnderTest();
        lenient().when(executorService.awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS))).thenReturn(true);

        kafkaBuffer.shutdown();
        verify(executorService).shutdown();
        verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS));
    }

    @Test
    public void testShutdown_Timeout() throws InterruptedException {
        kafkaBuffer = createObjectUnderTest();
        lenient().when(executorService.awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS))).thenReturn(false);

        kafkaBuffer.shutdown();
        verify(executorService).shutdown();
        verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS));
        verify(executorService).shutdownNow();
    }

    @Test
    public void testShutdown_InterruptedException() throws InterruptedException {
        kafkaBuffer = createObjectUnderTest();
        lenient().when(executorService.awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS)))
                .thenThrow(new InterruptedException());

        kafkaBuffer.shutdown();
        verify(executorService).shutdown();
        verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS));
        verify(executorService).shutdownNow();
    }
}
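These three shutdown tests stub awaitTermination on an executorService mock, but the diff does not show how that mock reaches KafkaBuffer, which builds its executor with Executors.newFixedThreadPool. One plausible wiring, consistent with the mockStatic import already present in this test class but stated here purely as an assumption about the test setup, is to intercept the static factory:

// Assumed wiring only; the real setup of `executorService` is not shown in this diff.
// Requires: import org.mockito.MockedStatic; import static org.mockito.Mockito.mockStatic;
try (MockedStatic<Executors> mockedExecutors = mockStatic(Executors.class)) {
    mockedExecutors.when(() -> Executors.newFixedThreadPool(anyInt()))
            .thenReturn(executorService);
    kafkaBuffer = createObjectUnderTest();   // the constructor picks up the mocked executor
}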
