← Kursa Dön
📄 Text · 40 min

Versioning ve Concurrency Control

Giriş — Aynı Anda Aynı Kapıdan Geçmek

İki kişi aynı anda bir belgenin aynı satırını düzenlemeye çalışırsa ne olur? Biri diğerinin değişikliğini ezebilir — ve ikisi de bunun farkında olmaz. Veritabanı dünyasında buna lost update (kayıp güncelleme) problemi denir ve dağıtık sistemlerde en yaygın hatalardan biridir.

Elasticsearch dağıtık bir sistemdir. Aynı dokümanı farklı client'lar, farklı node'lar üzerinden eşzamanlı olarak güncelleyebilir. İşte bu yüzden Elasticsearch bir concurrency control mekanizması sunar: optimistic concurrency control (iyimser eşzamanlılık kontrolü). Bu ders, bu mekanizmanın her detayını — eski _version sisteminden modern _seq_no + _primary_term yaklaşımına, CAS pattern'ından Java'da concurrency handling'e kadar — derinlemesine ele alacak.


1. Problem: Lost Update

1.1 Senaryo

Bir e-ticaret sitesinde stok yönetimi yapıyorsunuz. Ürünün mevcut stok sayısı 10:

// Mevcut doküman
GET /products/_doc/1

{
  "_index": "products",
  "_id": "1",
  "_version": 5,
  "_seq_no": 42,
  "_primary_term": 1,
  "_source": {
    "name": "Kablosuz Kulaklık",
    "stock": 10,
    "price": 299.99
  }
}

İki müşteri aynı anda sipariş veriyor:

Zaman  Thread A (Müşteri 1)          Thread B (Müşteri 2)
─────────────────────────────────────────────────────────
t1     GET stock → 10                 GET stock → 10
t2     stock = 10 - 1 = 9            stock = 10 - 2 = 8
t3     PUT stock = 9                  PUT stock = 8
t4     Başarılı ✅                    Başarılı ✅

Sonuç: stock = 8 (Thread B'nin değeri)
Beklenen: stock = 7 (10 - 1 - 2)
Kayıp: 1 adet stok eksik sayıldı! ❌

Bu lost update problemidir. Thread A'nın güncellemesi kayboldu çünkü Thread B daha yeni bir değeri yazdığını sanıyor ama aslında eski veriyi baz almış.

1.2 Çözüm Yaklaşımları

┌──────────────────────┬─────────────────────────────────────────────┐
│ Yaklaşım             │ Açıklama                                    │
├──────────────────────┼─────────────────────────────────────────────┤
│ Pessimistic Locking  │ Okumadan önce kilitle, güncelle, kilidi aç  │
│ (Karamsar)           │ ES bunu DESTEKLEMİYOR                       │
├──────────────────────┼─────────────────────────────────────────────┤
│ Optimistic Locking   │ Güncelleme sırasında versiyon kontrolü      │
│ (İyimser)            │ ES bunu KULLANIYOR ✅                       │
├──────────────────────┼─────────────────────────────────────────────┤
│ Scripted Update      │ Atomik güncelleme — oku+yaz tek işlemde     │
│                      │ ES bunu da DESTEKLİYOR ✅                   │
└──────────────────────┴─────────────────────────────────────────────┘

2. Eski Yöntem: _version (Deprecated)

2.1 Internal Version

Elasticsearch'ün ilk versiyonlarında her dokümanın bir _version numarası vardı. Her güncelleme bu numarayı 1 artırırdı:

// Doküman oluştur → version 1
PUT /products/_doc/1
{ "name": "Kulaklık", "stock": 10 }
// Yanıt: "_version": 1

// Güncelle → version 2
PUT /products/_doc/1
{ "name": "Kulaklık", "stock": 9 }
// Yanıt: "_version": 2

// Güncelle → version 3
PUT /products/_doc/1
{ "name": "Kulaklık", "stock": 8 }
// Yanıt: "_version": 3

Concurrency control için version parametresi kullanılırdı:

// "Sadece version 3 ise güncelle" — ESKİ YÖNTEM
PUT /products/_doc/1?version=3&version_type=internal
{ "name": "Kulaklık", "stock": 7 }

// Eğer version artık 3 değilse → 409 Conflict

2.2 External Version

Harici bir sistemin (örneğin RDBMS) versiyon numarasını kullanmak için:

// External version — verilen sayıdan büyükse kabul et
PUT /products/_doc/1?version=10&version_type=external
{ "name": "Kulaklık", "stock": 7 }

// version_type=external: Verilen version, mevcut version'dan büyük olmalı
// version_type=external_gte: Verilen version, mevcut version'a eşit veya büyük olmalı

⚠️ Dikkat: version parametresi ile concurrency control deprecated'dır (Elasticsearch 6.7+). Yeni kodlarda kullanmayın. _seq_no + _primary_term kullanın. version_type=external ise hâlâ harici versiyon senkronizasyonu için kullanılabilir.

2.3 Neden Deprecated?

Internal _version ile ilgili sorunlar:

  • Version numarası sadece artıyordu — doküman silinip yeniden oluşturulduğunda sıfırlanmıyor, veya eski versiyon numarasıyla karışıyordu

  • Dağıtık ortamda primary shard değiştiğinde (failover) versiyon tutarsızlıkları oluşabiliyordu

  • Silinen dokümanların version bilgisini GC sonrası tutmak maliyetliydi


3. Modern Yöntem: _seq_no + _primary_term

3.1 Kavramlar

Elasticsearch 6.7+ ile gelen modern concurrency control iki değere dayanır:

`_seq_no` (Sequence Number): Her shard'da monoton artan bir sayaç. Shard'a yapılan her write işlemi (index, update, delete) bu sayacı 1 artırır. Shard-local'dir — farklı shard'ların seq_no'ları birbirinden bağımsızdır.

`_primary_term`: Primary shard atamasının "nesil" numarası. Primary shard değiştiğinde (node çöktü, failover oldu), primary_term artar. Bu, eski primary'den gelen işlemleri yenisinden ayırt etmeyi sağlar.

Shard 0 - Timeline:
Primary Term 1 (Node A is primary):
  seq_no=0: index doc-1
  seq_no=1: update doc-1
  seq_no=2: index doc-2
  seq_no=3: delete doc-1

  [Node A çöktü, Node B yeni primary oldu]

Primary Term 2 (Node B is primary):
  seq_no=4: index doc-3
  seq_no=5: update doc-2
  ...

3.2 Doküman Yanıtında Değerler

// Doküman oluştur
PUT /products/_doc/1
{ "name": "Kulaklık", "stock": 10 }

// Yanıt:
{
  "_index": "products",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "result": "created"
}

// Güncelle
POST /products/_update/1
{ "doc": { "stock": 9 } }

// Yanıt:
{
  "_index": "products",
  "_id": "1",
  "_version": 2,
  "_seq_no": 1,         // ← Arttı
  "_primary_term": 1,   // ← Aynı (primary değişmedi)
  "result": "updated"
}

3.3 Optimistic Concurrency Control

İşte modern yöntem — if_seq_no ve if_primary_term parametreleri:

// 1. Dokümanı oku — seq_no ve primary_term'ü not et
GET /products/_doc/1

// Yanıt: _seq_no: 1, _primary_term: 1

// 2. "Sadece seq_no=1 VE primary_term=1 ise güncelle"
PUT /products/_doc/1?if_seq_no=1&if_primary_term=1
{
  "name": "Kulaklık",
  "stock": 9
}

Başarılı durumda:

{
  "_index": "products",
  "_id": "1",
  "_version": 3,
  "_seq_no": 2,        // Yeni seq_no
  "_primary_term": 1,
  "result": "updated"
}

Çakışma durumunda (başka biri araya girdi):

{
  "error": {
    "type": "version_conflict_engine_exception",
    "reason": "[1]: version conflict, required seqNo [1], primary term [1]. current document has seqNo [2] and primary term [1]"
  },
  "status": 409
}

3.4 Lost Update Problemi Çözülmüş

Zaman  Thread A                           Thread B
─────────────────────────────────────────────────────────
t1     GET doc → seq_no:1, term:1         GET doc → seq_no:1, term:1
t2     stock = 10 - 1 = 9                 stock = 10 - 2 = 8
t3     PUT ?if_seq_no=1&if_primary_term=1  
       → Başarılı ✅ (seq_no: 2)
t4                                         PUT ?if_seq_no=1&if_primary_term=1
                                           → 409 Conflict ❌
                                           (seq_no artık 2, 1 değil!)
t5                                         Retry: GET doc → seq_no:2
                                           stock = 9 - 2 = 7
                                           PUT ?if_seq_no=2&if_primary_term=1
                                           → Başarılı ✅

Sonuç: stock = 7 ✅ (Doğru!)

4. Update API ve retry_on_conflict

4.1 Scripted Update — Atomik Güncelleme

_update API'si ile script kullanarak "read + modify + write" işlemini Elasticsearch tarafında atomik olarak yapabilirsiniz:

POST /products/_update/1
{
  "script": {
    "source": "ctx._source.stock -= params.quantity",
    "lang": "painless",
    "params": {
      "quantity": 1
    }
  }
}

Bu yaklaşım, client tarafında oku-değiştir-yaz döngüsüne gerek kalmadan stok azaltır. Ancak yine de concurrency riski vardır — iki eşzamanlı script update aynı anda çalışabilir.

4.2 retry_on_conflict

_update API'sinde retry_on_conflict parametresi, version conflict olduğunda otomatik retry yapar:

POST /products/_update/1?retry_on_conflict=3
{
  "script": {
    "source": "ctx._source.stock -= params.quantity",
    "params": { "quantity": 1 }
  }
}

İç mekanizma:

  1. Dokümanı oku (seq_no + primary_term)

  2. Script'i çalıştır

  3. Güncellemeyi yaz (if_seq_no=X & if_primary_term=Y ile)

  4. Eğer conflict → adım 1'den tekrar başla

  5. 3 denemeden sonra hâlâ conflict varsa → hata dön

retry_on_conflict=3 akışı:
  Deneme 1: GET(seq:5) → script → PUT(if_seq:5) → 409 Conflict
  Deneme 2: GET(seq:6) → script → PUT(if_seq:6) → 409 Conflict
  Deneme 3: GET(seq:7) → script → PUT(if_seq:7) → 200 OK ✅

4.3 Scripted Update ile Koşullu Güncelleme

// Stok 0'ın altına düşmesin
POST /products/_update/1?retry_on_conflict=5
{
  "script": {
    "source": """
      if (ctx._source.stock >= params.quantity) {
        ctx._source.stock -= params.quantity;
      } else {
        ctx.op = 'noop';  // İşlemi iptal et
      }
    """,
    "params": { "quantity": 3 }
  }
}

`ctx.op` seçenekleri:

  • "index": Normal güncelleme (varsayılan)

  • "noop": Hiçbir şey yapma (conflict yok, sadece skip)

  • "delete": Dokümanı sil

4.4 Bulk Update ile retry_on_conflict

POST /_bulk
{"update": {"_index": "products", "_id": "1", "retry_on_conflict": 3}}
{"script": {"source": "ctx._source.stock -= 1"}}
{"update": {"_index": "products", "_id": "2", "retry_on_conflict": 3}}
{"script": {"source": "ctx._source.stock -= 2"}}

5. CAS Pattern — Compare and Swap

5.1 CAS Nedir?

Compare and Swap (CAS), lock-free concurrency control'ün temel yapı taşıdır. Fikir basittir:

  1. Mevcut değeri oku

  2. Beklenen değerle karşılaştır (compare)

  3. Eşleşiyorsa yeni değeri yaz (swap)

  4. Eşleşmiyorsa başarısız ol ve tekrar dene

Elasticsearch'ün if_seq_no + if_primary_term mekanizması tam bir CAS implementasyonudur.

5.2 CAS Pattern Implementasyonu

public class CASUpdater {
    private final ElasticsearchClient client;
    private static final int MAX_CAS_RETRIES = 10;

    public CASUpdater(ElasticsearchClient client) {
        this.client = client;
    }

    /**
     * CAS pattern ile doküman güncelleme.
     * @param index Index adı
     * @param id Doküman ID
     * @param updateFn Güncelleme fonksiyonu (mevcut doküman → güncel doküman)
     * @return Güncellenmiş doküman
     */
    public <T> T updateWithCAS(String index, String id, Class<T> clazz,
                                Function<T, T> updateFn) throws IOException {

        for (int attempt = 0; attempt < MAX_CAS_RETRIES; attempt++) {
            // 1. Mevcut dokümanı oku
            GetResponse<T> getResponse = client.get(g -> g
                .index(index)
                .id(id),
                clazz
            );

            if (!getResponse.found()) {
                throw new RuntimeException("Document not found: " + id);
            }

            T currentDoc = getResponse.source();
            long seqNo = getResponse.seqNo();
            long primaryTerm = getResponse.primaryTerm();

            // 2. Güncelleme fonksiyonunu uygula
            T updatedDoc = updateFn.apply(currentDoc);

            try {
                // 3. CAS ile güncelle
                IndexResponse indexResponse = client.index(i -> i
                    .index(index)
                    .id(id)
                    .document(updatedDoc)
                    .ifSeqNo(seqNo)
                    .ifPrimaryTerm(primaryTerm)
                );

                System.out.printf("CAS update successful on attempt %d (seq_no: %d → %d)%n",
                    attempt + 1, seqNo, indexResponse.seqNo());
                return updatedDoc;

            } catch (ElasticsearchException e) {
                if (e.status() == 409) {
                    // Version conflict — retry
                    System.out.printf("CAS conflict on attempt %d, retrying...%n",
                        attempt + 1);

                    // Kısa bir bekleme (optional, ama thundering herd'ü önler)
                    try {
                        Thread.sleep((long) (Math.random() * 50 * (attempt + 1)));
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IOException("Interrupted during CAS retry", ie);
                    }
                    continue;
                }
                throw e;
            }
        }

        throw new RuntimeException("CAS update failed after " + MAX_CAS_RETRIES + " attempts");
    }
}

Kullanım:

CASUpdater updater = new CASUpdater(client);

// Stok azaltma — CAS ile güvenli
Product updated = updater.updateWithCAS("products", "1", Product.class,
    product -> {
        if (product.getStock() < 1) {
            throw new RuntimeException("Out of stock!");
        }
        product.setStock(product.getStock() - 1);
        return product;
    }
);

System.out.println("New stock: " + updated.getStock());

5.3 CAS vs retry_on_conflict

┌─────────────────────┬──────────────────────┬─────────────────────────┐
│                     │ CAS (if_seq_no)      │ retry_on_conflict       │
├─────────────────────┼──────────────────────┼─────────────────────────┤
│ Çalışma yeri        │ Client tarafı        │ Elasticsearch tarafı    │
│ Güncelleme tipi     │ Full document replace│ Partial update / Script │
│ Logic kontrolü      │ Client'ta tam kontrol│ Script ile sınırlı      │
│ Network roundtrip   │ Conflict başına 2    │ Conflict başına 0 (ES   │
│                     │ (GET + PUT)          │ kendi içinde retry eder)│
│ Validasyon          │ Zengin (Java logic)  │ Painless script         │
│ Best for            │ Karmaşık iş mantığı │ Basit increment/decrement│
└─────────────────────┴──────────────────────┴─────────────────────────┘

6. Version Conflict Senaryoları

6.1 Senaryo 1: E-Commerce Stok Yönetimi

public class StockManager {
    private final CASUpdater updater;

    public StockManager(ElasticsearchClient client) {
        this.updater = new CASUpdater(client);
    }

    public boolean reserveStock(String productId, int quantity) {
        try {
            updater.updateWithCAS("products", productId, Product.class,
                product -> {
                    if (product.getStock() < quantity) {
                        throw new InsufficientStockException(
                            "Need " + quantity + " but only " + product.getStock() + " available"
                        );
                    }
                    product.setStock(product.getStock() - quantity);
                    product.setLastUpdated(Instant.now().toString());
                    return product;
                }
            );
            return true;
        } catch (InsufficientStockException e) {
            System.err.println("Stock insufficient: " + e.getMessage());
            return false;
        } catch (Exception e) {
            System.err.println("Stock reservation failed: " + e.getMessage());
            return false;
        }
    }
}

6.2 Senaryo 2: Counter Güncelleme (Script ile)

Basit sayaç artırma için script + retry_on_conflict daha verimlidir:

// Sayfa görüntülenme sayısını artır
POST /pages/_update/homepage?retry_on_conflict=5
{
  "script": {
    "source": "ctx._source.view_count += 1"
  },
  "upsert": {
    "view_count": 1,
    "page": "homepage"
  }
}

upsert parametresi: Doküman yoksa bu değerle oluşturulur. Varsa script çalışır.

6.3 Senaryo 3: Distributed Lock Pattern

Elasticsearch ile basit bir distributed lock:

// Lock almak — create (sadece yoksa oluştur)
PUT /locks/_doc/resource-123?op_type=create
{
  "locked_by": "worker-1",
  "locked_at": "2024-02-27T10:30:00Z",
  "ttl": "30s"
}

// Lock varsa → 409 Conflict (başkası almış)
// Lock yoksa → 201 Created (lock alındı)
public class DistributedLock {
    private final ElasticsearchClient client;

    public DistributedLock(ElasticsearchClient client) {
        this.client = client;
    }

    public boolean tryAcquire(String resourceId, String workerId,
                               Duration ttl) throws IOException {
        try {
            Map<String, Object> lockDoc = Map.of(
                "locked_by", workerId,
                "locked_at", Instant.now().toString(),
                "expires_at", Instant.now().plus(ttl).toString()
            );

            client.index(i -> i
                .index("locks")
                .id(resourceId)
                .document(lockDoc)
                .opType(OpType.Create)  // Sadece yoksa oluştur
            );

            return true;  // Lock alındı
        } catch (ElasticsearchException e) {
            if (e.status() == 409) {
                return false;  // Lock başkasında
            }
            throw e;
        }
    }

    public void release(String resourceId, String workerId) throws IOException {
        // Sadece kendi lock'unu sil
        GetResponse<Map> getResponse = client.get(g -> g
            .index("locks")
            .id(resourceId),
            Map.class
        );

        if (getResponse.found()) {
            Map source = getResponse.source();
            if (workerId.equals(source.get("locked_by"))) {
                client.delete(d -> d
                    .index("locks")
                    .id(resourceId)
                    .ifSeqNo(getResponse.seqNo())
                    .ifPrimaryTerm(getResponse.primaryTerm())
                );
            }
        }
    }
}

⚠️ Dikkat: Elasticsearch gerçek bir distributed lock servisi değildir. Bu pattern basit durumlar için işe yarar ama Redis, ZooKeeper veya etcd gibi özel lock servislerinin sağladığı garantileri (fencing token, lease expiry) sağlamaz. Kritik iş mantığı için özel lock servisleri kullanın.


7. _version vs _seq_no Karşılaştırması

┌────────────────────┬──────────────────────┬────────────────────────────┐
│                    │ _version (eski)      │ _seq_no + _primary_term    │
├────────────────────┼──────────────────────┼────────────────────────────┤
│ Scope              │ Doküman bazlı        │ Shard bazlı                │
│ Artış              │ Her doküman update'de│ Shard'daki her write'da    │
│ Failover güvenliği │ Zayıf                │ Güçlü (primary_term)       │
│ GC sonrası davranış│ Sorunlu              │ Sorunsuz                   │
│ Parametreler       │ version,version_type │ if_seq_no,if_primary_term  │
│ Durumu             │ Deprecated           │ Aktif, önerilen ✅          │
│ İç kullanım        │ Lucene _version      │ Lucene local checkpoint    │
└────────────────────┴──────────────────────┴────────────────────────────┘

_seq_no'nun avantajları:

  1. Failover safety: Primary shard değiştiğinde _primary_term artar. Eski primary'den gelen geç kalmış işlemler reddedilir

  2. No tombstone problem: _version silinen dokümanların versiyon bilgisini tutmak zorundaydı (tombstone). _seq_no shard seviyesinde olduğu için bu sorun yok

  3. Replication tracking: Replica'lar hangi seq_no'ya kadar senkronize olduğunu takip edebilir


8. Global ve Local Checkpoint

8.1 Checkpoint Kavramı

Elasticsearch, replikasyon senkronizasyonu için checkpoint sistemi kullanır:

Primary Shard (seq_no: 0, 1, 2, 3, 4, 5, 6, 7)
  local_checkpoint: 7   (kendi yazdığı son seq_no)
  global_checkpoint: 5  (tüm replica'ların onayladığı son seq_no)

Replica 1:
  local_checkpoint: 5   (aldığı son seq_no)

Replica 2:
  local_checkpoint: 7   (aldığı son seq_no)

Global checkpoint = min(tüm local checkpoints) = 5

Global checkpoint, tüm kopyaların senkronize olduğu son seq_no'dur. Bu bilgi:

  • Translog temizleme kararları için kullanılır

  • Recovery sırasında hangi işlemlerin tekrar gönderilmesi gerektiğini belirler

// Checkpoint bilgilerini görmek
GET /products/_stats?level=shards

// Yanıtta:
{
  "shards": {
    "0": [{
      "routing": { "primary": true },
      "seq_no": {
        "max_seq_no": 1500,
        "local_checkpoint": 1500,
        "global_checkpoint": 1498
      }
    }]
  }
}

9. Java'da Concurrency Handling

9.1 Complete Concurrency-Safe Service

public class ProductService {
    private final ElasticsearchClient client;
    private static final int MAX_RETRIES = 5;
    private static final String INDEX = "products";

    public ProductService(ElasticsearchClient client) {
        this.client = client;
    }

    /**
     * Script-based atomic update (basit işlemler için)
     */
    public void decrementStock(String productId, int quantity) throws IOException {
        UpdateResponse<Product> response = client.update(u -> u
            .index(INDEX)
            .id(productId)
            .retryOnConflict(MAX_RETRIES)
            .script(s -> s
                .inline(i -> i
                    .source("""
                        if (ctx._source.stock >= params.qty) {
                            ctx._source.stock -= params.qty;
                            ctx._source.last_updated = params.now;
                        } else {
                            ctx.op = 'noop';
                        }
                    """)
                    .params("qty", JsonData.of(quantity))
                    .params("now", JsonData.of(Instant.now().toString()))
                )
            ),
            Product.class
        );

        if ("noop".equals(response.result().jsonValue())) {
            throw new InsufficientStockException("Not enough stock for product: " + productId);
        }
    }

    /**
     * CAS-based full update (karmaşık iş mantığı için)
     */
    public Product updateProduct(String productId,
                                  Function<Product, Product> updateLogic) throws IOException {
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            // Oku
            GetResponse<Product> get = client.get(g -> g
                .index(INDEX)
                .id(productId),
                Product.class
            );

            if (!get.found()) {
                throw new ProductNotFoundException(productId);
            }

            // İş mantığını uygula
            Product updated = updateLogic.apply(get.source());

            // CAS ile yaz
            try {
                client.index(i -> i
                    .index(INDEX)
                    .id(productId)
                    .document(updated)
                    .ifSeqNo(get.seqNo())
                    .ifPrimaryTerm(get.primaryTerm())
                );
                return updated;
            } catch (ElasticsearchException e) {
                if (e.status() == 409) {
                    if (attempt < MAX_RETRIES - 1) {
                        sleepWithJitter(attempt);
                        continue;
                    }
                }
                throw e;
            }
        }

        throw new ConcurrencyException("Failed after " + MAX_RETRIES + " retries");
    }

    /**
     * Bulk update with concurrency control
     */
    public void bulkUpdatePrices(Map<String, Double> priceUpdates) throws IOException {
        // Tüm dokümanları oku (seq_no bilgisi için)
        MgetResponse<Product> mget = client.mget(m -> m
            .index(INDEX)
            .ids(new ArrayList<>(priceUpdates.keySet())),
            Product.class
        );

        // Bulk update oluştur
        BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
        for (MultiGetResponseItem<Product> item : mget.docs()) {
            if (item.isResult() && item.result().found()) {
                var result = item.result();
                String id = result.id();
                Double newPrice = priceUpdates.get(id);

                Product product = result.source();
                product.setPrice(newPrice);

                bulkBuilder.operations(op -> op
                    .index(idx -> idx
                        .index(INDEX)
                        .id(id)
                        .document(product)
                        .ifSeqNo(result.seqNo())
                        .ifPrimaryTerm(result.primaryTerm())
                    )
                );
            }
        }

        BulkResponse bulkResponse = client.bulk(bulkBuilder.build());

        if (bulkResponse.errors()) {
            // Conflict olan işlemleri logla
            for (BulkResponseItem item : bulkResponse.items()) {
                if (item.error() != null && item.status() == 409) {
                    System.err.printf("Conflict for doc %s: %s%n",
                        item.id(), item.error().reason());
                }
            }
        }
    }

    private void sleepWithJitter(int attempt) {
        try {
            long baseMs = 50L * (long) Math.pow(2, attempt);
            long jitter = (long) (Math.random() * baseMs);
            Thread.sleep(baseMs + jitter);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

9.2 Thread-Safe Counter Service

public class CounterService {
    private final ElasticsearchClient client;

    public CounterService(ElasticsearchClient client) {
        this.client = client;
    }

    /**
     * Atomik sayaç artırma — upsert ile
     */
    public long increment(String counterName, long amount) throws IOException {
        UpdateResponse<Map> response = client.update(u -> u
            .index("counters")
            .id(counterName)
            .retryOnConflict(10)
            .script(s -> s
                .inline(i -> i
                    .source("ctx._source.value += params.amount")
                    .params("amount", JsonData.of(amount))
                )
            )
            .upsert(Map.of("value", amount, "name", counterName)),
            Map.class
        );

        return response.version();
    }

    /**
     * Atomik sayaç okuma
     */
    public long get(String counterName) throws IOException {
        GetResponse<Map> response = client.get(g -> g
            .index("counters")
            .id(counterName),
            Map.class
        );

        if (!response.found()) return 0;
        return ((Number) response.source().get("value")).longValue();
    }
}

10. Yaygın Hatalar

Hata 1: Concurrency control olmadan güncelleme

// ❌ YANLIŞ — Lost update riski!
Product product = client.get(g -> g.index("products").id("1"), Product.class).source();
product.setStock(product.getStock() - 1);
client.index(i -> i.index("products").id("1").document(product));

// ✅ DOĞRU — CAS ile güncelleme
GetResponse<Product> get = client.get(g -> g.index("products").id("1"), Product.class);
client.index(i -> i
    .index("products").id("1")
    .document(updatedProduct)
    .ifSeqNo(get.seqNo())
    .ifPrimaryTerm(get.primaryTerm())
);

Hata 2: Eski _version parametresini kullanmak

// ❌ Deprecated
PUT /products/_doc/1?version=5
{ "stock": 9 }

// ✅ Modern yöntem
PUT /products/_doc/1?if_seq_no=42&if_primary_term=1
{ "stock": 9 }

Hata 3: retry_on_conflict olmadan script update

// ❌ Yüksek concurrent yazmalarda conflict alırsınız
POST /products/_update/1
{ "script": { "source": "ctx._source.stock -= 1" } }

// ✅ retry_on_conflict ekleyin
POST /products/_update/1?retry_on_conflict=5
{ "script": { "source": "ctx._source.stock -= 1" } }

11. Özet

  • Lost update, dağıtık sistemlerde en yaygın concurrency sorunudur. Elasticsearch optimistic concurrency control ile bunu çözer

  • Eski _version sistemi deprecated'dır. Modern kodlarda `_seq_no` + `_primary_term` kullanın

  • if_seq_no ve if_primary_term parametreleri CAS (Compare and Swap) mekanizması sağlar: "sadece beklenen değer ise güncelle"

  • Basit increment/decrement işlemleri için script + `retry_on_conflict` en verimli yöntemdir — client'a roundtrip gerekmez

  • Karmaşık iş mantığı gerektiren güncellemeler için CAS pattern kullanın: oku → iş mantığını uygula → CAS ile yaz → conflict varsa retry

  • _primary_term, primary shard failover'ı sonrası eski primary'den gelen geç işlemleri reddeder — veri tutarlılığını garantiler

  • Global checkpoint, tüm replica'ların senkronize olduğu son seq_no'dur ve replikasyon güvenliğini sağlar