Skip to content

Commit

Permalink
Add support for Consumer Create Action (#1108)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Apr 4, 2024
1 parent d9eec8b commit 475a02c
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 37 deletions.
22 changes: 22 additions & 0 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,28 @@ public interface JetStreamManagement {
*/
ConsumerInfo addOrUpdateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException;

/**
* Creates a consumer. Must not already exist.
* @param streamName name of the stream
* @param config the consumer configuration to use.
* @return consumer information.
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data such as the consumer already exists
*/
ConsumerInfo createConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException;

/**
* Updates an existing consumer. Must already exist.
* @param streamName name of the stream
* @param config the consumer configuration to use.
* @return consumer information.
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data such as the consumer does not already exist
*/
ConsumerInfo updateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException;

/**
* Deletes a consumer.
* @param streamName name of the stream
Expand Down
28 changes: 26 additions & 2 deletions src/main/java/io/nats/client/api/ConsumerCreateRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,39 @@
import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonUtils;

import static io.nats.client.support.ApiConstants.CONFIG;
import static io.nats.client.support.ApiConstants.STREAM_NAME;
import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.JsonUtils.*;

/**
* Object used to make a request to create a consumer. Used Internally
*/
public class ConsumerCreateRequest implements JsonSerializable {
public enum Action {
Create("create"),
Update("update"),
CreateOrUpdate(null);

public final String actionText;

Action(String actionText) {
this.actionText = actionText;
}
}

private final String streamName;
private final ConsumerConfiguration config;
private final Action action;

public ConsumerCreateRequest(String streamName, ConsumerConfiguration config) {
this.streamName = streamName;
this.config = config;
this.action = Action.CreateOrUpdate;
}

public ConsumerCreateRequest(String streamName, ConsumerConfiguration config, Action action) {
this.streamName = streamName;
this.config = config;
this.action = action;
}

public String getStreamName() {
Expand All @@ -40,11 +59,16 @@ public ConsumerConfiguration getConfig() {
return config;
}

public Action getAction() {
return action;
}

@Override
public String toJson() {
StringBuilder sb = beginJson();

addField(sb, STREAM_NAME, streamName);
JsonUtils.addField(sb, ACTION, action.actionText);
JsonUtils.addField(sb, CONFIG, config);

return endJson(sb).toString();
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ ConsumerInfo _getConsumerInfo(String streamName, String consumerName) throws IOE
return new ConsumerInfo(resp).throwOnHasError();
}

ConsumerInfo _createConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException {
ConsumerInfo _createConsumer(String streamName, ConsumerConfiguration config, ConsumerCreateRequest.Action action) throws IOException, JetStreamApiException {
// ConsumerConfiguration validates that name and durable are the same if both are supplied.
String consumerName = config.getName();
if (consumerName != null && !consumerCreate290Available) {
Expand Down Expand Up @@ -118,14 +118,14 @@ else if (durable == null) {
subj = String.format(JSAPI_DURABLE_CREATE, streamName, durable);
}

ConsumerCreateRequest ccr = new ConsumerCreateRequest(streamName, config);
ConsumerCreateRequest ccr = new ConsumerCreateRequest(streamName, config, action);
Message resp = makeRequestResponseRequired(subj, ccr.serialize(), jso.getRequestTimeout());
return new ConsumerInfo(resp).throwOnHasError();
}

void _createConsumerUnsubscribeOnException(String stream, ConsumerConfiguration cc, NatsJetStreamSubscription sub) throws IOException, JetStreamApiException {
try {
ConsumerInfo ci = _createConsumer(stream, cc);
ConsumerInfo ci = _createConsumer(stream, cc, ConsumerCreateRequest.Action.CreateOrUpdate);
sub.setConsumerName(ci.getName());
}
catch (IOException | JetStreamApiException e) {
Expand Down
22 changes: 21 additions & 1 deletion src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,27 @@ public PurgeResponse purgeStream(String streamName, PurgeOptions options) throws
public ConsumerInfo addOrUpdateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException {
validateStreamName(streamName, true);
validateNotNull(config, "Config");
return _createConsumer(streamName, config);
return _createConsumer(streamName, config, ConsumerCreateRequest.Action.CreateOrUpdate);
}

/**
* {@inheritDoc}
*/
@Override
public ConsumerInfo createConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException {
validateStreamName(streamName, true);
validateNotNull(config, "Config");
return _createConsumer(streamName, config, ConsumerCreateRequest.Action.Create);
}

/**
* {@inheritDoc}
*/
@Override
public ConsumerInfo updateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException {
validateStreamName(streamName, true);
validateNotNull(config, "Config");
return _createConsumer(streamName, config, ConsumerCreateRequest.Action.Update);
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/nats/client/impl/OrderedMessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.nats.client.Message;
import io.nats.client.SubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerCreateRequest;
import io.nats.client.api.ConsumerInfo;

import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -98,7 +99,7 @@ private void handleErrorCondition() {
// 3. make a new consumer using the same deliver subject but
// with a new starting point
ConsumerConfiguration userCC = js.consumerConfigurationForOrdered(originalCc, lastStreamSeq, newDeliverSubject, actualConsumerName, null);
ConsumerInfo ci = js._createConsumer(stream, userCC); // this can fail when a server is down.
ConsumerInfo ci = js._createConsumer(stream, userCC, ConsumerCreateRequest.Action.Create); // this can fail when a server is down.
sub.setConsumerName(ci.getName());

// 4. restart the manager.
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public interface ApiConstants {
String ACK_FLOOR = "ack_floor";
String ACK_POLICY = "ack_policy";
String ACK_WAIT = "ack_wait";
String ACTION = "action";
String ACTIVE = "active";
String ALLOW_ROLLUP_HDRS = "allow_rollup_hdrs";
String ALLOW_DIRECT = "allow_direct";
Expand Down
Loading

0 comments on commit 475a02c

Please sign in to comment.