Composite Aggregation ve Pratik Örnekler
Giriş — Excel Pivot Tablosu
Excel'de pivot tablo kullanmış olabilirsiniz: satırları kategoriye, sütunları aya göre düzenler, hücrelerde toplam ciroyu görürsünüz. Binlerce satırlık veriyi anlamlı bir tabloya dönüştürür. Ama veri çok büyükse? Milyonlarca satır, yüzlerce kategori?
Normal terms aggregation'da size: 10000 yazmak bellek felaketine yol açar. İşte burada composite aggregation devreye girer. Tüm bucket'ları sayfalayarak getirir — bellekte hepsini birden tutmak yerine parça parça işler. Ayrıca birden fazla alanı tek bir aggregation'da birleştirir — gerçek bir pivot tablo gibi.
1. Composite Aggregation Nedir?
Composite aggregation, birden fazla kaynaktan oluşan bucket'ları sayfalayarak (pagination) döndüren bir aggregation tipidir.
Neden Gerekli?
terms aggregation'ın sınırlamaları:
// ❌ 100.000 benzersiz kategori varsa?
"terms": {
"field": "category",
"size": 100000 // Bellek patlar!
}Composite aggregation çözümü:
// ✅ Sayfalayarak tüm bucket'ları getirir
"composite": {
"size": 1000, // Her seferde 1000 bucket
"sources": [...]
}Composite vs terms Farkları
| Özellik | terms | composite |
|---|---|---|
| Pagination | ❌ | ✅ (after_key) |
| Bellek | Tüm bucket'lar bellekte | Sayfa sayfa |
| Çoklu alan | İç içe terms gerekir | sources ile doğrudan |
| Sıralama | Çeşitli | Sadece kaynak alanlara göre |
| Tüm bucket'lar | size ile sınırlı | Hepsini iterate edebilir |
2. Temel Kullanım
2.1 Tek Kaynak
GET sales/_search
{
"size": 0,
"aggs": {
"tum_kategoriler": {
"composite": {
"size": 5,
"sources": [
{
"kategori": {
"terms": { "field": "category" }
}
}
]
}
}
}
}Yanıt:
{
"aggregations": {
"tum_kategoriler": {
"after_key": { "kategori": "Kulaklık" },
"buckets": [
{ "key": { "kategori": "Akıllı Saat" }, "doc_count": 1 },
{ "key": { "kategori": "Bilgisayar" }, "doc_count": 2 },
{ "key": { "kategori": "Kulaklık" }, "doc_count": 2 }
]
}
}
}2.2 Sonraki Sayfa — after
GET sales/_search
{
"size": 0,
"aggs": {
"tum_kategoriler": {
"composite": {
"size": 5,
"sources": [
{
"kategori": {
"terms": { "field": "category" }
}
}
],
"after": { "kategori": "Kulaklık" }
}
}
}
}after parametresi önceki yanıtın after_key değerini alır. Bucket'lar bitene kadar tekrarlanır.
2.3 Tüm Bucket'ları Iterate Etme (Pseudo-code)
1. İlk sorgu: composite (size: 1000)
2. Yanıtı işle
3. after_key varsa → after parametresiyle tekrar sorgula
4. after_key yoksa → tüm bucket'lar tamamlandı3. Çoklu Kaynak — Pivot Tablo
Composite'in gerçek gücü birden fazla kaynağı birleştirmesidir:
3.1 İki Kaynaklı Composite
GET sales/_search
{
"size": 0,
"aggs": {
"kategori_marka": {
"composite": {
"size": 10,
"sources": [
{ "kategori": { "terms": { "field": "category" } } },
{ "marka": { "terms": { "field": "brand" } } }
]
},
"aggs": {
"toplam_ciro": { "sum": { "field": "revenue" } },
"ort_fiyat": { "avg": { "field": "price" } }
}
}
}
}Yanıt:
{
"buckets": [
{
"key": { "kategori": "Akıllı Saat", "marka": "Samsung" },
"doc_count": 1,
"toplam_ciro": { "value": 25999.98 },
"ort_fiyat": { "value": 12999.99 }
},
{
"key": { "kategori": "Bilgisayar", "marka": "Apple" },
"doc_count": 1,
"toplam_ciro": { "value": 42999.99 },
"ort_fiyat": { "value": 42999.99 }
},
{
"key": { "kategori": "Bilgisayar", "marka": "Lenovo" },
"doc_count": 1,
"toplam_ciro": { "value": 77999.98 },
"ort_fiyat": { "value": 38999.99 }
}
]
}Her benzersiz (kategori, marka) çifti bir bucket oluşturur — Excel pivot tablosu gibi.
3.2 Üç Kaynaklı Composite
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"detayli_rapor": {
"composite": {
"size": 50,
"sources": [
{
"ay": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
}
}
},
{ "kategori": { "terms": { "field": "category" } } },
{ "bolge": { "terms": { "field": "region" } } }
]
},
"aggs": {
"ciro": { "sum": { "field": "revenue" } },
"siparis": { "sum": { "field": "orders" } }
}
}
}
}Ay × Kategori × Bölge matrisini oluşturur — her kombinasyon için ciro ve sipariş sayısı.
4. Composite Kaynakları (Source Tipleri)
4.1 terms Source
{ "marka": { "terms": { "field": "brand", "order": "asc" } } }4.2 date_histogram Source
{
"ay": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM",
"time_zone": "Europe/Istanbul"
}
}
}4.3 histogram Source
{
"fiyat_araligi": {
"histogram": {
"field": "price",
"interval": 10000
}
}
}4.4 Karışık Kaynaklar
"sources": [
{ "ay": { "date_histogram": { "field": "date", "calendar_interval": "month" } } },
{ "fiyat_seg": { "histogram": { "field": "price", "interval": 20000 } } },
{ "marka": { "terms": { "field": "brand" } } }
]Tarih + sayısal aralık + kategorik alan birlikte kullanılabilir.
5. Sıralama ve missing_bucket
5.1 Kaynak Sıralaması
"sources": [
{
"kategori": {
"terms": {
"field": "category",
"order": "desc"
}
}
}
]Her kaynak bağımsız olarak asc veya desc sıralanabilir.
5.2 missing_bucket — Null Değerler
Varsayılan olarak, kaynak alanı olmayan dokümanlar atlanır. missing_bucket: true ile onlar da dahil edilir:
"sources": [
{
"marka": {
"terms": {
"field": "brand",
"missing_bucket": true,
"missing_order": "last"
}
}
}
]Markası olmayan dokümanlar null bucket'ta toplanır ve sona yerleştirilir.
6. Gerçek Dünya Senaryoları
6.1 Aylık Satış Raporu (Excel Export)
Tüm ayları, kategorileri ve bölgeleri içeren kapsamlı rapor:
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"rapor": {
"composite": {
"size": 100,
"sources": [
{
"ay": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
}
}
},
{ "kategori": { "terms": { "field": "category" } } }
]
},
"aggs": {
"toplam_ciro": { "sum": { "field": "revenue" } },
"toplam_siparis": { "sum": { "field": "orders" } },
"ort_siparis_tutari": {
"bucket_script": {
"buckets_path": {
"c": "toplam_ciro",
"s": "toplam_siparis"
},
"script": "params.s > 0 ? params.c / params.s : 0"
}
}
}
}
}
}6.2 Bölge Bazlı Performans Raporu
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"bolge_rapor": {
"composite": {
"size": 50,
"sources": [
{ "bolge": { "terms": { "field": "region" } } },
{ "kategori": { "terms": { "field": "category" } } }
]
},
"aggs": {
"ciro": { "sum": { "field": "revenue" } },
"siparis": { "sum": { "field": "orders" } },
"ciro_stats": { "stats": { "field": "revenue" } }
}
}
}
}6.3 Zaman Serisi + Kategori Derinlemesine
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"trend_analiz": {
"composite": {
"size": 100,
"sources": [
{
"ceyrek": {
"date_histogram": {
"field": "date",
"calendar_interval": "quarter",
"format": "yyyy-QQQ"
}
}
},
{ "kategori": { "terms": { "field": "category" } } },
{ "bolge": { "terms": { "field": "region" } } }
]
},
"aggs": {
"ciro": { "sum": { "field": "revenue" } },
"siparis": { "sum": { "field": "orders" } }
}
}
}
}Çeyreklik × Kategori × Bölge matrisini oluşturur.
7. Composite ile Tam Veri İterasyonu
Tüm bucket'ları programatik olarak iterate etmek:
REST API
// İlk istek
GET sales/_search
{
"size": 0,
"aggs": {
"tum_veriler": {
"composite": {
"size": 1000,
"sources": [
{ "kategori": { "terms": { "field": "category" } } },
{ "marka": { "terms": { "field": "brand" } } }
]
},
"aggs": {
"ciro": { "sum": { "field": "revenue" } }
}
}
}
}
// Sonraki istekler — after_key ile
GET sales/_search
{
"size": 0,
"aggs": {
"tum_veriler": {
"composite": {
"size": 1000,
"sources": [
{ "kategori": { "terms": { "field": "category" } } },
{ "marka": { "terms": { "field": "brand" } } }
],
"after": { "kategori": "Kulaklık", "marka": "Sony" }
},
"aggs": {
"ciro": { "sum": { "field": "revenue" } }
}
}
}
}8. Aggregation Performans Optimizasyonu
8.1 Genel İpuçları
// 1. size: 0 — doküman döndürme
"size": 0
// 2. Query ile filtreleme — aggregation'dan önce veriyi daralt
"query": {
"bool": {
"filter": [
{ "range": { "date": { "gte": "2024-01-01" } } },
{ "term": { "category": "Elektronik" } }
]
}
}
// 3. _source: false — (zaten size: 0 ile gereksiz ama explicit olmak iyi)
"_source": false8.2 Aggregation Cache
Elasticsearch aggregation sonuçlarını cache'ler. Cache'in çalışması için:
Sorgu değişmemeli
Index değişmemeli (yeni doküman eklenince cache invalid olur)
Shard request cache aktif olmalı (varsayılan: aktif)
// Cache'i zorunlu kullan
GET sales/_search?request_cache=true
{
"size": 0,
"aggs": { ... }
}8.3 Execution Hint — terms Aggregation
GET sales/_search
{
"size": 0,
"aggs": {
"markalar": {
"terms": {
"field": "brand",
"execution_hint": "map"
}
}
}
}| execution_hint | Davranış | Uygun |
|---|---|---|
| (otomatik) | Elasticsearch karar verir | Çoğu durum |
map | HashMap kullanır | Az sayıda benzersiz değer |
global_ordinals | Ordinal mapping kullanır | Çok sayıda benzersiz değer (varsayılan) |
8.4 Sampled Aggregation — Örnekleme
Çok büyük veri setlerinde yaklaşık sonuç için:
GET sales/_search
{
"size": 0,
"aggs": {
"orneklem": {
"sampler": {
"shard_size": 200
},
"aggs": {
"top_markalar": {
"terms": { "field": "brand" }
}
}
}
}
}Her shard'dan sadece 200 doküman örneğiyle aggregation yapar — büyük veri setlerinde çok hızlı.
9. Raporlama Mimarisi: Aggregation vs Pre-computed
Real-time Aggregation
Kullanıcı İsteği → Elasticsearch Query + Aggregation → Sonuç✅ Her zaman güncel ❌ Büyük veri setlerinde yavaş olabilir
Pre-computed (Transform)
Elasticsearch Transform → Özet Index → Kullanıcı İsteği → Basit QueryElasticsearch Transform API ile periyodik olarak aggregation sonuçlarını ayrı bir index'e yazabilirsiniz:
PUT _transform/monthly_sales_summary
{
"source": {
"index": "monthly_sales"
},
"dest": {
"index": "monthly_sales_report"
},
"pivot": {
"group_by": {
"month": {
"date_histogram": {
"field": "date",
"calendar_interval": "month"
}
},
"category": {
"terms": { "field": "category" }
},
"region": {
"terms": { "field": "region" }
}
},
"aggregations": {
"total_revenue": { "sum": { "field": "revenue" } },
"total_orders": { "sum": { "field": "orders" } },
"avg_revenue": { "avg": { "field": "revenue" } }
}
},
"frequency": "1h",
"sync": {
"time": {
"field": "date",
"delay": "60s"
}
}
}
// Transform'u başlat
POST _transform/monthly_sales_summary/_startTransform sonucu monthly_sales_report index'inde pre-computed veriler hazır olur — dashboard sorguları milisaniyeler içinde döner.
10. Java ile Composite Aggregation — Tam İterasyon
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch.core.*;
import java.util.HashMap;
import java.util.Map;
public class CompositeAggregationJava {
public static void iterateAllBuckets(ElasticsearchClient client) throws Exception {
Map<String, String> afterKey = null;
int totalBuckets = 0;
while (true) {
final Map<String, String> currentAfterKey = afterKey;
SearchResponse<Void> response = client.search(s -> s
.index("sales")
.size(0)
.aggregations("rapor", a -> {
var composite = CompositeAggregation.of(c -> {
c.size(100)
.sources(
Map.of("kategori", CompositeAggregationSource.of(
src -> src.terms(t -> t.field("category"))
)),
Map.of("marka", CompositeAggregationSource.of(
src -> src.terms(t -> t.field("brand"))
))
);
if (currentAfterKey != null) {
c.after(currentAfterKey);
}
return c;
});
return Aggregation.of(agg -> agg
.composite(composite)
.aggregations("ciro", sub -> sub
.sum(su -> su.field("revenue"))
)
);
}),
Void.class
);
var compositeResult = response.aggregations()
.get("rapor").composite();
var buckets = compositeResult.buckets().array();
if (buckets.isEmpty()) break;
for (var bucket : buckets) {
totalBuckets++;
String kategori = bucket.key().get("kategori").stringValue();
String marka = bucket.key().get("marka").stringValue();
double ciro = bucket.aggregations().get("ciro").sum().value();
System.out.printf("%-15s | %-10s | Adet: %d | Ciro: ₺%.2f%n",
kategori, marka, bucket.docCount(), ciro);
}
// after_key'i al
var nextAfterKey = compositeResult.afterKey();
if (nextAfterKey == null || nextAfterKey.isEmpty()) break;
afterKey = new HashMap<>();
nextAfterKey.forEach((k, v) -> afterKey.put(k, v.toString()));
}
System.out.printf("%nToplam %d bucket iterate edildi.%n", totalBuckets);
}
// Dashboard raporu — multi-aggregation
public static void dashboardReport(ElasticsearchClient client) throws Exception {
SearchResponse<Void> response = client.search(s -> s
.index("monthly_sales")
.size(0)
// Aylık trend (date_histogram)
.aggregations("aylik_trend", a -> a
.dateHistogram(dh -> dh
.field("date")
.calendarInterval(CalendarInterval.Month)
.format("yyyy-MM")
)
.aggregations("ciro", sub -> sub.sum(su -> su.field("revenue")))
.aggregations("siparis", sub -> sub.sum(su -> su.field("orders")))
)
// Kategori dağılımı
.aggregations("kategoriler", a -> a
.terms(t -> t.field("category").size(10))
.aggregations("ciro", sub -> sub.sum(su -> su.field("revenue")))
)
// Bölge performansı
.aggregations("bolgeler", a -> a
.terms(t -> t.field("region").size(10)
.order(Map.of("ciro", SortOrder.Desc)))
.aggregations("ciro", sub -> sub.sum(su -> su.field("revenue")))
)
// Genel metrikler
.aggregations("genel_ciro", a -> a.sum(su -> su.field("revenue")))
.aggregations("genel_siparis", a -> a.sum(su -> su.field("orders")))
.aggregations("benzersiz_bolge", a -> a.cardinality(c -> c.field("region"))),
Void.class
);
// Sonuçları yazdır
System.out.println("=== DASHBOARD RAPORU ===\n");
// Genel metrikler
double toplamCiro = response.aggregations().get("genel_ciro").sum().value();
double toplamSiparis = response.aggregations().get("genel_siparis").sum().value();
long benzersizBolge = response.aggregations()
.get("benzersiz_bolge").cardinality().value();
System.out.printf("Toplam Ciro: ₺%,.0f%n", toplamCiro);
System.out.printf("Toplam Sipariş: %,.0f%n", toplamSiparis);
System.out.printf("Aktif Bölge: %d%n%n", benzersizBolge);
// Aylık trend
System.out.println("--- Aylık Trend ---");
var aylar = response.aggregations()
.get("aylik_trend").dateHistogram().buckets().array();
for (var ay : aylar) {
System.out.printf(" %s | Ciro: ₺%,.0f | Sipariş: %,.0f%n",
ay.keyAsString(),
ay.aggregations().get("ciro").sum().value(),
ay.aggregations().get("siparis").sum().value());
}
// Kategori dağılımı
System.out.println("\n--- Kategori Dağılımı ---");
var kategoriler = response.aggregations()
.get("kategoriler").sterms().buckets().array();
for (var kat : kategoriler) {
System.out.printf(" %-15s | Ciro: ₺%,.0f%n",
kat.key().stringValue(),
kat.aggregations().get("ciro").sum().value());
}
// Bölge performansı
System.out.println("\n--- Bölge Performansı ---");
var bolgeler = response.aggregations()
.get("bolgeler").sterms().buckets().array();
for (var bolge : bolgeler) {
System.out.printf(" %-15s | Ciro: ₺%,.0f%n",
bolge.key().stringValue(),
bolge.aggregations().get("ciro").sum().value());
}
}
}11. Best Practices
✅ Yapın
| Uygulama | Neden |
|---|---|
| Çok sayıda benzersiz değer varsa composite kullanın | Bellek-güvenli pagination |
| Composite size'ı 500-5000 arası tutun | Optimal performans |
| Dashboard sorgularını cache'leyin | Sık tekrarlanan sorgular hızlanır |
| Büyük raporlar için Transform API kullanın | Pre-computed sonuçlar, milisaniye yanıt |
| Query filter ile veri kümesini daraltın | Aggregation öncesi filtreleme performansı artırır |
❌ Yapmayın
| Uygulama | Neden |
|---|---|
| terms size > 10.000 kullanmayın | Bellek sorunu; composite tercih edin |
| 5+ kaynaklı composite oluşturmayın | Bucket sayısı katlanarak artar |
| Aggregation sonuçlarını client-side birleştirmeyin | Elasticsearch'te yapın |
| Her istekte Transform oluşturmayın | Periyodik çalışmak için tasarlanmış |
12. Yaygın Hatalar
Hata 1: Composite'te Sub-aggregation Sınırlaması
// ❌ Composite içinde pipeline aggregation desteklenmez (ES 8.x'te kısmen eklendi)
"composite": {
"sources": [...],
},
"aggs": {
"ciro": { "sum": { "field": "revenue" } },
"degisim": { "derivative": { "buckets_path": "ciro" } } // ❌ Hata verebilir
}
// ✅ Pipeline hesaplamasını client-side yapınHata 2: after_key Yanlış Kullanımı
// ❌ after_key formatı sources ile eşleşmeli
"sources": [
{ "kategori": { "terms": { "field": "category" } } },
{ "marka": { "terms": { "field": "brand" } } }
]
"after": { "kategori": "Telefon" } // ❌ "marka" eksik!
// ✅ Tüm kaynaklar dahil
"after": { "kategori": "Telefon", "marka": "Samsung" }Hata 3: Composite'te Sıralama Sınırlaması
// ❌ Sub-aggregation'a göre sıralama YAPILAMAZ
"composite": {
"sources": [
{ "kategori": { "terms": { "field": "category" } } }
]
}
// "order": { "toplam_ciro": "desc" } — BU YOK!
// ✅ Client-side sıralama yapın veya bucket_sort kullanmayın
// Composite sadece kaynak alanlarına göre sıralar13. Aggregation Seçim Rehberi
Hangi aggregation'ı kullanmalıyım?
│
├── Tek sayısal değer mi istiyorum? → Metric Aggregation
│ ├── Ortalama → avg
│ ├── Toplam → sum
│ ├── Min/Max → min, max
│ ├── Hepsi birden → stats
│ ├── Benzersiz sayı → cardinality
│ └── Yüzdelik → percentiles
│
├── Gruplamak mı istiyorum? → Bucket Aggregation
│ ├── Kategorik alan → terms
│ ├── Sayısal aralık → histogram / range
│ ├── Zaman serisi → date_histogram
│ ├── Çok fazla benzersiz değer → composite
│ └── Koşullu gruplama → filter / filters
│
├── Bucket'lar arası hesaplama mı? → Pipeline Aggregation
│ ├── Değişim miktarı → derivative
│ ├── Kümülatif toplam → cumulative_sum
│ ├── Hareketli ortalama → moving_fn
│ ├── Bucket filtresi → bucket_selector
│ └── Bucket hesaplaması → bucket_script
│
└── Büyük veri raporu mu? → Transform API + CompositeÖzet
Composite aggregation birden fazla kaynağı birleştirir ve sayfalayarak tüm bucket'ları getirir — bellek-güvenli
after_keyile cursor-based pagination yapılır — terms'insizesınırlaması yokturKaynaklar
terms,histogramvedate_histogramolabilir — karma kullanım desteklenirmissing_bucket: trueile null değerli dokümanlar da dahil edilirTransform API periyodik olarak aggregation sonuçlarını pre-compute eder — dashboard performansını dramatik artırır
Aggregation performansı için:
size: 0, query filter, request cache, uygun execution hintComposite aggregation pipeline aggregation desteği sınırlıdır — karmaşık hesaplamalar client-side yapılmalıdır
Aggregation seçimi: metrik → metric agg, gruplama → bucket agg, bucket arası → pipeline agg, büyük veri → composite + transform
AI Asistan
Sorularını yanıtlamaya hazır