Versioning ve Concurrency Control
Giriş — Aynı Anda Aynı Kapıdan Geçmek
İki kişi aynı anda bir belgenin aynı satırını düzenlemeye çalışırsa ne olur? Biri diğerinin değişikliğini ezebilir — ve ikisi de bunun farkında olmaz. Veritabanı dünyasında buna lost update (kayıp güncelleme) problemi denir ve dağıtık sistemlerde en yaygın hatalardan biridir.
Elasticsearch dağıtık bir sistemdir. Aynı dokümanı farklı client'lar, farklı node'lar üzerinden eşzamanlı olarak güncelleyebilir. İşte bu yüzden Elasticsearch bir concurrency control mekanizması sunar: optimistic concurrency control (iyimser eşzamanlılık kontrolü). Bu ders, bu mekanizmanın her detayını — eski _version sisteminden modern _seq_no + _primary_term yaklaşımına, CAS pattern'ından Java'da concurrency handling'e kadar — derinlemesine ele alacak.
1. Problem: Lost Update
1.1 Senaryo
Bir e-ticaret sitesinde stok yönetimi yapıyorsunuz. Ürünün mevcut stok sayısı 10:
// Mevcut doküman
GET /products/_doc/1
{
"_index": "products",
"_id": "1",
"_version": 5,
"_seq_no": 42,
"_primary_term": 1,
"_source": {
"name": "Kablosuz Kulaklık",
"stock": 10,
"price": 299.99
}
}İki müşteri aynı anda sipariş veriyor:
Zaman Thread A (Müşteri 1) Thread B (Müşteri 2)
─────────────────────────────────────────────────────────
t1 GET stock → 10 GET stock → 10
t2 stock = 10 - 1 = 9 stock = 10 - 2 = 8
t3 PUT stock = 9 PUT stock = 8
t4 Başarılı ✅ Başarılı ✅
Sonuç: stock = 8 (Thread B'nin değeri)
Beklenen: stock = 7 (10 - 1 - 2)
Kayıp: 1 adet stok eksik sayıldı! ❌Bu lost update problemidir. Thread A'nın güncellemesi kayboldu çünkü Thread B daha yeni bir değeri yazdığını sanıyor ama aslında eski veriyi baz almış.
1.2 Çözüm Yaklaşımları
┌──────────────────────┬─────────────────────────────────────────────┐
│ Yaklaşım │ Açıklama │
├──────────────────────┼─────────────────────────────────────────────┤
│ Pessimistic Locking │ Okumadan önce kilitle, güncelle, kilidi aç │
│ (Karamsar) │ ES bunu DESTEKLEMİYOR │
├──────────────────────┼─────────────────────────────────────────────┤
│ Optimistic Locking │ Güncelleme sırasında versiyon kontrolü │
│ (İyimser) │ ES bunu KULLANIYOR ✅ │
├──────────────────────┼─────────────────────────────────────────────┤
│ Scripted Update │ Atomik güncelleme — oku+yaz tek işlemde │
│ │ ES bunu da DESTEKLİYOR ✅ │
└──────────────────────┴─────────────────────────────────────────────┘2. Eski Yöntem: _version (Deprecated)
2.1 Internal Version
Elasticsearch'ün ilk versiyonlarında her dokümanın bir _version numarası vardı. Her güncelleme bu numarayı 1 artırırdı:
// Doküman oluştur → version 1
PUT /products/_doc/1
{ "name": "Kulaklık", "stock": 10 }
// Yanıt: "_version": 1
// Güncelle → version 2
PUT /products/_doc/1
{ "name": "Kulaklık", "stock": 9 }
// Yanıt: "_version": 2
// Güncelle → version 3
PUT /products/_doc/1
{ "name": "Kulaklık", "stock": 8 }
// Yanıt: "_version": 3Concurrency control için version parametresi kullanılırdı:
// "Sadece version 3 ise güncelle" — ESKİ YÖNTEM
PUT /products/_doc/1?version=3&version_type=internal
{ "name": "Kulaklık", "stock": 7 }
// Eğer version artık 3 değilse → 409 Conflict2.2 External Version
Harici bir sistemin (örneğin RDBMS) versiyon numarasını kullanmak için:
// External version — verilen sayıdan büyükse kabul et
PUT /products/_doc/1?version=10&version_type=external
{ "name": "Kulaklık", "stock": 7 }
// version_type=external: Verilen version, mevcut version'dan büyük olmalı
// version_type=external_gte: Verilen version, mevcut version'a eşit veya büyük olmalı⚠️ Dikkat:
versionparametresi ile concurrency control deprecated'dır (Elasticsearch 6.7+). Yeni kodlarda kullanmayın._seq_no+_primary_termkullanın.version_type=externalise hâlâ harici versiyon senkronizasyonu için kullanılabilir.
2.3 Neden Deprecated?
Internal _version ile ilgili sorunlar:
Version numarası sadece artıyordu — doküman silinip yeniden oluşturulduğunda sıfırlanmıyor, veya eski versiyon numarasıyla karışıyordu
Dağıtık ortamda primary shard değiştiğinde (failover) versiyon tutarsızlıkları oluşabiliyordu
Silinen dokümanların version bilgisini GC sonrası tutmak maliyetliydi
3. Modern Yöntem: _seq_no + _primary_term
3.1 Kavramlar
Elasticsearch 6.7+ ile gelen modern concurrency control iki değere dayanır:
`_seq_no` (Sequence Number): Her shard'da monoton artan bir sayaç. Shard'a yapılan her write işlemi (index, update, delete) bu sayacı 1 artırır. Shard-local'dir — farklı shard'ların seq_no'ları birbirinden bağımsızdır.
`_primary_term`: Primary shard atamasının "nesil" numarası. Primary shard değiştiğinde (node çöktü, failover oldu), primary_term artar. Bu, eski primary'den gelen işlemleri yenisinden ayırt etmeyi sağlar.
Shard 0 - Timeline:
Primary Term 1 (Node A is primary):
seq_no=0: index doc-1
seq_no=1: update doc-1
seq_no=2: index doc-2
seq_no=3: delete doc-1
[Node A çöktü, Node B yeni primary oldu]
Primary Term 2 (Node B is primary):
seq_no=4: index doc-3
seq_no=5: update doc-2
...3.2 Doküman Yanıtında Değerler
// Doküman oluştur
PUT /products/_doc/1
{ "name": "Kulaklık", "stock": 10 }
// Yanıt:
{
"_index": "products",
"_id": "1",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"result": "created"
}
// Güncelle
POST /products/_update/1
{ "doc": { "stock": 9 } }
// Yanıt:
{
"_index": "products",
"_id": "1",
"_version": 2,
"_seq_no": 1, // ← Arttı
"_primary_term": 1, // ← Aynı (primary değişmedi)
"result": "updated"
}3.3 Optimistic Concurrency Control
İşte modern yöntem — if_seq_no ve if_primary_term parametreleri:
// 1. Dokümanı oku — seq_no ve primary_term'ü not et
GET /products/_doc/1
// Yanıt: _seq_no: 1, _primary_term: 1
// 2. "Sadece seq_no=1 VE primary_term=1 ise güncelle"
PUT /products/_doc/1?if_seq_no=1&if_primary_term=1
{
"name": "Kulaklık",
"stock": 9
}Başarılı durumda:
{
"_index": "products",
"_id": "1",
"_version": 3,
"_seq_no": 2, // Yeni seq_no
"_primary_term": 1,
"result": "updated"
}Çakışma durumunda (başka biri araya girdi):
{
"error": {
"type": "version_conflict_engine_exception",
"reason": "[1]: version conflict, required seqNo [1], primary term [1]. current document has seqNo [2] and primary term [1]"
},
"status": 409
}3.4 Lost Update Problemi Çözülmüş
Zaman Thread A Thread B
─────────────────────────────────────────────────────────
t1 GET doc → seq_no:1, term:1 GET doc → seq_no:1, term:1
t2 stock = 10 - 1 = 9 stock = 10 - 2 = 8
t3 PUT ?if_seq_no=1&if_primary_term=1
→ Başarılı ✅ (seq_no: 2)
t4 PUT ?if_seq_no=1&if_primary_term=1
→ 409 Conflict ❌
(seq_no artık 2, 1 değil!)
t5 Retry: GET doc → seq_no:2
stock = 9 - 2 = 7
PUT ?if_seq_no=2&if_primary_term=1
→ Başarılı ✅
Sonuç: stock = 7 ✅ (Doğru!)4. Update API ve retry_on_conflict
4.1 Scripted Update — Atomik Güncelleme
_update API'si ile script kullanarak "read + modify + write" işlemini Elasticsearch tarafında atomik olarak yapabilirsiniz:
POST /products/_update/1
{
"script": {
"source": "ctx._source.stock -= params.quantity",
"lang": "painless",
"params": {
"quantity": 1
}
}
}Bu yaklaşım, client tarafında oku-değiştir-yaz döngüsüne gerek kalmadan stok azaltır. Ancak yine de concurrency riski vardır — iki eşzamanlı script update aynı anda çalışabilir.
4.2 retry_on_conflict
_update API'sinde retry_on_conflict parametresi, version conflict olduğunda otomatik retry yapar:
POST /products/_update/1?retry_on_conflict=3
{
"script": {
"source": "ctx._source.stock -= params.quantity",
"params": { "quantity": 1 }
}
}İç mekanizma:
Dokümanı oku (seq_no + primary_term)
Script'i çalıştır
Güncellemeyi yaz (if_seq_no=X & if_primary_term=Y ile)
Eğer conflict → adım 1'den tekrar başla
3 denemeden sonra hâlâ conflict varsa → hata dön
retry_on_conflict=3 akışı:
Deneme 1: GET(seq:5) → script → PUT(if_seq:5) → 409 Conflict
Deneme 2: GET(seq:6) → script → PUT(if_seq:6) → 409 Conflict
Deneme 3: GET(seq:7) → script → PUT(if_seq:7) → 200 OK ✅4.3 Scripted Update ile Koşullu Güncelleme
// Stok 0'ın altına düşmesin
POST /products/_update/1?retry_on_conflict=5
{
"script": {
"source": """
if (ctx._source.stock >= params.quantity) {
ctx._source.stock -= params.quantity;
} else {
ctx.op = 'noop'; // İşlemi iptal et
}
""",
"params": { "quantity": 3 }
}
}`ctx.op` seçenekleri:
"index": Normal güncelleme (varsayılan)"noop": Hiçbir şey yapma (conflict yok, sadece skip)"delete": Dokümanı sil
4.4 Bulk Update ile retry_on_conflict
POST /_bulk
{"update": {"_index": "products", "_id": "1", "retry_on_conflict": 3}}
{"script": {"source": "ctx._source.stock -= 1"}}
{"update": {"_index": "products", "_id": "2", "retry_on_conflict": 3}}
{"script": {"source": "ctx._source.stock -= 2"}}5. CAS Pattern — Compare and Swap
5.1 CAS Nedir?
Compare and Swap (CAS), lock-free concurrency control'ün temel yapı taşıdır. Fikir basittir:
Mevcut değeri oku
Beklenen değerle karşılaştır (compare)
Eşleşiyorsa yeni değeri yaz (swap)
Eşleşmiyorsa başarısız ol ve tekrar dene
Elasticsearch'ün if_seq_no + if_primary_term mekanizması tam bir CAS implementasyonudur.
5.2 CAS Pattern Implementasyonu
public class CASUpdater {
private final ElasticsearchClient client;
private static final int MAX_CAS_RETRIES = 10;
public CASUpdater(ElasticsearchClient client) {
this.client = client;
}
/**
* CAS pattern ile doküman güncelleme.
* @param index Index adı
* @param id Doküman ID
* @param updateFn Güncelleme fonksiyonu (mevcut doküman → güncel doküman)
* @return Güncellenmiş doküman
*/
public <T> T updateWithCAS(String index, String id, Class<T> clazz,
Function<T, T> updateFn) throws IOException {
for (int attempt = 0; attempt < MAX_CAS_RETRIES; attempt++) {
// 1. Mevcut dokümanı oku
GetResponse<T> getResponse = client.get(g -> g
.index(index)
.id(id),
clazz
);
if (!getResponse.found()) {
throw new RuntimeException("Document not found: " + id);
}
T currentDoc = getResponse.source();
long seqNo = getResponse.seqNo();
long primaryTerm = getResponse.primaryTerm();
// 2. Güncelleme fonksiyonunu uygula
T updatedDoc = updateFn.apply(currentDoc);
try {
// 3. CAS ile güncelle
IndexResponse indexResponse = client.index(i -> i
.index(index)
.id(id)
.document(updatedDoc)
.ifSeqNo(seqNo)
.ifPrimaryTerm(primaryTerm)
);
System.out.printf("CAS update successful on attempt %d (seq_no: %d → %d)%n",
attempt + 1, seqNo, indexResponse.seqNo());
return updatedDoc;
} catch (ElasticsearchException e) {
if (e.status() == 409) {
// Version conflict — retry
System.out.printf("CAS conflict on attempt %d, retrying...%n",
attempt + 1);
// Kısa bir bekleme (optional, ama thundering herd'ü önler)
try {
Thread.sleep((long) (Math.random() * 50 * (attempt + 1)));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted during CAS retry", ie);
}
continue;
}
throw e;
}
}
throw new RuntimeException("CAS update failed after " + MAX_CAS_RETRIES + " attempts");
}
}Kullanım:
CASUpdater updater = new CASUpdater(client);
// Stok azaltma — CAS ile güvenli
Product updated = updater.updateWithCAS("products", "1", Product.class,
product -> {
if (product.getStock() < 1) {
throw new RuntimeException("Out of stock!");
}
product.setStock(product.getStock() - 1);
return product;
}
);
System.out.println("New stock: " + updated.getStock());5.3 CAS vs retry_on_conflict
┌─────────────────────┬──────────────────────┬─────────────────────────┐
│ │ CAS (if_seq_no) │ retry_on_conflict │
├─────────────────────┼──────────────────────┼─────────────────────────┤
│ Çalışma yeri │ Client tarafı │ Elasticsearch tarafı │
│ Güncelleme tipi │ Full document replace│ Partial update / Script │
│ Logic kontrolü │ Client'ta tam kontrol│ Script ile sınırlı │
│ Network roundtrip │ Conflict başına 2 │ Conflict başına 0 (ES │
│ │ (GET + PUT) │ kendi içinde retry eder)│
│ Validasyon │ Zengin (Java logic) │ Painless script │
│ Best for │ Karmaşık iş mantığı │ Basit increment/decrement│
└─────────────────────┴──────────────────────┴─────────────────────────┘6. Version Conflict Senaryoları
6.1 Senaryo 1: E-Commerce Stok Yönetimi
public class StockManager {
private final CASUpdater updater;
public StockManager(ElasticsearchClient client) {
this.updater = new CASUpdater(client);
}
public boolean reserveStock(String productId, int quantity) {
try {
updater.updateWithCAS("products", productId, Product.class,
product -> {
if (product.getStock() < quantity) {
throw new InsufficientStockException(
"Need " + quantity + " but only " + product.getStock() + " available"
);
}
product.setStock(product.getStock() - quantity);
product.setLastUpdated(Instant.now().toString());
return product;
}
);
return true;
} catch (InsufficientStockException e) {
System.err.println("Stock insufficient: " + e.getMessage());
return false;
} catch (Exception e) {
System.err.println("Stock reservation failed: " + e.getMessage());
return false;
}
}
}6.2 Senaryo 2: Counter Güncelleme (Script ile)
Basit sayaç artırma için script + retry_on_conflict daha verimlidir:
// Sayfa görüntülenme sayısını artır
POST /pages/_update/homepage?retry_on_conflict=5
{
"script": {
"source": "ctx._source.view_count += 1"
},
"upsert": {
"view_count": 1,
"page": "homepage"
}
}upsert parametresi: Doküman yoksa bu değerle oluşturulur. Varsa script çalışır.
6.3 Senaryo 3: Distributed Lock Pattern
Elasticsearch ile basit bir distributed lock:
// Lock almak — create (sadece yoksa oluştur)
PUT /locks/_doc/resource-123?op_type=create
{
"locked_by": "worker-1",
"locked_at": "2024-02-27T10:30:00Z",
"ttl": "30s"
}
// Lock varsa → 409 Conflict (başkası almış)
// Lock yoksa → 201 Created (lock alındı)public class DistributedLock {
private final ElasticsearchClient client;
public DistributedLock(ElasticsearchClient client) {
this.client = client;
}
public boolean tryAcquire(String resourceId, String workerId,
Duration ttl) throws IOException {
try {
Map<String, Object> lockDoc = Map.of(
"locked_by", workerId,
"locked_at", Instant.now().toString(),
"expires_at", Instant.now().plus(ttl).toString()
);
client.index(i -> i
.index("locks")
.id(resourceId)
.document(lockDoc)
.opType(OpType.Create) // Sadece yoksa oluştur
);
return true; // Lock alındı
} catch (ElasticsearchException e) {
if (e.status() == 409) {
return false; // Lock başkasında
}
throw e;
}
}
public void release(String resourceId, String workerId) throws IOException {
// Sadece kendi lock'unu sil
GetResponse<Map> getResponse = client.get(g -> g
.index("locks")
.id(resourceId),
Map.class
);
if (getResponse.found()) {
Map source = getResponse.source();
if (workerId.equals(source.get("locked_by"))) {
client.delete(d -> d
.index("locks")
.id(resourceId)
.ifSeqNo(getResponse.seqNo())
.ifPrimaryTerm(getResponse.primaryTerm())
);
}
}
}
}⚠️ Dikkat: Elasticsearch gerçek bir distributed lock servisi değildir. Bu pattern basit durumlar için işe yarar ama Redis, ZooKeeper veya etcd gibi özel lock servislerinin sağladığı garantileri (fencing token, lease expiry) sağlamaz. Kritik iş mantığı için özel lock servisleri kullanın.
7. _version vs _seq_no Karşılaştırması
┌────────────────────┬──────────────────────┬────────────────────────────┐
│ │ _version (eski) │ _seq_no + _primary_term │
├────────────────────┼──────────────────────┼────────────────────────────┤
│ Scope │ Doküman bazlı │ Shard bazlı │
│ Artış │ Her doküman update'de│ Shard'daki her write'da │
│ Failover güvenliği │ Zayıf │ Güçlü (primary_term) │
│ GC sonrası davranış│ Sorunlu │ Sorunsuz │
│ Parametreler │ version,version_type │ if_seq_no,if_primary_term │
│ Durumu │ Deprecated │ Aktif, önerilen ✅ │
│ İç kullanım │ Lucene _version │ Lucene local checkpoint │
└────────────────────┴──────────────────────┴────────────────────────────┘_seq_no'nun avantajları:
Failover safety: Primary shard değiştiğinde
_primary_termartar. Eski primary'den gelen geç kalmış işlemler reddedilirNo tombstone problem:
_versionsilinen dokümanların versiyon bilgisini tutmak zorundaydı (tombstone)._seq_noshard seviyesinde olduğu için bu sorun yokReplication tracking: Replica'lar hangi seq_no'ya kadar senkronize olduğunu takip edebilir
8. Global ve Local Checkpoint
8.1 Checkpoint Kavramı
Elasticsearch, replikasyon senkronizasyonu için checkpoint sistemi kullanır:
Primary Shard (seq_no: 0, 1, 2, 3, 4, 5, 6, 7)
local_checkpoint: 7 (kendi yazdığı son seq_no)
global_checkpoint: 5 (tüm replica'ların onayladığı son seq_no)
Replica 1:
local_checkpoint: 5 (aldığı son seq_no)
Replica 2:
local_checkpoint: 7 (aldığı son seq_no)
Global checkpoint = min(tüm local checkpoints) = 5Global checkpoint, tüm kopyaların senkronize olduğu son seq_no'dur. Bu bilgi:
Translog temizleme kararları için kullanılır
Recovery sırasında hangi işlemlerin tekrar gönderilmesi gerektiğini belirler
// Checkpoint bilgilerini görmek
GET /products/_stats?level=shards
// Yanıtta:
{
"shards": {
"0": [{
"routing": { "primary": true },
"seq_no": {
"max_seq_no": 1500,
"local_checkpoint": 1500,
"global_checkpoint": 1498
}
}]
}
}9. Java'da Concurrency Handling
9.1 Complete Concurrency-Safe Service
public class ProductService {
private final ElasticsearchClient client;
private static final int MAX_RETRIES = 5;
private static final String INDEX = "products";
public ProductService(ElasticsearchClient client) {
this.client = client;
}
/**
* Script-based atomic update (basit işlemler için)
*/
public void decrementStock(String productId, int quantity) throws IOException {
UpdateResponse<Product> response = client.update(u -> u
.index(INDEX)
.id(productId)
.retryOnConflict(MAX_RETRIES)
.script(s -> s
.inline(i -> i
.source("""
if (ctx._source.stock >= params.qty) {
ctx._source.stock -= params.qty;
ctx._source.last_updated = params.now;
} else {
ctx.op = 'noop';
}
""")
.params("qty", JsonData.of(quantity))
.params("now", JsonData.of(Instant.now().toString()))
)
),
Product.class
);
if ("noop".equals(response.result().jsonValue())) {
throw new InsufficientStockException("Not enough stock for product: " + productId);
}
}
/**
* CAS-based full update (karmaşık iş mantığı için)
*/
public Product updateProduct(String productId,
Function<Product, Product> updateLogic) throws IOException {
for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
// Oku
GetResponse<Product> get = client.get(g -> g
.index(INDEX)
.id(productId),
Product.class
);
if (!get.found()) {
throw new ProductNotFoundException(productId);
}
// İş mantığını uygula
Product updated = updateLogic.apply(get.source());
// CAS ile yaz
try {
client.index(i -> i
.index(INDEX)
.id(productId)
.document(updated)
.ifSeqNo(get.seqNo())
.ifPrimaryTerm(get.primaryTerm())
);
return updated;
} catch (ElasticsearchException e) {
if (e.status() == 409) {
if (attempt < MAX_RETRIES - 1) {
sleepWithJitter(attempt);
continue;
}
}
throw e;
}
}
throw new ConcurrencyException("Failed after " + MAX_RETRIES + " retries");
}
/**
* Bulk update with concurrency control
*/
public void bulkUpdatePrices(Map<String, Double> priceUpdates) throws IOException {
// Tüm dokümanları oku (seq_no bilgisi için)
MgetResponse<Product> mget = client.mget(m -> m
.index(INDEX)
.ids(new ArrayList<>(priceUpdates.keySet())),
Product.class
);
// Bulk update oluştur
BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
for (MultiGetResponseItem<Product> item : mget.docs()) {
if (item.isResult() && item.result().found()) {
var result = item.result();
String id = result.id();
Double newPrice = priceUpdates.get(id);
Product product = result.source();
product.setPrice(newPrice);
bulkBuilder.operations(op -> op
.index(idx -> idx
.index(INDEX)
.id(id)
.document(product)
.ifSeqNo(result.seqNo())
.ifPrimaryTerm(result.primaryTerm())
)
);
}
}
BulkResponse bulkResponse = client.bulk(bulkBuilder.build());
if (bulkResponse.errors()) {
// Conflict olan işlemleri logla
for (BulkResponseItem item : bulkResponse.items()) {
if (item.error() != null && item.status() == 409) {
System.err.printf("Conflict for doc %s: %s%n",
item.id(), item.error().reason());
}
}
}
}
private void sleepWithJitter(int attempt) {
try {
long baseMs = 50L * (long) Math.pow(2, attempt);
long jitter = (long) (Math.random() * baseMs);
Thread.sleep(baseMs + jitter);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}9.2 Thread-Safe Counter Service
public class CounterService {
private final ElasticsearchClient client;
public CounterService(ElasticsearchClient client) {
this.client = client;
}
/**
* Atomik sayaç artırma — upsert ile
*/
public long increment(String counterName, long amount) throws IOException {
UpdateResponse<Map> response = client.update(u -> u
.index("counters")
.id(counterName)
.retryOnConflict(10)
.script(s -> s
.inline(i -> i
.source("ctx._source.value += params.amount")
.params("amount", JsonData.of(amount))
)
)
.upsert(Map.of("value", amount, "name", counterName)),
Map.class
);
return response.version();
}
/**
* Atomik sayaç okuma
*/
public long get(String counterName) throws IOException {
GetResponse<Map> response = client.get(g -> g
.index("counters")
.id(counterName),
Map.class
);
if (!response.found()) return 0;
return ((Number) response.source().get("value")).longValue();
}
}10. Yaygın Hatalar
Hata 1: Concurrency control olmadan güncelleme
// ❌ YANLIŞ — Lost update riski!
Product product = client.get(g -> g.index("products").id("1"), Product.class).source();
product.setStock(product.getStock() - 1);
client.index(i -> i.index("products").id("1").document(product));
// ✅ DOĞRU — CAS ile güncelleme
GetResponse<Product> get = client.get(g -> g.index("products").id("1"), Product.class);
client.index(i -> i
.index("products").id("1")
.document(updatedProduct)
.ifSeqNo(get.seqNo())
.ifPrimaryTerm(get.primaryTerm())
);Hata 2: Eski _version parametresini kullanmak
// ❌ Deprecated
PUT /products/_doc/1?version=5
{ "stock": 9 }
// ✅ Modern yöntem
PUT /products/_doc/1?if_seq_no=42&if_primary_term=1
{ "stock": 9 }Hata 3: retry_on_conflict olmadan script update
// ❌ Yüksek concurrent yazmalarda conflict alırsınız
POST /products/_update/1
{ "script": { "source": "ctx._source.stock -= 1" } }
// ✅ retry_on_conflict ekleyin
POST /products/_update/1?retry_on_conflict=5
{ "script": { "source": "ctx._source.stock -= 1" } }11. Özet
Lost update, dağıtık sistemlerde en yaygın concurrency sorunudur. Elasticsearch optimistic concurrency control ile bunu çözer
Eski
_versionsistemi deprecated'dır. Modern kodlarda `_seq_no` + `_primary_term` kullanınif_seq_noveif_primary_termparametreleri CAS (Compare and Swap) mekanizması sağlar: "sadece beklenen değer ise güncelle"Basit increment/decrement işlemleri için script + `retry_on_conflict` en verimli yöntemdir — client'a roundtrip gerekmez
Karmaşık iş mantığı gerektiren güncellemeler için CAS pattern kullanın: oku → iş mantığını uygula → CAS ile yaz → conflict varsa retry
_primary_term, primary shard failover'ı sonrası eski primary'den gelen geç işlemleri reddeder — veri tutarlılığını garantilerGlobal checkpoint, tüm replica'ların senkronize olduğu son seq_no'dur ve replikasyon güvenliğini sağlar
AI Asistan
Sorularını yanıtlamaya hazır