← Kursa Dön
📄 Text · 35 min

Logstash — Input, Filter, Output Pipeline

Giriş — Input, Filter, Output Pipeline'ları, Grok ve Mutate

Bir fabrikanın üretim bandını düşün. Bir taraftan hammadde giriyor (input), ortada işleniyor — kesilip, boyanıp, şekil veriliyor (filter), ve diğer taraftan bitmiş ürün çıkıyor (output). Fabrikanın gücü, bu üç aşamanın ne kadar verimli çalıştığında.

Logstash tam olarak bu fabrika. Verileriniz onlarca farklı kaynaktan gelebilir: dosyalar, veritabanları, message queue'lar, API'ler. Logstash bunları alır (input), dönüştürür (filter), ve istediğiniz hedefe yazar (output). Elasticsearch'e veri besleyen en güçlü araçlardan biri.


1. Logstash Nedir?

ELK Stack'teki Yeri

Veri Kaynakları → [Logstash] → [Elasticsearch] → [Kibana]
                    (ETL)       (Depolama)       (Görselleştirme)

Logstash bir ETL (Extract, Transform, Load) aracıdır:

  • Extract: Veri kaynaklarından veri çeker

  • Transform: Veriyi dönüştürür, zenginleştirir, filtreler

  • Load: Hedef sistemlere yazar

Neden Logstash?

ÖzellikAçıklama
200+ PluginNeredeyse her veri kaynağını ve hedefi destekler
Güçlü filtrelemeGrok, mutate, geoip, date, aggregate
GüvenilirPersistent queue, at-least-once delivery
ÖlçeklenebilirBirden fazla instance paralel çalışabilir

Logstash vs Beats vs Ingest Pipeline

AraçKullanımDönüşüm Gücü
BeatsHafif veri göndericiMinimal (basit filtreleme)
Ingest PipelineES içinde dönüşümOrta (processor'lar)
LogstashTam ETLMaksimum (grok, aggregate, jdbc...)

Kural: Basit veri toplama → Beats. Karmaşık dönüşüm, birden fazla kaynak/hedef → Logstash.


2. Logstash Kurulumu

Docker ile

docker run -d \
  --name logstash \
  -v /path/to/pipeline:/usr/share/logstash/pipeline \
  -v /path/to/config:/usr/share/logstash/config \
  -e "XPACK_MONITORING_ENABLED=true" \
  docker.elastic.co/logstash/logstash:8.12.0

Linux'ta

# RPM (RedHat/CentOS)
sudo rpm -ivh logstash-8.12.0-x86_64.rpm

# APT (Ubuntu/Debian)
sudo apt-get install logstash

# Servis başlat
sudo systemctl start logstash
sudo systemctl enable logstash

Konfigürasyon Dosyaları

/etc/logstash/
├── logstash.yml          # Genel ayarlar
├── pipelines.yml         # Pipeline tanımları
├── jvm.options           # JVM ayarları
└── conf.d/               # Pipeline config dosyaları
    ├── 01-input.conf
    ├── 02-filter.conf
    └── 03-output.conf
# logstash.yml
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
queue.type: persisted
queue.max_bytes: 4gb
path.data: /var/lib/logstash
# pipelines.yml — Birden fazla pipeline
- pipeline.id: web-logs
  path.config: "/etc/logstash/conf.d/web-logs/*.conf"
  pipeline.workers: 2

- pipeline.id: app-logs
  path.config: "/etc/logstash/conf.d/app-logs/*.conf"
  pipeline.workers: 4

3. Pipeline Yapısı — Input, Filter, Output

Her Logstash pipeline üç bölümden oluşur:

input {
  # Veri kaynakları
}

filter {
  # Dönüşüm kuralları
}

output {
  # Hedef sistemler
}

Minimal Örnek

# /etc/logstash/conf.d/minimal.conf
input {
  stdin {}
}

filter {
  mutate {
    add_field => { "environment" => "test" }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

Test: echo "Hello Logstash" | logstash -f minimal.conf


4. Input Plugin'leri

File Input

input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb/nginx"
    codec => "plain"

    # Multiline log desteği (stack trace gibi)
    # codec => multiline {
    #   pattern => "^%{TIMESTAMP_ISO8601}"
    #   negate => true
    #   what => "previous"
    # }
  }
}

Beats Input

# Filebeat, Metricbeat gibi Beat'lerden veri al
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
  }
}

Kafka Input

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    topics => ["web-logs", "app-logs"]
    group_id => "logstash-consumers"
    consumer_threads => 3
    codec => json
    auto_offset_reset => "latest"
  }
}

JDBC Input (Veritabanından)

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://db-host:3306/myapp"
    jdbc_user => "logstash_reader"
    jdbc_password => "${DB_PASSWORD}"
    jdbc_driver_library => "/usr/share/logstash/vendor/mysql-connector.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    schedule => "*/5 * * * *"
    statement => "SELECT * FROM orders WHERE updated_at > :sql_last_value ORDER BY updated_at"
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/var/lib/logstash/jdbc_last_run"
  }
}

HTTP Input (Webhook)

input {
  http {
    port => 8080
    codec => json
    additional_codecs => {
      "text/plain" => "plain"
    }
  }
}

Syslog Input

input {
  syslog {
    port => 514
    type => "syslog"
  }
}

5. Filter Plugin'leri — Veri Dönüşümü

5.1. Grok — Pattern Matching

Grok, unstructured text'i structured data'ya çeviren en güçlü filter. Regular expression'ları okunabilir pattern'lara sarmalar.

filter {
  grok {
    match => {
      "message" => "%{COMBINEDAPACHELOG}"
    }
  }
}

%{COMBINEDAPACHELOG} şu alanları çıkarır:

  • clientip, ident, auth, timestamp, verb, request, httpversion, response, bytes, referrer, agent

Grok Custom Pattern

filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:log_timestamp} \[%{LOGLEVEL:level}\] %{DATA:class} - %{GREEDYDATA:log_message}"
    }
  }
}

# Örnek log:
# 2024-01-15T14:30:00.123Z [ERROR] com.myapp.UserService - User not found: id=12345

# Çıktı:
# log_timestamp: "2024-01-15T14:30:00.123Z"
# level: "ERROR"
# class: "com.myapp.UserService"
# log_message: "User not found: id=12345"

Temel Grok Pattern'ları

PatternEşleşmeÖrnek
%{IP}IP adresi192.168.1.1
%{TIMESTAMP_ISO8601}ISO timestamp2024-01-15T14:30:00Z
%{LOGLEVEL}Log seviyesiERROR, WARN, INFO
%{NUMBER}Sayı12345, 3.14
%{WORD}Tek kelimehello
%{DATA}Lazy match (tembel)anything
%{GREEDYDATA}Greedy match (açgözlü)everything to end
%{NOTSPACE}Boşluk olmayanusername
%{QUOTEDSTRING}Tırnak içi"hello world"

Kendi Pattern'ınızı Tanımlama

filter {
  grok {
    patterns_dir => ["/etc/logstash/patterns"]
    match => {
      "message" => "%{MYAPP_LOG}"
    }
  }
}
# /etc/logstash/patterns/myapp
MYAPP_LOG %{TIMESTAMP_ISO8601:timestamp} \[%{WORD:thread}\] %{LOGLEVEL:level} %{JAVACLASS:class} - %{GREEDYDATA:message}
JAVACLASS (?:[a-zA-Z$_][a-zA-Z$_0-9]*\.)*[a-zA-Z$_][a-zA-Z$_0-9]*

💡 İpucu: Grok pattern'larını test etmek için Kibana'daki Grok Debugger aracını kullanın. Dev Tools → Grok Debugger menüsünden erişebilirsiniz.

5.2. Mutate — Alan Manipülasyonu

filter {
  mutate {
    # Field adı değiştir
    rename => { "clientip" => "client_ip" }

    # Field tipi dönüştür
    convert => {
      "response_code" => "integer"
      "response_time" => "float"
      "bytes" => "integer"
    }

    # Field ekle
    add_field => {
      "environment" => "production"
      "processed_by" => "logstash-1"
    }

    # Field sil
    remove_field => ["message", "@version", "host"]

    # String işlemleri
    lowercase => ["level"]
    uppercase => ["country_code"]
    strip => ["user_input"]
    gsub => [
      "message", "\r\n", "\n",
      "user_agent", "\"", ""
    ]

    # Field birleştir
    merge => { "tags" => "extra_tags" }

    # Field böl
    split => { "categories" => "," }
  }
}

5.3. Date — Zaman Damgası Parse

filter {
  date {
    match => ["log_timestamp", "ISO8601"]
    target => "@timestamp"
    timezone => "Europe/Istanbul"
  }
}

# Birden fazla format desteği
filter {
  date {
    match => ["timestamp",
      "dd/MMM/yyyy:HH:mm:ss Z",
      "yyyy-MM-dd HH:mm:ss",
      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
      "UNIX",
      "UNIX_MS"
    ]
    target => "@timestamp"
  }
}

5.4. GeoIP — Coğrafi Konum

filter {
  geoip {
    source => "client_ip"
    target => "geo"
    # Varsayılan olarak: city, country, location (lat/lon), continent
  }
}

# Çıktı:
# "geo": {
#   "city_name": "Istanbul",
#   "country_code2": "TR",
#   "country_name": "Turkey",
#   "location": {
#     "lon": 28.9784,
#     "lat": 41.0082
#   }
# }

5.5. Conditional Filter

filter {
  # Sadece error log'lara uygula
  if [level] == "ERROR" {
    mutate {
      add_tag => ["alert"]
      add_field => { "severity" => "high" }
    }
  }

  # Status code'a göre işlem
  if [response] and [response] >= 500 {
    mutate { add_tag => ["server_error"] }
  } else if [response] and [response] >= 400 {
    mutate { add_tag => ["client_error"] }
  }

  # Field varlığı kontrolü
  if [user_agent] {
    useragent {
      source => "user_agent"
      target => "ua"
    }
  }

  # Belirli tag'leri olan event'leri düşür
  if "debug" in [tags] {
    drop {}
  }
}

5.6. Aggregate — Event Birleştirme

filter {
  # Transaction ID'ye göre event'leri birleştir
  if [event_type] == "request_start" {
    aggregate {
      task_id => "%{transaction_id}"
      code => """
        map['start_time'] = event.get('timestamp')
        map['request_path'] = event.get('path')
      """
    }
  }

  if [event_type] == "request_end" {
    aggregate {
      task_id => "%{transaction_id}"
      code => """
        event.set('start_time', map['start_time'])
        event.set('request_path', map['request_path'])
        event.set('duration_ms', event.get('timestamp').to_f - map['start_time'].to_f)
      """
      end_of_task => true
      timeout => 120
    }
  }
}

6. Output Plugin'leri

Elasticsearch Output

output {
  elasticsearch {
    hosts => ["https://es-node1:9200", "https://es-node2:9200"]
    user => "logstash_writer"
    password => "${ES_PASSWORD}"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]

    # Index adlandırma
    index => "logs-%{[service]}-%{+YYYY.MM.dd}"

    # Data stream kullanımı
    # data_stream => true
    # data_stream_type => "logs"
    # data_stream_dataset => "myapp"
    # data_stream_namespace => "production"

    # Pipeline
    pipeline => "my-ingest-pipeline"

    # Bulk ayarları
    bulk_path => "/_bulk"
  }
}

Birden Fazla Output

output {
  # Elasticsearch'e yaz
  elasticsearch {
    hosts => ["https://es:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }

  # Aynı anda S3'e de yaz (arşiv)
  s3 {
    region => "eu-central-1"
    bucket => "my-log-archive"
    prefix => "logstash/logs/%{+YYYY/MM/dd}"
    codec => json_lines
    time_file => 15
    size_file => 104857600
  }

  # Error log'ları ayrı bir yere gönder
  if "alert" in [tags] {
    http {
      url => "https://alerting.internal/webhook"
      http_method => "post"
      format => "json"
    }
  }

  # Debug: Console'a da bas (test için)
  # stdout { codec => rubydebug }
}

Kafka Output

output {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topic_id => "processed-logs"
    codec => json
    compression_type => "snappy"
  }
}

7. Bütünleşik Örnek: Web Log Pipeline

Gerçek dünya senaryosu: Nginx access log'larını parse et, zenginleştir ve Elasticsearch'e yaz.

# /etc/logstash/conf.d/nginx-pipeline.conf

input {
  beats {
    port => 5044
    type => "nginx-access"
  }
}

filter {
  # 1. Nginx log'unu parse et
  grok {
    match => {
      "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status_code:int} %{NUMBER:bytes:int} "%{DATA:referrer}" "%{DATA:user_agent}"'
    }
    remove_field => ["message"]
  }

  # 2. Timestamp parse
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "@timestamp"
    remove_field => ["timestamp"]
  }

  # 3. GeoIP
  geoip {
    source => "client_ip"
    target => "geo"
  }

  # 4. User Agent parse
  useragent {
    source => "user_agent"
    target => "ua"
  }

  # 5. Status code kategorilendirme
  if [status_code] >= 500 {
    mutate {
      add_field => { "status_category" => "server_error" }
      add_tag => ["error"]
    }
  } else if [status_code] >= 400 {
    mutate {
      add_field => { "status_category" => "client_error" }
    }
  } else if [status_code] >= 300 {
    mutate {
      add_field => { "status_category" => "redirect" }
    }
  } else {
    mutate {
      add_field => { "status_category" => "success" }
    }
  }

  # 6. Static asset'leri filtrele (opsiyonel)
  if [request] =~ /\.(css|js|png|jpg|gif|ico|woff|svg)$/ {
    mutate {
      add_tag => ["static_asset"]
    }
    # drop {}  # İstemiyorsan tamamen at
  }

  # 7. Gereksiz field'ları temizle
  mutate {
    remove_field => ["@version", "agent", "ecs", "input", "log"]
  }
}

output {
  elasticsearch {
    hosts => ["https://es-node1:9200"]
    user => "logstash_writer"
    password => "${ES_PASSWORD}"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    index => "nginx-access-%{+YYYY.MM.dd}"
    template_name => "nginx-access"
    template_overwrite => true
  }
}

8. Pipeline Yönetimi

Birden Fazla Pipeline

# pipelines.yml
- pipeline.id: nginx-logs
  path.config: "/etc/logstash/conf.d/nginx/*.conf"
  pipeline.workers: 4
  queue.type: persisted
  queue.max_bytes: 2gb

- pipeline.id: app-logs
  path.config: "/etc/logstash/conf.d/app/*.conf"
  pipeline.workers: 2
  queue.type: memory

- pipeline.id: metrics
  path.config: "/etc/logstash/conf.d/metrics/*.conf"
  pipeline.workers: 1

Pipeline-to-Pipeline

# Router pipeline
input {
  beats { port => 5044 }
}

output {
  if [type] == "nginx" {
    pipeline { send_to => "nginx-processing" }
  } else if [type] == "app" {
    pipeline { send_to => "app-processing" }
  }
}

# Nginx processing pipeline
input {
  pipeline { address => "nginx-processing" }
}
filter { ... }
output { ... }

# App processing pipeline
input {
  pipeline { address => "app-processing" }
}
filter { ... }
output { ... }

Pipeline Monitoring

# Logstash API
curl http://localhost:9600/_node/stats/pipelines

# Hot threads
curl http://localhost:9600/_node/hot_threads

# Pipeline info
curl http://localhost:9600/_node/pipelines

9. Dead Letter Queue (DLQ)

Parse edemediğiniz veya yazamadığınız event'ler için:

# logstash.yml
dead_letter_queue.enable: true
dead_letter_queue.max_bytes: 1gb
path.dead_letter_queue: /var/lib/logstash/dead_letter_queue
# DLQ'yu okuyan ayrı pipeline
input {
  dead_letter_queue {
    path => "/var/lib/logstash/dead_letter_queue"
    pipeline_id => "nginx-logs"
    commit_offsets => true
  }
}

filter {
  # Hata nedenini logla
  mutate {
    add_field => {
      "dlq_reason" => "%{[@metadata][dead_letter_queue][reason]}"
      "dlq_plugin" => "%{[@metadata][dead_letter_queue][plugin_type]}"
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://es:9200"]
    index => "dlq-events-%{+YYYY.MM.dd}"
  }
}

10. Best Practices

✅ Yap

KonuÖneri
Persistent queuequeue.type: persisted — crash'te veri kaybını önler
Grok debuggerPattern'ları Kibana'da test et
Conditional outputFarklı veri tiplerini farklı index'lere yaz
DLQParse hatalarını kaybet değil, DLQ'ya at
Pipeline separationFarklı veri kaynakları için farklı pipeline
Environment variablesŞifreleri ${VAR} ile al, config'e yazma

❌ Yapma

KonuNeden
Tek pipeline'da her şeyDebug zor, bir hata tüm veriyi etkiler
Grok'ta GREEDYDATA fazla kullanPerformans düşer, spesifik pattern yaz
Memory queue + büyük veriCrash'te veri kaybı
Şifreler plain textConfig dosyaları versiyon kontrolünde
Filter olmadan outputRaw veri Elasticsearch'ü şişirir

11. Yaygın Hatalar ve Çözümleri

Hata 1: Grok Parse Failure

// Sorun: "_grokparsefailure" tag'i ekleniyor
// Neden: Pattern log formatına uymuyor

// Çözüm: Grok debugger'da test et ve pattern'ı düzelt
// Veya hatayı yakala:
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
    tag_on_failure => ["_grok_failed"]
  }

  if "_grok_failed" in [tags] {
    mutate {
      add_field => { "parse_error" => "true" }
    }
  }
}

Hata 2: Logstash Bellek Sorunu

# jvm.options — Heap ayarı
-Xms2g
-Xmx2g

# Büyük pipeline'lar için:
-Xms4g
-Xmx4g

# ⚠️ Heap'i toplam RAM'in %50'sinden fazla yapma

Hata 3: Elasticsearch Connection Refused

// Sorun: ES'e bağlanamıyor
// Çözüm: Retry ayarları
output {
  elasticsearch {
    hosts => ["https://es:9200"]
    retry_on_conflict => 3
    retry_initial_interval => 2
    retry_max_interval => 64
  }
}

Özet

  1. Logstash bir ETL aracıdır — 200+ plugin ile neredeyse her kaynaktan veri alıp, dönüştürüp, her hedefe yazabilir.

  2. Pipeline üç bölümden oluşur: Input (veri al), Filter (dönüştür), Output (yaz). Her bölüm birden fazla plugin içerebilir.

  3. Grok unstructured text'i structured data'ya çeviren en güçlü filtre — pattern'ları Kibana Grok Debugger'da test edin.

  4. Mutate en sık kullanılan filtre — field ekleme, silme, yeniden adlandırma, tip dönüşümü için.

  5. Persistent queue production'da zorunlu — crash durumunda veri kaybını önler.

  6. Conditional logic ile farklı veri tiplerini farklı şekilde işleyebilir ve farklı hedeflere yönlendirebilirsiniz.