Skip to content

Commit

Permalink
Add redis stream impl
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Aug 23, 2024
1 parent 6bc21f0 commit 5a92b5f
Show file tree
Hide file tree
Showing 13 changed files with 777 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Wave, containers provisioning service
* Copyright (c) 2023-2024, Seqera Labs
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package io.seqera.wave.service.data.stream

import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Consumer

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.wave.encoder.EncodingStrategy
import io.seqera.wave.encoder.MoshiEncodeStrategy
import io.seqera.wave.util.ExponentialAttempt
import io.seqera.wave.util.TypeHelper
/**
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
@CompileStatic
abstract class AbstractMessageStream<M> implements MessageStream<M>, Runnable {

static final private Map<String,Consumer<M>> listeners = new ConcurrentHashMap<>()

static final private AtomicInteger count = new AtomicInteger()

final private ExponentialAttempt attempt = new ExponentialAttempt()

final private EncodingStrategy<M> encoder

final private MessageStream<String> stream

private Thread thread

private String name0

AbstractMessageStream(MessageStream<String> target) {
final type = TypeHelper.getGenericType(this, 0)
this.encoder = new MoshiEncodeStrategy<M>(type) {}
this.stream = target
this.name0 = name() + '-thread-' + count.getAndIncrement()
this.thread = new Thread(this, name0)
this.thread.setDaemon(true)
this.thread.start()
}

protected abstract String name()

protected abstract Duration pollInterval()

@Override
void offer(String streamId, M message) {
final msg = encoder.encode(message)
stream.offer(streamId, msg)
}

@Override
void consume(String streamId, Consumer<M> consumer) {
listeners.put(streamId, consumer)
}

@Override
void run() {
while( !thread.interrupted() ) {
try {
int count=0
for( Map.Entry<String,Consumer<M>> entry : listeners.entrySet() ) {
final consumer0 = entry.value
stream.consume(entry.key, (String msg)-> { count+=1; consumer0.accept(encoder.decode(msg)) })
}
// reset the attempt count because no error has been thrown
attempt.reset()
// if no message was sent, sleep for a while before retrying
if( count==0 ) {
sleep(pollInterval().toMillis())
}
}
catch (InterruptedException e) {
log.debug "Interrupting consumer thread for message stream ${name0}"
Thread.currentThread().interrupt()
break
}
catch (Throwable e) {
final d0 = attempt.delay()
log.error("Unexpected error on message stream ${name0} (await: ${d0}) - cause: ${e.message}", e)
sleep(d0.toMillis())
}
}
}

void close() {
if( !thread )
return
// interrupt the thread
thread.interrupt()
// wait for the termination
try {
thread.join(1_000)
}
catch (Exception e) {
log.debug "Unexpected error while terminating ${name0} - cause: ${e.message}"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Wave, containers provisioning service
* Copyright (c) 2023-2024, Seqera Labs
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package io.seqera.wave.service.data.stream


import java.util.function.Consumer
/**
*
* @author Paolo Di Tommaso <[email protected]>
*/
interface MessageStream<M> {

/**
* Inserts the specified element at the tail of the specified queue.
*
* @param value
* The value that should be added to the queue
*/
void offer(String streamId, M message)

/**
* Consume a message from the stream and invoke
*
* @param streamId The target stream name
* @param consumer The {@link Consumer} instance to be invoked to consume the message
* @return number of message consumed
*/
void consume(String streamId, Consumer<M> consumer)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Wave, containers provisioning service
* Copyright (c) 2023-2024, Seqera Labs
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package io.seqera.wave.service.data.stream.impl


import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingQueue
import java.util.function.Consumer

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Requires
import io.seqera.wave.service.data.stream.MessageStream
import jakarta.inject.Singleton
/**
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
@Requires(notEnv = 'redis')
@Singleton
@CompileStatic
class LocalMessageStream implements MessageStream<String> {

private ConcurrentHashMap<String, LinkedBlockingQueue<String>> delegate = new ConcurrentHashMap<>()

@Override
void offer(String streamId, String message) {
delegate
.computeIfAbsent(streamId, (it)-> new LinkedBlockingQueue<>())
.offer(message)
}

@Override
void consume(String streamId, Consumer<String> consumer) {
final message = delegate
.computeIfAbsent(streamId, (it)-> new LinkedBlockingQueue<>())
.poll()
if( message!=null ) {
consumer.accept(message)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Wave, containers provisioning service
* Copyright (c) 2023-2024, Seqera Labs
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package io.seqera.wave.service.data.stream.impl

import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Consumer

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import io.seqera.wave.service.data.stream.MessageStream
import io.seqera.wave.util.LongRndKey
import jakarta.annotation.PostConstruct
import jakarta.inject.Inject
import jakarta.inject.Singleton
import redis.clients.jedis.Jedis
import redis.clients.jedis.JedisPool
import redis.clients.jedis.StreamEntryID
import redis.clients.jedis.exceptions.JedisDataException
import redis.clients.jedis.params.XAutoClaimParams
import redis.clients.jedis.params.XReadGroupParams
import redis.clients.jedis.resps.StreamEntry
/**
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
@Requires(env = 'redis')
@Singleton
@CompileStatic
class RedisMessageStream implements MessageStream<String> {

private static final StreamEntryID STREAM_ENTRY_ZERO = new StreamEntryID("0-0")

private static final String CONSUMER_GROUP_NAME = "wave-message-stream"

private static final String DATA_FIELD = 'data'

private final ConcurrentHashMap<String,Boolean> group0 = new ConcurrentHashMap<>()

@Inject
private JedisPool pool

@Value('${wave.message-stream.claim-timeout:1m}')
private Duration claimTimeout

private String consumerName

@PostConstruct
private void init() {
consumerName = "consumer-${LongRndKey.rndLong()}"
log.debug "Creating Redis message stream - consumer=${consumerName}; idle-timeout=${claimTimeout}"
}

protected void createGroup(Jedis jedis, String stream, String group) {
// use a concurrent hash map to create it only the very first time
group0.computeIfAbsent("$stream/$group".toString(),(it)-> createGroup0(jedis,stream,group))
}

protected boolean createGroup0(Jedis jedis, String stream, String group) {
try {
jedis.xgroupCreate(stream, group, STREAM_ENTRY_ZERO, true)
return true
}
catch (JedisDataException e) {
if (e.message.contains("BUSYGROUP")) {
// The group already exists, so we can safely ignore this exception
log.debug "Redis message stream - consume group=$group alreayd exists"

Check failure on line 86 in src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy

View workflow job for this annotation

GitHub Actions / Check for spelling errors

alreayd ==> already
return true
}
throw e
}
}

@Override
void offer(String streamId, String message) {
try (Jedis jedis = pool.getResource()) {
jedis.xadd(streamId, StreamEntryID.NEW_ENTRY, Map.of(DATA_FIELD, message))
}
}

@Override
void consume(String streamId, Consumer<String> consumer) {
try (Jedis jedis = pool.getResource()) {
createGroup(jedis, streamId, CONSUMER_GROUP_NAME)
final entry = readMessage(jedis, streamId) ?: claimMessage(jedis,streamId)
if( entry ) {
// callback the consumer
consumer.accept(entry.getFields().get(DATA_FIELD))
// Acknowledge the job after processing
jedis.xack(streamId, CONSUMER_GROUP_NAME, entry.getID())
}
}
}

protected StreamEntry readMessage(Jedis jedis, String target) {
// Create parameters for reading with a group
final params = new XReadGroupParams()
// Read one message at a time
.count(1)

// Read new messages from the stream using the correct xreadGroup signature
List<Map.Entry<String, List<StreamEntry>>> messages = jedis.xreadGroup(
CONSUMER_GROUP_NAME,
consumerName,
params,
Map.of(target, StreamEntryID.UNRECEIVED_ENTRY) )

final entry = messages?.first()?.value?.first()
if( entry ) {
log.debug "Redis stream read entry=$entry"
}
return entry
}

protected StreamEntry claimMessage(Jedis jedis, String target) {
// Attempt to claim any pending messages that are idle for more than the threshold
final params = new XAutoClaimParams()
// claim one entry at time
.count(1)
final messages = jedis.xautoclaim(
target,
CONSUMER_GROUP_NAME,
consumerName,
claimTimeout.toMillis(),
STREAM_ENTRY_ZERO,
params
)
final entry = messages?.getValue()?[0]
if( entry ) {
log.debug "Redis stream claimed entry=$entry"
}
return entry
}

}
4 changes: 2 additions & 2 deletions src/main/groovy/io/seqera/wave/service/job/JobConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ class JobConfig {
@Value('${wave.job-manager.grace-interval:20s}')
Duration graveInterval

@Value('${wave.job-manager.poll-timeout:200ms}')
Duration pollTimeout
@Value('${wave.job-manager.poll-interval:200ms}')
Duration pollInternal
}
Loading

0 comments on commit 5a92b5f

Please sign in to comment.