BOOK 4 - 스프링 5를 활용한 리액티브 프로그래밍(2)
스프링을 이용한 리액티브 프로그래밍 - 기본 개념
- 리액티브 시스템의 기반이 될 수 있는 많은 패턴과 프로그래밍 기술이 있다.
- 예를 들어, Callback 및
CompletableFuture는 메시지 기반 아키텍쳐를 구현하는 데 널리 사용된다. - 리액티브 프로그래밍 또한 같은 역할을 수행하는 주요 기술이다.
- 예를 들어, Callback 및
관찰자(Observer) 패턴
- Observer 패턴은 GoF 디자인 패턴 중 하나이다.
- 관찰자 패턴은 이벤트를 발생시키는 역할인 subject(주체)와 이벤트를 수신하는 역할인 observer(관찰자) 두가지 요소가 존재한다.
- 주체는 일반적으로 자신의 메서드 중 하나를 호출하여 관찰자에게 상태 변경을 알린다.
- 관찰자 패턴을 사용하면 런타임에 객체 사이에 일대다 의존성을 등록할 수 있다.
- 각 컴포넌트가 활발히 상호작용하게 하면서도 응용 프로그램 사이의 결합도를 낮출 수 있다.

- 관찰자 패턴은
Subject와Observer2개의 인터페이스로 구성된다.- Observer는 Subject에 등록되고 Subject로부터 알림을 수신한다.
- Subject는 스스로 이벤트를 발생시키거나 다른 구성 요소에 의해 호출될 수 있다.
public interface Subject<T> {
void registerObserver(Observer<T> observer);
void unregisterObserver(Observer<T> observer);
void notifyObservers(T event);
}
public interface Observer<T> {
void observe(T event);
}
- generic 인터페이스는 이벤트 타입 T를 사용해 타입 안정성을 향상시킨다.
Subject인터페이스에는 이벤트를 broadcasting하는 구독 관리 메서드들을 포함한다.Observer인터페이스는 이벤트를 처리하는 하나의observe메서드를 포함한다.Observer구현체가 구독 절차를 담당할 수도 있고, 제 3의 컴포넌트가 담당할 수도 있다.- Observer 인스턴스가 Subject의 존재를 전혀 인식하지 못하는 경우, 제 3의 컴포넌트가 Subject의 각 인스턴스에 Observer를 등록하는 역할을 담당한다.
- 예를 들어 DI 컨테이너가 해당 역할을 수행할 수 있다.
- DI 컨테이너는 클래스 패스에서
@EventListener를 탐색해Observer인스턴스를 찾아내고, 발견된 인스턴스들을 Subject에 등록한다.
- DI 컨테이너는 클래스 패스에서
public class ConcreteObserverA implements Observer<String> {
@Override
public void observe(String event) {
System.out.println("Observer A : " + event);
}
}
public class ConcreteObserverB implements Observer<String> {
@Override
public void observe(String event) {
System.out.println("Observer B : " + event);
}
}
public class ConcreteSubject implements Subject<String> {
private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();
public void registerObserver(Observer<String> observer) {
observers.add(observer);
}
public void unregisterObserver(Observer<String> observer) {
observers.remove(observer);
}
public void notifyObservers(String event) {
observers.forEach(observer -> observer.observe(event));
}
}
- 멀티 스레드 상황에서 스레드 안정성을 유지하기 위해 업데이트 작업이 발생할 때마다 새 복사본을 생성하는 Set 구현체인
CopyOnWriteArraySet을 사용했다.CopyOnWriteArraySet의 내용을 업데이트하는 것은 비용이 많이 들지만 관찰자 리스트는 자주 변경되지 않으므로 thread-safe한 구현을 위해 합리적인 선택이다.
관찰자 패턴 사용 예
@Test
public void observersHandleEventsFromSubject() {
// given
Subject<String> subject = new ConcreteSubject();
Observer<String> observerA = Mockito.spy(new ConcreteObserverA());
Observer<String> observerB = Mockito.spy(new ConcreteObserverB());
// when
subject.notifyObservers("No listeners");
subject.registerObserver(observerA);
subject.notifyObservers("Message for A");
subject.registerObserver(observerB);
subject.notifyObservers("Message for A & B");
subject.unregisterObserver(observerA);
subject.notifyObservers("Message for B");
subject.unregisterObserver(observerB);
subject.notifyObservers("No listeners");
// then
Mockito.verify(observerA, times(1)).observe("Message for A");
Mockito.verify(observerA, times(1)).observe("Message for A & B");
Mockito.verifyNoMoreInteractions(observerA);
Mockito.verify(observerB, times(1)).observe("Message for A & B");
Mockito.verify(observerB, times(1)).observe("Message for B");
Mockito.verifyNoMoreInteractions(observerB);
}
멀티 스레드 사용
- 대기 시간이 상당히 긴 이벤트를 구독하는 관찰자가 많은 경우, 추가적인 스레드 할당 또는 스레드 풀을 사용해서 메시지를 병렬로 전달할 수 있다.
private final ExecutorService executorService = Executors.newCachedThreadPool();
public void notifyObservers(String event) {
observers.forEach(observer -> executorService.submit(() -> observer.observe(event)));
}
- 이러한 멀티 스레드 환경 구현시에는 스레드 풀 크기를 제한하지 않으면 비효율성 및 버그를 발생시킬 수 있다.
- 각 스레드는 자바에서 약 1MB를 소비하므로 단 몇천 개의 스레드만으로도 사용 가능한 어플리케이션 메모리를 모두 소모할 수 있다.
- 과도한 리소스 사용을 방지하기 위해 스레드 풀 크기를 제한할 수 있다.
java.util.Observer
java.util패키지 내에Observer와Observable클래스가 포함되어 있다.- 자바 제네릭 이전에 도입되었기 때문에
Object타입을 사용하여 타입 안정성이 보장되지 않는다.- 이런 구현 방식은 멀티 스레드 환경에서 효율적이지 않다.
- 이러한 문제점들로 인해 이
Observer클래스는 개발 시 사용하지 않는 것이 좋다.
어플리케이션 개발 시 관찰자 패턴을 직접 구현할 수도 있다.
그러나 멀티 스레드 응용 프로그램에서 여러 측면의 문제들을 고려하여 직접 구현하는 것은 매우 번거롭고 어렵다.
즉, 믿을 수 있는 조직에서 제공하는 오류 처리, 비동기 실행, 스레드 안정성, 성능 요구 사항들을 모두 만족하는 구현체를 사용하는 것이 직접 구현하는 것보다 좋다.
@EventListener를 사용한 발행 - 구독 패턴
- 스프링 프레임워크는 이벤트 처리를 위한
@EventListener어노테이션과 이벤트 발행을 위한ApplicationEventPublisher클래스를 제공한다.- 이것들은 발행 - 구독 패턴을 구현한 것이다.

-
발행 - 구독 패턴은 게시자와 구독자 간에 간접적인 계층을 제공하기 때문에 게시자와 구독자는 서로를 알 필요가 없다.
- 각 이벤트 채널에는 동시에 여러개의 게시자가 있을 수 있다.
- 이벤트 채널(message broker 또는 event bus)은 수신 메시지를 구독자에게 배포하기 전에 필터링 작업을 할 수도 있다.
- 필터링 및 라우팅은 메시지 내용이나 메시지 주제에 의해 발생할 수 있다.
- 즉, 토픽 기반 시스템에서 구독자는 관심 토픽에 게시된 모든 메시지를 수신하게 된다.
@EventListener 활용한 응용 프로그램 개발
- 리액티브 디자인에 따라 온도 센서를 통해 측정한 온도를 보여주는 웹 서비스를 구현한다고 가정해보자.
- 온도 센서는 난수 생성기를 사용해 시뮬레이션 한다.
- 서버에서 클라이언트로 비동기 메시지 전달을 할 수 있는 WebSocket, SSE(Server-Sent Event) 같은 프로토콜들을 사용할 수 있다.
- SSE를 사용하면 클라이언트가 서버에서 보낸 메시지를 자동으로 수신할 수 있다.
- 따라서 SSE는 브라우저에 메시지를 업데이트하거나 연속적인 데이터 스트림을 보내는 데 사용한다.
- 최신 브라우저에는
EventSource라는 자바스크립트 API가 적용되는데, 이 API는 이벤트 스트림을 수신하기 위해 특정 URL을 호출하는데 사용한다.
- SSE는 리액티브 시스템의 구성 요소 간에 통신 요구사항을 충족시키는 최적의 후보이다.
- SSE를 사용하면 클라이언트가 서버에서 보낸 메시지를 자동으로 수신할 수 있다.
비즈니스 로직 구현하기

final class Temperature {
private final double value;
}
@Component
public class TemperatureSensor {
private final ApplicationEventPublisher publisher;
private final Random rnd = new Random();
private final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
public TemperatureSensor(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
@PostConstruct
public void startProcessing() {
this.executor.schedule(this::probe, 1, SECONDS);
}
private void probe() {
double temperature = 16 + rnd.nextGaussian() * 10;
publisher.publishEvent(new Temperature(temperature));
executor.schedule(this::probe, rnd.nextInt(5000), MILLISECONDS);
}
}
- 위 클래스는 온도 센서를 시뮬레이션 하기 위한 클래스이다.
ApplicationEventPublisher클래스를 사용하여 이벤트를 시스템에 발행한다.- 각 이벤트의 생성은 임의의 지연시간(0~5초) 후에 다음 이벤트 생성을 예약한다.
스프링 웹 MVC를 이용한 비동기 HTTP 통신
- 서블릿 3.0에서 HTTP 요청을 처리하는 기능을 추가하여 비동기 지원 기능을 확장했다.
- 스프링 웹 MVC에서
@Controller는 단일 타입 T 외에도Callable<T>또는DeferredResult<T>도 반환할 수 있게 되었다. DeferredResult<T>는setResult()메서드를 호출해 컨테이너 스레드 외부에서도 비동기 응답을 생성하므로 이벤트 루프 안에서도 사용할 수 있다.
- 스프링 웹 MVC에서
- 스프링 웹 MVC 버전 4.2부터는
DeferredResult와 비슷하게 동작하는ResponseBodyEmitter를 반환할 수 있다.ResponseBodyEmitter는 메시지 컨버터(HttpMessageConverter인터페이스의 구현 클래스)에 의해 개별적으로 만들어진 여러개의 메시지를 전달하는 용도로 사용할 수 있다.
- 스프링 웹 MVC는
ResponseBodyEmitter와 함께StreamingResponseBody인터페이스도 지원한다.- 이를 사용하면
@Controller에서 데이터 반환 시에 비동기적으로 전달할 수 있다.
- 이를 사용하면
SSE 엔드포인트
SseEmitter는ResponseBodyEmitter를 상속하여 SSE의 프로토콜 요구 사항에 따라 하나의 수신 요청에 대해 다수의 발신 메시지를 보낼 수 있는 클래스이다.
@RestController
public class TemperatureController {
private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>();
@GetMapping("/temperature-stream")
public SseEmitter events(HttpServletRequest request) {
SseEmitter emitter = new SseEmitter();
clients.add(emitter);
emitter.onTimeout(() -> clients.remove(emitter));
emitter.onCompletion(() -> clients.remove(emitter));
return emitter;
}
@Async
@EventListener
public void handleMessage(Temperature temperature) {
List<SseEmitter> deadEmitters = new ArrayList<>();
clients.forEach(emitter -> {
try {
emitter.send(temperature, MediaType.APPLICATION_JSON);
} catch(Exception ignore) {
deadEmitters.add(emitter);
}
});
clients.removeAll(deadEmitters);
}
}
- SSE 이벤트를 보내기 위해
SseEmitter클래스를 사용했다. - 요청 처리 메서드가
SseEmitter인스턴스를 반환하더라도SseEmitter.complete()메서드가 호출되거나 오류나 시간 초과가 발생할 때까지 실제 요청 처리는 계속된다. - 클라이언트가
/temperature-streamURI를 요청하면 새로운SseEmitter인스턴스를 만들어 활성 클라이언트 목록에 등록함과 동시에 반환한다.- 웹 클라이언트가 새로운 SSE 세션을 요청하면
clients컬렉션에 새로운SseEmitter를 추가하고,SseEmitter의 처리가 끝나거나 타임아웃이 발생하면clients컬렉션에서 자신을 제거한다.
- 웹 클라이언트가 새로운 SSE 세션을 요청하면
- 웹 클라이언트와의 커뮤니케이션 채널이 있다는 것은 온도 변화에 대한 이벤트를 수신할 수 있는 의미이다.
- 이벤트를 수신하기 위해
handleMessage()메서드를 구현했다.
- 이벤트를 수신하기 위해
- 스프링으로부터 이벤트를 수신하기 위해
@EventListener어노테이션을 설정하여 온도 이벤트를 수신할 때만handleMessage()메서드를 호출하도록 한다.- 즉,
TemperatureSensor.probe()메서드가 호출되어Temperature타입의 이벤트가 발생했을 때handleMessage()메서드가 실행되고,clients목록에 있는 클라이언트들에게 온도 정보를 메시지로 만들어 전달하는 것이다.
- 즉,
@Async어노테이션을 설정하여 메서드를 별도로 구성된 스레드 풀에서 호출하여 비동기적으로 실행하도록 한다.
솔루션에 대한 평가
- 현재 위에서 구현한 솔루션은 스프링에서 제공하는 발행 - 구독 구조를 사용하고 있다.
- 이 구조는 응용 프로그램 수명 주기 이벤트를 처리하기 위해 처음 도입됐으며, 고부하 및 고성능 시나리오를 위한 것이 아니다.
- 따라서, 하나의 온도 데이터가 아닌 수천, 수백만 개의 개별 스트림을 필요로 할 때 프로그램이 감당할 수 있을지는 알 수 없다.
- 비즈니스 로직을 구현하기 위해 스프링 프레임워크의 내부 매커니즘을 사용하고 있다.
- 즉, 프레임워크의 사소한 변경으로 인해 프로그램의 안정성이 깨질 수 있어 위험하다.
- 또한, Spring의 ApplicationContext를 로딩하지 않고 비즈니스 로직을 단위 테스트하기 어렵다는 단점이 있다.
- 온도 이벤트를 비동기적으로 브로드캐스팅하기 위해 스레드 풀을 사용하고 있다.
- 이는 진정한 비동기적 리액티브 접근에서는 필요없는 일이다.
이러한 문제들을 해결하기 위해 이 목적만을 위해 설계된 리액티브 라이브러리가 필요하다.
RxJava는 이런 목적을 가진 최초의 리액티브 라이브러리이다.
리액티브 프레임워크 RxJava
- RxJava 라이브러리는 Reactive Extensions(ReactiveX)의 자바 구현체이다.
- ReactiveX는 동기식 또는 비동기식 스트림과 관계없이 명령형 언어를 이용해 데이터 스트림을 조작할 수 있는 일련의 도구이다.
- Observer 패턴, Iterator 패턴 및 함수형 프로그래밍의 조합으로 정의된다.
관찰자 + 반복자 = 리액티브 스트림
- 관찰자 패턴은 무한한 데이터 스트림에 대해서는 매력적이지만, 데이터 스트림의 끝을 알리는 기능이 없다.
- 또한, consumer가 준비되기 전에 producer가 이벤트를 생성하는 것은 적절하지 않을 수 있다.
- 동기식 구현에서, 이런 때를 대비한 반복자 패턴이 존재한다.
public interface Iterator<T> {
T next();
boolean hasNext();
}
Iterator인터페이스는 항목을 하나씩 검색하기 위한next()메서드와 시퀀스의 끝을 확인하기 위한hasNext()메서드를 제공한다.
RxObserver
- Iterator 패턴과 관찰자 패턴에 의한 비동기 실행을 합치면 다음과 같다.
public interface RxObserver<T> {
void onNext(T next);
void onComplete();
void onError(Exception e);
}
- 이 인터페이스는 리액티브 스트림의 모든 컴포넌트 사이에 데이터가 흐르는 방법을 정의한다.
onNext(): 콜백으로RxObserver에 새로운 값이 전달된다.onComplete(): 스트림의 끝을 알린다.onError():next()메서드를 처리하는 동안 exception이 발생했을 경우 콜백으로 예외를 전달한다.
Observable
Observable클래스는 관찰자 패턴의 Subject와 일치한다.- 즉, 이벤트 소스 역할을 수행한다.
- 리액티브 스트림을 초기화하는 팩토리 메서드와 스트림 변환 메서드를 포함한다.
Subscriber추상 클래스가Observer인터페이스를 구현하여Observable이 생성한 이벤트를 소비한다.

- Observable은 일정 개수의 이벤트를 보낼 수 있고, 그 다음 성공을 알리거나 오류를 발생시켜 실행 종료를 알린다.
- 즉, 각 구독자에 연결된
Observable은onNext()를 여러번 호출한 다음,onComplete()또는onError()를 호출한다.
- 즉, 각 구독자에 연결된
스트림의 생산과 소비
Observable<String> observable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
});
- 구독자에게 적용될 콜백을 가진
Observable클래스를 정의했다.- 콜백이 호출되면
Observer는 하나의 문자열을 생성한 후, 스트림의 끝을 구독자에게 알린다.
- 콜백이 호출되면
- 구독자를 정의해서 생성한
Observable인스턴스와 연결할 수 있다.
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onCompleted() {
System.out.println("Done!");
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
};
observable.subscribe(subscriber);
- 람다식을 이용해서 다음과 같이 구현할 수도 있다.
Observable<String> observable = Observable.create(
sub -> {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}).subscribe(
System.out::println,
System.out::println,
() -> System.out.println("Done!")
);
이처럼 Observable을 생성하는 방식은 backpressure를 지원하지 않기 때문에 권장되지 않는다.
다양한 Observable 인스턴스 생성 방식
- RxJava 라이브러리는 Observable 인스턴스를 생성하기 위해 많은 유연성을 제공한다.
- 요소를 직접 등록할 수도 있고, 배열을 사용할 수도 있으며,
Iterable컬렉션을 이용해 만들 수도 있다.
Observable.just("1", "2", "3", "4");
Observable.from(new String[]{"A", "B", "C"});
Observable.from(Collections.emptyList());
Callable또는Future를 사용할 수도 있다.
Observable<String> hello = Observable.fromCallable(() -> "hello");
Future<String> future = Executors.newCachedThreadPool().submit(() -> "World");
Observable<String> world = Observable.from(future);
- Observable 스트림은 다른
Observable인스턴스와 결합해 생성할 수도 있어 복잡한 워크플로우를 쉽게 구현할 수 있다.concat()메서드를 사용해 입력 스트림을 다운 스트림 Observable로 다시 보낼 수 있다.
Observable.concat(hello, world, Observable.just("!"))
.forEach(System.out::print);
- 처리 순서는
concat()메서드의 파라미터 순서와 동일하다. Observable.forEach()메서드를 이용해 결과를 반복 처리한다.
비동기 시퀀스 생성하기
- RxJava는 하나의 이벤트뿐만 아니라 주기적으로 비동기 이벤트 시퀀스를 생성할 수 있다.
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Received : " + e));
Thread.sleep(5000);
실행 결과
Received : 0
Received : 1
Received : 2
Received : 3
Received : 4
- 위 코드에서
Thread.sleep()을 제거하면 아무것도 출력되지 않고 종료된다.- 메인 스레드와 이벤트가 생성되는 스레드는 별개이기 때문이다.
- 즉, 메인 스레드의 종료를
sleep()으로 지연시키면 다른 스레드에서 1초에 한번씩 이벤트를 발행하는 것이다.
스트림 변환과 마블 다이어그램
- RxJava의 모든 기능은 연산자에 의해 구현된다고 할 수 있다.
- 연산자는 스트림의 원소를 조정하거나 스트림 구조 자체를 변경할 수 있다.
Map 연산자
<R> Observable<R> map(Func1<T, R> func)
func함수가 타입<T>를 타입<R>로 변환하고,map을 통해Observable<T>를Observable<R>로 변환할 수 있음을 의미한다.

Filter 연산자
filter는 조건부 테스트를 성공적으로 통과한 원소만 재발행 함으로써 입력 스트림보다 적은 수의 원소를 생성할 수도 있다.

Count 연산자
count는 입력 스트림의 개수를 발행한다.- 입력 스트림이 완료될 때 카운트가 발행되므로 스트림이 무한대일 때는
count연산자가 완료되지 않거나 아무것도 반환하지 않는다.

Zip 연산자
zip은 지정된 함수를 통해 여러 Observable의 이벤트를 결합하고, 결합한 결과를 발행한다.

Observable.zip(
Observable.just("A", "B", "C"),
Observable.just("1", "2", "3"),
(x, y) -> x + y
).forEach(System.out::println);
실행결과
A1
B2
C3
https://rxmarbles.com 에서 다른 연산자들의 마블 다이어그램을 확인할 수 있다.
RxJava를 이용한 어플리케이션 구현
-
앞에서 구현했던 온도 감지 어플리케이션을 RxJava로 다시 구현해보자.
-
RxJava 라이브러리를 사용하기 위해
build.gradle에 의존성을 추가한다.compile('io.reactivex:rxjava:1.3.8')
비즈니스 로직 구현
TemperatureSensor클래스에서 온도 측정 후Temperature이벤트가 있는 리액티브 스트림을 반환하도록 한다.
@Component
public class TemperatureSensor {
private final Random rnd = new Random();
private final Observable<Temperature> dataStream =
Observable
.range(0, Integer.MAX_VALUE)
.concatMap(tick -> Observable.just(tick)
.delay(rnd.nextInt(5000), MILLISECONDS)
.map(tickValue -> this.prove()))
.publish()
.refCount();
private Temperature probe() {
return new Temperature(16 + rnd.nextGaussian() * 10);
}
public Observable<Temperature> temperatureStream() {
return dataStream;
}
}
dataStream은 팩토리 메서드range()를 사용해 사실상 무한대의 숫자 스트림을 생성한다.- 즉,
range(0, Integer.MAX_VALUE)는 0부터Integer.MAX_VALUE까지의 정수 시퀀스를 생성한다.
- 즉,
- 이 숫자 스트림들의 각 값들을
concatMap()을 이용해 변환한다.concatMap()메서드는 메서드 f를 수신한다.- 메서드 f는
tick객체를Observable스트림으로 변환하고 f 함수를 적용한 다음, 결과 스트림에 하나씩 결합하는 역할을 한다. - 즉, 위 코드에서 f 함수는 임의의 지연시간 후에
probe()메서드를 통해 센서 측정을 수행하고 값을 변환한다.
- 메서드 f는
- 결과적으로 각 측정 사이의 최대 간격이 5초인 센서 값을 반환하는 스트림을 받을 수 있다.
flatMap
- 변환 연산자
- 하나의 Observable이 발행하는 각 item을 여러개의 item들을 가진 Observable로 변환하고, item들의 배출을 차례로 줄 세워 하나의 Observable로 반환한다.

String[] balls={"1","3","5"};
Observable<String> source=Observable.interval(100L,TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx->balls[idx])
.take(balls.length)
.flatMap(ball->Observable.interval(200L,TimeUnit.MILLISECONDS)
.map(notUsed->ball+"<>")
.take(2));
source.subscribe(Log::it);
실행결과
RxComputationThreadPool-2 | 386 | value = 1<>
RxComputationThreadPool-3 | 487 | value = 3<>
RxComputationThreadPool-4 | 585 | value = 5<>
RxComputationThreadPool-4 | 585 | value = 1<>
RxComputationThreadPool-3 | 687 | value = 3<>
RxComputationThreadPool-4 | 786 | value = 5<>
concatMap
flatMap과 비슷하게 동작하며, 먼저 들어온 데이터 순서대로 처리하여 결과를 내도록 보장한다.flatMap은 먼저 들어온 데이터를 처리하는 도중에 새로운 데이터가 들어오면 끼어들기(interleaving)가 가능한 것과는 다르다.

- 위 코드에서
flatMap만concatMap으로 바꿨을 때 결과는 다음과 같다.
실행결과
RxComputationThreadPool-2 | 604 | value = 1<>
RxComputationThreadPool-2 | 804 | value = 1<>
RxComputationThreadPool-3 | 1005 | value = 3<>
RxComputationThreadPool-3 | 1206 | value = 3<>
RxComputationThreadPool-4 | 1407 | value = 5<>
RxComputationThreadPool-4 | 1608 | value = 5<>
- 먼저 들어온 데이터를 먼저 처리하기 때문에 다른 데이터들은 대기 후 다음에 처리된다.
- 끼어들기가 가능한
flatMap에 비해 시간이 더 오래 걸린다.
- 끼어들기가 가능한
publish
Observable을ConnectableObservable로 변환해준다.- Observable : 차가운 Observable 성격
subscribe()했을 때 항상 같은 데이터가 차례로 발행된다.
- ConnectableObservable : 뜨거운 Observable 성격
subscribe()->connect(): 구독자를 대기시켜 동시에 같은 데이터를 받을 수 있도록 한다.connect()->subscribe(): 구독 전에 발행된 데이터는 받지 못한다.
- Observable : 차가운 Observable 성격
ConnectableObservableconnect()를 실행하면 데이터가 발행되기 시작한다.refCount(): 구독자가 얼마나 있는지 추적하면서 스트림 데이터를 발행한다.- 처음 구독이 되면 그 때부터
connect()되어 emit이 진행된다. ConnectableObservable을Observable처럼 동작하게 만들기 때문에, 추가로 구독이 이루어졌을 때 해당 구독자에게 별도로 emit이 진행되어 다른 구독자들과 같은 데이터를 처음부터 받을 수 있게 한다.- 구독자들이 모두 없어지면 emit이 진행되지 않는다.
- 처음 구독이 되면 그 때부터
Custom SseEmitter
public class RxSseEmitter extends SseEmitter {
static final long SSE_SESSION_TIMEOUT = 30 * 60 * 1000L;
private static final Subscriber<Temperature> subscriber;
public RxSseEmitter() {
super(SSE_SESSION_TIMEOUT);
this.subscriber = new Subscriber<Temperature>() {
@Override
public void onNext(Temperature temperature) {
try {
RxSseEmitter.this.send(temperature);
} catch (IOException e) {
unsubscribe();
}
}
@Override
public void onError(Throwable e) { }
@Override
public void onCompleted() { }
};
onCompletion(subscriber::unsubscribe);
onTimeout(subscriber::unsubscribe);
}
public Subscriber<Temperature> getSubscriber() {
return subscriber;
}
}
Temperature이벤트에 대한 구독자가onNext신호를 수신하면 응답으로 SSE 클라이언트에게 다시 신호를 보낸다.- 데이터 전송에 실패하면 구독자는 수신한 스트림으로부터 자기 자신을 unsubscribe 한다.
SSE 엔드포인트
@RequiredArgsConstructor
@RestController
public class TemperatureController {
private final TemperatureSensor temperatureSensor;
@GetMapping("/temperature-stream")
public SseEmitter events(HttpServletRequest request) {
RxSseEmitter emitter = new RxSseEmitter();
temperatureSensor.temperatureStream().subscribe(emitter.getSubscribe);
return emitter;
}
}
- 새로운 SSE 세션이 생성될 때
RxSseEmitter인스턴스를 생성하고,TemperatureSensor가 생성하는 온도 스트림을 구독한다.
이 구현은 스프링의 EventBus를 사용하지 않으므로 이식성이 더 높고, 스프링 컨텍스트 없이도 단위 테스트를 할 수 있다.
리액티브의 전망
- RxJava의 성공에 따라 다양한 리액티브 라이브러리가 구현되고, 리액티브 환경에 다양성이 증가하고 있다.
- 그러나, 하나의 자바 어플리케이션에 다른 종류의 리액티브 라이브러리 또는 프레임워크를 사용하는 것은 API 불일치로 인해 위험하다.
- 전체 리액티브 환경을 아우르며 호환성을 보장하는 표준 리액티브 스트림의 필요성이 대두되었다.