← Kursa Dön
📄 Text · 30 min

Composite Aggregation ve Pratik Örnekler

Giriş — Excel Pivot Tablosu

Excel'de pivot tablo kullanmış olabilirsiniz: satırları kategoriye, sütunları aya göre düzenler, hücrelerde toplam ciroyu görürsünüz. Binlerce satırlık veriyi anlamlı bir tabloya dönüştürür. Ama veri çok büyükse? Milyonlarca satır, yüzlerce kategori?

Normal terms aggregation'da size: 10000 yazmak bellek felaketine yol açar. İşte burada composite aggregation devreye girer. Tüm bucket'ları sayfalayarak getirir — bellekte hepsini birden tutmak yerine parça parça işler. Ayrıca birden fazla alanı tek bir aggregation'da birleştirir — gerçek bir pivot tablo gibi.


1. Composite Aggregation Nedir?

Composite aggregation, birden fazla kaynaktan oluşan bucket'ları sayfalayarak (pagination) döndüren bir aggregation tipidir.

Neden Gerekli?

terms aggregation'ın sınırlamaları:

// ❌ 100.000 benzersiz kategori varsa?
"terms": {
  "field": "category",
  "size": 100000  // Bellek patlar!
}

Composite aggregation çözümü:

// ✅ Sayfalayarak tüm bucket'ları getirir
"composite": {
  "size": 1000,  // Her seferde 1000 bucket
  "sources": [...]
}

Composite vs terms Farkları

Özelliktermscomposite
Pagination✅ (after_key)
BellekTüm bucket'lar bellekteSayfa sayfa
Çoklu alanİç içe terms gerekirsources ile doğrudan
SıralamaÇeşitliSadece kaynak alanlara göre
Tüm bucket'larsize ile sınırlıHepsini iterate edebilir

2. Temel Kullanım

2.1 Tek Kaynak

GET sales/_search
{
  "size": 0,
  "aggs": {
    "tum_kategoriler": {
      "composite": {
        "size": 5,
        "sources": [
          {
            "kategori": {
              "terms": { "field": "category" }
            }
          }
        ]
      }
    }
  }
}

Yanıt:

{
  "aggregations": {
    "tum_kategoriler": {
      "after_key": { "kategori": "Kulaklık" },
      "buckets": [
        { "key": { "kategori": "Akıllı Saat" }, "doc_count": 1 },
        { "key": { "kategori": "Bilgisayar" }, "doc_count": 2 },
        { "key": { "kategori": "Kulaklık" }, "doc_count": 2 }
      ]
    }
  }
}

2.2 Sonraki Sayfa — after

GET sales/_search
{
  "size": 0,
  "aggs": {
    "tum_kategoriler": {
      "composite": {
        "size": 5,
        "sources": [
          {
            "kategori": {
              "terms": { "field": "category" }
            }
          }
        ],
        "after": { "kategori": "Kulaklık" }
      }
    }
  }
}

after parametresi önceki yanıtın after_key değerini alır. Bucket'lar bitene kadar tekrarlanır.

2.3 Tüm Bucket'ları Iterate Etme (Pseudo-code)

1. İlk sorgu: composite (size: 1000)
2. Yanıtı işle
3. after_key varsa → after parametresiyle tekrar sorgula
4. after_key yoksa → tüm bucket'lar tamamlandı

3. Çoklu Kaynak — Pivot Tablo

Composite'in gerçek gücü birden fazla kaynağı birleştirmesidir:

3.1 İki Kaynaklı Composite

GET sales/_search
{
  "size": 0,
  "aggs": {
    "kategori_marka": {
      "composite": {
        "size": 10,
        "sources": [
          { "kategori": { "terms": { "field": "category" } } },
          { "marka": { "terms": { "field": "brand" } } }
        ]
      },
      "aggs": {
        "toplam_ciro": { "sum": { "field": "revenue" } },
        "ort_fiyat": { "avg": { "field": "price" } }
      }
    }
  }
}

Yanıt:

{
  "buckets": [
    {
      "key": { "kategori": "Akıllı Saat", "marka": "Samsung" },
      "doc_count": 1,
      "toplam_ciro": { "value": 25999.98 },
      "ort_fiyat": { "value": 12999.99 }
    },
    {
      "key": { "kategori": "Bilgisayar", "marka": "Apple" },
      "doc_count": 1,
      "toplam_ciro": { "value": 42999.99 },
      "ort_fiyat": { "value": 42999.99 }
    },
    {
      "key": { "kategori": "Bilgisayar", "marka": "Lenovo" },
      "doc_count": 1,
      "toplam_ciro": { "value": 77999.98 },
      "ort_fiyat": { "value": 38999.99 }
    }
  ]
}

Her benzersiz (kategori, marka) çifti bir bucket oluşturur — Excel pivot tablosu gibi.

3.2 Üç Kaynaklı Composite

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "detayli_rapor": {
      "composite": {
        "size": 50,
        "sources": [
          {
            "ay": {
              "date_histogram": {
                "field": "date",
                "calendar_interval": "month",
                "format": "yyyy-MM"
              }
            }
          },
          { "kategori": { "terms": { "field": "category" } } },
          { "bolge": { "terms": { "field": "region" } } }
        ]
      },
      "aggs": {
        "ciro": { "sum": { "field": "revenue" } },
        "siparis": { "sum": { "field": "orders" } }
      }
    }
  }
}

Ay × Kategori × Bölge matrisini oluşturur — her kombinasyon için ciro ve sipariş sayısı.


4. Composite Kaynakları (Source Tipleri)

4.1 terms Source

{ "marka": { "terms": { "field": "brand", "order": "asc" } } }

4.2 date_histogram Source

{
  "ay": {
    "date_histogram": {
      "field": "date",
      "calendar_interval": "month",
      "format": "yyyy-MM",
      "time_zone": "Europe/Istanbul"
    }
  }
}

4.3 histogram Source

{
  "fiyat_araligi": {
    "histogram": {
      "field": "price",
      "interval": 10000
    }
  }
}

4.4 Karışık Kaynaklar

"sources": [
  { "ay": { "date_histogram": { "field": "date", "calendar_interval": "month" } } },
  { "fiyat_seg": { "histogram": { "field": "price", "interval": 20000 } } },
  { "marka": { "terms": { "field": "brand" } } }
]

Tarih + sayısal aralık + kategorik alan birlikte kullanılabilir.


5. Sıralama ve missing_bucket

5.1 Kaynak Sıralaması

"sources": [
  {
    "kategori": {
      "terms": {
        "field": "category",
        "order": "desc"
      }
    }
  }
]

Her kaynak bağımsız olarak asc veya desc sıralanabilir.

5.2 missing_bucket — Null Değerler

Varsayılan olarak, kaynak alanı olmayan dokümanlar atlanır. missing_bucket: true ile onlar da dahil edilir:

"sources": [
  {
    "marka": {
      "terms": {
        "field": "brand",
        "missing_bucket": true,
        "missing_order": "last"
      }
    }
  }
]

Markası olmayan dokümanlar null bucket'ta toplanır ve sona yerleştirilir.


6. Gerçek Dünya Senaryoları

6.1 Aylık Satış Raporu (Excel Export)

Tüm ayları, kategorileri ve bölgeleri içeren kapsamlı rapor:

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "rapor": {
      "composite": {
        "size": 100,
        "sources": [
          {
            "ay": {
              "date_histogram": {
                "field": "date",
                "calendar_interval": "month",
                "format": "yyyy-MM"
              }
            }
          },
          { "kategori": { "terms": { "field": "category" } } }
        ]
      },
      "aggs": {
        "toplam_ciro": { "sum": { "field": "revenue" } },
        "toplam_siparis": { "sum": { "field": "orders" } },
        "ort_siparis_tutari": {
          "bucket_script": {
            "buckets_path": {
              "c": "toplam_ciro",
              "s": "toplam_siparis"
            },
            "script": "params.s > 0 ? params.c / params.s : 0"
          }
        }
      }
    }
  }
}

6.2 Bölge Bazlı Performans Raporu

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "bolge_rapor": {
      "composite": {
        "size": 50,
        "sources": [
          { "bolge": { "terms": { "field": "region" } } },
          { "kategori": { "terms": { "field": "category" } } }
        ]
      },
      "aggs": {
        "ciro": { "sum": { "field": "revenue" } },
        "siparis": { "sum": { "field": "orders" } },
        "ciro_stats": { "stats": { "field": "revenue" } }
      }
    }
  }
}

6.3 Zaman Serisi + Kategori Derinlemesine

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "trend_analiz": {
      "composite": {
        "size": 100,
        "sources": [
          {
            "ceyrek": {
              "date_histogram": {
                "field": "date",
                "calendar_interval": "quarter",
                "format": "yyyy-QQQ"
              }
            }
          },
          { "kategori": { "terms": { "field": "category" } } },
          { "bolge": { "terms": { "field": "region" } } }
        ]
      },
      "aggs": {
        "ciro": { "sum": { "field": "revenue" } },
        "siparis": { "sum": { "field": "orders" } }
      }
    }
  }
}

Çeyreklik × Kategori × Bölge matrisini oluşturur.


7. Composite ile Tam Veri İterasyonu

Tüm bucket'ları programatik olarak iterate etmek:

REST API

// İlk istek
GET sales/_search
{
  "size": 0,
  "aggs": {
    "tum_veriler": {
      "composite": {
        "size": 1000,
        "sources": [
          { "kategori": { "terms": { "field": "category" } } },
          { "marka": { "terms": { "field": "brand" } } }
        ]
      },
      "aggs": {
        "ciro": { "sum": { "field": "revenue" } }
      }
    }
  }
}

// Sonraki istekler — after_key ile
GET sales/_search
{
  "size": 0,
  "aggs": {
    "tum_veriler": {
      "composite": {
        "size": 1000,
        "sources": [
          { "kategori": { "terms": { "field": "category" } } },
          { "marka": { "terms": { "field": "brand" } } }
        ],
        "after": { "kategori": "Kulaklık", "marka": "Sony" }
      },
      "aggs": {
        "ciro": { "sum": { "field": "revenue" } }
      }
    }
  }
}

8. Aggregation Performans Optimizasyonu

8.1 Genel İpuçları

// 1. size: 0 — doküman döndürme
"size": 0

// 2. Query ile filtreleme — aggregation'dan önce veriyi daralt
"query": {
  "bool": {
    "filter": [
      { "range": { "date": { "gte": "2024-01-01" } } },
      { "term": { "category": "Elektronik" } }
    ]
  }
}

// 3. _source: false — (zaten size: 0 ile gereksiz ama explicit olmak iyi)
"_source": false

8.2 Aggregation Cache

Elasticsearch aggregation sonuçlarını cache'ler. Cache'in çalışması için:

  • Sorgu değişmemeli

  • Index değişmemeli (yeni doküman eklenince cache invalid olur)

  • Shard request cache aktif olmalı (varsayılan: aktif)

// Cache'i zorunlu kullan
GET sales/_search?request_cache=true
{
  "size": 0,
  "aggs": { ... }
}

8.3 Execution Hint — terms Aggregation

GET sales/_search
{
  "size": 0,
  "aggs": {
    "markalar": {
      "terms": {
        "field": "brand",
        "execution_hint": "map"
      }
    }
  }
}
execution_hintDavranışUygun
(otomatik)Elasticsearch karar verirÇoğu durum
mapHashMap kullanırAz sayıda benzersiz değer
global_ordinalsOrdinal mapping kullanırÇok sayıda benzersiz değer (varsayılan)

8.4 Sampled Aggregation — Örnekleme

Çok büyük veri setlerinde yaklaşık sonuç için:

GET sales/_search
{
  "size": 0,
  "aggs": {
    "orneklem": {
      "sampler": {
        "shard_size": 200
      },
      "aggs": {
        "top_markalar": {
          "terms": { "field": "brand" }
        }
      }
    }
  }
}

Her shard'dan sadece 200 doküman örneğiyle aggregation yapar — büyük veri setlerinde çok hızlı.


9. Raporlama Mimarisi: Aggregation vs Pre-computed

Real-time Aggregation

Kullanıcı İsteği → Elasticsearch Query + Aggregation → Sonuç

✅ Her zaman güncel ❌ Büyük veri setlerinde yavaş olabilir

Pre-computed (Transform)

Elasticsearch Transform → Özet Index → Kullanıcı İsteği → Basit Query

Elasticsearch Transform API ile periyodik olarak aggregation sonuçlarını ayrı bir index'e yazabilirsiniz:

PUT _transform/monthly_sales_summary
{
  "source": {
    "index": "monthly_sales"
  },
  "dest": {
    "index": "monthly_sales_report"
  },
  "pivot": {
    "group_by": {
      "month": {
        "date_histogram": {
          "field": "date",
          "calendar_interval": "month"
        }
      },
      "category": {
        "terms": { "field": "category" }
      },
      "region": {
        "terms": { "field": "region" }
      }
    },
    "aggregations": {
      "total_revenue": { "sum": { "field": "revenue" } },
      "total_orders": { "sum": { "field": "orders" } },
      "avg_revenue": { "avg": { "field": "revenue" } }
    }
  },
  "frequency": "1h",
  "sync": {
    "time": {
      "field": "date",
      "delay": "60s"
    }
  }
}

// Transform'u başlat
POST _transform/monthly_sales_summary/_start

Transform sonucu monthly_sales_report index'inde pre-computed veriler hazır olur — dashboard sorguları milisaniyeler içinde döner.


10. Java ile Composite Aggregation — Tam İterasyon

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch.core.*;

import java.util.HashMap;
import java.util.Map;

public class CompositeAggregationJava {

    public static void iterateAllBuckets(ElasticsearchClient client) throws Exception {
        Map<String, String> afterKey = null;
        int totalBuckets = 0;

        while (true) {
            final Map<String, String> currentAfterKey = afterKey;

            SearchResponse<Void> response = client.search(s -> s
                .index("sales")
                .size(0)
                .aggregations("rapor", a -> {
                    var composite = CompositeAggregation.of(c -> {
                        c.size(100)
                         .sources(
                             Map.of("kategori", CompositeAggregationSource.of(
                                 src -> src.terms(t -> t.field("category"))
                             )),
                             Map.of("marka", CompositeAggregationSource.of(
                                 src -> src.terms(t -> t.field("brand"))
                             ))
                         );
                        if (currentAfterKey != null) {
                            c.after(currentAfterKey);
                        }
                        return c;
                    });

                    return Aggregation.of(agg -> agg
                        .composite(composite)
                        .aggregations("ciro", sub -> sub
                            .sum(su -> su.field("revenue"))
                        )
                    );
                }),
                Void.class
            );

            var compositeResult = response.aggregations()
                .get("rapor").composite();

            var buckets = compositeResult.buckets().array();
            if (buckets.isEmpty()) break;

            for (var bucket : buckets) {
                totalBuckets++;
                String kategori = bucket.key().get("kategori").stringValue();
                String marka = bucket.key().get("marka").stringValue();
                double ciro = bucket.aggregations().get("ciro").sum().value();

                System.out.printf("%-15s | %-10s | Adet: %d | Ciro: ₺%.2f%n",
                    kategori, marka, bucket.docCount(), ciro);
            }

            // after_key'i al
            var nextAfterKey = compositeResult.afterKey();
            if (nextAfterKey == null || nextAfterKey.isEmpty()) break;

            afterKey = new HashMap<>();
            nextAfterKey.forEach((k, v) -> afterKey.put(k, v.toString()));
        }

        System.out.printf("%nToplam %d bucket iterate edildi.%n", totalBuckets);
    }

    // Dashboard raporu — multi-aggregation
    public static void dashboardReport(ElasticsearchClient client) throws Exception {
        SearchResponse<Void> response = client.search(s -> s
            .index("monthly_sales")
            .size(0)
            // Aylık trend (date_histogram)
            .aggregations("aylik_trend", a -> a
                .dateHistogram(dh -> dh
                    .field("date")
                    .calendarInterval(CalendarInterval.Month)
                    .format("yyyy-MM")
                )
                .aggregations("ciro", sub -> sub.sum(su -> su.field("revenue")))
                .aggregations("siparis", sub -> sub.sum(su -> su.field("orders")))
            )
            // Kategori dağılımı
            .aggregations("kategoriler", a -> a
                .terms(t -> t.field("category").size(10))
                .aggregations("ciro", sub -> sub.sum(su -> su.field("revenue")))
            )
            // Bölge performansı
            .aggregations("bolgeler", a -> a
                .terms(t -> t.field("region").size(10)
                    .order(Map.of("ciro", SortOrder.Desc)))
                .aggregations("ciro", sub -> sub.sum(su -> su.field("revenue")))
            )
            // Genel metrikler
            .aggregations("genel_ciro", a -> a.sum(su -> su.field("revenue")))
            .aggregations("genel_siparis", a -> a.sum(su -> su.field("orders")))
            .aggregations("benzersiz_bolge", a -> a.cardinality(c -> c.field("region"))),
            Void.class
        );

        // Sonuçları yazdır
        System.out.println("=== DASHBOARD RAPORU ===\n");

        // Genel metrikler
        double toplamCiro = response.aggregations().get("genel_ciro").sum().value();
        double toplamSiparis = response.aggregations().get("genel_siparis").sum().value();
        long benzersizBolge = response.aggregations()
            .get("benzersiz_bolge").cardinality().value();

        System.out.printf("Toplam Ciro: ₺%,.0f%n", toplamCiro);
        System.out.printf("Toplam Sipariş: %,.0f%n", toplamSiparis);
        System.out.printf("Aktif Bölge: %d%n%n", benzersizBolge);

        // Aylık trend
        System.out.println("--- Aylık Trend ---");
        var aylar = response.aggregations()
            .get("aylik_trend").dateHistogram().buckets().array();
        for (var ay : aylar) {
            System.out.printf("  %s | Ciro: ₺%,.0f | Sipariş: %,.0f%n",
                ay.keyAsString(),
                ay.aggregations().get("ciro").sum().value(),
                ay.aggregations().get("siparis").sum().value());
        }

        // Kategori dağılımı
        System.out.println("\n--- Kategori Dağılımı ---");
        var kategoriler = response.aggregations()
            .get("kategoriler").sterms().buckets().array();
        for (var kat : kategoriler) {
            System.out.printf("  %-15s | Ciro: ₺%,.0f%n",
                kat.key().stringValue(),
                kat.aggregations().get("ciro").sum().value());
        }

        // Bölge performansı
        System.out.println("\n--- Bölge Performansı ---");
        var bolgeler = response.aggregations()
            .get("bolgeler").sterms().buckets().array();
        for (var bolge : bolgeler) {
            System.out.printf("  %-15s | Ciro: ₺%,.0f%n",
                bolge.key().stringValue(),
                bolge.aggregations().get("ciro").sum().value());
        }
    }
}

11. Best Practices

✅ Yapın

UygulamaNeden
Çok sayıda benzersiz değer varsa composite kullanınBellek-güvenli pagination
Composite size'ı 500-5000 arası tutunOptimal performans
Dashboard sorgularını cache'leyinSık tekrarlanan sorgular hızlanır
Büyük raporlar için Transform API kullanınPre-computed sonuçlar, milisaniye yanıt
Query filter ile veri kümesini daraltınAggregation öncesi filtreleme performansı artırır

❌ Yapmayın

UygulamaNeden
terms size > 10.000 kullanmayınBellek sorunu; composite tercih edin
5+ kaynaklı composite oluşturmayınBucket sayısı katlanarak artar
Aggregation sonuçlarını client-side birleştirmeyinElasticsearch'te yapın
Her istekte Transform oluşturmayınPeriyodik çalışmak için tasarlanmış

12. Yaygın Hatalar

Hata 1: Composite'te Sub-aggregation Sınırlaması

// ❌ Composite içinde pipeline aggregation desteklenmez (ES 8.x'te kısmen eklendi)
"composite": {
  "sources": [...],
},
"aggs": {
  "ciro": { "sum": { "field": "revenue" } },
  "degisim": { "derivative": { "buckets_path": "ciro" } }  // ❌ Hata verebilir
}

// ✅ Pipeline hesaplamasını client-side yapın

Hata 2: after_key Yanlış Kullanımı

// ❌ after_key formatı sources ile eşleşmeli
"sources": [
  { "kategori": { "terms": { "field": "category" } } },
  { "marka": { "terms": { "field": "brand" } } }
]
"after": { "kategori": "Telefon" }  // ❌ "marka" eksik!

// ✅ Tüm kaynaklar dahil
"after": { "kategori": "Telefon", "marka": "Samsung" }

Hata 3: Composite'te Sıralama Sınırlaması

// ❌ Sub-aggregation'a göre sıralama YAPILAMAZ
"composite": {
  "sources": [
    { "kategori": { "terms": { "field": "category" } } }
  ]
}
// "order": { "toplam_ciro": "desc" }  — BU YOK!

// ✅ Client-side sıralama yapın veya bucket_sort kullanmayın
// Composite sadece kaynak alanlarına göre sıralar

13. Aggregation Seçim Rehberi

Hangi aggregation'ı kullanmalıyım?
│
├── Tek sayısal değer mi istiyorum? → Metric Aggregation
│   ├── Ortalama → avg
│   ├── Toplam → sum
│   ├── Min/Max → min, max
│   ├── Hepsi birden → stats
│   ├── Benzersiz sayı → cardinality
│   └── Yüzdelik → percentiles
│
├── Gruplamak mı istiyorum? → Bucket Aggregation
│   ├── Kategorik alan → terms
│   ├── Sayısal aralık → histogram / range
│   ├── Zaman serisi → date_histogram
│   ├── Çok fazla benzersiz değer → composite
│   └── Koşullu gruplama → filter / filters
│
├── Bucket'lar arası hesaplama mı? → Pipeline Aggregation
│   ├── Değişim miktarı → derivative
│   ├── Kümülatif toplam → cumulative_sum
│   ├── Hareketli ortalama → moving_fn
│   ├── Bucket filtresi → bucket_selector
│   └── Bucket hesaplaması → bucket_script
│
└── Büyük veri raporu mu? → Transform API + Composite

Özet

  • Composite aggregation birden fazla kaynağı birleştirir ve sayfalayarak tüm bucket'ları getirir — bellek-güvenli

  • after_key ile cursor-based pagination yapılır — terms'in size sınırlaması yoktur

  • Kaynaklar terms, histogram ve date_histogram olabilir — karma kullanım desteklenir

  • missing_bucket: true ile null değerli dokümanlar da dahil edilir

  • Transform API periyodik olarak aggregation sonuçlarını pre-compute eder — dashboard performansını dramatik artırır

  • Aggregation performansı için: size: 0, query filter, request cache, uygun execution hint

  • Composite aggregation pipeline aggregation desteği sınırlıdır — karmaşık hesaplamalar client-side yapılmalıdır

  • Aggregation seçimi: metrik → metric agg, gruplama → bucket agg, bucket arası → pipeline agg, büyük veri → composite + transform