ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring WebClient
    spring 2019. 7. 10. 17:11

    개요


    Spring Webflux에는 reactive, non-blocking하게 HTTP 요청을 처리할 수 있도록 WebClient라는 모듈을 제공한다. 기존의 RestTemplate과 같은 역할 하지만, non-blocking하다라는 점에서 차이가 있다.

     

    내부적으로 WebClient는 HTTP 클라이언트 라이브러리에 위임하는데, 디폴트로 Reactor Netty의 HttpClient를 사용한다. Reactor Netty 외에도, Jetty의 HttpClient를 지원하며, 다른 라이브러리도 ClientHttpConnector에 넣어주면 사용할 수 있다.

     

    설정


    WebClient를 생성하는 가장 간단한 방식은 아래와 같은 static 팩토리 메소드들을 사용하는 것이다.

    • WebClient.create()
    • WebClient.create(String baseUrl)

    WebClient.builder()를 사용해 다양한 옵션을 셋팅할 수도 있다.

    • uriBuilderFactory: base URL로 사용될 UriBuilderFactory
    • defaultHeader: 모든 요청에 대한 기본 헤더
    • defaultCookie: 모든 요청에 대한 기본 쿠키
    • defaultRequest: 모든 요청에 대한 Consumer
    • filter: 모든 요청에 대한 클라이언트 필터
    • exchangeStrategies: HTTP 메세지 reader/writer 커스터마이징 (ex. codec)
    • clientConnector: HTTP 클라이언트 라이브러리 셋팅

    다음 예제는 HTTP codecs를 설정하는 내용이다.

        ExchangeStrategies strategies = ExchangeStrategies.builder()
                .codecs(configurer -> {
                    // ...
                })
                .build();
    
        WebClient client = WebClient.builder()
                .exchangeStrategies(strategies)
                .build();

    일단 build()가 호출되면, WebClient는 불변 객체이다. 그러나, 다음과 같은 방식으로 클론하여 기존 인스턴스에 영향을 주지않고도 수정하여 사용할 수 있다.

        WebClient client1 = WebClient.builder()
                .filter(filterA).filter(filterB).build();
    
        WebClient client2 = client1.mutate()
                .filter(filterC).filter(filterD).build();
    
        // client1 has filterA, filterB
    
        // client2 has filterA, filterB, filterC, filterD

     

    Reactor Netty 커스터마이징

    Reactor Netty 셋팅을 커스터마이징하려면 다음과 같이 미리 HttpClient를 설정해두고 clientConnector의 파라미터로 넘겨주면 된다.

        HttpClient httpClient = HttpClient.create().secure(sslSpec -> ...);
    
        WebClient webClient = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .build();

     

    디폴트로 HttpClient는 이벤트 루프 스레드들과 커넥션 풀을 포함하여 reactor.netty.http.HttpResources에 있는 글로벌 Reactor Netty 리소스들을 사용한다. 고정된 몇 개의 공유 리소스들을 사용하는 것이 동시성에 유리하기 때문에, 이 방식이 추천된다. 이 방식을 사용하면 프로세스가 닫힐 때까지 글로벌 리소스들은 계속 살아있게 된다.

     

     프로세스가 닫히면 리소스들도 사라지지만, WAR형태로 하나의 프로세스 안에서 서버가 뜨고 내려갈 수 있는 경우, 스프링 ApplicationContext가 닫히면 글로벌 Reactor Netty 리소스들도 닫히도록 ReactorResourceFactory를 빈으로 등록할 수도 있다.

        @Bean
        public ReactorResourceFactory reactorResourceFactory() {
            return new ReactorResourceFactory();
        }

    아래와 같이 글로벌 Reactor Netty 리소스를 사용하지 않도록 설정할 수도 있다.

        @Bean
        public ReactorResourceFactory resourceFactory() {
            ReactorResourceFactory factory = new ReactorResourceFactory();
            // 글로벌 리소스 사용하지 않음
            factory.setGlobalResources(false); 
            return factory;
        }
    
        @Bean
        public WebClient webClient() {
    
            Function<HttpClient, HttpClient> mapper = client -> {
                // Further customizations...
            };
    
            // resourceFactory 빈을 생성자에 넣어준다.
            ClientHttpConnector connector =
                    new ReactorClientHttpConnector(resourceFactory(), mapper); 
    
            // clientConnector에 파라미터로 전달해 WebClient를 빌드한다.
            return WebClient.builder().clientConnector(connector).build(); 
        }

     

    아래와 같이 HttpClient에 타임아웃 설정을 넣을 수 있다.

    import io.netty.channel.ChannelOption;
    
    HttpClient httpClient = HttpClient.create()
            .tcpConfiguration(client ->
                    client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000));

    read/write 타임아웃 설정을 넣을 수도 있다.

    import io.netty.handler.timeout.ReadTimeoutHandler;
    import io.netty.handler.timeout.WriteTimeoutHandler;
    
    HttpClient httpClient = HttpClient.create()
            .tcpConfiguration(client ->
                    client.doOnConnected(conn -> conn
                            .addHandlerLast(new ReadTimeoutHandler(10))
                            .addHandlerLast(new WriteTimeoutHandler(10))));

     

    retrieve()


    retrieve() 메소드는 response body를 가져오는 가장 단순한 방법이다.

        WebClient client = WebClient.create("https://example.org");
    
        Mono<Person> result = client.get()
                .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(Person.class);

    오브젝트 stream을 다음과 같이 Flux로 받을 수도 있다.

        Flux<Quote> result = client.get()
                .uri("/quotes").accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(Quote.class);

    디폴트로, HTTP 응답 코드가 4xx, 5xx로 오면, WebClientResponseException이 발생한다. 그러나 아래처럼 onStatus() 메소드를 통해 exception을 커스터마이징할 수 있다.

        Mono<Person> result = client.get()
                .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .onStatus(HttpStatus::is4xxServerError, response -> ...)
                .onStatus(HttpStatus::is5xxServerError, response -> ...)
                .bodyToMono(Person.class);

     

    exchange()


    exchange() 메소드는 retrieve() 메소드보다 더 많은 컨트롤을 제공합니다. retrieve()와 동일한 역할을 하면서 ClientRespone 객체의 접근을 가능하게 해 response 처리를 커스터마이징하기 보다 용이합니다.

        Mono<Person> result = client.get()
                .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMap(response -> response.bodyToMono(Person.class));

    toEntity() 메소드로 ResponseEntity를 만들 수도 있다.

        Mono<ResponseEntity<Person>> result = client.get()
                .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMap(response -> response.toEntity(Person.class));

    참고로, exchange()는 retrieve()와 다르게 4xx, 5xx 응답 코드에 대한 자동적인 예외 처리가 없기 때문에 직접 응답 코드를 체크해서 처리해야 한다.

     

     Request Body


    Mono형태의 객체를 아래와 같이 request body에 넣을 수 있다.

        Mono<Person> personMono = ... ;
    
        Mono<Void> result = client.post()
                .uri("/persons/{id}", id)
                .contentType(MediaType.APPLICATION_JSON)
                .body(personMono, Person.class)
                .retrieve()
                .bodyToMono(Void.class);

    Flux형태도 마찬가지이다.

        Flux<Person> personFlux = ... ;
    
        Mono<Void> result = client.post()
                .uri("/persons/{id}", id)
                .contentType(MediaType.APPLICATION_STREAM_JSON)
                .body(personFlux, Person.class)
                .retrieve()
                .bodyToMono(Void.class);

    Mono나 Flux로 감싼 게 아닌 실제 오브젝트의 경우, syncBody() 메소드로 넣을 수 있다.

        Person person = ... ;
    
        Mono<Void> result = client.post()
                .uri("/persons/{id}", id)
                .contentType(MediaType.APPLICATION_JSON)
                .syncBody(person)
                .retrieve()
                .bodyToMono(Void.class);

    form data를 넣고 싶으면 MultiValueMap형태로 body에 넣을 수 있다. 이 경우 자동으로 콘텐트 타입이 'application/x-www-form-urlencoded'로 셋팅된다.

        MultiValueMap<String, String> formData = ... ;
    
        Mono<Void> result = client.post()
                .uri("/path", id)
                .syncBody(formData)
                .retrieve()
                .bodyToMono(Void.class);

    BodyInserters의 static 메소드를  사용하여 form data를 인라인으로 넣을 수도 있다.

        Mono<Void> result = client.post()
                .uri("/path", id)
                .body(BodyInserters.fromFormData("k1", "v1").with("k2", "v2"))
                .retrieve()
                .bodyToMono(Void.class);

    Multipart data의 경우도 MultipartBodyBuilder를 사용하여 body에 넣을 수 있다.

        MultipartBodyBuilder builder = new MultipartBodyBuilder();
        builder.part("fieldPart", "fieldValue");
        builder.part("filePart", new FileSystemResource("...logo.png"));
        builder.part("jsonPart", new Person("Jason"));
    
        Mono<Void> result = client.post()
                .uri("/path", id)
                .syncBody(builder.build())
                .retrieve()
                .bodyToMono(Void.class);

    마찬가지로 BodyInserters의 static 메소드를 사용하여 multipart date도 인라인으로 넣을 수 있다.

        Mono<Void> result = client.post()
                .uri("/path", id)
                .body(BodyInserters.fromMultipartData("fieldPart", "value").with("filePart", resource))
                .retrieve()
                .bodyToMono(Void.class);

     

    Client Filters


    request를 수정하기 위해 client filter (ExchangeFilterFunction)를 등록할 수 있다.

    WebClient client = WebClient.builder()
            .filter((request, next) -> {
    
                ClientRequest filtered = ClientRequest.from(request)
                        .header("foo", "bar")
                        .build();
    
                return next.exchange(filtered);
            })
            .build();

     

    Synchronous Use


    Webclient를 사용할 때, 때로는 동기로 데이터를 꺼내와야할 필요가 있다. 그럴 때에는 Flux와 Mono의 block()을 사용하면 된다.

    Person person = client.get().uri("/person/{id}", i).retrieve()
        .bodyToMono(Person.class)
        .block();
    
    List<Person> persons = client.get().uri("/persons").retrieve()
        .bodyToFlux(Person.class)
        .collectList()
        .block();

    그러나, 여러 개의 api 호출을 할 때에는 위의 예제처럼 각각 block()을 걸면 각각 블록킹이 되므로 WebClient의 장점이 사라진다. 여러 개의 api를 동시에 호출할 때에는 block()을 각각 하지 않고 Mono.zip()과 같은 메소드를 사용해 동시에 여러 response를 받아온 후, 그 결과를 합치는 publisher를 만들고, block()을 거기에 거는 것이 좋다.

    Mono<Person> personMono = client.get().uri("/person/{id}", personId)
            .retrieve().bodyToMono(Person.class);
    
    Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId)
            .retrieve().bodyToFlux(Hobby.class).collectList();
    
    // 아래 한 번만 block
    Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> {
                Map<String, String> map = new LinkedHashMap<>();
                map.put("person", personName);
                map.put("hobbies", hobbies);
                return map;
            })
            .block();

     

    RestTemplate과의 비교


    RestTemplate 예제

    Controller

    @RestController
    public class DemoController {
    
        @GetMapping("/hello")
        public String hello() throws InterruptedException {
            Thread.sleep(5000);
    
            return "hello";
        }
    
        @GetMapping("/world")
        public String world() throws InterruptedException {
            Thread.sleep(3000);
    
            return "world";
        }
    }

    /hello는 대략 5초가 걸리는 작업이고, /world는 3초가 걸리는 작업이다.

     

    Client

    @Slf4j
    @Component
    public class Client implements ApplicationRunner {
    
        @Override
        public void run(ApplicationArguments args) {
            RestTemplate restTemplate = new RestTemplate();
    
            long startTime = System.currentTimeMillis();
    
            String hello = restTemplate.getForObject("http://localhost:8080/hello", String.class);
            log.info(hello + ", time: " + (System.currentTimeMillis() - startTime) + "sec");
    
            long startTime2 = System.currentTimeMillis();
    
            String world = restTemplate.getForObject("http://localhost:8080/world", String.class);
            log.info(world + ", time: " + (System.currentTimeMillis() - startTime2) + "sec");
    
            log.info("end: " + (System.currentTimeMillis() - startTime) + "sec");
        }
      }

     

    출력 결과는 다음과 같다.

    /hello와 /world는 각각 5초, 3초 정도의 시간이 걸리고, blocking 방식이기 때문에 두 api의 결과를 모두 돌려받을 때 까지 걸리는 시간은 8초이다.

     

     

    Webclient 예제

    Controller

    controller는 위 예제와 동일하다.

     

    Client

    @Slf4j
    @Component
    public class Client implements ApplicationRunner {
        @Autowired
        private WebClient.Builder webClientBuilder;
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            WebClient webClient = webClientBuilder.baseUrl("http://localhost:8080").build();
    
            long startTime = System.currentTimeMillis();
    
            webClient.get().uri("/hello")
                    .retrieve()
                    .bodyToMono(String.class)
                    .subscribe(ret -> {
                        log.info(ret + ", time: " + (System.currentTimeMillis() - startTime) + "sec");
                    });
    
            webClient.get().uri("/world")
                    .retrieve()
                    .bodyToMono(String.class)
                    .subscribe(ret -> {
                        log.info(ret + ", time: " + (System.currentTimeMillis() - startTime) + "sec");
                    });
    
            log.info("end: " + (System.currentTimeMillis() - startTime) + "sec");
        }
    }
    

    출력 결과는 다음과 같다.

    위 결과를 보면,각각 5초와 3초가 걸리는 api를 호출하는데, run()은 50초만에 끝나고, ("end: 50sec") 두 api의 결과를 모두 돌려받을 때 까지 걸리는 시간은 5초 정도이다. (좌측 시간 정보로 파악)

     

    그리고 스레드명을 보면, 실제 hello와 world api를 호출하여 publish한 스레드는 main이 아닌 'ctor-http-nio-<n>'라는 이름의 스레드이다. 이는 WebClient에서 non-blocking하게 요청을 처리할 수 있도록 해주는 워커 스레드들이다. hello와 world가 서로 다른 워커 스레드가 각각 실행했음을 확인할 수 있다. 이렇게 각각 요청을 별도 스레드로 동시에 처리하면 hello와 world는 각각 3초, 5초가 걸리는 작업이지만 8초가 아닌 5초의 시간으로 두 요청의 결과를 받을 수 있다. 

    'spring' 카테고리의 다른 글

    Spring Webflux - Functional Endpoints  (0) 2019.07.25
    Spring Webflux - DispatcherHandler  (0) 2019.07.18
    Spring Webflux + Reactor  (0) 2019.05.13
    Spring AMQP  (0) 2019.05.13
    JDK Dynamic Proxy vs CGLib Proxy  (0) 2019.05.13

    댓글

Designed by Tistory.