반응형
Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- Axon framework
- JPA
- Flutter
- DDD
- 스프링부트공부
- 알고리즘공부
- JPA스터디
- nestjs스터디
- nestjs
- 기술면접공부
- K8S
- 자료구조공부
- Kafka
- 기술공부
- 스프링부트
- 카프카
- nestjs공부
- 스프링
- 자바공부
- 스프링공부
- JPA예제
- 코테공부
- querydsl
- 프로그래머스
- JPA공부
- 스프링 공부
- JPA 공부
- 코테준비
- 플러터 공부
- 플러터 개발
Archives
- Today
- Total
DevBoi
[Kafka] dead letter queue 본문
반응형
dead letter queue는 이름에서 유추 할 수 있듯이
실패한 레코드를 보관하는 별도의 큐이다.
이름에서 알 수 있듯이. 실패한 레코드를 보관하는 별도의 큐이다.
카프카는 2.0 부터 자체 plug in으로 실패한 레코드의 메타정보도 포함 시켜 저장하는 기능을 제공한다.
설정 방법은 간단하다.
Connect 설정에 아래와 같이 추가해주면 된다.
connect_dlq에서 아래와 같이 설정한다.
errors.tolerance = all
errors.deadletterqueue.topic.name = {토픽}
// 유효하지 않다고 판단된 레코드의 메타데이터 header에 저장
errors.deadletterqueue.context.headers.enable = true
// 로그 파일에 유효하지 않는 레코드 기록
errors.log.enable = true
errors.log.include.messages = true
메타 정보에는 토픽,오프셋,Exception이 있다.
convert,transform메서드를 통해서 해당 에러 내용을 변환 후 put메서드를 통해 dlq로직에 쌓는다.
위의 내용은 번거롭기 때문에 자체적으로
ErrantRecordReporter 인터페이스를 통해 발생한 레코드를 내보낸다.
따라서 해당 ErrantRecordReporter는 비동기적으로 dead letter queue로 레코드를 저장하게 된다.
private ErrantRecordReporter reporter;
@Override
public void start(Map<String, String> props) {
...
try {
reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
} catch (NoSuchMethodException | NoClassDefFoundError e) {
// Will occur in Connect runtimes earlier than 2.6
reporter = null;
}
}
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord record: records) {
try {
// attempt to send record to data sink
process(record);
} catch(Exception e) {
if (reporter != null) {
// Send errant record to error reporter
Future<Void> future = reporter.report(record, e);
// Optionally wait till the failure's been recorded in Kafka
future.get();
} else {
// There's no error reporter, so fail
throw new ConnectException("Failed on record", e);
}
}
}
}
위와 같이, ErrantRecordReporter객체를 가져올 수 있다.
해당 객체를 가지고 있다가 레코드 처리를 하는 로직이 포함된 put() 메서드로 전달하면 된다.
해당 비동기로 처리해야되기 떄문에 Future로 받게 된다.
따라서 ErrantRecordReporter를 사용하면 에러처리를 패턴화로 균일하게 적용할 수 있고
DLQ의 설정도 그대로 사용할 수 있다는 장점이 많다.
반응형
'Develop > [Kafka]' 카테고리의 다른 글
[Kafka] 파티셔너 (0) | 2023.08.31 |
---|---|
[Kafka] 프로듀서 (0) | 2023.08.31 |
[Kafka] 토픽을 생성하는 방법들 (0) | 2023.08.13 |
[Kafka] 카프카 기타 쉘들 (0) | 2023.08.13 |
[Kafka] consumer-groups (0) | 2023.08.13 |