ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • ELK 를 통한 사용자 요청에 대한 로그 수집
    🔍 elastic search/💡 elk 2022. 11. 23. 14:41

    💬 Access Log 이용


    이전에는 Elasticsearch를 이용해 access log를 수집

    @Controller
    public class UserController {
        @GetMapping("/api/v1/users/")
        public User getUserInfo(@PathVariable("id") String id) {
            // 블라블라
        }
    }
    • 여기서 정의된 GET /v1/orders/ 라는 API가 사용되었지만
    • access log에는 다음과 같이 기록된다
    127.0.0.1 -  "GET /api/v1/users/A0000001 HTTP/1.1" 200 50 "-" "-" "127.0.0.1"
    127.0.0.1 - "GET /api/v1/users/A0000002 HTTP/1.1" 200 50 "-" "-" "127.0.0.1"
    127.0.0.1 - "GET /api/v1/users/A0000003 HTTP/1.1" 200 50 "-" "-" "127.0.0.1"
    • 3건의 로그는 서로 다른 url이지만, 모두 같은 API를 호출하고 있다.

    access log를 통해서 사용자가 실제 요청한 url은 알 수 있었지만, 컨트롤러에 정의된 특정 API (url 패턴)이 호출되었는지는 알기 어렵다.

     

    📌 HandlerMapping


    스프링 웹 애플리케이션은 사용자의 요청을 처리하기 위한 Handler를 결정하는 과정에서 HandlerMapping 이라는 녀석을 사용한다.

    HandlerMapping ?

    • 요청과 처리 객체간의 매핑을 정의하는 객체가 구현할 인터페이스

    RequestMappingHandlerMapping

    • 사용자 요청을 특정한 컨트롤러 메소드와 연결해주는 역할을 한다.

     

    RequestMappingHandlerMapping는 가장 적합하다고 판단되는 패턴을 HttpServletRequest 의 .bestMatchingPattern 이라는 이름의 속성으로 추가한다.

    BEST_MATCHING_PATTERN_ATTRIBUTE

    • 핸들러 매핑 내에서 가장 잘 일치하는 패턴을 포함하는 HttpServletRequest 특성의 이름

    이 속성을 이용해 특정 API에 대한 호출을 확인하기 위한 로그를 남기자.

     

    ControllerLog.java

    @ToString
    @Getter
    public class ControllerLog {
        private String httpMethod;
        private String urlPattern;
        private ZonedDateTime requestedAt;
    
        public ControllerLog(String method, String urlPattern, ZonedDateTime requestedAt) {
            this.httpMethod = httpMethod;
            this.urlPattern = urlPattern;
            this.requestedAt = requestedAt;
        }
    }
    

     

    Interceptor

    @Slf4j
    @Component
    public class ControllerLogInterceptor extends HandlerInterceptorAdapter {
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            ControllerLog controllerLog = new ControllerLog(
                request.getMethod(),
                (String) request.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE), // .bestMatchingPattern
                ZonedDateTime.now()
            );
    
            log.info(contollerLog.toString());
    
            return super.preHandle(request, response, handler);
        }
    
    }

    결과

    [INFO ] [14:08:10.585]  - ControllerLog(httpMethod=POST, urlPattern=/api/v1/users, requestedAt=2022-11-23T14:08:10.584095+09:00[Asia/Seoul])
    [INFO ] [14:08:10.585]  - ControllerLog(httpMethod=POST, urlPattern=/api/v1/users, requestedAt=2022-11-23T14:08:10.584095+09:00[Asia/Seoul])
    [INFO ] [14:08:10.585]  - ControllerLog(httpMethod=POST, urlPattern=/api/v1/users, requestedAt=2022-11-23T14:08:10.584095+09:00[Asia/Seoul])

    ⇒ 모두 같은 API 요청이라는 것이 한눈에 들어오게 된다.

     

    🔍 Elasticsearch로 로그를 모으기


    Elasticsearch Service 사용

    • 쉽게 로그를 저장하고 탐색할 수 있는 환경이 갖춰진다.

    스프링 애플리케이션에서 Http Client를 사용해 Elasticsearch Service로 로그를 전송하는 방법

    애플리케이션에서 발생한 로그 데이터들은 메모리상에 모아놓고 있다가 주기적으로 한 번씩 Elasticsearch Service로 전송한다.

    ⇒ 요청 스레드가 아닌 별도의 스레드로 격리하여 처리하도록 해 서비스에 영향이 없어야 한다.

    • 만약 Elasticsearch Service에 지연이 발생하더라도 메모리를 무한정으로 잡아먹지 않도록 해야 한다.
      • 로그 데이터를 버리더라도

    ❓ How

    ❗ Reactor 또는 RxJava 를 사용해서 처리하자

    @Configuration
    public class ControllerLogConfig {
        @Bean
        public EmitterProcessor<ControllerLog> controllerLogEmitterProcessor() {
            return EmitterProcessor.create();
        }
    
        @Bean
        public FluxSink<ControllerLog> controllerLogSink(EmitterProcessor<ControllerLog> controllerLogEmitterProcessor) {
            return emitterProcessor.sink(FluxSink.OverflowStrategy.DROP);
        }
    
        @Bean
        public Flux<ControllerLog> controllerLogFlux(EmitterProcessor<ControllerLog> controllerLogEmitterProcessor) {
            return emitterProcessor.publishOn(Schedulers.elastic());
        }
    }
    

    로그는 FluxSink<ControllerLog> 를 사용하여 발행하면 되고

    Flux<ControllerLog>를 사용하여 적당히 가공하여 사용한다.

     

    사용하는 쪽에서는 Reactor에서 제공하는 엘라스틱 스레드를 사용해 요청 스레드를 블로킹하지 않는다.

     

    Interceptor 수정

    @Component
    public class ControllerLogInterceptor extends HandlerInterceptorAdapter {
        private final FluxSink<ControllerLog> controllerLogSink;
    
        public ControllerLogInterceptor(FluxSink<ControllerLog> controllerLogSink) {
            this.controllerLogSink = controllerLogSink;
        }
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            ControllerLog controllerLog = new ControllerLog(
                request.getMethod(),
                (String) request.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE),
                ZonedDateTime.now()
            );
    
            controllerLogSink.next(controllerLog);
    
            return super.preHandle(request, response, handler);
        }
    }
    • ControllerLogInterceptor 에서 Slf4j 를 이용해 로그를 남기는 것 대신
    • FluxSink<ControllerLog> 를 사용해 로그를 발행한다.

    Slf4jControllerLogProcessor 컴포넌트

    @Slf4j
    @Component
    public class Slf4jControllerLogProcessor {
    
        private final Flux<ControllerLog> controllerLogFlux;
    
        private Disposable disposable;
    
        public Slf4jControllerLogProcessor(Flux<ControllerLog> controllerLogFlux) {
            this.controllerLogFlux = controllerLogFlux;
        }
    
        @PostConstruct
        public void init() {
            disposable = controllerLogFlux.subscribe(it -> log.info(it.toString()));
        }
    
        @PreDestroy
        public void destroy() {
            if (disposable != null) {
                disposable.dispose();
            }
        }
    }
    • 기존 처럼 slf4j 로그를 남기기 위한 컴포넌트 추가

    Elasticsearch Service에 로그 데이터 전송하기 위한 컴포넌트

    @Slf4j
    @Component
    public class EsControllerLogProcessor {    
        private final Flux<ControllerLog> controllerLogFlux;
    
        private final EsLogSender esLogSender;
    
        private Disposable disposable;
    
        public EsControllerLogProcessor(Flux<ControllerLog> EsLogSender esLogSender) {
            this.controllerLogFlux = controllerLogFlux;
            this.esLogSender = esLogSender;
        }
    
        @PostConstruct
        public void init() {
            disposable = logFlux.bufferTimeout(1000, Duration.ofSeconds(30), Schedulers.elastic()) 
                    .filter(it -> !CollectionUtils.isEmpty(it))
                    .subscribe(esLogSender::send);
        }
    
        @PreDestroy
        public void destroy() {
            if (disposable != null) {
                disposable.dispose();
            }
        }
    }
    • 발행된 로그를 Elasticsearch Service로 전송하는 것은 비용이 크다.
    • 따라서, 여러건의 로그를 묶어 전송하는 것이 효율적이다.

    bufferTimeout()

    • 정해진 시간 (위 코드에서는 30초) 또는 1,000 개의 로그가 모일 때마다 로그를 하나의 리스트로 묶어 발행하는 Flux 를 반환
    • 빈 리스트가 반환되는 경우(로그가 없는 경우) filter() 를 통해 걸러준다.

    payload를 생성하고 Elasticsearch Service에 전송

    @Slf4j
    @Component
    public class EsLogSender {
        public static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
    
        private final RestHighLevelClient client;
    
        private final ObjectMapper mapper;
    
        public EsLogSender(RestHighLevelClient client, ObjectMapper mapper) {
            this.client = client;
            this.mapper = mapper;
        }
    
        public void send(List<ControllerLog> list) {
            try {
                String indexName = "controller-log-" + LocalDate.now().format(FORMATTER);
                BulkRequest request = new BulkRequest();
                for (ControllerLog each : list) {
                    request.add(new IndexRequest(indexName, "doc").source(mapper.writeValueAsBytes(each), XContentType.JSON));
                }
    
                client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
                    @Override
                    public void onResponse(BulkResponse response) {
                        log.debug("Send to logs to ES.");
                    }
    
                    @Override
                    public void onFailure(Exception e) {
                        log.warn("Exception in send to logs to ES.", e);
                    }
                });
            } catch (Exception e) {
                log.warn("Exception in send to logs to ES.", e);
            }
        }
    }
    

    logback.xml 설정

    <?xml version="1.0" encoding="UTF-8" ?>
    <configuration> 
      <appender name="STASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>127.0.0.1:4560</destination>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
      </appender>
    
      <root level="info">
          <appender-ref ref="STASH"/>
       </root>
    </configuration>
    

     

    결과

     

    참고 자료

    https://techblog.woowahan.com/2610/

    https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/servlet/HandlerMapping.html

    https://opendistro.github.io/for-elasticsearch-docs/docs/install/plugins/

    https://techblog.woowahan.com/2659/

    https://mangkyu.tistory.com/197

    '🔍 elastic search > 💡 elk' 카테고리의 다른 글

    ELK 구축 및 Filebeat로 로그 전송  (0) 2022.11.22

    댓글

Designed by Tistory.