NED Service - Implementation Reference

Java 25 · Quarkus 3 · Kafka consumer → GMD + OSG fan-out


Table of Contents

  1. What is NED
  2. Architecture
  3. Development Environment Setup
  4. Local Dev Stack (Docker)
  5. Configuration Reference
  6. Message Processing Pipeline
  7. Sink Implementations
  8. Fault Tolerance
  9. Metrics Reference
  10. Test Suite
  11. Coding Standards
  12. Pre-commit Checklist

1. What is NED

[Figure: NED Context Flow]

NED implements the "Signal-Only" notification pattern (Option 3). Instead of replicating full message payloads, WMX emits lightweight notification signals. NED acts as the orchestrator that maps these signals from the WMX Kafka stream into the specific formats required by the on-site gateways (OSG and GMD).

For more context, see ADR-CH-0022, "Inbox - Real-time Notification nudge delivery".

The gateways use these signals to push a "nudge" to customers who are currently on-site. Upon receiving this nudge, clients retrieve the updated inbox data directly from WMX via HTTP.

NED (Notification Event Distribution) is a Quarkus service that consumes these notification events from a WMX Kafka topic and fans them out to two downstream sinks in parallel:

  • GMD: publishes a UserNotificationInboundEvent to a GMD Kafka topic
  • OSG: POSTs a REST notification request to the OSG microservice

NED is a consumer-only service. It has no REST endpoints of its own. Health endpoints are served by quarkus-smallrye-health via Vert.x.


2. Architecture

NED Architecture Overview

Data Flow

WMX Kafka Topic: mac.sbg.notifications
        │ NotificationEvent (Avro)
        ▼
NotificationConsumerService.consume()   @RunOnVirtualThread
        │
        ▼
NotificationProcessor.handleNotificationEvent()
        ├─ isValid()                    → drop + WARN + ned.events.rejected
        ├─ isSupportedMessageType()     → drop + INFO + ned.events.skipped
        ├─ recordEventConsumed()
        ├─ EnrichedNotificationEvent (adds UUID messageId)
        │
        ├─ VT ──► GMDProducerService.send()
        │          └─ UserNotificationInboundEvent → gmd.notifications Kafka topic
        │
        └─ VT ──► OSGNotificationClient.send()
                   └─ POST /send → OSG microservice

Key Components

| Class | Package | Role |
|---|---|---|
| NotificationConsumerService | source | @Incoming Kafka consumer; @RunOnVirtualThread |
| NotificationProcessor | source | Orchestrator: validates, filters, dispatches to sinks |
| EnrichedNotificationEvent | model | Immutable Java record; adds messageId (UUID) |
| SinkRegistry | routing | Holds enabled SinkAdapter instances; returns unmodifiableCollection |
| SinkAdapter | sink | Interface: getId(), isEnabled(), send(EnrichedNotificationEvent) |
| GMDProducerService | sink.gmd | GMD Kafka producer; transforms to UserNotificationInboundEvent |
| OSGNotificationClient | sink.osg | REST client delegating to OSGRetryableSender |
| OSGRetryableSender | sink.osg | @Retry + @ExponentialBackoff for 5xx errors |
| SinkMetrics | metrics | All Micrometer counters; eagerly registered |
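
The enrichment step can be pictured with a minimal sketch like the one below. It assumes a simplified stand-in for the Avro-generated NotificationEvent, and the `source` component and `from` factory are illustrative names, not NED's actual code.

```java
import java.util.UUID;

// Hypothetical stand-in for the Avro-generated NotificationEvent class.
record NotificationEvent(String accountId, String messageType) {}

// Immutable wrapper that adds a generated messageId to the consumed event.
record EnrichedNotificationEvent(String messageId, NotificationEvent source) {

    // Illustrative factory: one random UUID per consumed event.
    static EnrichedNotificationEvent from(NotificationEvent source) {
        return new EnrichedNotificationEvent(UUID.randomUUID().toString(), source);
    }
}
```

Because a record's components are final, both sinks can read the same instance from different virtual threads without copying it.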

Avro Schemas

| Schema | File | Used by |
|---|---|---|
| NotificationEvent | notification-event.avsc | WMX consumer (inbound) |
| UserNotificationInboundEvent | UserNotificationInboundEvent.avsc | GMD producer (outbound) |

Pluggable Sink Pattern

Adding a new sink requires:

  1. Create a class implementing SinkAdapter
  2. Annotate it @ApplicationScoped; CDI registers it automatically
  3. No changes to NotificationProcessor or SinkRegistry
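
As a rough illustration of the pattern, the sketch below adds a hypothetical third sink. AuditLogSink is invented for this example, String stands in for EnrichedNotificationEvent, and the registry constructor is an assumed shape. CDI annotations appear only as comments so that the block compiles without Quarkus.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Checked exception for expected delivery failures (named in the coding standards).
class SinkDeliveryException extends Exception {
    SinkDeliveryException(String message) { super(message); }
}

// Mirrors the SinkAdapter contract; String stands in for EnrichedNotificationEvent.
interface SinkAdapter {
    String getId();
    boolean isEnabled();
    void send(String enrichedEvent) throws SinkDeliveryException;
}

// Hypothetical new sink. In NED this class would carry @ApplicationScoped.
class AuditLogSink implements SinkAdapter {
    private final boolean enabled; // would come from a @ConfigProperty constructor parameter

    AuditLogSink(boolean enabled) { this.enabled = enabled; }

    @Override public String getId() { return "audit-log"; }
    @Override public boolean isEnabled() { return enabled; }
    @Override public void send(String enrichedEvent) {
        System.out.println("audit: " + enrichedEvent);
    }
}

// Assumed registry shape: keeps only enabled sinks and exposes a read-only view.
class SinkRegistry {
    private final List<SinkAdapter> enabledSinks;

    SinkRegistry(List<SinkAdapter> discovered) {
        this.enabledSinks = discovered.stream().filter(SinkAdapter::isEnabled).toList();
    }

    Collection<SinkAdapter> getEnabledSinks() {
        return Collections.unmodifiableCollection(enabledSinks);
    }
}
```

The processor only iterates over getEnabledSinks(), which is why a new implementation needs no changes elsewhere.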


3. Development Environment Setup

Java Version

Tool: SDKMAN · Version: 25.0.2-graalce

sdk use java 25.0.2-graalce   # per session
sdk default java 25.0.2-graalce  # permanent

java -version   # GraalVM CE 25.0.2

Build

mvn clean verify          # compile + test
mvn clean package -q -DskipTests  # fast package
mvn quarkus:dev           # dev mode (hot reload)

Ignore Java 25 warnings about Unsafe/restricted methods from Jansi/Guava; they are benign.

Key Dependency Versions

| Dependency | Version | Note |
|---|---|---|
| Quarkus BOM | 3.34.6 | All Quarkus deps managed from here |
| kafka-avro-serializer | 7.2.2 | Versions 6.2.x–7.3.x only are native-safe; 7.4+ unsupported |
| google-java-format (Spotless) | 1.27.0 | Java 25 AST compatible |
| Maven compiler plugin | 3.13.0 | 3.14.0+ breaks Java 25 release flag |
| Testcontainers BOM | 2.0.5 | All testcontainers managed here |

Native Image Constraints

  • kafka-avro-serializer must stay ≤ 7.3.x for native image support
  • Exclude jakarta.ws.rs:jakarta.ws.rs-api from kafka-avro-serializer (Quarkus BOM provides it)
  • quarkus-apicurio-registry-avro is test scope only
  • Do not add quarkus-rest: NED has no REST endpoints

4. Local Dev Stack (Docker)

Start

cd local
docker compose up -d

| Service | Port | Purpose |
|---|---|---|
| Kafka (KRaft) | 9092 | Message broker |
| Schema Registry | 8081 | Confluent Avro SR |
| WireMock | 8090 | OSG HTTP mock |
| kafka-init | n/a | One-shot; pre-creates topics |

Pre-created topics: mac.sbg.notifications (3 partitions), gmd.notifications (3 partitions)

kafka-init is needed because Kafka auto-create only fires on producer write, not on consumer subscribe.

Configure NED

cp local/application-local.properties.example local/application-local.properties

The file overrides defaults to point at local ports (Kafka 9092, SR 8081, OSG WireMock 8090).

Start NED

sdk use java 25.0.2-graalce
mvn quarkus:dev -Dquarkus.config.locations=$(pwd)/local/application-local.properties

Check health: curl -s localhost:8080/health/readiness | jq .

Send a test event

mvn test-compile exec:java \
  -Dexec.mainClass=com.uki.platform.personalisation.LocalEventProducer \
  -Dexec.classpathScope=test \
  -Dproducer.accountId=12345 \
  -Dproducer.count=1 \
  -Dproducer.schemaRegistry=http://localhost:8081

Known issues

Single SR for local dev: production uses separate Schema Registry clusters per Kafka environment. Locally, one SR on 8081 serves both topics. This also matches the application.properties defaults, so NED works without the -Dquarkus.config.locations override.


5. Configuration Reference

# ─── Kafka consumer ───────────────────────────────────────────────
kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
mp.messaging.incoming.notification-events.connector=smallrye-kafka
mp.messaging.incoming.notification-events.topic=${WMX_KAFKA_TOPIC:mac.sbg.notifications}
mp.messaging.incoming.notification-events.failure-strategy=ignore

# ─── Processor tunables ───────────────────────────────────────────
# Bounded VT pool caps concurrent sink calls to protect OSG/GMD
ned.processor.max-concurrency=100
ned.processor.queue-capacity=500
# Graceful shutdown: wait up to N seconds for in-flight tasks
ned.processor.shutdown-timeout-seconds=30
# Quarkus shutdown grace period (must be > shutdown-timeout-seconds)
quarkus.shutdown.timeout=35S

# ─── GMD sink ─────────────────────────────────────────────────────
gmd.sink.enabled=${GMD_SINK_ENABLED:true}
gmd.brand=${GMD_BRAND:betfair}
mp.messaging.outgoing.gmd-notifications.connector=smallrye-kafka
mp.messaging.outgoing.gmd-notifications.topic=${GMD_KAFKA_TOPIC:gmd.notifications}
mp.messaging.outgoing.gmd-notifications.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

# ─── OSG sink ─────────────────────────────────────────────────────
osg.sink.enabled=${OSG_SINK_ENABLED:true}
osg.api.header.id=${OSG_API_HEADER_ID:osg-api-id}
quarkus.rest-client.osg-client.url=${OSG_API_URL:http://localhost:8090}
# Worst case: 5s read timeout × 4 attempts = 20s per event (plus ~0.7s of retry backoff)
quarkus.rest-client.osg-client.read-timeout=5000
quarkus.rest-client.osg-client.connection-timeout=2000

6. Message Processing Pipeline

Validation (isValid)

Both accountId and messageType are declared required in the Avro schema. isValid() is a defensive guard for malformed payloads from upstream:

  1. event != null
  2. accountId != null && !accountId.isBlank()
  3. messageType != null
  4. accountId is numeric (Long.parseLong)

Rejected events: WARN log + metrics.recordEventRejected(). Never routed.
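
The four checks above can be sketched as a single guard. This is a minimal illustration only: the NotificationEvent record is a stand-in for the Avro-generated class, and NotificationValidator is a hypothetical holder (in NED the method lives on NotificationProcessor).

```java
// Hypothetical stand-in for the Avro-generated NotificationEvent class.
record NotificationEvent(String accountId, String messageType) {}

final class NotificationValidator {
    private NotificationValidator() {}

    // Applies the four checks in order; any failure means the event is rejected.
    static boolean isValid(NotificationEvent event) {
        if (event == null) {
            return false;
        }
        String accountId = event.accountId();
        if (accountId == null || accountId.isBlank()) {
            return false;
        }
        if (event.messageType() == null) {
            return false;
        }
        try {
            Long.parseLong(accountId); // numeric check; the parsed value is not needed here
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```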

Message Type Filter (isSupportedMessageType)

The WMX topic is shared: multiple producers publish different message types. NED only processes SYSTEM_REFRESH_INBOX:

private static final Set<MessageType> SUPPORTED_MESSAGE_TYPES =
    Set.of(MessageType.SYSTEM_REFRESH_INBOX);

Non-matching types: INFO log + metrics.recordEventSkipped(). Dropped without raising an error.

Why two separate counters?

| Gate | Method | Counter | Log | Meaning |
|---|---|---|---|---|
| Structural | isValid() | ned.events.rejected | WARN | Unexpected; data quality issue |
| Business | isSupportedMessageType() | ned.events.skipped | INFO | Expected; multi-producer topic |

To support a new message type, add it to SUPPORTED_MESSAGE_TYPES. No other changes needed.
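
To make the ordering of the two gates concrete, here is a small sketch that classifies an event into one of three outcomes. The Gate enum, the Gates class, and the MARKETING_PROMO constant are invented for illustration; only SYSTEM_REFRESH_INBOX and SUPPORTED_MESSAGE_TYPES come from the text above.

```java
import java.util.Set;

// MARKETING_PROMO is a made-up example of a type published by another producer.
enum MessageType { SYSTEM_REFRESH_INBOX, MARKETING_PROMO }

// Hypothetical outcome labels, one per counter.
enum Gate { REJECTED, SKIPPED, CONSUMED }

final class Gates {
    static final Set<MessageType> SUPPORTED_MESSAGE_TYPES =
            Set.of(MessageType.SYSTEM_REFRESH_INBOX);

    private Gates() {}

    static Gate classify(String accountId, MessageType type) {
        if (!isValid(accountId, type)) {
            return Gate.REJECTED;                 // WARN + ned.events.rejected
        }
        if (!SUPPORTED_MESSAGE_TYPES.contains(type)) {
            return Gate.SKIPPED;                  // INFO + ned.events.skipped
        }
        return Gate.CONSUMED;                     // ned.events.consumed, then dispatch
    }

    // Structural gate: same rules as isValid(), applied to the two fields directly.
    private static boolean isValid(String accountId, MessageType type) {
        if (accountId == null || accountId.isBlank() || type == null) {
            return false;
        }
        try {
            Long.parseLong(accountId);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```

The structural gate runs first, so a malformed event of an unsupported type is counted as rejected rather than skipped.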

Dispatch

// Submit each sink to its own VT: fire-and-forget, fault-isolated
for (SinkAdapter sink : sinkRegistry.getEnabledSinks()) {
    vtExecutor.submit(() -> dispatchToSink(enrichedEvent, sink));
}

Sinks run in parallel. A failure in one sink does not affect the other.
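
Fault isolation comes from catching every exception inside the per-sink task. The sketch below shows that idea with a plain fixed thread pool so it runs on any recent JDK; NED itself uses the bounded virtual-thread executor. DispatchSketch, its nested Sink interface, and the sink() helper are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class DispatchSketch {
    interface Sink {
        String id();
        void send(String event) throws Exception;
    }

    private DispatchSketch() {}

    // Helper for the example: a sink that either succeeds or throws.
    static Sink sink(String id, boolean fails) {
        return new Sink() {
            @Override public String id() { return id; }
            @Override public void send(String event) throws Exception {
                if (fails) throw new IllegalStateException(id + " unavailable");
            }
        };
    }

    // Runs every sink in its own task and returns the sorted outcome of each.
    static List<String> dispatch(String event, List<Sink> sinks) {
        ConcurrentLinkedQueue<String> outcomes = new ConcurrentLinkedQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, sinks.size()));
        for (Sink sink : sinks) {
            executor.submit(() -> {
                try {
                    sink.send(event);
                    outcomes.add(sink.id() + ":ok");
                } catch (Exception e) {
                    // The failure stays inside this task; other sinks are unaffected.
                    outcomes.add(sink.id() + ":failed");
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcomes.stream().sorted().toList();
    }
}
```

With one healthy sink and one failing sink, the healthy sink still reports success.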

Graceful Shutdown

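void onShutdown(@Observes ShutdownEvent event) {
    vtExecutor.shutdown();
    try {
        // Wait up to ned.processor.shutdown-timeout-seconds (30) for in-flight tasks
        if (!vtExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
            List<Runnable> pending = vtExecutor.shutdownNow();
            LOG.warn("Forced shutdown; {} task(s) still pending", pending.size());
        }
    } catch (InterruptedException e) {
        vtExecutor.shutdownNow();
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
}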

quarkus.shutdown.timeout=35S gives Quarkus a 35-second window, 5 seconds more than the 30-second await, so the VT pool always gets a chance to drain cleanly.

Back-pressure

The vtExecutor is bounded (max-concurrency=100, queue-capacity=500). When the queue is full, CallerRunsPolicy runs the task on the calling Kafka consumer thread, blocking it. This propagates back-pressure to SmallRye Reactive Messaging, which pauses Kafka polling until a slot frees.
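
The following sketch shows how a bounded executor with CallerRunsPolicy pushes work back onto the submitting thread once it is saturated. It is an assumption about how such an executor could be built, not NED's actual wiring. The thread factory is a parameter so that the block runs on older JDKs too; NED would pass Thread.ofVirtual().factory(). The demo uses 1 worker and 1 queue slot instead of 100 and 500.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BackPressureSketch {
    private BackPressureSketch() {}

    // Fixed-size pool + bounded queue; overflow runs on the submitting thread.
    static ThreadPoolExecutor boundedExecutor(int maxConcurrency, int queueCapacity, ThreadFactory factory) {
        return new ThreadPoolExecutor(
                maxConcurrency, maxConcurrency, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), factory,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Saturates a 1-worker, 1-slot executor and reports whether the
    // overflow task ran on the thread that submitted it.
    static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor executor = boundedExecutor(1, 1, Thread::new);
        CountDownLatch release = new CountDownLatch(1);
        Thread[] overflowThread = new Thread[1];

        executor.execute(() -> awaitQuietly(release));                       // occupies the only worker
        executor.execute(() -> { });                                         // fills the only queue slot
        executor.execute(() -> overflowThread[0] = Thread.currentThread());  // rejected, so the caller runs it

        release.countDown();
        executor.shutdown();
        return overflowThread[0] == Thread.currentThread();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

While the caller is busy running the overflow task it cannot submit more work, which is the blocking effect the paragraph above relies on.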


7. Sink Implementations

GMD Sink (GMDProducerService)

Transforms EnrichedNotificationEvent → UserNotificationInboundEvent and publishes to gmd.notifications.

Field mapping:

| Source field | GMD field | Value |
|---|---|---|
| enrichedEvent.messageId() | messageId | UUID generated at enrichment |
| (hardcoded constant) | messageType | "REFRESH_INBOX" |
| enrichedEvent.getAccountId() | authentication.accountId | Numeric account ID |
| (default) | channels | [IN_GAME, ON_SITE] |
| System.currentTimeMillis() | timestamp | Event creation time |
| (default) | messagePayload | empty map |

messageType is hardcoded as "REFRESH_INBOX": the WMX schema uses SYSTEM_REFRESH_INBOX but GMD expects REFRESH_INBOX. The mapping is a sink-specific transformation detail; EnrichedNotificationEvent.getMessageType() is not used for output.
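
The field mapping can be sketched with plain records. EnrichedEvent and GmdEvent below are simplified, hypothetical stand-ins for EnrichedNotificationEvent and the Avro-generated UserNotificationInboundEvent: the nested authentication object is flattened to a single accountId, and the field types are assumptions.

```java
import java.util.List;
import java.util.Map;

// Simplified stand-in for EnrichedNotificationEvent.
record EnrichedEvent(String messageId, String accountId, String messageType) {}

// Simplified stand-in for UserNotificationInboundEvent (authentication flattened).
record GmdEvent(String messageId, String messageType, long accountId,
                List<String> channels, long timestamp, Map<String, String> messagePayload) {}

final class GmdMapper {
    // GMD expects REFRESH_INBOX regardless of the WMX message type.
    static final String GMD_MESSAGE_TYPE = "REFRESH_INBOX";

    private GmdMapper() {}

    // nowMillis is passed in (instead of calling System.currentTimeMillis()) to keep the mapping testable.
    static GmdEvent toGmd(EnrichedEvent event, long nowMillis) {
        return new GmdEvent(
                event.messageId(),
                GMD_MESSAGE_TYPE,                    // source messageType is deliberately ignored
                Long.parseLong(event.accountId()),   // safe: isValid() already checked it is numeric
                List.of("IN_GAME", "ON_SITE"),
                nowMillis,
                Map.of());
    }
}
```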

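send() blocks on the Emitter's CompletableFuture so that the gmd.messages.published / gmd.messages.failed metrics reflect the true broker outcome. Because it blocks, it must never run on an event-loop thread; in NED it is invoked from a virtual thread (NotificationConsumerService is @RunOnVirtualThread, and each sink call is submitted to the VT executor).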

OSG Sink (OSGNotificationClient + OSGRetryableSender)

POSTs to OSG REST endpoint. Request model (matches OpenAPI spec):

{
  "accountId": 1234567,
  "topic": "notificationMessaging",
  "message": { "urn": "urn:notifications:system-refresh-inbox" }
}

Response handling:

| Status | Action | Metric |
|---|---|---|
| 200 | Success | osg.messages.delivered |
| 404 | Customer not online (not an error) | osg.messages.customer_not_online |
| 5xx | Throw OSGServerException → @Retry triggers | osg.messages.retry_attempts |
| 4xx (non-404) | Throw SinkDeliveryException immediately | osg.messages.failed |
| Network error | Throw SinkDeliveryException | osg.messages.failed |
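
The status handling reduces to a small classification function. The sketch below is illustrative only: OsgOutcome and OsgStatusMapper are hypothetical names, and treating any status not listed in the table (for example 3xx) as a non-retryable failure is an assumption.

```java
// Hypothetical outcome labels matching the rows of the response table.
enum OsgOutcome { DELIVERED, CUSTOMER_NOT_ONLINE, RETRY, FAILED }

final class OsgStatusMapper {
    private OsgStatusMapper() {}

    static OsgOutcome classify(int status) {
        if (status == 200) {
            return OsgOutcome.DELIVERED;            // osg.messages.delivered
        }
        if (status == 404) {
            return OsgOutcome.CUSTOMER_NOT_ONLINE;  // expected; not an error
        }
        if (status >= 500 && status <= 599) {
            return OsgOutcome.RETRY;                // would throw OSGServerException
        }
        return OsgOutcome.FAILED;                   // would throw SinkDeliveryException
    }
}
```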

8. Fault Tolerance

OSG Retry (@Retry + @ExponentialBackoff)

@Retry(maxRetries = 3, delay = 100, delayUnit = MILLIS, retryOn = {OSGServerException.class})
@ExponentialBackoff(factor = 2)
void invoke(EnrichedNotificationEvent enrichedEvent)

Retry timeline (attempt start offsets, excluding request time and any jitter): 0ms → 100ms → 300ms → 700ms (4 total attempts)

@Retry is on a separate CDI bean (OSGRetryableSender) because interceptors don't fire on self-invocation.
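
The retry offsets follow from doubling the delay after each attempt. The helper below reproduces that arithmetic; BackoffTimeline is a hypothetical name, and the calculation ignores request duration and jitter.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffTimeline {
    private BackoffTimeline() {}

    // Returns the start offset in milliseconds of each attempt (first attempt at 0).
    static List<Long> attemptOffsets(int maxRetries, long initialDelayMs, int factor) {
        List<Long> offsets = new ArrayList<>();
        long offset = 0;
        long delay = initialDelayMs;
        offsets.add(offset);
        for (int retry = 1; retry <= maxRetries; retry++) {
            offset += delay;   // wait the current delay before the next attempt
            offsets.add(offset);
            delay *= factor;   // exponential growth: 100, 200, 400, ...
        }
        return offsets;
    }
}
```

Calling attemptOffsets(3, 100, 2) returns [0, 100, 300, 700]: the waits are 100 ms, 200 ms, and 400 ms.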

Circuit Breaker โ€” Why It Was Removed

A @CircuitBreaker was prototyped but removed. SmallRye FT 6.x counts each retry attempt as a separate CB invocation, not the final outcome of the outer call. With maxRetries=3, one outer call = 4 CB invocations. With requestVolumeThreshold=10 and failureRatio=0.5, roughly 2 fully failed events are enough to open the circuit and block all OSG delivery for 30 seconds. That is too aggressive, so retry-only is the current strategy.
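
The "roughly 2 events" figure can be checked with a little arithmetic. The helper below is a back-of-the-envelope sketch, not part of NED. It assumes the breaker opens once the rolling window is full and the share of failed invocations reaches failureRatio, and it counts only the failures contributed by fully failed events.

```java
final class CircuitBreakerMath {
    private CircuitBreakerMath() {}

    // Smallest number of fully failed outer calls whose attempts alone
    // reach the failure count needed to open the circuit.
    static int failedEventsToOpen(int maxRetries, int requestVolumeThreshold, double failureRatio) {
        int attemptsPerEvent = maxRetries + 1;                                        // 3 retries = 4 invocations
        int failuresNeeded = (int) Math.ceil(requestVolumeThreshold * failureRatio);  // 10 * 0.5 = 5
        return (failuresNeeded + attemptsPerEvent - 1) / attemptsPerEvent;            // ceiling division
    }
}
```

With maxRetries=3, requestVolumeThreshold=10, and failureRatio=0.5, the result is 2: two failed events produce 8 failed invocations, which exceeds the 5 needed within a 10-invocation window.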

Graceful Shutdown

See §6 above. Config: ned.processor.shutdown-timeout-seconds=30, quarkus.shutdown.timeout=35S.


9. Metrics Reference

All metrics live in SinkMetrics. Counters are eagerly registered in the constructor.

| Metric | Incremented when | Alert? |
|---|---|---|
| ned.events.consumed | Valid event passed all gates and was routed to sinks | No |
| ned.events.rejected | Event dropped: null/blank accountId, non-numeric accountId, or null messageType | Yes (data quality) |
| ned.events.skipped | Event has valid structure but unsupported messageType | No (expected) |
| osg.messages.delivered | OSG responded 200 OK | No |
| osg.messages.customer_not_online | OSG responded 404 | No |
| osg.messages.failed | OSG unrecoverable failure (after retries) | Yes |
| osg.messages.retry_attempts | 5xx triggered a retry attempt | Dashboard only |
| gmd.messages.published | Kafka broker ack received | No |
| gmd.messages.failed | Kafka broker nack or serialization error | Yes |

Prometheus endpoint: http://localhost:8080/q/metrics

curl -s localhost:8080/q/metrics | grep -E "ned_events|osg_messages|gmd_messages"

10. Test Suite

60 tests, 0 failures.

| Test class | Type | Tests | What it validates |
|---|---|---|---|
| NedE2eIntegrationTest | E2E (Kafka + WireMock) | 5 | Full pipeline: WMX Kafka in → GMD Kafka out + OSG HTTP |
| SinkMetricsIntegrationTest | Integration | 5 | Metrics in Quarkus MeterRegistry |
| GMDProducerServiceIntegrationTest | Integration | 3 | GMD Avro serialisation + Kafka publish |
| OSGNotificationClientWireMockTest | Integration | 9 | HTTP handling: 200/404/5xx/4xx/timeout/retry |
| NotificationConsumerHealthCheckIntegrationTest | Integration | 2 | Health endpoint wiring |
| NotificationProcessorIntegrationTest | Integration | 2 | Enrichment, messageId generation |
| SinkMetricsTest | Unit | 10 | All counter operations in isolation |
| NotificationConsumerHealthCheckTest | Unit | 5 | Health check logic |
| NotificationConsumerServiceTest | Unit | 4 | Consumer delegation, null guard, catch-all |
| NotificationProcessorTest | Unit | 15 | Validation, filter, VT dispatch, metrics |

Infrastructure patterns

Kafka tests: ConfluentKafkaContainer (testcontainers 2.0.5) via KafkaTestProfile (@QuarkusTestResource). Quarkus DevService auto-starts Schema Registry alongside the container.

OSG tests: WireMock via OSGWireMockTestProfile (@TestProfile). @BeforeEach resets stubs so tests do not depend on execution order. Metrics use a baseline-delta pattern (read the counter before and after, then assert on the difference) because MeterRegistry.clear() would orphan cached Counter references.

E2E test: both Kafka containers + WireMock active simultaneously. Asserts GMD messageType = "REFRESH_INBOX" (not "SYSTEM_REFRESH_INBOX").

Notable test patterns

// ✅ Awaitility for async polling (never Thread.sleep for assertions)
await().atMost(5, SECONDS).pollInterval(100, MILLISECONDS)
    .until(() -> consumer.poll(Duration.ofMillis(100)), r -> r.count() > 0);

// ✅ Parameterized tests for related cases
@ParameterizedTest
@ValueSource(strings = {"", " ", "not-a-number"})
void testInvalidAccountIdIsRejected(String accountId) { ... }

// ✅ simulateIoLatency() helper instead of bare Thread.sleep in stubs
// (blocking stub latency simulation, not async polling, so Awaitility doesn't apply)

11. Coding Standards

Dependency Injection

  • Constructor injection only. No field injection.
  • @Singleton for stateless orchestrators. @ApplicationScoped for beans with lifecycle callbacks.
  • @ConfigProperty only as constructor parameters, never on fields.
  • Always provide defaultValue for tunables; omit it for required values (fail-fast on startup).

Validation

All validation belongs in NotificationProcessor.isValid(). Sinks receive trusted, pre-validated input.

Error Handling

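  • Never re-throw from @Incoming consumers. A thrown exception nacks the message, and under the connector's default failure strategy (fail) a nack halts the Kafka channel. NED sets failure-strategy=ignore, but the consumer still catches everything itself.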
  • Use SinkDeliveryException (checked) for expected delivery failures.
  • Always have separate catch (SinkDeliveryException e) and catch (Exception e) blocks.

Retry

@Retry only for transient failures (5xx, network). Never retry 4xx. Place @Retry on a separate CDI bean, because interceptors don't fire on self-invocation.

Logging

  • SLF4J only. private static final Logger LOG = LoggerFactory.getLogger(...).
  • DEBUG: per-message happy path. INFO: startup/shutdown/config/expected filtering. WARN: invalid input, degraded. ERROR: delivery failure (always include exception).
  • Always include accountId. Always use parameterised logging, never string concatenation.

Metrics

  • All metrics live in SinkMetrics. No counter creation outside this class.
  • Naming: {sink}.messages.{status} for sinks · ned.events.{status} for consumer.
  • Record after the outcome is known.

Concurrency

  • Use Thread.ofVirtual().factory() for blocking I/O. Never Executors.newVirtualThreadPerTaskExecutor() (unbounded).
  • CallerRunsPolicy is the back-pressure mechanism. Do not replace it.
  • Always awaitTermination after shutdown(). Log pending count if shutdownNow() is called.

Configuration

Every tunable in application.properties must have a comment explaining the value choice. Secrets use ${ENV_VAR:default}. Never hardcode.

Sink Pattern

Every sink implements SinkAdapter: getId() returns a stable lowercase string constant; isEnabled() is driven by @ConfigProperty; send() throws SinkDeliveryException on failure and never swallows silently.

Testing

  • Never Thread.sleep() for assertions; use Awaitility.
  • Every test asserts the metric counter for the scenario under test.
  • Test naming: [Component][Integration|Unit]Test.java

Dead Code

Before every commit, remove:

  • unused imports, fields, and methods
  • defensive null-checks on values guaranteed non-null upstream
  • onShutdown handlers that only log without real cleanup
  • defensive copies of immutable-after-construction collections
  • commented-out code without explanation

Native Image

  • kafka-avro-serializer versions 6.2.x–7.3.x only. Versions 7.4+ require jackson-dataformat-csv and are unsupported.
  • Exclude jakarta.ws.rs:jakarta.ws.rs-api from kafka-avro-serializer.
  • No quarkus-rest. Test-only dependencies must be <scope>test</scope>.

12. Pre-commit Checklist

  • [ ] mvn test passes (60/60)
  • [ ] No Thread.sleep() in tests
  • [ ] No unused imports, fields, variables
  • [ ] All config tunables have comments in application.properties
  • [ ] Metrics incremented and tested for each new code path
  • [ ] Validation at NotificationProcessor.isValid() โ€” not in sinks
  • [ ] git diff reviewed end-to-end for dead code

Vault: /Users/saji.varghese2/Obsidian Vault/50_Implementation/ned-service/
Specs: 40_Specs/ned-kafka-consumer.md · 40_Specs/ned-pluggable-router-pattern.md