-
-
Notifications
You must be signed in to change notification settings - Fork 468
feat(spring-jakarta): [Queue Instrumentation 3] Add Kafka producer instrumentation #5254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
adinauer
merged 4 commits into
feat/queue-instrumentation
from
feat/queue-instrumentation-producer
Apr 29, 2026
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
be5af44
feat(spring-jakarta): Add Kafka producer instrumentation
adinauer 5049ffc
changelog
adinauer 915e42b
ref(spring-jakarta): Replace SentryKafkaProducerWrapper with SentryPr…
adinauer fdb3a03
fix(spring-jakarta): Initialize Sentry in SentryProducerInterceptorTest
adinauer File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
82 changes: 82 additions & 0 deletions
82
...ta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| package io.sentry.spring.jakarta.kafka; | ||
|
|
||
| import io.sentry.ScopesAdapter; | ||
| import io.sentry.SentryLevel; | ||
| import java.lang.reflect.Field; | ||
| import org.apache.kafka.clients.producer.ProducerInterceptor; | ||
| import org.jetbrains.annotations.ApiStatus; | ||
| import org.jetbrains.annotations.NotNull; | ||
| import org.jetbrains.annotations.Nullable; | ||
| import org.springframework.beans.BeansException; | ||
| import org.springframework.beans.factory.config.BeanPostProcessor; | ||
| import org.springframework.core.Ordered; | ||
| import org.springframework.core.PriorityOrdered; | ||
| import org.springframework.kafka.core.KafkaTemplate; | ||
| import org.springframework.kafka.support.CompositeProducerInterceptor; | ||
|
|
||
| /** | ||
| * Sets a {@link SentryProducerInterceptor} on {@link KafkaTemplate} beans via {@link | ||
| * KafkaTemplate#setProducerInterceptor(ProducerInterceptor)}. The original bean is not replaced. | ||
| * | ||
| * <p>If the template already has a {@link ProducerInterceptor}, both are composed using {@link | ||
| * CompositeProducerInterceptor}. Reading the existing interceptor requires reflection (no public | ||
| * getter in Spring Kafka 3.x); if reflection fails, a warning is logged and only the Sentry | ||
| * interceptor is set. | ||
| */ | ||
| @ApiStatus.Internal | ||
| public final class SentryKafkaProducerBeanPostProcessor | ||
| implements BeanPostProcessor, PriorityOrdered { | ||
|
|
||
| @Override | ||
| @SuppressWarnings("unchecked") | ||
| public @NotNull Object postProcessAfterInitialization( | ||
| final @NotNull Object bean, final @NotNull String beanName) throws BeansException { | ||
| if (bean instanceof KafkaTemplate) { | ||
| final @NotNull KafkaTemplate<?, ?> template = (KafkaTemplate<?, ?>) bean; | ||
| final @Nullable ProducerInterceptor<?, ?> existing = getExistingInterceptor(template); | ||
|
|
||
| if (existing instanceof SentryProducerInterceptor) { | ||
| return bean; | ||
| } | ||
|
|
||
| @SuppressWarnings("rawtypes") | ||
| final SentryProducerInterceptor sentryInterceptor = | ||
| new SentryProducerInterceptor<>(ScopesAdapter.getInstance()); | ||
|
|
||
| if (existing != null) { | ||
| @SuppressWarnings("rawtypes") | ||
| final CompositeProducerInterceptor composite = | ||
| new CompositeProducerInterceptor(sentryInterceptor, existing); | ||
| template.setProducerInterceptor(composite); | ||
| } else { | ||
| template.setProducerInterceptor(sentryInterceptor); | ||
| } | ||
| } | ||
| return bean; | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private @Nullable ProducerInterceptor<?, ?> getExistingInterceptor( | ||
| final @NotNull KafkaTemplate<?, ?> template) { | ||
| try { | ||
| final @NotNull Field field = KafkaTemplate.class.getDeclaredField("producerInterceptor"); | ||
| field.setAccessible(true); | ||
| return (ProducerInterceptor<?, ?>) field.get(template); | ||
| } catch (NoSuchFieldException | IllegalAccessException e) { | ||
| ScopesAdapter.getInstance() | ||
| .getOptions() | ||
| .getLogger() | ||
| .log( | ||
| SentryLevel.WARNING, | ||
| "Unable to read existing producerInterceptor from KafkaTemplate via reflection. " | ||
| + "If you had a custom ProducerInterceptor, it may be overwritten by Sentry's interceptor.", | ||
| e); | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public int getOrder() { | ||
| return Ordered.LOWEST_PRECEDENCE; | ||
| } | ||
| } | ||
111 changes: 111 additions & 0 deletions
111
...pring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
| package io.sentry.spring.jakarta.kafka; | ||
|
|
||
| import io.sentry.BaggageHeader; | ||
| import io.sentry.IScopes; | ||
| import io.sentry.ISpan; | ||
| import io.sentry.SentryTraceHeader; | ||
| import io.sentry.SpanDataConvention; | ||
| import io.sentry.SpanOptions; | ||
| import io.sentry.SpanStatus; | ||
| import io.sentry.util.TracingUtils; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.Map; | ||
| import org.apache.kafka.clients.producer.ProducerInterceptor; | ||
| import org.apache.kafka.clients.producer.ProducerRecord; | ||
| import org.apache.kafka.clients.producer.RecordMetadata; | ||
| import org.apache.kafka.common.header.Headers; | ||
| import org.jetbrains.annotations.ApiStatus; | ||
| import org.jetbrains.annotations.NotNull; | ||
| import org.jetbrains.annotations.Nullable; | ||
|
|
||
| /** | ||
| * A Kafka {@link ProducerInterceptor} that creates {@code queue.publish} spans and injects tracing | ||
| * headers into outgoing records. | ||
| * | ||
| * <p>The span starts and finishes synchronously in {@link #onSend(ProducerRecord)}, representing | ||
| * "message enqueued" semantics. This avoids cross-thread correlation complexity since {@link | ||
| * #onAcknowledgement(RecordMetadata, Exception)} runs on the Kafka I/O thread. | ||
| * | ||
| * <p>If the customer already has a {@link ProducerInterceptor}, the {@link | ||
| * SentryKafkaProducerBeanPostProcessor} composes both using Spring's {@link | ||
| * org.springframework.kafka.support.CompositeProducerInterceptor}. | ||
| */ | ||
| @ApiStatus.Internal | ||
| public final class SentryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> { | ||
|
|
||
| static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.producer"; | ||
| static final String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time"; | ||
|
|
||
| private final @NotNull IScopes scopes; | ||
|
|
||
| public SentryProducerInterceptor(final @NotNull IScopes scopes) { | ||
| this.scopes = scopes; | ||
| } | ||
|
|
||
| @Override | ||
| public @NotNull ProducerRecord<K, V> onSend(final @NotNull ProducerRecord<K, V> record) { | ||
| if (!scopes.getOptions().isEnableQueueTracing()) { | ||
| return record; | ||
| } | ||
|
|
||
| final @Nullable ISpan activeSpan = scopes.getSpan(); | ||
| if (activeSpan == null || activeSpan.isNoOp()) { | ||
| return record; | ||
| } | ||
|
|
||
| final @NotNull SpanOptions spanOptions = new SpanOptions(); | ||
| spanOptions.setOrigin(TRACE_ORIGIN); | ||
| final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions); | ||
| if (span.isNoOp()) { | ||
| return record; | ||
| } | ||
|
|
||
| span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); | ||
| span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); | ||
|
|
||
| try { | ||
| injectHeaders(record.headers(), span); | ||
| } catch (Throwable ignored) { | ||
| // Header injection must not break the send | ||
|
adinauer marked this conversation as resolved.
|
||
| } | ||
|
adinauer marked this conversation as resolved.
|
||
|
|
||
| span.setStatus(SpanStatus.OK); | ||
| span.finish(); | ||
|
|
||
| return record; | ||
| } | ||
|
|
||
| @Override | ||
| public void onAcknowledgement( | ||
| final @Nullable RecordMetadata metadata, final @Nullable Exception exception) {} | ||
|
|
||
| @Override | ||
| public void close() {} | ||
|
|
||
| @Override | ||
| public void configure(final @Nullable Map<String, ?> configs) {} | ||
|
|
||
| private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) { | ||
| final @Nullable TracingUtils.TracingHeaders tracingHeaders = | ||
| TracingUtils.trace(scopes, null, span); | ||
| if (tracingHeaders != null) { | ||
| final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); | ||
| headers.remove(sentryTraceHeader.getName()); | ||
| headers.add( | ||
| sentryTraceHeader.getName(), | ||
| sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); | ||
|
|
||
| final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); | ||
| if (baggageHeader != null) { | ||
| headers.remove(baggageHeader.getName()); | ||
|
adinauer marked this conversation as resolved.
|
||
| headers.add( | ||
| baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); | ||
| } | ||
|
adinauer marked this conversation as resolved.
|
||
| } | ||
|
|
||
| headers.remove(SENTRY_ENQUEUED_TIME_HEADER); | ||
| headers.add( | ||
| SENTRY_ENQUEUED_TIME_HEADER, | ||
| String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8)); | ||
| } | ||
| } | ||
78 changes: 78 additions & 0 deletions
78
...rc/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| package io.sentry.spring.jakarta.kafka | ||
|
|
||
| import kotlin.test.Test | ||
| import kotlin.test.assertSame | ||
| import kotlin.test.assertTrue | ||
| import org.apache.kafka.clients.producer.ProducerInterceptor | ||
| import org.mockito.kotlin.mock | ||
| import org.springframework.kafka.core.KafkaTemplate | ||
| import org.springframework.kafka.core.ProducerFactory | ||
| import org.springframework.kafka.support.CompositeProducerInterceptor | ||
|
|
||
| class SentryKafkaProducerBeanPostProcessorTest { | ||
|
|
||
| private fun readInterceptor(template: KafkaTemplate<*, *>): Any? { | ||
| val field = KafkaTemplate::class.java.getDeclaredField("producerInterceptor") | ||
| field.isAccessible = true | ||
| return field.get(template) | ||
| } | ||
|
|
||
| @Test | ||
| fun `sets SentryProducerInterceptor on KafkaTemplate`() { | ||
| val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>()) | ||
| val processor = SentryKafkaProducerBeanPostProcessor() | ||
|
|
||
| processor.postProcessAfterInitialization(template, "kafkaTemplate") | ||
|
|
||
| assertTrue(readInterceptor(template) is SentryProducerInterceptor<*, *>) | ||
| } | ||
|
|
||
| @Test | ||
| fun `does not double-wrap when SentryProducerInterceptor already set`() { | ||
| val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>()) | ||
| val processor = SentryKafkaProducerBeanPostProcessor() | ||
|
|
||
| processor.postProcessAfterInitialization(template, "kafkaTemplate") | ||
| val firstInterceptor = readInterceptor(template) | ||
|
|
||
| processor.postProcessAfterInitialization(template, "kafkaTemplate") | ||
| val secondInterceptor = readInterceptor(template) | ||
|
|
||
| assertSame(firstInterceptor, secondInterceptor) | ||
| } | ||
|
|
||
| @Test | ||
| fun `does not modify non-KafkaTemplate beans`() { | ||
| val someBean = "not a kafka template" | ||
| val processor = SentryKafkaProducerBeanPostProcessor() | ||
|
|
||
| val result = processor.postProcessAfterInitialization(someBean, "someBean") | ||
|
|
||
| assertSame(someBean, result) | ||
| } | ||
|
|
||
| @Test | ||
| fun `returns the same bean instance`() { | ||
| val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>()) | ||
| val processor = SentryKafkaProducerBeanPostProcessor() | ||
|
|
||
| val result = processor.postProcessAfterInitialization(template, "kafkaTemplate") | ||
|
|
||
| assertSame(template, result, "BPP should return the same bean, not a replacement") | ||
| } | ||
|
|
||
| @Test | ||
| fun `composes with existing customer interceptor using CompositeProducerInterceptor`() { | ||
| val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>()) | ||
| val customerInterceptor = mock<ProducerInterceptor<String, String>>() | ||
| template.setProducerInterceptor(customerInterceptor) | ||
|
|
||
| val processor = SentryKafkaProducerBeanPostProcessor() | ||
| processor.postProcessAfterInitialization(template, "kafkaTemplate") | ||
|
|
||
| assertTrue( | ||
| readInterceptor(template) is CompositeProducerInterceptor<*, *>, | ||
| "Should use CompositeProducerInterceptor when existing interceptor is present", | ||
| ) | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.