← Kursa Dön
📄 Text · 35 min

Java İleri Konular — Async, Scroll, Bulk Processor

Giriş — Senkron API Yetmediği Yer

Bölüm 8.1-8.4'te Elasticsearch Java Client'ın temellerini öğrendiniz: bağlantı, CRUD, arama, Spring Data. Ama gerçek dünyada işler daha karmaşıktır. 10 milyon dokümanı scroll etmeniz, binlerce dokümanı bulk ile index'lemeniz, asenkron sorgular atmanız, hata durumlarında retry yapmanız gerekir.

Bu ders, production ortamında Java ile Elasticsearch kullanırken ihtiyaç duyacağınız ileri konuları kapsar: async client, scroll/PIT iteration, BulkProcessor, custom serialization, hata yönetimi ve retry pattern'ları.


1. Maven Bağımlılıkları

Bu dersteki tüm örnekler için gereken bağımlılıklar:

<dependencies>
    <!-- Elasticsearch Java Client -->
    <dependency>
        <groupId>co.elastic.clients</groupId>
        <artifactId>elasticsearch-java</artifactId>
        <version>8.12.0</version>
    </dependency>

    <!-- JSON işleme -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.17.0</version>
    </dependency>

    <!-- Jakarta JSON Processing (zorunlu) -->
    <dependency>
        <groupId>jakarta.json</groupId>
        <artifactId>jakarta.json-api</artifactId>
        <version>2.1.3</version>
    </dependency>

    <!-- JSON-P implementasyonu -->
    <dependency>
        <groupId>org.eclipse.parsson</groupId>
        <artifactId>parsson</artifactId>
        <version>1.1.5</version>
    </dependency>
</dependencies>

2. ElasticsearchAsyncClient

2.1 Async Client Oluşturma

import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9200, "http")
).build();

RestClientTransport transport = new RestClientTransport(
    restClient, new JacksonJsonpMapper()
);

// Senkron client
ElasticsearchClient syncClient = new ElasticsearchClient(transport);

// Asenkron client — aynı transport'u paylaşır
ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(transport);

Her iki client da aynı transport/connection pool'u kullanır. Async client tüm metotlarda CompletableFuture döner.

2.2 CompletableFuture Pattern

import java.util.concurrent.CompletableFuture;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import java.util.Map;

// Basit async arama
CompletableFuture<SearchResponse<Map>> future = asyncClient.search(s -> s
    .index("products")
    .query(q -> q
        .match(m -> m.field("title").query("Samsung"))
    )
    .size(10),
    Map.class
);

// Non-blocking: sonuç geldiğinde callback çalışır
future.thenAccept(response -> {
    System.out.println("Toplam: " + response.hits().total().value());
    response.hits().hits().forEach(hit ->
        System.out.println("  " + hit.source().get("title"))
    );
}).exceptionally(ex -> {
    System.err.println("Hata: " + ex.getMessage());
    return null;
});

// Veya blocking beklemek isterseniz:
SearchResponse<Map> response = future.get(); // throws Exception

2.3 Paralel Async Sorgular

Birden fazla sorguyu paralel çalıştırma:

import java.util.concurrent.CompletableFuture;

// 3 farklı sorguyu paralel at
CompletableFuture<SearchResponse<Map>> telefonlar = asyncClient.search(s -> s
    .index("products")
    .query(q -> q.term(t -> t.field("category").value("Telefon")))
    .size(5),
    Map.class
);

CompletableFuture<SearchResponse<Map>> tabletler = asyncClient.search(s -> s
    .index("products")
    .query(q -> q.term(t -> t.field("category").value("Tablet")))
    .size(5),
    Map.class
);

CompletableFuture<SearchResponse<Map>> kulakliklar = asyncClient.search(s -> s
    .index("products")
    .query(q -> q.term(t -> t.field("category").value("Kulaklık")))
    .size(5),
    Map.class
);

// Hepsi tamamlanana kadar bekle
CompletableFuture.allOf(telefonlar, tabletler, kulakliklar)
    .thenRun(() -> {
        try {
            System.out.println("Telefonlar: " + telefonlar.get().hits().total().value());
            System.out.println("Tabletler: " + tabletler.get().hits().total().value());
            System.out.println("Kulaklıklar: " + kulakliklar.get().hits().total().value());
        } catch (Exception e) {
            e.printStackTrace();
        }
    })
    .join(); // main thread'de bekle

3 sorgu paralel çalışır — sıralı yerine ~3x hız kazanırsınız (network latency'ye bağlı).

2.4 Timeout ile Async

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

try {
    SearchResponse<Map> response = asyncClient.search(s -> s
        .index("products")
        .query(q -> q.matchAll(m -> m))
        .size(100),
        Map.class
    ).get(5, TimeUnit.SECONDS); // 5 saniye timeout

    System.out.println("Sonuç: " + response.hits().total().value());
} catch (TimeoutException e) {
    System.err.println("Sorgu 5 saniyede tamamlanamadı!");
} catch (Exception e) {
    System.err.println("Hata: " + e.getMessage());
}

3. Scroll API ile Tam İterasyon

Scroll API, büyük veri setlerini sayfa sayfa iterate etmek için kullanılır. from/size ile 10.000'den fazla sonuç getirilemez — scroll bu sınırı aşar.

3.1 Scroll Pattern

import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch._types.Time;
import java.util.List;
import java.util.Map;

// Adım 1: İlk scroll isteği
SearchResponse<Map> searchResponse = syncClient.search(s -> s
    .index("large_index")
    .query(q -> q.matchAll(m -> m))
    .size(1000)  // Sayfa boyutu
    .scroll(Time.of(t -> t.time("2m"))),  // Scroll context süresi
    Map.class
);

String scrollId = searchResponse.scrollId();
List<Hit<Map>> hits = searchResponse.hits().hits();
long totalProcessed = 0;

System.out.println("Toplam doküman: " + searchResponse.hits().total().value());

// Adım 2: Scroll loop
while (!hits.isEmpty()) {
    // Bu sayfadaki dokümanları işle
    for (Hit<Map> hit : hits) {
        Map<String, Object> source = hit.source();
        // ... dokümanı işle ...
        totalProcessed++;
    }
    System.out.printf("İşlenen: %d doküman%n", totalProcessed);

    // Sonraki sayfayı al
    SearchResponse<Map> scrollResponse = syncClient.scroll(sc -> sc
        .scrollId(scrollId)
        .scroll(Time.of(t -> t.time("2m"))),
        Map.class
    );

    scrollId = scrollResponse.scrollId();
    hits = scrollResponse.hits().hits();
}

// Adım 3: Scroll context'i temizle (önemli!)
syncClient.clearScroll(c -> c.scrollId(scrollId));
System.out.println("Scroll tamamlandı! Toplam: " + totalProcessed);

⚠️ Dikkat: clearScroll çağrısını mutlaka yapın. Açık scroll context'ler bellek tüketir. try-finally bloğu kullanmak iyi bir pratiktir.

3.2 try-finally ile Güvenli Scroll

String scrollId = null;
try {
    SearchResponse<Map> response = syncClient.search(s -> s
        .index("large_index")
        .query(q -> q.matchAll(m -> m))
        .size(1000)
        .scroll(Time.of(t -> t.time("2m"))),
        Map.class
    );
    scrollId = response.scrollId();
    List<Hit<Map>> hits = response.hits().hits();

    while (!hits.isEmpty()) {
        for (Hit<Map> hit : hits) {
            // İşle...
        }

        String currentScrollId = scrollId;
        SearchResponse<Map> scrollResp = syncClient.scroll(sc -> sc
            .scrollId(currentScrollId)
            .scroll(Time.of(t -> t.time("2m"))),
            Map.class
        );
        scrollId = scrollResp.scrollId();
        hits = scrollResp.hits().hits();
    }
} finally {
    if (scrollId != null) {
        String finalScrollId = scrollId;
        syncClient.clearScroll(c -> c.scrollId(finalScrollId));
    }
}

4. PIT + search_after ile Sayfalama

Scroll API snapshot bazlıdır — scroll context açıkken index değişiklikleri görülmez. PIT (Point in Time) + search_after daha modern ve ölçeklenebilir bir alternatiftir.

4.1 PIT + search_after Pattern

import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch._types.*;

// Adım 1: PIT aç
OpenPointInTimeResponse pitResponse = syncClient.openPointInTime(p -> p
    .index("products")
    .keepAlive(Time.of(t -> t.time("5m")))
);
String pitId = pitResponse.id();

try {
    List<FieldValue> searchAfter = null;
    int pageSize = 500;
    long total = 0;

    while (true) {
        // search_after değişkenini lambda'da kullanabilmek için
        List<FieldValue> currentSearchAfter = searchAfter;

        SearchResponse<Map> response = syncClient.search(s -> {
            var builder = s
                .size(pageSize)
                .query(q -> q.matchAll(m -> m))
                .pit(p -> p.id(pitId).keepAlive(Time.of(t -> t.time("5m"))))
                .sort(so -> so.field(f -> f.field("_shard_doc").order(SortOrder.Asc)));

            if (currentSearchAfter != null) {
                builder.searchAfter(currentSearchAfter);
            }
            return builder;
        }, Map.class);

        List<Hit<Map>> hits = response.hits().hits();
        if (hits.isEmpty()) break;

        for (Hit<Map> hit : hits) {
            // Dokümanı işle
            total++;
        }

        // Son hit'in sort değerlerini al
        Hit<Map> lastHit = hits.get(hits.size() - 1);
        searchAfter = lastHit.sort();

        System.out.printf("İşlenen: %d doküman%n", total);
    }

    System.out.println("PIT iteration tamamlandı! Toplam: " + total);
} finally {
    // PIT'i kapat
    syncClient.closePointInTime(c -> c.id(pitId));
}

4.2 Scroll vs PIT Karşılaştırması

ÖzellikScrollPIT + search_after
SnapshotEvet (scroll açıldığında)Evet (PIT açıldığında)
Paralel kullanımTek threadBirden fazla thread
State yönetimiServer-side (scroll context)Client-side (search_after)
TavsiyeEski yöntemYeni yöntem, tercih edin
SortingOpsiyonelZorunlu

💡 İpucu: Yeni projelerde PIT + search_after tercih edin. Scroll API hâlâ çalışır ama PIT daha esnektir.


5. BulkProcessor — Yüksek Verimli Toplu İşlem

5.1 Manuel Bulk API

import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import java.util.Map;

BulkRequest.Builder builder = new BulkRequest.Builder();

// 1000 doküman ekle
for (int i = 0; i < 1000; i++) {
    int docId = i;
    builder.operations(op -> op
        .index(idx -> idx
            .index("products")
            .id(String.valueOf(docId))
            .document(Map.of(
                "name", "Ürün " + docId,
                "price", Math.random() * 10000,
                "category", docId % 2 == 0 ? "Elektronik" : "Giyim"
            ))
        )
    );
}

BulkResponse response = syncClient.bulk(builder.build());

// Hata kontrolü
if (response.errors()) {
    for (BulkResponseItem item : response.items()) {
        if (item.error() != null) {
            System.err.printf("Hata [%s]: %s — %s%n",
                item.id(),
                item.error().type(),
                item.error().reason()
            );
        }
    }
} else {
    System.out.println("Tümü başarılı! " + response.items().size() + " doküman");
}

5.2 BulkIngester — Otomatik Flush

Elasticsearch Java Client 8.x ile gelen BulkIngester, otomatik flush ve retry sağlar:

import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;

// BulkIngester oluştur
BulkIngester<Void> ingester = BulkIngester.of(b -> b
    .client(syncClient)
    .maxOperations(500)         // 500 operasyonda flush
    .maxSize(5 * 1024 * 1024)   // 5MB'da flush
    .flushInterval(2, TimeUnit.SECONDS)  // 2 saniyede flush
    .maxConcurrentRequests(3)   // Paralel 3 bulk isteği
    .listener(new BulkListener<Void>() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request, List<Void> contexts) {
            System.out.printf("Bulk #%d: %d operasyon gönderiliyor...%n",
                executionId, request.operations().size());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request,
                            List<Void> contexts, BulkResponse response) {
            System.out.printf("Bulk #%d: %d operasyon tamamlandı (hata: %s)%n",
                executionId, response.items().size(), response.errors());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request,
                            List<Void> contexts, Throwable failure) {
            System.err.printf("Bulk #%d BAŞARISIZ: %s%n",
                executionId, failure.getMessage());
        }
    })
);

// Dokümanları ekle — otomatik batch'lenir ve flush edilir
for (int i = 0; i < 50000; i++) {
    int docId = i;
    ingester.add(op -> op
        .index(idx -> idx
            .index("products")
            .document(Map.of(
                "name", "Ürün " + docId,
                "price", Math.random() * 10000
            ))
        )
    );
}

// Kalan operasyonları flush et ve kapat
ingester.close();
System.out.println("Tüm bulk operasyonlar tamamlandı!");

BulkIngester şu durumlarda otomatik flush yapar:

  • maxOperations sayısına ulaşıldığında

  • maxSize byte sınırına ulaşıldığında

  • flushInterval süresi dolduğunda

⚠️ Dikkat: ingester.close() çağrısını unutmayın — son batch'i flush eder.


6. Jackson Integration ve Custom JsonpMapper

6.1 JacksonJsonpMapper ile Custom ObjectMapper

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;

// Custom ObjectMapper
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);

// Custom mapper ile transport oluştur
JacksonJsonpMapper mapper = new JacksonJsonpMapper(objectMapper);
RestClientTransport transport = new RestClientTransport(restClient, mapper);
ElasticsearchClient client = new ElasticsearchClient(transport);

6.2 Custom POJO ile Çalışma

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.time.LocalDateTime;

@JsonIgnoreProperties(ignoreUnknown = true)
public class Product {
    private String name;
    private String category;
    private double price;
    private double rating;

    @JsonProperty("in_stock")
    private boolean inStock;

    @JsonProperty("created_at")
    private LocalDateTime createdAt;

    // Constructors
    public Product() {}

    public Product(String name, String category, double price) {
        this.name = name;
        this.category = category;
        this.price = price;
        this.inStock = true;
        this.createdAt = LocalDateTime.now();
    }

    // Getters & Setters
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getCategory() { return category; }
    public void setCategory(String category) { this.category = category; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
    public double getRating() { return rating; }
    public void setRating(double rating) { this.rating = rating; }
    public boolean isInStock() { return inStock; }
    public void setInStock(boolean inStock) { this.inStock = inStock; }
    public LocalDateTime getCreatedAt() { return createdAt; }
    public void setCreatedAt(LocalDateTime createdAt) { this.createdAt = createdAt; }
}

// POJO ile index'leme
Product product = new Product("Samsung Galaxy S24", "Telefon", 54999);
product.setRating(4.7);

client.index(i -> i
    .index("products")
    .id("p1")
    .document(product)
);

// POJO ile arama
SearchResponse<Product> response = client.search(s -> s
    .index("products")
    .query(q -> q.match(m -> m.field("name").query("Samsung")))
    .size(10),
    Product.class  // Doğrudan Product objesine deserialize
);

for (Hit<Product> hit : response.hits().hits()) {
    Product p = hit.source();
    System.out.printf("%s - %s - %.0f TL (%.1f ⭐)%n",
        p.getName(), p.getCategory(), p.getPrice(), p.getRating());
}

7. Error Handling ve Retry Pattern

7.1 ElasticsearchException Handling

import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.transport.TransportException;
import java.io.IOException;

try {
    SearchResponse<Map> response = client.search(s -> s
        .index("nonexistent_index")
        .query(q -> q.matchAll(m -> m)),
        Map.class
    );
} catch (ElasticsearchException e) {
    // Elasticsearch'ten gelen hata (4xx, 5xx)
    System.err.printf("ES Hatası: %s [%d] — %s%n",
        e.error().type(),
        e.status(),
        e.error().reason()
    );

    switch (e.status()) {
        case 404:
            System.err.println("Index bulunamadı!");
            break;
        case 400:
            System.err.println("Geçersiz sorgu!");
            break;
        case 429:
            System.err.println("Rate limit aşıldı — retry gerekli!");
            break;
        case 503:
            System.err.println("Cluster hazır değil — retry gerekli!");
            break;
    }
} catch (TransportException e) {
    // Bağlantı hatası
    System.err.println("Transport hatası: " + e.getMessage());
} catch (IOException e) {
    // Genel I/O hatası
    System.err.println("I/O hatası: " + e.getMessage());
}

7.2 Bulk Response Error Handling

BulkResponse bulkResponse = client.bulk(b -> b
    .operations(ops)
);

if (bulkResponse.errors()) {
    List<BulkOperation> failedOps = new ArrayList<>();

    for (int i = 0; i < bulkResponse.items().size(); i++) {
        BulkResponseItem item = bulkResponse.items().get(i);
        if (item.error() != null) {
            System.err.printf("Doküman %s hatalı: [%s] %s%n",
                item.id(),
                item.error().type(),
                item.error().reason()
            );

            // Retry edilebilir hata mı?
            String errorType = item.error().type();
            if ("version_conflict_engine_exception".equals(errorType)
                || "mapper_parsing_exception".equals(errorType)) {
                // Bu hatalar retry ile çözülmez — loglayıp geç
                System.err.println("  → Kalıcı hata, atlanıyor.");
            } else {
                // Geçici hata — retry listesine ekle
                failedOps.add(ops.get(i));
            }
        }
    }

    if (!failedOps.isEmpty()) {
        System.out.printf("%d operasyon retry edilecek%n", failedOps.size());
        // Retry logic...
    }
}

7.3 Exponential Backoff Retry Pattern

public class ElasticsearchRetry {

    private static final int MAX_RETRIES = 3;
    private static final long BASE_DELAY_MS = 1000;

    public static <T> T withRetry(RetryableOperation<T> operation) throws Exception {
        Exception lastException = null;

        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            try {
                return operation.execute();
            } catch (ElasticsearchException e) {
                lastException = e;

                if (!isRetryable(e.status())) {
                    throw e; // Retry edilemez hata
                }

                if (attempt < MAX_RETRIES) {
                    long delay = BASE_DELAY_MS * (long) Math.pow(2, attempt);
                    // Jitter ekle — thundering herd sorununu önle
                    delay += (long) (Math.random() * delay * 0.1);
                    System.out.printf("Retry %d/%d — %dms bekliyor... (%s)%n",
                        attempt + 1, MAX_RETRIES, delay, e.error().type());
                    Thread.sleep(delay);
                }
            } catch (IOException e) {
                lastException = e;
                if (attempt < MAX_RETRIES) {
                    long delay = BASE_DELAY_MS * (long) Math.pow(2, attempt);
                    Thread.sleep(delay);
                }
            }
        }

        throw new RuntimeException("Max retry aşıldı", lastException);
    }

    private static boolean isRetryable(int statusCode) {
        return statusCode == 429  // Too Many Requests
            || statusCode == 502  // Bad Gateway
            || statusCode == 503  // Service Unavailable
            || statusCode == 504; // Gateway Timeout
    }

    @FunctionalInterface
    public interface RetryableOperation<T> {
        T execute() throws Exception;
    }
}

// Kullanım
SearchResponse<Map> response = ElasticsearchRetry.withRetry(() ->
    client.search(s -> s
        .index("products")
        .query(q -> q.matchAll(m -> m))
        .size(10),
        Map.class
    )
);

8. Connection Pool Tuning

8.1 RestClient Yapılandırması

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

RestClient restClient = RestClient.builder(
    new HttpHost("es-node-1", 9200, "http"),
    new HttpHost("es-node-2", 9200, "http"),
    new HttpHost("es-node-3", 9200, "http")
)
.setRequestConfigCallback(config -> config
    .setConnectTimeout(5000)       // Bağlantı timeout: 5s
    .setSocketTimeout(60000)       // Socket timeout: 60s
    .setConnectionRequestTimeout(1000)  // Pool'dan bağlantı bekleme: 1s
)
.setHttpClientConfigCallback(httpClient -> httpClient
    .setMaxConnTotal(100)          // Toplam max bağlantı
    .setMaxConnPerRoute(30)        // Node başına max bağlantı
    .setDefaultCredentialsProvider(credentialsProvider) // Auth
)
.setFailureListener(new RestClient.FailureListener() {
    @Override
    public void onFailure(org.elasticsearch.client.Node node) {
        System.err.println("Node failed: " + node.getHost());
    }
})
.build();

8.2 Connection Pool Best Practices

ParametreVarsayılanTavsiyeNeden
maxConnTotal3050-200Yüksek throughput için
maxConnPerRoute1020-50Node başına yeterli bağlantı
connectTimeout1000ms3000-5000msAğ gecikmesine tolerans
socketTimeout30000ms30000-120000msUzun sorgular için
SniffingKapalıAçık (production)Node keşfi otomatik

8.3 Node Sniffer

import org.elasticsearch.client.sniff.Sniffer;
import org.elasticsearch.client.sniff.SniffOnFailureListener;

// Failure listener
SniffOnFailureListener failureListener = new SniffOnFailureListener();

RestClient restClient = RestClient.builder(
    new HttpHost("es-node-1", 9200)
)
.setFailureListener(failureListener)
.build();

// Sniffer — cluster'daki node'ları otomatik keşfeder
Sniffer sniffer = Sniffer.builder(restClient)
    .setSniffIntervalMillis(60000)          // Her 60s'de sniff
    .setSniffAfterFailureDelayMillis(30000) // Hata sonrası 30s'de sniff
    .build();

failureListener.setSniffer(sniffer);

// Kapatırken sniffer'ı da kapat
// sniffer.close();
// restClient.close();

Sniffer, cluster'a yeni node eklendiğinde veya node düştüğünde otomatik olarak bağlantıları günceller.


9. Yaygın Hatalar

Hata 1: Scroll Context'i Temizlememek

// ❌ Scroll tamamlandı ama clearScroll çağrılmadı
// → Bellek sızıntısı, "too many scroll contexts" hatası

// ✅ Her zaman try-finally kullanın
try {
    // scroll loop...
} finally {
    client.clearScroll(c -> c.scrollId(scrollId));
}

Hata 2: BulkIngester'ı Close Etmemek

// ❌ Son batch flush edilmez
BulkIngester ingester = BulkIngester.of(b -> b.client(client).maxOperations(500));
for (...) { ingester.add(...); }
// close() çağrılmadı — son batch kaybolur!

// ✅ try-with-resources kullanın
try (BulkIngester<Void> ingester = BulkIngester.of(b -> b.client(client).maxOperations(500))) {
    for (...) { ingester.add(...); }
} // Otomatik close ve flush

Hata 3: Retry Edilemez Hataları Retry Etmek

// ❌ 400 Bad Request retry ile düzelmez
// mapper_parsing_exception, search_phase_execution_exception

// ✅ Sadece geçici hataları retry edin: 429, 502, 503, 504

Özet

  • ElasticsearchAsyncClient CompletableFuture döner — paralel sorgular için allOf() kullanın

  • Scroll API büyük veri iteration için — clearScroll unutmayın, try-finally kullanın

  • PIT + search_after modern alternatif — stateless, paralel kullanılabilir

  • BulkIngester otomatik batch, flush ve listener desteği — close() ile son batch'i flush edin

  • JacksonJsonpMapper custom ObjectMapper ile — JavaTimeModule, snake_case, custom POJO

  • Error handling ElasticsearchException (4xx/5xx), TransportException (bağlantı), IOException (I/O) ayrımı

  • Retry pattern exponential backoff + jitter — sadece geçici hataları (429, 503) retry edin

  • Connection pool maxConnTotal, maxConnPerRoute, timeout'lar — Sniffer ile otomatik node keşfi