Spring AMQP & RabbitMQ
Giriş
Bir postane düşünün. Mektubunuzu zarfa koyup postaneye verirsiniz (producer). Postane mektubu adrese göre doğru posta kutusuna yönlendirir (exchange → routing → queue). Alıcı, posta kutusundan mektubu alır (consumer). Postane, gönderen ile alıcı arasında bir aracıdır — gönderen, alıcının evde olup olmadığını bilmez; alıcı da mektubun ne zaman gönderildiğini bilmek zorunda değildir. Bu asenkron, gevşek bağlı iletişim modelidir.
Spring Boot, RabbitMQ ile entegrasyon için spring-boot-starter-amqp starter'ını sunar. Bu starter, Spring AMQP projesini otomatik yapılandırarak RabbitMQ ile çalışmayı son derece kolaylaştırır — ConnectionFactory, RabbitTemplate ve RabbitAdmin bean'leri otomatik oluşturulur.
Bu derste bağımlılık yapılandırmasını, exchange/queue/binding tanımlamayı, RabbitTemplate ile mesaj göndermeyi, @RabbitListener ile mesaj almayı, JSON mesaj dönüşümünü, retry ve Dead Letter Queue (DLQ) mekanizmasını, publisher confirm/return ile güvenilir mesaj teslimatını ve production best practice'lerini derinlemesine öğreneceğiz.
Bağımlılık ve Yapılandırma
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 5000
# Publisher confirms — mesajın broker'a ulaştığını doğrulama
publisher-confirm-type: correlated
publisher-returns: true
# Consumer ayarları
listener:
simple:
acknowledge-mode: auto # auto, manual, none
prefetch: 10 # Bir seferde kaç mesaj al
concurrency: 3 # Başlangıç consumer sayısı
max-concurrency: 10 # Maksimum consumer sayısı
retry:
enabled: true
initial-interval: 1000
max-interval: 10000
multiplier: 2.0
max-attempts: 3
default-requeue-rejected: false # Retry bitince DLQ'ya gönderExchange, Queue ve Binding Tanımlama
Spring AMQP'de altyapı bileşenlerini Java konfigürasyonu ile tanımlarız. Spring Boot başlatıldığında RabbitAdmin, bu bean'leri otomatik olarak RabbitMQ broker'ında oluşturur.
@Configuration
public class RabbitConfig {
// ═══════════════════════════════════════════
// EXCHANGE'ler
// ═══════════════════════════════════════════
// Direct Exchange — routing key'e göre birebir eşleşme
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order-exchange", true, false);
// durable=true (broker restart'ta kaybolmaz), autoDelete=false
}
// Topic Exchange — pattern matching ile routing
@Bean
public TopicExchange notificationExchange() {
return new TopicExchange("notification-exchange");
}
// Fanout Exchange — tüm bağlı kuyruklara broadcast
@Bean
public FanoutExchange auditExchange() {
return new FanoutExchange("audit-exchange");
}
// ═══════════════════════════════════════════
// QUEUE'lar
// ═══════════════════════════════════════════
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order-queue")
.withArgument("x-dead-letter-exchange", "dlx-exchange")
.withArgument("x-dead-letter-routing-key", "dead-orders")
.withArgument("x-message-ttl", 300000) // 5 dakika TTL
.build();
}
@Bean
public Queue notificationEmailQueue() {
return QueueBuilder.durable("notification-email-queue").build();
}
@Bean
public Queue notificationSmsQueue() {
return QueueBuilder.durable("notification-sms-queue").build();
}
// Dead Letter Queue
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dead-order-queue").build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dlx-exchange");
}
// ═══════════════════════════════════════════
// BINDING'ler
// ═══════════════════════════════════════════
@Bean
public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
return BindingBuilder.bind(orderQueue)
.to(orderExchange)
.with("order.created"); // routing key
}
// Topic binding — pattern matching
@Bean
public Binding emailNotificationBinding() {
return BindingBuilder.bind(notificationEmailQueue())
.to(notificationExchange())
.with("notification.email.#"); // # = sıfır veya daha fazla kelime
}
@Bean
public Binding smsNotificationBinding() {
return BindingBuilder.bind(notificationSmsQueue())
.to(notificationExchange())
.with("notification.sms.*"); // * = tek kelime
}
// Dead Letter binding
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dead-orders");
}
}Message Converter — JSON
Varsayılan olarak Spring AMQP, Java serialization kullanır — bu güvenli değildir ve farklı dillerdeki consumer'larla uyumsuzdur. JSON converter kullanmak best practice'tir:
@Configuration
public class RabbitConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Jackson2JsonMessageConverter(mapper);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter converter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(converter);
return template;
}
}JSON converter kullanıldığında, mesajlar __TypeId__ header'ı ile tip bilgisi taşır. Consumer tarafında otomatik olarak doğru Java nesnesine dönüştürülür.
RabbitTemplate ile Mesaj Gönderme
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderEventPublisher {
private final RabbitTemplate rabbitTemplate;
// Basit mesaj gönderme
public void publishOrderCreated(OrderEvent event) {
rabbitTemplate.convertAndSend(
"order-exchange", // exchange
"order.created", // routing key
event // mesaj (JSON'a serialize edilir)
);
log.info("Order event published: {}", event.getOrderId());
}
// Header ekleyerek gönderme
public void publishWithHeaders(OrderEvent event) {
rabbitTemplate.convertAndSend("order-exchange", "order.created", event,
message -> {
MessageProperties props = message.getMessageProperties();
props.setHeader("source", "order-service");
props.setHeader("version", "1.0");
props.setCorrelationId(UUID.randomUUID().toString());
props.setContentType("application/json");
props.setPriority(event.isUrgent() ? 10 : 1);
props.setExpiration("300000"); // 5 dakika TTL
return message;
});
}
// Topic exchange'e gönderme
public void publishNotification(String type, NotificationEvent event) {
// routing key: "notification.email.welcome" veya "notification.sms.otp"
String routingKey = "notification." + type + "." + event.getTemplate();
rabbitTemplate.convertAndSend("notification-exchange", routingKey, event);
}
// Fanout exchange'e gönderme (routing key önemli değil)
public void publishAudit(AuditEvent event) {
rabbitTemplate.convertAndSend("audit-exchange", "", event);
}
}@RabbitListener ile Mesaj Alma
@Service
@Slf4j
public class OrderEventConsumer {
// Basit listener — otomatik deserialization
@RabbitListener(queues = "order-queue")
public void handleOrderCreated(OrderEvent event) {
log.info("📦 Order received: orderId={}", event.getOrderId());
orderProcessingService.process(event);
}
// Header'lara erişim + manuel ACK
@RabbitListener(queues = "order-queue")
public void handleWithManualAck(
OrderEvent event,
@Header("source") String source,
@Header(value = "version", required = false) String version,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
try {
log.info("Processing order from {}: {}", source, event.getOrderId());
orderProcessingService.process(event);
// Başarılı — ACK gönder (mesaj kuyruktan silinir)
channel.basicAck(deliveryTag, false);
} catch (RetryableException e) {
// Geçici hata — mesajı kuyruğa geri koy (retry)
channel.basicNack(deliveryTag, false, true); // requeue=true
} catch (Exception e) {
// Kalıcı hata — DLQ'ya gönder
channel.basicNack(deliveryTag, false, false); // requeue=false → DLQ
log.error("Permanent failure processing order: {}", event.getOrderId(), e);
}
}
// Inline queue tanımlama (queue yoksa otomatik oluşturur)
@RabbitListener(bindings = @QueueBinding(
value = @org.springframework.amqp.rabbit.annotation.Queue(
value = "notification-email-queue", durable = "true"),
exchange = @Exchange(
value = "notification-exchange", type = ExchangeTypes.TOPIC),
key = "notification.email.#"
))
public void handleEmailNotification(NotificationEvent event) {
log.info("📧 Email notification: {}", event);
emailService.send(event);
}
}@RabbitHandler ile Tip Bazlı Yönlendirme
@Service
@RabbitListener(queues = "event-queue")
public class MultiTypeEventConsumer {
@RabbitHandler
public void handleOrder(OrderEvent event) {
log.info("Processing order: {}", event.getOrderId());
}
@RabbitHandler
public void handlePayment(PaymentEvent event) {
log.info("Processing payment: {}", event.getPaymentId());
}
@RabbitHandler(isDefault = true)
public void handleDefault(Object event) {
log.warn("Unknown event type: {}", event.getClass().getSimpleName());
}
}Retry ve Dead Letter Queue (DLQ)
Otomatik Retry
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
initial-interval: 1000 # İlk deneme: 1 saniye sonra
max-interval: 10000 # Maksimum bekleme: 10 saniye
multiplier: 2.0 # Her denemede 2x artış (1s, 2s, 4s, 8s, 10s)
max-attempts: 3 # Toplam 3 deneme
default-requeue-rejected: false # Retry bitince DLQ'ya gönderProgramatik Retry
@Configuration
public class RabbitRetryConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter converter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(converter);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(10);
// Retry advice
factory.setAdviceChain(RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(1000, 2.0, 10000)
.recoverer(new RejectAndDontRequeueRecoverer())
.build());
return factory;
}
}DLQ İzleme
// Dead Letter Queue'daki mesajları izleyen consumer
@RabbitListener(queues = "dead-order-queue")
public void handleDeadLetter(Message message) {
String body = new String(message.getBody());
Map<String, Object> headers = message.getMessageProperties().getHeaders();
log.error("☠️ Dead letter received — body={}, headers={}", body, headers);
// x-death header'ından orijinal kuyruk ve hata bilgisi alınabilir
List<Map<String, Object>> xDeath =
(List<Map<String, Object>>) headers.get("x-death");
if (xDeath != null && !xDeath.isEmpty()) {
Map<String, Object> death = xDeath.get(0);
log.error("Original queue: {}, reason: {}, count: {}",
death.get("queue"), death.get("reason"), death.get("count"));
}
// Alert gönder
alertService.sendCritical("Dead letter in order queue: " + body);
}Publisher Confirms ve Returns
Mesajın broker'a başarıyla ulaştığını doğrulamak için:
@Service
@RequiredArgsConstructor
@Slf4j
public class ReliablePublisher {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void configure() {
// Confirm — broker mesajı aldı mı?
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.debug("✅ Message confirmed: {}", correlationData);
} else {
log.error("❌ Message NACK'd: {} — cause: {}", correlationData, cause);
// Retry mantığı veya alert
}
});
// Return — mesaj hiçbir kuyruğa yönlendirilemedi
rabbitTemplate.setReturnsCallback(returned -> {
log.warn("⚠️ Message returned: {} — exchange={}, routingKey={}",
returned.getReplyText(),
returned.getExchange(),
returned.getRoutingKey());
// Mesaj bir kuyruğa ulaşamadı — routing key veya binding eksik olabilir
});
rabbitTemplate.setMandatory(true); // Return callback'in çalışması için
}
public void publishReliably(String exchange, String routingKey, Object message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
}Yaygın Hatalar
1. Java Serialization Kullanmak
// ❌ YANLIŞ — Güvenlik açığı, farklı dillere uyumsuz
// Varsayılan serializer Java serialization kullanır
// ✅ DOĞRU — JSON converter kullanın
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}2. Acknowledge Mode'u Yanlış Ayarlamak
// ❌ acknowledge-mode: none — mesaj kaybolabilir!
// Consumer hata fırlatsa bile mesaj kuyruktan silinir
// ✅ acknowledge-mode: auto (varsayılan) veya manual
// auto: exception yoksa ACK, exception varsa NACK
// manual: siz kontrol edersiniz (channel.basicAck/basicNack)3. Prefetch'i Çok Yüksek Ayarlamak
// ❌ prefetch: 1000 — bir consumer 1000 mesaj alır, diğerleri boşta kalır
// ✅ prefetch: 10-50 — dengeli dağılımÖzet
Spring AMQP, RabbitMQ ile entegrasyonu
RabbitTemplate(gönderme) ve@RabbitListener(alma) ile kolaylaştırır. Auto-configuration ile minimum yapılandırma gerekir.JSON converter zorunlu — varsayılan Java serialization güvensiz ve uyumsuzdur.
Jackson2JsonMessageConverterbean'i tanımlayın.DLQ (Dead Letter Queue) ile işlenemeyen mesajları ayrı kuyruğa yönlendirin. DLQ'yu izleyin ve alert kurun — sessizce kaybolan mesajlar büyük sorunlara yol açar.
Retry mekanizması ile geçici hataları (veritabanı bağlantısı, timeout) otomatik telafi edin. Exponential backoff (1s, 2s, 4s) kullanın.
Publisher confirm/return ile mesajın broker'a ulaştığını doğrulayın. Kritik iş akışlarında mesaj kaybını önlemek için zorunludur.
Consumer concurrency ayarını iş yüküne göre yapılandırın.
prefetchile bir consumer'ın çok fazla mesaj almasını önleyin.
Gerçek Dünya Örneği: E-Ticaret Sipariş Akışı
Bir e-ticaret uygulamasında sipariş oluşturulduğunda birden fazla servisin bilgilendirilmesi gerekir:
// Sipariş servisi — event yayınlar
@Service
@RequiredArgsConstructor
public class OrderPublisher {
private final RabbitTemplate rabbitTemplate;
public void publishOrderEvent(Order order, String eventType) {
OrderMessage message = OrderMessage.builder()
.orderId(order.getId())
.customerId(order.getCustomerId())
.totalAmount(order.getTotalAmount())
.items(order.getItems().stream()
.map(item -> new OrderItemMessage(item.getProductId(), item.getQuantity()))
.toList())
.eventType(eventType)
.timestamp(Instant.now())
.build();
// Topic exchange — farklı servisler farklı pattern'larla dinler
rabbitTemplate.convertAndSend(
"order-exchange",
"order." + eventType.toLowerCase(), // order.created, order.shipped, order.cancelled
message
);
}
}
// Envanter servisi — stok güncelleme
@Component
@Slf4j
public class InventoryConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("inventory-order-queue"),
exchange = @Exchange(value = "order-exchange", type = ExchangeTypes.TOPIC),
key = "order.created"
))
public void handleOrderCreated(OrderMessage message) {
log.info("📦 Stok ayırma: orderId={}", message.getOrderId());
message.getItems().forEach(item ->
inventoryService.reserve(item.getProductId(), item.getQuantity())
);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("inventory-cancel-queue"),
exchange = @Exchange(value = "order-exchange", type = ExchangeTypes.TOPIC),
key = "order.cancelled"
))
public void handleOrderCancelled(OrderMessage message) {
log.info("📦 Stok iade: orderId={}", message.getOrderId());
message.getItems().forEach(item ->
inventoryService.release(item.getProductId(), item.getQuantity())
);
}
}
// Bildirim servisi — e-posta ve SMS
@Component
@Slf4j
public class NotificationConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("notification-order-queue"),
exchange = @Exchange(value = "order-exchange", type = ExchangeTypes.TOPIC),
key = "order.*" // Tüm sipariş event'lerini dinle
))
public void handleOrderEvent(OrderMessage message) {
switch (message.getEventType()) {
case "CREATED" -> emailService.sendOrderConfirmation(message);
case "SHIPPED" -> emailService.sendShippingNotification(message);
case "CANCELLED" -> emailService.sendCancellationNotification(message);
}
}
}Bu yapıda her servis bağımsız olarak sipariş event'lerini dinler. Yeni bir servis (örneğin analytics) eklemek, sadece yeni bir consumer yazmak ve ilgili routing key'e binding oluşturmak demektir — mevcut servisler etkilenmez.
AI Asistan
Sorularını yanıtlamaya hazır