← Kursa Dön
📄 Text · 25 min

Spring WebFlux ve Reactive Programming

Giriş — Neden Reactive?

Bir nehir düşün. Nehrin kaynağından denize kadar su akar. Sen nehrin kenarında duruyorsun ve akan suyu izliyorsun. Suyu durduramazsın, hızını ayarlayamazsın — su gelir, sen tepki verirsin. Bir balık geçerse yakala, çöp geçerse filtrele, su seviyesi yükselirse alarm ver.

İşte reactive programming tam olarak bu. Veriyi "git al, getir, bekle" şeklinde değil, "veri geldiğinde şunu yap" şeklinde düşünürsün. Nehir (veri kaynağı) akmaya devam eder, sen ona react edersin (tepki verirsin).

Peki neden buna ihtiyacımız var?

Klasik Spring MVC modelinde her HTTP isteği için bir thread atanır. Thread veritabanı cevabını beklerken, başka bir HTTP servisi cevabını beklerken, dosya okurken — bloklanır ve hiçbir iş yapmadan boşta durur. 200 thread'in 190'ı "bekliyor" durumdaysa, 11. concurrent kullanıcıyı karşılayamazsın. Thread'ler pahalı kaynaklardır — her biri ~1MB bellek tutar, OS seviyesinde context switch maliyeti vardır.

Spring WebFlux bu modeli kökünden değiştirir: non-blocking I/O ile az sayıda thread'le binlerce concurrent bağlantıyı handle edebilirsin. Thread hiçbir zaman beklemez — bir işlem tamamlandığında callback tetiklenir ve thread başka bir iş yapmaya geçer.


Imperative vs Reactive — İki Farklı Dünya

Imperative (Klasik) Yaklaşım

// Klasik Spring MVC — thread her adımda bloklanır
@GetMapping("/api/dashboard/{userId}")
public DashboardResponse getDashboard(@PathVariable Long userId) {
    // Thread bloklandı — DB sorgusu (~10ms)
    User user = userRepository.findById(userId).orElseThrow();

    // Thread bloklandı — HTTP çağrısı (~50ms)
    List<Order> orders = orderClient.getOrders(userId);

    // Thread bloklandı — başka HTTP çağrısı (~30ms)
    WalletInfo wallet = walletClient.getBalance(userId);

    // Toplam: ~90ms bekle, ~0.5ms çalış
    // Thread zamanının %99.5'i boşa gitti
    return new DashboardResponse(user, orders, wallet);
}

Bu kodda her satır sırayla çalışır. Bir satır bitmeden diğerine geçilmez. Thread 90ms boyunca bloklanır, CPU'da çalışma süresi 0.5ms bile değildir.

Reactive Yaklaşım

// WebFlux — thread asla bloklanmaz
@GetMapping("/api/dashboard/{userId}")
public Mono<DashboardResponse> getDashboard(@PathVariable Long userId) {
    Mono<User> userMono = userRepository.findById(userId);
    Mono<List<Order>> ordersMono = orderClient.getOrders(userId);
    Mono<WalletInfo> walletMono = walletClient.getBalance(userId);

    // Üç çağrı PARALEL çalışır — thread beklemez
    return Mono.zip(userMono, ordersMono, walletMono)
        .map(tuple -> new DashboardResponse(
            tuple.getT1(),   // User
            tuple.getT2(),   // Orders
            tuple.getT3()    // Wallet
        ));
    // Toplam: ~50ms (en yavaş çağrı kadar), thread 0ms bloklanır
}

Farkı gördün mü? Reactive versiyonda üç I/O çağrısı aynı anda başlatılır. Thread hiçbirini beklemez — sonuç geldiğinde zip operatörü hepsini birleştirir. Süre 90ms yerine ~50ms (en yavaş çağrı kadar).

Nehir analojisine dönelim: Imperative yaklaşımda nehirden su almak istiyorsun — kovayı daldırıp bekliyorsun. Reactive yaklaşımda nehre bir boru bağlıyorsun — su geldiğinde otomatik olarak deponuna akar. Sen boru hattını kurarsın, suyu beklemeye gerek yok.


Project Reactor: Mono ve Flux

Spring WebFlux, Project Reactor kütüphanesini temel alır. Reactor'ın iki ana tipi vardır:

Mono — 0 veya 1 Eleman

Mono<T>, 0 veya 1 eleman yayınlayan bir reactive publisher'dır. Veritabanından tek bir kayıt getirme, HTTP çağrısından tek bir cevap alma gibi senaryolarda kullanılır.

// Mono oluşturma yolları
Mono<String> mono1 = Mono.just("Merhaba");          // Değerli Mono
Mono<String> mono2 = Mono.empty();                    // Boş Mono (Optional.empty gibi)
Mono<String> mono3 = Mono.error(new RuntimeException("Hata!")); // Hatalı Mono

// Lazy oluşturma — subscribe edilene kadar çalışmaz
Mono<User> userMono = Mono.fromCallable(() -> {
    System.out.println("Bu, subscribe edildiğinde çalışır");
    return userRepository.findById(1L);
});

// Mono zincirleme
Mono<String> result = Mono.just(42)
    .map(num -> num * 2)              // 84
    .map(num -> "Sonuç: " + num);     // "Sonuç: 84"

Flux — 0 ile N Eleman

Flux<T>, 0'dan N'ye kadar eleman yayınlayan bir reactive publisher'dır. Liste döndüren sorgular, stream veriler, event akışları için kullanılır.

// Flux oluşturma yolları
Flux<Integer> flux1 = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> flux2 = Flux.range(1, 10);              // 1'den 10'a
Flux<String> flux3 = Flux.fromIterable(List.of("a", "b", "c"));
Flux<Long> flux4 = Flux.interval(Duration.ofSeconds(1)); // Her saniye bir eleman

// Flux zincirleme
Flux<String> names = Flux.just("ahmet", "mehmet", "ayşe")
    .filter(name -> name.length() > 4)   // "ahmet", "mehmet"
    .map(String::toUpperCase);            // "AHMET", "MEHMET"

Mono ve Flux Arasındaki İlişki

// Flux → Mono dönüşümleri
Mono<String> first = Flux.just("a", "b", "c").next();          // "a"
Mono<List<String>> all = Flux.just("a", "b", "c").collectList(); // ["a", "b", "c"]
Mono<Long> count = Flux.just("a", "b", "c").count();            // 3

// Mono → Flux dönüşümü
Flux<String> flux = Mono.just("hello").flux();                   // Tek elemanlı Flux

Operatörler — Reactive Araç Kutusu

Reactor operatörleri, Java Stream API'ye benzer ama asenkron çalışır. Nehir analojisiyle: operatörler, nehir boyunca kurduğun filtreler, dönüştürücüler ve birleştiricilerdir.

map — Senkron Dönüşüm

Her elemanı senkron bir fonksiyonla dönüştürür. Java Stream'deki map ile aynı mantık.

Flux<Integer> doubled = Flux.just(1, 2, 3)
    .map(n -> n * 2);  // 2, 4, 6

Mono<UserDto> userDto = userRepository.findById(1L)
    .map(user -> new UserDto(user.getName(), user.getEmail()));

flatMap — Asenkron Dönüşüm

Her elemanı başka bir Mono/Flux'a dönüştürür ve sonuçları düzleştirir. I/O gerektiren dönüşümlerde kullanılır. map ile flatMap arasındaki fark kritiktir:

// YANLIŞ — map ile reactive çağrı yaparsan iç içe Mono olur
Mono<Mono<Order>> wrong = userMono.map(user ->
    orderService.findLastOrder(user.getId())  // Mono<Order> döner
);
// Sonuç: Mono<Mono<Order>> — istenmeyen iç içe yapı

// DOĞRU — flatMap iç Mono'yu düzleştirir
Mono<Order> correct = userMono.flatMap(user ->
    orderService.findLastOrder(user.getId())  // Mono<Order> döner
);
// Sonuç: Mono<Order> — temiz, düz

// Flux'ta flatMap — her eleman için ayrı çağrı, sonuçlar birleşir
Flux<Order> allOrders = Flux.just(1L, 2L, 3L)
    .flatMap(userId -> orderService.findByUserId(userId));
// 3 kullanıcının siparişleri tek Flux'ta birleşir

Kural: Dönüşüm sonucu Mono/Flux ise flatMap, düz değer ise map kullan.

filter — Eleme

Flux<Product> expensive = productRepository.findAll()
    .filter(product -> product.getPrice() > 1000);

zip — Paralel Birleştirme

Birden fazla Mono/Flux'ı paralel çalıştırır ve sonuçları birleştirir. Dashboard senaryosu için idealdir.

// İki Mono'yu birleştir — ikisi de paralel çalışır
Mono<DashboardData> dashboard = Mono.zip(
    userService.findById(userId),
    orderService.getStats(userId)
).map(tuple -> new DashboardData(tuple.getT1(), tuple.getT2()));

// Üç ve daha fazla Mono
Mono<FullProfile> profile = Mono.zip(
    userService.findById(userId),
    orderService.getOrders(userId),
    walletService.getBalance(userId)
).map(tuple -> new FullProfile(
    tuple.getT1(),
    tuple.getT2(),
    tuple.getT3()
));

merge ve concat — Flux Birleştirme

Flux<Notification> local = notificationService.getLocal(userId);
Flux<Notification> push = notificationService.getPush(userId);

// merge — her iki kaynaktan gelen elemanları GELME SIRASINA göre birleştirir
// Hangisi önce cevap verirse o önce gelir (interleaved)
Flux<Notification> merged = Flux.merge(local, push);

// concat — önce birinci kaynağı tüket, sonra ikinciyi başlat
// Sıra garantili ama sıralı (sequential)
Flux<Notification> concatenated = Flux.concat(local, push);

switchIfEmpty ve defaultIfEmpty

// Boş Mono için varsayılan değer
Mono<User> user = userRepository.findById(id)
    .defaultIfEmpty(User.guest());

// Boş Mono için alternatif kaynak
Mono<User> user2 = userCache.findById(id)
    .switchIfEmpty(userRepository.findById(id));  // Cache'te yoksa DB'den al

onErrorResume ve onErrorReturn — Hata Yönetimi

Mono<User> user = userService.findById(id)
    .onErrorReturn(new User("Bilinmeyen"))            // Hata olursa default değer
    .onErrorResume(ex -> {                              // Hata olursa alternatif akış
        log.warn("User servisi hata verdi: {}", ex.getMessage());
        return userCache.findById(id);                  // Cache'e fallback
    });

Subscribe ve Block — Akışı Tetiklemek

Reactor'da hiçbir şey subscribe edilmeden çalışmaz. Mono ve Flux tembel (lazy) yapılardır — sadece pipeline'ı tanımlarsın, tetiklemezsin.

// Bu satır HİÇBİR ŞEY yapmaz — sadece pipeline tanımı
Mono<User> pipeline = userRepository.findById(1L)
    .map(user -> {
        System.out.println("Bu asla çalışmaz!");
        return user;
    });

// subscribe() ile tetikle
pipeline.subscribe(
    user -> System.out.println("Kullanıcı: " + user.getName()),  // onNext
    error -> System.err.println("Hata: " + error.getMessage()),  // onError
    () -> System.out.println("Tamamlandı")                        // onComplete
);

block() — Reactive'den Imperative'e Geçiş

block() methodu, reactive akışı bloklayarak sonucu döndürür. Thread'i bloklar — reactive'in tüm avantajını yok eder.

// block() — thread bloklanır, sonuç gelene kadar bekler
User user = userRepository.findById(1L).block();  // Imperative dünyaya döndük

// block() ile timeout
User user2 = userRepository.findById(1L)
    .block(Duration.ofSeconds(5));  // 5 saniye bekle, gelmezse exception

⚠️ Dikkat: block() çağrısını asla reactive pipeline içinde (WebFlux controller, reactive servis) kullanma. Reactor'ın event loop thread'ini bloklar ve tüm uygulamayı kilitler. block() sadece test, main metod veya imperative kodla entegrasyon noktalarında kullanılmalı.

// YANLIŞ — WebFlux controller'da block() kullanma!
@GetMapping("/api/users/{id}")
public Mono<UserDto> getUser(@PathVariable Long id) {
    User user = userRepository.findById(id).block();  // ❌ Event loop'u bloklar!
    return Mono.just(new UserDto(user));
}

// DOĞRU — tüm pipeline reactive kalmalı
@GetMapping("/api/users/{id}")
public Mono<UserDto> getUser(@PathVariable Long id) {
    return userRepository.findById(id)
        .map(UserDto::from);  // ✅ Non-blocking
}

Spring WebFlux Controller

WebFlux controller'ları, klasik Spring MVC controller'larına çok benzer. Aynı annotation'ları kullanırsın — tek fark dönüş tipinin Mono veya Flux olmasıdır.

Proje Kurulumu

WebFlux projesi oluşturmak için spring-boot-starter-webflux dependency'sini kullan. `spring-boot-starter-web` ile birlikte kullanma — ikisi çakışır.

<!-- pom.xml -->
<dependencies>
    <!-- WebFlux — Tomcat yerine Netty sunucusu kullanır -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <!-- R2DBC — reactive veritabanı erişimi (opsiyonel) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>

    <!-- H2 R2DBC driver (geliştirme için) -->
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-h2</artifactId>
        <scope>runtime</scope>
    </dependency>

    <!-- PostgreSQL R2DBC driver (production için) -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>r2dbc-postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
# application.properties
spring.r2dbc.url=r2dbc:h2:mem:///testdb
spring.r2dbc.username=sa
spring.r2dbc.password=

# Netty sunucu portu (Tomcat yerine Netty çalışır)
server.port=8080

Annotated Controller

@RestController
@RequestMapping("/api/products")
@RequiredArgsConstructor
public class ProductController {

    private final ProductService productService;

    // Tek ürün — Mono döner
    @GetMapping("/{id}")
    public Mono<ResponseEntity<Product>> getProduct(@PathVariable Long id) {
        return productService.findById(id)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    // Tüm ürünler — Flux döner
    @GetMapping
    public Flux<Product> getAllProducts() {
        return productService.findAll();
    }

    // Fiyat aralığına göre filtrele
    @GetMapping("/search")
    public Flux<Product> searchProducts(
            @RequestParam String keyword,
            @RequestParam(defaultValue = "0") double minPrice,
            @RequestParam(defaultValue = "999999") double maxPrice) {
        return productService.search(keyword, minPrice, maxPrice);
    }

    // Ürün oluştur
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<Product> createProduct(@RequestBody Product product) {
        return productService.save(product);
    }

    // Ürün güncelle
    @PutMapping("/{id}")
    public Mono<ResponseEntity<Product>> updateProduct(
            @PathVariable Long id,
            @RequestBody Product product) {
        return productService.update(id, product)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    // Ürün sil
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> deleteProduct(@PathVariable Long id) {
        return productService.deleteById(id);
    }

    // Server-Sent Events — gerçek zamanlı fiyat güncellemeleri
    @GetMapping(value = "/price-stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Product> streamPriceUpdates() {
        return productService.getPriceUpdateStream();
    }
}

MVC controller'larından tek fark: Product yerine Mono<Product>, List<Product> yerine Flux<Product> döndürüyorsun. Annotation'lar (@RestController, @GetMapping, @RequestBody) birebir aynı.

Service Katmanı

@Service
@RequiredArgsConstructor
public class ProductService {

    private final ProductRepository productRepository;

    public Mono<Product> findById(Long id) {
        return productRepository.findById(id);
    }

    public Flux<Product> findAll() {
        return productRepository.findAll();
    }

    public Flux<Product> search(String keyword, double minPrice, double maxPrice) {
        return productRepository.findByNameContainingIgnoreCase(keyword)
            .filter(p -> p.getPrice() >= minPrice && p.getPrice() <= maxPrice);
    }

    public Mono<Product> save(Product product) {
        product.setCreatedAt(LocalDateTime.now());
        return productRepository.save(product);
    }

    public Mono<Product> update(Long id, Product updated) {
        return productRepository.findById(id)
            .flatMap(existing -> {
                existing.setName(updated.getName());
                existing.setPrice(updated.getPrice());
                existing.setDescription(updated.getDescription());
                existing.setUpdatedAt(LocalDateTime.now());
                return productRepository.save(existing);
            });
        // findById boş dönerse flatMap çalışmaz → Mono.empty() döner
    }

    public Mono<Void> deleteById(Long id) {
        return productRepository.deleteById(id);
    }
}

WebClient — Reactive HTTP Client

RestTemplate blocking'dir — thread'i bloklar. WebFlux dünyasında onun yerini WebClient alır. WebClient tamamen non-blocking çalışır ve hem reactive hem imperative kodda kullanılabilir.

@Configuration
public class WebClientConfig {

    @Bean
    public WebClient orderServiceClient() {
        return WebClient.builder()
            .baseUrl("http://order-service:8081")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .filter(ExchangeFilterFunctions.basicAuthentication("user", "pass"))
            .build();
    }

    @Bean
    public WebClient paymentServiceClient() {
        return WebClient.builder()
            .baseUrl("http://payment-service:8082")
            .build();
    }
}
@Service
@RequiredArgsConstructor
public class OrderIntegrationService {

    private final WebClient orderServiceClient;

    // GET isteği — tek kayıt
    public Mono<Order> getOrder(Long orderId) {
        return orderServiceClient.get()
            .uri("/api/orders/{id}", orderId)
            .retrieve()
            .bodyToMono(Order.class)
            .onErrorResume(WebClientResponseException.NotFound.class,
                ex -> Mono.empty());  // 404 → boş Mono
    }

    // GET isteği — liste
    public Flux<Order> getUserOrders(Long userId) {
        return orderServiceClient.get()
            .uri(uriBuilder -> uriBuilder
                .path("/api/orders")
                .queryParam("userId", userId)
                .queryParam("status", "ACTIVE")
                .build())
            .retrieve()
            .bodyToFlux(Order.class);
    }

    // POST isteği — kayıt oluşturma
    public Mono<Order> createOrder(CreateOrderRequest request) {
        return orderServiceClient.post()
            .uri("/api/orders")
            .bodyValue(request)
            .retrieve()
            .bodyToMono(Order.class);
    }

    // Hata yönetimi ile
    public Mono<Order> getOrderWithErrorHandling(Long orderId) {
        return orderServiceClient.get()
            .uri("/api/orders/{id}", orderId)
            .retrieve()
            .onStatus(HttpStatusCode::is4xxClientError, response ->
                response.bodyToMono(String.class)
                    .flatMap(body -> Mono.error(
                        new OrderClientException("Client hatası: " + body))))
            .onStatus(HttpStatusCode::is5xxServerError, response ->
                Mono.error(new OrderServiceException("Order servisi down")))
            .bodyToMono(Order.class)
            .timeout(Duration.ofSeconds(5))
            .retryWhen(Retry.backoff(3, Duration.ofMillis(500))
                .filter(ex -> ex instanceof OrderServiceException));
    }
}

WebClient'ın güçlü yanı: timeout, retry, onStatus gibi operatörleri pipeline'a doğal şekilde ekleyebilirsin. RestTemplate'de bunları yapmak için interceptor yazman gerekirdi.

💡 İpucu: WebClient sadece WebFlux projelerinde değil, klasik Spring MVC projelerinde de kullanılabilir. Spring 6.1+ ile RestTemplate resmi olarak "maintenance mode"a alındı — yeni projelerde WebClient veya RestClient tercih et.


R2DBC — Reactive Veritabanı Erişimi

JPA/Hibernate blocking'dir — thread veritabanı cevabını beklerken bloklanır. R2DBC (Reactive Relational Database Connectivity), JDBC'nin reactive karşılığıdır. Non-blocking veritabanı sorgulama sağlar.

Entity Tanımı

R2DBC'de JPA annotation'ları (@Entity, @Table, @Column) kullanılmaz. Yerine Spring Data'nın annotation'ları kullanılır:

import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import org.springframework.data.relational.core.mapping.Column;

@Table("products")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product {

    @Id
    private Long id;

    private String name;

    private String description;

    private Double price;

    @Column("category_id")
    private Long categoryId;

    @Column("created_at")
    private LocalDateTime createdAt;

    @Column("updated_at")
    private LocalDateTime updatedAt;
}

ReactiveCrudRepository

public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {

    // Derived query — isimlendirme kuralları JPA ile aynı
    Flux<Product> findByNameContainingIgnoreCase(String name);

    Flux<Product> findByCategoryId(Long categoryId);

    Flux<Product> findByPriceBetween(Double min, Double max);

    Mono<Long> countByCategoryId(Long categoryId);

    // Custom query — @Query annotation'ı
    @Query("SELECT * FROM products WHERE price > :minPrice ORDER BY price DESC LIMIT :limit")
    Flux<Product> findExpensiveProducts(Double minPrice, int limit);

    @Query("SELECT * FROM products WHERE name ILIKE CONCAT('%', :keyword, '%') AND price BETWEEN :min AND :max")
    Flux<Product> searchProducts(String keyword, Double min, Double max);
}

ReactiveCrudRepository'nin temel metodları:

MetotDönüş TipiAçıklama
findById(ID id)Mono<T>ID ile bul
findAll()Flux<T>Tüm kayıtları getir
save(T entity)Mono<T>Kaydet (insert veya update)
deleteById(ID id)Mono<Void>ID ile sil
count()Mono<Long>Toplam kayıt sayısı
existsById(ID id)Mono<Boolean>Var mı?

Schema Yönetimi

R2DBC'de JPA'nın ddl-auto=create özelliği yoktur. Tablo oluşturmayı sen yapmalısın. Flyway veya schema.sql kullanabilirsin:

-- src/main/resources/schema.sql
CREATE TABLE IF NOT EXISTS products (
    id BIGSERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    price DOUBLE PRECISION NOT NULL,
    category_id BIGINT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP
);
# application.properties — schema.sql'i otomatik çalıştır
spring.sql.init.mode=always
spring.sql.init.schema-locations=classpath:schema.sql

JPA vs R2DBC Karşılaştırma

ÖzellikJPA/HibernateR2DBC
I/O modeliBlocking (JDBC)Non-blocking
Lazy loading✅ Var❌ Yok
İlişki mapping@OneToMany, @ManyToOne otomatikManuel join/query
CacheL1/L2 cacheYok (harici cache kullan)
Schema generationddl-autoManuel (Flyway/schema.sql)
Transaction@Transactional@Transactional (reactive)
Olgunluk20+ yıl, ekosistem devasaGörece yeni, ekosistem büyüyor
Öğrenme eğrisiOrtaYüksek

⚠️ Dikkat: R2DBC'de @ManyToOne, @OneToMany gibi ilişki annotation'ları yoktur. İlişkisel verileri çekmek için manuel join sorgusu yazman veya service katmanında flatMap ile birleştirmen gerekir. Bu, JPA'ya alışkın geliştiricilerin en çok zorlandığı noktadır.

// JPA'da ilişki — otomatik
@Entity
public class Order {
    @ManyToOne(fetch = FetchType.LAZY)
    private User user;  // Otomatik JOIN
}

// R2DBC'de ilişki — manuel
@Service
public class OrderService {
    public Mono<OrderWithUser> getOrderWithUser(Long orderId) {
        return orderRepository.findById(orderId)
            .flatMap(order -> userRepository.findById(order.getUserId())
                .map(user -> new OrderWithUser(order, user)));
    }
}

Functional Endpoints — RouterFunction Alternatifi

Spring WebFlux iki programlama modeli sunar:

  1. Annotated Controllers@RestController, @GetMapping (yukarıda gördük)

  2. Functional EndpointsRouterFunction ve HandlerFunction

Functional endpoints, route tanımlarını Java kodunda fonksiyonel stil ile yapmanı sağlar:

@Configuration
public class ProductRouter {

    @Bean
    public RouterFunction<ServerResponse> productRoutes(ProductHandler handler) {
        return RouterFunctions.route()
            .path("/api/products", builder -> builder
                .GET("", handler::getAllProducts)
                .GET("/{id}", handler::getProduct)
                .POST("", handler::createProduct)
                .PUT("/{id}", handler::updateProduct)
                .DELETE("/{id}", handler::deleteProduct)
            )
            .build();
    }
}

@Component
@RequiredArgsConstructor
public class ProductHandler {

    private final ProductService productService;

    public Mono<ServerResponse> getAllProducts(ServerRequest request) {
        return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(productService.findAll(), Product.class);
    }

    public Mono<ServerResponse> getProduct(ServerRequest request) {
        Long id = Long.parseLong(request.pathVariable("id"));
        return productService.findById(id)
            .flatMap(product -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(product))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> createProduct(ServerRequest request) {
        return request.bodyToMono(Product.class)
            .flatMap(productService::save)
            .flatMap(saved -> ServerResponse
                .created(URI.create("/api/products/" + saved.getId()))
                .bodyValue(saved));
    }

    public Mono<ServerResponse> updateProduct(ServerRequest request) {
        Long id = Long.parseLong(request.pathVariable("id"));
        return request.bodyToMono(Product.class)
            .flatMap(product -> productService.update(id, product))
            .flatMap(updated -> ServerResponse.ok().bodyValue(updated))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> deleteProduct(ServerRequest request) {
        Long id = Long.parseLong(request.pathVariable("id"));
        return productService.deleteById(id)
            .then(ServerResponse.noContent().build());
    }
}

Annotated vs Functional — hangisini seçmeli?

Çoğu projede annotated controller yeterli ve daha okunaklı. Functional endpoints, route'ları programatik olarak oluşturman gereken durumlarda (dinamik route, çok sayıda mikro endpoint) veya Spring framework'ün iç yapısına hakim olan ekiplerde tercih edilir. Yeni başlıyorsan annotated controller ile devam et.


Back-Pressure — Akış Kontrolü

Nehir analojisine dönelim. Nehir çok hızlı akarsa ve sen o hızda işleyemezsen ne olur? Su taşar. Reactive programlamada da aynı durum var: producer (üretici) çok hızlı veri üretir, consumer (tüketici) yetişemezse bellek patlar veya veriler kaybolur.

Back-pressure, tüketicinin üreticiye "yavaşla" demesi mekanizmasıdır.

// Back-pressure olmadan — tehlikeli
Flux.range(1, 1_000_000)
    .subscribe(n -> {
        // Her elemanı işlemek 100ms sürüyor
        // 1 milyon eleman × 100ms = 100,000 saniye
        // Ama Flux hepsini anında üretiyor — bellek patlar!
        Thread.sleep(100);
        process(n);
    });

// Back-pressure ile — güvenli
Flux.range(1, 1_000_000)
    .onBackpressureBuffer(1000)  // Max 1000 eleman buffer'la, gerisini düşür
    .subscribe(n -> process(n));

// Farklı back-pressure stratejileri
Flux.range(1, 1_000_000)
    .onBackpressureDrop(dropped ->
        log.warn("Eleman düşürüldü: {}", dropped))  // Yetişemediklerini düşür
    .subscribe(n -> process(n));

Flux.range(1, 1_000_000)
    .onBackpressureLatest()  // Sadece en son elemanı tut, gerisini at
    .subscribe(n -> process(n));

WebFlux'ta back-pressure genellikle framework tarafından otomatik yönetilir. Ama yüksek hacimli veri akışlarında (Kafka consumer, WebSocket stream) back-pressure stratejini bilinçli olarak seçmelisin.


Gerçek Dünya: E-Ticaret Dashboard API

Tüm kavramları bir araya getiren bütünleşik bir örnek. Bir e-ticaret uygulamasının dashboard API'sini WebFlux ile yazalım:

@RestController
@RequestMapping("/api/dashboard")
@RequiredArgsConstructor
public class DashboardController {

    private final UserRepository userRepository;
    private final WebClient orderServiceClient;
    private final WebClient paymentServiceClient;
    private final WebClient recommendationClient;

    @GetMapping("/{userId}")
    public Mono<DashboardResponse> getDashboard(@PathVariable Long userId) {

        // 1. Kullanıcı bilgisi — R2DBC ile veritabanından
        Mono<User> userMono = userRepository.findById(userId)
            .switchIfEmpty(Mono.error(
                new ResponseStatusException(HttpStatus.NOT_FOUND, "Kullanıcı bulunamadı")));

        // 2. Son siparişler — WebClient ile order servisinden
        Flux<OrderSummary> ordersMono = orderServiceClient.get()
            .uri("/api/orders?userId={userId}&limit=5", userId)
            .retrieve()
            .bodyToFlux(OrderSummary.class)
            .onErrorResume(ex -> {
                log.warn("Order servisi erişilemedi, boş liste dönüyor");
                return Flux.empty();  // Servis down ise boş dön, hata fırlatma
            });

        // 3. Cüzdan bakiyesi — WebClient ile payment servisinden
        Mono<WalletInfo> walletMono = paymentServiceClient.get()
            .uri("/api/wallets/{userId}", userId)
            .retrieve()
            .bodyToMono(WalletInfo.class)
            .timeout(Duration.ofSeconds(3))
            .onErrorReturn(new WalletInfo(BigDecimal.ZERO, "TRY")); // Timeout → default

        // 4. Önerilen ürünler — WebClient ile recommendation servisinden
        Flux<ProductRecommendation> recommendationsMono = recommendationClient.get()
            .uri("/api/recommendations/{userId}?limit=10", userId)
            .retrieve()
            .bodyToFlux(ProductRecommendation.class)
            .onErrorResume(ex -> Flux.empty());

        // HEPSİ PARALEL ÇALIŞIR — toplam süre en yavaş çağrı kadar
        return Mono.zip(
            userMono,
            ordersMono.collectList(),
            walletMono,
            recommendationsMono.collectList()
        ).map(tuple -> new DashboardResponse(
            tuple.getT1(),    // User
            tuple.getT2(),    // List<OrderSummary>
            tuple.getT3(),    // WalletInfo
            tuple.getT4()     // List<ProductRecommendation>
        ));
    }
}

// Response DTO
public record DashboardResponse(
    User user,
    List<OrderSummary> recentOrders,
    WalletInfo wallet,
    List<ProductRecommendation> recommendations
) {}

Bu kodda dikkat edilecek noktalar:

  • 4 farklı veri kaynağı (DB + 3 HTTP servisi) paralel çağrılır

  • Her çağrı için ayrı hata stratejisi var (biri down ise diğerleri çalışmaya devam eder)

  • Thread hiçbir zaman bloklanmaz — tüm I/O non-blocking

  • Klasik MVC ile bu 4 çağrı sıralı olurdu: 10 + 50 + 30 + 40 = 130ms. WebFlux ile paralel: max(10, 50, 30, 40) = 50ms


Spring MVC vs WebFlux — Karşılaştırma Tablosu

KriterSpring MVCSpring WebFlux
I/O modeliBlocking (thread-per-request)Non-blocking (event loop)
SunucuTomcat, JettyNetty (varsayılan), Tomcat, Jetty
Programlama modeliImperative (sıralı kod)Reactive (Mono/Flux pipeline)
Thread kullanımı200+ thread, çoğu bekliyor4-8 thread (CPU çekirdek sayısı kadar)
VeritabanıJPA/JDBC (blocking)R2DBC (non-blocking)
HTTP ClientRestTemplate / RestClientWebClient
ÖlçeklenebilirlikOrta (thread limiti)Yüksek (binlerce concurrent bağlantı)
Kod karmaşıklığıDüşük — sıralı, anlaşılırYüksek — pipeline, operator zinciri
DebugKolay — stack trace netZor — async stack trace karmaşık
EkosistemDevasa — her kütüphane desteklerSınırlı — blocking kütüphane kullanamazsın
Öğrenme eğrisiDüşükYüksek
TestKolay — MockMvc, assertStepVerifier ile reactive test

Ne Zaman WebFlux?

WebFlux kullan:

  • Yüksek eşzamanlılık gerekiyorsa (binlerce concurrent bağlantı)

  • Uygulama I/O-bound ise (çok sayıda HTTP çağrısı, DB sorgusu, dosya okuma)

  • Microservice mimarisinde gateway veya aggregator servisler

  • Streaming veri (SSE, WebSocket, real-time dashboard)

  • Az kaynak ile yüksek throughput gerekiyorsa

WebFlux KULLANMA:

  • Uygulama CPU-bound ise (resim işleme, ML, ağır hesaplama) — thread zaten CPU'da çalışıyor, non-blocking bir avantaj sağlamaz

  • Ekipte reactive deneyimi yoksa — öğrenme eğrisi yüksek, hata yapma riski artar

  • Blocking kütüphane kullanman gerekiyorsa (JDBC, JPA, çoğu ORM, Apache HttpClient) — pipeline'ı bloklar ve avantaj kaybolur

  • Basit CRUD uygulaması — MVC yeter ve çok daha basit

  • Debug ve monitoring altyapın reactive'e uygun değilse

💡 İpucu: Java 21+ kullanıyorsan Virtual Threads alternatifini de değerlendir. Virtual threads, MVC'nin basit programlama modelini korurken WebFlux'a yakın ölçeklenebilirlik sağlar. Mevcut JPA/JDBC kodunu değiştirmeden binlerce concurrent bağlantıyı handle edebilirsin. WebFlux'ın avantajı back-pressure ve streaming senaryolarında; virtual threads'in avantajı mevcut kodla uyumluluk ve basitlik.


Yaygın Hatalar

1. Reactive Pipeline'ı Subscribe Etmemek

// YANLIŞ — pipeline tanımlandı ama hiçbir zaman çalışmaz
@PostMapping("/api/logs")
public Mono<Void> logAction(@RequestBody ActionLog log) {
    auditService.saveLog(log);  // Bu Mono döner ama subscribe edilmedi!
    return Mono.empty();
}

// DOĞRU — pipeline'ı return et, Spring subscribe eder
@PostMapping("/api/logs")
public Mono<Void> logAction(@RequestBody ActionLog log) {
    return auditService.saveLog(log);  // Spring framework subscribe eder
}

2. Blocking Kodu Reactive Pipeline'da Çağırmak

// YANLIŞ — JDBC çağrısı blocking, event loop'u bloklar
public Mono<User> getUser(Long id) {
    User user = jdbcTemplate.queryForObject(            // ❌ Blocking!
        "SELECT * FROM users WHERE id = ?", User.class, id);
    return Mono.just(user);
}

// DOĞRU — blocking kodu ayrı thread pool'da çalıştır
public Mono<User> getUser(Long id) {
    return Mono.fromCallable(() ->
        jdbcTemplate.queryForObject(
            "SELECT * FROM users WHERE id = ?", User.class, id))
        .subscribeOn(Schedulers.boundedElastic());  // Blocking thread pool'da çalıştır
}

3. Mono.zip ile Null Değer

// YANLIŞ — zip içindeki herhangi bir Mono empty ise TÜM zip empty döner
Mono<Dashboard> dashboard = Mono.zip(
    userService.findById(userId),         // Bu empty dönerse...
    orderService.getOrders(userId).collectList(),
    walletService.getBalance(userId)
).map(tuple -> new Dashboard(...));       // ...tüm map atlanır, empty döner

// DOĞRU — empty olabilecek Mono'lara default değer ver
Mono<Dashboard> dashboard = Mono.zip(
    userService.findById(userId).switchIfEmpty(Mono.error(new NotFoundException("User"))),
    orderService.getOrders(userId).collectList().defaultIfEmpty(List.of()),
    walletService.getBalance(userId).defaultIfEmpty(WalletInfo.empty())
).map(tuple -> new Dashboard(...));

Test — StepVerifier

Reactive kodun testini StepVerifier ile yazarsın:

@Test
void shouldFindProductById() {
    // Given
    Product saved = productRepository.save(
        new Product(null, "Laptop", "Gaming laptop", 25000.0, 1L, null, null)
    ).block();  // Test'te block() kullanabilirsin

    // When & Then
    StepVerifier.create(productService.findById(saved.getId()))
        .assertNext(product -> {
            assertThat(product.getName()).isEqualTo("Laptop");
            assertThat(product.getPrice()).isEqualTo(25000.0);
        })
        .verifyComplete();  // Mono'nun tamamlandığını doğrula
}

@Test
void shouldReturnEmptyForNonExistentProduct() {
    StepVerifier.create(productService.findById(999L))
        .verifyComplete();  // Boş Mono — hiç eleman gelmeden tamamlanmalı
}

@Test
void shouldFilterProductsByPrice() {
    // Given — test verileri kaydet
    Flux.just(
        new Product(null, "Mouse", "", 200.0, 1L, null, null),
        new Product(null, "Keyboard", "", 500.0, 1L, null, null),
        new Product(null, "Monitor", "", 5000.0, 1L, null, null)
    ).flatMap(productRepository::save).blockLast();

    // When & Then
    StepVerifier.create(productService.search("", 400.0, 6000.0))
        .expectNextCount(2)  // 500 ve 5000 fiyatlı 2 ürün
        .verifyComplete();
}

@Test
void shouldHandleError() {
    StepVerifier.create(
        productService.findById(-1L)
            .switchIfEmpty(Mono.error(new NotFoundException("Not found")))
    )
    .expectError(NotFoundException.class)
    .verify();
}

Özet

  • Reactive programming, veriyi "git al, bekle" değil "geldiğinde tepki ver" yaklaşımıyla işler — thread'ler asla bloklanmaz, az thread ile binlerce concurrent bağlantı handle edilir

  • Mono (0-1 eleman) ve Flux (0-N eleman) Reactor'ın temel yapı taşlarıdır — map senkron dönüşüm, flatMap asenkron dönüşüm, zip paralel birleştirme, filter eleme için kullanılır

  • WebFlux controller'ları MVC ile neredeyse aynıdır — fark dönüş tipinin Mono<T> veya Flux<T> olmasıdır. WebClient, RestTemplate'in reactive karşılığıdır

  • R2DBC, JDBC'nin reactive versiyonudur — ReactiveCrudRepository ile kullanılır ancak JPA'nın otomatik ilişki mapping'i yoktur, join'leri kendin yazmalısın

  • `block()` çağrısını reactive pipeline içinde asla kullanma — event loop thread'ini bloklar ve tüm uygulamayı kilitler. block() sadece test veya imperative entegrasyon noktalarında kullanılmalı

  • Çoğu proje için Spring MVC + Virtual Threads (Java 21+) daha pragmatik bir seçimdir — WebFlux'ın asıl gücü yüksek eşzamanlılık, streaming ve back-pressure gerektiren senaryolarda ortaya çıkar