← Kursa Dön
📄 Text · 30 min

Spring AMQP & RabbitMQ

Giriş

Bir postane düşünün. Mektubunuzu zarfa koyup postaneye verirsiniz (producer). Postane mektubu adrese göre doğru posta kutusuna yönlendirir (exchange → routing → queue). Alıcı, posta kutusundan mektubu alır (consumer). Postane, gönderen ile alıcı arasında bir aracıdır — gönderen, alıcının evde olup olmadığını bilmez; alıcı da mektubun ne zaman gönderildiğini bilmek zorunda değildir. Bu asenkron, gevşek bağlı iletişim modelidir.

Spring Boot, RabbitMQ ile entegrasyon için spring-boot-starter-amqp starter'ını sunar. Bu starter, Spring AMQP projesini otomatik yapılandırarak RabbitMQ ile çalışmayı son derece kolaylaştırır — ConnectionFactory, RabbitTemplate ve RabbitAdmin bean'leri otomatik oluşturulur.

Bu derste bağımlılık yapılandırmasını, exchange/queue/binding tanımlamayı, RabbitTemplate ile mesaj göndermeyi, @RabbitListener ile mesaj almayı, JSON mesaj dönüşümünü, retry ve Dead Letter Queue (DLQ) mekanizmasını, publisher confirm/return ile güvenilir mesaj teslimatını ve production best practice'lerini derinlemesine öğreneceğiz.


Bağımlılık ve Yapılandırma

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000
    # Publisher confirms — mesajın broker'a ulaştığını doğrulama
    publisher-confirm-type: correlated
    publisher-returns: true
    # Consumer ayarları
    listener:
      simple:
        acknowledge-mode: auto           # auto, manual, none
        prefetch: 10                       # Bir seferde kaç mesaj al
        concurrency: 3                     # Başlangıç consumer sayısı
        max-concurrency: 10                # Maksimum consumer sayısı
        retry:
          enabled: true
          initial-interval: 1000
          max-interval: 10000
          multiplier: 2.0
          max-attempts: 3
        default-requeue-rejected: false    # Retry bitince DLQ'ya gönder

Exchange, Queue ve Binding Tanımlama

Spring AMQP'de altyapı bileşenlerini Java konfigürasyonu ile tanımlarız. Spring Boot başlatıldığında RabbitAdmin, bu bean'leri otomatik olarak RabbitMQ broker'ında oluşturur.

@Configuration
public class RabbitConfig {
    
    // ═══════════════════════════════════════════
    // EXCHANGE'ler
    // ═══════════════════════════════════════════
    
    // Direct Exchange — routing key'e göre birebir eşleşme
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order-exchange", true, false);
        // durable=true (broker restart'ta kaybolmaz), autoDelete=false
    }
    
    // Topic Exchange — pattern matching ile routing
    @Bean
    public TopicExchange notificationExchange() {
        return new TopicExchange("notification-exchange");
    }
    
    // Fanout Exchange — tüm bağlı kuyruklara broadcast
    @Bean
    public FanoutExchange auditExchange() {
        return new FanoutExchange("audit-exchange");
    }
    
    // ═══════════════════════════════════════════
    // QUEUE'lar
    // ═══════════════════════════════════════════
    
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order-queue")
            .withArgument("x-dead-letter-exchange", "dlx-exchange")
            .withArgument("x-dead-letter-routing-key", "dead-orders")
            .withArgument("x-message-ttl", 300000)  // 5 dakika TTL
            .build();
    }
    
    @Bean
    public Queue notificationEmailQueue() {
        return QueueBuilder.durable("notification-email-queue").build();
    }
    
    @Bean
    public Queue notificationSmsQueue() {
        return QueueBuilder.durable("notification-sms-queue").build();
    }
    
    // Dead Letter Queue
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("dead-order-queue").build();
    }
    
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dlx-exchange");
    }
    
    // ═══════════════════════════════════════════
    // BINDING'ler
    // ═══════════════════════════════════════════
    
    @Bean
    public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(orderQueue)
            .to(orderExchange)
            .with("order.created");  // routing key
    }
    
    // Topic binding — pattern matching
    @Bean
    public Binding emailNotificationBinding() {
        return BindingBuilder.bind(notificationEmailQueue())
            .to(notificationExchange())
            .with("notification.email.#");  // # = sıfır veya daha fazla kelime
    }
    
    @Bean
    public Binding smsNotificationBinding() {
        return BindingBuilder.bind(notificationSmsQueue())
            .to(notificationExchange())
            .with("notification.sms.*");  // * = tek kelime
    }
    
    // Dead Letter binding
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
            .to(deadLetterExchange())
            .with("dead-orders");
    }
}

Message Converter — JSON

Varsayılan olarak Spring AMQP, Java serialization kullanır — bu güvenli değildir ve farklı dillerdeki consumer'larla uyumsuzdur. JSON converter kullanmak best practice'tir:

@Configuration
public class RabbitConfig {
    
    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        return new Jackson2JsonMessageConverter(mapper);
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         Jackson2JsonMessageConverter converter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(converter);
        return template;
    }
}

JSON converter kullanıldığında, mesajlar __TypeId__ header'ı ile tip bilgisi taşır. Consumer tarafında otomatik olarak doğru Java nesnesine dönüştürülür.


RabbitTemplate ile Mesaj Gönderme

@Service
@RequiredArgsConstructor
@Slf4j
public class OrderEventPublisher {
    
    private final RabbitTemplate rabbitTemplate;
    
    // Basit mesaj gönderme
    public void publishOrderCreated(OrderEvent event) {
        rabbitTemplate.convertAndSend(
            "order-exchange",     // exchange
            "order.created",      // routing key
            event                  // mesaj (JSON'a serialize edilir)
        );
        log.info("Order event published: {}", event.getOrderId());
    }
    
    // Header ekleyerek gönderme
    public void publishWithHeaders(OrderEvent event) {
        rabbitTemplate.convertAndSend("order-exchange", "order.created", event,
            message -> {
                MessageProperties props = message.getMessageProperties();
                props.setHeader("source", "order-service");
                props.setHeader("version", "1.0");
                props.setCorrelationId(UUID.randomUUID().toString());
                props.setContentType("application/json");
                props.setPriority(event.isUrgent() ? 10 : 1);
                props.setExpiration("300000");  // 5 dakika TTL
                return message;
            });
    }
    
    // Topic exchange'e gönderme
    public void publishNotification(String type, NotificationEvent event) {
        // routing key: "notification.email.welcome" veya "notification.sms.otp"
        String routingKey = "notification." + type + "." + event.getTemplate();
        rabbitTemplate.convertAndSend("notification-exchange", routingKey, event);
    }
    
    // Fanout exchange'e gönderme (routing key önemli değil)
    public void publishAudit(AuditEvent event) {
        rabbitTemplate.convertAndSend("audit-exchange", "", event);
    }
}

@RabbitListener ile Mesaj Alma

@Service
@Slf4j
public class OrderEventConsumer {
    
    // Basit listener — otomatik deserialization
    @RabbitListener(queues = "order-queue")
    public void handleOrderCreated(OrderEvent event) {
        log.info("📦 Order received: orderId={}", event.getOrderId());
        orderProcessingService.process(event);
    }
    
    // Header'lara erişim + manuel ACK
    @RabbitListener(queues = "order-queue")
    public void handleWithManualAck(
            OrderEvent event,
            @Header("source") String source,
            @Header(value = "version", required = false) String version,
            Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        
        try {
            log.info("Processing order from {}: {}", source, event.getOrderId());
            orderProcessingService.process(event);
            
            // Başarılı — ACK gönder (mesaj kuyruktan silinir)
            channel.basicAck(deliveryTag, false);
            
        } catch (RetryableException e) {
            // Geçici hata — mesajı kuyruğa geri koy (retry)
            channel.basicNack(deliveryTag, false, true);  // requeue=true
            
        } catch (Exception e) {
            // Kalıcı hata — DLQ'ya gönder
            channel.basicNack(deliveryTag, false, false);  // requeue=false → DLQ
            log.error("Permanent failure processing order: {}", event.getOrderId(), e);
        }
    }
    
    // Inline queue tanımlama (queue yoksa otomatik oluşturur)
    @RabbitListener(bindings = @QueueBinding(
        value = @org.springframework.amqp.rabbit.annotation.Queue(
            value = "notification-email-queue", durable = "true"),
        exchange = @Exchange(
            value = "notification-exchange", type = ExchangeTypes.TOPIC),
        key = "notification.email.#"
    ))
    public void handleEmailNotification(NotificationEvent event) {
        log.info("📧 Email notification: {}", event);
        emailService.send(event);
    }
}

@RabbitHandler ile Tip Bazlı Yönlendirme

@Service
@RabbitListener(queues = "event-queue")
public class MultiTypeEventConsumer {
    
    @RabbitHandler
    public void handleOrder(OrderEvent event) {
        log.info("Processing order: {}", event.getOrderId());
    }
    
    @RabbitHandler
    public void handlePayment(PaymentEvent event) {
        log.info("Processing payment: {}", event.getPaymentId());
    }
    
    @RabbitHandler(isDefault = true)
    public void handleDefault(Object event) {
        log.warn("Unknown event type: {}", event.getClass().getSimpleName());
    }
}

Retry ve Dead Letter Queue (DLQ)

Otomatik Retry

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 1000    # İlk deneme: 1 saniye sonra
          max-interval: 10000       # Maksimum bekleme: 10 saniye
          multiplier: 2.0           # Her denemede 2x artış (1s, 2s, 4s, 8s, 10s)
          max-attempts: 3           # Toplam 3 deneme
        default-requeue-rejected: false  # Retry bitince DLQ'ya gönder

Programatik Retry

@Configuration
public class RabbitRetryConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            Jackson2JsonMessageConverter converter) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(converter);
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(10);
        
        // Retry advice
        factory.setAdviceChain(RetryInterceptorBuilder.stateless()
            .maxAttempts(3)
            .backOffOptions(1000, 2.0, 10000)
            .recoverer(new RejectAndDontRequeueRecoverer())
            .build());
        
        return factory;
    }
}

DLQ İzleme

// Dead Letter Queue'daki mesajları izleyen consumer
@RabbitListener(queues = "dead-order-queue")
public void handleDeadLetter(Message message) {
    String body = new String(message.getBody());
    Map<String, Object> headers = message.getMessageProperties().getHeaders();
    
    log.error("☠️ Dead letter received — body={}, headers={}", body, headers);
    
    // x-death header'ından orijinal kuyruk ve hata bilgisi alınabilir
    List<Map<String, Object>> xDeath = 
        (List<Map<String, Object>>) headers.get("x-death");
    if (xDeath != null && !xDeath.isEmpty()) {
        Map<String, Object> death = xDeath.get(0);
        log.error("Original queue: {}, reason: {}, count: {}",
            death.get("queue"), death.get("reason"), death.get("count"));
    }
    
    // Alert gönder
    alertService.sendCritical("Dead letter in order queue: " + body);
}

Publisher Confirms ve Returns

Mesajın broker'a başarıyla ulaştığını doğrulamak için:

@Service
@RequiredArgsConstructor
@Slf4j
public class ReliablePublisher {
    
    private final RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void configure() {
        // Confirm — broker mesajı aldı mı?
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("✅ Message confirmed: {}", correlationData);
            } else {
                log.error("❌ Message NACK'd: {} — cause: {}", correlationData, cause);
                // Retry mantığı veya alert
            }
        });
        
        // Return — mesaj hiçbir kuyruğa yönlendirilemedi
        rabbitTemplate.setReturnsCallback(returned -> {
            log.warn("⚠️ Message returned: {} — exchange={}, routingKey={}",
                returned.getReplyText(),
                returned.getExchange(),
                returned.getRoutingKey());
            // Mesaj bir kuyruğa ulaşamadı — routing key veya binding eksik olabilir
        });
        
        rabbitTemplate.setMandatory(true);  // Return callback'in çalışması için
    }
    
    public void publishReliably(String exchange, String routingKey, Object message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }
}

Yaygın Hatalar

1. Java Serialization Kullanmak

// ❌ YANLIŞ — Güvenlik açığı, farklı dillere uyumsuz
// Varsayılan serializer Java serialization kullanır

// ✅ DOĞRU — JSON converter kullanın
@Bean
public Jackson2JsonMessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}

2. Acknowledge Mode'u Yanlış Ayarlamak

// ❌ acknowledge-mode: none — mesaj kaybolabilir!
// Consumer hata fırlatsa bile mesaj kuyruktan silinir

// ✅ acknowledge-mode: auto (varsayılan) veya manual
// auto: exception yoksa ACK, exception varsa NACK
// manual: siz kontrol edersiniz (channel.basicAck/basicNack)

3. Prefetch'i Çok Yüksek Ayarlamak

// ❌ prefetch: 1000 — bir consumer 1000 mesaj alır, diğerleri boşta kalır
// ✅ prefetch: 10-50 — dengeli dağılım

Özet

  • Spring AMQP, RabbitMQ ile entegrasyonu RabbitTemplate (gönderme) ve @RabbitListener (alma) ile kolaylaştırır. Auto-configuration ile minimum yapılandırma gerekir.

  • JSON converter zorunlu — varsayılan Java serialization güvensiz ve uyumsuzdur. Jackson2JsonMessageConverter bean'i tanımlayın.

  • DLQ (Dead Letter Queue) ile işlenemeyen mesajları ayrı kuyruğa yönlendirin. DLQ'yu izleyin ve alert kurun — sessizce kaybolan mesajlar büyük sorunlara yol açar.

  • Retry mekanizması ile geçici hataları (veritabanı bağlantısı, timeout) otomatik telafi edin. Exponential backoff (1s, 2s, 4s) kullanın.

  • Publisher confirm/return ile mesajın broker'a ulaştığını doğrulayın. Kritik iş akışlarında mesaj kaybını önlemek için zorunludur.

  • Consumer concurrency ayarını iş yüküne göre yapılandırın. prefetch ile bir consumer'ın çok fazla mesaj almasını önleyin.


Gerçek Dünya Örneği: E-Ticaret Sipariş Akışı

Bir e-ticaret uygulamasında sipariş oluşturulduğunda birden fazla servisin bilgilendirilmesi gerekir:

// Sipariş servisi — event yayınlar
@Service
@RequiredArgsConstructor
public class OrderPublisher {
    
    private final RabbitTemplate rabbitTemplate;
    
    public void publishOrderEvent(Order order, String eventType) {
        OrderMessage message = OrderMessage.builder()
            .orderId(order.getId())
            .customerId(order.getCustomerId())
            .totalAmount(order.getTotalAmount())
            .items(order.getItems().stream()
                .map(item -> new OrderItemMessage(item.getProductId(), item.getQuantity()))
                .toList())
            .eventType(eventType)
            .timestamp(Instant.now())
            .build();
        
        // Topic exchange — farklı servisler farklı pattern'larla dinler
        rabbitTemplate.convertAndSend(
            "order-exchange",
            "order." + eventType.toLowerCase(),  // order.created, order.shipped, order.cancelled
            message
        );
    }
}

// Envanter servisi — stok güncelleme
@Component
@Slf4j
public class InventoryConsumer {
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue("inventory-order-queue"),
        exchange = @Exchange(value = "order-exchange", type = ExchangeTypes.TOPIC),
        key = "order.created"
    ))
    public void handleOrderCreated(OrderMessage message) {
        log.info("📦 Stok ayırma: orderId={}", message.getOrderId());
        message.getItems().forEach(item ->
            inventoryService.reserve(item.getProductId(), item.getQuantity())
        );
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue("inventory-cancel-queue"),
        exchange = @Exchange(value = "order-exchange", type = ExchangeTypes.TOPIC),
        key = "order.cancelled"
    ))
    public void handleOrderCancelled(OrderMessage message) {
        log.info("📦 Stok iade: orderId={}", message.getOrderId());
        message.getItems().forEach(item ->
            inventoryService.release(item.getProductId(), item.getQuantity())
        );
    }
}

// Bildirim servisi — e-posta ve SMS
@Component
@Slf4j
public class NotificationConsumer {
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue("notification-order-queue"),
        exchange = @Exchange(value = "order-exchange", type = ExchangeTypes.TOPIC),
        key = "order.*"  // Tüm sipariş event'lerini dinle
    ))
    public void handleOrderEvent(OrderMessage message) {
        switch (message.getEventType()) {
            case "CREATED" -> emailService.sendOrderConfirmation(message);
            case "SHIPPED" -> emailService.sendShippingNotification(message);
            case "CANCELLED" -> emailService.sendCancellationNotification(message);
        }
    }
}

Bu yapıda her servis bağımsız olarak sipariş event'lerini dinler. Yeni bir servis (örneğin analytics) eklemek, sadece yeni bir consumer yazmak ve ilgili routing key'e binding oluşturmak demektir — mevcut servisler etkilenmez.