← Kursa Dön
📄 Text · 30 min

Pipeline Aggregations — derivative, moving_avg, cumulative_sum

Giriş — Muhasebecinin Defteri

Bir muhasebeci düşünün. Aylık ciro rakamlarını yan yana yazar, ama sadece rakamları yazmaz: "Bu ay geçen aya göre ne kadar arttı?", "Son 3 ayın ortalaması ne?", "Yılın başından bugüne toplam ne kadar kazandık?" sorularını da yanıtlar. Bu hesaplamalar ham rakamlara değil, önceden hesaplanmış sonuçlara dayanır.

Elasticsearch'ün pipeline aggregation'ları tam olarak bunu yapar: diğer aggregation'ların çıktılarını girdi olarak kullanır. Bir bucket aggregation'dan (örneğin aylık satışlar) çıkan sonuçları alır ve üzerinde türev, kümülatif toplam, hareketli ortalama gibi ileri hesaplamalar yapar.


1. Pipeline Aggregation Nedir?

Pipeline aggregation'lar iki önemli farkla diğer aggregation'lardan ayrılır:

  1. Girdi olarak dokümanları değil, diğer aggregation sonuçlarını kullanır

  2. Bucket'lar arası hesaplama yapar (önceki bucket'ın değerine göre)

İki Tip Pipeline Aggregation

TipAçıklamaÖrnek
ParentSibling aggregation'ın sonucunu alır, parent bucket'a ekleravg_bucket, sum_bucket, max_bucket
SiblingAynı seviyedeki bucket'lar arası hesaplama yaparderivative, cumulative_sum, moving_avg

buckets_path — Yol Gösterici

Pipeline aggregation'lar buckets_path parametresi ile hedef aggregation'ı belirtir:

"buckets_path": "aylik_satislar>toplam_ciro"
  • aylik_satislar → bucket aggregation adı

  • > → alt aggregation ayracı

  • toplam_ciro → metric aggregation adı


2. Test Verisi

PUT monthly_sales
{
  "mappings": {
    "properties": {
      "revenue": { "type": "float" },
      "orders": { "type": "integer" },
      "category": { "type": "keyword" },
      "region": { "type": "keyword" },
      "date": { "type": "date" }
    }
  }
}

POST monthly_sales/_bulk
{"index":{}}
{"revenue":120000,"orders":45,"category":"Elektronik","region":"İstanbul","date":"2024-01-15"}
{"index":{}}
{"revenue":85000,"orders":30,"category":"Giyim","region":"Ankara","date":"2024-01-20"}
{"index":{}}
{"revenue":150000,"orders":55,"category":"Elektronik","region":"İstanbul","date":"2024-02-10"}
{"index":{}}
{"revenue":92000,"orders":35,"category":"Giyim","region":"İzmir","date":"2024-02-15"}
{"index":{}}
{"revenue":180000,"orders":65,"category":"Elektronik","region":"Ankara","date":"2024-03-05"}
{"index":{}}
{"revenue":110000,"orders":40,"category":"Giyim","region":"İstanbul","date":"2024-03-20"}
{"index":{}}
{"revenue":165000,"orders":60,"category":"Elektronik","region":"İzmir","date":"2024-04-10"}
{"index":{}}
{"revenue":95000,"orders":38,"category":"Giyim","region":"Ankara","date":"2024-04-25"}
{"index":{}}
{"revenue":200000,"orders":72,"category":"Elektronik","region":"İstanbul","date":"2024-05-08"}
{"index":{}}
{"revenue":125000,"orders":48,"category":"Giyim","region":"İstanbul","date":"2024-05-22"}
{"index":{}}
{"revenue":175000,"orders":62,"category":"Elektronik","region":"Ankara","date":"2024-06-12"}
{"index":{}}
{"revenue":108000,"orders":42,"category":"Giyim","region":"İzmir","date":"2024-06-28"}
{"index":{}}
{"revenue":210000,"orders":78,"category":"Elektronik","region":"İstanbul","date":"2024-07-15"}
{"index":{}}
{"revenue":130000,"orders":50,"category":"Giyim","region":"Ankara","date":"2024-07-20"}
{"index":{}}
{"revenue":195000,"orders":70,"category":"Elektronik","region":"İzmir","date":"2024-08-10"}
{"index":{}}
{"revenue":115000,"orders":44,"category":"Giyim","region":"İstanbul","date":"2024-08-25"}
{"index":{}}
{"revenue":220000,"orders":80,"category":"Elektronik","region":"Ankara","date":"2024-09-05"}
{"index":{}}
{"revenue":135000,"orders":52,"category":"Giyim","region":"İzmir","date":"2024-09-18"}

3. derivative — Türev (Değişim Miktarı)

Ardışık bucket'lar arasındaki farkı hesaplar. "Bu ay geçen aya göre ne kadar değişti?"

3.1 Basit Türev

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "aylik": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "toplam_ciro": {
          "sum": { "field": "revenue" }
        },
        "ciro_degisim": {
          "derivative": {
            "buckets_path": "toplam_ciro"
          }
        }
      }
    }
  }
}

Yanıt (basitleştirilmiş):

{
  "buckets": [
    { "key_as_string": "2024-01", "toplam_ciro": { "value": 205000 }, "ciro_degisim": null },
    { "key_as_string": "2024-02", "toplam_ciro": { "value": 242000 }, "ciro_degisim": { "value": 37000 } },
    { "key_as_string": "2024-03", "toplam_ciro": { "value": 290000 }, "ciro_degisim": { "value": 48000 } },
    { "key_as_string": "2024-04", "toplam_ciro": { "value": 260000 }, "ciro_degisim": { "value": -30000 } }
  ]
}
  • İlk bucket'ın türevi null — önceki bucket yok

  • Şubat: 242000 - 205000 = +37000 (büyüme)

  • Nisan: 260000 - 290000 = -30000 (düşüş)

3.2 İkinci Türev — Hızlanma

Türevin türevi = değişimin hızı:

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "aylik": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "toplam_ciro": {
          "sum": { "field": "revenue" }
        },
        "ciro_degisim": {
          "derivative": {
            "buckets_path": "toplam_ciro"
          }
        },
        "degisim_hizi": {
          "derivative": {
            "buckets_path": "ciro_degisim"
          }
        }
      }
    }
  }
}

İkinci türev pozitifse büyüme hızlanıyor, negatifse yavaşlıyor.

3.3 unit Parametresi — Birim Başına Değişim

"ciro_degisim": {
  "derivative": {
    "buckets_path": "toplam_ciro",
    "unit": "day"
  }
}

Aylık aralıklarda unit: "day" ile günlük değişim oranı hesaplanır.


4. cumulative_sum — Kümülatif Toplam

Yılın başından itibaren birikimli toplam — "Year-To-Date (YTD)" hesaplaması:

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "aylik": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "aylik_ciro": {
          "sum": { "field": "revenue" }
        },
        "kumulatif_ciro": {
          "cumulative_sum": {
            "buckets_path": "aylik_ciro"
          }
        }
      }
    }
  }
}

Yanıt:

{
  "buckets": [
    { "key_as_string": "2024-01", "aylik_ciro": { "value": 205000 }, "kumulatif_ciro": { "value": 205000 } },
    { "key_as_string": "2024-02", "aylik_ciro": { "value": 242000 }, "kumulatif_ciro": { "value": 447000 } },
    { "key_as_string": "2024-03", "aylik_ciro": { "value": 290000 }, "kumulatif_ciro": { "value": 737000 } },
    { "key_as_string": "2024-04", "aylik_ciro": { "value": 260000 }, "kumulatif_ciro": { "value": 997000 } }
  ]
}

Her ay önceki ayların toplamına eklenir — yılın sonunda toplam ciro grafiğini çizersiniz.


5. cumulative_cardinality — Kümülatif Benzersiz Sayı

Yeni müşteri kazanım trendini görmek için:

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "aylik": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "benzersiz_bolge": {
          "cardinality": { "field": "region" }
        },
        "kumulatif_bolge": {
          "cumulative_cardinality": {
            "buckets_path": "benzersiz_bolge"
          }
        }
      }
    }
  }
}

Her ay kaç yeni bölge eklendiğini görürsünüz. Kümülatif sayı her zaman artar veya sabit kalır.


6. moving_avg — Hareketli Ortalama (Kullanımdan Kaldırıldı, Alternatifleri)

⚠️ Not: moving_avg aggregation Elasticsearch 8.x'te kullanımdan kaldırılmıştır. Yerine moving_fn (moving function) kullanılır.

6.1 moving_fn — Hareketli Fonksiyon

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "aylik": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "aylik_ciro": {
          "sum": { "field": "revenue" }
        },
        "hareketli_ort_3ay": {
          "moving_fn": {
            "buckets_path": "aylik_ciro",
            "window": 3,
            "script": "MovingFunctions.unweightedAvg(values)"
          }
        }
      }
    }
  }
}

window: 3 son 3 ayın ortalamasını alır. Verinin dalgalanmalarını yumuşatır.

6.2 Farklı Hareketli Fonksiyonlar

// Basit hareketli ortalama
"script": "MovingFunctions.unweightedAvg(values)"

// Doğrusal ağırlıklı hareketli ortalama
"script": "MovingFunctions.linearWeightedAvg(values)"

// Üstel ağırlıklı hareketli ortalama
"script": "MovingFunctions.ewma(values, 0.3)"

// Minimum
"script": "MovingFunctions.min(values)"

// Maksimum
"script": "MovingFunctions.max(values)"

// Toplam
"script": "MovingFunctions.sum(values)"

// Standart sapma
"script": "MovingFunctions.stdDev(values, MovingFunctions.unweightedAvg(values))"

6.3 Hareketli Ortalama ile Trend Analizi

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "aylik": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "ciro": {
          "sum": { "field": "revenue" }
        },
        "hareketli_ort": {
          "moving_fn": {
            "buckets_path": "ciro",
            "window": 3,
            "script": "MovingFunctions.unweightedAvg(values)"
          }
        },
        "trend": {
          "derivative": {
            "buckets_path": "hareketli_ort"
          }
        }
      }
    }
  }
}

Hareketli ortalamanın türevi = trend yönü. Pozitif ise yükseliş trendi, negatif ise düşüş trendi.


7. bucket_sort — Bucket Sıralama

Bucket'ları belirli bir metriğe göre sıralamak veya sayısını sınırlamak:

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "aylik": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "toplam_ciro": {
          "sum": { "field": "revenue" }
        },
        "en_iyi_3_ay": {
          "bucket_sort": {
            "sort": [
              { "toplam_ciro": { "order": "desc" } }
            ],
            "size": 3
          }
        }
      }
    }
  }
}

En yüksek cirolu 3 ay döner.

from Parametresi — Pagination

"en_iyi_aylar": {
  "bucket_sort": {
    "sort": [{ "toplam_ciro": { "order": "desc" } }],
    "from": 3,
    "size": 3
  }
}

4-6. sıradaki aylar döner (ilk 3'ü atla).


8. bucket_selector — Koşullu Filtreleme

Belirli bir koşulu sağlamayan bucket'ları filtreler:

// Sadece cirosu 250.000'den fazla olan aylar
GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "aylik": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "toplam_ciro": {
          "sum": { "field": "revenue" }
        },
        "yuksek_ciro_filtre": {
          "bucket_selector": {
            "buckets_path": {
              "ciro": "toplam_ciro"
            },
            "script": "params.ciro > 250000"
          }
        }
      }
    }
  }
}

Çoklu Koşul

"bucket_selector": {
  "buckets_path": {
    "ciro": "toplam_ciro",
    "siparis": "toplam_siparis"
  },
  "script": "params.ciro > 200000 && params.siparis > 50"
}

9. bucket_script — Bucket Üzerinde Hesaplama

İki metrik arasında hesaplama yaparak yeni bir değer üretir:

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "aylik": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "toplam_ciro": {
          "sum": { "field": "revenue" }
        },
        "toplam_siparis": {
          "sum": { "field": "orders" }
        },
        "siparis_basi_ciro": {
          "bucket_script": {
            "buckets_path": {
              "ciro": "toplam_ciro",
              "siparis": "toplam_siparis"
            },
            "script": "params.ciro / params.siparis"
          }
        }
      }
    }
  }
}

Her ayın sipariş başına ortalama cirosu hesaplanır.


10. Parent Pipeline Aggregation'lar

Tüm bucket'lar üzerinde tek bir değer hesaplayan pipeline'lar:

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "aylik": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "toplam_ciro": {
          "sum": { "field": "revenue" }
        }
      }
    },
    "en_yuksek_ciro_ayi": {
      "max_bucket": {
        "buckets_path": "aylik>toplam_ciro"
      }
    },
    "en_dusuk_ciro_ayi": {
      "min_bucket": {
        "buckets_path": "aylik>toplam_ciro"
      }
    },
    "ortalama_aylik_ciro": {
      "avg_bucket": {
        "buckets_path": "aylik>toplam_ciro"
      }
    },
    "toplam_yillik_ciro": {
      "sum_bucket": {
        "buckets_path": "aylik>toplam_ciro"
      }
    }
  }
}

Yanıt:

{
  "en_yuksek_ciro_ayi": { "value": 355000, "keys": ["2024-09-01T00:00:00.000Z"] },
  "en_dusuk_ciro_ayi": { "value": 205000, "keys": ["2024-01-01T00:00:00.000Z"] },
  "ortalama_aylik_ciro": { "value": 280555.56 },
  "toplam_yillik_ciro": { "value": 2525000 }
}

max_bucket hem değeri hem hangi bucket'a ait olduğunu (keys) döndürür.


11. Gerçek Dünya: Satış Performans Dashboard'u

GET monthly_sales/_search
{
  "size": 0,
  "aggs": {
    "aylik_trend": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "ciro": { "sum": { "field": "revenue" } },
        "siparis": { "sum": { "field": "orders" } },
        "kumulatif_ciro": {
          "cumulative_sum": { "buckets_path": "ciro" }
        },
        "aylik_degisim": {
          "derivative": { "buckets_path": "ciro" }
        },
        "hareketli_ort_3ay": {
          "moving_fn": {
            "buckets_path": "ciro",
            "window": 3,
            "script": "MovingFunctions.unweightedAvg(values)"
          }
        },
        "siparis_basi_ciro": {
          "bucket_script": {
            "buckets_path": {
              "c": "ciro",
              "s": "siparis"
            },
            "script": "params.c / params.s"
          }
        }
      }
    },
    "en_iyi_ay": {
      "max_bucket": { "buckets_path": "aylik_trend>ciro" }
    },
    "ort_aylik": {
      "avg_bucket": { "buckets_path": "aylik_trend>ciro" }
    }
  }
}

Tek sorguda: aylık ciro trendi, kümülatif toplam, aylık değişim, 3 aylık hareketli ortalama, sipariş başına ciro, en iyi ay ve ortalama aylık ciro.


12. Java ile Pipeline Aggregations

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch.core.*;

public class PipelineAggregationJava {

    public static void salesDashboard(ElasticsearchClient client) throws Exception {
        SearchResponse<Void> response = client.search(s -> s
            .index("monthly_sales")
            .size(0)
            .aggregations("aylik", a -> a
                .dateHistogram(dh -> dh
                    .field("date")
                    .calendarInterval(CalendarInterval.Month)
                    .format("yyyy-MM")
                )
                .aggregations("ciro", sub -> sub
                    .sum(su -> su.field("revenue"))
                )
                .aggregations("kumulatif", sub -> sub
                    .cumulativeSum(cs -> cs.bucketsPath("ciro"))
                )
                .aggregations("degisim", sub -> sub
                    .derivative(d -> d.bucketsPath("ciro"))
                )
                .aggregations("hareketli_ort", sub -> sub
                    .movingFn(mf -> mf
                        .bucketsPath("ciro")
                        .window(3)
                        .script("MovingFunctions.unweightedAvg(values)")
                    )
                )
            )
            .aggregations("en_iyi_ay", a -> a
                .maxBucket(mb -> mb.bucketsPath("aylik>ciro"))
            )
            .aggregations("ort_aylik", a -> a
                .avgBucket(ab -> ab.bucketsPath("aylik>ciro"))
            ),
            Void.class
        );

        // Aylık trend
        var buckets = response.aggregations()
            .get("aylik").dateHistogram().buckets().array();

        System.out.println("=== Aylık Satış Trendi ===");
        System.out.printf("%-10s | %15s | %15s | %15s | %15s%n",
            "Ay", "Ciro", "Kümülatif", "Değişim", "Hareketli Ort.");

        for (var bucket : buckets) {
            double ciro = bucket.aggregations().get("ciro").sum().value();
            double kumulatif = bucket.aggregations().get("kumulatif")
                .cumulativeSum().value();

            var degisim = bucket.aggregations().get("degisim").derivative();
            String degisimStr = degisim.value() != null ?
                String.format("%.0f", degisim.value()) : "—";

            var hareketliOrt = bucket.aggregations().get("hareketli_ort")
                .simpleValue();
            String hareketliStr = hareketliOrt.value() != 0 ?
                String.format("%.0f", hareketliOrt.value()) : "—";

            System.out.printf("%-10s | %,15.0f | %,15.0f | %15s | %15s%n",
                bucket.keyAsString(), ciro, kumulatif, degisimStr, hareketliStr);
        }

        // En iyi ay
        var enIyi = response.aggregations().get("en_iyi_ay").maxBucket();
        System.out.printf("%nEn iyi ay: %s (₺%.0f)%n",
            enIyi.keys().get(0), enIyi.value());

        // Ortalama
        double ortAylik = response.aggregations()
            .get("ort_aylik").avgBucket().value();
        System.out.printf("Ortalama aylık ciro: ₺%.0f%n", ortAylik);
    }
}

13. Best Practices

✅ Yapın

UygulamaNeden
buckets_path'i doğru belirtinYanlış path hata verir
Pipeline'ı date_histogram ile kullanınZaman serisi analizi doğal iş ortağı
bucket_selector ile anlamsız bucket'ları filtreleyinSonuç kalitesini artırır
gap_policy belirtinBoş bucket'lar pipeline'ı bozabilir
Hareketli ortalama window'unu verinize göre ayarlayınÇok küçük = gürültülü, çok büyük = trendi geciktirir

❌ Yapmayın

UygulamaNeden
Pipeline'ları 5+ seviye iç içe yerleştirmeyinKarmaşıklık ve performans
Derivative'i düzensiz aralıklarda kullanmayınAralıklar eşit olmalı (date_histogram ideal)
gap_policy: insert_zeros'u düşünmeden kullanmayınTürev hesaplamalarını bozabilir

14. Yaygın Hatalar

Hata 1: buckets_path Yanlış Yazımı

// ❌ Yanlış path ayracı (. yerine >)
"buckets_path": "aylik.toplam_ciro"

// ✅ > ayracı kullanın
"buckets_path": "aylik>toplam_ciro"

// Aynı seviyede ise doğrudan isim
"buckets_path": "toplam_ciro"

Hata 2: gap_policy — Boş Bucket Yönetimi

Bazı aylarda veri yoksa pipeline aggregation hata verebilir:

"derivative": {
  "buckets_path": "toplam_ciro",
  "gap_policy": "skip"  // Boş bucket'ları atla (varsayılan)
  // veya "insert_zeros" — 0 olarak kabul et
  // veya "keep_values" — önceki değeri koru
}
gap_policyDavranış
skip (varsayılan)Boş bucket'ı atlar, sonraki bucket'la hesaplar
insert_zerosBoş bucket'a 0 yerleştirir
keep_valuesÖnceki bucket'ın değerini korur

Hata 3: Pipeline Aggregation'ı Yanlış Seviyeye Koymak

// ❌ Pipeline, bucket aggregation'ın DIŞINDA tanımlanmalı (parent pipeline)
// veya İÇİNDE (sibling pipeline)

// Parent pipeline — dışarıda
"aggs": {
  "aylik": { "date_histogram": { ... }, "aggs": { "ciro": { "sum": { ... } } } },
  "en_iyi": { "max_bucket": { "buckets_path": "aylik>ciro" } }
}

// Sibling pipeline — içeride
"aggs": {
  "aylik": {
    "date_histogram": { ... },
    "aggs": {
      "ciro": { "sum": { ... } },
      "degisim": { "derivative": { "buckets_path": "ciro" } }
    }
  }
}

Özet

  • Pipeline aggregation'lar diğer aggregation sonuçlarını girdi olarak kullanır — dokümanlarla değil, hesaplanmış metriklerle çalışır

  • `derivative` ardışık bucket'lar arası farkı hesaplar — büyüme/küçülme tespiti

  • `cumulative_sum` birikimli toplam hesaplar — YTD (Year-to-Date) raporları

  • `moving_fn` hareketli ortalama/fonksiyon hesaplar — trend analizi ve dalgalanma yumuşatma

  • `bucket_sort` bucket'ları sıralar ve limitler — "en iyi 3 ay" gibi sorgular

  • `bucket_selector` koşula göre bucket filtreler — "cirosu > 200K olan aylar"

  • `bucket_script` bucket metrikleri arası hesaplama yapar — "sipariş başına ciro"

  • Parent pipeline'lar (max_bucket, avg_bucket) tüm bucket'lardan tek değer üretir

  • buckets_path söz diziminde > ayracı alt aggregation'ı belirtir

  • gap_policy ile boş bucket'ların pipeline hesabını nasıl etkileyeceğini kontrol edin