Event Sourcing & CQRS
Giriş
Bir banka hesap defterini düşünün. Eski usulde defteri açarsınız, bakiyeyi görürsünüz: "1.500 TL". Ama bu paranın nasıl biriktiğini bilmezsiniz — maaş mı yattı, borç mu ödendi, faiz mi geldi? Event Sourcing ile hesap defterindeki her işlem kayıtlıdır: "3.000 TL maaş yattı → 500 TL kira ödendi → 1.000 TL alışveriş yapıldı". Bakiye bu işlemlerin toplamından hesaplanır: 3000 - 500 - 1000 = 1.500 TL. Her şeyin hikayesi, geçmişi ve sebebi bellidir.
Geleneksel CRUD uygulamalarında bir kaydı güncellediğinizde eski değer kaybolur — veritabanında yalnızca son durum saklanır. Event Sourcing, bu yaklaşımı kökten değiştirerek her değişikliği bir olay (event) olarak kaydeder. CQRS (Command Query Responsibility Segregation) ise okuma ve yazma işlemlerini ayrı modellere böler. Bu iki pattern birlikte, özellikle karmaşık iş mantığı gerektiren sistemlerde güçlü bir çözüm sunar.
Bu derste Event Sourcing'in temellerini, Event Store yapısını, event replay mekanizmasını, Materialized View (projeksiyon) kavramını, CQRS ile read/write ayrımını, Spring Boot'ta implementasyonu ve ne zaman kullanılacağını derinlemesine öğreneceğiz.
Event Sourcing Nedir?
Event Sourcing'de bir nesnenin mevcut durumu, oluşumundan bu yana gerçekleşen tüm olayların sırasıyla uygulanması ile elde edilir. Veritabanında son durum değil, olaylar dizisi saklanır.
Geleneksel vs Event Sourcing
═══ GELENEKSEL (State Sourcing) ═══
Account tablosu:
| id | owner | balance |
| 1 | Ali | 1500 | ← Sadece son durum. Nasıl buraya geldi? Bilinmiyor.
Güncelleme: UPDATE accounts SET balance = 1500 WHERE id = 1
Eski değer (2000 TL): KAYBEDİLDİ!
═══ EVENT SOURCING ═══
Event Store:
| seq | aggregate_id | event_type | data | timestamp |
| 1 | acc-1 | AccountCreated | {owner: "Ali"} | 2024-01-01 10:00:00 |
| 2 | acc-1 | MoneyDeposited | {amount: 3000} | 2024-01-01 10:05:00 |
| 3 | acc-1 | MoneyWithdrawn | {amount: 500} | 2024-01-15 14:30:00 |
| 4 | acc-1 | MoneyWithdrawn | {amount: 1000} | 2024-01-20 18:00:00 |
Mevcut bakiye: 0 + 3000 - 500 - 1000 = 1500
Her şeyin kaydı var! "20 Ocak'ta 1000 TL nereye gitti?" sorusuna yanıt verebilirsiniz.Event Sourcing'in Faydaları
Tam audit trail: Her değişikliğin kaydı var — finans, sağlık, hukuk sektörlerinde zorunlu
Temporal query: "2 ay önce bakiye neydi?" sorusuna yanıt verebilirsiniz
Replay yeteneği: Tüm olayları baştan uygulayarak yeni görünümler oluşturabilirsiniz
Debug kolaylığı: Hatalı duruma nasıl ulaşıldığını olayları izleyerek anlarsınız
Undo/Redo: Kompensasyon event'leri ile işlemleri geri alabilirsiniz
Event Store
Event Store, olayların saklandığı özel bir veritabanı veya tablodur:
@Entity
@Table(name = "event_store")
@Getter
@NoArgsConstructor
public class StoredEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long sequenceNumber;
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String aggregateType;
@Column(nullable = false)
private String eventType;
@Column(columnDefinition = "TEXT", nullable = false)
private String eventData; // JSON
@Column(nullable = false)
private int version;
@Column(nullable = false)
private Instant occurredAt;
@Column(nullable = false)
private String triggeredBy; // Kim tetikledi (kullanıcı veya sistem)
}@Repository
public interface StoredEventRepository extends JpaRepository<StoredEvent, Long> {
List<StoredEvent> findByAggregateIdOrderByVersionAsc(String aggregateId);
Optional<Integer> findMaxVersionByAggregateId(String aggregateId);
// Belirli bir versiyondan sonraki event'ler (incremental replay)
List<StoredEvent> findByAggregateIdAndVersionGreaterThanOrderByVersionAsc(
String aggregateId, int afterVersion);
}@Service
@RequiredArgsConstructor
@Slf4j
public class EventStore {
private final StoredEventRepository repository;
private final ObjectMapper objectMapper;
private final ApplicationEventPublisher publisher;
public void append(String aggregateId, String aggregateType,
DomainEvent event, int expectedVersion) {
// Optimistic locking — versiyon kontrolü (concurrent write koruması)
int currentVersion = repository.findMaxVersionByAggregateId(aggregateId)
.orElse(0);
if (currentVersion != expectedVersion) {
throw new OptimisticLockingException(
String.format("Expected version %d but found %d for aggregate %s",
expectedVersion, currentVersion, aggregateId));
}
StoredEvent stored = new StoredEvent();
stored.setAggregateId(aggregateId);
stored.setAggregateType(aggregateType);
stored.setEventType(event.getClass().getSimpleName());
stored.setEventData(serialize(event));
stored.setVersion(expectedVersion + 1);
stored.setOccurredAt(Instant.now());
stored.setTriggeredBy(getCurrentUser());
repository.save(stored);
// Event'i yayınla — projection'lar (read model) dinler
publisher.publishEvent(event);
log.info("Event stored: aggregate={}, type={}, version={}",
aggregateId, event.getClass().getSimpleName(), expectedVersion + 1);
}
public List<DomainEvent> loadEvents(String aggregateId) {
return repository.findByAggregateIdOrderByVersionAsc(aggregateId)
.stream()
.map(this::deserialize)
.toList();
}
private String serialize(DomainEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new EventSerializationException("Failed to serialize event", e);
}
}
private DomainEvent deserialize(StoredEvent stored) {
try {
Class<?> eventClass = Class.forName(
"com.example.events." + stored.getEventType());
return (DomainEvent) objectMapper.readValue(stored.getEventData(), eventClass);
} catch (Exception e) {
throw new EventDeserializationException("Failed to deserialize event", e);
}
}
}Temel kurallar:
Append-only: Olaylar yalnızca eklenir, asla güncellenmez veya silinmez
Sıralı: Her olay bir version numarası taşır
Optimistic locking: Aynı aggregate'e eşzamanlı yazma koruması
Aggregate ve Event Replay
Aggregate, iş kurallarını kapsayan ve event'leri uygulayan domain nesnesidir:
public class AccountAggregate {
private String id;
private String owner;
private BigDecimal balance = BigDecimal.ZERO;
private AccountStatus status = AccountStatus.ACTIVE;
private int version = 0;
// Event replay — her event tipi için bir apply metodu
public void apply(DomainEvent event) {
if (event instanceof AccountCreated e) {
this.id = e.getAccountId();
this.owner = e.getOwner();
this.balance = BigDecimal.ZERO;
this.status = AccountStatus.ACTIVE;
} else if (event instanceof MoneyDeposited e) {
this.balance = this.balance.add(e.getAmount());
} else if (event instanceof MoneyWithdrawn e) {
this.balance = this.balance.subtract(e.getAmount());
} else if (event instanceof AccountClosed e) {
this.status = AccountStatus.CLOSED;
}
this.version++;
}
// Event store'dan aggregate'i yeniden oluşturma
public static AccountAggregate fromEvents(List<DomainEvent> events) {
AccountAggregate account = new AccountAggregate();
events.forEach(account::apply);
return account;
}
// İş kuralı — para çekme
public MoneyWithdrawn withdraw(BigDecimal amount) {
if (status != AccountStatus.ACTIVE) {
throw new AccountClosedException("Hesap kapalı");
}
if (balance.compareTo(amount) < 0) {
throw new InsufficientFundsException(
"Yetersiz bakiye. Mevcut: " + balance + ", İstenen: " + amount);
}
return new MoneyWithdrawn(this.id, amount);
}
// İş kuralı — para yatırma
public MoneyDeposited deposit(BigDecimal amount) {
if (amount.compareTo(BigDecimal.ZERO) <= 0) {
throw new InvalidAmountException("Tutar pozitif olmalı");
}
return new MoneyDeposited(this.id, amount);
}
// Getters
public String getId() { return id; }
public BigDecimal getBalance() { return balance; }
public int getVersion() { return version; }
}Materialized View (Projeksiyon)
Event replay her sorguda yapılırsa performans sorunları oluşur (binlerce event'i her seferinde replay etmek). Materialized view (projection), event'leri dinleyerek güncel bir okunabilir model tutar:
@Service
@RequiredArgsConstructor
@Slf4j
public class AccountProjection {
private final AccountViewRepository readRepository;
@EventListener
@Transactional
public void on(AccountCreated event) {
AccountView view = new AccountView();
view.setId(event.getAccountId());
view.setOwner(event.getOwner());
view.setBalance(BigDecimal.ZERO);
view.setStatus("ACTIVE");
view.setCreatedAt(event.getOccurredAt());
view.setLastUpdatedAt(event.getOccurredAt());
readRepository.save(view);
log.info("Projection: Account created — {}", event.getAccountId());
}
@EventListener
@Transactional
public void on(MoneyDeposited event) {
AccountView view = readRepository.findById(event.getAccountId()).orElseThrow();
view.setBalance(view.getBalance().add(event.getAmount()));
view.setLastUpdatedAt(Instant.now());
view.setTransactionCount(view.getTransactionCount() + 1);
readRepository.save(view);
}
@EventListener
@Transactional
public void on(MoneyWithdrawn event) {
AccountView view = readRepository.findById(event.getAccountId()).orElseThrow();
view.setBalance(view.getBalance().subtract(event.getAmount()));
view.setLastUpdatedAt(Instant.now());
view.setTransactionCount(view.getTransactionCount() + 1);
readRepository.save(view);
}
}Projection'ın gücü: farklı görünümler oluşturabilirsiniz. Aynı event'lerden bir "hesap özeti" view'ı, bir "işlem geçmişi" view'ı, bir "aylık rapor" view'ı çıkarabilirsiniz.
CQRS: Command/Query Ayrımı
CQRS, okuma ve yazma operasyonlarını tamamen ayrı modellere böler:
┌─── Command Model ───┐
User ─ Command ▶│ (Write Side) │──▶ Event Store
│ Validation │
│ Business Rules │
└──────────────────────┘
│ events
▼
┌─── Query Model ─────┐
User ◀ Response│ (Read Side) │◀── Materialized View
│ Optimized for reads │ (projection)
│ Denormalized data │
└──────────────────────┘Command Handler (Write Side)
@Service
@RequiredArgsConstructor
public class AccountCommandHandler {
private final EventStore eventStore;
public void handle(CreateAccountCommand command) {
AccountCreated event = new AccountCreated(
UUID.randomUUID().toString(),
command.getOwner()
);
eventStore.append(event.getAccountId(), "Account", event, 0);
}
public void handle(DepositMoneyCommand command) {
// 1. Event store'dan aggregate'i yükle
List<DomainEvent> events = eventStore.loadEvents(command.getAccountId());
AccountAggregate account = AccountAggregate.fromEvents(events);
// 2. İş kurallarını uygula (domain logic)
MoneyDeposited event = account.deposit(command.getAmount());
// 3. Event'i kaydet (optimistic locking ile)
eventStore.append(command.getAccountId(), "Account", event, account.getVersion());
}
public void handle(WithdrawMoneyCommand command) {
List<DomainEvent> events = eventStore.loadEvents(command.getAccountId());
AccountAggregate account = AccountAggregate.fromEvents(events);
// İş kuralı: bakiye yeterli mi?
MoneyWithdrawn event = account.withdraw(command.getAmount());
eventStore.append(command.getAccountId(), "Account", event, account.getVersion());
}
}Query Handler (Read Side)
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/accounts")
public class AccountQueryController {
private final AccountViewRepository readRepository;
@GetMapping("/{id}")
public AccountView getAccount(@PathVariable String id) {
return readRepository.findById(id)
.orElseThrow(() -> new NotFoundException("Account not found: " + id));
}
@GetMapping("/{id}/transactions")
public List<TransactionView> getTransactions(
@PathVariable String id,
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size) {
return transactionViewRepository.findByAccountId(id, PageRequest.of(page, size));
}
@GetMapping("/search")
public List<AccountView> searchAccounts(
@RequestParam(required = false) String owner,
@RequestParam(required = false) BigDecimal minBalance) {
// Read model, sorguya optimize edilmiş denormalize veri içerir
return readRepository.search(owner, minBalance);
}
}Event-Driven vs Event-Sourced
Bu iki kavram sıklıkla karıştırılır:
| Event-Driven | Event Sourcing | |
|---|---|---|
| Amaç | Servisler arası iletişim | Durum yönetimi |
| Event'in rolü | Haberleşme aracı | Birincil veri kaynağı |
| Durum depolama | Klasik CRUD olabilir | Event'lerden türetilir |
| Replay | Genelde yok | Temel özellik |
| Karmaşıklık | Düşük-orta | Yüksek |
İkisi birlikte kullanıldığında en güçlü kombinasyonu oluşturur — ama bağımsız da kullanılabilirler.
Ne Zaman Kullanmalı?
✅ Uygun Senaryolar
Audit trail zorunluluğu — Finans, sağlık, hukuk: her değişikliğin kaydı olmalı
Karmaşık iş kuralları — Domain logic yoğun sistemler
Temporal query — "Geçen ay bu tarihte durum neydi?" sorusuna yanıt
Yüksek okuma/yazma oranı farkı — Çok okuyan, az yazan sistemler
Eventual consistency kabul edilebilir — Read model birkaç saniye gecikebilir
❌ Uygun Olmayan Senaryolar
Basit CRUD uygulamaları — Gereksiz karmaşıklık
Güçlü tutarlılık gereksinimi — CQRS eventual consistency getirir
Küçük ekipler — Operasyonel karmaşıklık yüksek
Prototip/MVP — İlk ürünü hızlı çıkarmak daha önemli
Özet
Event Sourcing, durumu event'ler dizisi olarak saklar. Veritabanında son durum yerine tüm değişiklik geçmişi tutulur. Bu, tam audit trail, temporal query ve replay yeteneği sağlar.
Event Store append-only bir veri deposudur. Event'ler asla güncellenmez veya silinmez. Optimistic locking ile concurrent write koruması sağlanır.
Aggregate, iş kurallarını kapsayan ve event'leri uygulayan domain nesnesidir. Event'leri replay ederek mevcut durumu hesaplar.
Materialized View (Projection), event'leri dinleyerek güncel okunabilir model tutar. Farklı sorgulara optimize edilmiş birden fazla view oluşturulabilir.
CQRS, okuma ve yazma modellerini ayırır. Write side event üretir, read side event'lerden projeksiyon günceller. Farklı veritabanları kullanılabilir.
Her sisteme uygun değildir — karmaşıklık, eventual consistency ve operasyonel yük getirir. Audit, finans ve karmaşık domain senaryolarında güçlü bir çözümdür.
Snapshot Optimizasyonu
Binlerce event'i her seferinde replay etmek performans sorunlarına yol açar. Snapshot mekanizması, belirli aralıklarla aggregate'in anlık durumunu kaydeder:
@Service
@RequiredArgsConstructor
public class SnapshotStore {
private final SnapshotRepository snapshotRepository;
private final ObjectMapper objectMapper;
// Her 100 event'te bir snapshot al
private static final int SNAPSHOT_INTERVAL = 100;
public void saveSnapshotIfNeeded(String aggregateId, AccountAggregate aggregate) {
if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
Snapshot snapshot = new Snapshot();
snapshot.setAggregateId(aggregateId);
snapshot.setVersion(aggregate.getVersion());
snapshot.setData(objectMapper.writeValueAsString(aggregate));
snapshot.setCreatedAt(Instant.now());
snapshotRepository.save(snapshot);
}
}
public AccountAggregate loadAggregate(String aggregateId, EventStore eventStore) {
// 1. En son snapshot'ı bul
Optional<Snapshot> snapshot = snapshotRepository
.findTopByAggregateIdOrderByVersionDesc(aggregateId);
if (snapshot.isPresent()) {
// 2. Snapshot'tan aggregate'i oluştur
AccountAggregate aggregate = objectMapper.readValue(
snapshot.get().getData(), AccountAggregate.class);
// 3. Snapshot'tan sonraki event'leri uygula (incremental replay)
List<DomainEvent> newEvents = eventStore.loadEventsAfterVersion(
aggregateId, snapshot.get().getVersion());
newEvents.forEach(aggregate::apply);
return aggregate;
}
// Snapshot yok — tüm event'leri replay et
List<DomainEvent> allEvents = eventStore.loadEvents(aggregateId);
return AccountAggregate.fromEvents(allEvents);
}
}Snapshot ile: 10.000 event'li bir aggregate'i yüklemek için 100 event yerine (snapshot + son 100 event) sadece ~100 event replay edilir — 100x performans iyileştirmesi.
AI Asistan
Sorularını yanıtlamaya hazır