DevBoi

[Kafka] dead letter queue 본문

Develop/[Kafka]

[Kafka] dead letter queue

HiSmith 2023. 8. 23. 21:17
반응형

dead letter queue는 이름에서 유추 할 수 있듯이 

실패한 레코드를 보관하는 별도의 큐이다.

 

이름에서 알 수 있듯이. 실패한 레코드를 보관하는 별도의 큐이다.

카프카는 2.0 부터 자체 plug in으로 실패한 레코드의 메타정보도 포함 시켜 저장하는 기능을 제공한다.

설정 방법은 간단하다.

Connect  설정에 아래와 같이 추가해주면 된다.

connect_dlq에서 아래와 같이 설정한다.

errors.tolerance = all
errors.deadletterqueue.topic.name = {토픽}

// 유효하지 않다고 판단된 레코드의 메타데이터 header에 저장
errors.deadletterqueue.context.headers.enable = true

// 로그 파일에 유효하지 않는 레코드 기록
errors.log.enable = true
errors.log.include.messages = true

 

메타 정보에는 토픽,오프셋,Exception이 있다.

convert,transform메서드를 통해서 해당 에러 내용을 변환 후 put메서드를 통해 dlq로직에 쌓는다.

위의 내용은 번거롭기 때문에 자체적으로

ErrantRecordReporter 인터페이스를 통해 발생한 레코드를 내보낸다.

따라서 해당 ErrantRecordReporter는 비동기적으로 dead letter queue로 레코드를 저장하게 된다.

private ErrantRecordReporter reporter;
 
@Override
public void start(Map<String, String> props) {
  ...
  try {
    reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
  } catch (NoSuchMethodException | NoClassDefFoundError e) {
    // Will occur in Connect runtimes earlier than 2.6
    reporter = null;
  }
}
 
@Override
public void put(Collection<SinkRecord> records) {
  for (SinkRecord record: records) {
    try {
      // attempt to send record to data sink
      process(record);
    } catch(Exception e) {
      if (reporter != null) {
        // Send errant record to error reporter
        Future<Void> future = reporter.report(record, e);
        // Optionally wait till the failure's been recorded in Kafka
        future.get();
      } else {
        // There's no error reporter, so fail
        throw new ConnectException("Failed on record", e);
      }
    }
  }
}

 

위와 같이, ErrantRecordReporter객체를 가져올 수 있다.

해당 객체를 가지고 있다가 레코드 처리를 하는 로직이 포함된 put() 메서드로 전달하면 된다.

해당 비동기로 처리해야되기 떄문에 Future로 받게 된다.

 

따라서 ErrantRecordReporter를 사용하면 에러처리를 패턴화로 균일하게 적용할 수 있고

DLQ의 설정도 그대로 사용할 수 있다는 장점이 많다.

 

 

반응형

'Develop > [Kafka]' 카테고리의 다른 글

[Kafka] 파티셔너  (0) 2023.08.31
[Kafka] 프로듀서  (0) 2023.08.31
[Kafka] 토픽을 생성하는 방법들  (0) 2023.08.13
[Kafka] 카프카 기타 쉘들  (0) 2023.08.13
[Kafka] consumer-groups  (0) 2023.08.13