From 938ea7b07051f9bcba620de4284787a4b28c6d16 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 3 Apr 2024 17:19:36 -0400 Subject: [PATCH 1/2] Add support for Consumer Create Action --- .../io/nats/client/JetStreamManagement.java | 22 ++ .../client/api/ConsumerCreateRequest.java | 28 +- .../nats/client/impl/NatsJetStreamImpl.java | 6 +- .../client/impl/NatsJetStreamManagement.java | 22 +- .../client/impl/OrderedMessageManager.java | 3 +- .../io/nats/client/support/ApiConstants.java | 1 + .../client/impl/JetStreamManagementTests.java | 246 +++++++++++++++--- 7 files changed, 291 insertions(+), 37 deletions(-) diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index 623e8b341..1e85e271d 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -121,6 +121,28 @@ public interface JetStreamManagement { */ ConsumerInfo addOrUpdateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException; + /** + * Creates a consumer. Must not already exist. + * @param streamName name of the stream + * @param config the consumer configuration to use. + * @return consumer information. + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data such as the consumer already exists + */ + ConsumerInfo createConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException; + + /** + * Updates an existing consumer. Must already exist. + * @param streamName name of the stream + * @param config the consumer configuration to use. + * @return consumer information. + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data such as the consumer does not already exist + */ + ConsumerInfo updateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException; + /** * Deletes a consumer. * @param streamName name of the stream diff --git a/src/main/java/io/nats/client/api/ConsumerCreateRequest.java b/src/main/java/io/nats/client/api/ConsumerCreateRequest.java index 3b3bb1d88..3ac2de3d1 100644 --- a/src/main/java/io/nats/client/api/ConsumerCreateRequest.java +++ b/src/main/java/io/nats/client/api/ConsumerCreateRequest.java @@ -16,20 +16,39 @@ import io.nats.client.support.JsonSerializable; import io.nats.client.support.JsonUtils; -import static io.nats.client.support.ApiConstants.CONFIG; -import static io.nats.client.support.ApiConstants.STREAM_NAME; +import static io.nats.client.support.ApiConstants.*; import static io.nats.client.support.JsonUtils.*; /** * Object used to make a request to create a consumer. Used Internally */ public class ConsumerCreateRequest implements JsonSerializable { + public enum Action { + Create("create"), + Update("update"), + CreateOrUpdate(null); + + public final String actionText; + + Action(String actionText) { + this.actionText = actionText; + } + } + private final String streamName; private final ConsumerConfiguration config; + private final Action action; public ConsumerCreateRequest(String streamName, ConsumerConfiguration config) { this.streamName = streamName; this.config = config; + this.action = Action.CreateOrUpdate; + } + + public ConsumerCreateRequest(String streamName, ConsumerConfiguration config, Action action) { + this.streamName = streamName; + this.config = config; + this.action = action; } public String getStreamName() { @@ -40,11 +59,16 @@ public ConsumerConfiguration getConfig() { return config; } + public Action getAction() { + return action; + } + @Override public String toJson() { StringBuilder sb = beginJson(); addField(sb, STREAM_NAME, streamName); + JsonUtils.addField(sb, ACTION, action.actionText); JsonUtils.addField(sb, CONFIG, config); return endJson(sb).toString(); diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java index 78743e26f..f25388e00 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamImpl.java @@ -81,7 +81,7 @@ ConsumerInfo _getConsumerInfo(String streamName, String consumerName) throws IOE return new ConsumerInfo(resp).throwOnHasError(); } - ConsumerInfo _createConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException { + ConsumerInfo _createConsumer(String streamName, ConsumerConfiguration config, ConsumerCreateRequest.Action action) throws IOException, JetStreamApiException { // ConsumerConfiguration validates that name and durable are the same if both are supplied. String consumerName = config.getName(); if (consumerName != null && !consumerCreate290Available) { @@ -118,14 +118,14 @@ else if (durable == null) { subj = String.format(JSAPI_DURABLE_CREATE, streamName, durable); } - ConsumerCreateRequest ccr = new ConsumerCreateRequest(streamName, config); + ConsumerCreateRequest ccr = new ConsumerCreateRequest(streamName, config, action); Message resp = makeRequestResponseRequired(subj, ccr.serialize(), jso.getRequestTimeout()); return new ConsumerInfo(resp).throwOnHasError(); } void _createConsumerUnsubscribeOnException(String stream, ConsumerConfiguration cc, NatsJetStreamSubscription sub) throws IOException, JetStreamApiException { try { - ConsumerInfo ci = _createConsumer(stream, cc); + ConsumerInfo ci = _createConsumer(stream, cc, ConsumerCreateRequest.Action.CreateOrUpdate); sub.setConsumerName(ci.getName()); } catch (IOException | JetStreamApiException e) { diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index 07da98067..0cd15fb86 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -128,7 +128,27 @@ public PurgeResponse purgeStream(String streamName, PurgeOptions options) throws public ConsumerInfo addOrUpdateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException { validateStreamName(streamName, true); validateNotNull(config, "Config"); - return _createConsumer(streamName, config); + return _createConsumer(streamName, config, ConsumerCreateRequest.Action.CreateOrUpdate); + } + + /** + * {@inheritDoc} + */ + @Override + public ConsumerInfo createConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException { + validateStreamName(streamName, true); + validateNotNull(config, "Config"); + return _createConsumer(streamName, config, ConsumerCreateRequest.Action.Create); + } + + /** + * {@inheritDoc} + */ + @Override + public ConsumerInfo updateConsumer(String streamName, ConsumerConfiguration config) throws IOException, JetStreamApiException { + validateStreamName(streamName, true); + validateNotNull(config, "Config"); + return _createConsumer(streamName, config, ConsumerCreateRequest.Action.Update); } /** diff --git a/src/main/java/io/nats/client/impl/OrderedMessageManager.java b/src/main/java/io/nats/client/impl/OrderedMessageManager.java index 55c81743f..99aa5a067 100644 --- a/src/main/java/io/nats/client/impl/OrderedMessageManager.java +++ b/src/main/java/io/nats/client/impl/OrderedMessageManager.java @@ -17,6 +17,7 @@ import io.nats.client.Message; import io.nats.client.SubscribeOptions; import io.nats.client.api.ConsumerConfiguration; +import io.nats.client.api.ConsumerCreateRequest; import io.nats.client.api.ConsumerInfo; import java.util.concurrent.atomic.AtomicReference; @@ -98,7 +99,7 @@ private void handleErrorCondition() { // 3. make a new consumer using the same deliver subject but // with a new starting point ConsumerConfiguration userCC = js.consumerConfigurationForOrdered(originalCc, lastStreamSeq, newDeliverSubject, actualConsumerName, null); - ConsumerInfo ci = js._createConsumer(stream, userCC); // this can fail when a server is down. + ConsumerInfo ci = js._createConsumer(stream, userCC, ConsumerCreateRequest.Action.Create); // this can fail when a server is down. sub.setConsumerName(ci.getName()); // 4. restart the manager. diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index 8a685af6f..416cb48bf 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -18,6 +18,7 @@ public interface ApiConstants { String ACK_FLOOR = "ack_floor"; String ACK_POLICY = "ack_policy"; String ACK_WAIT = "ack_wait"; + String ACTION = "action"; String ACTIVE = "active"; String ALLOW_ROLLUP_HDRS = "allow_rollup_hdrs"; String ALLOW_DIRECT = "allow_direct"; diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 419f39135..2ab6f2f1e 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -18,6 +18,7 @@ import io.nats.client.support.DateTimeUtils; import io.nats.client.utils.TestBase; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -250,18 +251,66 @@ public void testUpdateStream() throws Exception { } @Test - public void testAddUpdateStreamInvalids() throws Exception { - runInJsServer(nc -> { + public void testAddStreamInvalids() throws Exception { + jsServer.run(nc -> { JetStreamManagement jsm = nc.jetStreamManagement(); StreamConfiguration scNoName = StreamConfiguration.builder().build(); assertThrows(IllegalArgumentException.class, () -> jsm.addStream(null)); assertThrows(IllegalArgumentException.class, () -> jsm.addStream(scNoName)); + + String stream = stream(); + + StreamConfiguration sc = StreamConfiguration.builder() + .name(stream) + .description(variant()) + .storageType(StorageType.Memory) + .subjects(subject()) + .build(); + jsm.addStream(sc); + + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).subjects(subject()).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).description(variant()).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).retentionPolicy(RetentionPolicy.Interest).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).retentionPolicy(RetentionPolicy.WorkQueue).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).compressionOption(CompressionOption.S2).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).maxConsumers(1).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).maxMessages(1).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).maxMessagesPerSubject(1).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).maxAge(Duration.ofSeconds(1L)).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).maxMsgSize(1).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).storageType(StorageType.File).build())); + + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).noAck(true).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).discardPolicy(DiscardPolicy.New).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).duplicateWindow(Duration.ofSeconds(1L)).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).allowRollup(true).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).allowDirect(true).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).denyDelete(true).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).denyPurge(true).build())); + assert10058(() -> jsm.addStream(StreamConfiguration.builder(sc).firstSequence(100).build())); + }); + } + + // io.nats.client.JetStreamApiException: stream name already in use with a different configuration [10058] + private void assert10058(Executable executable) { + assertEquals(10058, assertThrows(JetStreamApiException.class, executable).getApiErrorCode()); + } + + @Test + public void testUpdateStreamInvalids() throws Exception { + jsServer.run(nc -> { + JetStreamManagement jsm = nc.jetStreamManagement(); + + StreamConfiguration scNoName = StreamConfiguration.builder().build(); assertThrows(IllegalArgumentException.class, () -> jsm.updateStream(null)); assertThrows(IllegalArgumentException.class, () -> jsm.updateStream(scNoName)); + String stream = stream(); + String[] subjects = new String[]{subject(), subject()}; + // cannot update non existent stream - StreamConfiguration sc = getTestStreamConfiguration(); + StreamConfiguration sc = getTestStreamConfiguration(stream, subjects); // stream not added yet assertThrows(JetStreamApiException.class, () -> jsm.updateStream(sc)); @@ -269,7 +318,7 @@ public void testAddUpdateStreamInvalids() throws Exception { jsm.addStream(sc); // cannot change storage type - StreamConfiguration scMemToFile = getTestStreamConfigurationBuilder() + StreamConfiguration scMemToFile = getTestStreamConfigurationBuilder(stream, subjects) .storageType(StorageType.File) .build(); assertThrows(JetStreamApiException.class, () -> jsm.updateStream(scMemToFile)); @@ -280,7 +329,7 @@ public void testAddUpdateStreamInvalids() throws Exception { .build(); assertThrows(JetStreamApiException.class, () -> jsm.updateStream(scMaxCon)); - StreamConfiguration scReten = getTestStreamConfigurationBuilder() + StreamConfiguration scReten = getTestStreamConfigurationBuilder(stream, subjects) .retentionPolicy(RetentionPolicy.Interest) .build(); if (nc.getServerInfo().isOlderThanVersion("2.10")) { @@ -291,10 +340,10 @@ public void testAddUpdateStreamInvalids() throws Exception { jsm.updateStream(scReten); } - jsm.deleteStream(STREAM); + jsm.deleteStream(stream); - jsm.addStream(getTestStreamConfigurationBuilder().storageType(StorageType.File).build()); - assertThrows(JetStreamApiException.class, () -> jsm.updateStream(getTestStreamConfiguration())); + jsm.addStream(getTestStreamConfigurationBuilder(stream, subjects).storageType(StorageType.File).build()); + assertThrows(JetStreamApiException.class, () -> jsm.updateStream(getTestStreamConfiguration(stream, subjects))); }); } @@ -308,28 +357,42 @@ private static StreamConfiguration getTestStreamConfiguration() { return getTestStreamConfigurationBuilder().build(); } + private static StreamConfiguration getTestStreamConfiguration(String stream, String... subjects) { + return getTestStreamConfigurationBuilder(stream, subjects).build(); + } + private static StreamConfiguration.Builder getTestStreamConfigurationBuilder() { + return getTestStreamConfigurationBuilder(STREAM); + } + + private static StreamConfiguration.Builder getTestStreamConfigurationBuilder(String stream, String... subjects) { + if (subjects == null || subjects.length == 0) { + subjects = new String[]{subject(0), subject(1)}; + } + return StreamConfiguration.builder() - .name(STREAM) + .name(stream) .storageType(StorageType.Memory) - .subjects(subject(0), subject(1)); + .subjects(subjects); } @Test public void testGetStreamInfo() throws Exception { - runInJsServer(nc -> { + jsServer.run(nc -> { + String stream = stream(); + JetStreamManagement jsm = nc.jetStreamManagement(); - assertThrows(JetStreamApiException.class, () -> jsm.getStreamInfo(STREAM)); + assertThrows(JetStreamApiException.class, () -> jsm.getStreamInfo(stream)); JetStream js = nc.jetStream(); String[] subjects = new String[6]; + String subjectIx5 = subject(); for (int x = 0; x < 5; x++) { - subjects[x] = subject(x + 1); + subjects[x] = subject() + x + 1; } - subjects[5] = "foo.>"; + subjects[5] = subjectIx5 + ".>"; - String stream = stream(); createMemoryStream(jsm, stream, subjects); StreamInfo si = jsm.getStreamInfo(stream); @@ -349,12 +412,12 @@ public void testGetStreamInfo() throws Exception { List packs = new ArrayList<>(); for (int x = 0; x < 5; x++) { - jsPublish(js, subject(x + 1), x + 1); - PublishAck pa = jsPublish(js, subject(x + 1), data(x + 2)); + jsPublish(js, subjects[x], x + 1); + PublishAck pa = jsPublish(js, subjects[x], data(x + 2)); packs.add(pa); jsm.deleteMessage(stream, pa.getSeqno()); } - jsPublish(js, "foo.bar", 6); + jsPublish(js, subjectIx5 + ".bar", 6); si = jsm.getStreamInfo(stream); assertEquals(stream, si.getConfiguration().getName()); @@ -376,11 +439,11 @@ public void testGetStreamInfo() throws Exception { map.put(su.getName(), su); } for (int x = 0; x < 5; x++) { - Subject s = map.get(subject(x + 1)); + Subject s = map.get(subjects[x]); assertNotNull(s); assertEquals(x + 1, s.getCount()); } - Subject sf = map.get("foo.bar"); + Subject sf = map.get(subjectIx5 + ".bar"); assertNotNull(sf); assertEquals(6, sf.getCount()); @@ -388,10 +451,10 @@ public void testGetStreamInfo() throws Exception { assertTrue(si.getStreamState().getDeleted().contains(pa.getSeqno())); } - jsPublish(js, "foo.baz", 2); + jsPublish(js, subjectIx5 + ".baz", 2); sleep(100); - si = jsm.getStreamInfo(stream, StreamInfoOptions.builder().filterSubjects("foo.>").deletedDetails().build()); + si = jsm.getStreamInfo(stream, StreamInfoOptions.builder().filterSubjects(subjectIx5 + ".>").deletedDetails().build()); assertEquals(7, si.getStreamState().getSubjectCount()); list = si.getStreamState().getSubjects(); assertNotNull(list); @@ -400,19 +463,19 @@ public void testGetStreamInfo() throws Exception { for (Subject su : list) { map.put(su.getName(), su); } - Subject s = map.get("foo.bar"); + Subject s = map.get(subjectIx5 + ".bar"); assertNotNull(s); assertEquals(6, s.getCount()); - s = map.get("foo.baz"); + s = map.get(subjectIx5 + ".baz"); assertNotNull(s); assertEquals(2, s.getCount()); - si = jsm.getStreamInfo(stream, StreamInfoOptions.builder().filterSubjects(subject(5)).build()); + si = jsm.getStreamInfo(stream, StreamInfoOptions.builder().filterSubjects(subjects[4]).build()); list = si.getStreamState().getSubjects(); assertNotNull(list); assertEquals(1, list.size()); s = list.get(0); - assertEquals(subject(5), s.getName()); + assertEquals(subjects[4], s.getName()); assertEquals(5, s.getCount()); }); } @@ -1033,23 +1096,22 @@ public void testGetConsumers() throws Exception { JetStreamManagement jsm = nc.jetStreamManagement(); TestingStreamContainer tsc = new TestingStreamContainer(jsm); - addConsumers(jsm, tsc.stream, 600, "A", null); // getConsumers pages at 256 + addConsumers(jsm, tsc.stream, 600, "A"); // getConsumers pages at 256 List list = jsm.getConsumers(tsc.stream); assertEquals(600, list.size()); - addConsumers(jsm, tsc.stream, 500, "B", null); // getConsumerNames pages at 1024 + addConsumers(jsm, tsc.stream, 500, "B"); // getConsumerNames pages at 1024 List names = jsm.getConsumerNames(tsc.stream); assertEquals(1100, names.size()); }); } - private void addConsumers(JetStreamManagement jsm, String stream, int count, String durableVary, String filterSubject) throws IOException, JetStreamApiException { + private void addConsumers(JetStreamManagement jsm, String stream, int count, String durableVary) throws IOException, JetStreamApiException { for (int x = 1; x <= count; x++) { String dur = durable(durableVary, x); ConsumerConfiguration cc = ConsumerConfiguration.builder() .durable(dur) - .filterSubject(filterSubject) .build(); ConsumerInfo ci = jsm.addOrUpdateConsumer(stream, cc); assertEquals(dur, ci.getName()); @@ -1368,4 +1430,128 @@ public void testDirectMessageRepublishedSubject() throws Exception { assertEquals("tres", kve3.getValueAsString()); }); } + + @Test + public void testCreateConsumerUpdateConsumer() throws Exception { + jsServer.run(TestBase::atLeast2_9_0, nc -> { + String streamPrefix = variant(); + JetStreamManagement jsmNew = nc.jetStreamManagement(); + JetStreamManagement jsmPre290 = nc.jetStreamManagement(JetStreamOptions.builder().optOut290ConsumerCreate(true).build()); + + // -------------------------------------------------------- + // New without filter + // -------------------------------------------------------- + String stream1 = streamPrefix + "-new"; + String name = name(); + String subject = name(); + createMemoryStream(jsmNew, stream1, subject + ".*"); + + ConsumerConfiguration cc11 = ConsumerConfiguration.builder().name(name).build(); + + // update no good when not exist + JetStreamApiException e = assertThrows(JetStreamApiException.class, () -> jsmNew.updateConsumer(stream1, cc11)); + assertEquals(10149, e.getApiErrorCode()); + + // initial create ok + ConsumerInfo ci = jsmNew.createConsumer(stream1, cc11); + assertEquals(name, ci.getName()); + assertNull(ci.getConsumerConfiguration().getFilterSubject()); + + // any other create no good + e = assertThrows(JetStreamApiException.class, () -> jsmNew.createConsumer(stream1, cc11)); + assertEquals(10148, e.getApiErrorCode()); + + // update ok when exists + ConsumerConfiguration cc12 = ConsumerConfiguration.builder().name(name).description(variant()).build(); + ci = jsmNew.updateConsumer(stream1, cc12); + assertEquals(name, ci.getName()); + assertNull(ci.getConsumerConfiguration().getFilterSubject()); + + // -------------------------------------------------------- + // New with filter subject + // -------------------------------------------------------- + String stream2 = streamPrefix + "-new-fs"; + name = name(); + subject = name(); + String fs1 = subject + ".A"; + String fs2 = subject + ".B"; + createMemoryStream(jsmNew, stream2, subject + ".*"); + + ConsumerConfiguration cc21 = ConsumerConfiguration.builder().name(name).filterSubject(fs1).build(); + + // update no good when not exist + e = assertThrows(JetStreamApiException.class, () -> jsmNew.updateConsumer(stream2, cc21)); + assertEquals(10149, e.getApiErrorCode()); + + // initial create ok + ci = jsmNew.createConsumer(stream2, cc21); + assertEquals(name, ci.getName()); + assertEquals(fs1, ci.getConsumerConfiguration().getFilterSubject()); + + // any other create no good + e = assertThrows(JetStreamApiException.class, () -> jsmNew.createConsumer(stream2, cc21)); + assertEquals(10148, e.getApiErrorCode()); + + // update ok when exists + ConsumerConfiguration cc22 = ConsumerConfiguration.builder().name(name).filterSubjects(fs2).build(); + ci = jsmNew.updateConsumer(stream2, cc22); + assertEquals(name, ci.getName()); + assertEquals(fs2, ci.getConsumerConfiguration().getFilterSubject()); + + // -------------------------------------------------------- + // Pre 290 durable pathway + // -------------------------------------------------------- + String stream3 = streamPrefix + "-old-durable"; + name = name(); + subject = name(); + fs1 = subject + ".A"; + fs2 = subject + ".B"; + String fs3 = subject + ".C"; + createMemoryStream(jsmPre290, stream3, subject + ".*"); + + ConsumerConfiguration cc31 = ConsumerConfiguration.builder().durable(name).filterSubject(fs1).build(); + + // update no good when not exist + e = assertThrows(JetStreamApiException.class, () -> jsmPre290.updateConsumer(stream3, cc31)); + assertEquals(10149, e.getApiErrorCode()); + + // initial create ok + ci = jsmPre290.createConsumer(stream3, cc31); + assertEquals(name, ci.getName()); + assertEquals(fs1, ci.getConsumerConfiguration().getFilterSubject()); + + // opt out of 209, create on existing ok + // This is not exactly the same behavior as with the new consumer create api, but it's what the server does + jsmPre290.createConsumer(stream3, cc31); + + ConsumerConfiguration cc32 = ConsumerConfiguration.builder().durable(name).filterSubject(fs2).build(); + e = assertThrows(JetStreamApiException.class, () -> jsmPre290.createConsumer(stream3, cc32)); + assertEquals(10148, e.getApiErrorCode()); + + // update ok when exists + ConsumerConfiguration cc33 = ConsumerConfiguration.builder().durable(name).filterSubjects(fs3).build(); + ci = jsmPre290.updateConsumer(stream3, cc33); + assertEquals(name, ci.getName()); + assertEquals(fs3, ci.getConsumerConfiguration().getFilterSubject()); + + // -------------------------------------------------------- + // Pre 290 ephemeral pathway + // -------------------------------------------------------- + subject = name(); + + String stream4 = streamPrefix + "-old-ephemeral"; + fs1 = subject + ".A"; + createMemoryStream(jsmPre290, stream4, subject + ".*"); + + ConsumerConfiguration cc4 = ConsumerConfiguration.builder().filterSubject(fs1).build(); + + // update no good when not exist + e = assertThrows(JetStreamApiException.class, () -> jsmPre290.updateConsumer(stream4, cc4)); + assertEquals(10149, e.getApiErrorCode()); + + // initial create ok + jsmPre290.createConsumer(stream4, cc4); + assertEquals(fs1, ci.getConsumerConfiguration().getFilterSubject()); + }); + } } From c492d47faf29ebcb56656b09b04e23885356250e Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 4 Apr 2024 13:02:26 -0400 Subject: [PATCH 2/2] fixed test --- src/test/java/io/nats/client/impl/JetStreamManagementTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index 2ab6f2f1e..21e4a4f29 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -1550,7 +1550,7 @@ public void testCreateConsumerUpdateConsumer() throws Exception { assertEquals(10149, e.getApiErrorCode()); // initial create ok - jsmPre290.createConsumer(stream4, cc4); + ci = jsmPre290.createConsumer(stream4, cc4); assertEquals(fs1, ci.getConsumerConfiguration().getFilterSubject()); }); }