ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • ⚠️ WebFlux Error 처리
    🌱 spring/🍀 webflux 2023. 5. 1. 13:39

    📌 Functional Level


    💡 Error Handling

    1. onErrorReturn

    Reactive Stream에서 Error가 발생했을 경우 정해진 Fallback value를 리턴

    public final Flux<T> onErrorReturn(T fallbackValue);
    
    public final <E extends Throwable> Flux<T> onErrorReturn(Class<E> type, T fallbackValue);
    
    public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue);
    

    3가지 조건의 다른 파라미터를 갖는 메소드가 정의되어 있다.

    1. 모든 Error에 대해 fallback value를 리턴
    2. 특정 Error Type에 대해서만 처리
    3. 특정 Predicate에 부합될 경우에만 처리

    📍 예시

    @Test
    public void test_onErrorReturn() {
    
        Mockito.when(testService.getPost(1))
               .thenReturn(Mono.just(new Post(1, "title", "content", 1)));
    
        Mockito.when(testService.getPost(2))
               .thenThrow(new RuntimeException("Mock Exception"));
    
        Flux<Post> flux = Flux.just(1, 2, 3)
                                  .flatMap(i -> testService.getPost(i))
                                  .onErrorReturn(new Post(2, "fallback", "content2", 2));
    
        flux.subscribe(post -> log.info(post.toString()));
    }

    결과

    Post(id=1, title=title, body=content, userId=1)
    Post(id=2, title=fallback, body=content2, userId=2)  // Fallback value
    

     

    onErrorReturn은 Reactive Stream 처리 도중 Error 발생 시 정해진 Fallback value를 리턴 후, 나머지 element를 처리하지 않고 stream을 종료

     

    2. onErrorResume

    Reactive Stream에서 Error가 발생했을 경우 Fallback publisher를 구독하도록 전환

    public final Flux<T> onErrorResume(Function<? super Throwable,? extends Publisher<? extends T>> fallback);
    
    public final <E extends Throwable> Flux<T> onErrorResume(Class<E> type,
                                                             Function<? super E,? extends Publisher<? extends T>> fallback);
    
    public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate,
                                       Function<? super Throwable,? extends Publisher<? extends T>> fallback);

    역시 3가지 조건의 파라미터를 갖는 메서드가 정의되어 있다.

     

    📍 예시

    @Test
    public void test_OnErrorResumeType() {
    
        Mockito.when(testService.getPost(1))
               .thenReturn(Mono.just(new Post(1, "title", "content", 1)));
    
        Mockito.when(testService.getPost(2))
               .thenThrow(new RuntimeException("Mock Exception"))
               .thenReturn(Mono.just(new Post(2, "title2", "content2", 2)));
    
        Flux<Post> flux = Flux.just(1, 2, 3)
                                  .flatMap(i -> testService.getPost(i))
                                  .onErrorResume(
                                      RuntimeException.class
                                      , throwable -> Mono.just(new Post(3, "fallback", "content3", 3))
                                  );
    
        flux.subscribe(post -> log.info(post.toString()));
    }

     

    결과

    Post(id=1, title=title1, body=body1, userId=1)
    Post(id=3, title=fallback, body=content3, userId=3)  // Fallback publisher
    

    ⇒ testService.getPost(2) 호출 시 RuntimeException 발생

    ⇒ Fallback Publisher 를 subscribe 한다.

     

    ❗ onErrorResume 은 Reactive Stream 처리 도중 Error가 발생한 경우 정해진 Fallback Function을 통해 새로운 Publisher로 대체, 나머지 element는 처리하지 않고 stream 종료

     

    3. onErrorContinue

    Reactive Stream에서 Error가 발생했을 경우 Error를 처리해주고 남은 Stream element를 계속 처리

    public final Flux<T> onErrorContinue(BiConsumer<Throwable,Object> errorConsumer);
    
    public final <E extends Throwable> Flux<T> onErrorContinue(Class<E> type,
                                                               BiConsumer<Throwable,Object> errorConsumer);
    
    public final <E extends Throwable> Flux<T> onErrorContinue(Predicate<E> errorPredicate,
                                                               BiConsumer<Throwable,Object> errorConsumer);

     

    📍 예시

    @Test
    public void test_OnErrorContinuePredicate() {
    
        Mockito.when(testService.getPost(1))
               .thenReturn(Mono.just(new Post(1, "title1", "content1", 1)));
    
        Mockito.when(testService.getPost(2))
               .thenThrow(new RuntimeException("Mock Exception"));
    
        Mockito.when(testService.getPost(3))
               .thenReturn(Mono.just(new Post(3, "title3", "content3", 3)));
    
        Flux<Post> flux = Flux.just(1, 2, 3)
                                  .flatMap(i -> testService.getPost(i))
                                  .onErrorContinue(
                                      throwable -> throwable.getLocalizedMessage().contains("Mock")
                                      , (throwable, o) -> log.error("Error occurred at : {} with {} ", o, throwable.getLocalizedMessage())
                                  );
    
        flux.subscribe(post -> log.info(post.toString()));
    }
    • testService.getPost(2) 호출 시 RuntimeException 발생
      • 위 예제에서 Predicate 조건 : “Mock” 이란 단어 포함
    • → onErrorContinue의 Predicate 조건에 따라 Error logging 후 다음 Stream element 처리

     

    결과

    Post(id=1, title=title1, body=content1, userId=1)
    Error occurred at : 2 with Mock Exception      // Error 처리
    Post(id=3, title=title3, body=content3, userId=3) 
    

    ❗onErrorContinue는 Reactive Stream 처리 도중 Error 발생 시 Consumer를 통해 Error를 처리하고 나머지 element를 계속 처리한다.

     

    💡Retry

    1. retry

    retry는 Error 발생시 Reactive stream을 처음부터 다시 subscribe

    • retry를 아무 조건없이 사용하게 될 경우 계속해서 retry를 시도해 과부하가 걸릴 수 있다.

     

    ❗따라서, 최대 재시도 횟수를 제한하는 파라미터가 추가된 method를 주로 사용한다.

    • retry(long numRetries)
    • Reactive Stream에서 에러 발생 시 최대 numRetries 만큼 재시도 후, 횟수를 넘기면 에러를 리턴

     

    📍 예시

    @Test
    public void test_OnErrorRetryNtimes() {
    
        Mockito.when(tesetService.getPost(1))
               .thenReturn(Mono.just(new Post(1, "title", "content1", 1)));
    
        Mockito.when(tesetService.getPost(2))
               .thenThrow(new RuntimeException("Mock Exception"))
               .thenThrow(new RuntimeException("Mock Exception"))
               .thenReturn(Mono.just(new Post(2, "title2", "content2", 2)));
    
        Mockito.when(tesetService.getPost(3))
               .thenReturn(Mono.just(new Post(3, "title3", "content3", 3)));
    
        Flux<Post> postFlux = Flux.just(1, 2, 3)
                                  .flatMap(i -> tesetService.getPost(i))
                                  .retry(2);
    
        postFlux.subscribe(post -> log.info(post.toString()));
    }

     

    결과

    Post(id=1, title=title1, body=content1, userId=1)
    Post(id=1, title=title1, body=content1, userId=1) // retry 1
    Post(id=1, title=title1, body=content1, userId=1) // retry 2
    Post(id=2, title=title2, body=content2, userId=3)
    Post(id=3, title=title3, body=content3, userId=3)
    

     

    ❗ retry를 이용시 Reactive Stream에서 에러 발생 시 해당 Stream을 다시 Subscribe 해 재처리를 시도하고 최대 재시도 횟수를 설정 가능

     

    2. retryWhen

    retry 조건을 주고 싶을 때 RetrySpec을 통해 원하는 조건에 따른 재시도가 가능하게 해 준다.

     

    📍Retry.max

    • Retry.max(long max)retry(long numRetries) 와 같은 기능
    Flux<Post> postFlux = Flux.just(1, 2, 3)
                                  .flatMap(i -> searchService.getPost(i))
                                  .retryWhen(Retry.max(2));

    ⇒ 두 번의 재시도 후에 정상 처리

     

    📍Retry.fixedDelay

    • Retry.fixedDelay(long maxAttemps, Duration fixedDelay)
    • 정해진 Duration 동안 delay를 가진 후 maxAttemps 만큼 retry를 시도
    Flux<Post> postFlux = Flux.just(1, 2, 3)
                                  .flatMap(i -> searchService.getPost(i))
                                  .retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(2)));

    ⇒ 2초 간격으로 새로운 Scheduler에서 재시도를 처리

     

    📍Retry.backoff

    • Retry.backoff(long maxAttempts, Duration minBackoff)
    • 에러가 발생할 경우 최소 minBackoff 만큼 delay 후 maxAttemps만큼 retry를 시도
    Flux<Post> postFlux = Flux.just(1, 2, 3)
                                  .flatMap(i -> searchService.getPost(i))
                                  .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)));
    

     

    📖 Retry.fixedDelay 와 차이점

    • Retry.backoff 는 에러가 계속 발생할 경우 Delay 시간이 지수 형태로 점점 늘어난다.

     

    🌐 Global Exception 처리


    WebFlux Error를 Global 수준에서 처리하도록 하기 위해서 2가지 단계를 수행하면 된다.

    1. Customize the Global Error Response Attributes
    2. Impement the Global Error Handler

     

    BusinessException

    ResponseStatusException을 상속해 공통 처리를 받을 수 있는 에러처리 클래스를 만든다.

    public class BusinessException extends ResponseStatusException {
    
        public BusinessException(HttpStatus status) {
            super(status);
        }
    
        public BusinessException(HttpStatus status, String reason) {
            super(status, reason);
        }
    
        public BusinessException(HttpStatus status, String reason, Throwable cause) {
            super(status, reason, cause);
        }
    }
    

     

    Global Error Attributes

    DefaultErrorAttributes 는 스프링이 자동으로 만들어내는 에러를 담고 있다.

    • 해당 클래스를 통해 상속받아 확장
    @Component
    public class GlobalErrorAttributes extends DefaultErrorAttributes {
    
      @Override
        public Map<String, Object> getErrorAttributes(ServerRequest request, 
          ErrorAttributeOptions options) {
            Map<String, Object> map = super.getErrorAttributes(
              request, options);
            map.put("status", HttpStatus.BAD_REQUEST);
            map.put("message", "username is required");
            return map;
        }
    }
    

    ⚠️ @RequestParam 등으로 처리되는 필수체크 등은 스프링 에러에 담기지 않는다.

    → 해당 클래스를 통해 throwable 의 내용을 남아 사용자에게 메시지를 제대로 전달한다.

    → 사용자 정의 에러인 경우 BusinessException 을 통해 따로 처리

     

    GlobalExceptionHandler

    스프링에서 처리되는 에러처리를 중간에 가로채 따로 처리한 내용으로 리턴

    @Component
    @Order(-2)
    public class GlobalErrorWebExceptionHandler extends 
        AbstractErrorWebExceptionHandler {
    
        // constructors
    
        @Override
        protected RouterFunction<ServerResponse> getRoutingFunction(
          ErrorAttributes errorAttributes) {
    
            return RouterFunctions.route(
              RequestPredicates.all(), this::renderErrorResponse);
        }
    
        private Mono<ServerResponse> renderErrorResponse(
           ServerRequest request) {
    
           Map<String, Object> errorPropertiesMap = getErrorAttributes(request, 
             ErrorAttributeOptions.defaults());
    
           return ServerResponse.status(HttpStatus.BAD_REQUEST)
             .contentType(MediaType.APPLICATION_JSON)
             .body(BodyInserters.fromValue(errorPropertiesMap));
        }
    }
    

     

    📖 @Order(-2) 를 하는 이유

    • DefaultErrorWebExceptionHandler, 스프링의 기본에러 처리의 Order는 -1 이다.
    • 따라서, DefaultErrorWebExceptionHandler 보다 높은 우선순위를 부여하기 위함이다.

     

    참고 자료

    https://yang1s.tistory.com/m/10

    https://medium.com/@odysseymoon/spring-webflux에서-error-처리와-retry-전략-a6bd2c024f6f

    https://www.baeldung.com/spring-webflux-errors

    댓글

Designed by Tistory.