← Kursa Dön
📄 Text · 40 min

Bulk API Derinlemesine — Batch İşlemler

Giriş — Kamyon vs Bisiklet

Bir depodan mağazaya 1000 koli taşımanız gerekiyor. Her koliyi bisikletle tek tek taşısanız 1000 sefer yaparsınız. Ama bir kamyona 100'er koli yükleyip 10 seferde taşısanız, hem zamandan hem yakıttan tasarruf edersiniz. Yalnız kamyonu da tıka basa doldurmak istemezsiniz — devrilir.

Elasticsearch'te tek tek doküman indexlemek o bisiklete benzer. Her işlem için ayrı HTTP bağlantısı, ayrı routing hesabı, ayrı translog yazımı. Bulk API ise kamyondur: birden fazla işlemi tek bir HTTP isteğinde paketler. Ama yüklemeyi de doğru yapmanız gerekir — çok küçük batch performans artışı sağlamaz, çok büyük batch ise bellek sorunlarına yol açar.

Bu ders, Bulk API'nin ndjson formatından optimal batch boyutuna, partial failure yönetiminden retry stratejilerine, Java BulkProcessor pattern'ından performans benchmark'larına kadar her detayı derinlemesine ele alacak.


1. NDJSON Formatı — Bulk API'nin Dili

1.1 NDJSON Nedir?

Bulk API standart JSON değil, NDJSON (Newline Delimited JSON) formatı kullanır. Her satır bağımsız bir JSON objesidir ve satırlar \n (newline) ile ayrılır.

{action_and_meta_data}\n
{optional_source}\n
{action_and_meta_data}\n
{optional_source}\n
...

Neden standart JSON değil?

Standart JSON array ([{...}, {...}]) kullanılsaydı, Elasticsearch tüm request body'sini belleğe alıp parse etmek zorunda kalırdı. NDJSON formatında ise her satır bağımsız olarak parse edilir — bu streaming parse imkânı sağlar ve bellek tüketimini dramatik şekilde azaltır.

1.2 Action Tipleri

Bulk API dört action destekler:

POST /_bulk
// 1. index — Doküman oluştur veya varsa üzerine yaz
{"index": {"_index": "products", "_id": "1"}}
{"title": "Laptop", "price": 999}

// 2. create — Sadece oluştur, varsa hata ver
{"create": {"_index": "products", "_id": "2"}}
{"title": "Phone", "price": 599}

// 3. update — Kısmi güncelleme
{"update": {"_index": "products", "_id": "1"}}
{"doc": {"price": 899}}

// 4. delete — Sil (source satırı YOK)
{"delete": {"_index": "products", "_id": "3"}}

Kritik kurallar:

  • Her action satırı tek satırda olmalı — JSON'u pretty-print yapmayın!

  • delete hariç her action'dan sonra bir source satırı gelir

  • Son satırdan sonra da \n olmalı

  • Aynı bulk request içinde farklı index'lere yazabilirsiniz

  • Aynı request'te index, update, delete karışık olabilir

1.3 Doğru ve Yanlış Format

// ✅ DOĞRU — Her satır tek satır, sonunda newline
{"index":{"_index":"products","_id":"1"}}
{"title":"Laptop","price":999}
{"index":{"_index":"products","_id":"2"}}
{"title":"Phone","price":599}
// ❌ YANLIŞ — Pretty-printed JSON
{
  "index": {
    "_index": "products",
    "_id": "1"
  }
}
{
  "title": "Laptop",
  "price": 999
}
// ❌ YANLIŞ — Son satırdan sonra newline yok
{"index":{"_index":"products","_id":"1"}}
{"title":"Laptop","price":999}⬚  // Newline eksik!

1.4 Content-Type

Bulk API'yi çağırırken Content-Type header'ı önemlidir:

# Doğru Content-Type
curl -X POST "localhost:9200/_bulk" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @bulk-data.ndjson

# application/json da çalışır ama application/x-ndjson daha doğru

⚠️ Dikkat: --data-binary kullanın, -d değil! -d newline karakterlerini strip eder ve formatı bozar.


2. Endpoint Seçenekleri

2.1 Global vs Index-Specific Endpoint

// Global endpoint — her action'da _index belirtmek zorunlu
POST /_bulk
{"index": {"_index": "products", "_id": "1"}}
{"title": "Laptop"}
{"index": {"_index": "orders", "_id": "1"}}
{"order_id": "ORD-001"}

// Index-specific endpoint — _index belirtmeye gerek yok
POST /products/_bulk
{"index": {"_id": "1"}}
{"title": "Laptop"}
{"index": {"_id": "2"}}
{"title": "Phone"}

Index-specific endpoint kullandığınızda, action satırlarında _index belirtmezseniz URL'deki index kullanılır. Belirtirseniz o değer override eder.

2.2 Routing ve Pipeline

// Custom routing ile bulk
POST /_bulk?routing=user_123
{"index": {"_index": "events"}}
{"event": "login"}

// Ingest pipeline ile bulk
POST /_bulk?pipeline=my-pipeline
{"index": {"_index": "logs"}}
{"message": "error occurred"}

// Action seviyesinde routing
POST /_bulk
{"index": {"_index": "events", "_id": "1", "routing": "user_123"}}
{"event": "login"}
{"index": {"_index": "events", "_id": "2", "routing": "user_456"}}
{"event": "logout"}

2.3 Refresh Parametresi

// Tüm işlemler bittikten sonra refresh
POST /_bulk?refresh=true
{"index": {"_index": "products", "_id": "1"}}
{"title": "Laptop"}

// Sonraki doğal refresh'i bekle
POST /_bulk?refresh=wait_for
{"index": {"_index": "products", "_id": "1"}}
{"title": "Laptop"}

// Refresh yapma (varsayılan)
POST /_bulk?refresh=false
{"index": {"_index": "products", "_id": "1"}}
{"title": "Laptop"}

💡 İpucu: Büyük bulk import'larda refresh=false kullanın (zaten varsayılan). İmport bittikten sonra tek bir POST /products/_refresh çağrısı yapın.


3. Optimal Batch Size — En Kritik Karar

3.1 Neden Boyut Önemli?

Batch boyutu Bulk API performansının en kritik parametresidir. Çok küçük batch'ler HTTP overhead'i nedeniyle yavaş, çok büyük batch'ler ise bellek sorunlarına yol açar.

Batch Boyutu vs Throughput:

Throughput
    ▲
    │        ┌──────────────────
    │       ╱
    │      ╱
    │     ╱
    │    ╱
    │   ╱
    │  ╱
    │ ╱
    │╱
    └──────────────────────────► Batch Size
    100KB  1MB  5MB  15MB  50MB  100MB+
                          ↑
                   Tatlı nokta: 5-15MB

3.2 Genel Kurallar

Elasticsearch'ün resmi önerisi: 5-15 MB arası batch boyutu.

Ancak bu sadece başlangıç noktasıdır. Optimal boyut şunlara bağlıdır:

  • Doküman boyutu (küçük doküman = daha fazla doküman/batch)

  • Cluster donanımı (RAM, CPU, disk)

  • Mapping karmaşıklığı (çok sayıda field = daha fazla işlem)

  • Ingest pipeline varlığı (ek CPU yükü)

Boyut hesaplama rehberi:

Doküman boyutu: ~1 KB → batch: ~5000-10000 doküman (~5-10 MB)
Doküman boyutu: ~5 KB → batch: ~1000-2000 doküman (~5-10 MB)
Doküman boyutu: ~50 KB → batch: ~100-200 doküman (~5-10 MB)
Doküman boyutu: ~500 KB → batch: ~10-20 doküman (~5-10 MB)

3.3 Benchmark ile Optimal Boyut Bulma

Gerçek ortamınızda optimal batch boyutunu bulmak için benchmark yapmanız gerekir:

public class BulkBenchmark {
    private final ElasticsearchClient client;

    public BulkBenchmark(ElasticsearchClient client) {
        this.client = client;
    }

    public void runBenchmark(List<Map<String, Object>> allDocuments) throws IOException {
        int[] batchSizes = {100, 500, 1000, 2000, 5000, 10000};

        for (int batchSize : batchSizes) {
            long startTime = System.currentTimeMillis();
            int totalDocs = 0;

            for (int i = 0; i < allDocuments.size(); i += batchSize) {
                int end = Math.min(i + batchSize, allDocuments.size());
                List<Map<String, Object>> batch = allDocuments.subList(i, end);

                BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
                for (Map<String, Object> doc : batch) {
                    bulkBuilder.operations(op -> op
                        .index(idx -> idx
                            .index("benchmark-index")
                            .document(doc)
                        )
                    );
                }

                BulkResponse response = client.bulk(bulkBuilder.build());
                if (response.errors()) {
                    System.err.println("Batch had errors!");
                }
                totalDocs += batch.size();
            }

            long elapsed = System.currentTimeMillis() - startTime;
            double docsPerSecond = (double) totalDocs / elapsed * 1000;
            System.out.printf("Batch size: %5d | Time: %6d ms | Throughput: %.0f docs/s%n",
                batchSize, elapsed, docsPerSecond);
        }
    }
}

Örnek benchmark sonuçları:

Batch size:   100 | Time:  45000 ms | Throughput:  22222 docs/s
Batch size:   500 | Time:  15000 ms | Throughput:  66667 docs/s
Batch size:  1000 | Time:   9000 ms | Throughput: 111111 docs/s
Batch size:  2000 | Time:   7000 ms | Throughput: 142857 docs/s
Batch size:  5000 | Time:   6500 ms | Throughput: 153846 docs/s  ← Tatlı nokta
Batch size: 10000 | Time:   7200 ms | Throughput: 138889 docs/s  ← Azalmaya başladı

💡 İpucu: Benchmark'ı kendi ortamınızda yapın. Cloud, on-premise, SSD, HDD — her ortam farklı sonuç verir. "5-15 MB" genel önerisi iyi bir başlangıçtır ama production'a girmeden önce kendi verilerinizle test edin.


4. Hata Yönetimi — Partial Failure

4.1 Bulk API'de Hata Modeli

Bulk API'nin en önemli özelliklerinden biri: bir request içindeki bazı işlemler başarısız olabilirken diğerleri başarılı olabilir (partial failure). Tüm request ya başarılı ya başarısız olmaz — her işlem bağımsızdır.

POST /_bulk
{"index": {"_index": "products", "_id": "1"}}
{"title": "Laptop", "price": 999}
{"index": {"_index": "products", "_id": "2"}}
{"title": "Phone", "price": "not_a_number"}
{"index": {"_index": "products", "_id": "3"}}
{"title": "Tablet", "price": 499}

Yanıt:

{
  "took": 30,
  "errors": true,
  "items": [
    {
      "index": {
        "_index": "products",
        "_id": "1",
        "status": 201,
        "result": "created"
      }
    },
    {
      "index": {
        "_index": "products",
        "_id": "2",
        "status": 400,
        "error": {
          "type": "mapper_parsing_exception",
          "reason": "failed to parse field [price] of type [float]"
        }
      }
    },
    {
      "index": {
        "_index": "products",
        "_id": "3",
        "status": 201,
        "result": "created"
      }
    }
  ]
}

Kritik: "errors": true ise en az bir işlem başarısız olmuştur. Ama diğerleri başarılı olmuştur — tekrar göndermeyin!

4.2 Hata Kontrolü ve Retry

public class BulkErrorHandler {

    public void processBulkResponse(BulkResponse response,
                                     List<BulkOperation> originalOperations) {
        if (!response.errors()) {
            System.out.println("All operations successful");
            return;
        }

        // Başarısız işlemleri topla
        List<BulkOperation> failedOperations = new ArrayList<>();
        List<BulkResponseItem> failedItems = new ArrayList<>();

        List<BulkResponseItem> items = response.items();
        for (int i = 0; i < items.size(); i++) {
            BulkResponseItem item = items.get(i);

            if (item.error() != null) {
                int status = item.status();

                // Retry edilebilir hatalar
                if (status == 429 || status >= 500) {
                    failedOperations.add(originalOperations.get(i));
                    failedItems.add(item);
                    System.err.printf("Retryable error [%d] for doc %s: %s%n",
                        status, item.id(), item.error().reason());
                }
                // Retry edilemez hatalar (mapping hatası, validation)
                else if (status == 400) {
                    System.err.printf("Non-retryable error for doc %s: %s%n",
                        item.id(), item.error().reason());
                    // Dead letter queue'ya yaz veya logla
                }
            }
        }

        System.out.printf("%d/%d operations failed, %d retryable%n",
            failedItems.size(),
            items.size(),
            failedOperations.size());
    }
}

4.3 Hangi Hatalar Retry Edilmeli?

┌──────────┬──────────────────────────────────────┬────────────┐
│ Status   │ Hata Türü                            │ Retry?     │
├──────────┼──────────────────────────────────────┼────────────┤
│ 200/201  │ Başarılı                             │ Hayır ✅    │
│ 400      │ Mapping hatası, validation           │ Hayır ❌    │
│ 404      │ Index bulunamadı                     │ Duruma göre│
│ 409      │ Version conflict                     │ Duruma göre│
│ 429      │ Too many requests (throttling)       │ Evet ✅     │
│ 500      │ Internal server error                │ Evet ✅     │
│ 503      │ Service unavailable                  │ Evet ✅     │
└──────────┴──────────────────────────────────────┴────────────┘

5. Retry Stratejisi — Exponential Backoff

5.1 Temel Retry Pattern

public class BulkRetryHandler {
    private final ElasticsearchClient client;
    private static final int MAX_RETRIES = 3;
    private static final long INITIAL_BACKOFF_MS = 1000;

    public BulkRetryHandler(ElasticsearchClient client) {
        this.client = client;
    }

    public BulkResponse executeWithRetry(BulkRequest request) throws IOException {
        List<BulkOperation> pendingOps = request.operations();
        BulkResponse lastResponse = null;

        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            if (attempt > 0) {
                long backoff = INITIAL_BACKOFF_MS * (long) Math.pow(2, attempt - 1);
                // Jitter ekle — tüm client'ların aynı anda retry etmesini önle
                backoff += (long) (Math.random() * backoff * 0.3);
                System.out.printf("Retry attempt %d, waiting %d ms%n", attempt, backoff);
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted during retry", e);
                }
            }

            BulkRequest retryRequest = BulkRequest.of(b -> b.operations(pendingOps));
            lastResponse = client.bulk(retryRequest);

            if (!lastResponse.errors()) {
                return lastResponse;
            }

            // Sadece retry edilebilir hataları topla
            List<BulkOperation> retryableOps = new ArrayList<>();
            List<BulkResponseItem> items = lastResponse.items();
            for (int i = 0; i < items.size(); i++) {
                BulkResponseItem item = items.get(i);
                if (item.error() != null) {
                    int status = item.status();
                    if (status == 429 || status >= 500) {
                        retryableOps.add(pendingOps.get(i));
                    }
                }
            }

            if (retryableOps.isEmpty()) {
                return lastResponse;
            }

            pendingOps = retryableOps;
            System.out.printf("%d retryable operations remaining%n", pendingOps.size());
        }

        return lastResponse;
    }
}

5.2 Exponential Backoff Açıklaması

Attempt 0: Hemen gönder
Attempt 1: 1 saniye bekle    (1000 ms + jitter)
Attempt 2: 2 saniye bekle    (2000 ms + jitter)
Attempt 3: 4 saniye bekle    (4000 ms + jitter)

Jitter: Rastgele ek süre (%0-30)
Neden: Birden fazla client aynı anda retry ederse "thundering herd"
       problemi oluşur. Jitter bunu dağıtır.

6. Thread Pool ve İç Yapı

6.1 Elasticsearch'ün Bulk Thread Pool'u

Elasticsearch, bulk işlemleri için ayrı bir thread pool kullanır:

GET _nodes/thread_pool

// İlgili thread pool'lar:
{
  "nodes": {
    "node-1": {
      "thread_pool": {
        "write": {
          "type": "fixed",
          "size": 16,
          "queue_size": 10000
        },
        "search": {
          "type": "fixed_auto_queue_size",
          "size": 25,
          "queue_size": 1000
        }
      }
    }
  }
}
  • write thread pool: Bulk index, update, delete işlemleri bu pool'da çalışır

  • size: Eşzamanlı thread sayısı (varsayılan: CPU çekirdek sayısı)

  • queue_size: Kuyruk kapasitesi (varsayılan: 10000). Kuyruk dolduğunda 429 Too Many Requests döner

6.2 429 Too Many Requests

Queue dolduğunda 429 hatası alırsınız. Bu, cluster'ın mevcut yazma kapasitesinin doyduğu anlamına gelir:

{
  "error": {
    "type": "es_rejected_execution_exception",
    "reason": "rejected execution of coordinating operation"
  },
  "status": 429
}

Çözümler:

  1. Client tarafında retry with backoff uygula

  2. Batch boyutunu küçült

  3. Eşzamanlı bulk request sayısını azalt

  4. Cluster'a daha fazla data node ekle

// Thread pool istatistiklerini kontrol et
GET _nodes/stats/thread_pool

// write pool'unda rejected sayısı
{
  "nodes": {
    "node-1": {
      "thread_pool": {
        "write": {
          "threads": 16,
          "queue": 150,
          "active": 16,
          "rejected": 5000,
          "completed": 1500000
        }
      }
    }
  }
}
// rejected > 0 ise yazma baskısı var demektir

7. Refresh Politikası

7.1 Bulk Import Sırasında Refresh

Büyük miktarda veri yüklerken refresh politikası kritik önemdedir:

// 1. Import öncesi: Refresh'i kapat
PUT /products/_settings
{
  "index": { "refresh_interval": "-1" }
}

// 2. Replica'ları kapat (import hızı için)
PUT /products/_settings
{
  "index": { "number_of_replicas": 0 }
}

// 3. Bulk import yap
POST /products/_bulk
{"index": {"_id": "1"}}
{"title": "Product 1"}
// ... binlerce doküman

// 4. Import bitti — refresh'i geri aç
PUT /products/_settings
{
  "index": {
    "refresh_interval": "1s",
    "number_of_replicas": 1
  }
}

// 5. Manuel refresh + force merge
POST /products/_refresh
POST /products/_forcemerge?max_num_segments=5

7.2 Ne Kadar Fark Yapar?

Senaryo: 1 milyon doküman bulk import

refresh_interval: 1s  (varsayılan)
  → Süre: 180 saniye
  → Segment sayısı: ~180
  → Import sırasında merge pressure yüksek

refresh_interval: -1  (kapalı)
  → Süre: 60 saniye  (~3x hızlanma!)
  → Segment sayısı: Bulk buffer boyutuna bağlı
  → Import sırasında merge pressure düşük

8. Java BulkProcessor Pattern

8.1 Manuel Batch Yönetimi

public class ManualBulkIndexer {
    private final ElasticsearchClient client;
    private final int batchSize;
    private final List<BulkOperation> buffer;

    public ManualBulkIndexer(ElasticsearchClient client, int batchSize) {
        this.client = client;
        this.batchSize = batchSize;
        this.buffer = new ArrayList<>();
    }

    public void add(String index, String id, Object document) throws IOException {
        buffer.add(BulkOperation.of(op -> op
            .index(idx -> idx
                .index(index)
                .id(id)
                .document(document)
            )
        ));

        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    public void flush() throws IOException {
        if (buffer.isEmpty()) return;

        List<BulkOperation> ops = new ArrayList<>(buffer);
        buffer.clear();

        BulkRequest request = BulkRequest.of(b -> b.operations(ops));
        BulkResponse response = client.bulk(request);

        if (response.errors()) {
            handleErrors(response, ops);
        }

        System.out.printf("Flushed %d operations in %d ms%n",
            ops.size(), response.took());
    }

    private void handleErrors(BulkResponse response, List<BulkOperation> ops) {
        for (int i = 0; i < response.items().size(); i++) {
            BulkResponseItem item = response.items().get(i);
            if (item.error() != null) {
                System.err.printf("Error for operation %d: %s%n",
                    i, item.error().reason());
            }
        }
    }

    // Try-with-resources için AutoCloseable
    public void close() throws IOException {
        flush();
    }
}

Kullanım:

ManualBulkIndexer indexer = new ManualBulkIndexer(client, 1000);
try {
    for (Product product : products) {
        indexer.add("products", product.getId(), product);
    }
} finally {
    indexer.close(); // Kalan dokümanları flush et
}

8.2 BulkIngester (Elasticsearch Java Client 8.x)

Elasticsearch Java Client 8.x, built-in bir BulkIngester sağlar:

public class BulkIngesterExample {

    public void bulkIngest(List<Product> products) throws Exception {
        // Transport oluştur
        RestClient restClient = RestClient.builder(
            new HttpHost("localhost", 9200)
        ).build();
        ElasticsearchTransport transport = new RestClientTransport(
            restClient, new JacksonJsonpMapper()
        );
        ElasticsearchClient client = new ElasticsearchClient(transport);

        // BulkIngester oluştur
        BulkIngester<Product> ingester = BulkIngester.of(b -> b
            .client(client)
            .maxOperations(1000)           // Batch size (doküman sayısı)
            .maxSize(5 * 1024 * 1024)      // Max batch size (5MB)
            .flushInterval(5, TimeUnit.SECONDS) // Max bekleme süresi
            .maxConcurrentRequests(2)       // Eşzamanlı request sayısı
            .listener(new BulkListener<Product>() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request,
                                       List<Product> contexts) {
                    System.out.printf("Executing bulk #%d with %d operations%n",
                        executionId, request.operations().size());
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      List<Product> contexts, BulkResponse response) {
                    if (response.errors()) {
                        System.err.printf("Bulk #%d had errors%n", executionId);
                    } else {
                        System.out.printf("Bulk #%d completed in %d ms%n",
                            executionId, response.took());
                    }
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      List<Product> contexts, Throwable failure) {
                    System.err.printf("Bulk #%d failed: %s%n",
                        executionId, failure.getMessage());
                }
            })
        );

        // Dokümanları ekle — otomatik batch'lenir
        for (Product product : products) {
            ingester.add(op -> op
                .index(idx -> idx
                    .index("products")
                    .id(product.getId())
                    .document(product)
                ),
                product  // Context (listener'da kullanılabilir)
            );
        }

        // Kalan dokümanları flush et ve kapat
        ingester.close();

        restClient.close();
    }
}

BulkIngester avantajları:

  • Otomatik batching (boyut ve sayı bazlı)

  • Flush interval (zaman bazlı)

  • Concurrent request desteği

  • Listener pattern ile monitoring

  • Thread-safe


9. Performans Optimizasyon Teknikleri

9.1 Checklist

┌──────────────────────────────────────┬─────────────┬─────────────────────────────┐
│ Optimizasyon                         │ Etki        │ Not                          │
├──────────────────────────────────────┼─────────────┼─────────────────────────────┤
│ refresh_interval: -1                 │ 2-3x hız    │ Import sonrası geri aç       │
│ number_of_replicas: 0                │ 1.5-2x hız  │ Import sonrası geri aç       │
│ Optimal batch size (5-15MB)          │ 2-5x hız    │ Benchmark ile bul            │
│ Translog async                       │ 1.2-1.5x    │ Veri kaybı riski!            │
│ Mapping optimization                 │ Değişken    │ Gereksiz field'ları index'leme│
│ _source disabled                     │ 1.1-1.3x    │ Reindex yapılamaz!           │
│ Pipeline'ı azalt                     │ Değişken    │ Gereksiz processor'ları kaldır│
│ Client-side parallelism              │ 2-4x hız    │ Thread sayısı: CPU * 2       │
│ Doğru shard sayısı                   │ Değişken    │ Çok fazla shard = overhead    │
│ SSD kullanımı                        │ 2-5x hız    │ HDD vs SSD farkı büyük       │
└──────────────────────────────────────┴─────────────┴─────────────────────────────┘

9.2 Paralel Bulk İşlemleri

public class ParallelBulkIndexer {
    private final ElasticsearchClient client;
    private final ExecutorService executor;

    public ParallelBulkIndexer(ElasticsearchClient client, int threadCount) {
        this.client = client;
        this.executor = Executors.newFixedThreadPool(threadCount);
    }

    public void parallelIndex(List<Map<String, Object>> documents,
                               int batchSize) throws Exception {
        // Dokümanları batch'lere böl
        List<List<Map<String, Object>>> batches = new ArrayList<>();
        for (int i = 0; i < documents.size(); i += batchSize) {
            batches.add(documents.subList(i,
                Math.min(i + batchSize, documents.size())));
        }

        // Paralel gönder
        List<Future<BulkResponse>> futures = new ArrayList<>();
        for (List<Map<String, Object>> batch : batches) {
            Future<BulkResponse> future = executor.submit(() -> {
                BulkRequest.Builder builder = new BulkRequest.Builder();
                for (Map<String, Object> doc : batch) {
                    builder.operations(op -> op
                        .index(idx -> idx
                            .index("products")
                            .document(doc)
                        )
                    );
                }
                return client.bulk(builder.build());
            });
            futures.add(future);
        }

        // Sonuçları topla
        int totalSuccess = 0;
        int totalFailed = 0;
        for (Future<BulkResponse> future : futures) {
            BulkResponse response = future.get();
            for (BulkResponseItem item : response.items()) {
                if (item.error() != null) totalFailed++;
                else totalSuccess++;
            }
        }

        System.out.printf("Completed: %d success, %d failed%n",
            totalSuccess, totalFailed);
    }

    public void shutdown() {
        executor.shutdown();
    }
}

⚠️ Dikkat: Paralel bulk gönderirken dikkatli olun. Çok fazla eşzamanlı request, write queue'yu doldurabilir ve 429 hatalarına yol açar. Genel kural: eşzamanlı bulk request sayısı = data node sayısı * 2 veya daha az.


10. Gerçek Dünya: Büyük Veri Import Pipeline

10.1 Tam Bir Import Senaryosu

10 milyon dokümanı CSV dosyasından Elasticsearch'e aktarma:

public class LargeDataImporter {
    private final ElasticsearchClient client;

    public LargeDataImporter(ElasticsearchClient client) {
        this.client = client;
    }

    public void importFromCSV(String csvPath) throws Exception {
        String indexName = "large-dataset";

        // 1. Index oluştur — optimize ayarlarla
        client.indices().create(c -> c
            .index(indexName)
            .settings(s -> s
                .numberOfShards("3")
                .numberOfReplicas("0")
                .refreshInterval(ri -> ri.time("-1"))
            )
            .mappings(m -> m
                .properties("title", p -> p.text(t -> t
                    .fields("keyword", f -> f.keyword(k -> k.ignoreAbove(256)))
                ))
                .properties("price", p -> p.scaledFloat(sf -> sf.scalingFactor(100.0)))
                .properties("category", p -> p.keyword(k -> k))
                .properties("created_at", p -> p.date(d -> d))
            )
        );

        // 2. Dokümanları oku ve bulk gönder
        long startTime = System.currentTimeMillis();
        int totalDocs = 0;
        int batchSize = 5000;
        List<BulkOperation> buffer = new ArrayList<>();

        try (BufferedReader reader = new BufferedReader(new FileReader(csvPath))) {
            String line;
            reader.readLine(); // Header'ı atla

            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",");
                Map<String, Object> doc = Map.of(
                    "title", parts[0],
                    "price", Double.parseDouble(parts[1]),
                    "category", parts[2],
                    "created_at", parts[3]
                );

                buffer.add(BulkOperation.of(op -> op
                    .index(idx -> idx
                        .index(indexName)
                        .document(doc)
                    )
                ));

                if (buffer.size() >= batchSize) {
                    List<BulkOperation> ops = new ArrayList<>(buffer);
                    buffer.clear();

                    BulkResponse response = client.bulk(b -> b.operations(ops));
                    totalDocs += ops.size();

                    if (totalDocs % 100000 == 0) {
                        System.out.printf("Indexed %d docs (%.1f docs/s)%n",
                            totalDocs,
                            (double) totalDocs / ((System.currentTimeMillis() - startTime) / 1000.0));
                    }
                }
            }

            // Kalan dokümanları flush et
            if (!buffer.isEmpty()) {
                client.bulk(b -> b.operations(buffer));
                totalDocs += buffer.size();
            }
        }

        // 3. Ayarları geri al
        client.indices().putSettings(s -> s
            .index(indexName)
            .settings(st -> st
                .refreshInterval(ri -> ri.time("1s"))
                .numberOfReplicas("1")
            )
        );

        // 4. Refresh ve force merge
        client.indices().refresh(r -> r.index(indexName));
        client.indices().forcemerge(f -> f.index(indexName).maxNumSegments(5L));

        long elapsed = System.currentTimeMillis() - startTime;
        System.out.printf("Import complete: %d docs in %.1f seconds (%.0f docs/s)%n",
            totalDocs, elapsed / 1000.0, (double) totalDocs / (elapsed / 1000.0));
    }
}

11. Yaygın Hatalar

Hata 1: Tüm batch'i retry etmek

// ❌ YANLIŞ — Başarılı dokümanlar tekrar yazılır (duplikasyon veya gereksiz yük)
if (response.errors()) {
    client.bulk(request); // Tüm batch'i tekrar gönder!
}

// ✅ DOĞRU — Sadece başarısız olanları retry et
if (response.errors()) {
    List<BulkOperation> failedOps = getRetryableOperations(response, originalOps);
    client.bulk(BulkRequest.of(b -> b.operations(failedOps)));
}

Hata 2: NDJSON formatını bozmak

# ❌ YANLIŞ — -d newline'ları siler
curl -d @data.ndjson ...

# ✅ DOĞRU — --data-binary formatı korur
curl --data-binary @data.ndjson ...

Hata 3: Çok büyük batch

// ❌ 100MB'lık batch → OutOfMemoryError veya circuit breaker
int batchSize = 1000000; // Çok büyük!

// ✅ 5-15MB arası
int batchSize = 5000; // ~1KB/doc → ~5MB toplam

12. Özet

  • Bulk API, NDJSON (Newline Delimited JSON) formatı kullanır — her satır bağımsız parse edilir, streaming imkânı sağlar

  • Optimal batch boyutu 5-15 MB arasıdır. Kendi ortamınızda benchmark yaparak tatlı noktayı bulun

  • Bulk API partial failure destekler — bazı işlemler başarısız olurken diğerleri başarılı olabilir. Sadece başarısız olanları retry edin, tüm batch'i değil

  • 429 Too Many Requests hatası, write queue'nun dolduğunu gösterir. Exponential backoff with jitter uygulayın

  • Import sırasında refresh_interval: -1 ve number_of_replicas: 0 ayarı 2-3x performans artışı sağlar. Import sonrası geri açmayı unutmayın

  • Java'da BulkIngester sınıfı otomatik batching, flush interval ve concurrent request desteği sağlar — kendi batch yönetiminizi yazmak yerine bunu kullanın

  • Paralel bulk gönderimde eşzamanlı request sayısını sınırlayın: data node sayısı × 2 iyi bir başlangıçtır