diff --git a/opentracing-spring-rabbitmq-it/src/test/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqTracingManualConfig.java b/opentracing-spring-rabbitmq-it/src/test/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqTracingManualConfig.java index 1389c39..3d2375d 100644 --- a/opentracing-spring-rabbitmq-it/src/test/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqTracingManualConfig.java +++ b/opentracing-spring-rabbitmq-it/src/test/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqTracingManualConfig.java @@ -35,12 +35,8 @@ public class RabbitMqTracingManualConfig { private Tracer tracer; @Bean - public RabbitMqSendTracingAspect rabbitMqSendTracingAspect(RabbitTemplate rabbitTemplate, - RabbitMqSpanDecorator spanDecorator) { - Assert.notNull(rabbitTemplate.getMessageConverter(), "RabbitTemplate has no message converter configured"); - - return new RabbitMqSendTracingAspect(tracer, rabbitTemplate.getExchange(), rabbitTemplate.getRoutingKey(), - rabbitTemplate.getMessageConverter(), spanDecorator); + public RabbitMqSendTracingAspect rabbitMqSendTracingAspect(RabbitMqSpanDecorator spanDecorator) { + return new RabbitMqSendTracingAspect(tracer, spanDecorator); } @Bean diff --git a/opentracing-spring-rabbitmq-starter/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqTracingAutoConfiguration.java b/opentracing-spring-rabbitmq-starter/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqTracingAutoConfiguration.java index e4ec6d4..9aa8787 100644 --- a/opentracing-spring-rabbitmq-starter/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqTracingAutoConfiguration.java +++ b/opentracing-spring-rabbitmq-starter/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqTracingAutoConfiguration.java @@ -53,12 +53,8 @@ public class RabbitMqTracingAutoConfiguration { */ @ConditionalOnBean(RabbitTemplate.class) @Bean - public RabbitMqSendTracingAspect rabbitMqSendTracingAspect(RabbitTemplate rabbitTemplate, - RabbitMqSpanDecorator spanDecorator) { - Assert.notNull(rabbitTemplate.getMessageConverter(), "RabbitTemplate has no message converter configured"); - - return new RabbitMqSendTracingAspect(tracer, rabbitTemplate.getExchange(), rabbitTemplate.getRoutingKey(), - rabbitTemplate.getMessageConverter(), spanDecorator); + public RabbitMqSendTracingAspect rabbitMqSendTracingAspect(RabbitMqSpanDecorator spanDecorator) { + return new RabbitMqSendTracingAspect(tracer, spanDecorator); } @Bean diff --git a/opentracing-spring-rabbitmq/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqSendTracingAspect.java b/opentracing-spring-rabbitmq/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqSendTracingAspect.java index af9854e..82b14db 100644 --- a/opentracing-spring-rabbitmq/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqSendTracingAspect.java +++ b/opentracing-spring-rabbitmq/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqSendTracingAspect.java @@ -14,6 +14,7 @@ package io.opentracing.contrib.spring.rabbitmq; import io.opentracing.Tracer; +import java.util.Optional; import lombok.AllArgsConstructor; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; @@ -32,9 +33,6 @@ class RabbitMqSendTracingAspect { private final Tracer tracer; - private final String exchange; - private final String routingKey; - private final MessageConverter messageConverter; private final RabbitMqSpanDecorator spanDecorator; /** @@ -43,9 +41,7 @@ class RabbitMqSendTracingAspect { @Around(value = "execution(* org.springframework.amqp.core.AmqpTemplate.send(..)) && args(message)", argNames = "pjp,message") public Object traceRabbitSend(ProceedingJoinPoint pjp, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(this.exchange, this.routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 0)); + return tagAndProceed(pjp, null, null, message, 0, false); } /** @@ -54,9 +50,7 @@ public Object traceRabbitSend(ProceedingJoinPoint pjp, Object message) throws Th @Around(value = "execution(* org.springframework.amqp.core.AmqpTemplate.send(..)) && args(routingKey, message)", argNames = "pjp,routingKey,message") public Object traceRabbitSend(ProceedingJoinPoint pjp, String routingKey, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(this.exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 1)); + return tagAndProceed(pjp, null, routingKey, message, 1, false); } /** @@ -66,9 +60,7 @@ public Object traceRabbitSend(ProceedingJoinPoint pjp, String routingKey, Object "routingKey, message)", argNames = "pjp,exchange, routingKey, message") public Object traceRabbitSend(ProceedingJoinPoint pjp, String exchange, String routingKey, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 2)); + return tagAndProceed(pjp, exchange, routingKey, message, 2, false); } /** @@ -77,9 +69,7 @@ public Object traceRabbitSend(ProceedingJoinPoint pjp, String exchange, String r @Around(value = "execution(* org.springframework.amqp.core.AmqpTemplate.convertAndSend(..)) " + "&& args(message)", argNames = "pjp,message") public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(this.exchange, this.routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 0)); + return tagAndProceed(pjp, null, null, message, 0, false); } /** @@ -90,9 +80,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, Object message) public Object traceRabbitConvertAndSend( ProceedingJoinPoint pjp, String routingKey, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(this.exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 1)); + return tagAndProceed(pjp, null, routingKey, message, 1, false); } /** @@ -103,9 +91,7 @@ public Object traceRabbitConvertAndSend( public Object traceRabbitConvertAndSend( ProceedingJoinPoint pjp, String exchange, String routingKey, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 2)); + return tagAndProceed(pjp, exchange, routingKey, message, 2, false); } /** @@ -115,9 +101,7 @@ public Object traceRabbitConvertAndSend( " && args(message, messagePostProcessor)", argNames = "pjp,message,messagePostProcessor") public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, Object message, MessagePostProcessor messagePostProcessor) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(this.exchange, this.routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 0)); + return tagAndProceed(pjp, null, null, message, 0, false); } /** @@ -128,9 +112,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, Object message, argNames = "pjp,routingKey,message,messagePostProcessor") public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(this.exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 1)); + return tagAndProceed(pjp, null, routingKey, message, 1, false); } /** @@ -141,9 +123,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String routingK argNames = "pjp,exchange,routingKey,message,messagePostProcessor") public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp,String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 2)); + return tagAndProceed(pjp, exchange, routingKey, message, 2, false); } /** @@ -155,9 +135,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp,String exchange, public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(this.exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 1)); + return tagAndProceed(pjp, null, routingKey, message, 1, false); } /** @@ -169,9 +147,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String routingK public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String exchange, String routingKey, Object message, CorrelationData correlationData) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 2)); + return tagAndProceed(pjp, exchange, routingKey, message, 2, false); } /** @@ -180,9 +156,7 @@ public Object traceRabbitConvertAndSend(ProceedingJoinPoint pjp, String exchange @Around(value = "execution(* org.springframework.amqp.core.AmqpTemplate.sendAndReceive(..))" + " && args(message)", argNames = "pjp,message") public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(this.exchange, this.routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 0)); + return tagAndProceed(pjp, null, null, message, 0, true); } /** @@ -191,9 +165,7 @@ public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, Object message) @Around(value = "execution(* org.springframework.amqp.core.AmqpTemplate.sendAndReceive(..))" + " && args(routingKey, message)", argNames = "pjp,routingKey,message") public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, String routingKey, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 1)); + return tagAndProceed(pjp, null, routingKey, message, 1, true); } /** @@ -203,9 +175,7 @@ public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, String routingK " && args(exchange, routingKey, message)", argNames = "pjp,exchange,routingKey,message") public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, String exchange, String routingKey, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 2)); + return tagAndProceed(pjp, exchange, routingKey, message, 2, true); } // Intercept public methods that eventually delegate to RabbitTemplate.doSendAndReceive @@ -218,10 +188,7 @@ public Object traceRabbitSendAndReceive(ProceedingJoinPoint pjp, String exchange public Object traceRabbitSendAndReceive( ProceedingJoinPoint pjp, String exchange, String routingKey, Message message, CorrelationData correlationData) throws Throwable { - return createTracingHelper() - .nullResponseMeansTimeout((RabbitTemplate) pjp.getTarget()) - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 2)); + return tagAndProceed(pjp, exchange, routingKey, message, 2, true); } /** @@ -231,9 +198,7 @@ public Object traceRabbitSendAndReceive( " && args(message)", argNames = "pjp,message") public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 0)); + return tagAndProceed(pjp, null, null, message, 0, true); } /** @@ -243,9 +208,7 @@ public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, Object m " && args(routingKey, message)", argNames = "pjp,routingKey,message") public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String routingKey, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 1)); + return tagAndProceed(pjp, null, routingKey, message, 1, true); } /** @@ -256,9 +219,7 @@ public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String r public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String exchange, String routingKey, Object message) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 2)); + return tagAndProceed(pjp, exchange, routingKey, message, 2, true); } /** @@ -269,9 +230,7 @@ public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String e public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, Object message, MessagePostProcessor messagePostProcessor) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 0)); + return tagAndProceed(pjp, null, null, message, 0, true); } /** @@ -283,9 +242,7 @@ public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, Object m public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 1)); + return tagAndProceed(pjp, null, routingKey, message, 1, true); } /** @@ -297,9 +254,7 @@ public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String r public Object traceRabbitConvertSendAndReceive(ProceedingJoinPoint pjp, String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws Throwable { - return createTracingHelper() - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 2)); + return tagAndProceed(pjp, exchange, routingKey, message, 2, true); } /** @@ -312,14 +267,27 @@ public Object traceRabbitConvertSendAndReceive( ProceedingJoinPoint pjp, String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws Throwable { - return createTracingHelper() - .nullResponseMeansTimeout((RabbitTemplate) pjp.getTarget()) - .doWithTracingHeadersMessage(exchange, routingKey, message, (convertedMessage) -> - proceedReplacingMessage(pjp, convertedMessage, 2)); + return tagAndProceed(pjp, exchange, routingKey, message, 2, true); } - private RabbitMqSendTracingHelper createTracingHelper() { - return new RabbitMqSendTracingHelper(tracer, messageConverter, spanDecorator); + private Object tagAndProceed(ProceedingJoinPoint pjp, String exchange, String routingKey, + Object message, int messageIndex, boolean nullResponseMeansTimeout) + throws Throwable { + if (pjp.getTarget() instanceof RabbitTemplate) { + RabbitTemplate rabbitTemplate = (RabbitTemplate) pjp.getTarget(); + MessageConverter converter = rabbitTemplate.getMessageConverter(); + String exactlyRoutingKey = Optional.ofNullable(routingKey).orElseGet(rabbitTemplate::getRoutingKey); + String exactlyExchange = Optional.ofNullable(exchange).orElseGet(rabbitTemplate::getExchange); + + RabbitMqSendTracingHelper helper = new RabbitMqSendTracingHelper(tracer, converter, spanDecorator); + if (nullResponseMeansTimeout) { + helper.nullResponseMeansTimeout(rabbitTemplate); + } + return helper + .doWithTracingHeadersMessage(exactlyExchange, exactlyRoutingKey, message, + (convertedMessage) -> proceedReplacingMessage(pjp, convertedMessage, messageIndex)); + } + return pjp.proceed(pjp.getArgs()); } private Object proceedReplacingMessage(ProceedingJoinPoint pjp, Message convertedMessage, int messageArgumentIndex) diff --git a/opentracing-spring-rabbitmq/src/test/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqSendTracingAspectTest.java b/opentracing-spring-rabbitmq/src/test/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqSendTracingAspectTest.java index a6d9bfa..cfc7c11 100644 --- a/opentracing-spring-rabbitmq/src/test/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqSendTracingAspectTest.java +++ b/opentracing-spring-rabbitmq/src/test/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqSendTracingAspectTest.java @@ -15,6 +15,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import io.opentracing.Span; @@ -26,9 +27,12 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Import; @@ -52,12 +56,13 @@ public class RabbitMqSendTracingAspectTest { private ProceedingJoinPoint proceedingJoinPoint; @Mock private MessageConverter messageConverter; + @Mock + private RabbitTemplate rabbitTemplate; @Before public void init() { MockitoAnnotations.initMocks(this); - aspect = new RabbitMqSendTracingAspect(mockTracer, "overridden-exchange", ROUTING_KEY, - messageConverter, spanDecorator); + aspect = new RabbitMqSendTracingAspect(mockTracer, spanDecorator); } @After @@ -88,11 +93,17 @@ public void testTraceRabbitSend_whenNoPropertiesHeaders() throws Throwable { given(messageConverter.toMessage(any(Object.class), any(MessageProperties.class))) .willReturn(message); + given(proceedingJoinPoint.getTarget()).willReturn(rabbitTemplate); + + given(rabbitTemplate.getMessageConverter()).willReturn(messageConverter); + // when aspect.traceRabbitSend(proceedingJoinPoint, exchange, routingKey, myMessage); // then verify(proceedingJoinPoint).getArgs(); + verify(rabbitTemplate).getMessageConverter(); + verify(proceedingJoinPoint, times(2)).getTarget(); verify(messageConverter).toMessage(any(Object.class), any(MessageProperties.class)); verify(proceedingJoinPoint).proceed(args); } @@ -107,6 +118,8 @@ public void testTraceRabbitSend_whenNoConversionIsNeeded() throws Throwable { Object[] args = new Object[] {exchange, ROUTING_KEY, message}; given(proceedingJoinPoint.getArgs()).willReturn(args); + given(proceedingJoinPoint.getTarget()).willReturn(rabbitTemplate); + given(messageConverter.toMessage(any(Object.class), any(MessageProperties.class))) .willReturn(message); @@ -114,6 +127,8 @@ public void testTraceRabbitSend_whenNoConversionIsNeeded() throws Throwable { aspect.traceRabbitSend(proceedingJoinPoint, exchange, ROUTING_KEY, message); // then + verify(rabbitTemplate).getMessageConverter(); + verify(proceedingJoinPoint, times(2)).getTarget(); verify(proceedingJoinPoint).getArgs(); verify(proceedingJoinPoint).proceed(args); } @@ -128,6 +143,8 @@ public void testTraceRabbitSend_whenException() throws Throwable { Object[] args = new Object[] {exchange, ROUTING_KEY, message}; given(proceedingJoinPoint.getArgs()).willReturn(args); + given(proceedingJoinPoint.getTarget()).willReturn(rabbitTemplate); + given(messageConverter.toMessage(any(Object.class), any(MessageProperties.class))) .willReturn(message); @@ -138,13 +155,41 @@ public void testTraceRabbitSend_whenException() throws Throwable { aspect.traceRabbitSend(proceedingJoinPoint, exchange, ROUTING_KEY, message); } catch (RuntimeException e) { // then + verify(rabbitTemplate).getMessageConverter(); verify(proceedingJoinPoint).getArgs(); + verify(proceedingJoinPoint, times(2)).getTarget(); verify(proceedingJoinPoint).proceed(args); throw e; } } + @Test + public void testTraceRabbitSend_whenTargetIsNotRabbitTemplate() throws Throwable { + // given + String exchange = "opentracing.event.exchange"; + + MessageProperties properties = new MessageProperties(); + Message message = new Message("".getBytes(), properties); + Object[] args = new Object[] {exchange, ROUTING_KEY, message}; + AmqpTemplate notRabbitTemplate = Mockito.mock(AmqpTemplate.class); + given(proceedingJoinPoint.getArgs()).willReturn(args); + + given(proceedingJoinPoint.getTarget()).willReturn(rabbitTemplate); + + given(messageConverter.toMessage(any(Object.class), any(MessageProperties.class))) + .willReturn(message); + + given(proceedingJoinPoint.getTarget()).willReturn(notRabbitTemplate); + + // when + aspect.traceRabbitSend(proceedingJoinPoint, exchange, ROUTING_KEY, message); + // then + verify(proceedingJoinPoint).getArgs(); + verify(proceedingJoinPoint).getTarget(); + verify(proceedingJoinPoint).proceed(args); + } + class TestMessage { private T body;