입사 1년차 백엔드 개발자의 Apache Kafka 컨트리뷰션 분석기
카프카를 "쓰기만 하던" 개발자#
입사한 곳에서 카프카를 쓰고 있었습니다. Producer로 이벤트 보내고, @KafkaListener로 받아서 처리하는 정도.
@KafkaListener(topics = "some-topic", groupId = "some-group")
public void consume(String message) {
// 처리 로직
}이 정도면 업무는 됩니다. 토픽에 메시지 보내고, 리스너 달아서 받고, 잘 돌아가면 끝. 카프카 내부가 어떻게 돌아가는지는 몰라도 당장 문제될 건 없었습니다.
근데 코드를 보다 보면 이상한 게 눈에 들어옵니다. Consumer 설정에 MAX_POLL_RECORDS가 전체 Consumer에 1로 고정돼 있었습니다.
MAX_POLL_RECORDS는 Consumer가 poll() 한 번에 브로커에서 가져오는 최대 레코드 수입니다. 카프카 기본값은 500이고, 네트워크 왕복 한 번에 수백 건을 배치로 가져오는 게 카프카가 높은 처리량을 내는 핵심 원리입니다. 근데 이게 1로 돼 있으니 poll() 할 때마다 1건만 가져오고 있었습니다. 카프카의 배치 처리를 완전히 버리고 SQS처럼 한 건씩 꺼내 쓰고 있었던 겁니다. 왜 이렇게 설정했는지 아는 사람도 없었습니다.
카프카를 쓰고 있긴 한데, 카프카를 모르고 있었습니다.
소스코드를 읽게 된 계기#
신규 프로젝트에서 카프카를 직접 다뤄야 하는 상황이 생겼습니다. 기존에는 누군가 세팅해둔 카프카에 리스너만 붙이면 됐는데, 이번엔 토픽 설계부터 파이프라인 구성까지 직접 해야 했습니다.
직접 만지다 보니 궁금한 게 생겼습니다. 파티션은 어떤 기준으로 메시지를 분배하는지, Consumer Group 리밸런싱은 어떻게 되는지, 브로커가 죽으면 데이터는 어떻게 되는지. 공식 문서를 읽어봤는데 "왜 이렇게 동작하는가"까지는 안 나와 있었습니다.
그래서 소스코드를 열었습니다.
apache/kafka 소스코드 읽기#
apache/kafka 레포를 클론했습니다. 처음엔 막막했습니다. 파일이 수천 개인데 어디서부터 봐야 할지 감이 안 잡혔습니다.
진입점은 회사에서 쓰는 기능이었습니다. KafkaConsumer의 poll()이 내부적으로 뭘 하는지, ConsumerGroup 리밸런싱은 어떤 코드가 담당하는지 — 업무에서 이미 접한 개념이라 추적하기가 수월했습니다.
읽다 보니 공식 문서에서 한 줄로 넘어간 부분이 실제로는 수백 줄짜리 로직인 경우가 많았습니다. 그러면서 "이건 왜 이렇게 짰지?", "여기 동시성 문제 있는 거 아닌가?" 같은 의문이 생기기 시작했습니다.
그 의문 중 하나가 진짜 버그였습니다.
KAFKA-15154 — Sensor.checkQuotas() 동시성 버그#
카프카 메트릭 시스템의 Sensor 클래스를 읽고 있었습니다. 처리량이나 지연 시간 같은 메트릭을 기록하고, quota 초과를 체크하는 컴포넌트입니다.
코드를 보다가 이상한 걸 발견했습니다.
public class Sensor {
// metrics 필드는 LinkedHashMap
private final Map<CompoundStat, List<Stat>> metrics = new LinkedHashMap<>();
// record()는 synchronized로 보호
public synchronized void record(double value, long timeMs) {
for (Stat stat : this.metrics.values()) {
stat.record(value, timeMs);
}
}
// checkQuotas()는... synchronized가 없다?
public void checkQuotas(long timeMs) {
for (KafkaMetric metric : this.metrics.values()) {
// LinkedHashMap을 lock 없이 순회
MetricConfig config = metric.config();
...
}
}
}record()는 synchronized로 보호하는데, checkQuotas()는 같은 metrics 맵을 동기화 없이 순회하고 있었습니다. LinkedHashMap은 thread-safe하지 않아서, record()가 맵을 수정하는 동안 checkQuotas()가 순회하면 ConcurrentModificationException이 터질 수 있습니다.
찾아보니 카프카 커미터 Divij Vaidya가 직접 리포트한 이슈(KAFKA-15154)였습니다. 소스코드는 Sensor.java에 있습니다. 이전에 PR이 두 개 올라왔는데 둘 다 머지 안 됐습니다.
재현#
재현 테스트를 짰습니다.
// 스레드 1: record() 반복 호출
// 스레드 2: checkQuotas() 반복 호출
// → 100회 중 100회 ConcurrentModificationException100/100 재현.
해결 방법 분석#
수정 방법은 세 가지였습니다.
| 방법 | 장단점 |
|---|---|
checkQuotas()에 synchronized 추가 | 간단하지만 record()와 lock 경합. record()는 메시지마다 호출(초당 수만 번)되니까 성능 저하 |
StampedLock 도입 | 유연하지만 변경 범위가 큼. 이전 PR에서 시도했는데 리뷰 0개로 방치 |
| Defensive copy | 복사 순간만 lock, 순회는 lock-free. 같은 클래스에서 이미 쓰고 있는 패턴 |
Defensive copy가 맞다고 봤습니다. metrics의 스냅샷을 synchronized 안에서 복사하고, 복사본으로 순회하는 방식입니다. 복사하는 짧은 순간만 lock을 잡으니 record()와의 경합이 최소화됩니다. 같은 클래스의 다른 메서드에서 이미 쓰고 있는 패턴이라 일관성도 맞았습니다.
public void checkQuotas(long timeMs) {
List<KafkaMetric> snapshot;
synchronized (this) {
snapshot = new ArrayList<>(this.metrics.values());
}
// snapshot으로 순회 — lock 없이 안전
for (KafkaMetric metric : snapshot) {
...
}
}이전 PR이 실패한 이유#
이전 PR 두 개를 뜯어봤습니다.
- PR #13969:
synchronized방식. 리뷰어가 "재현 테스트 추가해달라" 했는데 반영 안 해서 stale - PR #14837:
StampedLock방식. 변경 범위가 커서인지 리뷰 코멘트 0개. 그대로 묻힘
재현 테스트가 있어야 하고, 변경 범위는 작을수록 좋습니다.
이 이슈는 분석을 마친 시점에 이미 다른 개발자가 assign돼 있어서 PR은 안 올렸습니다.
KAFKA-20398 — Kafka Streams 메모리 누수#
두 번째로 파고든 건 Kafka Streams 쪽이었습니다.
Kafka Streams란#
카프카 토픽의 데이터를 실시간으로 변환/처리하는 Java 라이브러리입니다. 일반 Consumer로도 같은 일을 할 수 있지만, 상태 관리, 장애 복구, 파티션 리밸런싱, exactly-once 처리 같은 걸 프레임워크가 대신 해줍니다.
// 입력 토픽에서 읽어서 → 필터하고 → 변환해서 → 출력 토픽에 쓴다
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.filter((key, value) -> value != null)
.mapValues(value -> value.toUpperCase())
.to("output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();별도 클러스터 없이 그냥 Java 라이브러리라서, KafkaStreams 인스턴스 만들고 start() 하면 끝입니다. 내부적으로는 이런 구조로 돌아갑니다:
StreamThread가 실제 처리를 담당하는 스레드이고, 파티션 하나당 Task 하나가 매핑됩니다. 이 StreamThread의 라이프사이클에서 버그가 터졌습니다.
배경: REPLACE_THREAD#
Kafka Streams에는 스트림 처리 중 예외가 터졌을 때 대응 전략을 설정할 수 있습니다. REPLACE_THREAD는 죽은 스레드를 새 스레드로 교체해서 처리를 이어가는 전략입니다.
streams.setUncaughtExceptionHandler(exception -> {
log.warn("예외 발생, 스레드 교체: {}", exception.getMessage());
return StreamThreadExceptionResponse.REPLACE_THREAD;
});KAFKA-20398은 이 REPLACE_THREAD가 반복되면 메모리가 계속 올라가서 OutOfMemoryError가 난다는 리포트였습니다.
재현 환경 구축#
일단 직접 봐야 했습니다. 재현용 프로젝트를 만들었습니다.
// 메시지 처리할 때마다 강제로 예외를 던져서 REPLACE_THREAD 반복 유도
static class NoopProcessor implements Processor<String, String, Void, Void> {
@Override
public void process(Record<String, String> record) {
log.info("key={} value={}", record.key(), record.value());
throw new RuntimeException("forced failure");
}
}KRaft 모드로 로컬 브로커를 띄우고, 128MB 힙으로 앱을 실행한 뒤 토픽에 메시지를 하나 넣었습니다. 스레드가 죽고 → 교체되고 → 같은 메시지를 다시 처리하고 → 또 죽고 → 또 교체되는 루프가 시작됩니다.
버전별 검증#
여러 카프카 버전에서 같은 조건으로 테스트했습니다.
| 클라이언트 버전 | 브로커 버전 | 결과 |
|---|---|---|
| 3.9.1 | 3.9.1 | 24시간 후에도 정상 |
| 4.0.1 | 4.0.1 | 약 9분 만에 OOM |
| 4.0.2 | 4.0.1 | 약 9분 만에 OOM |
| 4.1.2 | 4.0.1 | 약 3.5시간 후 OOM |
| 4.2.0 | 4.0.1 | 약 3.5시간 후 OOM |
3.9.x까지는 문제없고 4.0부터 터집니다. 4.0에서 뭔가 바뀐 겁니다.
근본 원인 추적#
1단계: heap histogram으로 뭐가 새는지 확인
OOM이 나니까 일단 뭐가 메모리를 먹고 있는지 봐야 했습니다. 앱을 띄워두고 스레드 교체가 18번 정도 일어난 시점에 jmap으로 heap histogram을 떴습니다.
jmap -histo:live $(jps | grep App | awk '{print $1}') \
| grep -E "StreamThread|ClassicKafkaConsumer|DelegatingReporter|Sensor"결과를 보고 바로 감이 왔습니다.
| 클래스 | 인스턴스 수 |
|---|---|
| StreamThread | 19 |
| ClassicKafkaConsumer | 38 |
| TaskManager | 19 |
| DelegatingReporter | 19 |
| Sensor | 2,493 |
| KafkaMetric | 5,818 |
스레드 교체가 18번 일어났는데 StreamThread가 19개. 현재 살아있는 1개 + 죽은 18개가 GC에 안 잡히고 전부 살아있었습니다. Consumer도 38개(스레드당 main + restore 2개), Sensor가 2,493개. 뭔가가 죽은 스레드의 참조를 붙잡고 있어서 GC가 회수를 못 하는 겁니다.
2단계: 뭐가 참조를 붙잡고 있는지 추적
DelegatingReporter가 19개인 게 눈에 띄었습니다. StreamsThreadMetricsDelegatingReporter라는 클래스인데, 스레드마다 하나씩 만들어지는 metrics reporter입니다. 이게 왜 GC에 안 잡히는지 StreamThread.java의 생성 코드를 따라갔습니다.
// StreamThread.create() 내부
StreamsThreadMetricsDelegatingReporter reporter =
new StreamsThreadMetricsDelegatingReporter(...);
metrics.addReporter(reporter); // Metrics 레지스트리에 등록Metrics 레지스트리는 앱 전체에서 공유되는 싱글톤이고, 여기에 addReporter로 등록하면 레지스트리가 reporter의 참조를 들고 있습니다. reporter는 자기 스레드의 Consumer 참조를 갖고 있고, Consumer는 Sensor, KafkaMetric 등을 갖고 있습니다. 체인으로 전부 연결돼 있어서 reporter 하나만 안 지워지면 스레드에 딸린 모든 객체가 GC 대상에서 빠집니다.
레지스트리가 GC root이기 때문에 여기서 참조가 끊기지 않으면 하위 객체가 전부 살아남습니다. 그래서 completeShutdown() — 스레드 종료 시 호출되는 메서드 — 을 확인했습니다. removeReporter 호출이 없었습니다.
3단계: 언제 도입됐는지 확인
이 reporter가 3.9.x에는 없고 4.0에서 생긴 건지 확인했습니다. KAFKA-17248에 의해 PR #17021에서 telemetry pipeline용으로 추가된 코드였고, 4.0.0부터 포함됐습니다. 버전별 테스트 결과와 정확히 일치합니다.
정리하면:
StreamThread.java의 create()에서 등록하고, completeShutdown()에서 제거해야 하는데 제거 코드가 없었습니다.
수정과 검증#
수정은 StreamThread.completeShutdown()에서 removeReporter를 호출하는 겁니다.
// StreamThread.java — completeShutdown() 내부
if (streamsThreadMetricsDelegatingReporter != null) {
metrics.removeReporter(streamsThreadMetricsDelegatingReporter);
}수정한 코드를 publishToMavenLocal로 로컬 빌드하고, 같은 재현 프로젝트에서 같은 조건으로 돌렸습니다. 18번 교체 후 다시 jmap histogram을 떴습니다.
| 클래스 | 수정 전 (18회 교체) | 수정 후 (18회 교체) |
|---|---|---|
| StreamThread | 19 | 2 |
| ClassicKafkaConsumer | 38 | 4 |
| TaskManager | 19 | 2 |
| DelegatingReporter | 19 | 0 |
| Sensor | 2,493 | 283 |
| KafkaMetric | 5,818 | 684 |
수정 후에는 교체 횟수와 관계없이 인스턴스가 2개(현재 active thread + global thread)로 유지됐습니다. DelegatingReporter가 0개인 건, shutdown 시 제거되니까 살아있는 스레드의 reporter만 남고 나머지는 전부 GC된 겁니다.
하루 차이#
PR을 정리하려던 참에 확인해보니, 하루 전에 다른 개발자가 같은 수정으로 PR을 올린 상태였습니다.
PR #21973 — sstremler라는 개발자가 올렸는데, 원인 분석, 수정 위치, 수정 방법, 검증 결과까지 거의 동일했습니다. StreamThread에 reporter를 필드로 저장하고, completeShutdown()에서 removeReporter 호출.
이 PR은 Kafka Streams 커미터 Matthias J. Sax, Bill Bejeck 리뷰 후 다음 날 trunk에 머지됐습니다. 4.3에 cherry-pick되고, 4.2 백포트용 PR #21989도 바로 올라왔습니다.
아쉽긴 합니다. 근데 독립적으로 같은 결론이 나왔으니 분석 자체가 틀린 건 아니었습니다.
배운 것들#
소스코드를 읽자#
공식 문서를 열 번 읽는 것보다 소스코드 한 번 따라가는 게 빠릅니다. "Consumer Group 리밸런싱"을 문서로 읽으면 추상적인데, 코드에서 ConsumerCoordinator가 JoinGroupRequest를 보내고 SyncGroupResponse를 받는 흐름을 직접 보면 바로 와닿습니다.
버그는 대단한 게 아니다#
KAFKA-15154는 synchronized 하나 빠진 거고, KAFKA-20398은 removeReporter 호출 하나 빠진 겁니다. 수정 자체는 단순합니다. 어려운 건 "여기가 문제다"를 찾아가는 과정입니다.
선점당해도 괜찮다#
두 이슈 다 결과적으로 PR을 올리지 못했습니다. 근데 분석하면서 카프카 메트릭 시스템의 동시성 모델을 이해하게 됐고, Kafka Streams 스레드 라이프사이클도 깊이 파게 됐습니다.
apache/kafka도 결국 코드다#
apache/kafka라고 하면 엄청난 실력이 필요할 것 같은데, 해보니까 진입 장벽은 코드를 열어보느냐 마느냐였습니다. 열어보면 생각보다 읽을 만하고, 읽다 보면 이상한 게 보입니다.