Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Cluster Nodes processing #3000

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.2.11-SNAPSHOT</version>
<version>3.2.11-GH-2862-SNAPSHOT</version>

<name>Spring Data Redis</name>
<description>Spring Data module for Redis</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.core.convert.converter.Converter;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.GeoResult;
Expand Down Expand Up @@ -63,6 +62,7 @@
* @author daihuabin
* @author John Blum
* @author Sorokin Evgeniy
* @author Marcin Grzejszczak
*/
public abstract class Converters {

Expand Down Expand Up @@ -545,10 +545,15 @@ enum ClusterNodesConverter implements Converter<String, RedisClusterNode> {
* <li>{@code %s:%i} (Redis 3)</li>
* <li>{@code %s:%i@%i} (Redis 4, with bus port)</li>
* <li>{@code %s:%i@%i,%s} (Redis 7, with announced hostname)</li>
*
* The output of the {@code CLUSTER NODES } command is just a space-separated CSV string, where each
* line represents a node in the cluster. The following is an example of output on Redis 7.2.0.
* You can check the latest <a href="https://redis.io/docs/latest/commands/cluster-nodes/">here</a>.
*
* {@code <id> <ip:port@cport[,hostname]> <flags> <master> <ping-sent> <pong-recv> <config-epoch> <link-state> <slot> <slot> ... <slot>}
*
* </ul>
*/
static final Pattern clusterEndpointPattern = Pattern
.compile("\\[?([0-9a-zA-Z\\-_\\.:]*)\\]?:([0-9]+)(?:@[0-9]+(?:,([^,].*))?)?");
private static final Map<String, Flag> flagLookupMap;

static {
Expand All @@ -567,32 +572,88 @@ enum ClusterNodesConverter implements Converter<String, RedisClusterNode> {
static final int LINK_STATE_INDEX = 7;
static final int SLOTS_INDEX = 8;

/**
* Value object capturing Redis' representation of a cluster node network coordinate.
*
* @author Marcin Grzejszczak
* @author Mark Paluch
*/
record AddressPortHostname(String address, String port, @Nullable String hostname) {

/**
* Parses Redis {@code CLUSTER NODES} host and port segment into {@link AddressPortHostname}.
*/
static AddressPortHostname parse(String hostAndPortPart) {

String[] segments = hostAndPortPart.split(",");
int portSeparator = segments[0].lastIndexOf(":");
Assert.isTrue(portSeparator != -1, "ClusterNode information does not define host and port");

String addressPart = getAddressPart(segments[0].substring(0, portSeparator));
String portPart = getPortPart(segments[0].substring(portSeparator + 1));
String hostnamePart = segments.length > 1 ? segments[1] : null;

return new AddressPortHostname(addressPart, portPart, hostnamePart);
}

private static String getAddressPart(String address) {
return address.startsWith("[") && address.endsWith("]") ? address.substring(1, address.length() - 1) : address;
}

private static String getPortPart(String segment) {

if (segment.contains("@")) {
return segment.substring(0, segment.indexOf('@'));
}

if (segment.contains(":")) {
return segment.substring(0, segment.indexOf(':'));
}

return segment;
}

public int portAsInt() {
return Integer.parseInt(port());
}

public boolean hasHostname() {
return StringUtils.hasText(hostname());
}

public String getRequiredHostname() {

if (StringUtils.hasText(hostname())) {
return hostname();
}

throw new IllegalStateException("Hostname not available");
}
}

@Override
public RedisClusterNode convert(String source) {

String[] args = source.split(" ");

Matcher matcher = clusterEndpointPattern.matcher(args[HOST_PORT_INDEX]);
Assert.isTrue(args.length >= MASTER_ID_INDEX + 1,
() -> "Invalid ClusterNode information, insufficient segments: %s".formatted(source));

Assert.isTrue(matcher.matches(), "ClusterNode information does not define host and port");

String addressPart = matcher.group(1);
String portPart = matcher.group(2);
String hostnamePart = matcher.group(3);
AddressPortHostname endpoint = AddressPortHostname.parse(args[HOST_PORT_INDEX]);

SlotRange range = parseSlotRange(args);
Set<Flag> flags = parseFlags(args);
Set<Flag> flags = parseFlags(args[FLAGS_INDEX]);

RedisClusterNodeBuilder nodeBuilder = RedisClusterNode.newRedisClusterNode()
.listeningAt(addressPart, Integer.parseInt(portPart)) //
.listeningAt(endpoint.address(), endpoint.portAsInt()) //
.withId(args[ID_INDEX]) //
.promotedAs(flags.contains(Flag.MASTER) ? NodeType.MASTER : NodeType.REPLICA) //
.serving(range) //
.withFlags(flags) //
.linkState(parseLinkState(args));

if (hostnamePart != null) {
nodeBuilder.withName(hostnamePart);
if (endpoint.hasHostname()) {
nodeBuilder.withName(endpoint.getRequiredHostname());
}

if (!args[MASTER_ID_INDEX].isEmpty() && !args[MASTER_ID_INDEX].startsWith("-")) {
Expand All @@ -602,14 +663,12 @@ public RedisClusterNode convert(String source) {
return nodeBuilder.build();
}

private Set<Flag> parseFlags(String[] args) {

String raw = args[FLAGS_INDEX];
private Set<Flag> parseFlags(String source) {

Set<Flag> flags = new LinkedHashSet<>(8, 1);

if (StringUtils.hasText(raw)) {
for (String flag : raw.split(",")) {
if (StringUtils.hasText(source)) {
for (String flag : source.split(",")) {
flags.add(flagLookupMap.get(flag));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,26 @@
import static org.assertj.core.api.Assertions.*;

import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.stream.Stream;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisClusterNode.Flag;
import org.springframework.data.redis.connection.RedisClusterNode.LinkState;
import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.connection.convert.Converters.ClusterNodesConverter;
import org.springframework.data.redis.connection.convert.Converters.ClusterNodesConverter.AddressPortHostname;

/**
* Unit tests for {@link Converters}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Sorokin Evgeniy
* @author Marcin Grzejszczak
*/
class ConvertersUnitTests {

Expand Down Expand Up @@ -72,6 +73,10 @@ class ConvertersUnitTests {

private static final String CLUSTER_NODE_WITH_SINGLE_INVALID_IPV6_HOST = "67adfe3df1058896e3cb49d2863e0f70e7e159fa 2a02:6b8:c67:9c:0:6d8b:33da:5a2c: master,nofailover - 0 1692108412315 1 connected 0-5460";

private static final String CLUSTER_NODE_WITH_SINGLE_IPV4_EMPTY_HOSTNAME = "3765733728631672640db35fd2f04743c03119c6 10.180.0.33:11003@16379, master - 0 1708041426947 2 connected 0-5460";

private static final String CLUSTER_NODE_WITH_SINGLE_IPV4_HOSTNAME = "3765733728631672640db35fd2f04743c03119c6 10.180.0.33:11003@16379,hostname1 master - 0 1708041426947 2 connected 0-5460";

@Test // DATAREDIS-315
void toSetOfRedis30ClusterNodesShouldConvertSingleStringNodesResponseCorrectly() {

Expand Down Expand Up @@ -248,6 +253,37 @@ void toClusterNodeWithIPv6Hostname() {
assertThat(node.getSlotRange().getSlots().size()).isEqualTo(5461);
}

@Test // GH-2862
void toClusterNodeWithIPv4EmptyHostname() {

RedisClusterNode node = Converters.toClusterNode(CLUSTER_NODE_WITH_SINGLE_IPV4_EMPTY_HOSTNAME);

assertThat(node.getId()).isEqualTo("3765733728631672640db35fd2f04743c03119c6");
assertThat(node.getHost()).isEqualTo("10.180.0.33");
assertThat(node.hasValidHost()).isTrue();
assertThat(node.getPort()).isEqualTo(11003);
assertThat(node.getType()).isEqualTo(NodeType.MASTER);
assertThat(node.getFlags()).contains(Flag.MASTER);
assertThat(node.getLinkState()).isEqualTo(LinkState.CONNECTED);
assertThat(node.getSlotRange().getSlots().size()).isEqualTo(5461);
}

@Test // GH-2862
void toClusterNodeWithIPv4Hostname() {

RedisClusterNode node = Converters.toClusterNode(CLUSTER_NODE_WITH_SINGLE_IPV4_HOSTNAME);

assertThat(node.getId()).isEqualTo("3765733728631672640db35fd2f04743c03119c6");
assertThat(node.getHost()).isEqualTo("10.180.0.33");
assertThat(node.getName()).isEqualTo("hostname1");
assertThat(node.hasValidHost()).isTrue();
assertThat(node.getPort()).isEqualTo(11003);
assertThat(node.getType()).isEqualTo(NodeType.MASTER);
assertThat(node.getFlags()).contains(Flag.MASTER);
assertThat(node.getLinkState()).isEqualTo(LinkState.CONNECTED);
assertThat(node.getSlotRange().getSlots().size()).isEqualTo(5461);
}

@Test // GH-2678
void toClusterNodeWithIPv6HostnameSquareBrackets() {

Expand All @@ -271,35 +307,64 @@ void toClusterNodeWithInvalidIPv6Hostname() {

@ParameterizedTest // GH-2678
@MethodSource("clusterNodesEndpoints")
void shouldAcceptHostPatterns(String endpoint, String expectedAddress, String expectedPort, String expectedHostname) {
void shouldAcceptHostPatterns(String endpoint, AddressPortHostname expected) {

Matcher matcher = ClusterNodesConverter.clusterEndpointPattern.matcher(endpoint);
assertThat(matcher.matches()).isTrue();
AddressPortHostname addressPortHostname = AddressPortHostname.parse(endpoint);

assertThat(matcher.group(1)).isEqualTo(expectedAddress);
assertThat(matcher.group(2)).isEqualTo(expectedPort);
assertThat(matcher.group(3)).isEqualTo(expectedHostname);
assertThat(addressPortHostname).isEqualTo(expected);
}

static Stream<Arguments> clusterNodesEndpoints() {

return Stream.of(
Stream<Arguments> regular = Stream.of(
// IPv4 with Host, Redis 3
Arguments.of("1.2.4.4:7379", "1.2.4.4", "7379", null),
Arguments.of("1.2.4.4:7379", new AddressPortHostname("1.2.4.4", "7379", null)),
// IPv6 with Host, Redis 3
Arguments.of("6b8:c67:9c:0:6d8b:33da:5a2c:6380", "6b8:c67:9c:0:6d8b:33da:5a2c", "6380", null),
Arguments.of("6b8:c67:9c:0:6d8b:33da:5a2c:6380",
new AddressPortHostname("6b8:c67:9c:0:6d8b:33da:5a2c", "6380", null)),
// Assuming IPv6 in brackets with Host, Redis 3
Arguments.of("[6b8:c67:9c:0:6d8b:33da:5a2c]:6380", "6b8:c67:9c:0:6d8b:33da:5a2c", "6380", null),
Arguments.of("[6b8:c67:9c:0:6d8b:33da:5a2c]:6380",
new AddressPortHostname("6b8:c67:9c:0:6d8b:33da:5a2c", "6380", null)),

// IPv4 with Host and Bus Port, Redis 4
Arguments.of("127.0.0.1:7382@17382", "127.0.0.1", "7382", null),
Arguments.of("127.0.0.1:7382@17382", new AddressPortHostname("127.0.0.1", "7382", null)),
// IPv6 with Host and Bus Port, Redis 4
Arguments.of("6b8:c67:9c:0:6d8b:33da:5a2c:6380", "6b8:c67:9c:0:6d8b:33da:5a2c", "6380", null),
Arguments.of("6b8:c67:9c:0:6d8b:33da:5a2c:6380",
new AddressPortHostname("6b8:c67:9c:0:6d8b:33da:5a2c", "6380", null)),

// Hostname with Port and Bus Port, Redis 7
Arguments.of("my.host-name.com:7379@17379", "my.host-name.com", "7379", null),
Arguments.of("my.host-name.com:7379@17379", new AddressPortHostname("my.host-name.com", "7379", null)),

// With hostname, Redis 7
Arguments.of("1.2.4.4:7379@17379,my.host-name.com", "1.2.4.4", "7379", "my.host-name.com"));
Arguments.of("1.2.4.4:7379@17379,my.host-name.com",
new AddressPortHostname("1.2.4.4", "7379", "my.host-name.com")));

Stream<Arguments> weird = Stream.of(
// Port-only
Arguments.of(":6380", new AddressPortHostname("", "6380", null)),

// Port-only with bus-port
Arguments.of(":6380@6381", new AddressPortHostname("", "6380", null)),
// IP with trailing comma
Arguments.of("127.0.0.1:6380,", new AddressPortHostname("127.0.0.1", "6380", null)),
// IPv6 with bus-port
Arguments.of("2a02:6b8:c67:9c:0:6d8b:33da:5a2c:6380@6381",
new AddressPortHostname("2a02:6b8:c67:9c:0:6d8b:33da:5a2c", "6380", null)),
// IPv6 with bus-port and hostname
Arguments.of("2a02:6b8:c67:9c:0:6d8b:33da:5a2c:6380@6381,hostname1",
new AddressPortHostname("2a02:6b8:c67:9c:0:6d8b:33da:5a2c", "6380", "hostname1")),
// Port-only with hostname
Arguments.of(":6380,hostname1", new AddressPortHostname("", "6380", "hostname1")),

// Port-only with bus-port
Arguments.of(":6380@6381,hostname1", new AddressPortHostname("", "6380", "hostname1")),
// IPv6 in brackets with bus-port
Arguments.of("[2a02:6b8:c67:9c:0:6d8b:33da:5a2c]:6380@6381",
new AddressPortHostname("2a02:6b8:c67:9c:0:6d8b:33da:5a2c", "6380", null)),
// IPv6 in brackets with bus-port and hostname
Arguments.of("[2a02:6b8:c67:9c:0:6d8b:33da:5a2c]:6380@6381,hostname1",
new AddressPortHostname("2a02:6b8:c67:9c:0:6d8b:33da:5a2c", "6380", "hostname1")));

return Stream.concat(regular, weird);
}
}