← Kursa Dön
📄 Text · 30 min

Ingest Pipelines — Veri Dönüşümü

Giriş — Elasticsearch İçinde Veri Dönüşümü

Bir posta merkezini düşün. Mektuplar gelir, zarflar açılır, adresler kontrol edilir, ağırlığa göre sınıflandırılır, damgalanır ve dağıtıma çıkar. Tüm bu işlemler mektup "teslim edilmeden önce" yapılır. Mektup kutuya düştüğünde zaten işlenmiş, zenginleştirilmiş, doğru kategoriye atılmış haldedir.

Elasticsearch'te Ingest Pipeline aynı mantık. Document index'e yazılmadan önce bir dizi processor üzerinden geçer: field'lar eklenir, dönüştürülür, zenginleştirilir. Logstash gibi harici bir araca ihtiyaç duymadan, Elasticsearch'ün kendi içinde veri dönüşümü yapabilirsiniz.


1. Ingest Pipeline Nedir?

Pipeline vs Logstash

ÖzellikIngest PipelineLogstash
Nerede çalışır?Elasticsearch node'larındaAyrı sunucuda
KurulumGereksiz (ES'in parçası)Ayrı kurulum
Dönüşüm gücüOrta (30+ processor)Yüksek (200+ plugin)
ÖlçeklemeIngest node ekleLogstash instance ekle
Durum yönetimiYok (stateless)Queue, aggregate
KullanımBasit-orta dönüşümKarmaşık ETL

Ne Zaman Ingest Pipeline?

  • Basit field dönüşümleri (lowercase, rename, date parse)

  • GeoIP zenginleştirme

  • User agent parsing

  • Timestamp düzeltme

  • Field ekleme/silme

  • Veri doğrulama

Ne Zaman Logstash?

  • Karmaşık grok pattern'ları

  • Event aggregation (birden fazla event'i birleştirme)

  • Birden fazla veri kaynağı

  • Birden fazla hedef (multi-output)

  • Stateful işlemler


2. Pipeline Oluşturma

Temel Pipeline

PUT _ingest/pipeline/my-first-pipeline
{
  "description": "İlk pipeline — temel dönüşümler",
  "processors": [
    {
      "set": {
        "field": "ingested_at",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "lowercase": {
        "field": "status"
      }
    }
  ]
}

Pipeline ile Document Indexleme

// Tek document
PUT my-index/_doc/1?pipeline=my-first-pipeline
{
  "message": "Hello World",
  "status": "ACTIVE"
}

// Sonuç:
// {
//   "message": "Hello World",
//   "status": "active",          ← lowercase
//   "ingested_at": "2024-01-15T14:30:00.000Z"  ← set
// }

Bulk ile Pipeline

POST _bulk?pipeline=my-first-pipeline
{"index": {"_index": "my-index"}}
{"message": "Event 1", "status": "ERROR"}
{"index": {"_index": "my-index"}}
{"message": "Event 2", "status": "WARNING"}

Index'e Default Pipeline Atama

// Index oluşturulurken
PUT my-index
{
  "settings": {
    "index.default_pipeline": "my-first-pipeline"
  }
}

// Mevcut index'e ekleme
PUT my-index/_settings
{
  "index.default_pipeline": "my-first-pipeline"
}

// Final pipeline (default pipeline'dan sonra çalışır, override edilemez)
PUT my-index/_settings
{
  "index.final_pipeline": "audit-pipeline"
}

default_pipeline ile final_pipeline farkı:

  • default_pipeline: Document indexlenirken çalışır. İsteğe bağlı olarak ?pipeline=other ile override edilebilir.

  • final_pipeline: Her zaman çalışır. Override edilemez.


3. Processor Kataloğu

3.1. Set — Field Ekleme/Değiştirme

{
  "set": {
    "field": "environment",
    "value": "production"
  }
}

// Dinamik değer (başka field'dan)
{
  "set": {
    "field": "full_name",
    "value": "{{first_name}} {{last_name}}"
  }
}

// Sadece yoksa ekle
{
  "set": {
    "field": "priority",
    "value": "normal",
    "override": false
  }
}

// Ingest metadata kullan
{
  "set": {
    "field": "ingested_at",
    "value": "{{_ingest.timestamp}}"
  }
}

3.2. Remove — Field Silme

{
  "remove": {
    "field": ["temp_field", "debug_info", "internal_id"],
    "ignore_missing": true
  }
}

3.3. Rename — Field Yeniden Adlandırma

{
  "rename": {
    "field": "hostname",
    "target_field": "host.name",
    "ignore_missing": true
  }
}

3.4. Convert — Tip Dönüşümü

{
  "convert": {
    "field": "status_code",
    "type": "integer"
  }
}

{
  "convert": {
    "field": "price",
    "type": "float"
  }
}

// Desteklenen tipler: integer, long, float, double, string, boolean, ip, auto

3.5. Lowercase / Uppercase

{
  "lowercase": {
    "field": "email"
  }
}

{
  "uppercase": {
    "field": "country_code"
  }
}

3.6. Trim / Strip

{
  "trim": {
    "field": "user_input"
  }
}

3.7. Split — String Bölme

{
  "split": {
    "field": "tags",
    "separator": ","
  }
}

// Girdi: "tags": "java,elasticsearch,spring"
// Çıktı: "tags": ["java", "elasticsearch", "spring"]

3.8. Join — Array Birleştirme

{
  "join": {
    "field": "tags",
    "separator": ", "
  }
}

// Girdi: "tags": ["java", "elasticsearch"]
// Çıktı: "tags": "java, elasticsearch"

3.9. Grok — Pattern Matching

{
  "grok": {
    "field": "message",
    "patterns": [
      "%{TIMESTAMP_ISO8601:timestamp} \\[%{LOGLEVEL:level}\\] %{DATA:logger} - %{GREEDYDATA:log_message}"
    ],
    "ignore_failure": true
  }
}

3.10. Dissect — Hızlı Token Çıkarma

Grok'un hafif alternatifi — regex kullanmaz, daha hızlıdır:

{
  "dissect": {
    "field": "message",
    "pattern": "%{timestamp} [%{level}] %{logger} - %{log_message}"
  }
}

// Girdi: "2024-01-15 14:30:00 [ERROR] UserService - User not found"
// Çıktı:
//   timestamp: "2024-01-15 14:30:00"
//   level: "ERROR"
//   logger: "UserService"
//   log_message: "User not found"

Dissect vs Grok:

ÖzellikDissectGrok
Hız5-10x hızlıYavaş (regex)
EsneklikSabit formatEsnek pattern
RegexHayırEvet
KullanımTutarlı log formatıDeğişken formatlar

3.11. Date — Tarih Parse

{
  "date": {
    "field": "timestamp",
    "formats": [
      "yyyy-MM-dd HH:mm:ss",
      "ISO8601",
      "UNIX",
      "UNIX_MS",
      "dd/MMM/yyyy:HH:mm:ss Z"
    ],
    "target_field": "@timestamp",
    "timezone": "Europe/Istanbul"
  }
}

3.12. GeoIP — Coğrafi Konum

{
  "geoip": {
    "field": "client_ip",
    "target_field": "geo",
    "properties": ["city_name", "country_name", "country_iso_code", "location"]
  }
}

// Çıktı:
// "geo": {
//   "city_name": "Istanbul",
//   "country_name": "Turkey",
//   "country_iso_code": "TR",
//   "location": { "lat": 41.0082, "lon": 28.9784 }
// }

3.13. User Agent — Browser/OS Analizi

{
  "user_agent": {
    "field": "user_agent_string",
    "target_field": "user_agent"
  }
}

// Çıktı:
// "user_agent": {
//   "name": "Chrome",
//   "version": "120.0.0",
//   "os": {
//     "name": "Windows",
//     "version": "10"
//   },
//   "device": { "name": "Other" }
// }

3.14. Script — Custom Logic

{
  "script": {
    "lang": "painless",
    "source": """
      // Response time'ı kategorize et
      def rt = ctx['response_time'];
      if (rt < 100) {
        ctx['response_category'] = 'fast';
      } else if (rt < 500) {
        ctx['response_category'] = 'normal';
      } else if (rt < 2000) {
        ctx['response_category'] = 'slow';
      } else {
        ctx['response_category'] = 'critical';
      }

      // Full name oluştur
      ctx['full_name'] = ctx['first_name'] + ' ' + ctx['last_name'];

      // Null check
      if (ctx.containsKey('optional_field') && ctx['optional_field'] != null) {
        ctx['has_optional'] = true;
      } else {
        ctx['has_optional'] = false;
      }
    """
  }
}

3.15. Drop — Document Düşürme

{
  "drop": {
    "if": "ctx.level == 'DEBUG'"
  }
}

// Daha karmaşık koşul
{
  "drop": {
    "if": """
      ctx.containsKey('internal') && ctx.internal == true
      || (ctx.containsKey('status_code') && ctx.status_code == 304)
    """
  }
}

3.16. Pipeline — Nested Pipeline

{
  "pipeline": {
    "name": "sub-pipeline",
    "if": "ctx.type == 'nginx'"
  }
}

3.17. Enrich — Harici Veriden Zenginleştirme

Başka bir index'teki verilerle document'ı zenginleştirme:

// 1. Kaynak index (zenginleştirme verisi)
PUT users/_doc/1
{
  "email": "john@example.com",
  "name": "John Doe",
  "department": "Engineering",
  "role": "Senior Developer"
}

// 2. Enrich policy oluştur
PUT _enrich/policy/user-lookup
{
  "match": {
    "indices": "users",
    "match_field": "email",
    "enrich_fields": ["name", "department", "role"]
  }
}

// 3. Policy'yi execute et (index oluşturur)
POST _enrich/policy/user-lookup/_execute

// 4. Pipeline'da kullan
PUT _ingest/pipeline/enrich-pipeline
{
  "processors": [
    {
      "enrich": {
        "policy_name": "user-lookup",
        "field": "user_email",
        "target_field": "user_info",
        "max_matches": 1
      }
    }
  ]
}

// 5. Test
PUT logs/_doc/1?pipeline=enrich-pipeline
{
  "user_email": "john@example.com",
  "action": "login"
}

// Sonuç:
// {
//   "user_email": "john@example.com",
//   "action": "login",
//   "user_info": {
//     "email": "john@example.com",
//     "name": "John Doe",
//     "department": "Engineering",
//     "role": "Senior Developer"
//   }
// }

4. Conditional Processing

Her processor'a if koşulu ekleyebilirsiniz:

PUT _ingest/pipeline/conditional-pipeline
{
  "processors": [
    {
      "set": {
        "field": "is_error",
        "value": true,
        "if": "ctx.status_code != null && ctx.status_code >= 500"
      }
    },
    {
      "set": {
        "field": "severity",
        "value": "critical",
        "if": "ctx.level == 'FATAL' || ctx.level == 'CRITICAL'"
      }
    },
    {
      "set": {
        "field": "severity",
        "value": "high",
        "if": "ctx.level == 'ERROR'"
      }
    },
    {
      "set": {
        "field": "severity",
        "value": "medium",
        "if": "ctx.level == 'WARN' || ctx.level == 'WARNING'"
      }
    },
    {
      "set": {
        "field": "severity",
        "value": "low",
        "if": "ctx.severity == null"
      }
    },
    {
      "geoip": {
        "field": "client_ip",
        "target_field": "geo",
        "if": "ctx.client_ip != null && ctx.client_ip != '127.0.0.1'"
      }
    }
  ]
}

5. Hata Yönetimi

ignore_failure

{
  "grok": {
    "field": "message",
    "patterns": ["%{COMBINEDAPACHELOG}"],
    "ignore_failure": true
  }
}

on_failure — Hata Yakalama

PUT _ingest/pipeline/robust-pipeline
{
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}"],
        "on_failure": [
          {
            "set": {
              "field": "parse_error",
              "value": "grok_failed"
            }
          },
          {
            "set": {
              "field": "_index",
              "value": "parse-failures-{{_index}}"
            }
          }
        ]
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": ["ISO8601"],
        "on_failure": [
          {
            "set": {
              "field": "timestamp_parse_error",
              "value": "{{_ingest.on_failure_message}}"
            }
          }
        ]
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "pipeline_error",
        "value": "{{_ingest.on_failure_message}}"
      }
    },
    {
      "set": {
        "field": "_index",
        "value": "pipeline-errors"
      }
    }
  ]
}

Pipeline-level on_failure: Herhangi bir processor başarısız olursa çalışır. Processor-level on_failure: Sadece o processor başarısız olursa çalışır.


6. Pipeline Test Etme — Simulate API

Temel Simulate

POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "2024-01-15T14:30:00Z [ERROR] UserService - User not found: id=123",
        "client_ip": "83.149.9.216",
        "status_code": "500"
      }
    },
    {
      "_source": {
        "message": "2024-01-15T14:30:01Z [INFO] AuthService - Login successful",
        "client_ip": "192.168.1.1",
        "status_code": "200"
      }
    }
  ]
}

Inline Pipeline Test (Kaydetmeden)

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} [%{level}] %{logger} - %{msg}"
        }
      },
      {
        "convert": {
          "field": "status_code",
          "type": "integer"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2024-01-15 14:30:00 [ERROR] UserService - User not found",
        "status_code": "500"
      }
    }
  ]
}

Verbose Simulate

POST _ingest/pipeline/my-pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "message": "test log line",
        "status_code": "200"
      }
    }
  ]
}

// verbose=true ile her processor'dan sonraki ara durumu görürsünüz

7. Bütünleşik Örnek: Web Application Log Pipeline

// Ana pipeline
PUT _ingest/pipeline/webapp-logs
{
  "description": "Web application log processing pipeline",
  "processors": [
    // 1. Log parsing (dissect — performans için grok yerine)
    {
      "dissect": {
        "field": "message",
        "pattern": "%{timestamp} [%{thread}] %{level} %{logger} - %{log_message}",
        "on_failure": [
          {
            "set": {
              "field": "parse_method",
              "value": "raw"
            }
          }
        ]
      }
    },

    // 2. Timestamp dönüşümü
    {
      "date": {
        "field": "timestamp",
        "formats": ["yyyy-MM-dd'T'HH:mm:ss.SSSZ", "yyyy-MM-dd HH:mm:ss"],
        "target_field": "@timestamp",
        "if": "ctx.containsKey('timestamp')",
        "on_failure": [
          {
            "set": {
              "field": "timestamp_error",
              "value": true
            }
          }
        ]
      }
    },

    // 3. Level normalize
    {
      "lowercase": {
        "field": "level",
        "if": "ctx.containsKey('level')"
      }
    },

    // 4. Severity mapping
    {
      "script": {
        "source": """
          def level = ctx.level;
          if (level == null) { ctx.severity = 0; return; }
          def map = ['trace': 1, 'debug': 2, 'info': 3, 'warn': 4, 'warning': 4, 'error': 5, 'fatal': 6, 'critical': 6];
          ctx.severity = map.getOrDefault(level, 0);
        """
      }
    },

    // 5. GeoIP (varsa)
    {
      "geoip": {
        "field": "client_ip",
        "target_field": "geo",
        "if": "ctx.containsKey('client_ip') && ctx.client_ip != null && ctx.client_ip != '127.0.0.1'",
        "ignore_failure": true
      }
    },

    // 6. User agent parse (varsa)
    {
      "user_agent": {
        "field": "user_agent_string",
        "target_field": "user_agent",
        "if": "ctx.containsKey('user_agent_string')",
        "ignore_failure": true
      }
    },

    // 7. Ingestion metadata
    {
      "set": {
        "field": "ingest.pipeline",
        "value": "webapp-logs"
      }
    },
    {
      "set": {
        "field": "ingest.timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    },

    // 8. Temizlik
    {
      "remove": {
        "field": ["message", "timestamp", "user_agent_string"],
        "ignore_missing": true
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "ingest.error.message",
        "value": "{{_ingest.on_failure_message}}"
      }
    },
    {
      "set": {
        "field": "ingest.error.processor",
        "value": "{{_ingest.on_failure_processor_type}}"
      }
    }
  ]
}

// Test
POST _ingest/pipeline/webapp-logs/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "2024-01-15T14:30:00.123+0300 [http-nio-8080-exec-5] ERROR c.m.a.UserController - User not found: id=12345",
        "client_ip": "83.149.9.216",
        "user_agent_string": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
      }
    }
  ]
}

Pipeline'ı Index Template'e Bağlama

PUT _index_template/webapp-logs-template
{
  "index_patterns": ["webapp-logs-*"],
  "template": {
    "settings": {
      "index.default_pipeline": "webapp-logs",
      "index.number_of_shards": 3,
      "index.number_of_replicas": 1,
      "index.lifecycle.name": "logs-policy"
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "level": { "type": "keyword" },
        "severity": { "type": "integer" },
        "logger": { "type": "keyword" },
        "log_message": { "type": "text" },
        "client_ip": { "type": "ip" },
        "geo.location": { "type": "geo_point" },
        "thread": { "type": "keyword" }
      }
    }
  }
}

8. Pipeline Yönetimi

# Tüm pipeline'ları listele
GET _ingest/pipeline

# Belirli pipeline
GET _ingest/pipeline/webapp-logs

# Pipeline istatistikleri
GET _nodes/stats/ingest?filter_path=nodes.*.ingest

# Pipeline sil
DELETE _ingest/pipeline/old-pipeline

Pipeline İstatistiklerini Okuma

GET _nodes/stats/ingest

// Önemli alanlar:
{
  "nodes": {
    "node-1": {
      "ingest": {
        "total": {
          "count": 1500000,
          "time_in_millis": 45000,
          "current": 5,
          "failed": 250
        },
        "pipelines": {
          "webapp-logs": {
            "count": 1000000,
            "time_in_millis": 30000,
            "current": 3,
            "failed": 200,
            "processors": [
              {
                "dissect": {
                  "type": "dissect",
                  "stats": {
                    "count": 1000000,
                    "time_in_millis": 5000,
                    "failed": 50
                  }
                }
              }
            ]
          }
        }
      }
    }
  }
}

9. Java ile Pipeline Kullanımı

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.ingest.*;

// Pipeline oluştur
client.ingest().putPipeline(p -> p
    .id("java-pipeline")
    .description("Java ile oluşturulan pipeline")
    .processors(proc -> proc
        .set(s -> s
            .field("processed_by")
            .value(JsonData.of("java-client"))
        )
    )
    .processors(proc -> proc
        .lowercase(l -> l
            .field("status")
        )
    )
);

// Pipeline ile document indexle
client.index(i -> i
    .index("my-index")
    .id("1")
    .pipeline("java-pipeline")
    .document(myDocument)
);

// Pipeline simüle et
var response = client.ingest().simulate(s -> s
    .id("java-pipeline")
    .docs(d -> d
        .source(JsonData.of(Map.of(
            "message", "test",
            "status", "ACTIVE"
        )))
    )
);

response.docs().forEach(doc -> {
    System.out.println(doc.doc().source());
});

10. Best Practices

✅ Yap

KonuÖneri
Dissect > GrokTutarlı formatlar için dissect %5-10x hızlı
on_failureHer kritik processor'a hata yakalama ekle
SimulateDeğişiklik öncesi _simulate ile test et
Default pipelineIndex template'e bağla, her seferinde belirtme
Conditionalif ile gereksiz işlem çalışmasını engelle
EnrichLookup tabloları için enrich processor kullan

❌ Yapma

KonuNeden
Ağır script processorYazma performansını düşürür
Pipeline'sız GeoIPİstediğiniz detayı veremez, pipeline ile kontrol
Hata yakalamadan productionParse hatası tüm document'ı düşürür
Çok fazla processorHer processor latency ekler, sayıyı minimize et
Test etmeden deploy_simulate bedava — kullanın

11. Yaygın Hatalar ve Çözümleri

Hata 1: "Pipeline Not Found"

// Sorun: Document indexlerken "pipeline not found" hatası
// Çözüm 1: Pipeline var mı kontrol et
GET _ingest/pipeline/my-pipeline

// Çözüm 2: Pipeline adı doğru mu?
// Default pipeline kontrol
GET my-index/_settings?flat_settings=true&filter_path=*.settings.index.default_pipeline

Hata 2: "Field Does Not Exist"

// Sorun: Processor var olmayan field'a erişmeye çalışıyor
// Çözüm 1: if koşulu ekle
{
  "geoip": {
    "field": "client_ip",
    "if": "ctx.containsKey('client_ip') && ctx.client_ip != null"
  }
}

// Çözüm 2: ignore_missing
{
  "rename": {
    "field": "old_name",
    "target_field": "new_name",
    "ignore_missing": true
  }
}

Hata 3: "Ingest Pipeline Yavaş"

# Kontrol: Pipeline istatistikleri
GET _nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines.*.time_in_millis

# Çözümler:
# 1. Grok yerine dissect kullan
# 2. Script'leri optimize et
# 3. Gereksiz processor'ları kaldır
# 4. Dedicated ingest node kullan

Özet

  1. Ingest Pipeline Elasticsearch içinde veri dönüşümü sağlar — Logstash'e gerek kalmadan basit-orta karmaşıklıktaki dönüşümler yapılabilir.

  2. 30+ processor mevcuttur — set, remove, rename, convert, grok, dissect, date, geoip, user_agent, script, enrich ve daha fazlası.

  3. Dissect, Grok'tan 5-10x hızlıdır — tutarlı log formatları için dissect tercih edin. Grok sadece değişken formatlar için gereklidir.

  4. `_simulate` API pipeline'ı test etmenin en güvenli yolu — production'a deploy etmeden önce mutlaka test edin.

  5. `on_failure` ile hata yakalama zorunlu — parse hatası olan document'ları ayrı bir index'e yönlendirin, kaybetmeyin.

  6. Enrich processor lookup tabloları için güçlü — harici veri kaynağından document'ları zenginleştirebilirsiniz (kullanıcı bilgisi, GeoIP, iş kuralları).