ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Reactor - Reactive Streams 생명 주기
    spring 2019. 10. 21. 00:07

    개요


    Spring Webflux를 사용하여 리액티브 웹 서버를 만들기 위해서는 Webflux의 구조를 이해하는 것 만큼이나 Reactor의 구조를 이해하는 것이 중요하다. Reactor에서 reactive stream은 다음과 같은 3단계의 생명 주기를 가지고있다.

     

    1. 조립 단계 (Assembly-time)

    2. 구독 단계 (Subscription-time)

    3. 런타임 단계 (Runtime)

     

    각각의 단계에 대해서 자세히 알아보자.

     

    조립 단계 (Assembly-time)


    Reactive stream의 생명 주기의 첫 번째 단계는 조립 단계이다. Reactor가 제공하는 체인형 api를 사용하여 Flux나 Mono를 만들수 있는데, 이는 언뜻 보면 빌더 패턴처럼 보인다. 그러나, 해당 api들은 각각 immutable한 Flux/Mono 객체를 생성한다. 이 객체들을 서로 연결해주는 작업이 조립 단계이다. 다음과 같은 코드를 보자.

    Flux.just(1, 20, 300)
        .map(String::valueOf)
        .filter(s -> s.length() > 1)

    위 코드를 연쇄형 api를 쓰지 않고 구성하면 다음과 같이 나눠 쓸 수 있다.

    Flux<Integer> sourceFlux = new FluxArray(1, 20, 300);
    Flux<String> mapFlux = new FluxMap(sourceFlux, String::valueOf);
    Flux<String> filterFlux = new FluxFilter(mapFlux, s -> s.length() > 1);

    이렇게 각 타입의 Flux 객체가 생성되고, 다음 체인의 Flux객체의 생성자에 이전 Flux객체를 참조할 수 있도록 파라미터로 전달해준다. 그 결과 다음과 같은 체인 형태를 이루게 된다.

    FluxFilter(
        FluxMap(
            FluxArray(1, 20, 300)
            )
        )
    )

    조립 단계가 중요한 이유는, 단순히 객체들을 체이닝시켜주는 것 뿐만 아니라 필요한 경우에 조립 연산자를 바꿀 수 있기 때문이다. 이러한 형태의 최적화를 매크로 퓨전이라고 부른다. 예시로 Flux의 concatWith() 연산자 코드를 보자.

        public final Flux<T> concatWith(Publisher<? extends T> other) {
    		if (this instanceof FluxConcatArray) {
    			@SuppressWarnings({ "unchecked" })
    			FluxConcatArray<T> fluxConcatArray = (FluxConcatArray<T>) this;
    
    			return fluxConcatArray.concatAdditionalSourceLast(other);
    		}
    		return concat(this, other);
    	}

    만약에 Flux.concatWith()가 연속해서 실행될 경우, 각각 FluxConcatArray를 만들어서 체이닝하지 않고, 하나의 FluxConcatArray를 만들어서 전체적인 성능을 향상시킨다.

     

    구독 단계 (Subscription-time)


    리액티브 스트림의 생명 주기의 두 번째 단계는 구독 단계이다. publisher에 subscribe()를 호출하였을 때 발생하는 단계이다. 앞서 언급했듯이, publisher들이 일련의 체인으로 묶여있기 때문에, subscriber도 각 publisher마다 하나씩 생기고 일련의 체인으로 묶어주어야 구독의 전파가 가능해진다. 다음 예시는 위의 조립 단계에서의 Flux체인에 대해 구독 단계에서 발생하는 subscriber 체인을 보여주는 예시이다.

    filterFlux.subscribe(Subscriber) {
        mapFlux.subscribe(new FilterSubscriber(Subscriber)) {
            arrayFlux.subscribe(new MapSubscriber(FilterSubsriber(Subscriber))) {
                // 데이터 송신
            }
        }
    }

    publisher 체인을 따라 subscribe()가 전파가 되기 때문에, 다음 체인의 Subscriber 생성자에 자기 자신을 파라미터로 전달하고, 그에 따라 publisher 체인과는 다시 반대 순서의 체인이 형성된다.

    ArraySubscriber(
        MapSubscriber(
            FilterSubscriber(
                Subscriber
            )
        )
    )

     

    런타임 단계 (Runtime)


    스트림 생명 주기의 마지막 단계는 런타임 단계이다. 이 단계에서 publisher와 subscriber가 onSubscribe() 시그널과 request() 시그널을 교환하면서 스트림 실행이 시작된다. 위 예제에서 최상위 subscriber는 ArraySubscriber이므로 거기서부터 onSubscribe()가 전파된다.

    MapSubscriber(FilterSubscriber(Subscriber)).onSubscribe(
        new ArraySubscription
    ) {
        FilterSubscriber(Subscriber).onSubscribe(
            new MapSubscription(ArraySubscription(...))
        ) {
            Subscriber.onSubscribe(
                FilterSubscription(MapSubscription(ArraySubscription(...)))
            ) {
                // 데이터 요청
            }
        }
    }

    이 과정을 통해 다시 한 번 Subscription의 체인이 다음과 같이 형성된다.

    FilterSubscription(
        MapSubscription(
            ArraySubscription()
        )
    )

    위 체인을 따라 request(n) 요청이 전파되고, 가장 안 쪽에 있는 ArraySubscription의 데이터 송신이 시작된다. 런타임 단계에서도 request() 호출 횟수를 줄이는 등의 최적화가 이루어진다. (마이크로 퓨전)

     

    참고

    Subscription::request() 메소드는 호출 시 요청 수요 정보를 volatile 필드에 쓴다. 이는 비용이 많이 드는 연산이므로 request() 호출이 적을수록 좋다.

     

    참고


    올레 도쿠카, 이호르 로진스키 <실전! 스프링5를 활용한 리액티브 프로그래밍>, 2019, pp.183-189

     

    'spring' 카테고리의 다른 글

    Spring Webflux Cache  (1) 2019.10.28
    Spring WebFlux - Config  (0) 2019.08.29
    Spring Webflux - CORS  (0) 2019.07.31
    Spring Webflux - Functional Endpoints  (0) 2019.07.25
    Spring Webflux - DispatcherHandler  (0) 2019.07.18

    댓글

Designed by Tistory.