← Kursa Dön
📄 Text · 18 min

Multithreading ve Multiprocessing

Giriş: Neden Eşzamanlılık?

Bilgisayarın tek çekirdekli olduğu günler çoktan geride kaldı. Bugün cebindeki telefonda bile 8 çekirdek var. Ama Python programların varsayılan olarak tek çekirdekte çalışıyor. Bu, 8 şeritli bir otoyolda sadece 1 şeridi kullanmak gibi.

Bu derste Python'da eşzamanlılık (concurrency) ve paralellik (parallelism) kavramlarını öğrenecek, threading ve multiprocessing modüllerini kullanacak ve hangi durumda hangisini seçmen gerektiğini anlayacaksın.


Concurrency vs Parallelism

Analoji: Aşçı ve Mutfak

Concurrency (Eşzamanlılık): Bir aşçının birden fazla yemeği aynı anda idare etmesi. Çorbayı karıştırır, sonra salata keser, sonra fırını kontrol eder. Aslında aynı anda sadece bir iş yapıyor ama aralarında hızlıca geçiş yapıyor.

Parallelism (Paralellik): Birden fazla aşçının gerçekten aynı anda farklı yemekleri yapması. Biri çorba, diğeri salata, üçüncüsü tatlı. Fiziksel olarak eş zamanlı.

Concurrency:      Parallelism:
Aşçı 1            Aşçı 1        Aşçı 2        Aşçı 3
├─ Çorba           ├─ Çorba       ├─ Salata      ├─ Tatlı
├─ Salata          ├─ Çorba       ├─ Salata      ├─ Tatlı
├─ Çorba           ├─ Çorba       ├─ Salata      ├─ Tatlı
├─ Tatlı           └─ Bitti       └─ Bitti       └─ Bitti
├─ Salata
└─ Bitti

Python'da:

  • Threading → Concurrency (çoğunlukla)

  • Multiprocessing → Parallelism (gerçek)


GIL: Global Interpreter Lock

Python'ın en tartışmalı özelliği: GIL.

GIL Nedir?

GIL, CPython interpreter'ında aynı anda sadece bir thread'in Python bytecode çalıştırmasını sağlayan bir kilittir. Yani 8 thread açsan bile, herhangi bir anda sadece 1 tanesi Python kodu çalıştırır.

Neden Var?

CPython'ın bellek yönetimi (reference counting) thread-safe değil. GIL olmasaydı, birden fazla thread aynı nesnenin referans sayacını değiştirmeye çalışabilir ve bellek bozulması olurdu. GIL bu sorunu basit ama agresif bir şekilde çözer.

GIL'in Etkisi

CPU-bound iş (hesaplama):
Thread 1: ████████████████
Thread 2:                 ████████████████
→ GIL yüzünden sırayla çalışır, hız artışı YOK

I/O-bound iş (dosya, ağ):
Thread 1: ██░░░░██░░░░██
Thread 2:   ██░░░░██░░░░██
→ I/O beklerken GIL serbest kalır, hız artışı VAR

Özet: Threading, I/O-bound işler için harika, CPU-bound işler için işe yaramaz. CPU-bound işler için multiprocessing kullan.

⚠️ Dikkat: GIL sadece CPython (standart Python) için geçerli. PyPy, Jython, IronPython gibi diğer Python implementasyonlarında GIL olmayabilir. Python 3.13 ile free-threaded (GIL'siz) mod da deneysel olarak eklendi.


threading Modülü

Thread Oluşturma: Temel Yol

import threading
import time

def worker(name, duration):
    """Bir işi simüle eden fonksiyon."""
    print(f"🔨 {name} başladı")
    time.sleep(duration)
    print(f"✅ {name} bitti ({duration}s)")

# Thread oluştur
t1 = threading.Thread(target=worker, args=("İş 1", 2))
t2 = threading.Thread(target=worker, args=("İş 2", 3))

# Başlat
t1.start()
t2.start()

# Ana thread'in beklemesi
t1.join()
t2.join()

print("Tüm işler tamamlandı!")

start() thread'i başlatır, join() thread'in bitmesini bekler.

Süre Karşılaştırması

import threading
import time

def download_file(url, duration):
    """Dosya indirmeyi simüle eder (I/O-bound)."""
    print(f"⬇️ İndiriliyor: {url}")
    time.sleep(duration)  # I/O bekleme simülasyonu
    print(f"✅ İndirildi: {url}")

urls = [
    ("file1.zip", 2),
    ("file2.zip", 3),
    ("file3.zip", 1),
    ("file4.zip", 2),
]

# Sıralı indirme
start = time.perf_counter()
for url, duration in urls:
    download_file(url, duration)
sequential_time = time.perf_counter() - start
print(f"Sıralı: {sequential_time:.1f}s")  # ~8s

# Paralel indirme (threading)
start = time.perf_counter()
threads = []
for url, duration in urls:
    t = threading.Thread(target=download_file, args=(url, duration))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
parallel_time = time.perf_counter() - start
print(f"Threading: {parallel_time:.1f}s")  # ~3s (en uzun süre)

Threading ile 8 saniyelik iş 3 saniyeye düştü! Çünkü I/O beklerken diğer thread'ler çalışabiliyor.

Daemon Thread

Daemon thread'ler, ana program bitince otomatik sonlanır:

def background_task():
    """Arka planda çalışan görev."""
    while True:
        print("📡 Arka plan görevi çalışıyor...")
        time.sleep(1)

# Daemon thread: ana program bitince otomatik kapanır
t = threading.Thread(target=background_task, daemon=True)
t.start()

time.sleep(3)
print("Ana program bitiyor — daemon thread de kapanacak")

Race Condition ve Lock

Race Condition Nedir?

Birden fazla thread aynı veriye aynı anda erişmeye çalıştığında ortaya çıkan sorun.

import threading

# ❌ Race condition örneği
counter = 0

def increment(n):
    global counter
    for _ in range(n):
        counter += 1  # Bu satır atomik DEĞİL!
        # Aslında: temp = counter → temp += 1 → counter = temp

threads = []
for _ in range(10):
    t = threading.Thread(target=increment, args=(100_000,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Beklenen: 1,000,000")
print(f"Gerçek:   {counter:,}")  # Muhtemelen < 1,000,000!

counter += 1 tek bir işlem gibi görünüyor ama aslında 3 adım: oku, artır, yaz. İki thread aynı anda okuyabilir ve birbirinin değişikliğini ezebilir.

Lock ile Çözüm

import threading

counter = 0
lock = threading.Lock()

def safe_increment(n):
    global counter
    for _ in range(n):
        with lock:  # Lock al
            counter += 1
        # Lock otomatik serbest bırakılır

threads = []
for _ in range(10):
    t = threading.Thread(target=safe_increment, args=(100_000,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"Beklenen: 1,000,000")
print(f"Gerçek:   {counter:,}")  # Tam olarak 1,000,000!

Lock bir context manager — with lock: ile kullanabilirsin. Blok içinde sadece bir thread çalışabilir.

RLock: Reentrant Lock

Normal Lock aynı thread tarafından bile ikinci kez alınamaz. RLock ise aynı thread tarafından birden fazla kez alınabilir:

rlock = threading.RLock()

def recursive_function(n):
    with rlock:
        if n > 0:
            print(f"Seviye: {n}")
            recursive_function(n - 1)  # Aynı lock'ı tekrar alır

recursive_function(3)
# Normal Lock ile bu deadlock olurdu!

Event: Thread'ler Arası Sinyal

import threading
import time

data_ready = threading.Event()

def producer():
    """Veri üreten thread."""
    print("📦 Veri hazırlanıyor...")
    time.sleep(2)
    print("📦 Veri hazır!")
    data_ready.set()  # Sinyali gönder

def consumer():
    """Veri tüketen thread."""
    print("⏳ Veri bekleniyor...")
    data_ready.wait()  # Sinyal gelene kadar bekle
    print("🎉 Veri alındı, işleniyor!")

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t2.start()
t1.start()
t1.join()
t2.join()

I/O Bound vs CPU Bound

Bu ayrım çok önemli — hangi aracı kullanacağını belirler.

I/O Bound İşler

CPU bekliyor, veri geliyor:

  • Dosya okuma/yazma

  • Ağ istekleri (HTTP, API)

  • Veritabanı sorguları

  • Kullanıcı girişi bekleme

Çözüm: threading veya asyncio

CPU Bound İşler

CPU hesaplama yapıyor, tamamen meşgul:

  • Matematiksel hesaplamalar

  • Resim/video işleme

  • Veri sıkıştırma

  • Machine learning eğitimi

Çözüm: multiprocessing

I/O BoundCPU Bound
DarboğazAğ, diskİşlemci
Threading✅ İyi❌ GIL engeller
Multiprocessing⚠️ Overkill✅ Mükemmel
asyncio✅ En iyi❌ İşe yaramaz

multiprocessing Modülü

multiprocessing, GIL'i atlatmanın yolu. Her process kendi Python interpreter'ını çalıştırır, dolayısıyla kendi GIL'i olur. Gerçek paralellik sağlar.

Temel Kullanım

import multiprocessing
import time
import os

def cpu_intensive(n):
    """CPU-yoğun hesaplama."""
    print(f"Process {os.getpid()} başladı")
    result = sum(i ** 2 for i in range(n))
    print(f"Process {os.getpid()} bitti: {result}")
    return result

if __name__ == "__main__":
    # Process oluştur
    p1 = multiprocessing.Process(target=cpu_intensive, args=(10_000_000,))
    p2 = multiprocessing.Process(target=cpu_intensive, args=(10_000_000,))
    
    start = time.perf_counter()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    elapsed = time.perf_counter() - start
    
    print(f"2 process ile: {elapsed:.2f}s")

⚠️ Dikkat: Windows ve macOS'ta multiprocessing kodu `if __name__ == "__main__":` bloğu içinde olmalı. Yoksa sonsuz döngüye girebilir.

Pool: Process Havuzu

from multiprocessing import Pool
import time

def calculate_square(n):
    """Kare hesapla (CPU-bound simülasyonu)."""
    time.sleep(0.5)  # İşlem süresi simülasyonu
    return n ** 2

if __name__ == "__main__":
    numbers = list(range(20))
    
    # Sıralı
    start = time.perf_counter()
    results_seq = [calculate_square(n) for n in numbers]
    print(f"Sıralı: {time.perf_counter() - start:.1f}s")
    
    # Pool ile paralel
    start = time.perf_counter()
    with Pool(processes=4) as pool:
        results_par = pool.map(calculate_square, numbers)
    print(f"Pool(4): {time.perf_counter() - start:.1f}s")
    
    print(f"Sonuçlar eşit: {results_seq == results_par}")

Pool Metodları

from multiprocessing import Pool

def process_item(x):
    return x ** 2

if __name__ == "__main__":
    with Pool(4) as pool:
        # map: Sıralı sonuç, hepsini bekler
        results = pool.map(process_item, range(10))
        print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
        
        # imap: Iterator döner (lazy, sıralı)
        for result in pool.imap(process_item, range(10)):
            print(result, end=" ")
        
        # imap_unordered: Iterator döner (sırasız, hızlı)
        for result in pool.imap_unordered(process_item, range(10)):
            print(result, end=" ")
        
        # apply_async: Tek bir çağrı, non-blocking
        future = pool.apply_async(process_item, (42,))
        print(future.get())  # 1764

concurrent.futures: Modern API

concurrent.futures modülü, threading ve multiprocessing için birleşik ve modern bir API sunar.

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor
import time

def fetch_url(url):
    """URL'den veri çekmeyi simüle eder."""
    print(f"⬇️ {url} çekiliyor...")
    time.sleep(1)
    return f"Veri({url})"

urls = [f"https://api.example.com/data/{i}" for i in range(8)]

# ThreadPoolExecutor ile
with ThreadPoolExecutor(max_workers=4) as executor:
    # map: Sonuçları sıralı döner
    results = list(executor.map(fetch_url, urls))
    
for result in results:
    print(result)

ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import math

def is_prime(n):
    """Asal sayı kontrolü (CPU-bound)."""
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":
    numbers = [
        112272535095293, 112582705942171,
        112272535095293, 115280095190773,
        115797848077099, 1099726899285419
    ]
    
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(is_prime, numbers))
    
    for num, prime in zip(numbers, results):
        print(f"{num}: {'Asal' if prime else 'Asal değil'}")

submit() ve Future Nesnesi

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def download_file(filename):
    """Dosya indirme simülasyonu."""
    duration = random.uniform(0.5, 3.0)
    time.sleep(duration)
    return f"{filename} ({duration:.1f}s)"

filenames = [f"dosya_{i}.zip" for i in range(6)]

with ThreadPoolExecutor(max_workers=3) as executor:
    # submit() ile Future nesneleri al
    futures = {
        executor.submit(download_file, f): f 
        for f in filenames
    }
    
    # as_completed: Biten sırayla sonuçları al
    for future in as_completed(futures):
        filename = futures[future]
        try:
            result = future.result()
            print(f"✅ {result}")
        except Exception as e:
            print(f"❌ {filename}: {e}")

as_completed() sonuçları biten sırayla döner. Uzun süren işleri beklemeden, biten işlerin sonuçlarını hemen alabilirsin.


Thread-Safe Yapılar: queue.Queue

Birden fazla thread arasında veri paylaşmak için queue.Queue kullan. Thread-safe'tir — lock'a gerek yok.

import threading
import queue
import time

def producer(q, items):
    """Kuyruğa veri ekleyen producer."""
    for item in items:
        print(f"📦 Üretildi: {item}")
        q.put(item)
        time.sleep(0.5)
    q.put(None)  # Sentinel: "Bitti" sinyali

def consumer(q, name):
    """Kuyruktan veri alan consumer."""
    while True:
        item = q.get()  # Veri gelene kadar bekler
        if item is None:
            q.put(None)  # Diğer consumer'lar için sentinel'i geri koy
            break
        print(f"🔧 {name} işledi: {item}")
        time.sleep(1)
        q.task_done()

# Producer-Consumer pattern
q = queue.Queue(maxsize=5)

producer_thread = threading.Thread(
    target=producer, 
    args=(q, ["veri_1", "veri_2", "veri_3", "veri_4", "veri_5"])
)
consumer1 = threading.Thread(target=consumer, args=(q, "İşçi-1"))
consumer2 = threading.Thread(target=consumer, args=(q, "İşçi-2"))

producer_thread.start()
consumer1.start()
consumer2.start()

producer_thread.join()
consumer1.join()
consumer2.join()
print("Tüm işlem tamamlandı!")

Queue Tipleri

import queue

# FIFO Queue (varsayılan): İlk giren ilk çıkar
fifo = queue.Queue()

# LIFO Queue (Stack): Son giren ilk çıkar
lifo = queue.LifoQueue()

# Priority Queue: Önceliğe göre çıkar
pq = queue.PriorityQueue()
pq.put((3, "düşük öncelik"))
pq.put((1, "yüksek öncelik"))
pq.put((2, "orta öncelik"))

print(pq.get())  # (1, 'yüksek öncelik')
print(pq.get())  # (2, 'orta öncelik')
print(pq.get())  # (3, 'düşük öncelik')

Pratik: Paralel Dosya İndirme (Threading)

import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_file(url):
    """Dosya indirme simülasyonu."""
    size_mb = random.uniform(1, 50)
    duration = size_mb / 10  # 10 MB/s simülasyonu
    
    print(f"⬇️  Başladı: {url} ({size_mb:.1f} MB)")
    time.sleep(duration)
    print(f"✅ Bitti:    {url} ({duration:.1f}s)")
    
    return {"url": url, "size": size_mb, "time": duration}

urls = [
    "https://example.com/data1.csv",
    "https://example.com/data2.csv",
    "https://example.com/images.zip",
    "https://example.com/video.mp4",
    "https://example.com/backup.tar.gz",
    "https://example.com/docs.pdf",
]

# Paralel indirme
start = time.perf_counter()

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(download_file, url): url for url in urls}
    results = []
    
    for future in as_completed(futures):
        result = future.result()
        results.append(result)

elapsed = time.perf_counter() - start
total_size = sum(r["size"] for r in results)

print(f"\n📊 Rapor:")
print(f"   Dosya sayısı: {len(results)}")
print(f"   Toplam boyut: {total_size:.1f} MB")
print(f"   Toplam süre:  {elapsed:.1f}s")
print(f"   Hız:          {total_size/elapsed:.1f} MB/s")

Pratik: Paralel Hesaplama (Multiprocessing)

from multiprocessing import Pool
import time
import math

def count_primes_in_range(args):
    """Belirli bir aralıktaki asal sayıları say."""
    start, end = args
    count = 0
    for n in range(start, end):
        if n < 2:
            continue
        is_prime = True
        for i in range(2, int(math.sqrt(n)) + 1):
            if n % i == 0:
                is_prime = False
                break
        if is_prime:
            count += 1
    return count

if __name__ == "__main__":
    N = 1_000_000
    num_processes = 4
    
    # Aralıkları böl
    chunk_size = N // num_processes
    ranges = [
        (i * chunk_size, (i + 1) * chunk_size) 
        for i in range(num_processes)
    ]
    
    # Sıralı
    start = time.perf_counter()
    seq_result = count_primes_in_range((0, N))
    seq_time = time.perf_counter() - start
    print(f"Sıralı:    {seq_result} asal, {seq_time:.2f}s")
    
    # Paralel
    start = time.perf_counter()
    with Pool(num_processes) as pool:
        results = pool.map(count_primes_in_range, ranges)
    par_result = sum(results)
    par_time = time.perf_counter() - start
    print(f"Paralel:   {par_result} asal, {par_time:.2f}s")
    
    print(f"Hızlanma:  {seq_time/par_time:.1f}x")

Threading vs Multiprocessing: Karar Tablosu

Özellikthreadingmultiprocessing
GILEtkilenirEtkilenmez
I/O-bound✅ Mükemmel⚠️ Overkill
CPU-bound❌ Yavaş✅ Mükemmel
Bellek paylaşımıKolay (aynı process)Zor (IPC gerekir)
Process başlatmaHızlıYavaş
Bellek kullanımıAzÇok (her process ayrı)
Debug kolaylığıZor (race condition)Daha kolay
Veri paylaşımıShared memoryPickle/Queue

💡 İpucu: Genel kural: I/O bekliyorsan threading veya asyncio, CPU çalışıyorsa multiprocessing kullan. Emin değilsen concurrent.futures ile başla — ThreadPoolExecutor'dan ProcessPoolExecutor'a geçiş tek satır değişikliği.


Yaygın Hatalar

1. Deadlock

# ❌ İki lock'ı farklı sırada almak → deadlock
lock_a = threading.Lock()
lock_b = threading.Lock()

def thread_1():
    with lock_a:
        time.sleep(0.1)
        with lock_b:  # lock_b'yi bekler
            print("Thread 1")

def thread_2():
    with lock_b:
        time.sleep(0.1)
        with lock_a:  # lock_a'yı bekler → DEADLOCK!
            print("Thread 2")

# ✅ Çözüm: Lock'ları her zaman aynı sırada al
def thread_1_fixed():
    with lock_a:
        with lock_b:
            print("Thread 1")

def thread_2_fixed():
    with lock_a:  # Aynı sıra!
        with lock_b:
            print("Thread 2")

2. if __name__ Bloğunu Unutmak

# ❌ Windows/macOS'ta hata verir
from multiprocessing import Pool
pool = Pool(4)  # RuntimeError!

# ✅ Doğru
if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(func, data)

3. Global Değişken Paylaşımı

# ❌ Multiprocessing'te global değişken paylaşılmaz
counter = 0

def increment_mp():
    global counter
    counter += 1  # Her process KENDI kopyasını değiştirir!

# ✅ multiprocessing.Value veya Queue kullan
from multiprocessing import Value

def increment_shared(shared_counter):
    with shared_counter.get_lock():
        shared_counter.value += 1

Özet

  • Concurrency, görevleri zaman paylaşımlı yürütmedir; parallelism ise gerçekten aynı anda yürütmedir. Python'da threading concurrency, multiprocessing ise parallelism sağlar.

  • GIL, CPython'da aynı anda sadece bir thread'in Python kodu çalıştırmasını sağlar. I/O-bound işlerde sorun olmaz, CPU-bound işlerde darboğaz yaratır.

  • `threading.Lock` ile race condition'ları önle. `queue.Queue` ile thread'ler arası güvenli veri paylaşımı yap.

  • I/O-bound işler (ağ, dosya) için threading veya asyncio, CPU-bound işler (hesaplama) için multiprocessing kullan.

  • `concurrent.futures` modülü ThreadPoolExecutor ve ProcessPoolExecutor ile modern ve birleşik bir API sunar. as_completed() ile sonuçları biten sırayla alabilirsin.

  • Deadlock'tan kaçın: lock'ları her zaman aynı sırada al, timeout kullan ve mümkünse with statement kullan.