← Kursa Dön
📄 Text · 30 min

ThreadPoolTaskExecutor

Giriş

Bir fabrikadaki işçi havuzunu düşünün. Fabrika açıldığında belirli sayıda işçi (core) her zaman hazırdır. Sipariş artarsa bekleme kuyruğuna eklenir. Kuyruk da dolarsa geçici işçiler (max pool) alınır. Tüm kapasiteler doluysa sipariş reddedilir veya müşterinin kendisi yapar (CallerRunsPolicy). İşte ThreadPoolTaskExecutor tam olarak bu fabrika modelidir.

Production ortamında asenkron işlemlerin kontrolsüz thread oluşturması uygulamanın çökmesine neden olabilir. Her @Async metot çağrısında yeni thread oluşturmak yerine, sabit sayıda thread'in paylaşıldığı bir thread pool kullanmak, bellek kullanımını sınırlamak ve performansı optimize etmek için kritik öneme sahiptir.

Spring Framework, thread pool yönetimi için ThreadPoolTaskExecutor sınıfını sunar. Bu sınıf, Java'nın java.util.concurrent.ThreadPoolExecutor'ını Spring uyumlu şekilde sarar ve ek konfigürasyon seçenekleri ekler. Bu derste ThreadPoolTaskExecutor'ın iç mekanizmasını, core/max pool ve kuyruk etkileşimini, rejection policy'lerini, birden fazla executor tanımlamayı, monitoring stratejilerini ve production best practice'lerini derinlemesine öğreneceğiz.


TaskExecutor Arayüzü

Spring Framework, thread pool yönetimi için TaskExecutor arayüzünü tanımlar. Bu arayüz, Java'nın java.util.concurrent.Executor arayüzünü genişletir:

public interface TaskExecutor extends Executor {
    void execute(Runnable task);
}

Spring, bu arayüzün birçok implementasyonunu sağlar:

ImplementasyonAçıklamaProduction Uygunluğu
SimpleAsyncTaskExecutorHer görev için yeni thread oluşturur❌ Asla kullanmayın
SyncTaskExecutorGörevi çağıran thread'de çalıştırır (senkron)❌ Test için
ThreadPoolTaskExecutorSabit thread pool, kuyruk, rejection policy✅ Production standardı
ConcurrentTaskExecutorJava Executor'ı saran adapter⚠️ Özel durumlar
ThreadPoolTaskSchedulerZamanlama yetenekli pool✅ @Scheduled için

⚠️ Dikkat: @Async kullandığınızda hiçbir executor bean tanımlamazsanız, Spring Boot SimpleAsyncTaskExecutor kullanır — her çağrıda yeni thread oluşturur! 1000 çağrı = 1000 thread = bellek tükenmesi. Bu yüzden özel executor tanımlamak zorunludur.


ThreadPoolTaskExecutor Konfigürasyonu

@Configuration
@EnableAsync
public class AsyncConfig {
    
    @Bean(name = "taskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        
        // Temel pool ayarları
        executor.setCorePoolSize(5);           // Her zaman hazır bekleyen thread
        executor.setMaxPoolSize(20);           // Maksimum thread sayısı
        executor.setQueueCapacity(100);        // Kuyruk kapasitesi
        
        // Thread isimlendirme (loglarda ve debug'da görünür)
        executor.setThreadNamePrefix("async-");
        
        // Boşta kalan thread'in yaşam süresi (saniye)
        executor.setKeepAliveSeconds(60);
        
        // Core thread'ler de timeout olabilir mi?
        executor.setAllowCoreThreadTimeOut(false); // false → core thread'ler her zaman yaşar
        
        // Uygulama kapanırken kuyruktaki görevleri tamamla
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(30);
        
        // Rejection policy
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        
        executor.initialize();
        return executor;
    }
}

Core/Max Pool Size ve Queue Mekanizması

Thread pool'un çalışma mantığını anlamak, doğru konfigürasyon için kritiktir. Yeni bir görev geldiğinde şu akış izlenir:

Görev geldi
     │
     ▼
┌─────────────────────┐
│ Core thread boş mu? │── Evet ──→ Core thread çalıştırır
└─────────────────────┘
     │ Hayır
     ▼
┌─────────────────────┐
│ Kuyrukta yer var mı?│── Evet ──→ Kuyruğa ekle, sırasını bekle
└─────────────────────┘
     │ Hayır (kuyruk DOLU)
     ▼
┌─────────────────────────┐
│ Max thread'e ulaşıldı mı│── Hayır ──→ Yeni thread oluştur
└─────────────────────────┘
     │ Evet (hem kuyruk hem pool DOLU)
     ▼
Rejection Policy çalışır

Somut Örnek

corePoolSize=5, maxPoolSize=20, queueCapacity=100 ayarlarıyla:

Görev 1-5:     5 core thread çalıştırır         [5 thread aktif]
Görev 6-105:   Kuyruğa eklenir (100 kapasite)   [5 thread + 100 kuyrukta]
Görev 106-120: Yeni thread'ler oluşur (15 ek)   [20 thread + 100 kuyrukta]
Görev 121:     Rejection Policy devreye girer!   [KAPASITE AŞILDI]

Toplam kapasite: 20 (thread) + 100 (kuyruk) = 120 eşzamanlı görev

Kritik Soru: Neden Önce Kuyruk, Sonra Thread?

Bu sıralama sezgisel olmayabilir. "Neden önce daha fazla thread oluşturmuyoruz?" diye sorabilirsiniz. Cevap: thread oluşturma maliyetlidir (OS kaynak tahsisi, stack allocation). Kuyruk ise sadece bir pointer eklemektir — neredeyse maliyetsiz.

Ancak bu demektir ki: kuyruk kapasitesi çok büyükse, maxPoolSize'a asla ulaşılmaz!

corePoolSize=5, maxPoolSize=50, queueCapacity=Integer.MAX_VALUE

Sonuç: Her zaman sadece 5 thread çalışır!
Çünkü kuyruk hiç dolmaz → yeni thread oluşturma tetiklenmez.

ÇÖZÜM: queueCapacity'yi makul bir değere ayarlayın (25-200)

Rejection Policy

Hem kuyruk hem de thread pool dolduğunda ne yapılacağını RejectedExecutionHandler belirler:

// 1. AbortPolicy (varsayılan) — Exception fırlatır
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// RejectedExecutionException → görev KAYBOLUR, çağıran hata alır
// Kullanım: Kritik işlemlerde — görev kaybolmamalı, hata bildirilmeli

// 2. CallerRunsPolicy — Çağıran thread çalıştırır
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Görev reddedilmez, çağıranın kendi thread'inde çalıştırılır
// Doğal back-pressure: sistem aşırı yüklendiğinde çağıran da yavaşlar
// Kullanım: ÇOK senaryoda EN İYİ seçenek

// 3. DiscardPolicy — Sessizce atar
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// Görev sessizce atılır — hata fırlatılmaz, log yazılmaz
// Kullanım: Önemsiz görevlerde (opsiyonel metrik, log)

// 4. DiscardOldestPolicy — En eski görevi atar
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// Kuyruktaki EN ESKİ görevi atar, yeniyi ekler
// Kullanım: Güncellik önemli, eski veri geçersiz (real-time metrik)

Custom Rejection Handler

executor.setRejectedExecutionHandler((task, poolExecutor) -> {
    log.error("🚨 Task rejected! Pool: active={}, queue={}, completed={}",
        poolExecutor.getActiveCount(),
        poolExecutor.getQueue().size(),
        poolExecutor.getCompletedTaskCount());
    
    // Metrik kaydet
    meterRegistry.counter("executor.rejected", "pool", "main").increment();
    
    // Alerte gönder
    alertService.sendCritical("Thread pool capacity exhausted!");
    
    // Fallback: çağıranın thread'inde çalıştır
    if (!poolExecutor.isShutdown()) {
        task.run();
    }
});

💡 Production önerisi: Çoğu senaryoda CallerRunsPolicy en güvenlidir. Sistem aşırı yüklendiğinde çağıran thread de çalışmaya katılır ve doğal bir back-pressure mekanizması oluşur. Görev kaybolmaz, hata fırlatılmaz.


Birden Fazla Executor

Farklı iş yükleri için farklı thread pool'lar tanımlamak iyi bir pratiktir. Her pool kendi kaynağını yönetir ve birbirini etkilemez:

@Configuration
@EnableAsync
public class AsyncConfig {
    
    // E-posta gönderimi — düşük öncelik, az thread yeterli
    @Bean(name = "emailExecutor")
    public TaskExecutor emailExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("email-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
    
    // Rapor oluşturma — uzun süren, yoğun kaynak
    @Bean(name = "reportExecutor")
    public TaskExecutor reportExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);  // Kuyruk küçük → yeni thread hızlı oluşur
        executor.setThreadNamePrefix("report-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(120);  // Raporlar uzun sürebilir
        executor.initialize();
        return executor;
    }
    
    // Bildirim — yüksek hacim, kaybolabilir
    @Bean(name = "notificationExecutor")
    public TaskExecutor notificationExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("notification-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.initialize();
        return executor;
    }
}

Kullanım:

@Async("emailExecutor")
public void sendEmail(String to, String body) { /* ... */ }

@Async("reportExecutor")
public CompletableFuture<Report> generateReport(String type) { /* ... */ }

@Async("notificationExecutor")
public void sendPush(String userId, String message) { /* ... */ }

Bu yaklaşımın faydası: bir rapor oluşturma tüm thread'leri tüketse bile, e-posta ve bildirim servisleri etkilenmez.


application.yml ile Yapılandırma

Spring Boot, spring.task.execution prefix'i ile varsayılan TaskExecutor'ı yapılandırma dosyasından konfigüre etmenize olanak tanır:

spring:
  task:
    execution:
      pool:
        core-size: 5
        max-size: 20
        queue-capacity: 100
        keep-alive: 60s
      thread-name-prefix: "app-async-"
      shutdown:
        await-termination: true
        await-termination-period: 30s

Bu konfigürasyon, @Bean ile tanımlanan özel executor yoksa otomatik olarak uygulanır. Ancak birden fazla executor gerektiğinde @Bean ile tanım zorunludur.


Monitoring

Thread pool'un sağlığını izlemek, production'da kritiktir. Pool tükendiğinde uygulamanız yavaşlar veya durur.

Periyodik Log ile İzleme

@Component
@Slf4j
public class ThreadPoolMonitor {
    
    private final ThreadPoolTaskExecutor taskExecutor;
    
    public ThreadPoolMonitor(@Qualifier("taskExecutor") TaskExecutor taskExecutor) {
        this.taskExecutor = (ThreadPoolTaskExecutor) taskExecutor;
    }
    
    @Scheduled(fixedRate = 30000)  // Her 30 saniyede bir
    public void logPoolStatus() {
        ThreadPoolExecutor pool = taskExecutor.getThreadPoolExecutor();
        
        log.info("📊 Thread Pool Status — " +
            "active={}, pool={}, core={}, max={}, " +
            "queue={}, completed={}, rejected={}",
            pool.getActiveCount(),
            pool.getPoolSize(),
            pool.getCorePoolSize(),
            pool.getMaximumPoolSize(),
            pool.getQueue().size(),
            pool.getCompletedTaskCount(),
            pool.getTaskCount() - pool.getCompletedTaskCount() - pool.getActiveCount()
        );
        
        // Uyarı koşulları
        double utilization = (double) pool.getActiveCount() / pool.getMaximumPoolSize();
        if (utilization > 0.8) {
            log.warn("⚠️ Thread pool utilization is HIGH: {}%", (int)(utilization * 100));
        }
        
        if (pool.getQueue().size() > pool.getQueue().remainingCapacity()) {
            log.warn("⚠️ Task queue is more than 50% full!");
        }
    }
}

Micrometer Metrics ile İzleme

@Configuration
public class ExecutorMetricsConfig {
    
    @Bean
    public TaskExecutor taskExecutor(MeterRegistry registry) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("async-");
        executor.initialize();
        
        // Micrometer ile metrikleri expose et
        ExecutorServiceMetrics.monitor(registry, 
            executor.getThreadPoolExecutor(), "async-pool");
        
        return executor;
    }
}

// Artık şu metrikler Prometheus/Grafana'da mevcut:
// executor_pool_size_threads{name="async-pool"}
// executor_active_threads{name="async-pool"}
// executor_queued_tasks{name="async-pool"}
// executor_completed_tasks_total{name="async-pool"}

Pool Boyutu Hesaplama

I/O-Bound İş Yükleri

Veritabanı sorguları, HTTP çağrıları, dosya okuma gibi I/O ağırlıklı işlerde thread'ler çoğu zaman bekler:

Formül: corePoolSize = CPU çekirdek sayısı × (1 + bekleme/hesaplama oranı)

Örnek: 4 çekirdek, ortalama %90 bekleme (%10 CPU)
corePoolSize = 4 × (1 + 0.9/0.1) = 4 × 10 = 40

Pratikte: 4 çekirdek CPU için I/O-bound iş → 20-40 thread

CPU-Bound İş Yükleri

Hesaplama ağırlıklı işlerde thread sayısını artırmak fayda sağlamaz:

Formül: corePoolSize = CPU çekirdek sayısı + 1

Örnek: 4 çekirdek
corePoolSize = 4 + 1 = 5

+1'in sebebi: bir thread beklemeye geçerse (GC pause vb.) diğeri devralır

Hibrit İş Yükleri (Gerçek Dünya)

@Configuration
@EnableAsync
public class AsyncConfig {
    
    // I/O-bound: HTTP çağrıları, veritabanı
    @Bean(name = "ioExecutor")
    public TaskExecutor ioExecutor() {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(cpuCores * 4);    // 4 çekirdek → 16
        executor.setMaxPoolSize(cpuCores * 8);     // 4 çekirdek → 32
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("io-");
        executor.initialize();
        return executor;
    }
    
    // CPU-bound: Hesaplama, dönüşüm
    @Bean(name = "cpuExecutor")
    public TaskExecutor cpuExecutor() {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(cpuCores + 1);    // 4 çekirdek → 5
        executor.setMaxPoolSize(cpuCores + 1);     // Sabit (daha fazla fayda yok)
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("cpu-");
        executor.initialize();
        return executor;
    }
}

Graceful Shutdown

Uygulama kapanırken kuyruktaki görevlerin tamamlanmasını beklemek önemlidir:

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(20);
    executor.setQueueCapacity(100);
    
    // Graceful shutdown
    executor.setWaitForTasksToCompleteOnShutdown(true);  // Görevleri tamamla
    executor.setAwaitTerminationSeconds(60);              // En fazla 60 saniye bekle
    
    executor.initialize();
    return executor;
}
Uygulama kapanıyor:
  1. Yeni görevler reddedilir
  2. Kuyruktaki görevler çalıştırılır
  3. Aktif thread'ler tamamlanmasını bekler
  4. 60 saniye geçerse → kalan thread'ler zorla durdurulur (interrupt)

Yaygın Hatalar

1. QueueCapacity Çok Büyük → MaxPoolSize Etkisiz

// ❌ YANLIŞ — maxPoolSize=50 ama kuyruk sonsuz
executor.setCorePoolSize(5);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(Integer.MAX_VALUE);  // Kuyruk asla dolmaz!
// Sonuç: Her zaman sadece 5 thread çalışır, 50'ye asla ulaşılmaz

// ✅ DOĞRU — Makul kuyruk boyutu
executor.setCorePoolSize(5);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);  // 100 görevden sonra yeni thread oluşur

2. Thread İsmi Vermemek

// ❌ YANLIŞ — thread isimleri "pool-1-thread-1" gibi anlamsız
executor.setThreadNamePrefix(null);

// ✅ DOĞRU — hangi pool'un thread'i olduğu loglarda net görünür
executor.setThreadNamePrefix("email-");
// Log: [email-1] Sending email to user@example.com

3. Shutdown Ayarlarını İhmal Etmek

// ❌ YANLIŞ — uygulama kapanınca kuyruktaki görevler kaybolur
// Varsayılan: waitForTasksToCompleteOnShutdown = false

// ✅ DOĞRU
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);

Gerçek Dünya Örneği: E-Ticaret Thread Pool Stratejisi

@Configuration
@EnableAsync
public class ECommerceAsyncConfig {
    
    // Sipariş işleme — kritik, güvenilir
    @Bean(name = "orderExecutor")
    public TaskExecutor orderExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(30);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("order-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(120);
        executor.initialize();
        return executor;
    }
    
    // E-posta — düşük öncelik, kaybolabilir ama loglanmalı
    @Bean(name = "emailExecutor")
    public TaskExecutor emailExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("email-");
        executor.setRejectedExecutionHandler((task, pool) -> {
            log.warn("Email task rejected — queue full. Discarding.");
        });
        executor.initialize();
        return executor;
    }
    
    // Analitik — en düşük öncelik, tamamen opsiyonel
    @Bean(name = "analyticsExecutor")
    public TaskExecutor analyticsExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("analytics-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.initialize();
        return executor;
    }
}

Özet

  • ThreadPoolTaskExecutor, Spring'in production-ready thread pool implementasyonudur. @Async ile birlikte kullanıldığında asenkron görevlerin kontrollü ve verimli çalışmasını sağlar.

  • Pool mekanizması: core threadkuyrukmax threadrejection policy sırası ile çalışır. Kuyruk kapasitesi çok büyükse maxPoolSize'a hiç ulaşılmaz.

  • CallerRunsPolicy çoğu senaryoda en güvenli rejection policy'dir — doğal back-pressure sağlar, görev kaybolmaz.

  • Farklı iş yükleri için ayrı executor tanımlayın. Rapor oluşturma tüm thread'leri tüketirse e-posta servisi etkilenmesin.

  • Thread isimlerini her zaman belirleyin — setThreadNamePrefix("email-"). Hata ayıklamada hangi pool'un thread'inin sorunlu olduğunu anında görürsünüz.

  • Pool boyutu: I/O-bound iş için CPU × 4–8, CPU-bound iş için CPU + 1. Ölçüm yapın — teori sizi bir yere kadar götürür, gerçek yük altında test edin.

  • Graceful shutdown yapılandırın: setWaitForTasksToCompleteOnShutdown(true) ile kuyruktaki görevlerin tamamlanmasını sağlayın.