Skip to content

Commit

Permalink
Allow templating of host and advertisedHost fields
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj committed Sep 18, 2024
1 parent 2b3310e commit b4fc080
Show file tree
Hide file tree
Showing 18 changed files with 408 additions and 191 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* The `ContinueReconciliationOnManualRollingUpdateFailure` feature gate moves to beta stage and is enabled by default.
If needed, `ContinueReconciliationOnManualRollingUpdateFailure` can be disabled in the feature gates configuration in the Cluster Operator.
* Add support for managing connector offsets via KafkaConnector and KafkaMirrorMaker2 custom resources.
* Add support for templating `host` and `advertisedHost` fields in listener configuration

### Changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
@DescriptionFile
@JsonPropertyOrder({"brokerCertChainAndKey", "class", "preferredAddressType", "externalTrafficPolicy",
"loadBalancerSourceRanges", "bootstrap", "brokers", "ipFamilyPolicy", "ipFamilies", "createBootstrapService",
"finalizers", "useServiceDnsDomain", "maxConnections", "maxConnectionCreationRate", "preferredNodePortAddressType", "publishNotReadyAddresses"})
"finalizers", "useServiceDnsDomain", "maxConnections", "maxConnectionCreationRate", "preferredNodePortAddressType",
"publishNotReadyAddresses", "hostTemplate", "advertisedHostTemplate"})
@JsonInclude(JsonInclude.Include.NON_NULL)
@Buildable(
editableEnabled = false,
Expand All @@ -54,6 +55,8 @@ public class GenericKafkaListenerConfiguration implements UnknownPropertyPreserv
private List<IpFamily> ipFamilies;
private Boolean createBootstrapService = true;
private Boolean publishNotReadyAddresses;
private String hostTemplate;
private String advertisedHostTemplate;
private Map<String, Object> additionalProperties;

@Description("Reference to the `Secret` which holds the certificate and private key pair which will be used for this listener. " +
Expand Down Expand Up @@ -256,6 +259,28 @@ public void setPublishNotReadyAddresses(Boolean publishNotReadyAddresses) {
this.publishNotReadyAddresses = publishNotReadyAddresses;
}

@Description("Configures the template for generating the host names of the individual brokers. " +
"Valid placeholders that you can use in the template are `{nodeId}` and `{nodePodName}`")
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getHostTemplate() {
return hostTemplate;
}

public void setHostTemplate(String hostTemplate) {
this.hostTemplate = hostTemplate;
}

@Description("Configures the template for generating the advertisedHost names of the individual brokers. " +
"Valid placeholders that you can use in the template are `{nodeId}` and `{nodePodName}`")
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getAdvertisedHostTemplate() {
return advertisedHostTemplate;
}

public void setAdvertisedHostTemplate(String advertisedHostTemplate) {
this.advertisedHostTemplate = advertisedHostTemplate;
}

@Override
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties != null ? this.additionalProperties : Map.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ public List<Route> generateExternalRoutes() {
.endSpec()
.build();

String host = ListenersUtils.brokerHost(listener, node.nodeId());
String host = ListenersUtils.brokerHost(listener, node);
if (host != null) {
route.getSpec().setHost(host);
}
Expand Down Expand Up @@ -1040,7 +1040,7 @@ public List<Ingress> generateExternalIngresses() {
if (pool.isBroker()) {
for (NodeRef node : pool.nodes()) {
String ingressName = ListenersUtils.backwardsCompatiblePerBrokerServiceName(pool.componentName, node.nodeId(), listener);
String host = ListenersUtils.brokerHost(listener, node.nodeId());
String host = ListenersUtils.brokerHost(listener, node);
String ingressClass = ListenersUtils.controllerClass(listener);

HTTPIngressPath path = new HTTPIngressPathBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.strimzi.api.kafka.model.common.template.IpFamily;
import io.strimzi.api.kafka.model.common.template.IpFamilyPolicy;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerConfiguration;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerConfigurationBroker;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationCustom;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationOAuth;
Expand Down Expand Up @@ -475,17 +476,31 @@ public static String bootstrapHost(GenericKafkaListener listener) {
}

/**
* Finds broker host configuration
* Replaces the template fields in the template string with the corresponding values from the node reference.
*
* @param listener Listener for which the host should be found
* @param pod Pod ID for which we should get the configuration option
* @return Host or null if not specified
* @param template Template with the placeholders
* @param node Node reference that should be used to provide the final values
*
* @return The rendered template
*/
public static String brokerHost(GenericKafkaListener listener, int pod) {
if (listener.getConfiguration() != null
&& listener.getConfiguration().getBrokers() != null) {
return listener.getConfiguration().getBrokers().stream()
.filter(broker -> broker != null && broker.getBroker() != null && broker.getBroker() == pod && broker.getHost() != null)
/* test */ static String renderHostTemplate(String template, NodeRef node) {
return template
.replace("{nodeId}", Integer.toString(node.nodeId()))
.replace("{nodePodName}", node.podName());
}

/**
* Finds per-broker host configuration based on node ID.
*
* @param listenerConfiguration Configuration of the listener for which the host should be found
* @param nodeId Node ID for which we should get the configuration option
*
* @return Host configured for given node ID or null if not specified
*/
private static String brokerHost(GenericKafkaListenerConfiguration listenerConfiguration, int nodeId) {
if (listenerConfiguration.getBrokers() != null) {
return listenerConfiguration.getBrokers().stream()
.filter(broker -> broker != null && broker.getBroker() != null && broker.getBroker() == nodeId && broker.getHost() != null)
.map(GenericKafkaListenerConfigurationBroker::getHost)
.findAny()
.orElse(null);
Expand All @@ -495,17 +510,41 @@ public static String brokerHost(GenericKafkaListener listener, int pod) {
}

/**
* Finds broker advertised host configuration
* Finds broker host configuration either in the per-broker configuration or renders it from the template. If no
* per-broker value and no template are set, it returns null.
*
* @param listener Listener for which the advertised host should be found
* @param pod Pod ID for which we should get the configuration option
* @return Advertised Host or null if not specified
* @param listener Listener for which the host should be found
* @param node Node reference describing the node for which we want to find the host
*
* @return Host or null if not specified
*/
public static String brokerAdvertisedHost(GenericKafkaListener listener, int pod) {
if (listener.getConfiguration() != null
&& listener.getConfiguration().getBrokers() != null) {
return listener.getConfiguration().getBrokers().stream()
.filter(broker -> broker != null && broker.getBroker() != null && broker.getBroker() == pod && broker.getAdvertisedHost() != null)
public static String brokerHost(GenericKafkaListener listener, NodeRef node) {
if (listener.getConfiguration() != null) {
String host = brokerHost(listener.getConfiguration(), node.nodeId());

if (host == null && listener.getConfiguration().getHostTemplate() != null) {
// There is no host defined specifically for given broker, so we try to use the template
host = renderHostTemplate(listener.getConfiguration().getHostTemplate(), node);
}

return host;
} else {
return null;
}
}

/**
* Finds per-broker advertised host configuration based on node ID.
*
* @param listenerConfiguration Configuration of the listener for which the advertised host should be found
* @param nodeId Node ID for which we should get the configuration option
*
* @return Advertised Host or null if not specified
*/
private static String brokerAdvertisedHost(GenericKafkaListenerConfiguration listenerConfiguration, int nodeId) {
if (listenerConfiguration.getBrokers() != null) {
return listenerConfiguration.getBrokers().stream()
.filter(broker -> broker != null && broker.getBroker() != null && broker.getBroker() == nodeId && broker.getAdvertisedHost() != null)
.map(GenericKafkaListenerConfigurationBroker::getAdvertisedHost)
.findAny()
.orElse(null);
Expand All @@ -514,6 +553,30 @@ public static String brokerAdvertisedHost(GenericKafkaListener listener, int pod
}
}

/**
* Finds broker advertised host configuration either in the per-broker configuration or renders it from the template. If no
* per-broker value and no template are set, it returns null.
*
* @param listener Listener for which the advertised host should be found
* @param node The node for which the advertised hostname should be obtained
*
* @return Advertised Host or null if not specified
*/
public static String brokerAdvertisedHost(GenericKafkaListener listener, NodeRef node) {
if (listener.getConfiguration() != null) {
String advertisedHost = brokerAdvertisedHost(listener.getConfiguration(), node.nodeId());

if (advertisedHost == null && listener.getConfiguration().getAdvertisedHostTemplate() != null) {
// There is no advertised host defined specifically for given broker, so we try to use the template
advertisedHost = renderHostTemplate(listener.getConfiguration().getAdvertisedHostTemplate(), node);
}

return advertisedHost;
} else {
return null;
}
}

/**
* Finds broker advertised port configuration
*
Expand Down Expand Up @@ -671,28 +734,6 @@ public static String serviceType(GenericKafkaListener listener) {
}
}


/**
* Returns the advertised host for given broker. If user specified some override in the listener configuration, it
* will return this override. If no override is specified, it will return the host obtained from Kubernetes
* passes as parameter to this method.
*
* @param listener Listener where the configuration should be found
* @param nodeId Kafka node ID
* @param hostname The advertised hostname which will be used if there is no listener override
*
* @return The advertised hostname
*/
public static String advertisedHostnameFromOverrideOrParameter(GenericKafkaListener listener, int nodeId, String hostname) {
String advertisedHost = ListenersUtils.brokerAdvertisedHost(listener, nodeId);

if (advertisedHost == null && hostname == null) {
return null;
}

return advertisedHost != null ? advertisedHost : hostname;
}

/**
* Returns the advertised port for given broker. If user specified some override in the listener configuration, it
* will return this override. If no override is specified, it will return the port obtained from Kubernetes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static void validate(Reconciliation reconciliation, Set<NodeRef> brokerNo
validatePreferredAddressType(errors, listener);
validatePublishNotReadyAddresses(errors, listener);
validateCreateBootstrapService(errors, listener);

validateBrokerHostTemplate(errors, listener);

if (listener.getConfiguration().getBootstrap() != null) {
validateBootstrapHost(errors, listener);
Expand Down Expand Up @@ -173,7 +173,9 @@ private static void validateIngress(Set<String> errors, Set<NodeRef> brokerNodes
errors.add("listener " + listener.getName() + " is missing a bootstrap host name which is required for Ingress based listeners");
}

if (conf.getBrokers() != null) {
if (conf.getHostTemplate() != null) {
// nothing to do => when the template is set, we do not care about the per-broker configurations
} else if (conf.getBrokers() != null) {
for (NodeRef node : brokerNodes) {
GenericKafkaListenerConfigurationBroker broker = conf.getBrokers().stream().filter(b -> b.getBroker() == node.nodeId()).findFirst().orElse(null);

Expand All @@ -184,8 +186,6 @@ private static void validateIngress(Set<String> errors, Set<NodeRef> brokerNodes
} else {
errors.add("listener " + listener.getName() + " is missing a broker configuration with host names which is required for Ingress based listeners");
}


} else {
errors.add("listener " + listener.getName() + " is missing a configuration with host names which is required for Ingress based listeners");
}
Expand Down Expand Up @@ -324,6 +324,19 @@ private static void validateFinalizers(Set<String> errors, GenericKafkaListener
}
}

/**
* Validates that hostTemplate is used only with Route or Ingress type listener
*
* @param errors List where any found errors will be added
* @param listener Listener which needs to be validated
*/
private static void validateBrokerHostTemplate(Set<String> errors, GenericKafkaListener listener) {
if ((!KafkaListenerType.ROUTE.equals(listener.getType()) && !KafkaListenerType.INGRESS.equals(listener.getType()))
&& listener.getConfiguration().getHostTemplate() != null) {
errors.add("listener " + listener.getName() + " cannot configure hostTemplate because it is not Route or Ingress based listener");
}
}

/**
* Validates that bootstrap.host is used only with Route or Ingress type listener
*
Expand Down
Loading

0 comments on commit b4fc080

Please sign in to comment.