Logstash — Input, Filter, Output Pipeline
Giriş — Input, Filter, Output Pipeline'ları, Grok ve Mutate
Bir fabrikanın üretim bandını düşün. Bir taraftan hammadde giriyor (input), ortada işleniyor — kesilip, boyanıp, şekil veriliyor (filter), ve diğer taraftan bitmiş ürün çıkıyor (output). Fabrikanın gücü, bu üç aşamanın ne kadar verimli çalıştığında.
Logstash tam olarak bu fabrika. Verileriniz onlarca farklı kaynaktan gelebilir: dosyalar, veritabanları, message queue'lar, API'ler. Logstash bunları alır (input), dönüştürür (filter), ve istediğiniz hedefe yazar (output). Elasticsearch'e veri besleyen en güçlü araçlardan biri.
1. Logstash Nedir?
ELK Stack'teki Yeri
Veri Kaynakları → [Logstash] → [Elasticsearch] → [Kibana]
(ETL) (Depolama) (Görselleştirme)Logstash bir ETL (Extract, Transform, Load) aracıdır:
Extract: Veri kaynaklarından veri çeker
Transform: Veriyi dönüştürür, zenginleştirir, filtreler
Load: Hedef sistemlere yazar
Neden Logstash?
| Özellik | Açıklama |
|---|---|
| 200+ Plugin | Neredeyse her veri kaynağını ve hedefi destekler |
| Güçlü filtreleme | Grok, mutate, geoip, date, aggregate |
| Güvenilir | Persistent queue, at-least-once delivery |
| Ölçeklenebilir | Birden fazla instance paralel çalışabilir |
Logstash vs Beats vs Ingest Pipeline
| Araç | Kullanım | Dönüşüm Gücü |
|---|---|---|
| Beats | Hafif veri gönderici | Minimal (basit filtreleme) |
| Ingest Pipeline | ES içinde dönüşüm | Orta (processor'lar) |
| Logstash | Tam ETL | Maksimum (grok, aggregate, jdbc...) |
Kural: Basit veri toplama → Beats. Karmaşık dönüşüm, birden fazla kaynak/hedef → Logstash.
2. Logstash Kurulumu
Docker ile
docker run -d \
--name logstash \
-v /path/to/pipeline:/usr/share/logstash/pipeline \
-v /path/to/config:/usr/share/logstash/config \
-e "XPACK_MONITORING_ENABLED=true" \
docker.elastic.co/logstash/logstash:8.12.0Linux'ta
# RPM (RedHat/CentOS)
sudo rpm -ivh logstash-8.12.0-x86_64.rpm
# APT (Ubuntu/Debian)
sudo apt-get install logstash
# Servis başlat
sudo systemctl start logstash
sudo systemctl enable logstashKonfigürasyon Dosyaları
/etc/logstash/
├── logstash.yml # Genel ayarlar
├── pipelines.yml # Pipeline tanımları
├── jvm.options # JVM ayarları
└── conf.d/ # Pipeline config dosyaları
├── 01-input.conf
├── 02-filter.conf
└── 03-output.conf# logstash.yml
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
queue.type: persisted
queue.max_bytes: 4gb
path.data: /var/lib/logstash# pipelines.yml — Birden fazla pipeline
- pipeline.id: web-logs
path.config: "/etc/logstash/conf.d/web-logs/*.conf"
pipeline.workers: 2
- pipeline.id: app-logs
path.config: "/etc/logstash/conf.d/app-logs/*.conf"
pipeline.workers: 43. Pipeline Yapısı — Input, Filter, Output
Her Logstash pipeline üç bölümden oluşur:
input {
# Veri kaynakları
}
filter {
# Dönüşüm kuralları
}
output {
# Hedef sistemler
}Minimal Örnek
# /etc/logstash/conf.d/minimal.conf
input {
stdin {}
}
filter {
mutate {
add_field => { "environment" => "test" }
}
}
output {
stdout {
codec => rubydebug
}
}Test: echo "Hello Logstash" | logstash -f minimal.conf
4. Input Plugin'leri
File Input
input {
file {
path => "/var/log/nginx/access.log"
start_position => "beginning"
sincedb_path => "/var/lib/logstash/sincedb/nginx"
codec => "plain"
# Multiline log desteği (stack trace gibi)
# codec => multiline {
# pattern => "^%{TIMESTAMP_ISO8601}"
# negate => true
# what => "previous"
# }
}
}Beats Input
# Filebeat, Metricbeat gibi Beat'lerden veri al
input {
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
}
}Kafka Input
input {
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
topics => ["web-logs", "app-logs"]
group_id => "logstash-consumers"
consumer_threads => 3
codec => json
auto_offset_reset => "latest"
}
}JDBC Input (Veritabanından)
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://db-host:3306/myapp"
jdbc_user => "logstash_reader"
jdbc_password => "${DB_PASSWORD}"
jdbc_driver_library => "/usr/share/logstash/vendor/mysql-connector.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
schedule => "*/5 * * * *"
statement => "SELECT * FROM orders WHERE updated_at > :sql_last_value ORDER BY updated_at"
use_column_value => true
tracking_column => "updated_at"
tracking_column_type => "timestamp"
last_run_metadata_path => "/var/lib/logstash/jdbc_last_run"
}
}HTTP Input (Webhook)
input {
http {
port => 8080
codec => json
additional_codecs => {
"text/plain" => "plain"
}
}
}Syslog Input
input {
syslog {
port => 514
type => "syslog"
}
}5. Filter Plugin'leri — Veri Dönüşümü
5.1. Grok — Pattern Matching
Grok, unstructured text'i structured data'ya çeviren en güçlü filter. Regular expression'ları okunabilir pattern'lara sarmalar.
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
}%{COMBINEDAPACHELOG} şu alanları çıkarır:
clientip,ident,auth,timestamp,verb,request,httpversion,response,bytes,referrer,agent
Grok Custom Pattern
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_timestamp} \[%{LOGLEVEL:level}\] %{DATA:class} - %{GREEDYDATA:log_message}"
}
}
}
# Örnek log:
# 2024-01-15T14:30:00.123Z [ERROR] com.myapp.UserService - User not found: id=12345
# Çıktı:
# log_timestamp: "2024-01-15T14:30:00.123Z"
# level: "ERROR"
# class: "com.myapp.UserService"
# log_message: "User not found: id=12345"Temel Grok Pattern'ları
| Pattern | Eşleşme | Örnek |
|---|---|---|
%{IP} | IP adresi | 192.168.1.1 |
%{TIMESTAMP_ISO8601} | ISO timestamp | 2024-01-15T14:30:00Z |
%{LOGLEVEL} | Log seviyesi | ERROR, WARN, INFO |
%{NUMBER} | Sayı | 12345, 3.14 |
%{WORD} | Tek kelime | hello |
%{DATA} | Lazy match (tembel) | anything |
%{GREEDYDATA} | Greedy match (açgözlü) | everything to end |
%{NOTSPACE} | Boşluk olmayan | username |
%{QUOTEDSTRING} | Tırnak içi | "hello world" |
Kendi Pattern'ınızı Tanımlama
filter {
grok {
patterns_dir => ["/etc/logstash/patterns"]
match => {
"message" => "%{MYAPP_LOG}"
}
}
}# /etc/logstash/patterns/myapp
MYAPP_LOG %{TIMESTAMP_ISO8601:timestamp} \[%{WORD:thread}\] %{LOGLEVEL:level} %{JAVACLASS:class} - %{GREEDYDATA:message}
JAVACLASS (?:[a-zA-Z$_][a-zA-Z$_0-9]*\.)*[a-zA-Z$_][a-zA-Z$_0-9]*💡 İpucu: Grok pattern'larını test etmek için Kibana'daki Grok Debugger aracını kullanın.
Dev Tools → Grok Debuggermenüsünden erişebilirsiniz.
5.2. Mutate — Alan Manipülasyonu
filter {
mutate {
# Field adı değiştir
rename => { "clientip" => "client_ip" }
# Field tipi dönüştür
convert => {
"response_code" => "integer"
"response_time" => "float"
"bytes" => "integer"
}
# Field ekle
add_field => {
"environment" => "production"
"processed_by" => "logstash-1"
}
# Field sil
remove_field => ["message", "@version", "host"]
# String işlemleri
lowercase => ["level"]
uppercase => ["country_code"]
strip => ["user_input"]
gsub => [
"message", "\r\n", "\n",
"user_agent", "\"", ""
]
# Field birleştir
merge => { "tags" => "extra_tags" }
# Field böl
split => { "categories" => "," }
}
}5.3. Date — Zaman Damgası Parse
filter {
date {
match => ["log_timestamp", "ISO8601"]
target => "@timestamp"
timezone => "Europe/Istanbul"
}
}
# Birden fazla format desteği
filter {
date {
match => ["timestamp",
"dd/MMM/yyyy:HH:mm:ss Z",
"yyyy-MM-dd HH:mm:ss",
"yyyy-MM-dd'T'HH:mm:ss.SSSZ",
"UNIX",
"UNIX_MS"
]
target => "@timestamp"
}
}5.4. GeoIP — Coğrafi Konum
filter {
geoip {
source => "client_ip"
target => "geo"
# Varsayılan olarak: city, country, location (lat/lon), continent
}
}
# Çıktı:
# "geo": {
# "city_name": "Istanbul",
# "country_code2": "TR",
# "country_name": "Turkey",
# "location": {
# "lon": 28.9784,
# "lat": 41.0082
# }
# }5.5. Conditional Filter
filter {
# Sadece error log'lara uygula
if [level] == "ERROR" {
mutate {
add_tag => ["alert"]
add_field => { "severity" => "high" }
}
}
# Status code'a göre işlem
if [response] and [response] >= 500 {
mutate { add_tag => ["server_error"] }
} else if [response] and [response] >= 400 {
mutate { add_tag => ["client_error"] }
}
# Field varlığı kontrolü
if [user_agent] {
useragent {
source => "user_agent"
target => "ua"
}
}
# Belirli tag'leri olan event'leri düşür
if "debug" in [tags] {
drop {}
}
}5.6. Aggregate — Event Birleştirme
filter {
# Transaction ID'ye göre event'leri birleştir
if [event_type] == "request_start" {
aggregate {
task_id => "%{transaction_id}"
code => """
map['start_time'] = event.get('timestamp')
map['request_path'] = event.get('path')
"""
}
}
if [event_type] == "request_end" {
aggregate {
task_id => "%{transaction_id}"
code => """
event.set('start_time', map['start_time'])
event.set('request_path', map['request_path'])
event.set('duration_ms', event.get('timestamp').to_f - map['start_time'].to_f)
"""
end_of_task => true
timeout => 120
}
}
}6. Output Plugin'leri
Elasticsearch Output
output {
elasticsearch {
hosts => ["https://es-node1:9200", "https://es-node2:9200"]
user => "logstash_writer"
password => "${ES_PASSWORD}"
ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
# Index adlandırma
index => "logs-%{[service]}-%{+YYYY.MM.dd}"
# Data stream kullanımı
# data_stream => true
# data_stream_type => "logs"
# data_stream_dataset => "myapp"
# data_stream_namespace => "production"
# Pipeline
pipeline => "my-ingest-pipeline"
# Bulk ayarları
bulk_path => "/_bulk"
}
}Birden Fazla Output
output {
# Elasticsearch'e yaz
elasticsearch {
hosts => ["https://es:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
# Aynı anda S3'e de yaz (arşiv)
s3 {
region => "eu-central-1"
bucket => "my-log-archive"
prefix => "logstash/logs/%{+YYYY/MM/dd}"
codec => json_lines
time_file => 15
size_file => 104857600
}
# Error log'ları ayrı bir yere gönder
if "alert" in [tags] {
http {
url => "https://alerting.internal/webhook"
http_method => "post"
format => "json"
}
}
# Debug: Console'a da bas (test için)
# stdout { codec => rubydebug }
}Kafka Output
output {
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092"
topic_id => "processed-logs"
codec => json
compression_type => "snappy"
}
}7. Bütünleşik Örnek: Web Log Pipeline
Gerçek dünya senaryosu: Nginx access log'larını parse et, zenginleştir ve Elasticsearch'e yaz.
# /etc/logstash/conf.d/nginx-pipeline.conf
input {
beats {
port => 5044
type => "nginx-access"
}
}
filter {
# 1. Nginx log'unu parse et
grok {
match => {
"message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status_code:int} %{NUMBER:bytes:int} "%{DATA:referrer}" "%{DATA:user_agent}"'
}
remove_field => ["message"]
}
# 2. Timestamp parse
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
remove_field => ["timestamp"]
}
# 3. GeoIP
geoip {
source => "client_ip"
target => "geo"
}
# 4. User Agent parse
useragent {
source => "user_agent"
target => "ua"
}
# 5. Status code kategorilendirme
if [status_code] >= 500 {
mutate {
add_field => { "status_category" => "server_error" }
add_tag => ["error"]
}
} else if [status_code] >= 400 {
mutate {
add_field => { "status_category" => "client_error" }
}
} else if [status_code] >= 300 {
mutate {
add_field => { "status_category" => "redirect" }
}
} else {
mutate {
add_field => { "status_category" => "success" }
}
}
# 6. Static asset'leri filtrele (opsiyonel)
if [request] =~ /\.(css|js|png|jpg|gif|ico|woff|svg)$/ {
mutate {
add_tag => ["static_asset"]
}
# drop {} # İstemiyorsan tamamen at
}
# 7. Gereksiz field'ları temizle
mutate {
remove_field => ["@version", "agent", "ecs", "input", "log"]
}
}
output {
elasticsearch {
hosts => ["https://es-node1:9200"]
user => "logstash_writer"
password => "${ES_PASSWORD}"
ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
index => "nginx-access-%{+YYYY.MM.dd}"
template_name => "nginx-access"
template_overwrite => true
}
}8. Pipeline Yönetimi
Birden Fazla Pipeline
# pipelines.yml
- pipeline.id: nginx-logs
path.config: "/etc/logstash/conf.d/nginx/*.conf"
pipeline.workers: 4
queue.type: persisted
queue.max_bytes: 2gb
- pipeline.id: app-logs
path.config: "/etc/logstash/conf.d/app/*.conf"
pipeline.workers: 2
queue.type: memory
- pipeline.id: metrics
path.config: "/etc/logstash/conf.d/metrics/*.conf"
pipeline.workers: 1Pipeline-to-Pipeline
# Router pipeline
input {
beats { port => 5044 }
}
output {
if [type] == "nginx" {
pipeline { send_to => "nginx-processing" }
} else if [type] == "app" {
pipeline { send_to => "app-processing" }
}
}
# Nginx processing pipeline
input {
pipeline { address => "nginx-processing" }
}
filter { ... }
output { ... }
# App processing pipeline
input {
pipeline { address => "app-processing" }
}
filter { ... }
output { ... }Pipeline Monitoring
# Logstash API
curl http://localhost:9600/_node/stats/pipelines
# Hot threads
curl http://localhost:9600/_node/hot_threads
# Pipeline info
curl http://localhost:9600/_node/pipelines9. Dead Letter Queue (DLQ)
Parse edemediğiniz veya yazamadığınız event'ler için:
# logstash.yml
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
path.dead_letter_queue: /var/lib/logstash/dead_letter_queue# DLQ'yu okuyan ayrı pipeline
input {
dead_letter_queue {
path => "/var/lib/logstash/dead_letter_queue"
pipeline_id => "nginx-logs"
commit_offsets => true
}
}
filter {
# Hata nedenini logla
mutate {
add_field => {
"dlq_reason" => "%{[@metadata][dead_letter_queue][reason]}"
"dlq_plugin" => "%{[@metadata][dead_letter_queue][plugin_type]}"
}
}
}
output {
elasticsearch {
hosts => ["https://es:9200"]
index => "dlq-events-%{+YYYY.MM.dd}"
}
}10. Best Practices
✅ Yap
| Konu | Öneri |
|---|---|
| Persistent queue | queue.type: persisted — crash'te veri kaybını önler |
| Grok debugger | Pattern'ları Kibana'da test et |
| Conditional output | Farklı veri tiplerini farklı index'lere yaz |
| DLQ | Parse hatalarını kaybet değil, DLQ'ya at |
| Pipeline separation | Farklı veri kaynakları için farklı pipeline |
| Environment variables | Şifreleri ${VAR} ile al, config'e yazma |
❌ Yapma
| Konu | Neden |
|---|---|
| Tek pipeline'da her şey | Debug zor, bir hata tüm veriyi etkiler |
| Grok'ta GREEDYDATA fazla kullan | Performans düşer, spesifik pattern yaz |
| Memory queue + büyük veri | Crash'te veri kaybı |
| Şifreler plain text | Config dosyaları versiyon kontrolünde |
| Filter olmadan output | Raw veri Elasticsearch'ü şişirir |
11. Yaygın Hatalar ve Çözümleri
Hata 1: Grok Parse Failure
// Sorun: "_grokparsefailure" tag'i ekleniyor
// Neden: Pattern log formatına uymuyor
// Çözüm: Grok debugger'da test et ve pattern'ı düzelt
// Veya hatayı yakala:
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
tag_on_failure => ["_grok_failed"]
}
if "_grok_failed" in [tags] {
mutate {
add_field => { "parse_error" => "true" }
}
}
}Hata 2: Logstash Bellek Sorunu
# jvm.options — Heap ayarı
-Xms2g
-Xmx2g
# Büyük pipeline'lar için:
-Xms4g
-Xmx4g
# ⚠️ Heap'i toplam RAM'in %50'sinden fazla yapmaHata 3: Elasticsearch Connection Refused
// Sorun: ES'e bağlanamıyor
// Çözüm: Retry ayarları
output {
elasticsearch {
hosts => ["https://es:9200"]
retry_on_conflict => 3
retry_initial_interval => 2
retry_max_interval => 64
}
}Özet
Logstash bir ETL aracıdır — 200+ plugin ile neredeyse her kaynaktan veri alıp, dönüştürüp, her hedefe yazabilir.
Pipeline üç bölümden oluşur: Input (veri al), Filter (dönüştür), Output (yaz). Her bölüm birden fazla plugin içerebilir.
Grok unstructured text'i structured data'ya çeviren en güçlü filtre — pattern'ları Kibana Grok Debugger'da test edin.
Mutate en sık kullanılan filtre — field ekleme, silme, yeniden adlandırma, tip dönüşümü için.
Persistent queue production'da zorunlu — crash durumunda veri kaybını önler.
Conditional logic ile farklı veri tiplerini farklı şekilde işleyebilir ve farklı hedeflere yönlendirebilirsiniz.
AI Asistan
Sorularını yanıtlamaya hazır