Multithreading ve Multiprocessing
Giriş: Neden Eşzamanlılık?
Bilgisayarın tek çekirdekli olduğu günler çoktan geride kaldı. Bugün cebindeki telefonda bile 8 çekirdek var. Ama Python programların varsayılan olarak tek çekirdekte çalışıyor. Bu, 8 şeritli bir otoyolda sadece 1 şeridi kullanmak gibi.
Bu derste Python'da eşzamanlılık (concurrency) ve paralellik (parallelism) kavramlarını öğrenecek, threading ve multiprocessing modüllerini kullanacak ve hangi durumda hangisini seçmen gerektiğini anlayacaksın.
Concurrency vs Parallelism
Analoji: Aşçı ve Mutfak
Concurrency (Eşzamanlılık): Bir aşçının birden fazla yemeği aynı anda idare etmesi. Çorbayı karıştırır, sonra salata keser, sonra fırını kontrol eder. Aslında aynı anda sadece bir iş yapıyor ama aralarında hızlıca geçiş yapıyor.
Parallelism (Paralellik): Birden fazla aşçının gerçekten aynı anda farklı yemekleri yapması. Biri çorba, diğeri salata, üçüncüsü tatlı. Fiziksel olarak eş zamanlı.
Concurrency: Parallelism:
Aşçı 1 Aşçı 1 Aşçı 2 Aşçı 3
├─ Çorba ├─ Çorba ├─ Salata ├─ Tatlı
├─ Salata ├─ Çorba ├─ Salata ├─ Tatlı
├─ Çorba ├─ Çorba ├─ Salata ├─ Tatlı
├─ Tatlı └─ Bitti └─ Bitti └─ Bitti
├─ Salata
└─ BittiPython'da:
Threading → Concurrency (çoğunlukla)
Multiprocessing → Parallelism (gerçek)
GIL: Global Interpreter Lock
Python'ın en tartışmalı özelliği: GIL.
GIL Nedir?
GIL, CPython interpreter'ında aynı anda sadece bir thread'in Python bytecode çalıştırmasını sağlayan bir kilittir. Yani 8 thread açsan bile, herhangi bir anda sadece 1 tanesi Python kodu çalıştırır.
Neden Var?
CPython'ın bellek yönetimi (reference counting) thread-safe değil. GIL olmasaydı, birden fazla thread aynı nesnenin referans sayacını değiştirmeye çalışabilir ve bellek bozulması olurdu. GIL bu sorunu basit ama agresif bir şekilde çözer.
GIL'in Etkisi
CPU-bound iş (hesaplama):
Thread 1: ████████████████
Thread 2: ████████████████
→ GIL yüzünden sırayla çalışır, hız artışı YOK
I/O-bound iş (dosya, ağ):
Thread 1: ██░░░░██░░░░██
Thread 2: ██░░░░██░░░░██
→ I/O beklerken GIL serbest kalır, hız artışı VARÖzet: Threading, I/O-bound işler için harika, CPU-bound işler için işe yaramaz. CPU-bound işler için multiprocessing kullan.
⚠️ Dikkat: GIL sadece CPython (standart Python) için geçerli. PyPy, Jython, IronPython gibi diğer Python implementasyonlarında GIL olmayabilir. Python 3.13 ile free-threaded (GIL'siz) mod da deneysel olarak eklendi.
threading Modülü
Thread Oluşturma: Temel Yol
import threading
import time
def worker(name, duration):
"""Bir işi simüle eden fonksiyon."""
print(f"🔨 {name} başladı")
time.sleep(duration)
print(f"✅ {name} bitti ({duration}s)")
# Thread oluştur
t1 = threading.Thread(target=worker, args=("İş 1", 2))
t2 = threading.Thread(target=worker, args=("İş 2", 3))
# Başlat
t1.start()
t2.start()
# Ana thread'in beklemesi
t1.join()
t2.join()
print("Tüm işler tamamlandı!")start() thread'i başlatır, join() thread'in bitmesini bekler.
Süre Karşılaştırması
import threading
import time
def download_file(url, duration):
"""Dosya indirmeyi simüle eder (I/O-bound)."""
print(f"⬇️ İndiriliyor: {url}")
time.sleep(duration) # I/O bekleme simülasyonu
print(f"✅ İndirildi: {url}")
urls = [
("file1.zip", 2),
("file2.zip", 3),
("file3.zip", 1),
("file4.zip", 2),
]
# Sıralı indirme
start = time.perf_counter()
for url, duration in urls:
download_file(url, duration)
sequential_time = time.perf_counter() - start
print(f"Sıralı: {sequential_time:.1f}s") # ~8s
# Paralel indirme (threading)
start = time.perf_counter()
threads = []
for url, duration in urls:
t = threading.Thread(target=download_file, args=(url, duration))
threads.append(t)
t.start()
for t in threads:
t.join()
parallel_time = time.perf_counter() - start
print(f"Threading: {parallel_time:.1f}s") # ~3s (en uzun süre)Threading ile 8 saniyelik iş 3 saniyeye düştü! Çünkü I/O beklerken diğer thread'ler çalışabiliyor.
Daemon Thread
Daemon thread'ler, ana program bitince otomatik sonlanır:
def background_task():
"""Arka planda çalışan görev."""
while True:
print("📡 Arka plan görevi çalışıyor...")
time.sleep(1)
# Daemon thread: ana program bitince otomatik kapanır
t = threading.Thread(target=background_task, daemon=True)
t.start()
time.sleep(3)
print("Ana program bitiyor — daemon thread de kapanacak")Race Condition ve Lock
Race Condition Nedir?
Birden fazla thread aynı veriye aynı anda erişmeye çalıştığında ortaya çıkan sorun.
import threading
# ❌ Race condition örneği
counter = 0
def increment(n):
global counter
for _ in range(n):
counter += 1 # Bu satır atomik DEĞİL!
# Aslında: temp = counter → temp += 1 → counter = temp
threads = []
for _ in range(10):
t = threading.Thread(target=increment, args=(100_000,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Beklenen: 1,000,000")
print(f"Gerçek: {counter:,}") # Muhtemelen < 1,000,000!counter += 1 tek bir işlem gibi görünüyor ama aslında 3 adım: oku, artır, yaz. İki thread aynı anda okuyabilir ve birbirinin değişikliğini ezebilir.
Lock ile Çözüm
import threading
counter = 0
lock = threading.Lock()
def safe_increment(n):
global counter
for _ in range(n):
with lock: # Lock al
counter += 1
# Lock otomatik serbest bırakılır
threads = []
for _ in range(10):
t = threading.Thread(target=safe_increment, args=(100_000,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Beklenen: 1,000,000")
print(f"Gerçek: {counter:,}") # Tam olarak 1,000,000!Lock bir context manager — with lock: ile kullanabilirsin. Blok içinde sadece bir thread çalışabilir.
RLock: Reentrant Lock
Normal Lock aynı thread tarafından bile ikinci kez alınamaz. RLock ise aynı thread tarafından birden fazla kez alınabilir:
rlock = threading.RLock()
def recursive_function(n):
with rlock:
if n > 0:
print(f"Seviye: {n}")
recursive_function(n - 1) # Aynı lock'ı tekrar alır
recursive_function(3)
# Normal Lock ile bu deadlock olurdu!Event: Thread'ler Arası Sinyal
import threading
import time
data_ready = threading.Event()
def producer():
"""Veri üreten thread."""
print("📦 Veri hazırlanıyor...")
time.sleep(2)
print("📦 Veri hazır!")
data_ready.set() # Sinyali gönder
def consumer():
"""Veri tüketen thread."""
print("⏳ Veri bekleniyor...")
data_ready.wait() # Sinyal gelene kadar bekle
print("🎉 Veri alındı, işleniyor!")
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t2.start()
t1.start()
t1.join()
t2.join()I/O Bound vs CPU Bound
Bu ayrım çok önemli — hangi aracı kullanacağını belirler.
I/O Bound İşler
CPU bekliyor, veri geliyor:
Dosya okuma/yazma
Ağ istekleri (HTTP, API)
Veritabanı sorguları
Kullanıcı girişi bekleme
Çözüm: threading veya asyncio
CPU Bound İşler
CPU hesaplama yapıyor, tamamen meşgul:
Matematiksel hesaplamalar
Resim/video işleme
Veri sıkıştırma
Machine learning eğitimi
Çözüm: multiprocessing
| I/O Bound | CPU Bound | |
|---|---|---|
| Darboğaz | Ağ, disk | İşlemci |
| Threading | ✅ İyi | ❌ GIL engeller |
| Multiprocessing | ⚠️ Overkill | ✅ Mükemmel |
| asyncio | ✅ En iyi | ❌ İşe yaramaz |
multiprocessing Modülü
multiprocessing, GIL'i atlatmanın yolu. Her process kendi Python interpreter'ını çalıştırır, dolayısıyla kendi GIL'i olur. Gerçek paralellik sağlar.
Temel Kullanım
import multiprocessing
import time
import os
def cpu_intensive(n):
"""CPU-yoğun hesaplama."""
print(f"Process {os.getpid()} başladı")
result = sum(i ** 2 for i in range(n))
print(f"Process {os.getpid()} bitti: {result}")
return result
if __name__ == "__main__":
# Process oluştur
p1 = multiprocessing.Process(target=cpu_intensive, args=(10_000_000,))
p2 = multiprocessing.Process(target=cpu_intensive, args=(10_000_000,))
start = time.perf_counter()
p1.start()
p2.start()
p1.join()
p2.join()
elapsed = time.perf_counter() - start
print(f"2 process ile: {elapsed:.2f}s")⚠️ Dikkat: Windows ve macOS'ta
multiprocessingkodu `if __name__ == "__main__":` bloğu içinde olmalı. Yoksa sonsuz döngüye girebilir.
Pool: Process Havuzu
from multiprocessing import Pool
import time
def calculate_square(n):
"""Kare hesapla (CPU-bound simülasyonu)."""
time.sleep(0.5) # İşlem süresi simülasyonu
return n ** 2
if __name__ == "__main__":
numbers = list(range(20))
# Sıralı
start = time.perf_counter()
results_seq = [calculate_square(n) for n in numbers]
print(f"Sıralı: {time.perf_counter() - start:.1f}s")
# Pool ile paralel
start = time.perf_counter()
with Pool(processes=4) as pool:
results_par = pool.map(calculate_square, numbers)
print(f"Pool(4): {time.perf_counter() - start:.1f}s")
print(f"Sonuçlar eşit: {results_seq == results_par}")Pool Metodları
from multiprocessing import Pool
def process_item(x):
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
# map: Sıralı sonuç, hepsini bekler
results = pool.map(process_item, range(10))
print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# imap: Iterator döner (lazy, sıralı)
for result in pool.imap(process_item, range(10)):
print(result, end=" ")
# imap_unordered: Iterator döner (sırasız, hızlı)
for result in pool.imap_unordered(process_item, range(10)):
print(result, end=" ")
# apply_async: Tek bir çağrı, non-blocking
future = pool.apply_async(process_item, (42,))
print(future.get()) # 1764concurrent.futures: Modern API
concurrent.futures modülü, threading ve multiprocessing için birleşik ve modern bir API sunar.
ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import time
def fetch_url(url):
"""URL'den veri çekmeyi simüle eder."""
print(f"⬇️ {url} çekiliyor...")
time.sleep(1)
return f"Veri({url})"
urls = [f"https://api.example.com/data/{i}" for i in range(8)]
# ThreadPoolExecutor ile
with ThreadPoolExecutor(max_workers=4) as executor:
# map: Sonuçları sıralı döner
results = list(executor.map(fetch_url, urls))
for result in results:
print(result)ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import math
def is_prime(n):
"""Asal sayı kontrolü (CPU-bound)."""
if n < 2:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True
if __name__ == "__main__":
numbers = [
112272535095293, 112582705942171,
112272535095293, 115280095190773,
115797848077099, 1099726899285419
]
with ProcessPoolExecutor() as executor:
results = list(executor.map(is_prime, numbers))
for num, prime in zip(numbers, results):
print(f"{num}: {'Asal' if prime else 'Asal değil'}")submit() ve Future Nesnesi
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
def download_file(filename):
"""Dosya indirme simülasyonu."""
duration = random.uniform(0.5, 3.0)
time.sleep(duration)
return f"{filename} ({duration:.1f}s)"
filenames = [f"dosya_{i}.zip" for i in range(6)]
with ThreadPoolExecutor(max_workers=3) as executor:
# submit() ile Future nesneleri al
futures = {
executor.submit(download_file, f): f
for f in filenames
}
# as_completed: Biten sırayla sonuçları al
for future in as_completed(futures):
filename = futures[future]
try:
result = future.result()
print(f"✅ {result}")
except Exception as e:
print(f"❌ {filename}: {e}")as_completed() sonuçları biten sırayla döner. Uzun süren işleri beklemeden, biten işlerin sonuçlarını hemen alabilirsin.
Thread-Safe Yapılar: queue.Queue
Birden fazla thread arasında veri paylaşmak için queue.Queue kullan. Thread-safe'tir — lock'a gerek yok.
import threading
import queue
import time
def producer(q, items):
"""Kuyruğa veri ekleyen producer."""
for item in items:
print(f"📦 Üretildi: {item}")
q.put(item)
time.sleep(0.5)
q.put(None) # Sentinel: "Bitti" sinyali
def consumer(q, name):
"""Kuyruktan veri alan consumer."""
while True:
item = q.get() # Veri gelene kadar bekler
if item is None:
q.put(None) # Diğer consumer'lar için sentinel'i geri koy
break
print(f"🔧 {name} işledi: {item}")
time.sleep(1)
q.task_done()
# Producer-Consumer pattern
q = queue.Queue(maxsize=5)
producer_thread = threading.Thread(
target=producer,
args=(q, ["veri_1", "veri_2", "veri_3", "veri_4", "veri_5"])
)
consumer1 = threading.Thread(target=consumer, args=(q, "İşçi-1"))
consumer2 = threading.Thread(target=consumer, args=(q, "İşçi-2"))
producer_thread.start()
consumer1.start()
consumer2.start()
producer_thread.join()
consumer1.join()
consumer2.join()
print("Tüm işlem tamamlandı!")Queue Tipleri
import queue
# FIFO Queue (varsayılan): İlk giren ilk çıkar
fifo = queue.Queue()
# LIFO Queue (Stack): Son giren ilk çıkar
lifo = queue.LifoQueue()
# Priority Queue: Önceliğe göre çıkar
pq = queue.PriorityQueue()
pq.put((3, "düşük öncelik"))
pq.put((1, "yüksek öncelik"))
pq.put((2, "orta öncelik"))
print(pq.get()) # (1, 'yüksek öncelik')
print(pq.get()) # (2, 'orta öncelik')
print(pq.get()) # (3, 'düşük öncelik')Pratik: Paralel Dosya İndirme (Threading)
import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
def download_file(url):
"""Dosya indirme simülasyonu."""
size_mb = random.uniform(1, 50)
duration = size_mb / 10 # 10 MB/s simülasyonu
print(f"⬇️ Başladı: {url} ({size_mb:.1f} MB)")
time.sleep(duration)
print(f"✅ Bitti: {url} ({duration:.1f}s)")
return {"url": url, "size": size_mb, "time": duration}
urls = [
"https://example.com/data1.csv",
"https://example.com/data2.csv",
"https://example.com/images.zip",
"https://example.com/video.mp4",
"https://example.com/backup.tar.gz",
"https://example.com/docs.pdf",
]
# Paralel indirme
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(download_file, url): url for url in urls}
results = []
for future in as_completed(futures):
result = future.result()
results.append(result)
elapsed = time.perf_counter() - start
total_size = sum(r["size"] for r in results)
print(f"\n📊 Rapor:")
print(f" Dosya sayısı: {len(results)}")
print(f" Toplam boyut: {total_size:.1f} MB")
print(f" Toplam süre: {elapsed:.1f}s")
print(f" Hız: {total_size/elapsed:.1f} MB/s")Pratik: Paralel Hesaplama (Multiprocessing)
from multiprocessing import Pool
import time
import math
def count_primes_in_range(args):
"""Belirli bir aralıktaki asal sayıları say."""
start, end = args
count = 0
for n in range(start, end):
if n < 2:
continue
is_prime = True
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
is_prime = False
break
if is_prime:
count += 1
return count
if __name__ == "__main__":
N = 1_000_000
num_processes = 4
# Aralıkları böl
chunk_size = N // num_processes
ranges = [
(i * chunk_size, (i + 1) * chunk_size)
for i in range(num_processes)
]
# Sıralı
start = time.perf_counter()
seq_result = count_primes_in_range((0, N))
seq_time = time.perf_counter() - start
print(f"Sıralı: {seq_result} asal, {seq_time:.2f}s")
# Paralel
start = time.perf_counter()
with Pool(num_processes) as pool:
results = pool.map(count_primes_in_range, ranges)
par_result = sum(results)
par_time = time.perf_counter() - start
print(f"Paralel: {par_result} asal, {par_time:.2f}s")
print(f"Hızlanma: {seq_time/par_time:.1f}x")Threading vs Multiprocessing: Karar Tablosu
| Özellik | threading | multiprocessing |
|---|---|---|
| GIL | Etkilenir | Etkilenmez |
| I/O-bound | ✅ Mükemmel | ⚠️ Overkill |
| CPU-bound | ❌ Yavaş | ✅ Mükemmel |
| Bellek paylaşımı | Kolay (aynı process) | Zor (IPC gerekir) |
| Process başlatma | Hızlı | Yavaş |
| Bellek kullanımı | Az | Çok (her process ayrı) |
| Debug kolaylığı | Zor (race condition) | Daha kolay |
| Veri paylaşımı | Shared memory | Pickle/Queue |
💡 İpucu: Genel kural: I/O bekliyorsan
threadingveyaasyncio, CPU çalışıyorsamultiprocessingkullan. Emin değilsenconcurrent.futuresile başla —ThreadPoolExecutor'danProcessPoolExecutor'a geçiş tek satır değişikliği.
Yaygın Hatalar
1. Deadlock
# ❌ İki lock'ı farklı sırada almak → deadlock
lock_a = threading.Lock()
lock_b = threading.Lock()
def thread_1():
with lock_a:
time.sleep(0.1)
with lock_b: # lock_b'yi bekler
print("Thread 1")
def thread_2():
with lock_b:
time.sleep(0.1)
with lock_a: # lock_a'yı bekler → DEADLOCK!
print("Thread 2")
# ✅ Çözüm: Lock'ları her zaman aynı sırada al
def thread_1_fixed():
with lock_a:
with lock_b:
print("Thread 1")
def thread_2_fixed():
with lock_a: # Aynı sıra!
with lock_b:
print("Thread 2")2. if __name__ Bloğunu Unutmak
# ❌ Windows/macOS'ta hata verir
from multiprocessing import Pool
pool = Pool(4) # RuntimeError!
# ✅ Doğru
if __name__ == "__main__":
with Pool(4) as pool:
results = pool.map(func, data)3. Global Değişken Paylaşımı
# ❌ Multiprocessing'te global değişken paylaşılmaz
counter = 0
def increment_mp():
global counter
counter += 1 # Her process KENDI kopyasını değiştirir!
# ✅ multiprocessing.Value veya Queue kullan
from multiprocessing import Value
def increment_shared(shared_counter):
with shared_counter.get_lock():
shared_counter.value += 1Özet
Concurrency, görevleri zaman paylaşımlı yürütmedir; parallelism ise gerçekten aynı anda yürütmedir. Python'da threading concurrency, multiprocessing ise parallelism sağlar.
GIL, CPython'da aynı anda sadece bir thread'in Python kodu çalıştırmasını sağlar. I/O-bound işlerde sorun olmaz, CPU-bound işlerde darboğaz yaratır.
`threading.Lock` ile race condition'ları önle. `queue.Queue` ile thread'ler arası güvenli veri paylaşımı yap.
I/O-bound işler (ağ, dosya) için
threadingveyaasyncio, CPU-bound işler (hesaplama) içinmultiprocessingkullan.`concurrent.futures` modülü
ThreadPoolExecutorveProcessPoolExecutorile modern ve birleşik bir API sunar.as_completed()ile sonuçları biten sırayla alabilirsin.Deadlock'tan kaçın: lock'ları her zaman aynı sırada al, timeout kullan ve mümkünse
withstatement kullan.
AI Asistan
Sorularını yanıtlamaya hazır