-
Reactor - Reactive Streams 생명 주기spring 2019. 10. 21. 00:07
개요
Spring Webflux를 사용하여 리액티브 웹 서버를 만들기 위해서는 Webflux의 구조를 이해하는 것 만큼이나 Reactor의 구조를 이해하는 것이 중요하다. Reactor에서 reactive stream은 다음과 같은 3단계의 생명 주기를 가지고있다.
1. 조립 단계 (Assembly-time)
2. 구독 단계 (Subscription-time)
3. 런타임 단계 (Runtime)
각각의 단계에 대해서 자세히 알아보자.
조립 단계 (Assembly-time)
Reactive stream의 생명 주기의 첫 번째 단계는 조립 단계이다. Reactor가 제공하는 체인형 api를 사용하여 Flux나 Mono를 만들수 있는데, 이는 언뜻 보면 빌더 패턴처럼 보인다. 그러나, 해당 api들은 각각 immutable한 Flux/Mono 객체를 생성한다. 이 객체들을 서로 연결해주는 작업이 조립 단계이다. 다음과 같은 코드를 보자.
Flux.just(1, 20, 300) .map(String::valueOf) .filter(s -> s.length() > 1)
위 코드를 연쇄형 api를 쓰지 않고 구성하면 다음과 같이 나눠 쓸 수 있다.
Flux<Integer> sourceFlux = new FluxArray(1, 20, 300); Flux<String> mapFlux = new FluxMap(sourceFlux, String::valueOf); Flux<String> filterFlux = new FluxFilter(mapFlux, s -> s.length() > 1);
이렇게 각 타입의 Flux 객체가 생성되고, 다음 체인의 Flux객체의 생성자에 이전 Flux객체를 참조할 수 있도록 파라미터로 전달해준다. 그 결과 다음과 같은 체인 형태를 이루게 된다.
FluxFilter( FluxMap( FluxArray(1, 20, 300) ) ) )
조립 단계가 중요한 이유는, 단순히 객체들을 체이닝시켜주는 것 뿐만 아니라 필요한 경우에 조립 연산자를 바꿀 수 있기 때문이다. 이러한 형태의 최적화를 매크로 퓨전이라고 부른다. 예시로 Flux의 concatWith() 연산자 코드를 보자.
public final Flux<T> concatWith(Publisher<? extends T> other) { if (this instanceof FluxConcatArray) { @SuppressWarnings({ "unchecked" }) FluxConcatArray<T> fluxConcatArray = (FluxConcatArray<T>) this; return fluxConcatArray.concatAdditionalSourceLast(other); } return concat(this, other); }
만약에 Flux.concatWith()가 연속해서 실행될 경우, 각각 FluxConcatArray를 만들어서 체이닝하지 않고, 하나의 FluxConcatArray를 만들어서 전체적인 성능을 향상시킨다.
구독 단계 (Subscription-time)
리액티브 스트림의 생명 주기의 두 번째 단계는 구독 단계이다. publisher에 subscribe()를 호출하였을 때 발생하는 단계이다. 앞서 언급했듯이, publisher들이 일련의 체인으로 묶여있기 때문에, subscriber도 각 publisher마다 하나씩 생기고 일련의 체인으로 묶어주어야 구독의 전파가 가능해진다. 다음 예시는 위의 조립 단계에서의 Flux체인에 대해 구독 단계에서 발생하는 subscriber 체인을 보여주는 예시이다.
filterFlux.subscribe(Subscriber) { mapFlux.subscribe(new FilterSubscriber(Subscriber)) { arrayFlux.subscribe(new MapSubscriber(FilterSubsriber(Subscriber))) { // 데이터 송신 } } }
publisher 체인을 따라 subscribe()가 전파가 되기 때문에, 다음 체인의 Subscriber 생성자에 자기 자신을 파라미터로 전달하고, 그에 따라 publisher 체인과는 다시 반대 순서의 체인이 형성된다.
ArraySubscriber( MapSubscriber( FilterSubscriber( Subscriber ) ) )
런타임 단계 (Runtime)
스트림 생명 주기의 마지막 단계는 런타임 단계이다. 이 단계에서 publisher와 subscriber가 onSubscribe() 시그널과 request() 시그널을 교환하면서 스트림 실행이 시작된다. 위 예제에서 최상위 subscriber는 ArraySubscriber이므로 거기서부터 onSubscribe()가 전파된다.
MapSubscriber(FilterSubscriber(Subscriber)).onSubscribe( new ArraySubscription ) { FilterSubscriber(Subscriber).onSubscribe( new MapSubscription(ArraySubscription(...)) ) { Subscriber.onSubscribe( FilterSubscription(MapSubscription(ArraySubscription(...))) ) { // 데이터 요청 } } }
이 과정을 통해 다시 한 번 Subscription의 체인이 다음과 같이 형성된다.
FilterSubscription( MapSubscription( ArraySubscription() ) )
위 체인을 따라 request(n) 요청이 전파되고, 가장 안 쪽에 있는 ArraySubscription의 데이터 송신이 시작된다. 런타임 단계에서도 request() 호출 횟수를 줄이는 등의 최적화가 이루어진다. (마이크로 퓨전)
참고
Subscription::request() 메소드는 호출 시 요청 수요 정보를 volatile 필드에 쓴다. 이는 비용이 많이 드는 연산이므로 request() 호출이 적을수록 좋다.
참고
올레 도쿠카, 이호르 로진스키 <실전! 스프링5를 활용한 리액티브 프로그래밍>, 2019, pp.183-189
'spring' 카테고리의 다른 글
Spring Webflux Cache (1) 2019.10.28 Spring WebFlux - Config (0) 2019.08.29 Spring Webflux - CORS (0) 2019.07.31 Spring Webflux - Functional Endpoints (0) 2019.07.25 Spring Webflux - DispatcherHandler (0) 2019.07.18