Develop/[Kafka]
[Kafka] dead letter queue
HiSmith
2023. 8. 23. 21:17
반응형
dead letter queue는 이름에서 유추 할 수 있듯이
실패한 레코드를 보관하는 별도의 큐이다.
이름에서 알 수 있듯이. 실패한 레코드를 보관하는 별도의 큐이다.
카프카는 2.0 부터 자체 plug in으로 실패한 레코드의 메타정보도 포함 시켜 저장하는 기능을 제공한다.
설정 방법은 간단하다.
Connect 설정에 아래와 같이 추가해주면 된다.
connect_dlq에서 아래와 같이 설정한다.
errors.tolerance = all
errors.deadletterqueue.topic.name = {토픽}
// 유효하지 않다고 판단된 레코드의 메타데이터 header에 저장
errors.deadletterqueue.context.headers.enable = true
// 로그 파일에 유효하지 않는 레코드 기록
errors.log.enable = true
errors.log.include.messages = true
메타 정보에는 토픽,오프셋,Exception이 있다.
convert,transform메서드를 통해서 해당 에러 내용을 변환 후 put메서드를 통해 dlq로직에 쌓는다.
위의 내용은 번거롭기 때문에 자체적으로
ErrantRecordReporter 인터페이스를 통해 발생한 레코드를 내보낸다.
따라서 해당 ErrantRecordReporter는 비동기적으로 dead letter queue로 레코드를 저장하게 된다.
private ErrantRecordReporter reporter;
@Override
public void start(Map<String, String> props) {
...
try {
reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
} catch (NoSuchMethodException | NoClassDefFoundError e) {
// Will occur in Connect runtimes earlier than 2.6
reporter = null;
}
}
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord record: records) {
try {
// attempt to send record to data sink
process(record);
} catch(Exception e) {
if (reporter != null) {
// Send errant record to error reporter
Future<Void> future = reporter.report(record, e);
// Optionally wait till the failure's been recorded in Kafka
future.get();
} else {
// There's no error reporter, so fail
throw new ConnectException("Failed on record", e);
}
}
}
}
위와 같이, ErrantRecordReporter객체를 가져올 수 있다.
해당 객체를 가지고 있다가 레코드 처리를 하는 로직이 포함된 put() 메서드로 전달하면 된다.
해당 비동기로 처리해야되기 떄문에 Future로 받게 된다.
따라서 ErrantRecordReporter를 사용하면 에러처리를 패턴화로 균일하게 적용할 수 있고
DLQ의 설정도 그대로 사용할 수 있다는 장점이 많다.
반응형