BOOK 4 - 스프링 5를 활용한 리액티브 프로그래밍(2)

스프링을 이용한 리액티브 프로그래밍 - 기본 개념

  • 리액티브 시스템의 기반이 될 수 있는 많은 패턴과 프로그래밍 기술이 있다.
    • 예를 들어, Callback 및 CompletableFuture 는 메시지 기반 아키텍쳐를 구현하는 데 널리 사용된다.
    • 리액티브 프로그래밍 또한 같은 역할을 수행하는 주요 기술이다.

관찰자(Observer) 패턴

  • Observer 패턴은 GoF 디자인 패턴 중 하나이다.
  • 관찰자 패턴은 이벤트를 발생시키는 역할인 subject(주체)와 이벤트를 수신하는 역할인 observer(관찰자) 두가지 요소가 존재한다.
    • 주체는 일반적으로 자신의 메서드 중 하나를 호출하여 관찰자에게 상태 변경을 알린다.
  • 관찰자 패턴을 사용하면 런타임에 객체 사이에 일대다 의존성을 등록할 수 있다.
    • 각 컴포넌트가 활발히 상호작용하게 하면서도 응용 프로그램 사이의 결합도를 낮출 수 있다.

image

  • 관찰자 패턴은 SubjectObserver 2개의 인터페이스로 구성된다.
    • Observer는 Subject에 등록되고 Subject로부터 알림을 수신한다.
    • Subject는 스스로 이벤트를 발생시키거나 다른 구성 요소에 의해 호출될 수 있다.
public interface Subject<T> {
  void registerObserver(Observer<T> observer);
  void unregisterObserver(Observer<T> observer);
  void notifyObservers(T event);
}
public interface Observer<T> {
  void observe(T event);
}
  • generic 인터페이스는 이벤트 타입 T를 사용해 타입 안정성을 향상시킨다.
  • Subject 인터페이스에는 이벤트를 broadcasting하는 구독 관리 메서드들을 포함한다.
  • Observer 인터페이스는 이벤트를 처리하는 하나의 observe 메서드를 포함한다.
  • Observer 구현체가 구독 절차를 담당할 수도 있고, 제 3의 컴포넌트가 담당할 수도 있다.
    • Observer 인스턴스가 Subject의 존재를 전혀 인식하지 못하는 경우, 제 3의 컴포넌트가 Subject의 각 인스턴스에 Observer를 등록하는 역할을 담당한다.
    • 예를 들어 DI 컨테이너가 해당 역할을 수행할 수 있다.
      • DI 컨테이너는 클래스 패스에서 @EventListener 를 탐색해 Observer 인스턴스를 찾아내고, 발견된 인스턴스들을 Subject에 등록한다.
public class ConcreteObserverA implements Observer<String> {
  @Override
  public void observe(String event) {
    System.out.println("Observer A : " + event);
  }
}

public class ConcreteObserverB implements Observer<String> {
  @Override
  public void observe(String event) {
    System.out.println("Observer B : " + event);
  }
}
public class ConcreteSubject implements Subject<String> {
  private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();
  
  public void registerObserver(Observer<String> observer) {
    observers.add(observer);
  }
  
  public void unregisterObserver(Observer<String> observer) {
    observers.remove(observer);
  }
  
  public void notifyObservers(String event) {
    observers.forEach(observer -> observer.observe(event));
  }
}
  • 멀티 스레드 상황에서 스레드 안정성을 유지하기 위해 업데이트 작업이 발생할 때마다 새 복사본을 생성하는 Set 구현체인 CopyOnWriteArraySet 을 사용했다.
    • CopyOnWriteArraySet 의 내용을 업데이트하는 것은 비용이 많이 들지만 관찰자 리스트는 자주 변경되지 않으므로 thread-safe한 구현을 위해 합리적인 선택이다.

관찰자 패턴 사용 예

@Test
public void observersHandleEventsFromSubject() {
  // given
  Subject<String> subject = new ConcreteSubject();
  Observer<String> observerA = Mockito.spy(new ConcreteObserverA());
  Observer<String> observerB = Mockito.spy(new ConcreteObserverB());
  
  // when
  subject.notifyObservers("No listeners");
  
  subject.registerObserver(observerA);
  subject.notifyObservers("Message for A");
  
  subject.registerObserver(observerB);
  subject.notifyObservers("Message for A & B");
  
  subject.unregisterObserver(observerA);
  subject.notifyObservers("Message for B");
  
  subject.unregisterObserver(observerB);
  subject.notifyObservers("No listeners");
  
  // then
  Mockito.verify(observerA, times(1)).observe("Message for A");
  Mockito.verify(observerA, times(1)).observe("Message for A & B");
  Mockito.verifyNoMoreInteractions(observerA);
  
  Mockito.verify(observerB, times(1)).observe("Message for A & B");
  Mockito.verify(observerB, times(1)).observe("Message for B");
  Mockito.verifyNoMoreInteractions(observerB);
}

멀티 스레드 사용

  • 대기 시간이 상당히 긴 이벤트를 구독하는 관찰자가 많은 경우, 추가적인 스레드 할당 또는 스레드 풀을 사용해서 메시지를 병렬로 전달할 수 있다.
private final ExecutorService executorService = Executors.newCachedThreadPool();

public void notifyObservers(String event) {
  observers.forEach(observer -> executorService.submit(() -> observer.observe(event)));
}
  • 이러한 멀티 스레드 환경 구현시에는 스레드 풀 크기를 제한하지 않으면 비효율성 및 버그를 발생시킬 수 있다.
    • 각 스레드는 자바에서 약 1MB를 소비하므로 단 몇천 개의 스레드만으로도 사용 가능한 어플리케이션 메모리를 모두 소모할 수 있다.
    • 과도한 리소스 사용을 방지하기 위해 스레드 풀 크기를 제한할 수 있다.

java.util.Observer

  • java.util 패키지 내에 ObserverObservable 클래스가 포함되어 있다.
  • 자바 제네릭 이전에 도입되었기 때문에 Object 타입을 사용하여 타입 안정성이 보장되지 않는다.
    • 이런 구현 방식은 멀티 스레드 환경에서 효율적이지 않다.
  • 이러한 문제점들로 인해 이 Observer 클래스는 개발 시 사용하지 않는 것이 좋다.

어플리케이션 개발 시 관찰자 패턴을 직접 구현할 수도 있다.

그러나 멀티 스레드 응용 프로그램에서 여러 측면의 문제들을 고려하여 직접 구현하는 것은 매우 번거롭고 어렵다.

즉, 믿을 수 있는 조직에서 제공하는 오류 처리, 비동기 실행, 스레드 안정성, 성능 요구 사항들을 모두 만족하는 구현체를 사용하는 것이 직접 구현하는 것보다 좋다.

@EventListener를 사용한 발행 - 구독 패턴

  • 스프링 프레임워크는 이벤트 처리를 위한 @EventListener 어노테이션과 이벤트 발행을 위한 ApplicationEventPublisher 클래스를 제공한다.
    • 이것들은 발행 - 구독 패턴을 구현한 것이다.

image

  • 발행 - 구독 패턴은 게시자와 구독자 간에 간접적인 계층을 제공하기 때문에 게시자와 구독자는 서로를 알 필요가 없다.

  • 각 이벤트 채널에는 동시에 여러개의 게시자가 있을 수 있다.
  • 이벤트 채널(message broker 또는 event bus)은 수신 메시지를 구독자에게 배포하기 전에 필터링 작업을 할 수도 있다.
  • 필터링 및 라우팅은 메시지 내용이나 메시지 주제에 의해 발생할 수 있다.
    • 즉, 토픽 기반 시스템에서 구독자는 관심 토픽에 게시된 모든 메시지를 수신하게 된다.

@EventListener 활용한 응용 프로그램 개발

  • 리액티브 디자인에 따라 온도 센서를 통해 측정한 온도를 보여주는 웹 서비스를 구현한다고 가정해보자.
    • 온도 센서는 난수 생성기를 사용해 시뮬레이션 한다.
  • 서버에서 클라이언트로 비동기 메시지 전달을 할 수 있는 WebSocket, SSE(Server-Sent Event) 같은 프로토콜들을 사용할 수 있다.
    • SSE를 사용하면 클라이언트가 서버에서 보낸 메시지를 자동으로 수신할 수 있다.
      • 따라서 SSE는 브라우저에 메시지를 업데이트하거나 연속적인 데이터 스트림을 보내는 데 사용한다.
      • 최신 브라우저에는 EventSource 라는 자바스크립트 API가 적용되는데, 이 API는 이벤트 스트림을 수신하기 위해 특정 URL을 호출하는데 사용한다.
    • SSE는 리액티브 시스템의 구성 요소 간에 통신 요구사항을 충족시키는 최적의 후보이다.

비즈니스 로직 구현하기

image

final class Temperature {
  private final double value;
}
@Component
public class TemperatureSensor {
  private final ApplicationEventPublisher publisher;
  private final Random rnd = new Random();
  private final ScheduledExecutorService executor = 
    Executors.newSingleThreadScheduledExecutor();
  
  public TemperatureSensor(ApplicationEventPublisher publisher) {
    this.publisher = publisher;
  }
  
  @PostConstruct
  public void startProcessing() {
    this.executor.schedule(this::probe, 1, SECONDS);
  }
  
  private void probe() {
    double temperature = 16 + rnd.nextGaussian() * 10;
    publisher.publishEvent(new Temperature(temperature));
    executor.schedule(this::probe, rnd.nextInt(5000), MILLISECONDS);
  }
}
  • 위 클래스는 온도 센서를 시뮬레이션 하기 위한 클래스이다.
  • ApplicationEventPublisher 클래스를 사용하여 이벤트를 시스템에 발행한다.
  • 각 이벤트의 생성은 임의의 지연시간(0~5초) 후에 다음 이벤트 생성을 예약한다.

스프링 웹 MVC를 이용한 비동기 HTTP 통신

  • 서블릿 3.0에서 HTTP 요청을 처리하는 기능을 추가하여 비동기 지원 기능을 확장했다.
    • 스프링 웹 MVC에서 @Controller 는 단일 타입 T 외에도 Callable<T> 또는 DeferredResult<T> 도 반환할 수 있게 되었다.
    • DeferredResult<T>setResult() 메서드를 호출해 컨테이너 스레드 외부에서도 비동기 응답을 생성하므로 이벤트 루프 안에서도 사용할 수 있다.
  • 스프링 웹 MVC 버전 4.2부터는 DeferredResult 와 비슷하게 동작하는 ResponseBodyEmitter 를 반환할 수 있다.
    • ResponseBodyEmitter 는 메시지 컨버터( HttpMessageConverter 인터페이스의 구현 클래스)에 의해 개별적으로 만들어진 여러개의 메시지를 전달하는 용도로 사용할 수 있다.
  • 스프링 웹 MVC는 ResponseBodyEmitter 와 함께 StreamingResponseBody 인터페이스도 지원한다.
    • 이를 사용하면 @Controller 에서 데이터 반환 시에 비동기적으로 전달할 수 있다.

SSE 엔드포인트

  • SseEmitterResponseBodyEmitter 를 상속하여 SSE의 프로토콜 요구 사항에 따라 하나의 수신 요청에 대해 다수의 발신 메시지를 보낼 수 있는 클래스이다.
@RestController
public class TemperatureController {
  private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>();
  
  @GetMapping("/temperature-stream")
  public SseEmitter events(HttpServletRequest request) {
    SseEmitter emitter = new SseEmitter();
    clients.add(emitter);
    
    emitter.onTimeout(() -> clients.remove(emitter));
    emitter.onCompletion(() -> clients.remove(emitter));
    return emitter;
  }
  
  @Async
  @EventListener
  public void handleMessage(Temperature temperature) {
    List<SseEmitter> deadEmitters = new ArrayList<>();
    clients.forEach(emitter -> {
      try {
        emitter.send(temperature, MediaType.APPLICATION_JSON);
      } catch(Exception ignore) {
        deadEmitters.add(emitter);
      }
    });
    clients.removeAll(deadEmitters);
  }
}
  • SSE 이벤트를 보내기 위해 SseEmitter 클래스를 사용했다.
  • 요청 처리 메서드가 SseEmitter 인스턴스를 반환하더라도 SseEmitter.complete() 메서드가 호출되거나 오류나 시간 초과가 발생할 때까지 실제 요청 처리는 계속된다.
  • 클라이언트가 /temperature-stream URI를 요청하면 새로운 SseEmitter 인스턴스를 만들어 활성 클라이언트 목록에 등록함과 동시에 반환한다.
    • 웹 클라이언트가 새로운 SSE 세션을 요청하면 clients 컬렉션에 새로운 SseEmitter 를 추가하고, SseEmitter 의 처리가 끝나거나 타임아웃이 발생하면 clients 컬렉션에서 자신을 제거한다.
  • 웹 클라이언트와의 커뮤니케이션 채널이 있다는 것은 온도 변화에 대한 이벤트를 수신할 수 있는 의미이다.
    • 이벤트를 수신하기 위해 handleMessage() 메서드를 구현했다.
  • 스프링으로부터 이벤트를 수신하기 위해 @EventListener 어노테이션을 설정하여 온도 이벤트를 수신할 때만 handleMessage() 메서드를 호출하도록 한다.
    • 즉, TemperatureSensor.probe() 메서드가 호출되어 Temperature 타입의 이벤트가 발생했을 때 handleMessage() 메서드가 실행되고, clients 목록에 있는 클라이언트들에게 온도 정보를 메시지로 만들어 전달하는 것이다.
  • @Async 어노테이션을 설정하여 메서드를 별도로 구성된 스레드 풀에서 호출하여 비동기적으로 실행하도록 한다.

솔루션에 대한 평가

  • 현재 위에서 구현한 솔루션은 스프링에서 제공하는 발행 - 구독 구조를 사용하고 있다.
    • 이 구조는 응용 프로그램 수명 주기 이벤트를 처리하기 위해 처음 도입됐으며, 고부하 및 고성능 시나리오를 위한 것이 아니다.
    • 따라서, 하나의 온도 데이터가 아닌 수천, 수백만 개의 개별 스트림을 필요로 할 때 프로그램이 감당할 수 있을지는 알 수 없다.
  • 비즈니스 로직을 구현하기 위해 스프링 프레임워크의 내부 매커니즘을 사용하고 있다.
    • 즉, 프레임워크의 사소한 변경으로 인해 프로그램의 안정성이 깨질 수 있어 위험하다.
    • 또한, Spring의 ApplicationContext를 로딩하지 않고 비즈니스 로직을 단위 테스트하기 어렵다는 단점이 있다.
  • 온도 이벤트를 비동기적으로 브로드캐스팅하기 위해 스레드 풀을 사용하고 있다.
    • 이는 진정한 비동기적 리액티브 접근에서는 필요없는 일이다.

이러한 문제들을 해결하기 위해 이 목적만을 위해 설계된 리액티브 라이브러리가 필요하다.

RxJava는 이런 목적을 가진 최초의 리액티브 라이브러리이다.

리액티브 프레임워크 RxJava

  • RxJava 라이브러리는 Reactive Extensions(ReactiveX)의 자바 구현체이다.
  • ReactiveX는 동기식 또는 비동기식 스트림과 관계없이 명령형 언어를 이용해 데이터 스트림을 조작할 수 있는 일련의 도구이다.
    • Observer 패턴, Iterator 패턴 및 함수형 프로그래밍의 조합으로 정의된다.

관찰자 + 반복자 = 리액티브 스트림

  • 관찰자 패턴은 무한한 데이터 스트림에 대해서는 매력적이지만, 데이터 스트림의 끝을 알리는 기능이 없다.
    • 또한, consumer가 준비되기 전에 producer가 이벤트를 생성하는 것은 적절하지 않을 수 있다.
  • 동기식 구현에서, 이런 때를 대비한 반복자 패턴이 존재한다.
public interface Iterator<T> {
  T next();
  boolean hasNext();
}
  • Iterator 인터페이스는 항목을 하나씩 검색하기 위한 next() 메서드와 시퀀스의 끝을 확인하기 위한 hasNext() 메서드를 제공한다.

RxObserver

  • Iterator 패턴과 관찰자 패턴에 의한 비동기 실행을 합치면 다음과 같다.
public interface RxObserver<T> {
  void onNext(T next);
  void onComplete();
  void onError(Exception e);
}
  • 이 인터페이스는 리액티브 스트림의 모든 컴포넌트 사이에 데이터가 흐르는 방법을 정의한다.
  • onNext() : 콜백으로 RxObserver 에 새로운 값이 전달된다.
  • onComplete() : 스트림의 끝을 알린다.
  • onError() : next() 메서드를 처리하는 동안 exception이 발생했을 경우 콜백으로 예외를 전달한다.

Observable

  • Observable 클래스는 관찰자 패턴의 Subject와 일치한다.
    • 즉, 이벤트 소스 역할을 수행한다.
  • 리액티브 스트림을 초기화하는 팩토리 메서드와 스트림 변환 메서드를 포함한다.
  • Subscriber 추상 클래스가 Observer 인터페이스를 구현하여 Observable 이 생성한 이벤트를 소비한다.

image

  • Observable은 일정 개수의 이벤트를 보낼 수 있고, 그 다음 성공을 알리거나 오류를 발생시켜 실행 종료를 알린다.
    • 즉, 각 구독자에 연결된 ObservableonNext() 를 여러번 호출한 다음, onComplete() 또는 onError() 를 호출한다.

스트림의 생산과 소비

Observable<String> observable = Observable.create(
new Observable.OnSubscribe<String>() {
  @Override
  public void call(Subscriber<? super String> sub) {
    sub.onNext("Hello, reactive world!");
    sub.onCompleted();
  }
});
  • 구독자에게 적용될 콜백을 가진 Observable 클래스를 정의했다.
    • 콜백이 호출되면 Observer 는 하나의 문자열을 생성한 후, 스트림의 끝을 구독자에게 알린다.
  • 구독자를 정의해서 생성한 Observable 인스턴스와 연결할 수 있다.
Subscriber<String> subscriber = new Subscriber<String>() {
  @Override
  public void onNext(String s) {
    System.out.println(s);
  }
  
  @Override
  public void onCompleted() {
    System.out.println("Done!");
  }
  
  @Override
  public void onError(Throwable e) {
    System.out.println(e);
  }
};
observable.subscribe(subscriber);
  • 람다식을 이용해서 다음과 같이 구현할 수도 있다.
Observable<String> observable = Observable.create(
sub -> {
  sub.onNext("Hello, reactive world!");
  sub.onCompleted();
}).subscribe(
  System.out::println,
  System.out::println,
  () -> System.out.println("Done!")
);

이처럼 Observable을 생성하는 방식은 backpressure를 지원하지 않기 때문에 권장되지 않는다.

다양한 Observable 인스턴스 생성 방식

  • RxJava 라이브러리는 Observable 인스턴스를 생성하기 위해 많은 유연성을 제공한다.
  • 요소를 직접 등록할 수도 있고, 배열을 사용할 수도 있으며, Iterable 컬렉션을 이용해 만들 수도 있다.
Observable.just("1", "2", "3", "4");
Observable.from(new String[]{"A", "B", "C"});
Observable.from(Collections.emptyList());
  • Callable 또는 Future 를 사용할 수도 있다.
Observable<String> hello = Observable.fromCallable(() -> "hello");

Future<String> future = Executors.newCachedThreadPool().submit(() -> "World");
Observable<String> world = Observable.from(future);
  • Observable 스트림은 다른 Observable 인스턴스와 결합해 생성할 수도 있어 복잡한 워크플로우를 쉽게 구현할 수 있다.
    • concat() 메서드를 사용해 입력 스트림을 다운 스트림 Observable로 다시 보낼 수 있다.
Observable.concat(hello, world, Observable.just("!"))
  .forEach(System.out::print);
  • 처리 순서는 concat() 메서드의 파라미터 순서와 동일하다.
  • Observable.forEach() 메서드를 이용해 결과를 반복 처리한다.

비동기 시퀀스 생성하기

  • RxJava는 하나의 이벤트뿐만 아니라 주기적으로 비동기 이벤트 시퀀스를 생성할 수 있다.
Observable.interval(1, TimeUnit.SECONDS)
  .subscribe(e -> System.out.println("Received : " + e));
Thread.sleep(5000);
실행 결과
Received : 0
Received : 1
Received : 2
Received : 3
Received : 4
  • 위 코드에서 Thread.sleep() 을 제거하면 아무것도 출력되지 않고 종료된다.
    • 메인 스레드와 이벤트가 생성되는 스레드는 별개이기 때문이다.
    • 즉, 메인 스레드의 종료를 sleep() 으로 지연시키면 다른 스레드에서 1초에 한번씩 이벤트를 발행하는 것이다.

스트림 변환과 마블 다이어그램

  • RxJava의 모든 기능은 연산자에 의해 구현된다고 할 수 있다.
    • 연산자는 스트림의 원소를 조정하거나 스트림 구조 자체를 변경할 수 있다.

Map 연산자

<R> Observable<R> map(Func1<T, R> func)
  • func 함수가 타입 <T> 를 타입 <R> 로 변환하고, map 을 통해 Observable<T>Observable<R> 로 변환할 수 있음을 의미한다.

image-20201120165242455

Filter 연산자

  • filter 는 조건부 테스트를 성공적으로 통과한 원소만 재발행 함으로써 입력 스트림보다 적은 수의 원소를 생성할 수도 있다.

image

Count 연산자

  • count 는 입력 스트림의 개수를 발행한다.
  • 입력 스트림이 완료될 때 카운트가 발행되므로 스트림이 무한대일 때는 count 연산자가 완료되지 않거나 아무것도 반환하지 않는다.

image

Zip 연산자

  • zip 은 지정된 함수를 통해 여러 Observable의 이벤트를 결합하고, 결합한 결과를 발행한다.

image

Observable.zip(
  Observable.just("A", "B", "C"),
  Observable.just("1", "2", "3"),
  (x, y) -> x + y
).forEach(System.out::println);
실행결과
A1
B2
C3

https://rxmarbles.com 에서 다른 연산자들의 마블 다이어그램을 확인할 수 있다.

RxJava를 이용한 어플리케이션 구현

  • 앞에서 구현했던 온도 감지 어플리케이션을 RxJava로 다시 구현해보자.

  • RxJava 라이브러리를 사용하기 위해 build.gradle 에 의존성을 추가한다.

    compile('io.reactivex:rxjava:1.3.8')
    

비즈니스 로직 구현

  • TemperatureSensor 클래스에서 온도 측정 후 Temperature 이벤트가 있는 리액티브 스트림을 반환하도록 한다.
@Component
public class TemperatureSensor {
  private final Random rnd = new Random();
  
  private final Observable<Temperature> dataStream = 
    Observable
    .range(0, Integer.MAX_VALUE)
    .concatMap(tick -> Observable.just(tick)
              .delay(rnd.nextInt(5000), MILLISECONDS)
              .map(tickValue -> this.prove()))
    .publish()
    .refCount();
  
  private Temperature probe() {
    return new Temperature(16 + rnd.nextGaussian() * 10);
  }
  
  public Observable<Temperature> temperatureStream() {
    return dataStream;
  }
}
  • dataStream 은 팩토리 메서드 range() 를 사용해 사실상 무한대의 숫자 스트림을 생성한다.
    • 즉, range(0, Integer.MAX_VALUE) 는 0부터 Integer.MAX_VALUE 까지의 정수 시퀀스를 생성한다.
  • 이 숫자 스트림들의 각 값들을 concatMap() 을 이용해 변환한다.
    • concatMap() 메서드는 메서드 f를 수신한다.
      • 메서드 f는 tick 객체를 Observable 스트림으로 변환하고 f 함수를 적용한 다음, 결과 스트림에 하나씩 결합하는 역할을 한다.
      • 즉, 위 코드에서 f 함수는 임의의 지연시간 후에 probe() 메서드를 통해 센서 측정을 수행하고 값을 변환한다.
    • 결과적으로 각 측정 사이의 최대 간격이 5초인 센서 값을 반환하는 스트림을 받을 수 있다.
flatMap
  • 변환 연산자
  • 하나의 Observable이 발행하는 각 item을 여러개의 item들을 가진 Observable로 변환하고, item들의 배출을 차례로 줄 세워 하나의 Observable로 반환한다.

image

String[] balls={"1","3","5"};
Observable<String> source=Observable.interval(100L,TimeUnit.MILLISECONDS)
  .map(Long::intValue)
  .map(idx->balls[idx])
  .take(balls.length)
  .flatMap(ball->Observable.interval(200L,TimeUnit.MILLISECONDS)
           .map(notUsed->ball+"<>")
           .take(2));
source.subscribe(Log::it);
실행결과
RxComputationThreadPool-2 | 386 | value = 1<>
RxComputationThreadPool-3 | 487 | value = 3<>
RxComputationThreadPool-4 | 585 | value = 5<>
RxComputationThreadPool-4 | 585 | value = 1<>
RxComputationThreadPool-3 | 687 | value = 3<>
RxComputationThreadPool-4 | 786 | value = 5<>
concatMap
  • flatMap 과 비슷하게 동작하며, 먼저 들어온 데이터 순서대로 처리하여 결과를 내도록 보장한다.
    • flatMap 은 먼저 들어온 데이터를 처리하는 도중에 새로운 데이터가 들어오면 끼어들기(interleaving)가 가능한 것과는 다르다.

image

  • 위 코드에서 flatMapconcatMap 으로 바꿨을 때 결과는 다음과 같다.
실행결과
RxComputationThreadPool-2 | 604 | value = 1<>
RxComputationThreadPool-2 | 804 | value = 1<>
RxComputationThreadPool-3 | 1005 | value = 3<>
RxComputationThreadPool-3 | 1206 | value = 3<>
RxComputationThreadPool-4 | 1407 | value = 5<>
RxComputationThreadPool-4 | 1608 | value = 5<>
  • 먼저 들어온 데이터를 먼저 처리하기 때문에 다른 데이터들은 대기 후 다음에 처리된다.
    • 끼어들기가 가능한 flatMap 에 비해 시간이 더 오래 걸린다.
publish
  • ObservableConnectableObservable 로 변환해준다.
    • Observable : 차가운 Observable 성격
      • subscribe() 했을 때 항상 같은 데이터가 차례로 발행된다.
    • ConnectableObservable : 뜨거운 Observable 성격
      • subscribe() -> connect() : 구독자를 대기시켜 동시에 같은 데이터를 받을 수 있도록 한다.
      • connect() -> subscribe() : 구독 전에 발행된 데이터는 받지 못한다.
  • ConnectableObservable
    • connect() 를 실행하면 데이터가 발행되기 시작한다.
    • refCount() : 구독자가 얼마나 있는지 추적하면서 스트림 데이터를 발행한다.
      • 처음 구독이 되면 그 때부터 connect() 되어 emit이 진행된다.
      • ConnectableObservableObservable 처럼 동작하게 만들기 때문에, 추가로 구독이 이루어졌을 때 해당 구독자에게 별도로 emit이 진행되어 다른 구독자들과 같은 데이터를 처음부터 받을 수 있게 한다.
      • 구독자들이 모두 없어지면 emit이 진행되지 않는다.

Custom SseEmitter

public class RxSseEmitter extends SseEmitter {
  static final long SSE_SESSION_TIMEOUT = 30 * 60 * 1000L;
  private static final Subscriber<Temperature> subscriber;
  
  public RxSseEmitter() {
    super(SSE_SESSION_TIMEOUT);
    this.subscriber = new Subscriber<Temperature>() {
      @Override
      public void onNext(Temperature temperature) {
        try {
          RxSseEmitter.this.send(temperature);
        } catch (IOException e) {
          unsubscribe();
        }
      }
      
      @Override 
      public void onError(Throwable e) { }
      
      @Override
      public void onCompleted() { }
    };
    
    onCompletion(subscriber::unsubscribe);
    onTimeout(subscriber::unsubscribe);
  }
  
  public Subscriber<Temperature> getSubscriber() {
    return subscriber;
  }
}
  • Temperature 이벤트에 대한 구독자가 onNext 신호를 수신하면 응답으로 SSE 클라이언트에게 다시 신호를 보낸다.
    • 데이터 전송에 실패하면 구독자는 수신한 스트림으로부터 자기 자신을 unsubscribe 한다.

SSE 엔드포인트

@RequiredArgsConstructor
@RestController
public class TemperatureController {
  private final TemperatureSensor temperatureSensor;
  
  @GetMapping("/temperature-stream")
  public SseEmitter events(HttpServletRequest request) {
    RxSseEmitter emitter = new RxSseEmitter();
    temperatureSensor.temperatureStream().subscribe(emitter.getSubscribe);
    return emitter;
  }
}
  • 새로운 SSE 세션이 생성될 때 RxSseEmitter 인스턴스를 생성하고, TemperatureSensor 가 생성하는 온도 스트림을 구독한다.

이 구현은 스프링의 EventBus를 사용하지 않으므로 이식성이 더 높고, 스프링 컨텍스트 없이도 단위 테스트를 할 수 있다.

리액티브의 전망

  • RxJava의 성공에 따라 다양한 리액티브 라이브러리가 구현되고, 리액티브 환경에 다양성이 증가하고 있다.
  • 그러나, 하나의 자바 어플리케이션에 다른 종류의 리액티브 라이브러리 또는 프레임워크를 사용하는 것은 API 불일치로 인해 위험하다.
  • 전체 리액티브 환경을 아우르며 호환성을 보장하는 표준 리액티브 스트림의 필요성이 대두되었다.