Pipeline Aggregations — derivative, moving_avg, cumulative_sum
Giriş — Muhasebecinin Defteri
Bir muhasebeci düşünün. Aylık ciro rakamlarını yan yana yazar, ama sadece rakamları yazmaz: "Bu ay geçen aya göre ne kadar arttı?", "Son 3 ayın ortalaması ne?", "Yılın başından bugüne toplam ne kadar kazandık?" sorularını da yanıtlar. Bu hesaplamalar ham rakamlara değil, önceden hesaplanmış sonuçlara dayanır.
Elasticsearch'ün pipeline aggregation'ları tam olarak bunu yapar: diğer aggregation'ların çıktılarını girdi olarak kullanır. Bir bucket aggregation'dan (örneğin aylık satışlar) çıkan sonuçları alır ve üzerinde türev, kümülatif toplam, hareketli ortalama gibi ileri hesaplamalar yapar.
1. Pipeline Aggregation Nedir?
Pipeline aggregation'lar iki önemli farkla diğer aggregation'lardan ayrılır:
Girdi olarak dokümanları değil, diğer aggregation sonuçlarını kullanır
Bucket'lar arası hesaplama yapar (önceki bucket'ın değerine göre)
İki Tip Pipeline Aggregation
| Tip | Açıklama | Örnek |
|---|---|---|
| Parent | Sibling aggregation'ın sonucunu alır, parent bucket'a ekler | avg_bucket, sum_bucket, max_bucket |
| Sibling | Aynı seviyedeki bucket'lar arası hesaplama yapar | derivative, cumulative_sum, moving_avg |
buckets_path — Yol Gösterici
Pipeline aggregation'lar buckets_path parametresi ile hedef aggregation'ı belirtir:
"buckets_path": "aylik_satislar>toplam_ciro"aylik_satislar→ bucket aggregation adı>→ alt aggregation ayracıtoplam_ciro→ metric aggregation adı
2. Test Verisi
PUT monthly_sales
{
"mappings": {
"properties": {
"revenue": { "type": "float" },
"orders": { "type": "integer" },
"category": { "type": "keyword" },
"region": { "type": "keyword" },
"date": { "type": "date" }
}
}
}
POST monthly_sales/_bulk
{"index":{}}
{"revenue":120000,"orders":45,"category":"Elektronik","region":"İstanbul","date":"2024-01-15"}
{"index":{}}
{"revenue":85000,"orders":30,"category":"Giyim","region":"Ankara","date":"2024-01-20"}
{"index":{}}
{"revenue":150000,"orders":55,"category":"Elektronik","region":"İstanbul","date":"2024-02-10"}
{"index":{}}
{"revenue":92000,"orders":35,"category":"Giyim","region":"İzmir","date":"2024-02-15"}
{"index":{}}
{"revenue":180000,"orders":65,"category":"Elektronik","region":"Ankara","date":"2024-03-05"}
{"index":{}}
{"revenue":110000,"orders":40,"category":"Giyim","region":"İstanbul","date":"2024-03-20"}
{"index":{}}
{"revenue":165000,"orders":60,"category":"Elektronik","region":"İzmir","date":"2024-04-10"}
{"index":{}}
{"revenue":95000,"orders":38,"category":"Giyim","region":"Ankara","date":"2024-04-25"}
{"index":{}}
{"revenue":200000,"orders":72,"category":"Elektronik","region":"İstanbul","date":"2024-05-08"}
{"index":{}}
{"revenue":125000,"orders":48,"category":"Giyim","region":"İstanbul","date":"2024-05-22"}
{"index":{}}
{"revenue":175000,"orders":62,"category":"Elektronik","region":"Ankara","date":"2024-06-12"}
{"index":{}}
{"revenue":108000,"orders":42,"category":"Giyim","region":"İzmir","date":"2024-06-28"}
{"index":{}}
{"revenue":210000,"orders":78,"category":"Elektronik","region":"İstanbul","date":"2024-07-15"}
{"index":{}}
{"revenue":130000,"orders":50,"category":"Giyim","region":"Ankara","date":"2024-07-20"}
{"index":{}}
{"revenue":195000,"orders":70,"category":"Elektronik","region":"İzmir","date":"2024-08-10"}
{"index":{}}
{"revenue":115000,"orders":44,"category":"Giyim","region":"İstanbul","date":"2024-08-25"}
{"index":{}}
{"revenue":220000,"orders":80,"category":"Elektronik","region":"Ankara","date":"2024-09-05"}
{"index":{}}
{"revenue":135000,"orders":52,"category":"Giyim","region":"İzmir","date":"2024-09-18"}3. derivative — Türev (Değişim Miktarı)
Ardışık bucket'lar arasındaki farkı hesaplar. "Bu ay geçen aya göre ne kadar değişti?"
3.1 Basit Türev
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"aylik": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
"toplam_ciro": {
"sum": { "field": "revenue" }
},
"ciro_degisim": {
"derivative": {
"buckets_path": "toplam_ciro"
}
}
}
}
}
}Yanıt (basitleştirilmiş):
{
"buckets": [
{ "key_as_string": "2024-01", "toplam_ciro": { "value": 205000 }, "ciro_degisim": null },
{ "key_as_string": "2024-02", "toplam_ciro": { "value": 242000 }, "ciro_degisim": { "value": 37000 } },
{ "key_as_string": "2024-03", "toplam_ciro": { "value": 290000 }, "ciro_degisim": { "value": 48000 } },
{ "key_as_string": "2024-04", "toplam_ciro": { "value": 260000 }, "ciro_degisim": { "value": -30000 } }
]
}İlk bucket'ın türevi
null— önceki bucket yokŞubat: 242000 - 205000 = +37000 (büyüme)
Nisan: 260000 - 290000 = -30000 (düşüş)
3.2 İkinci Türev — Hızlanma
Türevin türevi = değişimin hızı:
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"aylik": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
"toplam_ciro": {
"sum": { "field": "revenue" }
},
"ciro_degisim": {
"derivative": {
"buckets_path": "toplam_ciro"
}
},
"degisim_hizi": {
"derivative": {
"buckets_path": "ciro_degisim"
}
}
}
}
}
}İkinci türev pozitifse büyüme hızlanıyor, negatifse yavaşlıyor.
3.3 unit Parametresi — Birim Başına Değişim
"ciro_degisim": {
"derivative": {
"buckets_path": "toplam_ciro",
"unit": "day"
}
}Aylık aralıklarda unit: "day" ile günlük değişim oranı hesaplanır.
4. cumulative_sum — Kümülatif Toplam
Yılın başından itibaren birikimli toplam — "Year-To-Date (YTD)" hesaplaması:
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"aylik": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
"aylik_ciro": {
"sum": { "field": "revenue" }
},
"kumulatif_ciro": {
"cumulative_sum": {
"buckets_path": "aylik_ciro"
}
}
}
}
}
}Yanıt:
{
"buckets": [
{ "key_as_string": "2024-01", "aylik_ciro": { "value": 205000 }, "kumulatif_ciro": { "value": 205000 } },
{ "key_as_string": "2024-02", "aylik_ciro": { "value": 242000 }, "kumulatif_ciro": { "value": 447000 } },
{ "key_as_string": "2024-03", "aylik_ciro": { "value": 290000 }, "kumulatif_ciro": { "value": 737000 } },
{ "key_as_string": "2024-04", "aylik_ciro": { "value": 260000 }, "kumulatif_ciro": { "value": 997000 } }
]
}Her ay önceki ayların toplamına eklenir — yılın sonunda toplam ciro grafiğini çizersiniz.
5. cumulative_cardinality — Kümülatif Benzersiz Sayı
Yeni müşteri kazanım trendini görmek için:
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"aylik": {
"date_histogram": {
"field": "date",
"calendar_interval": "month"
},
"aggs": {
"benzersiz_bolge": {
"cardinality": { "field": "region" }
},
"kumulatif_bolge": {
"cumulative_cardinality": {
"buckets_path": "benzersiz_bolge"
}
}
}
}
}
}Her ay kaç yeni bölge eklendiğini görürsünüz. Kümülatif sayı her zaman artar veya sabit kalır.
6. moving_avg — Hareketli Ortalama (Kullanımdan Kaldırıldı, Alternatifleri)
⚠️ Not: moving_avg aggregation Elasticsearch 8.x'te kullanımdan kaldırılmıştır. Yerine moving_fn (moving function) kullanılır.
6.1 moving_fn — Hareketli Fonksiyon
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"aylik": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
"aylik_ciro": {
"sum": { "field": "revenue" }
},
"hareketli_ort_3ay": {
"moving_fn": {
"buckets_path": "aylik_ciro",
"window": 3,
"script": "MovingFunctions.unweightedAvg(values)"
}
}
}
}
}
}window: 3 son 3 ayın ortalamasını alır. Verinin dalgalanmalarını yumuşatır.
6.2 Farklı Hareketli Fonksiyonlar
// Basit hareketli ortalama
"script": "MovingFunctions.unweightedAvg(values)"
// Doğrusal ağırlıklı hareketli ortalama
"script": "MovingFunctions.linearWeightedAvg(values)"
// Üstel ağırlıklı hareketli ortalama
"script": "MovingFunctions.ewma(values, 0.3)"
// Minimum
"script": "MovingFunctions.min(values)"
// Maksimum
"script": "MovingFunctions.max(values)"
// Toplam
"script": "MovingFunctions.sum(values)"
// Standart sapma
"script": "MovingFunctions.stdDev(values, MovingFunctions.unweightedAvg(values))"6.3 Hareketli Ortalama ile Trend Analizi
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"aylik": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
"ciro": {
"sum": { "field": "revenue" }
},
"hareketli_ort": {
"moving_fn": {
"buckets_path": "ciro",
"window": 3,
"script": "MovingFunctions.unweightedAvg(values)"
}
},
"trend": {
"derivative": {
"buckets_path": "hareketli_ort"
}
}
}
}
}
}Hareketli ortalamanın türevi = trend yönü. Pozitif ise yükseliş trendi, negatif ise düşüş trendi.
7. bucket_sort — Bucket Sıralama
Bucket'ları belirli bir metriğe göre sıralamak veya sayısını sınırlamak:
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"aylik": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
"toplam_ciro": {
"sum": { "field": "revenue" }
},
"en_iyi_3_ay": {
"bucket_sort": {
"sort": [
{ "toplam_ciro": { "order": "desc" } }
],
"size": 3
}
}
}
}
}
}En yüksek cirolu 3 ay döner.
from Parametresi — Pagination
"en_iyi_aylar": {
"bucket_sort": {
"sort": [{ "toplam_ciro": { "order": "desc" } }],
"from": 3,
"size": 3
}
}4-6. sıradaki aylar döner (ilk 3'ü atla).
8. bucket_selector — Koşullu Filtreleme
Belirli bir koşulu sağlamayan bucket'ları filtreler:
// Sadece cirosu 250.000'den fazla olan aylar
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"aylik": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
"toplam_ciro": {
"sum": { "field": "revenue" }
},
"yuksek_ciro_filtre": {
"bucket_selector": {
"buckets_path": {
"ciro": "toplam_ciro"
},
"script": "params.ciro > 250000"
}
}
}
}
}
}Çoklu Koşul
"bucket_selector": {
"buckets_path": {
"ciro": "toplam_ciro",
"siparis": "toplam_siparis"
},
"script": "params.ciro > 200000 && params.siparis > 50"
}9. bucket_script — Bucket Üzerinde Hesaplama
İki metrik arasında hesaplama yaparak yeni bir değer üretir:
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"aylik": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
"toplam_ciro": {
"sum": { "field": "revenue" }
},
"toplam_siparis": {
"sum": { "field": "orders" }
},
"siparis_basi_ciro": {
"bucket_script": {
"buckets_path": {
"ciro": "toplam_ciro",
"siparis": "toplam_siparis"
},
"script": "params.ciro / params.siparis"
}
}
}
}
}
}Her ayın sipariş başına ortalama cirosu hesaplanır.
10. Parent Pipeline Aggregation'lar
Tüm bucket'lar üzerinde tek bir değer hesaplayan pipeline'lar:
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"aylik": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
"toplam_ciro": {
"sum": { "field": "revenue" }
}
}
},
"en_yuksek_ciro_ayi": {
"max_bucket": {
"buckets_path": "aylik>toplam_ciro"
}
},
"en_dusuk_ciro_ayi": {
"min_bucket": {
"buckets_path": "aylik>toplam_ciro"
}
},
"ortalama_aylik_ciro": {
"avg_bucket": {
"buckets_path": "aylik>toplam_ciro"
}
},
"toplam_yillik_ciro": {
"sum_bucket": {
"buckets_path": "aylik>toplam_ciro"
}
}
}
}Yanıt:
{
"en_yuksek_ciro_ayi": { "value": 355000, "keys": ["2024-09-01T00:00:00.000Z"] },
"en_dusuk_ciro_ayi": { "value": 205000, "keys": ["2024-01-01T00:00:00.000Z"] },
"ortalama_aylik_ciro": { "value": 280555.56 },
"toplam_yillik_ciro": { "value": 2525000 }
}max_bucket hem değeri hem hangi bucket'a ait olduğunu (keys) döndürür.
11. Gerçek Dünya: Satış Performans Dashboard'u
GET monthly_sales/_search
{
"size": 0,
"aggs": {
"aylik_trend": {
"date_histogram": {
"field": "date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
"ciro": { "sum": { "field": "revenue" } },
"siparis": { "sum": { "field": "orders" } },
"kumulatif_ciro": {
"cumulative_sum": { "buckets_path": "ciro" }
},
"aylik_degisim": {
"derivative": { "buckets_path": "ciro" }
},
"hareketli_ort_3ay": {
"moving_fn": {
"buckets_path": "ciro",
"window": 3,
"script": "MovingFunctions.unweightedAvg(values)"
}
},
"siparis_basi_ciro": {
"bucket_script": {
"buckets_path": {
"c": "ciro",
"s": "siparis"
},
"script": "params.c / params.s"
}
}
}
},
"en_iyi_ay": {
"max_bucket": { "buckets_path": "aylik_trend>ciro" }
},
"ort_aylik": {
"avg_bucket": { "buckets_path": "aylik_trend>ciro" }
}
}
}Tek sorguda: aylık ciro trendi, kümülatif toplam, aylık değişim, 3 aylık hareketli ortalama, sipariş başına ciro, en iyi ay ve ortalama aylık ciro.
12. Java ile Pipeline Aggregations
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch.core.*;
public class PipelineAggregationJava {
public static void salesDashboard(ElasticsearchClient client) throws Exception {
SearchResponse<Void> response = client.search(s -> s
.index("monthly_sales")
.size(0)
.aggregations("aylik", a -> a
.dateHistogram(dh -> dh
.field("date")
.calendarInterval(CalendarInterval.Month)
.format("yyyy-MM")
)
.aggregations("ciro", sub -> sub
.sum(su -> su.field("revenue"))
)
.aggregations("kumulatif", sub -> sub
.cumulativeSum(cs -> cs.bucketsPath("ciro"))
)
.aggregations("degisim", sub -> sub
.derivative(d -> d.bucketsPath("ciro"))
)
.aggregations("hareketli_ort", sub -> sub
.movingFn(mf -> mf
.bucketsPath("ciro")
.window(3)
.script("MovingFunctions.unweightedAvg(values)")
)
)
)
.aggregations("en_iyi_ay", a -> a
.maxBucket(mb -> mb.bucketsPath("aylik>ciro"))
)
.aggregations("ort_aylik", a -> a
.avgBucket(ab -> ab.bucketsPath("aylik>ciro"))
),
Void.class
);
// Aylık trend
var buckets = response.aggregations()
.get("aylik").dateHistogram().buckets().array();
System.out.println("=== Aylık Satış Trendi ===");
System.out.printf("%-10s | %15s | %15s | %15s | %15s%n",
"Ay", "Ciro", "Kümülatif", "Değişim", "Hareketli Ort.");
for (var bucket : buckets) {
double ciro = bucket.aggregations().get("ciro").sum().value();
double kumulatif = bucket.aggregations().get("kumulatif")
.cumulativeSum().value();
var degisim = bucket.aggregations().get("degisim").derivative();
String degisimStr = degisim.value() != null ?
String.format("%.0f", degisim.value()) : "—";
var hareketliOrt = bucket.aggregations().get("hareketli_ort")
.simpleValue();
String hareketliStr = hareketliOrt.value() != 0 ?
String.format("%.0f", hareketliOrt.value()) : "—";
System.out.printf("%-10s | %,15.0f | %,15.0f | %15s | %15s%n",
bucket.keyAsString(), ciro, kumulatif, degisimStr, hareketliStr);
}
// En iyi ay
var enIyi = response.aggregations().get("en_iyi_ay").maxBucket();
System.out.printf("%nEn iyi ay: %s (₺%.0f)%n",
enIyi.keys().get(0), enIyi.value());
// Ortalama
double ortAylik = response.aggregations()
.get("ort_aylik").avgBucket().value();
System.out.printf("Ortalama aylık ciro: ₺%.0f%n", ortAylik);
}
}13. Best Practices
✅ Yapın
| Uygulama | Neden |
|---|---|
buckets_path'i doğru belirtin | Yanlış path hata verir |
| Pipeline'ı date_histogram ile kullanın | Zaman serisi analizi doğal iş ortağı |
bucket_selector ile anlamsız bucket'ları filtreleyin | Sonuç kalitesini artırır |
gap_policy belirtin | Boş bucket'lar pipeline'ı bozabilir |
Hareketli ortalama window'unu verinize göre ayarlayın | Çok küçük = gürültülü, çok büyük = trendi geciktirir |
❌ Yapmayın
| Uygulama | Neden |
|---|---|
| Pipeline'ları 5+ seviye iç içe yerleştirmeyin | Karmaşıklık ve performans |
| Derivative'i düzensiz aralıklarda kullanmayın | Aralıklar eşit olmalı (date_histogram ideal) |
gap_policy: insert_zeros'u düşünmeden kullanmayın | Türev hesaplamalarını bozabilir |
14. Yaygın Hatalar
Hata 1: buckets_path Yanlış Yazımı
// ❌ Yanlış path ayracı (. yerine >)
"buckets_path": "aylik.toplam_ciro"
// ✅ > ayracı kullanın
"buckets_path": "aylik>toplam_ciro"
// Aynı seviyede ise doğrudan isim
"buckets_path": "toplam_ciro"Hata 2: gap_policy — Boş Bucket Yönetimi
Bazı aylarda veri yoksa pipeline aggregation hata verebilir:
"derivative": {
"buckets_path": "toplam_ciro",
"gap_policy": "skip" // Boş bucket'ları atla (varsayılan)
// veya "insert_zeros" — 0 olarak kabul et
// veya "keep_values" — önceki değeri koru
}| gap_policy | Davranış |
|---|---|
skip (varsayılan) | Boş bucket'ı atlar, sonraki bucket'la hesaplar |
insert_zeros | Boş bucket'a 0 yerleştirir |
keep_values | Önceki bucket'ın değerini korur |
Hata 3: Pipeline Aggregation'ı Yanlış Seviyeye Koymak
// ❌ Pipeline, bucket aggregation'ın DIŞINDA tanımlanmalı (parent pipeline)
// veya İÇİNDE (sibling pipeline)
// Parent pipeline — dışarıda
"aggs": {
"aylik": { "date_histogram": { ... }, "aggs": { "ciro": { "sum": { ... } } } },
"en_iyi": { "max_bucket": { "buckets_path": "aylik>ciro" } }
}
// Sibling pipeline — içeride
"aggs": {
"aylik": {
"date_histogram": { ... },
"aggs": {
"ciro": { "sum": { ... } },
"degisim": { "derivative": { "buckets_path": "ciro" } }
}
}
}Özet
Pipeline aggregation'lar diğer aggregation sonuçlarını girdi olarak kullanır — dokümanlarla değil, hesaplanmış metriklerle çalışır
`derivative` ardışık bucket'lar arası farkı hesaplar — büyüme/küçülme tespiti
`cumulative_sum` birikimli toplam hesaplar — YTD (Year-to-Date) raporları
`moving_fn` hareketli ortalama/fonksiyon hesaplar — trend analizi ve dalgalanma yumuşatma
`bucket_sort` bucket'ları sıralar ve limitler — "en iyi 3 ay" gibi sorgular
`bucket_selector` koşula göre bucket filtreler — "cirosu > 200K olan aylar"
`bucket_script` bucket metrikleri arası hesaplama yapar — "sipariş başına ciro"
Parent pipeline'lar (
max_bucket,avg_bucket) tüm bucket'lardan tek değer üretirbuckets_pathsöz diziminde>ayracı alt aggregation'ı belirtirgap_policyile boş bucket'ların pipeline hesabını nasıl etkileyeceğini kontrol edin
AI Asistan
Sorularını yanıtlamaya hazır