← Kursa Dön
📄 Text · 30 min

Event Sourcing & CQRS

Giriş

Bir banka hesap defterini düşünün. Eski usulde defteri açarsınız, bakiyeyi görürsünüz: "1.500 TL". Ama bu paranın nasıl biriktiğini bilmezsiniz — maaş mı yattı, borç mu ödendi, faiz mi geldi? Event Sourcing ile hesap defterindeki her işlem kayıtlıdır: "3.000 TL maaş yattı → 500 TL kira ödendi → 1.000 TL alışveriş yapıldı". Bakiye bu işlemlerin toplamından hesaplanır: 3000 - 500 - 1000 = 1.500 TL. Her şeyin hikayesi, geçmişi ve sebebi bellidir.

Geleneksel CRUD uygulamalarında bir kaydı güncellediğinizde eski değer kaybolur — veritabanında yalnızca son durum saklanır. Event Sourcing, bu yaklaşımı kökten değiştirerek her değişikliği bir olay (event) olarak kaydeder. CQRS (Command Query Responsibility Segregation) ise okuma ve yazma işlemlerini ayrı modellere böler. Bu iki pattern birlikte, özellikle karmaşık iş mantığı gerektiren sistemlerde güçlü bir çözüm sunar.

Bu derste Event Sourcing'in temellerini, Event Store yapısını, event replay mekanizmasını, Materialized View (projeksiyon) kavramını, CQRS ile read/write ayrımını, Spring Boot'ta implementasyonu ve ne zaman kullanılacağını derinlemesine öğreneceğiz.


Event Sourcing Nedir?

Event Sourcing'de bir nesnenin mevcut durumu, oluşumundan bu yana gerçekleşen tüm olayların sırasıyla uygulanması ile elde edilir. Veritabanında son durum değil, olaylar dizisi saklanır.

Geleneksel vs Event Sourcing

═══ GELENEKSEL (State Sourcing) ═══

Account tablosu:
| id | owner | balance |
| 1  | Ali   | 1500    |  ← Sadece son durum. Nasıl buraya geldi? Bilinmiyor.

Güncelleme: UPDATE accounts SET balance = 1500 WHERE id = 1
Eski değer (2000 TL): KAYBEDİLDİ!


═══ EVENT SOURCING ═══

Event Store:
| seq | aggregate_id | event_type      | data              | timestamp           |
|  1  | acc-1        | AccountCreated  | {owner: "Ali"}    | 2024-01-01 10:00:00 |
|  2  | acc-1        | MoneyDeposited  | {amount: 3000}    | 2024-01-01 10:05:00 |
|  3  | acc-1        | MoneyWithdrawn  | {amount: 500}     | 2024-01-15 14:30:00 |
|  4  | acc-1        | MoneyWithdrawn  | {amount: 1000}    | 2024-01-20 18:00:00 |

Mevcut bakiye: 0 + 3000 - 500 - 1000 = 1500
Her şeyin kaydı var! "20 Ocak'ta 1000 TL nereye gitti?" sorusuna yanıt verebilirsiniz.

Event Sourcing'in Faydaları

  • Tam audit trail: Her değişikliğin kaydı var — finans, sağlık, hukuk sektörlerinde zorunlu

  • Temporal query: "2 ay önce bakiye neydi?" sorusuna yanıt verebilirsiniz

  • Replay yeteneği: Tüm olayları baştan uygulayarak yeni görünümler oluşturabilirsiniz

  • Debug kolaylığı: Hatalı duruma nasıl ulaşıldığını olayları izleyerek anlarsınız

  • Undo/Redo: Kompensasyon event'leri ile işlemleri geri alabilirsiniz


Event Store

Event Store, olayların saklandığı özel bir veritabanı veya tablodur:

@Entity
@Table(name = "event_store")
@Getter
@NoArgsConstructor
public class StoredEvent {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long sequenceNumber;
    
    @Column(nullable = false)
    private String aggregateId;
    
    @Column(nullable = false)
    private String aggregateType;
    
    @Column(nullable = false)
    private String eventType;
    
    @Column(columnDefinition = "TEXT", nullable = false)
    private String eventData;  // JSON
    
    @Column(nullable = false)
    private int version;
    
    @Column(nullable = false)
    private Instant occurredAt;
    
    @Column(nullable = false)
    private String triggeredBy;  // Kim tetikledi (kullanıcı veya sistem)
}
@Repository
public interface StoredEventRepository extends JpaRepository<StoredEvent, Long> {
    
    List<StoredEvent> findByAggregateIdOrderByVersionAsc(String aggregateId);
    
    Optional<Integer> findMaxVersionByAggregateId(String aggregateId);
    
    // Belirli bir versiyondan sonraki event'ler (incremental replay)
    List<StoredEvent> findByAggregateIdAndVersionGreaterThanOrderByVersionAsc(
        String aggregateId, int afterVersion);
}
@Service
@RequiredArgsConstructor
@Slf4j
public class EventStore {
    
    private final StoredEventRepository repository;
    private final ObjectMapper objectMapper;
    private final ApplicationEventPublisher publisher;
    
    public void append(String aggregateId, String aggregateType, 
                       DomainEvent event, int expectedVersion) {
        
        // Optimistic locking — versiyon kontrolü (concurrent write koruması)
        int currentVersion = repository.findMaxVersionByAggregateId(aggregateId)
            .orElse(0);
        
        if (currentVersion != expectedVersion) {
            throw new OptimisticLockingException(
                String.format("Expected version %d but found %d for aggregate %s",
                    expectedVersion, currentVersion, aggregateId));
        }
        
        StoredEvent stored = new StoredEvent();
        stored.setAggregateId(aggregateId);
        stored.setAggregateType(aggregateType);
        stored.setEventType(event.getClass().getSimpleName());
        stored.setEventData(serialize(event));
        stored.setVersion(expectedVersion + 1);
        stored.setOccurredAt(Instant.now());
        stored.setTriggeredBy(getCurrentUser());
        
        repository.save(stored);
        
        // Event'i yayınla — projection'lar (read model) dinler
        publisher.publishEvent(event);
        
        log.info("Event stored: aggregate={}, type={}, version={}",
            aggregateId, event.getClass().getSimpleName(), expectedVersion + 1);
    }
    
    public List<DomainEvent> loadEvents(String aggregateId) {
        return repository.findByAggregateIdOrderByVersionAsc(aggregateId)
            .stream()
            .map(this::deserialize)
            .toList();
    }
    
    private String serialize(DomainEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new EventSerializationException("Failed to serialize event", e);
        }
    }
    
    private DomainEvent deserialize(StoredEvent stored) {
        try {
            Class<?> eventClass = Class.forName(
                "com.example.events." + stored.getEventType());
            return (DomainEvent) objectMapper.readValue(stored.getEventData(), eventClass);
        } catch (Exception e) {
            throw new EventDeserializationException("Failed to deserialize event", e);
        }
    }
}

Temel kurallar:

  • Append-only: Olaylar yalnızca eklenir, asla güncellenmez veya silinmez

  • Sıralı: Her olay bir version numarası taşır

  • Optimistic locking: Aynı aggregate'e eşzamanlı yazma koruması


Aggregate ve Event Replay

Aggregate, iş kurallarını kapsayan ve event'leri uygulayan domain nesnesidir:

public class AccountAggregate {
    
    private String id;
    private String owner;
    private BigDecimal balance = BigDecimal.ZERO;
    private AccountStatus status = AccountStatus.ACTIVE;
    private int version = 0;
    
    // Event replay — her event tipi için bir apply metodu
    public void apply(DomainEvent event) {
        if (event instanceof AccountCreated e) {
            this.id = e.getAccountId();
            this.owner = e.getOwner();
            this.balance = BigDecimal.ZERO;
            this.status = AccountStatus.ACTIVE;
        } else if (event instanceof MoneyDeposited e) {
            this.balance = this.balance.add(e.getAmount());
        } else if (event instanceof MoneyWithdrawn e) {
            this.balance = this.balance.subtract(e.getAmount());
        } else if (event instanceof AccountClosed e) {
            this.status = AccountStatus.CLOSED;
        }
        this.version++;
    }
    
    // Event store'dan aggregate'i yeniden oluşturma
    public static AccountAggregate fromEvents(List<DomainEvent> events) {
        AccountAggregate account = new AccountAggregate();
        events.forEach(account::apply);
        return account;
    }
    
    // İş kuralı — para çekme
    public MoneyWithdrawn withdraw(BigDecimal amount) {
        if (status != AccountStatus.ACTIVE) {
            throw new AccountClosedException("Hesap kapalı");
        }
        if (balance.compareTo(amount) < 0) {
            throw new InsufficientFundsException(
                "Yetersiz bakiye. Mevcut: " + balance + ", İstenen: " + amount);
        }
        return new MoneyWithdrawn(this.id, amount);
    }
    
    // İş kuralı — para yatırma
    public MoneyDeposited deposit(BigDecimal amount) {
        if (amount.compareTo(BigDecimal.ZERO) <= 0) {
            throw new InvalidAmountException("Tutar pozitif olmalı");
        }
        return new MoneyDeposited(this.id, amount);
    }
    
    // Getters
    public String getId() { return id; }
    public BigDecimal getBalance() { return balance; }
    public int getVersion() { return version; }
}

Materialized View (Projeksiyon)

Event replay her sorguda yapılırsa performans sorunları oluşur (binlerce event'i her seferinde replay etmek). Materialized view (projection), event'leri dinleyerek güncel bir okunabilir model tutar:

@Service
@RequiredArgsConstructor
@Slf4j
public class AccountProjection {
    
    private final AccountViewRepository readRepository;
    
    @EventListener
    @Transactional
    public void on(AccountCreated event) {
        AccountView view = new AccountView();
        view.setId(event.getAccountId());
        view.setOwner(event.getOwner());
        view.setBalance(BigDecimal.ZERO);
        view.setStatus("ACTIVE");
        view.setCreatedAt(event.getOccurredAt());
        view.setLastUpdatedAt(event.getOccurredAt());
        readRepository.save(view);
        log.info("Projection: Account created — {}", event.getAccountId());
    }
    
    @EventListener
    @Transactional
    public void on(MoneyDeposited event) {
        AccountView view = readRepository.findById(event.getAccountId()).orElseThrow();
        view.setBalance(view.getBalance().add(event.getAmount()));
        view.setLastUpdatedAt(Instant.now());
        view.setTransactionCount(view.getTransactionCount() + 1);
        readRepository.save(view);
    }
    
    @EventListener
    @Transactional
    public void on(MoneyWithdrawn event) {
        AccountView view = readRepository.findById(event.getAccountId()).orElseThrow();
        view.setBalance(view.getBalance().subtract(event.getAmount()));
        view.setLastUpdatedAt(Instant.now());
        view.setTransactionCount(view.getTransactionCount() + 1);
        readRepository.save(view);
    }
}

Projection'ın gücü: farklı görünümler oluşturabilirsiniz. Aynı event'lerden bir "hesap özeti" view'ı, bir "işlem geçmişi" view'ı, bir "aylık rapor" view'ı çıkarabilirsiniz.


CQRS: Command/Query Ayrımı

CQRS, okuma ve yazma operasyonlarını tamamen ayrı modellere böler:

               ┌─── Command Model ───┐
User ─ Command ▶│ (Write Side)        │──▶ Event Store
               │ Validation           │
               │ Business Rules       │
               └──────────────────────┘
                         │ events
                         ▼
               ┌─── Query Model ─────┐
User ◀ Response│ (Read Side)          │◀── Materialized View
               │ Optimized for reads  │    (projection)
               │ Denormalized data    │
               └──────────────────────┘

Command Handler (Write Side)

@Service
@RequiredArgsConstructor
public class AccountCommandHandler {
    
    private final EventStore eventStore;
    
    public void handle(CreateAccountCommand command) {
        AccountCreated event = new AccountCreated(
            UUID.randomUUID().toString(),
            command.getOwner()
        );
        eventStore.append(event.getAccountId(), "Account", event, 0);
    }
    
    public void handle(DepositMoneyCommand command) {
        // 1. Event store'dan aggregate'i yükle
        List<DomainEvent> events = eventStore.loadEvents(command.getAccountId());
        AccountAggregate account = AccountAggregate.fromEvents(events);
        
        // 2. İş kurallarını uygula (domain logic)
        MoneyDeposited event = account.deposit(command.getAmount());
        
        // 3. Event'i kaydet (optimistic locking ile)
        eventStore.append(command.getAccountId(), "Account", event, account.getVersion());
    }
    
    public void handle(WithdrawMoneyCommand command) {
        List<DomainEvent> events = eventStore.loadEvents(command.getAccountId());
        AccountAggregate account = AccountAggregate.fromEvents(events);
        
        // İş kuralı: bakiye yeterli mi?
        MoneyWithdrawn event = account.withdraw(command.getAmount());
        eventStore.append(command.getAccountId(), "Account", event, account.getVersion());
    }
}

Query Handler (Read Side)

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/accounts")
public class AccountQueryController {
    
    private final AccountViewRepository readRepository;
    
    @GetMapping("/{id}")
    public AccountView getAccount(@PathVariable String id) {
        return readRepository.findById(id)
            .orElseThrow(() -> new NotFoundException("Account not found: " + id));
    }
    
    @GetMapping("/{id}/transactions")
    public List<TransactionView> getTransactions(
            @PathVariable String id,
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size) {
        return transactionViewRepository.findByAccountId(id, PageRequest.of(page, size));
    }
    
    @GetMapping("/search")
    public List<AccountView> searchAccounts(
            @RequestParam(required = false) String owner,
            @RequestParam(required = false) BigDecimal minBalance) {
        // Read model, sorguya optimize edilmiş denormalize veri içerir
        return readRepository.search(owner, minBalance);
    }
}

Event-Driven vs Event-Sourced

Bu iki kavram sıklıkla karıştırılır:

Event-DrivenEvent Sourcing
AmaçServisler arası iletişimDurum yönetimi
Event'in rolüHaberleşme aracıBirincil veri kaynağı
Durum depolamaKlasik CRUD olabilirEvent'lerden türetilir
ReplayGenelde yokTemel özellik
KarmaşıklıkDüşük-ortaYüksek

İkisi birlikte kullanıldığında en güçlü kombinasyonu oluşturur — ama bağımsız da kullanılabilirler.


Ne Zaman Kullanmalı?

✅ Uygun Senaryolar

  • Audit trail zorunluluğu — Finans, sağlık, hukuk: her değişikliğin kaydı olmalı

  • Karmaşık iş kuralları — Domain logic yoğun sistemler

  • Temporal query — "Geçen ay bu tarihte durum neydi?" sorusuna yanıt

  • Yüksek okuma/yazma oranı farkı — Çok okuyan, az yazan sistemler

  • Eventual consistency kabul edilebilir — Read model birkaç saniye gecikebilir

❌ Uygun Olmayan Senaryolar

  • Basit CRUD uygulamaları — Gereksiz karmaşıklık

  • Güçlü tutarlılık gereksinimi — CQRS eventual consistency getirir

  • Küçük ekipler — Operasyonel karmaşıklık yüksek

  • Prototip/MVP — İlk ürünü hızlı çıkarmak daha önemli


Özet

  • Event Sourcing, durumu event'ler dizisi olarak saklar. Veritabanında son durum yerine tüm değişiklik geçmişi tutulur. Bu, tam audit trail, temporal query ve replay yeteneği sağlar.

  • Event Store append-only bir veri deposudur. Event'ler asla güncellenmez veya silinmez. Optimistic locking ile concurrent write koruması sağlanır.

  • Aggregate, iş kurallarını kapsayan ve event'leri uygulayan domain nesnesidir. Event'leri replay ederek mevcut durumu hesaplar.

  • Materialized View (Projection), event'leri dinleyerek güncel okunabilir model tutar. Farklı sorgulara optimize edilmiş birden fazla view oluşturulabilir.

  • CQRS, okuma ve yazma modellerini ayırır. Write side event üretir, read side event'lerden projeksiyon günceller. Farklı veritabanları kullanılabilir.

  • Her sisteme uygun değildir — karmaşıklık, eventual consistency ve operasyonel yük getirir. Audit, finans ve karmaşık domain senaryolarında güçlü bir çözümdür.


Snapshot Optimizasyonu

Binlerce event'i her seferinde replay etmek performans sorunlarına yol açar. Snapshot mekanizması, belirli aralıklarla aggregate'in anlık durumunu kaydeder:

@Service
@RequiredArgsConstructor
public class SnapshotStore {
    
    private final SnapshotRepository snapshotRepository;
    private final ObjectMapper objectMapper;
    
    // Her 100 event'te bir snapshot al
    private static final int SNAPSHOT_INTERVAL = 100;
    
    public void saveSnapshotIfNeeded(String aggregateId, AccountAggregate aggregate) {
        if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
            Snapshot snapshot = new Snapshot();
            snapshot.setAggregateId(aggregateId);
            snapshot.setVersion(aggregate.getVersion());
            snapshot.setData(objectMapper.writeValueAsString(aggregate));
            snapshot.setCreatedAt(Instant.now());
            snapshotRepository.save(snapshot);
        }
    }
    
    public AccountAggregate loadAggregate(String aggregateId, EventStore eventStore) {
        // 1. En son snapshot'ı bul
        Optional<Snapshot> snapshot = snapshotRepository
            .findTopByAggregateIdOrderByVersionDesc(aggregateId);
        
        if (snapshot.isPresent()) {
            // 2. Snapshot'tan aggregate'i oluştur
            AccountAggregate aggregate = objectMapper.readValue(
                snapshot.get().getData(), AccountAggregate.class);
            
            // 3. Snapshot'tan sonraki event'leri uygula (incremental replay)
            List<DomainEvent> newEvents = eventStore.loadEventsAfterVersion(
                aggregateId, snapshot.get().getVersion());
            newEvents.forEach(aggregate::apply);
            
            return aggregate;
        }
        
        // Snapshot yok — tüm event'leri replay et
        List<DomainEvent> allEvents = eventStore.loadEvents(aggregateId);
        return AccountAggregate.fromEvents(allEvents);
    }
}

Snapshot ile: 10.000 event'li bir aggregate'i yüklemek için 100 event yerine (snapshot + son 100 event) sadece ~100 event replay edilir — 100x performans iyileştirmesi.