-
ELK 를 통한 사용자 요청에 대한 로그 수집🔍 elastic search/💡 elk 2022. 11. 23. 14:41
💬 Access Log 이용
이전에는 Elasticsearch를 이용해 access log를 수집
@Controller public class UserController { @GetMapping("/api/v1/users/") public User getUserInfo(@PathVariable("id") String id) { // 블라블라 } }
- 여기서 정의된 GET /v1/orders/ 라는 API가 사용되었지만
- access log에는 다음과 같이 기록된다
127.0.0.1 - "GET /api/v1/users/A0000001 HTTP/1.1" 200 50 "-" "-" "127.0.0.1"
127.0.0.1 - "GET /api/v1/users/A0000002 HTTP/1.1" 200 50 "-" "-" "127.0.0.1"
127.0.0.1 - "GET /api/v1/users/A0000003 HTTP/1.1" 200 50 "-" "-" "127.0.0.1"- 3건의 로그는 서로 다른 url이지만, 모두 같은 API를 호출하고 있다.
access log를 통해서 사용자가 실제 요청한 url은 알 수 있었지만, 컨트롤러에 정의된 특정 API (url 패턴)이 호출되었는지는 알기 어렵다.
📌 HandlerMapping
스프링 웹 애플리케이션은 사용자의 요청을 처리하기 위한 Handler를 결정하는 과정에서 HandlerMapping 이라는 녀석을 사용한다.
HandlerMapping ?
- 요청과 처리 객체간의 매핑을 정의하는 객체가 구현할 인터페이스
RequestMappingHandlerMapping
- 사용자 요청을 특정한 컨트롤러 메소드와 연결해주는 역할을 한다.
RequestMappingHandlerMapping는 가장 적합하다고 판단되는 패턴을 HttpServletRequest 의 .bestMatchingPattern 이라는 이름의 속성으로 추가한다.
BEST_MATCHING_PATTERN_ATTRIBUTE
- 핸들러 매핑 내에서 가장 잘 일치하는 패턴을 포함하는 HttpServletRequest 특성의 이름
이 속성을 이용해 특정 API에 대한 호출을 확인하기 위한 로그를 남기자.
ControllerLog.java
@ToString @Getter public class ControllerLog { private String httpMethod; private String urlPattern; private ZonedDateTime requestedAt; public ControllerLog(String method, String urlPattern, ZonedDateTime requestedAt) { this.httpMethod = httpMethod; this.urlPattern = urlPattern; this.requestedAt = requestedAt; } }
Interceptor
@Slf4j @Component public class ControllerLogInterceptor extends HandlerInterceptorAdapter { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { ControllerLog controllerLog = new ControllerLog( request.getMethod(), (String) request.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE), // .bestMatchingPattern ZonedDateTime.now() ); log.info(contollerLog.toString()); return super.preHandle(request, response, handler); } }
결과
[INFO ] [14:08:10.585] - ControllerLog(httpMethod=POST, urlPattern=/api/v1/users, requestedAt=2022-11-23T14:08:10.584095+09:00[Asia/Seoul]) [INFO ] [14:08:10.585] - ControllerLog(httpMethod=POST, urlPattern=/api/v1/users, requestedAt=2022-11-23T14:08:10.584095+09:00[Asia/Seoul]) [INFO ] [14:08:10.585] - ControllerLog(httpMethod=POST, urlPattern=/api/v1/users, requestedAt=2022-11-23T14:08:10.584095+09:00[Asia/Seoul])
⇒ 모두 같은 API 요청이라는 것이 한눈에 들어오게 된다.
🔍 Elasticsearch로 로그를 모으기
Elasticsearch Service 사용
- 쉽게 로그를 저장하고 탐색할 수 있는 환경이 갖춰진다.
스프링 애플리케이션에서 Http Client를 사용해 Elasticsearch Service로 로그를 전송하는 방법
애플리케이션에서 발생한 로그 데이터들은 메모리상에 모아놓고 있다가 주기적으로 한 번씩 Elasticsearch Service로 전송한다.
⇒ 요청 스레드가 아닌 별도의 스레드로 격리하여 처리하도록 해 서비스에 영향이 없어야 한다.
- 만약 Elasticsearch Service에 지연이 발생하더라도 메모리를 무한정으로 잡아먹지 않도록 해야 한다.
- 로그 데이터를 버리더라도
❓ How
❗ Reactor 또는 RxJava 를 사용해서 처리하자
@Configuration public class ControllerLogConfig { @Bean public EmitterProcessor<ControllerLog> controllerLogEmitterProcessor() { return EmitterProcessor.create(); } @Bean public FluxSink<ControllerLog> controllerLogSink(EmitterProcessor<ControllerLog> controllerLogEmitterProcessor) { return emitterProcessor.sink(FluxSink.OverflowStrategy.DROP); } @Bean public Flux<ControllerLog> controllerLogFlux(EmitterProcessor<ControllerLog> controllerLogEmitterProcessor) { return emitterProcessor.publishOn(Schedulers.elastic()); } }
로그는 FluxSink<ControllerLog> 를 사용하여 발행하면 되고
Flux<ControllerLog>를 사용하여 적당히 가공하여 사용한다.
사용하는 쪽에서는 Reactor에서 제공하는 엘라스틱 스레드를 사용해 요청 스레드를 블로킹하지 않는다.
Interceptor 수정
@Component public class ControllerLogInterceptor extends HandlerInterceptorAdapter { private final FluxSink<ControllerLog> controllerLogSink; public ControllerLogInterceptor(FluxSink<ControllerLog> controllerLogSink) { this.controllerLogSink = controllerLogSink; } @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { ControllerLog controllerLog = new ControllerLog( request.getMethod(), (String) request.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE), ZonedDateTime.now() ); controllerLogSink.next(controllerLog); return super.preHandle(request, response, handler); } }
- ControllerLogInterceptor 에서 Slf4j 를 이용해 로그를 남기는 것 대신
- FluxSink<ControllerLog> 를 사용해 로그를 발행한다.
Slf4jControllerLogProcessor 컴포넌트
@Slf4j @Component public class Slf4jControllerLogProcessor { private final Flux<ControllerLog> controllerLogFlux; private Disposable disposable; public Slf4jControllerLogProcessor(Flux<ControllerLog> controllerLogFlux) { this.controllerLogFlux = controllerLogFlux; } @PostConstruct public void init() { disposable = controllerLogFlux.subscribe(it -> log.info(it.toString())); } @PreDestroy public void destroy() { if (disposable != null) { disposable.dispose(); } } }
- 기존 처럼 slf4j 로그를 남기기 위한 컴포넌트 추가
Elasticsearch Service에 로그 데이터 전송하기 위한 컴포넌트
@Slf4j @Component public class EsControllerLogProcessor { private final Flux<ControllerLog> controllerLogFlux; private final EsLogSender esLogSender; private Disposable disposable; public EsControllerLogProcessor(Flux<ControllerLog> EsLogSender esLogSender) { this.controllerLogFlux = controllerLogFlux; this.esLogSender = esLogSender; } @PostConstruct public void init() { disposable = logFlux.bufferTimeout(1000, Duration.ofSeconds(30), Schedulers.elastic()) .filter(it -> !CollectionUtils.isEmpty(it)) .subscribe(esLogSender::send); } @PreDestroy public void destroy() { if (disposable != null) { disposable.dispose(); } } }
- 발행된 로그를 Elasticsearch Service로 전송하는 것은 비용이 크다.
- 따라서, 여러건의 로그를 묶어 전송하는 것이 효율적이다.
bufferTimeout()
- 정해진 시간 (위 코드에서는 30초) 또는 1,000 개의 로그가 모일 때마다 로그를 하나의 리스트로 묶어 발행하는 Flux 를 반환
- 빈 리스트가 반환되는 경우(로그가 없는 경우) filter() 를 통해 걸러준다.
payload를 생성하고 Elasticsearch Service에 전송
@Slf4j @Component public class EsLogSender { public static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); private final RestHighLevelClient client; private final ObjectMapper mapper; public EsLogSender(RestHighLevelClient client, ObjectMapper mapper) { this.client = client; this.mapper = mapper; } public void send(List<ControllerLog> list) { try { String indexName = "controller-log-" + LocalDate.now().format(FORMATTER); BulkRequest request = new BulkRequest(); for (ControllerLog each : list) { request.add(new IndexRequest(indexName, "doc").source(mapper.writeValueAsBytes(each), XContentType.JSON)); } client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse response) { log.debug("Send to logs to ES."); } @Override public void onFailure(Exception e) { log.warn("Exception in send to logs to ES.", e); } }); } catch (Exception e) { log.warn("Exception in send to logs to ES.", e); } } }
logback.xml 설정
<?xml version="1.0" encoding="UTF-8" ?> <configuration> <appender name="STASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender"> <destination>127.0.0.1:4560</destination> <encoder class="net.logstash.logback.encoder.LogstashEncoder" /> </appender> <root level="info"> <appender-ref ref="STASH"/> </root> </configuration>
결과
참고 자료
https://techblog.woowahan.com/2610/
https://opendistro.github.io/for-elasticsearch-docs/docs/install/plugins/
'🔍 elastic search > 💡 elk' 카테고리의 다른 글
ELK 구축 및 Filebeat로 로그 전송 (0) 2022.11.22