비동기 Job API 설계 시 놓치기 쉬운 것들 — BullMQ, Celery는 왜 그렇게 만들었나
배경 — 비동기 Job 폴링 패턴이란#
LLM 호출, 대량 데이터 변환, 외부 API 배치 호출. 이런 작업들은 수십 초에서 몇 분까지 걸립니다. 동기 API로 처리하면 그 시간 동안 커넥션을 물고 있어야 하고, 타임아웃에 걸리거나 중간에 연결이 끊기면 결과를 받을 수 없습니다.
그래서 비동기 Job 폴링 패턴을 씁니다. 흐름 자체는 단순합니다.
- 클라이언트가 작업을 요청하면, 서버는
jobId만 즉시 돌려준다 - 서버는 백그라운드에서 작업을 처리하면서 진행 상태를 갱신한다
- 클라이언트는 주기적으로 "다 됐어?" 하고 폴링한다
- 완료되면 결과를 가져와서 적용한다
여기까지는 어렵지 않습니다. 그런데 이걸 그대로 프로덕션에 올리면 금방 구멍이 보입니다.
직접 만들어보기 — Redis Hash 기반 구현#
상태 저장소는 Redis를 썼습니다.
- TTL: Job이 끝나면 알아서 사라집니다. DB에 넣으면 별도 배치로 청소해야 하는데, 그게 또 하나의 관리 포인트가 됩니다
- HINCRBY: 진행률을 원자적으로 올릴 수 있습니다. 여러 스레드가 동시에 processed를 증가시켜도 안전합니다
- 멀티 서버: 서버가 여러 대여도 같은 Redis를 바라보면 어디서든 상태를 조회할 수 있습니다
상태 모델은 세 가지면 충분했습니다.
RUNNING → SUCCESS
→ FAILED
Redis Hash 필드 구조는 이렇게 잡았습니다.
| 필드 | 타입 | 설명 |
|---|---|---|
| status | String | RUNNING / SUCCESS / FAILED |
| total | int | 전체 처리 대상 수 |
| processed | int | 현재까지 처리된 수 |
| errorMessage | String | 실패 시 에러 메시지 |
| expireAt | long | 타임아웃 판단용 만료 시각 (epoch ms) |
| results | String | 성공 시 결과 JSON |
핵심 구현 코드입니다. 도메인 로직은 빼고 Job 상태 관리만 남겼습니다.
@Component
public class JobRedisStore {
private static final long JOB_TTL_HOURS = 2;
private static final long JOB_TIMEOUT_MINUTES = 10;
private final StringRedisTemplate redisTemplate;
public String createJob(int total) {
String jobId = UUID.randomUUID().toString();
String key = "bulk:job:" + jobId;
long expireAt = System.currentTimeMillis() + JOB_TIMEOUT_MINUTES * 60 * 1000;
redisTemplate.opsForHash().putAll(key, Map.of(
"status", "RUNNING",
"total", String.valueOf(total),
"processed", "0",
"expireAt", String.valueOf(expireAt)
));
redisTemplate.expire(key, JOB_TTL_HOURS, TimeUnit.HOURS);
return jobId;
}
public void incrementProcessed(String jobId) {
redisTemplate.opsForHash().increment("bulk:job:" + jobId, "processed", 1);
}
public void markSuccess(String jobId, String resultsJson) {
String key = "bulk:job:" + jobId;
redisTemplate.opsForHash().put(key, "results", resultsJson);
redisTemplate.opsForHash().put(key, "status", "SUCCESS");
}
public void markFailed(String jobId, String errorMessage) {
String key = "bulk:job:" + jobId;
redisTemplate.opsForHash().put(key, "status", "FAILED");
redisTemplate.opsForHash().put(key, "errorMessage", errorMessage);
}
}비동기 처리는 @Async로 띄웁니다.
@Async
public void processAsync(String jobId, List<Item> items) {
List<Result> results = new ArrayList<>();
for (Item item : items) {
Result result = externalApi.call(item); // LLM 호출 등
jobRedisStore.incrementProcessed(jobId);
results.add(result);
}
jobRedisStore.markSuccess(jobId, serialize(results));
}여기까지가 최소 구현입니다. 동작은 하지만, 프로덕션에서는 몇 가지를 더 챙겨야 합니다.
놓치기 쉬운 것 ① — 중복 실행#
사용자가 "일괄 처리" 버튼을 눌렀습니다. 로딩 스피너가 돕니다. 1초... 2초... 반응이 없습니다. 다시 누릅니다.
14:00:00 클릭 → Job A 생성, 100건에 대해 LLM 호출 시작
14:00:02 또 클릭 → Job B 생성, 같은 100건에 대해 LLM 호출 또 시작
같은 대상에 대해 LLM API가 200번 호출됩니다. 비용 2배. 스레드풀 점유도 2배. 프론트에서 버튼을 disable하면 1차 방어는 되지만, 새로고침 후 재시도하거나 다른 관리자가 같은 대상을 실행하면 서버에서는 막을 방법이 없습니다.
이게 이른바 "따닥" 문제입니다.
해결은 의외로 간단합니다. Redis에 "이 대상에 대해 진행 중인 Job이 있는지"를 기록해두면 됩니다.
public String startJob(String targetId, int total) {
// 이미 돌고 있는 Job 있으면 그 jobId 그대로 반환
String existingJobId = redisTemplate.opsForValue().get("bulk:running:" + targetId);
if (existingJobId != null && isRunning(existingJobId)) {
return existingJobId;
}
String jobId = createJob(total);
redisTemplate.opsForValue().set("bulk:running:" + targetId, jobId, 2, TimeUnit.HOURS);
return jobId;
}실서비스에서는 Job이 SUCCESS나 FAILED로 끝날 때 bulk:running:{targetId}도 같이 지워주는 편이 깔끔합니다. 더 엄밀하게 하려면 SETNX(SET if Not eXists)로 원자적으로 lock을 잡아야 합니다. 위 코드는 "진행 중 Job 재사용"의 아이디어를 설명하기 위한 최소 예시로 보면 됩니다.
놓치기 쉬운 것 ② — 완료 전 결과 접근#
3단계 플로우의 마지막은 "결과 적용"입니다. 클라이언트가 POST /jobs/{jobId}/apply를 호출하면 Redis에서 결과를 꺼내 DB에 저장합니다.
근데 이 API에 상태 체크가 없으면 어떻게 될까요.
// 위험한 코드
public void applyResults(String jobId) {
List<Result> results = deserialize(jobRedisStore.findResultsJson(jobId));
saveAll(results);
}Job이 아직 RUNNING이면? findResultsJson은 null을 반환하고, deserialize는 빈 리스트를 돌려주고, saveAll은 아무것도 안 하고 리턴합니다. 예외도 없습니다. HTTP 200 OK.
클라이언트는 "저장 완료"로 인식합니다. 실제로는 아무것도 저장되지 않았는데. FAILED 상태에서도 마찬가지입니다.
"프론트에서 SUCCESS일 때만 apply 버튼을 활성화하면 되지 않나?" 정상 흐름에서는 맞습니다. 그래도 서버에서 한 번 더 막아야 합니다. 네트워크 타이밍 이슈, 프론트 버그, 브라우저 콘솔에서 직접 API를 호출하는 경우까지 생각하면 서버 검증을 빼둘 이유가 없습니다.
public void applyResults(String jobId) {
JobStatus status = jobRedisStore.findStatus(jobId);
if (!status.isSuccess()) {
throw new IllegalStateException("완료되지 않은 Job입니다. status=" + status.getStatus());
}
List<Result> results = deserialize(jobRedisStore.findResultsJson(jobId));
saveAll(results);
}HTTP API라면 예외를 그대로 노출하기보다 409 Conflict나 422 Unprocessable Entity로 내려주는 편이 더 명확합니다. 핵심은 "결과 접근 전에 상태를 검증한다"는 규칙 자체입니다.
놓치기 쉬운 것 ③ — 좀비 Job#
@Async로 비동기 스레드에서 LLM을 호출하고 있는데, 그 순간 서버가 재시작됩니다. 배포, OOM kill, 인스턴스 교체. 이유는 다양합니다.
Redis에는 status: RUNNING이 남아있습니다. 근데 그걸 처리하던 스레드는 이미 사라졌습니다. 클라이언트는 계속 폴링하지만 영영 SUCCESS가 오지 않습니다. 프론트에서 "처리 중..." 스피너가 끝없이 돌아갑니다.
이게 좀비 Job입니다. 해결 방법은 크게 두 가지입니다.
expireAt 방식 — Job 생성 시 만료 시각을 기록해두고, 상태 조회 시 현재 시각이 만료 시각을 넘었으면 FAILED로 전환합니다. 구현이 단순하고 추가 인프라가 필요 없습니다.
public JobStatus findStatus(String jobId) {
Map<Object, Object> fields = redisTemplate.opsForHash().entries("bulk:job:" + jobId);
String status = (String) fields.get("status");
if ("RUNNING".equals(status)) {
long expireAt = Long.parseLong((String) fields.get("expireAt"));
if (System.currentTimeMillis() > expireAt) {
markFailed(jobId, "처리 시간이 초과되었습니다.");
return JobStatus.failed(jobId, "처리 시간이 초과되었습니다.");
}
}
return JobStatus.of(jobId, status, ...);
}heartbeat 방식 — Worker가 주기적으로 "나 아직 살아있다"는 신호를 Redis에 보냅니다. heartbeat가 일정 시간 동안 안 오면 다른 Worker가 해당 Job을 가져가거나 FAILED 처리합니다. 더 정밀하지만 구현이 복잡해집니다.
단순한 비동기 Job이라면 expireAt으로 충분합니다. Worker 간 Job 인수인계까지 필요한 수준이라면 그때 heartbeat를 붙이면 됩니다.
놓치기 쉬운 것 ④ — 부분 실패 정책#
100건을 일괄 처리합니다. 99건이 성공했는데, 100번째에서 LLM API 에러가 터졌습니다.
All-or-Nothing — 1건이라도 실패하면 전체를 FAILED 처리합니다.
for (Item item : items) {
Result result = externalApi.call(item);
if (result.hasError()) {
jobRedisStore.markFailed(jobId, "처리 중 오류가 발생했습니다.");
return; // 성공한 99건도 버림
}
results.add(result);
}
jobRedisStore.markSuccess(jobId, serialize(results));구현이 단순합니다. "전부 적용됐거나, 아무것도 적용 안 됐거나" 둘 중 하나. 정합성이 명확합니다. 근데 사용자 입장에서는 답답합니다. 99건이나 성공했는데 처음부터 다시.
Partial Success — 실패한 건은 건너뛰고, 성공한 건만 결과에 포함합니다.
UX는 좋지만 "어디까지 적용됐는지" 추적이 복잡해집니다. 상태도 SUCCESS와 FAILED만으로는 부족하고 PARTIAL_SUCCESS 같은 중간 상태가 필요해지고, 결과에 개별 건의 성공/실패 여부도 담아야 합니다.
어느 쪽이든 설계 초기에 정해둬야 합니다. 나중에 바꾸려면 상태 모델부터 뜯어야 하는데, 이미 프론트에서 status 분기를 태우고 있으면 같이 바꿔야 해서 꽤 귀찮아집니다.
오픈소스는 어떻게 풀었나#
위 네 가지 문제는 비동기 Job을 다루는 시스템이라면 거의 반드시 마주칩니다. 직접 구현에서 한 번씩 빠뜨리기 쉬운 지점을, 유명한 오픈소스들은 어떻게 기본값으로 묶어두는지 보는 게 더 도움이 됐습니다.
BullMQ (Node.js, Redis 기반)#
BullMQ는 Redis의 List, Sorted Set, Stream을 활용하는 Job Queue입니다. 지금까지 설명한 Redis Hash 기반 구현과 구조가 가장 비슷합니다.
- 중복 실행 방지:
queue.add('task', data, { jobId: 'unique-key' })— 동일한 jobId를 지정하면 이미 존재하는 경우 무시합니다. 내부적으로 RedisSETNX를 씁니다 - 상태 검증:
job.getState()로 현재 상태를 확인한 뒤 결과에 접근합니다 - 좀비 Job: Worker가 주기적으로 heartbeat(lock 연장)를 보내고, 일정 시간 heartbeat가 안 오면 stalled job으로 판정해서 다른 Worker에 재할당합니다
- 부분 실패: Flow 기능으로 부모-자식 Job 관계를 만들 수 있습니다. 자식 Job이 개별적으로 성공/실패하고, 부모에서 집계합니다
Celery (Python, Redis/RabbitMQ)#
Celery는 Python 생태계의 대표적인 분산 태스크 큐입니다.
- 중복 실행 방지:
task.apply_async(task_id='custom-id')로 커스텀 ID를 지정하고, 공식 문서에서 Redis lock 패턴을 권장합니다 - 상태 검증:
AsyncResult(task_id).state가SUCCESS일 때만.result에 접근 가능합니다. 다른 상태에서 접근하면 예외가 발생합니다 - 좀비 Job:
@app.task(time_limit=300, soft_time_limit=240)으로 hard/soft 타임아웃을 설정합니다. soft 타임아웃 시SoftTimeLimitExceeded예외가 발생해서 정리 로직을 실행할 수 있습니다 - 부분 실패: Canvas의
group으로 병렬 실행하고,chord로 결과를 집계합니다. 개별 태스크가 실패해도 나머지는 계속 실행됩니다
JobRunr (Java/Spring)#
JobRunr는 Java용 백그라운드 Job 라이브러리입니다. Lambda 기반 API가 직관적입니다.
- 중복 실행 방지: Pro 버전에서
@Job(mutex="unique-key")어노테이션을 지원합니다 - 상태 검증:
jobScheduler.getJobById(jobId)로 상태를 조회합니다 - 좀비 Job: 서버가 죽으면 다른 BackgroundJobServer가 해당 Job을 감지해서 다시 큐에 넣습니다 (orphaned job detection)
- 부분 실패: 기본 제공은 없고 직접 구현해야 합니다
비교 테이블#
| 문제 | BullMQ | Celery | JobRunr |
|---|---|---|---|
| 중복 실행 | jobId 지정 (SETNX) | 커스텀 ID + Redis lock | Pro: mutex |
| 상태 검증 | job.getState() | AsyncResult.state | getJobById() |
| 좀비 Job | heartbeat + stalled 재할당 | hard/soft time_limit | orphaned job detection |
| 부분 실패 | Flow (부모-자식) | Canvas (group/chord) | 직접 구현 |
공통 패턴이 보인다#
세 개를 나란히 놓고 보니, 이름만 다를 뿐 막상 챙기는 건 꽤 비슷했습니다.
중복 실행 방지는 셋 다 "고유한 키로 이미 있는 Job인지 확인"합니다. BullMQ는 jobId, Celery는 task_id. 이름만 다르지 내부는 Redis SETNX이든 DB 유니크 제약이든 같은 겁니다.
상태 검증은 아예 프레임워크가 강제합니다. Celery는 SUCCESS가 아닌 상태에서 .result에 접근하면 예외를 던져버립니다. BullMQ도 job.returnvalue는 completed에서만 유효합니다. 직접 만들면 이 검증을 빼먹기 쉬운데, 셋 다 기본으로 넣어뒀다는 건 그만큼 자주 빠뜨린다는 뜻이겠죠.
타임아웃은 접근이 갈립니다. BullMQ는 Worker가 heartbeat를 계속 보내고, 안 오면 stalled로 판정합니다. Celery는 time_limit으로 잘라버립니다. 직접 만든다면 expireAt 방식이 가장 가볍고, Worker 간 Job 인수인계까지 필요해지면 그때 heartbeat를 붙이면 됩니다.
결국 큐 라이브러리들이 주는 가치는 "백그라운드에서 뭔가 돌릴 수 있다"가 아니라, 이런 운영 가드레일을 빠뜨리지 않게 기본값으로 제공한다는 데 있습니다.
언제는 직접 만들고, 언제는 Queue를 붙일까#
@Async + Redis Hash 정도로 직접 구현해도 충분한 구간이 있습니다. Job 수가 많지 않고, 실행 시간이 길어도 몇 분 이내이며, 실패 시 사람이 다시 눌러도 되고, 필요한 운영 규칙이 중복 방지·상태 검증·타임아웃 정도에서 끝나는 경우입니다. 백오피스 일괄 처리나 내부 운영 도구는 여기서 멈춰도 실용적입니다.
반대로 재시도와 backoff, 동시성 제한, delayed job, cron, 서버 재시작 후 자동 복구, Worker 분리, 운영 화면 같은 요구가 붙기 시작하면 직접 구현 비용이 급격히 올라갑니다. 이 시점부터는 BullMQ, Celery, JobRunr 같은 큐를 도입하는 편이 대체로 더 싸고 덜 위험합니다.
처음 시작은 Redis Hash에 상태 넣고 @Async로 돌리는 정도의 작업처럼 보입니다. 그런데 따닥 방지, 상태 검증, 좀비 처리, 부분 실패 정책을 안 챙겨놓으면 프로덕션에서 "저장 눌렀는데 아무것도 안 됐어요" 같은 리포트를 받게 됩니다.
이번에 직접 붙여보면서 최소한 같이 들어가야 한다고 느낀 건 네 가지였습니다:
- 같은 대상에 대한 중복 실행 방지 — Redis key로 진행 중인 Job 추적
- 결과 적용 전 반드시 status 검증 — 프론트에 위임하지 말 것
- 좀비 Job 감지 — expireAt 기반 타임아웃으로 시작, 필요하면 heartbeat로
- 부분 실패 정책은 설계 초기에 결정 — 나중에 바꾸려면 상태 모델부터 뜯어야 합니다
BullMQ, Celery, JobRunr를 까보면 이 네 가지를 전부 다른 형태로 기본 제공하고 있습니다. 직접 만들든 라이브러리를 쓰든, 결국 체크리스트는 비슷합니다. 이 목록을 빼먹지 않는 쪽이 덜 고생합니다.