From 5a92b5fbd218750abb5d5deeb9eff3abe1f53a11 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Fri, 23 Aug 2024 21:17:52 +0200 Subject: [PATCH] Add redis stream impl Signed-off-by: Paolo Di Tommaso --- .../data/stream/AbstractMessageStream.groovy | 121 ++++++++++++++ .../service/data/stream/MessageStream.groovy | 46 ++++++ .../stream/impl/LocalMessageStream.groovy | 60 +++++++ .../stream/impl/RedisMessageStream.groovy | 154 ++++++++++++++++++ .../seqera/wave/service/job/JobConfig.groovy | 4 +- .../seqera/wave/service/job/JobManager.groovy | 61 +------ .../seqera/wave/service/job/JobQueue.groovy | 45 +++-- .../AbstractMessageStreamLocalTest.groovy | 59 +++++++ .../AbstractMessageStreamRedisTest.groovy | 68 ++++++++ .../data/stream/LocalMessageStreamTest.groovy | 57 +++++++ .../data/stream/RedisMessageStreamTest.groovy | 109 +++++++++++++ .../service/data/stream/TestMessage.groovy | 31 ++++ .../service/data/stream/TestStream.groovy | 42 +++++ 13 files changed, 777 insertions(+), 80 deletions(-) create mode 100644 src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/data/stream/MessageStream.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/data/stream/impl/LocalMessageStream.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/data/stream/AbstractMessageStreamLocalTest.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/data/stream/AbstractMessageStreamRedisTest.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/data/stream/LocalMessageStreamTest.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/data/stream/RedisMessageStreamTest.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/data/stream/TestMessage.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/data/stream/TestStream.groovy diff --git a/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy b/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy new file mode 100644 index 000000000..2196d9b87 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/data/stream/AbstractMessageStream.groovy @@ -0,0 +1,121 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.data.stream + +import java.time.Duration +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import java.util.function.Consumer + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import io.seqera.wave.encoder.EncodingStrategy +import io.seqera.wave.encoder.MoshiEncodeStrategy +import io.seqera.wave.util.ExponentialAttempt +import io.seqera.wave.util.TypeHelper +/** + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +abstract class AbstractMessageStream implements MessageStream, Runnable { + + static final private Map> listeners = new ConcurrentHashMap<>() + + static final private AtomicInteger count = new AtomicInteger() + + final private ExponentialAttempt attempt = new ExponentialAttempt() + + final private EncodingStrategy encoder + + final private MessageStream stream + + private Thread thread + + private String name0 + + AbstractMessageStream(MessageStream target) { + final type = TypeHelper.getGenericType(this, 0) + this.encoder = new MoshiEncodeStrategy(type) {} + this.stream = target + this.name0 = name() + '-thread-' + count.getAndIncrement() + this.thread = new Thread(this, name0) + this.thread.setDaemon(true) + this.thread.start() + } + + protected abstract String name() + + protected abstract Duration pollInterval() + + @Override + void offer(String streamId, M message) { + final msg = encoder.encode(message) + stream.offer(streamId, msg) + } + + @Override + void consume(String streamId, Consumer consumer) { + listeners.put(streamId, consumer) + } + + @Override + void run() { + while( !thread.interrupted() ) { + try { + int count=0 + for( Map.Entry> entry : listeners.entrySet() ) { + final consumer0 = entry.value + stream.consume(entry.key, (String msg)-> { count+=1; consumer0.accept(encoder.decode(msg)) }) + } + // reset the attempt count because no error has been thrown + attempt.reset() + // if no message was sent, sleep for a while before retrying + if( count==0 ) { + sleep(pollInterval().toMillis()) + } + } + catch (InterruptedException e) { + log.debug "Interrupting consumer thread for message stream ${name0}" + Thread.currentThread().interrupt() + break + } + catch (Throwable e) { + final d0 = attempt.delay() + log.error("Unexpected error on message stream ${name0} (await: ${d0}) - cause: ${e.message}", e) + sleep(d0.toMillis()) + } + } + } + + void close() { + if( !thread ) + return + // interrupt the thread + thread.interrupt() + // wait for the termination + try { + thread.join(1_000) + } + catch (Exception e) { + log.debug "Unexpected error while terminating ${name0} - cause: ${e.message}" + } + } +} diff --git a/src/main/groovy/io/seqera/wave/service/data/stream/MessageStream.groovy b/src/main/groovy/io/seqera/wave/service/data/stream/MessageStream.groovy new file mode 100644 index 000000000..7c4149cf9 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/data/stream/MessageStream.groovy @@ -0,0 +1,46 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.data.stream + + +import java.util.function.Consumer +/** + * + * @author Paolo Di Tommaso + */ +interface MessageStream { + + /** + * Inserts the specified element at the tail of the specified queue. + * + * @param value + * The value that should be added to the queue + */ + void offer(String streamId, M message) + + /** + * Consume a message from the stream and invoke + * + * @param streamId The target stream name + * @param consumer The {@link Consumer} instance to be invoked to consume the message + * @return number of message consumed + */ + void consume(String streamId, Consumer consumer) + +} diff --git a/src/main/groovy/io/seqera/wave/service/data/stream/impl/LocalMessageStream.groovy b/src/main/groovy/io/seqera/wave/service/data/stream/impl/LocalMessageStream.groovy new file mode 100644 index 000000000..c55096570 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/data/stream/impl/LocalMessageStream.groovy @@ -0,0 +1,60 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.data.stream.impl + + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.LinkedBlockingQueue +import java.util.function.Consumer + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import io.micronaut.context.annotation.Requires +import io.seqera.wave.service.data.stream.MessageStream +import jakarta.inject.Singleton +/** + * + * @author Paolo Di Tommaso + */ +@Slf4j +@Requires(notEnv = 'redis') +@Singleton +@CompileStatic +class LocalMessageStream implements MessageStream { + + private ConcurrentHashMap> delegate = new ConcurrentHashMap<>() + + @Override + void offer(String streamId, String message) { + delegate + .computeIfAbsent(streamId, (it)-> new LinkedBlockingQueue<>()) + .offer(message) + } + + @Override + void consume(String streamId, Consumer consumer) { + final message = delegate + .computeIfAbsent(streamId, (it)-> new LinkedBlockingQueue<>()) + .poll() + if( message!=null ) { + consumer.accept(message) + } + } + +} diff --git a/src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy b/src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy new file mode 100644 index 000000000..8fff975a5 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy @@ -0,0 +1,154 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.data.stream.impl + +import java.time.Duration +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import io.micronaut.context.annotation.Requires +import io.micronaut.context.annotation.Value +import io.seqera.wave.service.data.stream.MessageStream +import io.seqera.wave.util.LongRndKey +import jakarta.annotation.PostConstruct +import jakarta.inject.Inject +import jakarta.inject.Singleton +import redis.clients.jedis.Jedis +import redis.clients.jedis.JedisPool +import redis.clients.jedis.StreamEntryID +import redis.clients.jedis.exceptions.JedisDataException +import redis.clients.jedis.params.XAutoClaimParams +import redis.clients.jedis.params.XReadGroupParams +import redis.clients.jedis.resps.StreamEntry +/** + * + * @author Paolo Di Tommaso + */ +@Slf4j +@Requires(env = 'redis') +@Singleton +@CompileStatic +class RedisMessageStream implements MessageStream { + + private static final StreamEntryID STREAM_ENTRY_ZERO = new StreamEntryID("0-0") + + private static final String CONSUMER_GROUP_NAME = "wave-message-stream" + + private static final String DATA_FIELD = 'data' + + private final ConcurrentHashMap group0 = new ConcurrentHashMap<>() + + @Inject + private JedisPool pool + + @Value('${wave.message-stream.claim-timeout:1m}') + private Duration claimTimeout + + private String consumerName + + @PostConstruct + private void init() { + consumerName = "consumer-${LongRndKey.rndLong()}" + log.debug "Creating Redis message stream - consumer=${consumerName}; idle-timeout=${claimTimeout}" + } + + protected void createGroup(Jedis jedis, String stream, String group) { + // use a concurrent hash map to create it only the very first time + group0.computeIfAbsent("$stream/$group".toString(),(it)-> createGroup0(jedis,stream,group)) + } + + protected boolean createGroup0(Jedis jedis, String stream, String group) { + try { + jedis.xgroupCreate(stream, group, STREAM_ENTRY_ZERO, true) + return true + } + catch (JedisDataException e) { + if (e.message.contains("BUSYGROUP")) { + // The group already exists, so we can safely ignore this exception + log.debug "Redis message stream - consume group=$group alreayd exists" + return true + } + throw e + } + } + + @Override + void offer(String streamId, String message) { + try (Jedis jedis = pool.getResource()) { + jedis.xadd(streamId, StreamEntryID.NEW_ENTRY, Map.of(DATA_FIELD, message)) + } + } + + @Override + void consume(String streamId, Consumer consumer) { + try (Jedis jedis = pool.getResource()) { + createGroup(jedis, streamId, CONSUMER_GROUP_NAME) + final entry = readMessage(jedis, streamId) ?: claimMessage(jedis,streamId) + if( entry ) { + // callback the consumer + consumer.accept(entry.getFields().get(DATA_FIELD)) + // Acknowledge the job after processing + jedis.xack(streamId, CONSUMER_GROUP_NAME, entry.getID()) + } + } + } + + protected StreamEntry readMessage(Jedis jedis, String target) { + // Create parameters for reading with a group + final params = new XReadGroupParams() + // Read one message at a time + .count(1) + + // Read new messages from the stream using the correct xreadGroup signature + List>> messages = jedis.xreadGroup( + CONSUMER_GROUP_NAME, + consumerName, + params, + Map.of(target, StreamEntryID.UNRECEIVED_ENTRY) ) + + final entry = messages?.first()?.value?.first() + if( entry ) { + log.debug "Redis stream read entry=$entry" + } + return entry + } + + protected StreamEntry claimMessage(Jedis jedis, String target) { + // Attempt to claim any pending messages that are idle for more than the threshold + final params = new XAutoClaimParams() + // claim one entry at time + .count(1) + final messages = jedis.xautoclaim( + target, + CONSUMER_GROUP_NAME, + consumerName, + claimTimeout.toMillis(), + STREAM_ENTRY_ZERO, + params + ) + final entry = messages?.getValue()?[0] + if( entry ) { + log.debug "Redis stream claimed entry=$entry" + } + return entry + } + +} diff --git a/src/main/groovy/io/seqera/wave/service/job/JobConfig.groovy b/src/main/groovy/io/seqera/wave/service/job/JobConfig.groovy index e2c8b12f3..05324e15b 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobConfig.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobConfig.groovy @@ -32,6 +32,6 @@ class JobConfig { @Value('${wave.job-manager.grace-interval:20s}') Duration graveInterval - @Value('${wave.job-manager.poll-timeout:200ms}') - Duration pollTimeout + @Value('${wave.job-manager.poll-interval:200ms}') + Duration pollInternal } diff --git a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy index 0109c6d7c..5538bb4a3 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobManager.groovy @@ -20,19 +20,13 @@ package io.seqera.wave.service.job import java.time.Duration import java.time.Instant -import java.util.concurrent.CompletableFuture -import java.util.concurrent.ExecutorService import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.micronaut.context.annotation.Context import io.micronaut.context.annotation.Requires -import io.micronaut.scheduling.TaskExecutors -import io.seqera.wave.service.blob.BlobCacheInfo -import io.seqera.wave.util.ExponentialAttempt import jakarta.annotation.PostConstruct import jakarta.inject.Inject -import jakarta.inject.Named /** * Implement the logic to handle Blob cache transfer (uploads) * @@ -50,69 +44,18 @@ class JobManager { @Inject private JobQueue queue - @Inject - @Named(TaskExecutors.IO) - private ExecutorService executor - @Inject private JobDispatcher dispatcher @Inject private JobConfig config - private final ExponentialAttempt attempt = new ExponentialAttempt() - @PostConstruct private init() { - CompletableFuture.supplyAsync(()->run(), executor) + queue.consume((job)-> handle(job)) } - void run() { - log.info "+ Starting Job manager" - while( !Thread.currentThread().isInterrupted() ) { - try { - final jobId = queue.poll(config.pollTimeout) - - if( jobId ) { - handle(jobId) - attempt.reset() - } - } - catch (InterruptedException e) { - log.debug "Interrupting transfer manager watcher thread" - break - } - catch (Throwable e) { - final d0 = attempt.delay() - log.error("Transfer manager unexpected error (await: ${d0}) - cause: ${e.message}", e) - sleep(d0.toMillis()) - } - } - } - - /** - * Handles the blob transfer operation i.e. check and update the current upload status - * - * @param blobId the blob cache id i.e. {@link BlobCacheInfo#id()} - */ protected void handle(JobId jobId) { - try { - try { - handle0(jobId) - } - catch (Throwable error) { - dispatcher.onJobException(jobId, error) - } - } - catch (InterruptedException e) { - // re-queue the transfer to not lose it - queue.offer(jobId) - // re-throw the exception - throw e - } - } - - protected void handle0(JobId jobId) { final duration = Duration.between(jobId.creationTime, Instant.now()) final state = jobStrategy.status(jobId) log.trace "Job status id=${jobId.schedulerId}; state=${state}" @@ -136,8 +79,6 @@ class JobManager { } else { log.trace "== Job pending for completion $jobId" - // re-schedule for a new check - queue.offer(jobId) } } diff --git a/src/main/groovy/io/seqera/wave/service/job/JobQueue.groovy b/src/main/groovy/io/seqera/wave/service/job/JobQueue.groovy index b006577d1..1e8a9c141 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobQueue.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobQueue.groovy @@ -19,41 +19,50 @@ package io.seqera.wave.service.job import java.time.Duration +import java.util.function.Consumer -import io.seqera.wave.encoder.EncodingStrategy -import io.seqera.wave.encoder.MoshiEncodeStrategy -import io.seqera.wave.service.data.queue.MessageQueue -import jakarta.annotation.PostConstruct +import io.seqera.wave.service.data.stream.AbstractMessageStream +import io.seqera.wave.service.data.stream.MessageStream +import jakarta.annotation.PreDestroy import jakarta.inject.Inject import jakarta.inject.Singleton - /** * Implements a simple persistent FIFO queue * * @author Paolo Di Tommaso */ @Singleton -class JobQueue { - - final private static String QUEUE_NAME = 'jobs-queue/v1' +class JobQueue extends AbstractMessageStream { - private EncodingStrategy encodingStrategy + final private static String STREAM_NAME = 'jobs-queue/v1' @Inject - private MessageQueue transferQueue + private JobConfig config + + JobQueue(MessageStream target) { + super(target) + } - @PostConstruct - private init() { - encodingStrategy = new MoshiEncodeStrategy() {} + @Override + protected String name() { + return 'jobs-queue' } - void offer(JobId request) { - transferQueue.offer(QUEUE_NAME, encodingStrategy.encode(request)) + @Override + protected Duration pollInterval() { + return config.pollInternal } - JobId poll(Duration timeout) { - final result = transferQueue.poll(QUEUE_NAME, timeout) - return result ? encodingStrategy.decode(result) : null + final void offer(JobId job) { + super.offer(STREAM_NAME, job) } + final void consume(Consumer consumer) { + super.consume(STREAM_NAME, consumer) + } + + @PreDestroy + void destroy() { + this.close() + } } diff --git a/src/test/groovy/io/seqera/wave/service/data/stream/AbstractMessageStreamLocalTest.groovy b/src/test/groovy/io/seqera/wave/service/data/stream/AbstractMessageStreamLocalTest.groovy new file mode 100644 index 000000000..8531af73f --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/data/stream/AbstractMessageStreamLocalTest.groovy @@ -0,0 +1,59 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.data.stream + +import spock.lang.Specification + +import java.util.concurrent.ArrayBlockingQueue + +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import io.seqera.wave.service.data.stream.impl.LocalMessageStream +import io.seqera.wave.util.LongRndKey +import jakarta.inject.Inject +/** + * + * @author Paolo Di Tommaso + */ +@MicronautTest(environments = ['test']) +class AbstractMessageStreamLocalTest extends Specification { + + @Inject + LocalMessageStream target + + def 'should offer and consume some messages' () { + given: + def id1 = "stream-${LongRndKey.rndHex()}" + and: + def stream = new TestStream(target) + def queue = new ArrayBlockingQueue(10) + + when: + stream.offer(id1, new TestMessage('one','two')) + stream.offer(id1, new TestMessage('alpha','omega')) + then: + stream.consume(id1, { it-> queue.add(it) }) + and: + queue.take()==new TestMessage('one','two') + queue.take()==new TestMessage('alpha','omega') + + cleanup: + stream.close() + } + +} diff --git a/src/test/groovy/io/seqera/wave/service/data/stream/AbstractMessageStreamRedisTest.groovy b/src/test/groovy/io/seqera/wave/service/data/stream/AbstractMessageStreamRedisTest.groovy new file mode 100644 index 000000000..185754655 --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/data/stream/AbstractMessageStreamRedisTest.groovy @@ -0,0 +1,68 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.data.stream + +import spock.lang.Shared +import spock.lang.Specification + +import java.util.concurrent.ArrayBlockingQueue + +import io.micronaut.context.ApplicationContext +import io.seqera.wave.service.data.stream.impl.RedisMessageStream +import io.seqera.wave.test.RedisTestContainer +import io.seqera.wave.util.LongRndKey + +/** + * + * @author Paolo Di Tommaso + */ +class AbstractMessageStreamRedisTest extends Specification implements RedisTestContainer { + + @Shared + ApplicationContext context + + def setup() { + context = ApplicationContext.run([ + REDIS_HOST: redisHostName, + REDIS_PORT: redisPort + ], 'test', 'redis') + } + + def 'should offer and consume some messages' () { + given: + def id1 = "stream-${LongRndKey.rndHex()}" + and: + def target = context.getBean(RedisMessageStream) + def stream = new TestStream(target) + def queue = new ArrayBlockingQueue(10) + + when: + stream.offer(id1, new TestMessage('one','two')) + stream.offer(id1, new TestMessage('alpha','omega')) + then: + stream.consume(id1, { it-> queue.add(it) }) + and: + queue.take()==new TestMessage('one','two') + queue.take()==new TestMessage('alpha','omega') + + cleanup: + stream.close() + } + +} diff --git a/src/test/groovy/io/seqera/wave/service/data/stream/LocalMessageStreamTest.groovy b/src/test/groovy/io/seqera/wave/service/data/stream/LocalMessageStreamTest.groovy new file mode 100644 index 000000000..f73d92c4d --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/data/stream/LocalMessageStreamTest.groovy @@ -0,0 +1,57 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.data.stream + +import spock.lang.Specification + +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import io.seqera.wave.service.data.stream.impl.LocalMessageStream +import io.seqera.wave.util.LongRndKey + +/** + * + * @author Paolo Di Tommaso + */ +@MicronautTest(environments = ['test']) +class LocalMessageStreamTest extends Specification { + + def 'should offer and consume a value' () { + given: + def id1 = "stream-${LongRndKey.rndHex()}" + def id2 = "stream-${LongRndKey.rndHex()}" + and: + def stream = new LocalMessageStream() + when: + stream.offer(id1, 'one') + and: + stream.offer(id2, 'alpha') + stream.offer(id2, 'delta') + stream.offer(id2, 'gamma') + + then: + stream.consume(id1, { it-> assert it=='one'}) + and: + stream.consume(id2, { it-> assert it=='alpha'}) + stream.consume(id2, { it-> assert it=='delta'}) + stream.consume(id2, { it-> assert it=='gamma'}) + and: + stream.consume(id2, { it-> assert false /* <-- this should not be invoked */ }) + } + +} diff --git a/src/test/groovy/io/seqera/wave/service/data/stream/RedisMessageStreamTest.groovy b/src/test/groovy/io/seqera/wave/service/data/stream/RedisMessageStreamTest.groovy new file mode 100644 index 000000000..cecb04d8b --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/data/stream/RedisMessageStreamTest.groovy @@ -0,0 +1,109 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.data.stream + +import spock.lang.Shared +import spock.lang.Specification + +import io.micronaut.context.ApplicationContext +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import io.seqera.wave.service.data.stream.impl.RedisMessageStream +import io.seqera.wave.test.RedisTestContainer +import io.seqera.wave.util.LongRndKey + +/** + * + * @author Paolo Di Tommaso + */ +@MicronautTest(environments = ['test']) +class RedisMessageStreamTest extends Specification implements RedisTestContainer { + + @Shared + ApplicationContext context + + def setup() { + context = ApplicationContext.run([ + REDIS_HOST: redisHostName, + REDIS_PORT: redisPort, + 'wave.message-stream.claim-timeout': '1s' + ], 'test', 'redis') + } + + def 'should offer and consume a value' () { + given: + def id1 = "stream-${LongRndKey.rndHex()}" + def id2 = "stream-${LongRndKey.rndHex()}" + and: + def stream = context.getBean(RedisMessageStream) + when: + stream.offer(id1, 'one') + and: + stream.offer(id2, 'alpha') + stream.offer(id2, 'delta') + stream.offer(id2, 'gamma') + + then: + stream.consume(id1, { it-> assert it=='one'}) + and: + stream.consume(id2, { it-> assert it=='alpha'}) + stream.consume(id2, { it-> assert it=='delta'}) + stream.consume(id2, { it-> assert it=='gamma'}) + and: + stream.consume(id2, { it-> assert false /* <-- this should not be invoked */ }) + } + + def 'should offer and consume a value' () { + given: + def id1 = "stream-${LongRndKey.rndHex()}" + def stream = context.getBean(RedisMessageStream) + when: + stream.offer(id1, 'alpha') + stream.offer(id1, 'delta') + stream.offer(id1, 'gamma') + + then: + stream.consume(id1, { it-> assert it=='alpha'}) + and: + try { + stream.consume(id1, { it-> throw new RuntimeException("Oops")}) + } + catch (RuntimeException e) { + assert e.message == 'Oops' + } + and: + // next message is 'gamma' as expected + stream.consume(id1, { it-> assert it=='gamma'}) + and: + // still nothing + stream.consume(id1, { it-> assert false /* <-- this should not be invoked */ }) + and: + // wait 2 seconds (claim timeout is 1 sec) + sleep 2_000 + // now the errored message is available + stream.consume(id1, { it-> assert it=='delta'}) + and: + stream.consume(id1, { it-> assert false /* <-- this should not be invoked */ }) + + when: + stream.offer(id1, 'something') + then: + stream.consume(id1, { it-> assert it=='something'}) + } + +} diff --git a/src/test/groovy/io/seqera/wave/service/data/stream/TestMessage.groovy b/src/test/groovy/io/seqera/wave/service/data/stream/TestMessage.groovy new file mode 100644 index 000000000..e98fec98d --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/data/stream/TestMessage.groovy @@ -0,0 +1,31 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.data.stream + +import groovy.transform.Canonical + +/** + * + * @author Paolo Di Tommaso + */ +@Canonical +class TestMessage { + String x + String y +} diff --git a/src/test/groovy/io/seqera/wave/service/data/stream/TestStream.groovy b/src/test/groovy/io/seqera/wave/service/data/stream/TestStream.groovy new file mode 100644 index 000000000..297ddc24d --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/data/stream/TestStream.groovy @@ -0,0 +1,42 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.data.stream + +import java.time.Duration + +/** + * + * @author Paolo Di Tommaso + */ +class TestStream extends AbstractMessageStream { + + TestStream(MessageStream target) { + super(target) + } + + @Override + protected String name() { + return 'test-stream' + } + + @Override + protected Duration pollInterval() { + return Duration.ofSeconds(1) + } +}