Ingest Pipelines — Veri Dönüşümü
Giriş — Elasticsearch İçinde Veri Dönüşümü
Bir posta merkezini düşün. Mektuplar gelir, zarflar açılır, adresler kontrol edilir, ağırlığa göre sınıflandırılır, damgalanır ve dağıtıma çıkar. Tüm bu işlemler mektup "teslim edilmeden önce" yapılır. Mektup kutuya düştüğünde zaten işlenmiş, zenginleştirilmiş, doğru kategoriye atılmış haldedir.
Elasticsearch'te Ingest Pipeline aynı mantık. Document index'e yazılmadan önce bir dizi processor üzerinden geçer: field'lar eklenir, dönüştürülür, zenginleştirilir. Logstash gibi harici bir araca ihtiyaç duymadan, Elasticsearch'ün kendi içinde veri dönüşümü yapabilirsiniz.
1. Ingest Pipeline Nedir?
Pipeline vs Logstash
| Özellik | Ingest Pipeline | Logstash |
|---|---|---|
| Nerede çalışır? | Elasticsearch node'larında | Ayrı sunucuda |
| Kurulum | Gereksiz (ES'in parçası) | Ayrı kurulum |
| Dönüşüm gücü | Orta (30+ processor) | Yüksek (200+ plugin) |
| Ölçekleme | Ingest node ekle | Logstash instance ekle |
| Durum yönetimi | Yok (stateless) | Queue, aggregate |
| Kullanım | Basit-orta dönüşüm | Karmaşık ETL |
Ne Zaman Ingest Pipeline?
Basit field dönüşümleri (lowercase, rename, date parse)
GeoIP zenginleştirme
User agent parsing
Timestamp düzeltme
Field ekleme/silme
Veri doğrulama
Ne Zaman Logstash?
Karmaşık grok pattern'ları
Event aggregation (birden fazla event'i birleştirme)
Birden fazla veri kaynağı
Birden fazla hedef (multi-output)
Stateful işlemler
2. Pipeline Oluşturma
Temel Pipeline
PUT _ingest/pipeline/my-first-pipeline
{
"description": "İlk pipeline — temel dönüşümler",
"processors": [
{
"set": {
"field": "ingested_at",
"value": "{{_ingest.timestamp}}"
}
},
{
"lowercase": {
"field": "status"
}
}
]
}Pipeline ile Document Indexleme
// Tek document
PUT my-index/_doc/1?pipeline=my-first-pipeline
{
"message": "Hello World",
"status": "ACTIVE"
}
// Sonuç:
// {
// "message": "Hello World",
// "status": "active", ← lowercase
// "ingested_at": "2024-01-15T14:30:00.000Z" ← set
// }Bulk ile Pipeline
POST _bulk?pipeline=my-first-pipeline
{"index": {"_index": "my-index"}}
{"message": "Event 1", "status": "ERROR"}
{"index": {"_index": "my-index"}}
{"message": "Event 2", "status": "WARNING"}Index'e Default Pipeline Atama
// Index oluşturulurken
PUT my-index
{
"settings": {
"index.default_pipeline": "my-first-pipeline"
}
}
// Mevcut index'e ekleme
PUT my-index/_settings
{
"index.default_pipeline": "my-first-pipeline"
}
// Final pipeline (default pipeline'dan sonra çalışır, override edilemez)
PUT my-index/_settings
{
"index.final_pipeline": "audit-pipeline"
}default_pipeline ile final_pipeline farkı:
default_pipeline: Document indexlenirken çalışır. İsteğe bağlı olarak?pipeline=otherile override edilebilir.final_pipeline: Her zaman çalışır. Override edilemez.
3. Processor Kataloğu
3.1. Set — Field Ekleme/Değiştirme
{
"set": {
"field": "environment",
"value": "production"
}
}
// Dinamik değer (başka field'dan)
{
"set": {
"field": "full_name",
"value": "{{first_name}} {{last_name}}"
}
}
// Sadece yoksa ekle
{
"set": {
"field": "priority",
"value": "normal",
"override": false
}
}
// Ingest metadata kullan
{
"set": {
"field": "ingested_at",
"value": "{{_ingest.timestamp}}"
}
}3.2. Remove — Field Silme
{
"remove": {
"field": ["temp_field", "debug_info", "internal_id"],
"ignore_missing": true
}
}3.3. Rename — Field Yeniden Adlandırma
{
"rename": {
"field": "hostname",
"target_field": "host.name",
"ignore_missing": true
}
}3.4. Convert — Tip Dönüşümü
{
"convert": {
"field": "status_code",
"type": "integer"
}
}
{
"convert": {
"field": "price",
"type": "float"
}
}
// Desteklenen tipler: integer, long, float, double, string, boolean, ip, auto3.5. Lowercase / Uppercase
{
"lowercase": {
"field": "email"
}
}
{
"uppercase": {
"field": "country_code"
}
}3.6. Trim / Strip
{
"trim": {
"field": "user_input"
}
}3.7. Split — String Bölme
{
"split": {
"field": "tags",
"separator": ","
}
}
// Girdi: "tags": "java,elasticsearch,spring"
// Çıktı: "tags": ["java", "elasticsearch", "spring"]3.8. Join — Array Birleştirme
{
"join": {
"field": "tags",
"separator": ", "
}
}
// Girdi: "tags": ["java", "elasticsearch"]
// Çıktı: "tags": "java, elasticsearch"3.9. Grok — Pattern Matching
{
"grok": {
"field": "message",
"patterns": [
"%{TIMESTAMP_ISO8601:timestamp} \\[%{LOGLEVEL:level}\\] %{DATA:logger} - %{GREEDYDATA:log_message}"
],
"ignore_failure": true
}
}3.10. Dissect — Hızlı Token Çıkarma
Grok'un hafif alternatifi — regex kullanmaz, daha hızlıdır:
{
"dissect": {
"field": "message",
"pattern": "%{timestamp} [%{level}] %{logger} - %{log_message}"
}
}
// Girdi: "2024-01-15 14:30:00 [ERROR] UserService - User not found"
// Çıktı:
// timestamp: "2024-01-15 14:30:00"
// level: "ERROR"
// logger: "UserService"
// log_message: "User not found"Dissect vs Grok:
| Özellik | Dissect | Grok |
|---|---|---|
| Hız | 5-10x hızlı | Yavaş (regex) |
| Esneklik | Sabit format | Esnek pattern |
| Regex | Hayır | Evet |
| Kullanım | Tutarlı log formatı | Değişken formatlar |
3.11. Date — Tarih Parse
{
"date": {
"field": "timestamp",
"formats": [
"yyyy-MM-dd HH:mm:ss",
"ISO8601",
"UNIX",
"UNIX_MS",
"dd/MMM/yyyy:HH:mm:ss Z"
],
"target_field": "@timestamp",
"timezone": "Europe/Istanbul"
}
}3.12. GeoIP — Coğrafi Konum
{
"geoip": {
"field": "client_ip",
"target_field": "geo",
"properties": ["city_name", "country_name", "country_iso_code", "location"]
}
}
// Çıktı:
// "geo": {
// "city_name": "Istanbul",
// "country_name": "Turkey",
// "country_iso_code": "TR",
// "location": { "lat": 41.0082, "lon": 28.9784 }
// }3.13. User Agent — Browser/OS Analizi
{
"user_agent": {
"field": "user_agent_string",
"target_field": "user_agent"
}
}
// Çıktı:
// "user_agent": {
// "name": "Chrome",
// "version": "120.0.0",
// "os": {
// "name": "Windows",
// "version": "10"
// },
// "device": { "name": "Other" }
// }3.14. Script — Custom Logic
{
"script": {
"lang": "painless",
"source": """
// Response time'ı kategorize et
def rt = ctx['response_time'];
if (rt < 100) {
ctx['response_category'] = 'fast';
} else if (rt < 500) {
ctx['response_category'] = 'normal';
} else if (rt < 2000) {
ctx['response_category'] = 'slow';
} else {
ctx['response_category'] = 'critical';
}
// Full name oluştur
ctx['full_name'] = ctx['first_name'] + ' ' + ctx['last_name'];
// Null check
if (ctx.containsKey('optional_field') && ctx['optional_field'] != null) {
ctx['has_optional'] = true;
} else {
ctx['has_optional'] = false;
}
"""
}
}3.15. Drop — Document Düşürme
{
"drop": {
"if": "ctx.level == 'DEBUG'"
}
}
// Daha karmaşık koşul
{
"drop": {
"if": """
ctx.containsKey('internal') && ctx.internal == true
|| (ctx.containsKey('status_code') && ctx.status_code == 304)
"""
}
}3.16. Pipeline — Nested Pipeline
{
"pipeline": {
"name": "sub-pipeline",
"if": "ctx.type == 'nginx'"
}
}3.17. Enrich — Harici Veriden Zenginleştirme
Başka bir index'teki verilerle document'ı zenginleştirme:
// 1. Kaynak index (zenginleştirme verisi)
PUT users/_doc/1
{
"email": "john@example.com",
"name": "John Doe",
"department": "Engineering",
"role": "Senior Developer"
}
// 2. Enrich policy oluştur
PUT _enrich/policy/user-lookup
{
"match": {
"indices": "users",
"match_field": "email",
"enrich_fields": ["name", "department", "role"]
}
}
// 3. Policy'yi execute et (index oluşturur)
POST _enrich/policy/user-lookup/_execute
// 4. Pipeline'da kullan
PUT _ingest/pipeline/enrich-pipeline
{
"processors": [
{
"enrich": {
"policy_name": "user-lookup",
"field": "user_email",
"target_field": "user_info",
"max_matches": 1
}
}
]
}
// 5. Test
PUT logs/_doc/1?pipeline=enrich-pipeline
{
"user_email": "john@example.com",
"action": "login"
}
// Sonuç:
// {
// "user_email": "john@example.com",
// "action": "login",
// "user_info": {
// "email": "john@example.com",
// "name": "John Doe",
// "department": "Engineering",
// "role": "Senior Developer"
// }
// }4. Conditional Processing
Her processor'a if koşulu ekleyebilirsiniz:
PUT _ingest/pipeline/conditional-pipeline
{
"processors": [
{
"set": {
"field": "is_error",
"value": true,
"if": "ctx.status_code != null && ctx.status_code >= 500"
}
},
{
"set": {
"field": "severity",
"value": "critical",
"if": "ctx.level == 'FATAL' || ctx.level == 'CRITICAL'"
}
},
{
"set": {
"field": "severity",
"value": "high",
"if": "ctx.level == 'ERROR'"
}
},
{
"set": {
"field": "severity",
"value": "medium",
"if": "ctx.level == 'WARN' || ctx.level == 'WARNING'"
}
},
{
"set": {
"field": "severity",
"value": "low",
"if": "ctx.severity == null"
}
},
{
"geoip": {
"field": "client_ip",
"target_field": "geo",
"if": "ctx.client_ip != null && ctx.client_ip != '127.0.0.1'"
}
}
]
}5. Hata Yönetimi
ignore_failure
{
"grok": {
"field": "message",
"patterns": ["%{COMBINEDAPACHELOG}"],
"ignore_failure": true
}
}on_failure — Hata Yakalama
PUT _ingest/pipeline/robust-pipeline
{
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}"],
"on_failure": [
{
"set": {
"field": "parse_error",
"value": "grok_failed"
}
},
{
"set": {
"field": "_index",
"value": "parse-failures-{{_index}}"
}
}
]
}
},
{
"date": {
"field": "timestamp",
"formats": ["ISO8601"],
"on_failure": [
{
"set": {
"field": "timestamp_parse_error",
"value": "{{_ingest.on_failure_message}}"
}
}
]
}
}
],
"on_failure": [
{
"set": {
"field": "pipeline_error",
"value": "{{_ingest.on_failure_message}}"
}
},
{
"set": {
"field": "_index",
"value": "pipeline-errors"
}
}
]
}Pipeline-level on_failure: Herhangi bir processor başarısız olursa çalışır. Processor-level on_failure: Sadece o processor başarısız olursa çalışır.
6. Pipeline Test Etme — Simulate API
Temel Simulate
POST _ingest/pipeline/my-pipeline/_simulate
{
"docs": [
{
"_source": {
"message": "2024-01-15T14:30:00Z [ERROR] UserService - User not found: id=123",
"client_ip": "83.149.9.216",
"status_code": "500"
}
},
{
"_source": {
"message": "2024-01-15T14:30:01Z [INFO] AuthService - Login successful",
"client_ip": "192.168.1.1",
"status_code": "200"
}
}
]
}Inline Pipeline Test (Kaydetmeden)
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"dissect": {
"field": "message",
"pattern": "%{timestamp} [%{level}] %{logger} - %{msg}"
}
},
{
"convert": {
"field": "status_code",
"type": "integer"
}
}
]
},
"docs": [
{
"_source": {
"message": "2024-01-15 14:30:00 [ERROR] UserService - User not found",
"status_code": "500"
}
}
]
}Verbose Simulate
POST _ingest/pipeline/my-pipeline/_simulate?verbose=true
{
"docs": [
{
"_source": {
"message": "test log line",
"status_code": "200"
}
}
]
}
// verbose=true ile her processor'dan sonraki ara durumu görürsünüz7. Bütünleşik Örnek: Web Application Log Pipeline
// Ana pipeline
PUT _ingest/pipeline/webapp-logs
{
"description": "Web application log processing pipeline",
"processors": [
// 1. Log parsing (dissect — performans için grok yerine)
{
"dissect": {
"field": "message",
"pattern": "%{timestamp} [%{thread}] %{level} %{logger} - %{log_message}",
"on_failure": [
{
"set": {
"field": "parse_method",
"value": "raw"
}
}
]
}
},
// 2. Timestamp dönüşümü
{
"date": {
"field": "timestamp",
"formats": ["yyyy-MM-dd'T'HH:mm:ss.SSSZ", "yyyy-MM-dd HH:mm:ss"],
"target_field": "@timestamp",
"if": "ctx.containsKey('timestamp')",
"on_failure": [
{
"set": {
"field": "timestamp_error",
"value": true
}
}
]
}
},
// 3. Level normalize
{
"lowercase": {
"field": "level",
"if": "ctx.containsKey('level')"
}
},
// 4. Severity mapping
{
"script": {
"source": """
def level = ctx.level;
if (level == null) { ctx.severity = 0; return; }
def map = ['trace': 1, 'debug': 2, 'info': 3, 'warn': 4, 'warning': 4, 'error': 5, 'fatal': 6, 'critical': 6];
ctx.severity = map.getOrDefault(level, 0);
"""
}
},
// 5. GeoIP (varsa)
{
"geoip": {
"field": "client_ip",
"target_field": "geo",
"if": "ctx.containsKey('client_ip') && ctx.client_ip != null && ctx.client_ip != '127.0.0.1'",
"ignore_failure": true
}
},
// 6. User agent parse (varsa)
{
"user_agent": {
"field": "user_agent_string",
"target_field": "user_agent",
"if": "ctx.containsKey('user_agent_string')",
"ignore_failure": true
}
},
// 7. Ingestion metadata
{
"set": {
"field": "ingest.pipeline",
"value": "webapp-logs"
}
},
{
"set": {
"field": "ingest.timestamp",
"value": "{{_ingest.timestamp}}"
}
},
// 8. Temizlik
{
"remove": {
"field": ["message", "timestamp", "user_agent_string"],
"ignore_missing": true
}
}
],
"on_failure": [
{
"set": {
"field": "ingest.error.message",
"value": "{{_ingest.on_failure_message}}"
}
},
{
"set": {
"field": "ingest.error.processor",
"value": "{{_ingest.on_failure_processor_type}}"
}
}
]
}
// Test
POST _ingest/pipeline/webapp-logs/_simulate
{
"docs": [
{
"_source": {
"message": "2024-01-15T14:30:00.123+0300 [http-nio-8080-exec-5] ERROR c.m.a.UserController - User not found: id=12345",
"client_ip": "83.149.9.216",
"user_agent_string": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
}
]
}Pipeline'ı Index Template'e Bağlama
PUT _index_template/webapp-logs-template
{
"index_patterns": ["webapp-logs-*"],
"template": {
"settings": {
"index.default_pipeline": "webapp-logs",
"index.number_of_shards": 3,
"index.number_of_replicas": 1,
"index.lifecycle.name": "logs-policy"
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"level": { "type": "keyword" },
"severity": { "type": "integer" },
"logger": { "type": "keyword" },
"log_message": { "type": "text" },
"client_ip": { "type": "ip" },
"geo.location": { "type": "geo_point" },
"thread": { "type": "keyword" }
}
}
}
}8. Pipeline Yönetimi
# Tüm pipeline'ları listele
GET _ingest/pipeline
# Belirli pipeline
GET _ingest/pipeline/webapp-logs
# Pipeline istatistikleri
GET _nodes/stats/ingest?filter_path=nodes.*.ingest
# Pipeline sil
DELETE _ingest/pipeline/old-pipelinePipeline İstatistiklerini Okuma
GET _nodes/stats/ingest
// Önemli alanlar:
{
"nodes": {
"node-1": {
"ingest": {
"total": {
"count": 1500000,
"time_in_millis": 45000,
"current": 5,
"failed": 250
},
"pipelines": {
"webapp-logs": {
"count": 1000000,
"time_in_millis": 30000,
"current": 3,
"failed": 200,
"processors": [
{
"dissect": {
"type": "dissect",
"stats": {
"count": 1000000,
"time_in_millis": 5000,
"failed": 50
}
}
}
]
}
}
}
}
}
}9. Java ile Pipeline Kullanımı
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.ingest.*;
// Pipeline oluştur
client.ingest().putPipeline(p -> p
.id("java-pipeline")
.description("Java ile oluşturulan pipeline")
.processors(proc -> proc
.set(s -> s
.field("processed_by")
.value(JsonData.of("java-client"))
)
)
.processors(proc -> proc
.lowercase(l -> l
.field("status")
)
)
);
// Pipeline ile document indexle
client.index(i -> i
.index("my-index")
.id("1")
.pipeline("java-pipeline")
.document(myDocument)
);
// Pipeline simüle et
var response = client.ingest().simulate(s -> s
.id("java-pipeline")
.docs(d -> d
.source(JsonData.of(Map.of(
"message", "test",
"status", "ACTIVE"
)))
)
);
response.docs().forEach(doc -> {
System.out.println(doc.doc().source());
});10. Best Practices
✅ Yap
| Konu | Öneri |
|---|---|
| Dissect > Grok | Tutarlı formatlar için dissect %5-10x hızlı |
| on_failure | Her kritik processor'a hata yakalama ekle |
| Simulate | Değişiklik öncesi _simulate ile test et |
| Default pipeline | Index template'e bağla, her seferinde belirtme |
| Conditional | if ile gereksiz işlem çalışmasını engelle |
| Enrich | Lookup tabloları için enrich processor kullan |
❌ Yapma
| Konu | Neden |
|---|---|
| Ağır script processor | Yazma performansını düşürür |
| Pipeline'sız GeoIP | İstediğiniz detayı veremez, pipeline ile kontrol |
| Hata yakalamadan production | Parse hatası tüm document'ı düşürür |
| Çok fazla processor | Her processor latency ekler, sayıyı minimize et |
| Test etmeden deploy | _simulate bedava — kullanın |
11. Yaygın Hatalar ve Çözümleri
Hata 1: "Pipeline Not Found"
// Sorun: Document indexlerken "pipeline not found" hatası
// Çözüm 1: Pipeline var mı kontrol et
GET _ingest/pipeline/my-pipeline
// Çözüm 2: Pipeline adı doğru mu?
// Default pipeline kontrol
GET my-index/_settings?flat_settings=true&filter_path=*.settings.index.default_pipelineHata 2: "Field Does Not Exist"
// Sorun: Processor var olmayan field'a erişmeye çalışıyor
// Çözüm 1: if koşulu ekle
{
"geoip": {
"field": "client_ip",
"if": "ctx.containsKey('client_ip') && ctx.client_ip != null"
}
}
// Çözüm 2: ignore_missing
{
"rename": {
"field": "old_name",
"target_field": "new_name",
"ignore_missing": true
}
}Hata 3: "Ingest Pipeline Yavaş"
# Kontrol: Pipeline istatistikleri
GET _nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines.*.time_in_millis
# Çözümler:
# 1. Grok yerine dissect kullan
# 2. Script'leri optimize et
# 3. Gereksiz processor'ları kaldır
# 4. Dedicated ingest node kullanÖzet
Ingest Pipeline Elasticsearch içinde veri dönüşümü sağlar — Logstash'e gerek kalmadan basit-orta karmaşıklıktaki dönüşümler yapılabilir.
30+ processor mevcuttur — set, remove, rename, convert, grok, dissect, date, geoip, user_agent, script, enrich ve daha fazlası.
Dissect, Grok'tan 5-10x hızlıdır — tutarlı log formatları için dissect tercih edin. Grok sadece değişken formatlar için gereklidir.
`_simulate` API pipeline'ı test etmenin en güvenli yolu — production'a deploy etmeden önce mutlaka test edin.
`on_failure` ile hata yakalama zorunlu — parse hatası olan document'ları ayrı bir index'e yönlendirin, kaybetmeyin.
Enrich processor lookup tabloları için güçlü — harici veri kaynağından document'ları zenginleştirebilirsiniz (kullanıcı bilgisi, GeoIP, iş kuralları).
AI Asistan
Sorularını yanıtlamaya hazır