ThreadPoolTaskExecutor
Giriş
Bir fabrikadaki işçi havuzunu düşünün. Fabrika açıldığında belirli sayıda işçi (core) her zaman hazırdır. Sipariş artarsa bekleme kuyruğuna eklenir. Kuyruk da dolarsa geçici işçiler (max pool) alınır. Tüm kapasiteler doluysa sipariş reddedilir veya müşterinin kendisi yapar (CallerRunsPolicy). İşte ThreadPoolTaskExecutor tam olarak bu fabrika modelidir.
Production ortamında asenkron işlemlerin kontrolsüz thread oluşturması uygulamanın çökmesine neden olabilir. Her @Async metot çağrısında yeni thread oluşturmak yerine, sabit sayıda thread'in paylaşıldığı bir thread pool kullanmak, bellek kullanımını sınırlamak ve performansı optimize etmek için kritik öneme sahiptir.
Spring Framework, thread pool yönetimi için ThreadPoolTaskExecutor sınıfını sunar. Bu sınıf, Java'nın java.util.concurrent.ThreadPoolExecutor'ını Spring uyumlu şekilde sarar ve ek konfigürasyon seçenekleri ekler. Bu derste ThreadPoolTaskExecutor'ın iç mekanizmasını, core/max pool ve kuyruk etkileşimini, rejection policy'lerini, birden fazla executor tanımlamayı, monitoring stratejilerini ve production best practice'lerini derinlemesine öğreneceğiz.
TaskExecutor Arayüzü
Spring Framework, thread pool yönetimi için TaskExecutor arayüzünü tanımlar. Bu arayüz, Java'nın java.util.concurrent.Executor arayüzünü genişletir:
public interface TaskExecutor extends Executor {
void execute(Runnable task);
}Spring, bu arayüzün birçok implementasyonunu sağlar:
| Implementasyon | Açıklama | Production Uygunluğu |
|---|---|---|
SimpleAsyncTaskExecutor | Her görev için yeni thread oluşturur | ❌ Asla kullanmayın |
SyncTaskExecutor | Görevi çağıran thread'de çalıştırır (senkron) | ❌ Test için |
ThreadPoolTaskExecutor | Sabit thread pool, kuyruk, rejection policy | ✅ Production standardı |
ConcurrentTaskExecutor | Java Executor'ı saran adapter | ⚠️ Özel durumlar |
ThreadPoolTaskScheduler | Zamanlama yetenekli pool | ✅ @Scheduled için |
⚠️ Dikkat:
@Asynckullandığınızda hiçbir executor bean tanımlamazsanız, Spring BootSimpleAsyncTaskExecutorkullanır — her çağrıda yeni thread oluşturur! 1000 çağrı = 1000 thread = bellek tükenmesi. Bu yüzden özel executor tanımlamak zorunludur.
ThreadPoolTaskExecutor Konfigürasyonu
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "taskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// Temel pool ayarları
executor.setCorePoolSize(5); // Her zaman hazır bekleyen thread
executor.setMaxPoolSize(20); // Maksimum thread sayısı
executor.setQueueCapacity(100); // Kuyruk kapasitesi
// Thread isimlendirme (loglarda ve debug'da görünür)
executor.setThreadNamePrefix("async-");
// Boşta kalan thread'in yaşam süresi (saniye)
executor.setKeepAliveSeconds(60);
// Core thread'ler de timeout olabilir mi?
executor.setAllowCoreThreadTimeOut(false); // false → core thread'ler her zaman yaşar
// Uygulama kapanırken kuyruktaki görevleri tamamla
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
// Rejection policy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}Core/Max Pool Size ve Queue Mekanizması
Thread pool'un çalışma mantığını anlamak, doğru konfigürasyon için kritiktir. Yeni bir görev geldiğinde şu akış izlenir:
Görev geldi
│
▼
┌─────────────────────┐
│ Core thread boş mu? │── Evet ──→ Core thread çalıştırır
└─────────────────────┘
│ Hayır
▼
┌─────────────────────┐
│ Kuyrukta yer var mı?│── Evet ──→ Kuyruğa ekle, sırasını bekle
└─────────────────────┘
│ Hayır (kuyruk DOLU)
▼
┌─────────────────────────┐
│ Max thread'e ulaşıldı mı│── Hayır ──→ Yeni thread oluştur
└─────────────────────────┘
│ Evet (hem kuyruk hem pool DOLU)
▼
Rejection Policy çalışırSomut Örnek
corePoolSize=5, maxPoolSize=20, queueCapacity=100 ayarlarıyla:
Görev 1-5: 5 core thread çalıştırır [5 thread aktif]
Görev 6-105: Kuyruğa eklenir (100 kapasite) [5 thread + 100 kuyrukta]
Görev 106-120: Yeni thread'ler oluşur (15 ek) [20 thread + 100 kuyrukta]
Görev 121: Rejection Policy devreye girer! [KAPASITE AŞILDI]
Toplam kapasite: 20 (thread) + 100 (kuyruk) = 120 eşzamanlı görevKritik Soru: Neden Önce Kuyruk, Sonra Thread?
Bu sıralama sezgisel olmayabilir. "Neden önce daha fazla thread oluşturmuyoruz?" diye sorabilirsiniz. Cevap: thread oluşturma maliyetlidir (OS kaynak tahsisi, stack allocation). Kuyruk ise sadece bir pointer eklemektir — neredeyse maliyetsiz.
Ancak bu demektir ki: kuyruk kapasitesi çok büyükse, maxPoolSize'a asla ulaşılmaz!
corePoolSize=5, maxPoolSize=50, queueCapacity=Integer.MAX_VALUE
Sonuç: Her zaman sadece 5 thread çalışır!
Çünkü kuyruk hiç dolmaz → yeni thread oluşturma tetiklenmez.
ÇÖZÜM: queueCapacity'yi makul bir değere ayarlayın (25-200)Rejection Policy
Hem kuyruk hem de thread pool dolduğunda ne yapılacağını RejectedExecutionHandler belirler:
// 1. AbortPolicy (varsayılan) — Exception fırlatır
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// RejectedExecutionException → görev KAYBOLUR, çağıran hata alır
// Kullanım: Kritik işlemlerde — görev kaybolmamalı, hata bildirilmeli
// 2. CallerRunsPolicy — Çağıran thread çalıştırır
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// Görev reddedilmez, çağıranın kendi thread'inde çalıştırılır
// Doğal back-pressure: sistem aşırı yüklendiğinde çağıran da yavaşlar
// Kullanım: ÇOK senaryoda EN İYİ seçenek
// 3. DiscardPolicy — Sessizce atar
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// Görev sessizce atılır — hata fırlatılmaz, log yazılmaz
// Kullanım: Önemsiz görevlerde (opsiyonel metrik, log)
// 4. DiscardOldestPolicy — En eski görevi atar
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// Kuyruktaki EN ESKİ görevi atar, yeniyi ekler
// Kullanım: Güncellik önemli, eski veri geçersiz (real-time metrik)Custom Rejection Handler
executor.setRejectedExecutionHandler((task, poolExecutor) -> {
log.error("🚨 Task rejected! Pool: active={}, queue={}, completed={}",
poolExecutor.getActiveCount(),
poolExecutor.getQueue().size(),
poolExecutor.getCompletedTaskCount());
// Metrik kaydet
meterRegistry.counter("executor.rejected", "pool", "main").increment();
// Alerte gönder
alertService.sendCritical("Thread pool capacity exhausted!");
// Fallback: çağıranın thread'inde çalıştır
if (!poolExecutor.isShutdown()) {
task.run();
}
});💡 Production önerisi: Çoğu senaryoda
CallerRunsPolicyen güvenlidir. Sistem aşırı yüklendiğinde çağıran thread de çalışmaya katılır ve doğal bir back-pressure mekanizması oluşur. Görev kaybolmaz, hata fırlatılmaz.
Birden Fazla Executor
Farklı iş yükleri için farklı thread pool'lar tanımlamak iyi bir pratiktir. Her pool kendi kaynağını yönetir ve birbirini etkilemez:
@Configuration
@EnableAsync
public class AsyncConfig {
// E-posta gönderimi — düşük öncelik, az thread yeterli
@Bean(name = "emailExecutor")
public TaskExecutor emailExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("email-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
// Rapor oluşturma — uzun süren, yoğun kaynak
@Bean(name = "reportExecutor")
public TaskExecutor reportExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25); // Kuyruk küçük → yeni thread hızlı oluşur
executor.setThreadNamePrefix("report-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(120); // Raporlar uzun sürebilir
executor.initialize();
return executor;
}
// Bildirim — yüksek hacim, kaybolabilir
@Bean(name = "notificationExecutor")
public TaskExecutor notificationExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("notification-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
}Kullanım:
@Async("emailExecutor")
public void sendEmail(String to, String body) { /* ... */ }
@Async("reportExecutor")
public CompletableFuture<Report> generateReport(String type) { /* ... */ }
@Async("notificationExecutor")
public void sendPush(String userId, String message) { /* ... */ }Bu yaklaşımın faydası: bir rapor oluşturma tüm thread'leri tüketse bile, e-posta ve bildirim servisleri etkilenmez.
application.yml ile Yapılandırma
Spring Boot, spring.task.execution prefix'i ile varsayılan TaskExecutor'ı yapılandırma dosyasından konfigüre etmenize olanak tanır:
spring:
task:
execution:
pool:
core-size: 5
max-size: 20
queue-capacity: 100
keep-alive: 60s
thread-name-prefix: "app-async-"
shutdown:
await-termination: true
await-termination-period: 30sBu konfigürasyon, @Bean ile tanımlanan özel executor yoksa otomatik olarak uygulanır. Ancak birden fazla executor gerektiğinde @Bean ile tanım zorunludur.
Monitoring
Thread pool'un sağlığını izlemek, production'da kritiktir. Pool tükendiğinde uygulamanız yavaşlar veya durur.
Periyodik Log ile İzleme
@Component
@Slf4j
public class ThreadPoolMonitor {
private final ThreadPoolTaskExecutor taskExecutor;
public ThreadPoolMonitor(@Qualifier("taskExecutor") TaskExecutor taskExecutor) {
this.taskExecutor = (ThreadPoolTaskExecutor) taskExecutor;
}
@Scheduled(fixedRate = 30000) // Her 30 saniyede bir
public void logPoolStatus() {
ThreadPoolExecutor pool = taskExecutor.getThreadPoolExecutor();
log.info("📊 Thread Pool Status — " +
"active={}, pool={}, core={}, max={}, " +
"queue={}, completed={}, rejected={}",
pool.getActiveCount(),
pool.getPoolSize(),
pool.getCorePoolSize(),
pool.getMaximumPoolSize(),
pool.getQueue().size(),
pool.getCompletedTaskCount(),
pool.getTaskCount() - pool.getCompletedTaskCount() - pool.getActiveCount()
);
// Uyarı koşulları
double utilization = (double) pool.getActiveCount() / pool.getMaximumPoolSize();
if (utilization > 0.8) {
log.warn("⚠️ Thread pool utilization is HIGH: {}%", (int)(utilization * 100));
}
if (pool.getQueue().size() > pool.getQueue().remainingCapacity()) {
log.warn("⚠️ Task queue is more than 50% full!");
}
}
}Micrometer Metrics ile İzleme
@Configuration
public class ExecutorMetricsConfig {
@Bean
public TaskExecutor taskExecutor(MeterRegistry registry) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-");
executor.initialize();
// Micrometer ile metrikleri expose et
ExecutorServiceMetrics.monitor(registry,
executor.getThreadPoolExecutor(), "async-pool");
return executor;
}
}
// Artık şu metrikler Prometheus/Grafana'da mevcut:
// executor_pool_size_threads{name="async-pool"}
// executor_active_threads{name="async-pool"}
// executor_queued_tasks{name="async-pool"}
// executor_completed_tasks_total{name="async-pool"}Pool Boyutu Hesaplama
I/O-Bound İş Yükleri
Veritabanı sorguları, HTTP çağrıları, dosya okuma gibi I/O ağırlıklı işlerde thread'ler çoğu zaman bekler:
Formül: corePoolSize = CPU çekirdek sayısı × (1 + bekleme/hesaplama oranı)
Örnek: 4 çekirdek, ortalama %90 bekleme (%10 CPU)
corePoolSize = 4 × (1 + 0.9/0.1) = 4 × 10 = 40
Pratikte: 4 çekirdek CPU için I/O-bound iş → 20-40 threadCPU-Bound İş Yükleri
Hesaplama ağırlıklı işlerde thread sayısını artırmak fayda sağlamaz:
Formül: corePoolSize = CPU çekirdek sayısı + 1
Örnek: 4 çekirdek
corePoolSize = 4 + 1 = 5
+1'in sebebi: bir thread beklemeye geçerse (GC pause vb.) diğeri devralırHibrit İş Yükleri (Gerçek Dünya)
@Configuration
@EnableAsync
public class AsyncConfig {
// I/O-bound: HTTP çağrıları, veritabanı
@Bean(name = "ioExecutor")
public TaskExecutor ioExecutor() {
int cpuCores = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(cpuCores * 4); // 4 çekirdek → 16
executor.setMaxPoolSize(cpuCores * 8); // 4 çekirdek → 32
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("io-");
executor.initialize();
return executor;
}
// CPU-bound: Hesaplama, dönüşüm
@Bean(name = "cpuExecutor")
public TaskExecutor cpuExecutor() {
int cpuCores = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(cpuCores + 1); // 4 çekirdek → 5
executor.setMaxPoolSize(cpuCores + 1); // Sabit (daha fazla fayda yok)
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("cpu-");
executor.initialize();
return executor;
}
}Graceful Shutdown
Uygulama kapanırken kuyruktaki görevlerin tamamlanmasını beklemek önemlidir:
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
// Graceful shutdown
executor.setWaitForTasksToCompleteOnShutdown(true); // Görevleri tamamla
executor.setAwaitTerminationSeconds(60); // En fazla 60 saniye bekle
executor.initialize();
return executor;
}Uygulama kapanıyor:
1. Yeni görevler reddedilir
2. Kuyruktaki görevler çalıştırılır
3. Aktif thread'ler tamamlanmasını bekler
4. 60 saniye geçerse → kalan thread'ler zorla durdurulur (interrupt)Yaygın Hatalar
1. QueueCapacity Çok Büyük → MaxPoolSize Etkisiz
// ❌ YANLIŞ — maxPoolSize=50 ama kuyruk sonsuz
executor.setCorePoolSize(5);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(Integer.MAX_VALUE); // Kuyruk asla dolmaz!
// Sonuç: Her zaman sadece 5 thread çalışır, 50'ye asla ulaşılmaz
// ✅ DOĞRU — Makul kuyruk boyutu
executor.setCorePoolSize(5);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100); // 100 görevden sonra yeni thread oluşur2. Thread İsmi Vermemek
// ❌ YANLIŞ — thread isimleri "pool-1-thread-1" gibi anlamsız
executor.setThreadNamePrefix(null);
// ✅ DOĞRU — hangi pool'un thread'i olduğu loglarda net görünür
executor.setThreadNamePrefix("email-");
// Log: [email-1] Sending email to user@example.com3. Shutdown Ayarlarını İhmal Etmek
// ❌ YANLIŞ — uygulama kapanınca kuyruktaki görevler kaybolur
// Varsayılan: waitForTasksToCompleteOnShutdown = false
// ✅ DOĞRU
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);Gerçek Dünya Örneği: E-Ticaret Thread Pool Stratejisi
@Configuration
@EnableAsync
public class ECommerceAsyncConfig {
// Sipariş işleme — kritik, güvenilir
@Bean(name = "orderExecutor")
public TaskExecutor orderExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("order-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(120);
executor.initialize();
return executor;
}
// E-posta — düşük öncelik, kaybolabilir ama loglanmalı
@Bean(name = "emailExecutor")
public TaskExecutor emailExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("email-");
executor.setRejectedExecutionHandler((task, pool) -> {
log.warn("Email task rejected — queue full. Discarding.");
});
executor.initialize();
return executor;
}
// Analitik — en düşük öncelik, tamamen opsiyonel
@Bean(name = "analyticsExecutor")
public TaskExecutor analyticsExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("analytics-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
}Özet
ThreadPoolTaskExecutor, Spring'in production-ready thread pool implementasyonudur.@Asyncile birlikte kullanıldığında asenkron görevlerin kontrollü ve verimli çalışmasını sağlar.Pool mekanizması: core thread → kuyruk → max thread → rejection policy sırası ile çalışır. Kuyruk kapasitesi çok büyükse maxPoolSize'a hiç ulaşılmaz.
CallerRunsPolicy çoğu senaryoda en güvenli rejection policy'dir — doğal back-pressure sağlar, görev kaybolmaz.
Farklı iş yükleri için ayrı executor tanımlayın. Rapor oluşturma tüm thread'leri tüketirse e-posta servisi etkilenmesin.
Thread isimlerini her zaman belirleyin —
setThreadNamePrefix("email-"). Hata ayıklamada hangi pool'un thread'inin sorunlu olduğunu anında görürsünüz.Pool boyutu: I/O-bound iş için
CPU × 4–8, CPU-bound iş içinCPU + 1. Ölçüm yapın — teori sizi bir yere kadar götürür, gerçek yük altında test edin.Graceful shutdown yapılandırın:
setWaitForTasksToCompleteOnShutdown(true)ile kuyruktaki görevlerin tamamlanmasını sağlayın.
AI Asistan
Sorularını yanıtlamaya hazır