NED Service - Implementation Reference
Java 25 · Quarkus 3 · Kafka consumer -> GMD + OSG fan-out
Table of Contents
- What is NED
- Architecture
- Development Environment Setup
- Local Dev Stack (Docker)
- Configuration Reference
- Message Processing Pipeline
- Sink Implementations
- Fault Tolerance
- Metrics Reference
- Test Suite
- Coding Standards
- Pre-commit Checklist
1. What is NED
NED implements the "Signal-Only" notification pattern (Option 3). Instead of replicating full message payloads, WMX emits lightweight notification signals. NED acts as the orchestrator that maps these signals from the WMX Kafka stream into the specific formats required by the on-site gateways (OSG and GMD).
For more context, see the ADR-CH-0022 Inbox - Real-time Notification nudge delivery.
The gateways use these signals to push a "nudge" to customers who are currently on-site. Upon receiving this nudge, clients retrieve the updated inbox data directly from WMX via HTTP.
NED (Notification Event Distribution) is a Quarkus service that consumes these notification events from a WMX Kafka topic and fans them out to two downstream sinks in parallel:
- GMD: publishes a UserNotificationInboundEvent to a GMD Kafka topic
- OSG: POSTs a REST notification request to the OSG microservice
NED is a consumer-only service. It has no REST endpoints of its own. Health endpoints are served by quarkus-smallrye-health via Vert.x.
2. Architecture
Data Flow
WMX Kafka Topic: mac.sbg.notifications
    |  NotificationEvent (Avro)
    v
NotificationConsumerService.consume()    @RunOnVirtualThread
    |
    v
NotificationProcessor.handleNotificationEvent()
    |- isValid() == false                 -> drop + WARN + ned.events.rejected
    |- isSupportedMessageType() == false  -> drop + INFO + ned.events.skipped
    |- recordEventConsumed()
    |- EnrichedNotificationEvent (adds UUID messageId)
    |
    |- VT --> GMDProducerService.send()
    |           |- UserNotificationInboundEvent -> gmd.notifications Kafka topic
    |
    |- VT --> OSGNotificationClient.send()
                |- POST /send -> OSG microservice
Key Components
| Class | Package | Role |
|---|---|---|
| NotificationConsumerService | source | @Incoming Kafka consumer; @RunOnVirtualThread |
| NotificationProcessor | source | Orchestrator: validates, filters, dispatches to sinks |
| EnrichedNotificationEvent | model | Immutable Java record; adds messageId (UUID) |
| SinkRegistry | routing | Holds enabled SinkAdapter instances; returns unmodifiableCollection |
| SinkAdapter | sink | Interface: getId(), isEnabled(), send(EnrichedNotificationEvent) |
| GMDProducerService | sink.gmd | GMD Kafka producer; transforms to UserNotificationInboundEvent |
| OSGNotificationClient | sink.osg | REST client delegating to OSGRetryableSender |
| OSGRetryableSender | sink.osg | @Retry + @ExponentialBackoff for 5xx errors |
| SinkMetrics | metrics | All Micrometer counters; eagerly registered |
Avro Schemas
| Schema | File | Used by |
|---|---|---|
| NotificationEvent | notification-event.avsc | WMX consumer (inbound) |
| UserNotificationInboundEvent | UserNotificationInboundEvent.avsc | GMD producer (outbound) |
Pluggable Sink Pattern
Adding a new sink requires:
1. Create a class implementing SinkAdapter
2. Annotate it with @ApplicationScoped; CDI registers it automatically
3. No changes to NotificationProcessor or SinkRegistry
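As a rough illustration of these steps, the plain-Java outline below shows what a hypothetical third sink might look like. The `SinkAdapter` method names come from the Key Components table. The `AuditLogSink` class, the simplified `EnrichedNotificationEvent` record, and the `SinkDeliveryException` constructor are assumptions made for the example, and `@ApplicationScoped` appears only as a comment so the sketch compiles without CDI on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the real record (field set is an assumption).
record EnrichedNotificationEvent(String messageId, String accountId) {}

// Assumed shape of the checked exception named in the coding standards.
class SinkDeliveryException extends Exception {
    SinkDeliveryException(String message) { super(message); }
}

// Method names taken from the Key Components table.
interface SinkAdapter {
    String getId();
    boolean isEnabled();
    void send(EnrichedNotificationEvent event) throws SinkDeliveryException;
}

// Hypothetical new sink. In NED this class would carry @ApplicationScoped
// so that CDI discovers it without touching the processor or the registry.
class AuditLogSink implements SinkAdapter {
    private final boolean enabled;
    private final List<String> delivered = new ArrayList<>();

    AuditLogSink(boolean enabled) { this.enabled = enabled; }

    @Override public String getId() { return "audit-log"; }   // stable lowercase id
    @Override public boolean isEnabled() { return enabled; }

    @Override
    public void send(EnrichedNotificationEvent event) {
        // Input is already validated upstream, so the sink only records the delivery.
        delivered.add(event.messageId());
    }

    List<String> delivered() { return List.copyOf(delivered); }
}
```

In the real service the `enabled` flag would presumably arrive through a `@ConfigProperty` constructor parameter rather than a plain boolean.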
3. Development Environment Setup
Java Version
Tool: SDKMAN · Version: 25.0.2-graalce
sdk use java 25.0.2-graalce # per session
sdk default java 25.0.2-graalce # permanent
java -version # GraalVM CE 25.0.2
Build
mvn clean verify # compile + test
mvn clean package -q -DskipTests # fast package
mvn quarkus:dev # dev mode (hot reload)
Ignore Java 25 warnings about Unsafe/restricted methods from Jansi/Guava; they are benign.
Key Dependency Versions
| Dependency | Version | Note |
|---|---|---|
| Quarkus BOM | 3.34.6 | All Quarkus deps managed from here |
| kafka-avro-serializer | 7.2.2 | Only versions 6.2.x to 7.3.x are native-safe; 7.4+ unsupported |
| google-java-format (Spotless) | 1.27.0 | Java 25 AST compatible |
| Maven compiler plugin | 3.13.0 | 3.14.0+ breaks Java 25 release flag |
| Testcontainers BOM | 2.0.5 | All testcontainers managed here |
Native Image Constraints
- kafka-avro-serializer must stay ≤ 7.3.x for native image support
- Exclude jakarta.ws.rs:jakarta.ws.rs-api from kafka-avro-serializer (Quarkus BOM provides it)
- quarkus-apicurio-registry-avro is test scope only
- Do not add quarkus-rest: NED has no REST endpoints
4. Local Dev Stack (Docker)
Start
| Service | Port | Purpose |
|---|---|---|
| Kafka (KRaft) | 9092 | Message broker |
| Schema Registry | 8081 | Confluent Avro SR |
| WireMock | 8090 | OSG HTTP mock |
| kafka-init | n/a | One-shot; pre-creates topics |
Pre-created topics: mac.sbg.notifications (3 partitions), gmd.notifications (3 partitions)
kafka-init is needed because Kafka auto-create only fires on producer write, not on consumer subscribe.
Configure NED
The file local/application-local.properties overrides defaults to point at local ports (Kafka 9092, SR 8081, OSG WireMock 8090).
Start NED
sdk use java 25.0.2-graalce
mvn quarkus:dev -Dquarkus.config.locations=$(pwd)/local/application-local.properties
Check health: curl -s localhost:8080/health/readiness | jq .
Send a test event
mvn test-compile exec:java \
-Dexec.mainClass=com.uki.platform.personalisation.LocalEventProducer \
-Dexec.classpathScope=test \
-Dproducer.accountId=12345 \
-Dproducer.count=1 \
-Dproducer.schemaRegistry=http://localhost:8081
Known issues
Single SR for local dev: production uses separate Schema Registry clusters per Kafka environment. Locally, one SR on 8081 serves both topics. This also matches the application.properties defaults, so NED works without the -Dquarkus.config.locations override.
5. Configuration Reference
# --- Kafka consumer ---
kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
mp.messaging.incoming.notification-events.connector=smallrye-kafka
mp.messaging.incoming.notification-events.topic=${WMX_KAFKA_TOPIC:mac.sbg.notifications}
mp.messaging.incoming.notification-events.failure-strategy=ignore
# --- Processor tunables ---
# Bounded VT pool caps concurrent sink calls to protect OSG/GMD
ned.processor.max-concurrency=100
ned.processor.queue-capacity=500
# Graceful shutdown: wait up to N seconds for in-flight tasks
ned.processor.shutdown-timeout-seconds=30
# Quarkus shutdown grace period (must be > shutdown-timeout-seconds)
quarkus.shutdown.timeout=35S
# --- GMD sink ---
gmd.sink.enabled=${GMD_SINK_ENABLED:true}
gmd.brand=${GMD_BRAND:betfair}
mp.messaging.outgoing.gmd-notifications.connector=smallrye-kafka
mp.messaging.outgoing.gmd-notifications.topic=${GMD_KAFKA_TOPIC:gmd.notifications}
mp.messaging.outgoing.gmd-notifications.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
# --- OSG sink ---
osg.sink.enabled=${OSG_SINK_ENABLED:true}
osg.api.header.id=${OSG_API_HEADER_ID:osg-api-id}
quarkus.rest-client.osg-client.url=${OSG_API_URL:http://localhost:8090}
# Worst case: 5s read timeout x 4 attempts = 20s total per event (plus retry backoff delays)
quarkus.rest-client.osg-client.read-timeout=5000
quarkus.rest-client.osg-client.connection-timeout=2000
6. Message Processing Pipeline
Validation (isValid)
Both accountId and messageType are declared required in the Avro schema. isValid() is a defensive guard for malformed payloads from upstream:
- event != null
- accountId != null && !accountId.isBlank()
- messageType != null
- accountId is numeric (Long.parseLong)
Rejected events: WARN log + metrics.recordEventRejected(). Never routed.
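The four checks could be expressed roughly as follows. This is a minimal sketch, not the actual NotificationProcessor code: the `NotificationEvent` record is a simplified stand-in for the Avro-generated class, and `EventValidator` and the `OTHER` enum constant are hypothetical names.

```java
// OTHER is a hypothetical second type; only SYSTEM_REFRESH_INBOX appears in the source.
enum MessageType { SYSTEM_REFRESH_INBOX, OTHER }

// Simplified stand-in for the Avro-generated NotificationEvent (assumed fields).
record NotificationEvent(String accountId, MessageType messageType) {}

final class EventValidator {
    private EventValidator() {}

    // Applies the four checks in the order they are listed.
    static boolean isValid(NotificationEvent event) {
        if (event == null) {
            return false;
        }
        String accountId = event.accountId();
        if (accountId == null || accountId.isBlank()) {
            return false;
        }
        if (event.messageType() == null) {
            return false;
        }
        try {
            Long.parseLong(accountId);   // numeric check
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```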
Message Type Filter (isSupportedMessageType)
WMX is a shared topic: multiple producers publish different message types. NED only processes SYSTEM_REFRESH_INBOX:
private static final Set<MessageType> SUPPORTED_MESSAGE_TYPES =
Set.of(MessageType.SYSTEM_REFRESH_INBOX);
Non-matching types: INFO log + metrics.recordEventSkipped(). Silently dropped.
Why two separate counters?
| Gate | Method | Counter | Log | Meaning |
|---|---|---|---|---|
| Structural | isValid() | ned.events.rejected | WARN | Unexpected; data quality issue |
| Business | isSupportedMessageType() | ned.events.skipped | INFO | Expected; multi-producer topic |
To support a new message type, add it to SUPPORTED_MESSAGE_TYPES. No other changes needed.
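To make the split between the two counters concrete, here is a small sketch of the gating order. It assumes `AtomicLong` fields in place of the Micrometer counters held by SinkMetrics, a deliberately simplified structural check, and a hypothetical `MARKETING_PUSH` type standing in for traffic from other producers.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// MARKETING_PUSH is hypothetical; it represents any type NED does not handle.
enum MessageType { SYSTEM_REFRESH_INBOX, MARKETING_PUSH }

record NotificationEvent(String accountId, MessageType messageType) {}

final class GateCounters {
    static final Set<MessageType> SUPPORTED_MESSAGE_TYPES =
            Set.of(MessageType.SYSTEM_REFRESH_INBOX);

    // AtomicLong stands in for the counters registered in SinkMetrics.
    final AtomicLong rejected = new AtomicLong();
    final AtomicLong skipped = new AtomicLong();
    final AtomicLong consumed = new AtomicLong();

    // Returns true only when the event passed both gates.
    boolean admit(NotificationEvent event) {
        if (!isStructurallyValid(event)) {
            rejected.incrementAndGet();   // unexpected: data quality issue
            return false;
        }
        if (!SUPPORTED_MESSAGE_TYPES.contains(event.messageType())) {
            skipped.incrementAndGet();    // expected on a multi-producer topic
            return false;
        }
        consumed.incrementAndGet();
        return true;
    }

    // Simplified structural gate (digits only); the real isValid() uses Long.parseLong.
    private static boolean isStructurallyValid(NotificationEvent e) {
        return e != null
                && e.accountId() != null
                && !e.accountId().isBlank()
                && e.messageType() != null
                && e.accountId().chars().allMatch(Character::isDigit);
    }
}
```

Keeping the structural gate first means the type filter never sees a null messageType, which matters here because `Set.of(...).contains(null)` throws.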
Dispatch
// Submit each sink to its own VT: fire-and-forget, fault-isolated
for (SinkAdapter sink : sinkRegistry.getEnabledSinks()) {
    vtExecutor.submit(() -> dispatchToSink(enrichedEvent, sink));
}
Sinks run in parallel. A failure in one sink does not affect the other.
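One way the fault isolation could work is for each task to catch its own exceptions, so nothing propagates to the executor or to sibling tasks. The sketch below assumes that shape; `Sink` is a trimmed-down stand-in for SinkAdapter, and `Dispatcher` and its counters are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

// Assumed shape of the checked exception named in the coding standards.
class SinkDeliveryException extends Exception {
    SinkDeliveryException(String message) { super(message); }
}

// Trimmed-down stand-in for SinkAdapter.
interface Sink {
    void send(String messageId) throws SinkDeliveryException;
}

final class Dispatcher {
    final AtomicInteger expectedFailures = new AtomicInteger();
    final AtomicInteger unexpectedFailures = new AtomicInteger();

    // Each sink gets its own task; exceptions never escape the task.
    void dispatch(ExecutorService executor, List<Sink> sinks, String messageId) {
        for (Sink sink : sinks) {
            executor.submit(() -> dispatchToSink(sink, messageId));
        }
    }

    private void dispatchToSink(Sink sink, String messageId) {
        try {
            sink.send(messageId);
        } catch (SinkDeliveryException e) {
            expectedFailures.incrementAndGet();    // expected delivery failure
        } catch (Exception e) {
            unexpectedFailures.incrementAndGet();  // catch-all keeps other sinks unaffected
        }
    }
}
```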
Graceful Shutdown
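void onShutdown(@Observes ShutdownEvent event) {
    vtExecutor.shutdown(); // stop accepting new tasks
    try {
        // 30 mirrors ned.processor.shutdown-timeout-seconds
        if (!vtExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
            List<Runnable> pending = vtExecutor.shutdownNow();
            LOG.warn("Forced shutdown; {} task(s) still pending", pending.size());
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status
        vtExecutor.shutdownNow();
    }
}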
quarkus.shutdown.timeout=35S gives Quarkus a 35-second window, 5 seconds more than the 30-second await, so the VT pool always gets a chance to drain cleanly.
Back-pressure
The vtExecutor is bounded (max-concurrency=100, queue-capacity=500). When the queue is full, CallerRunsPolicy runs the task on the calling Kafka consumer thread, blocking it. This propagates back-pressure to SmallRye Reactive Messaging, which pauses Kafka polling until a slot frees.
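A bounded executor with this behaviour can be assembled from standard `java.util.concurrent` parts. The sketch below is one possible construction, not necessarily how NED builds its vtExecutor; `BoundedExecutors` is a hypothetical helper, and the thread factory is a parameter so the same code works with `Thread.ofVirtual().factory()` on Java 21+ or with a platform-thread factory.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedExecutors {
    private BoundedExecutors() {}

    // For a virtual-thread pool, pass Thread.ofVirtual().factory() (Java 21+).
    static ThreadPoolExecutor bounded(ThreadFactory threadFactory,
                                      int maxConcurrency,
                                      int queueCapacity) {
        return new ThreadPoolExecutor(
                maxConcurrency, maxConcurrency,              // fixed number of workers
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),     // bounded queue
                threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy());  // saturated: submitter runs the task
    }
}
```

With `maxConcurrency=100` and `queueCapacity=500`, the 601st concurrent submission would execute on the submitting thread, which is what stalls the consumer until capacity frees up.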
7. Sink Implementations
GMD Sink (GMDProducerService)
Transforms EnrichedNotificationEvent into UserNotificationInboundEvent and publishes to gmd.notifications.
Field mapping:
| Source field | GMD field | Value |
|---|---|---|
| enrichedEvent.messageId() | messageId | UUID generated at enrichment |
| (hardcoded constant) | messageType | "REFRESH_INBOX" |
| enrichedEvent.getAccountId() | authentication.accountId | Numeric account ID |
| (default) | channels | [IN_GAME, ON_SITE] |
| System.currentTimeMillis() | timestamp | Event creation time |
| (default) | messagePayload | empty map |
messageType is hardcoded as "REFRESH_INBOX": the WMX schema uses SYSTEM_REFRESH_INBOX but GMD expects REFRESH_INBOX. The mapping is a sink-specific transformation detail; EnrichedNotificationEvent.getMessageType() is not used for output.
send() blocks on the Emitter CompletableFuture so that gmd.messages.published / gmd.messages.failed metrics reflect the true broker outcome. Callers must invoke on a worker thread (@Blocking in NotificationConsumerService).
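The field mapping can be pictured with plain records. This is only a sketch under stated assumptions: `GmdMessage` flattens the nested `authentication.accountId` field and is not the Avro-generated UserNotificationInboundEvent, the `long` type for accountId is a guess based on "Numeric account ID", and `GmdMapper` is a hypothetical name.

```java
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the real types (assumed shapes, not the Avro classes).
record EnrichedNotificationEvent(String messageId, String accountId) {}

record GmdMessage(String messageId,
                  String messageType,
                  long accountId,                 // flattened authentication.accountId
                  List<String> channels,
                  long timestamp,
                  Map<String, String> messagePayload) {}

final class GmdMapper {
    // GMD expects REFRESH_INBOX, not the WMX value SYSTEM_REFRESH_INBOX.
    static final String GMD_MESSAGE_TYPE = "REFRESH_INBOX";

    private GmdMapper() {}

    // nowMillis is passed in so the mapping stays deterministic under test.
    static GmdMessage toGmd(EnrichedNotificationEvent event, long nowMillis) {
        return new GmdMessage(
                event.messageId(),
                GMD_MESSAGE_TYPE,
                Long.parseLong(event.accountId()),
                List.of("IN_GAME", "ON_SITE"),    // default channels
                nowMillis,
                Map.of());                        // empty payload
    }
}
```

A caller would typically pass `System.currentTimeMillis()` as `nowMillis`, matching the timestamp row of the table.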
OSG Sink (OSGNotificationClient + OSGRetryableSender)
POSTs to OSG REST endpoint. Request model (matches OpenAPI spec):
{
"accountId": 1234567,
"topic": "notificationMessaging",
"message": { "urn": "urn:notifications:system-refresh-inbox" }
}
Response handling:
| Status | Action | Metric |
|---|---|---|
| 200 | Success | osg.messages.delivered |
| 404 | Customer not online (not an error) | osg.messages.customer_not_online |
| 5xx | Throw OSGServerException; @Retry triggers | osg.messages.retry_attempts |
| 4xx (non-404) | Throw SinkDeliveryException immediately | osg.messages.failed |
| Network error | Throw SinkDeliveryException | osg.messages.failed |
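The status rows of the table amount to a small classification function, sketched below. `OsgOutcome` and `OsgStatusClassifier` are hypothetical names. The table does not say how other 2xx or 3xx responses are treated, so mapping every remaining status to FAILED is an assumption of this sketch.

```java
// Hypothetical outcome labels, one per row group in the response table.
enum OsgOutcome { DELIVERED, CUSTOMER_NOT_ONLINE, RETRY, FAILED }

final class OsgStatusClassifier {
    private OsgStatusClassifier() {}

    static OsgOutcome classify(int status) {
        if (status == 200) {
            return OsgOutcome.DELIVERED;
        }
        if (status == 404) {
            return OsgOutcome.CUSTOMER_NOT_ONLINE;   // not an error
        }
        if (status >= 500 && status <= 599) {
            return OsgOutcome.RETRY;                 // transient: eligible for @Retry
        }
        return OsgOutcome.FAILED;                    // assumption: all other statuses fail fast
    }
}
```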
8. Fault Tolerance
OSG Retry (@Retry + @ExponentialBackoff)
@Retry(maxRetries = 3, delay = 100, delayUnit = MILLIS, retryOn = {OSGServerException.class})
@ExponentialBackoff(factor = 2)
void invoke(EnrichedNotificationEvent enrichedEvent)
Retry timeline: 0ms -> 100ms -> 300ms -> 700ms (4 total attempts)
@Retry is on a separate CDI bean (OSGRetryableSender) because interceptors don't fire on self-invocation.
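The retry timeline follows from summing the backoff delays (100 ms, 200 ms, 400 ms). The helper below reproduces that arithmetic; `RetryTimeline` is a hypothetical name, and the calculation ignores any jitter and the time each request itself takes, so real attempts would start somewhat later.

```java
import java.util.ArrayList;
import java.util.List;

final class RetryTimeline {
    private RetryTimeline() {}

    // Start offset in ms of every attempt: the initial call plus maxRetries retries.
    static List<Long> attemptOffsetsMillis(int maxRetries, long initialDelayMillis, int factor) {
        List<Long> offsets = new ArrayList<>();
        long offset = 0;
        long delay = initialDelayMillis;
        offsets.add(offset);                 // first attempt, no wait
        for (int retry = 1; retry <= maxRetries; retry++) {
            offset += delay;                 // wait before this retry
            offsets.add(offset);
            delay *= factor;                 // next wait grows exponentially
        }
        return offsets;
    }
}
```

Calling `RetryTimeline.attemptOffsetsMillis(3, 100, 2)` yields the four offsets 0, 100, 300, and 700.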
Circuit Breaker: Why It Was Removed
A @CircuitBreaker was prototyped but removed. SmallRye FT 6.x counts each retry attempt as a separate CB invocation, not the final outer-call outcome. With maxRetries=3, one outer call = 4 CB invocations. With requestVolumeThreshold=10 and failureRatio=0.5, 5 failed invocations in the rolling window are enough to trip the breaker, so only ~2 fully-failed events open the circuit and block all OSG delivery for 30 seconds. That is too aggressive, so retry-only is the current strategy.
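The "~2 events" figure can be checked with a little arithmetic. The sketch below is a back-of-the-envelope calculation, not SmallRye's algorithm: it assumes the rolling window is otherwise filled with successful invocations, and `CircuitBreakerMath` is a hypothetical helper.

```java
final class CircuitBreakerMath {
    private CircuitBreakerMath() {}

    // Smallest number of fully failed outer calls whose attempts alone reach the
    // failure ratio within a rolling window of requestVolumeThreshold invocations.
    static int failedEventsToOpen(int requestVolumeThreshold,
                                  double failureRatio,
                                  int attemptsPerEvent) {
        int failuresNeeded = (int) Math.ceil(requestVolumeThreshold * failureRatio);
        return (int) Math.ceil((double) failuresNeeded / attemptsPerEvent);
    }
}
```

With a threshold of 10, a ratio of 0.5, and 4 attempts per event, 5 failures are needed and 2 failed events supply 8. If each outer call counted once, 5 failed events would be required.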
Graceful Shutdown
See §6 above. Config: ned.processor.shutdown-timeout-seconds=30, quarkus.shutdown.timeout=35S.
9. Metrics Reference
All metrics live in SinkMetrics. Counters are eagerly registered in the constructor.
| Metric | Incremented when | Alert? |
|---|---|---|
| ned.events.consumed | Valid event passed all gates and was routed to sinks | No |
| ned.events.rejected | Event dropped: null/blank accountId, non-numeric accountId, or null messageType | Yes (data quality) |
| ned.events.skipped | Event has valid structure but unsupported messageType | No (expected) |
| osg.messages.delivered | OSG responded 200 OK | No |
| osg.messages.customer_not_online | OSG responded 404 | No |
| osg.messages.failed | OSG unrecoverable failure (after retries) | Yes |
| osg.messages.retry_attempts | 5xx triggered a retry attempt | Dashboard only |
| gmd.messages.published | Kafka broker ack received | No |
| gmd.messages.failed | Kafka broker nack or serialization error | Yes |
Prometheus endpoint: http://localhost:8080/q/metrics
10. Test Suite
60 tests, 0 failures.
| Test class | Type | Tests | What it validates |
|---|---|---|---|
| NedE2eIntegrationTest | E2E (Kafka + WireMock) | 5 | Full pipeline: WMX Kafka in -> GMD Kafka out + OSG HTTP |
| SinkMetricsIntegrationTest | Integration | 5 | Metrics in Quarkus MeterRegistry |
| GMDProducerServiceIntegrationTest | Integration | 3 | GMD Avro serialisation + Kafka publish |
| OSGNotificationClientWireMockTest | Integration | 9 | HTTP handling: 200/404/5xx/4xx/timeout/retry |
| NotificationConsumerHealthCheckIntegrationTest | Integration | 2 | Health endpoint wiring |
| NotificationProcessorIntegrationTest | Integration | 2 | Enrichment, messageId generation |
| SinkMetricsTest | Unit | 10 | All counter operations in isolation |
| NotificationConsumerHealthCheckTest | Unit | 5 | Health check logic |
| NotificationConsumerServiceTest | Unit | 4 | Consumer delegation, null guard, catch-all |
| NotificationProcessorTest | Unit | 15 | Validation, filter, VT dispatch, metrics |
Infrastructure patterns
Kafka tests: ConfluentKafkaContainer (testcontainers 2.0.5) via KafkaTestProfile (@QuarkusTestResource). Quarkus DevService auto-starts Schema Registry alongside the container.
OSG tests: WireMock via OSGWireMockTestProfile (@TestProfile). @BeforeEach resets stubs for test-order independence. Metrics use a baseline-delta pattern (read the counter before and after) because MeterRegistry.clear() would orphan cached Counter references.
E2E test: Both Kafka containers + WireMock active simultaneously. Asserts GMD messageType = "REFRESH_INBOX" (not "SYSTEM_REFRESH_INBOX").
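The baseline-delta pattern mentioned for the OSG tests can be reduced to a tiny helper. In this sketch an `AtomicLong` stands in for a Micrometer Counter (whose `count()` returns a double), and `BaselineDelta` is a hypothetical name rather than a class from the test suite.

```java
import java.util.concurrent.atomic.AtomicLong;

final class BaselineDelta {
    private BaselineDelta() {}

    // Runs the action and returns how much the counter grew while it ran.
    static long deltaOf(AtomicLong counter, Runnable action) {
        long before = counter.get();   // baseline, whatever earlier tests left behind
        action.run();
        return counter.get() - before;
    }
}
```

Because the assertion is on the difference, the test stays correct regardless of how many times earlier tests incremented the same counter.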
Notable test patterns
// Awaitility for async polling (never Thread.sleep for assertions)
await().atMost(5, SECONDS).pollInterval(100, MILLISECONDS)
    .until(() -> consumer.poll(Duration.ofMillis(100)), r -> r.count() > 0);
// Parameterized tests for related cases
@ParameterizedTest
@ValueSource(strings = {"", " ", "not-a-number"})
void testInvalidAccountIdIsRejected(String accountId) { ... }
// simulateIoLatency() helper instead of bare Thread.sleep in stubs
// (blocking stub latency simulation, not async polling, so Awaitility doesn't apply)
11. Coding Standards
Dependency Injection
- Constructor injection only. No field injection.
- @Singleton for stateless orchestrators.
- @ApplicationScoped for beans with lifecycle callbacks.
- @ConfigProperty only as constructor parameters, never on fields.
- Always provide defaultValue for tunables; omit it for required values (fail-fast on startup).
Validation
All validation belongs in NotificationProcessor.isValid(). Sinks receive trusted, pre-validated input.
Error Handling
- Never re-throw from @Incoming consumers. A nack halts the Kafka channel.
- Use SinkDeliveryException (checked) for expected delivery failures.
- Always have separate catch (SinkDeliveryException e) and catch (Exception e) blocks.
Retry
@Retry only for transient failures (5xx, network). Never retry 4xx. Place @Retry on a separate CDI bean, because interceptors don't fire on self-invocation.
Logging
- SLF4J only. private static final Logger LOG = LoggerFactory.getLogger(...).
- DEBUG: per-message happy path.
- INFO: startup/shutdown/config/expected filtering.
- WARN: invalid input, degraded.
- ERROR: delivery failure (always include exception).
- Always include accountId. Always use parameterised logging, never string concatenation.
Metrics
- All metrics live in SinkMetrics. No counter creation outside this class.
- Naming: {sink}.messages.{status} for sinks · ned.events.{status} for consumer.
- Record after the outcome is known.
Concurrency
- Use Thread.ofVirtual().factory() for blocking I/O. Never Executors.newVirtualThreadPerTaskExecutor() (unbounded).
- CallerRunsPolicy is the back-pressure mechanism. Do not replace it.
- Always awaitTermination after shutdown(). Log pending count if shutdownNow() is called.
Configuration
Every tunable in application.properties must have a comment explaining the value choice. Secrets use ${ENV_VAR:default}. Never hardcode.
Sink Pattern
Every sink implements SinkAdapter: getId() returns a stable lowercase string constant; isEnabled() is driven by @ConfigProperty; send() throws SinkDeliveryException on failure and never swallows silently.
Testing
- Never Thread.sleep() for assertions; use Awaitility.
- Every test asserts the metric counter for the scenario under test.
- Test naming: [Component][Integration|Unit]Test.java
Dead Code
Before every commit: no unused imports/fields/methods, no defensive null-checks on values guaranteed non-null upstream, no onShutdown handlers that only log without real cleanup, no defensive copies of immutable-after-construction collections, no commented-out code without explanation.
Native Image
- kafka-avro-serializer versions 6.2.x to 7.3.x only. Versions 7.4+ require jackson-dataformat-csv and are unsupported.
- Exclude jakarta.ws.rs:jakarta.ws.rs-api from kafka-avro-serializer.
- No quarkus-rest. Test-only dependencies must be <scope>test</scope>.
12. Pre-commit Checklist
- [ ] mvn test passes (60/60)
- [ ] No Thread.sleep() in tests
- [ ] No unused imports, fields, variables
- [ ] All config tunables have comments in application.properties
- [ ] Metrics incremented and tested for each new code path
- [ ] Validation at NotificationProcessor.isValid(), not in sinks
- [ ] git diff reviewed end-to-end for dead code
Vault: /Users/saji.varghese2/Obsidian Vault/50_Implementation/ned-service/
Specs: 40_Specs/ned-kafka-consumer.md · 40_Specs/ned-pluggable-router-pattern.md

