LLM 토큰 제한 해결하기 — MapReduce 체인으로 LLM 호출 분할 처리하기
토큰 제한에 걸리면#
처음에는 긴 텍스트도 그냥 한 번에 보내면 될 줄 알았습니다. GPT-4o는 128K, Claude는 200K까지 된다니까 얼핏 가능해 보였기 때문입니다. 근데 실무에서는 그 한도까지 가기 전에 먼저 비용이 튀고, 응답 품질도 흔들리기 시작했습니다. 그래서 실제로는 8K~16K 정도를 적정선으로 잡게 됐습니다.
그래서 긴 텍스트를 처리해야 할 때는 분할이 필요합니다. 예를 들어 수백 건의 사용자 피드백을 한번에 분석하거나, 긴 보고서를 요약하는 경우. 단순히 텍스트를 잘라서 각각 보내면 되지 않을까 싶지만, 잘린 결과를 어떻게 합칠 것인지가 문제입니다. 청크 A의 요약과 청크 B의 요약을 그냥 이어붙이면 중복도 생기고, 전체 맥락이 빠진 부분적인 결과가 됩니다.
이걸 해결하는 패턴이 MapReduce입니다. Hadoop의 그 MapReduce에서 이름을 따왔는데, 아이디어는 같습니다:
- 긴 텍스트를 적절한 크기의 청크로 분할
- 각 청크를 병렬로 LLM에 보냄 (Map)
- 개별 결과를 하나로 통합 (Reduce)
그래서 Spring 프로젝트 안에서 이 패턴을 직접 붙였고, 이 글은 그 과정을 정리해둔 기록에 가깝습니다. 프로덕션에서 운영 중인 코드 기준입니다.
토큰 추정 — 정확할 필요 없다#
청크를 나누려면 텍스트가 대략 몇 토큰인지 알아야 합니다. 정확한 토큰 수를 계산하려면 tiktoken 같은 토크나이저를 써야 하는데, Java에서는 의존성이 무겁고, 어차피 우리가 필요한 건 "이 텍스트가 8,000 토큰을 넘는지 아닌지"를 판단하는 정도입니다. ±500 정도 오차가 나도 문제 없습니다.
그래서 휴리스틱으로 추정합니다.
public class TokenEstimator {
private static final double KOREAN_CHAR_FACTOR = 1.5;
private static final double ENGLISH_WORD_FACTOR = 1.3;
private static final double OTHER_FACTOR = 0.3;
public static int estimate(String text) {
if (text == null || text.isEmpty()) return 0;
int koreanChars = countKoreanChars(text);
int englishWords = countEnglishWords(text);
int otherChars = text.length() - koreanChars;
double tokens = (koreanChars * KOREAN_CHAR_FACTOR)
+ (englishWords * ENGLISH_WORD_FACTOR)
+ (otherChars * OTHER_FACTOR);
return (int) Math.ceil(tokens);
}
private static int countKoreanChars(String text) {
int count = 0;
for (char c : text.toCharArray()) {
if (c >= 0xAC00 && c <= 0xD7A3) count++;
}
return count;
}
private static int countEnglishWords(String text) {
String[] words = text.split("\\s+");
int count = 0;
for (String word : words) {
if (word.chars().anyMatch(Character::isLetter)) count++;
}
return count;
}
}한글은 보통 1-2 토큰으로 인코딩되기 때문에 글자 수에 1.5를 곱합니다. 영문은 단어당 평균 1-1.5 토큰이라 1.3을 곱하고, 공백이나 특수문자는 0.3 정도. 실제로 이 계수로 추정한 값과 tiktoken 결과를 비교하면 대부분 10% 이내의 오차입니다.
이 추정기는 두 군데서 쓰입니다: 청크 분할 시 각 청크가 넘치지 않게 크기를 조절할 때, 그리고 뒤에서 다룰 Adaptive Chain에서 SimpleChain과 MapReduceChain 중 어떤 걸 쓸지 결정할 때.
청크 분할#
텍스트를 적절한 크기로 나누는 알고리즘입니다. 핵심 결정이 하나 있는데, 토큰 단위로 자르지 않고 라인 단위로 자릅니다.
토큰 단위로 자르면 문장이나 단어 중간에서 잘릴 수 있습니다. "사용자 피드백을 분석한 결과"라는 문장이 "사용자 피드백을 분석한" / "결과"로 잘리면 의미가 훼손됩니다. 라인 단위로 자르면 최소한 문장 단위로는 온전하게 유지됩니다.
private static final int CHUNK_SIZE_TOKENS = 8_000;
private static List<String> splitIntoChunks(String content) {
int totalTokens = TokenEstimator.estimate(content);
int chunkCount = (int) Math.ceil((double) totalTokens / CHUNK_SIZE_TOKENS);
if (chunkCount <= 1) {
return List.of(content);
}
String[] lines = content.split("\n");
List<String> chunks = new ArrayList<>();
StringBuilder currentChunk = new StringBuilder();
int currentTokens = 0;
for (String line : lines) {
int lineTokens = TokenEstimator.estimate(line);
if (currentTokens + lineTokens > CHUNK_SIZE_TOKENS
&& !currentChunk.isEmpty()) {
chunks.add(currentChunk.toString());
currentChunk = new StringBuilder();
currentTokens = 0;
}
currentChunk.append(line).append("\n");
currentTokens += lineTokens;
}
if (!currentChunk.isEmpty()) {
chunks.add(currentChunk.toString());
}
return chunks;
}CHUNK_SIZE_TOKENS를 8,000으로 잡은 건, 프롬프트 자체가 차지하는 토큰과 응답 토큰을 고려해서입니다. 모델의 컨텍스트 윈도우가 16K라면, 프롬프트 2K + 응답 4K를 빼면 입력에 쓸 수 있는 건 약 10K입니다. 여유를 두고 8K로 잡았습니다.
한 가지 엣지 케이스가 있는데, 단일 라인이 8,000 토큰을 초과하는 경우입니다. 코드에서 !currentChunk.isEmpty() 조건이 이걸 처리합니다. 현재 청크가 비어있으면 라인 크기가 초과하더라도 일단 넣습니다. 완벽하진 않지만, 실무에서 한 줄이 8,000 토큰을 넘는 경우는 거의 없기 때문에 충분합니다.
예를 들어 10만 토큰짜리 텍스트를 넣으면 약 13개 청크로 분할됩니다.
Map 단계 — 병렬 LLM 호출#
청크가 나뉘면 각각을 LLM에 보냅니다. 13개 청크를 순차로 보내면 너무 느리니까, CompletableFuture로 병렬 처리합니다.
@RequiredArgsConstructor
public class MapReduceChainImpl {
private final LLMService llmService;
private final Executor llmTaskExecutor;
public CompletableFuture<List<ChainResult>> mapAsync(
List<MapReduceChunkInput> inputs) {
List<CompletableFuture<ChainResult>> futures = inputs.stream()
.map(input -> CompletableFuture.supplyAsync(
() -> processChunk(input), llmTaskExecutor))
.toList();
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.toList());
}
private ChainResult processChunk(MapReduceChunkInput input) {
long startTime = System.currentTimeMillis();
String response = llmService.chat(
input.promptRequest(),
List.of(new LLMMessage.User(input.content())),
String.class
);
long elapsed = System.currentTimeMillis() - startTime;
int estimatedTokens = TokenEstimator.estimate(response);
return ChainResult.of(response, Map.of(
"executionTimeMs", elapsed,
"estimatedTokens", estimatedTokens,
"chunkIndex", input.chunkIndex(),
"chainType", "mapreduce"
));
}
}CompletableFuture.allOf()가 모든 청크의 처리를 기다리고, join()으로 결과를 수집합니다. 13개 청크를 병렬로 처리하면, 순차 대비 처리 시간이 크게 줄어듭니다. 물론 스레드풀 크기와 LLM API의 rate limit에 따라 실제 병렬도는 달라집니다.
llmTaskExecutor는 별도로 설정한 스레드풀입니다. Spring의 기본 ForkJoinPool을 쓰면 다른 비동기 작업에 영향을 줄 수 있어서, LLM 호출 전용으로 분리합니다.
ChunkVariableProvider#
각 청크를 LLM에 보낼 때, 프롬프트의 변수로 넣어야 합니다. 예를 들어 프롬프트가 "다음 텍스트를 요약하세요: {{data}}"라면, 청크 내용을 data 변수에 넣어야 합니다.
@FunctionalInterface
public interface ChunkVariableProvider {
Map<String, Object> toVariables(String chunkContent, int chunkIndex);
static ChunkVariableProvider of(String variableName) {
return (content, index) -> Map.of(variableName, content);
}
}ChunkVariableProvider.of("data")로 만들면, 청크 내용이 Map.of("data", chunkContent)로 변환됩니다. Langfuse 1편에서 다뤘던 변수 치환(PromptVariableReplacer)이 이 map을 받아서 {{data}}를 실제 값으로 바꿉니다.
chunkIndex도 받을 수 있게 해둔 이유는, 청크 순서가 중요한 경우에 "이것은 N번째 청크입니다"같은 컨텍스트를 프롬프트에 넣을 수 있게 하려는 의도입니다.
Reduce 단계 — 결과 통합#
Map에서 청크별 결과가 나오면, 이걸 하나로 합쳐야 합니다. 두 가지 전략이 있습니다.
단순 결합#
Map 결과를 그냥 이어붙이는 겁니다. 각 청크의 결과가 독립적이고, 추가 통합이 필요 없을 때 씁니다.
public ChainResult reduce(List<ChainResult> mapResults) {
String combined = mapResults.stream()
.map(ChainResult::content)
.collect(Collectors.joining("\n\n"));
int totalTokens = mapResults.stream()
.mapToInt(r -> (int) r.metadata()
.getOrDefault("estimatedTokens", 0))
.sum();
return ChainResult.of(combined, Map.of(
"estimatedTokens", totalTokens,
"chunkCount", mapResults.size(),
"chainType", "mapreduce"
));
}예를 들어 각 청크에서 키워드를 추출하는 경우라면, 결과를 이어붙이기만 해도 됩니다. 청크 1에서 나온 키워드와 청크 2에서 나온 키워드를 합치면 전체 키워드가 되니까요.
LLM 통합 (reduceWithLLM)#
Map 결과를 단순히 이어붙이면 부족한 경우가 있습니다. 요약이 대표적인데, 각 청크의 요약을 이어붙이면 중복이 생기고 전체 흐름이 없어집니다. 이때는 Map 결과를 모아서 LLM을 한 번 더 호출합니다.
public ChainResult reduceWithLLM(List<ChainResult> mapResults,
MapReduceContext context) {
// Map 결과를 하나의 문자열로 결합
String combinedMapResults = mapResults.stream()
.map(ChainResult::content)
.collect(Collectors.joining("\n\n"));
// Reduce 프롬프트의 변수에 Map 결과 주입
Map<String, Object> variables = new HashMap<>(
context.reducePromptRequest().promptVariables());
variables.put(context.reduceVariableName(), combinedMapResults);
LLMPromptRequest reduceRequest = LLMPromptRequest.of(
context.reducePromptRequest().promptName(),
variables,
context.reducePromptRequest().fallbackPrompt()
);
// LLM 한 번 더 호출
String finalResult = llmService.chat(
reduceRequest,
Collections.emptyList(),
String.class
);
return ChainResult.of(finalResult, Map.of(
"chunkCount", mapResults.size(),
"chainType", "mapreduce-with-llm-reduce"
));
}Reduce까지 묶으면 흐름은 이렇습니다:
예를 들어 Reduce 프롬프트가 "다음 요약들을 하나의 종합 요약으로 통합하세요: {{summaries}}"이고, reduceVariableName이 "summaries"라면 — Map에서 나온 3개의 요약이 {{summaries}} 자리에 들어가서 LLM이 하나의 종합 요약을 만듭니다.
언제 어떤 전략을 쓰는가#
| 상황 | Reduce 전략 | 이유 |
|---|---|---|
| 키워드 추출, 항목 나열 | 단순 결합 | 각 청크 결과가 독립적 |
| 요약, 브리핑 생성 | LLM 통합 | 전체 맥락을 아우르는 결과 필요 |
| 감성 분석 | 단순 결합 후 집계 | 각 청크의 감성 점수를 평균/비율로 |
판단 기준은 간단합니다: 각 청크의 결과를 이어붙였을 때 최종 결과로 쓸 수 있으면 단순 결합, 추가 가공이 필요하면 LLM 통합.
Reduce 전략 커스텀#
위에서 본 reduce()와 reduceWithLLM()은 미리 만들어둔 두 가지 전략이지만, Reduce 자체를 추상화하면 호출하는 쪽에서 자유롭게 전략을 바꿀 수 있습니다.
예를 들어 Map 결과를 JSON 배열로 합치고 싶다면:
// 단순 결합 대신 JSON 배열로 통합
List<ChainResult> mapResults = mapReduceChain.map(chunkInputs);
String jsonArray = mapResults.stream()
.map(ChainResult::content)
.collect(Collectors.joining(",", "[", "]"));아니면 Map 결과에서 점수만 뽑아서 평균을 내는 경우:
// Map 결과에서 점수 추출 후 집계
double avgScore = mapResults.stream()
.map(r -> parseScore(r.content()))
.mapToDouble(Double::doubleValue)
.average()
.orElse(0.0);mapAsync()가 CompletableFuture<List<ChainResult>>를 반환하기 때문에, .thenApply()에 원하는 Reduce 로직을 체이닝하면 됩니다. reduceWithLLM은 그 중 하나의 구현일 뿐이고, Map 결과를 어떻게 합칠지는 완전히 호출하는 쪽의 몫입니다.
// 커스텀 Reduce 예시 — Map 결과를 Markdown 테이블로 변환
mapReduceChain.mapAsync(chunkInputs)
.thenApply(results -> {
String table = "| 청크 | 결과 |\n|---|---|\n";
for (int i = 0; i < results.size(); i++) {
table += "| " + (i + 1) + " | " + results.get(i).content() + " |\n";
}
return ChainResult.of(table, Map.of("chainType", "custom-reduce"));
});Adaptive Chain — 자동 전략 선택#
MapReduce 체인을 만들었는데, 모든 텍스트에 MapReduce를 쓸 필요는 없습니다. 1,000 토큰짜리 텍스트를 굳이 청크로 나누고 병렬 처리하면 오버헤드만 생깁니다. 반대로 10만 토큰짜리 텍스트를 단일 호출로 보내면 토큰 제한에 걸리거나 응답 품질이 떨어집니다.
그래서 토큰 수에 따라 자동으로 전략을 고르는 Adaptive Chain을 만들었습니다.
public class AdaptiveChainImpl {
private static final int TOKEN_THRESHOLD = 10_000;
private static final int CHUNK_SIZE_TOKENS = 8_000;
private final SimpleChainImpl simpleChain;
private final MapReduceChainImpl mapReduceChain;
public ChainResult execute(AdaptiveChainInput input) {
int estimatedTokens = TokenEstimator.estimate(input.additionalContent());
if (estimatedTokens < TOKEN_THRESHOLD) {
return simpleChain.execute(toSimpleInput(input));
}
List<String> chunks = splitIntoChunks(input.additionalContent());
List<MapReduceChunkInput> chunkInputs = buildChunkInputs(
chunks, input.mapPromptId(), input.chunkVariableProvider());
List<ChainResult> mapResults = mapReduceChain.map(chunkInputs);
return mapReduceChain.reduceWithLLM(
mapResults,
new MapReduceContext(
input.promptRequest(),
input.reduceVariableName()
)
);
}
}TOKEN_THRESHOLD를 10,000으로 잡은 이유는, 8,000 토큰 청크 하나로 처리할 수 있는 범위를 약간 넘기는 지점이기 때문입니다. 10,000 미만이면 단일 호출로 충분하고, 이상이면 최소 2개 이상의 청크로 나눠야 합니다.
SimpleChain#
토큰이 적으면 LLM을 한 번만 호출합니다. 단순해 보이지만, 실행 시간과 토큰 수 메타데이터를 수집하는 역할이 있습니다.
public class SimpleChainImpl {
public ChainResult execute(SimpleChainInput input) {
long startTime = System.currentTimeMillis();
String response = llmService.chat(
input.promptRequest(),
input.messages(),
String.class
);
long elapsed = System.currentTimeMillis() - startTime;
int estimatedTokens = TokenEstimator.estimate(response);
return ChainResult.of(response, Map.of(
"executionTimeMs", elapsed,
"estimatedTokens", estimatedTokens,
"chainType", "simple"
));
}
public CompletableFuture<ChainResult> executeAsync(SimpleChainInput input) {
return CompletableFuture.supplyAsync(
() -> execute(input), llmTaskExecutor);
}
}AdaptiveChainInput#
호출하는 쪽에서는 AdaptiveChainInput을 만들어서 넘깁니다. 팩토리 메서드로 용도에 맞게 생성합니다.
public record AdaptiveChainInput(
LLMPromptRequest promptRequest,
PromptId mapPromptId,
String additionalContent,
ChunkVariableProvider chunkVariableProvider,
String reduceVariableName
) {
// 짧은 텍스트 — Simple만 사용
public static AdaptiveChainInput forSimple(
LLMPromptRequest request, String content) {
return new AdaptiveChainInput(request, null, content, null, null);
}
// 긴 텍스트 가능성 — MapReduce도 대비
public static AdaptiveChainInput forMapReduce(
LLMPromptRequest reduceRequest,
PromptId mapPromptId,
String content,
String mapVariableName,
String reduceVariableName) {
return new AdaptiveChainInput(
reduceRequest, mapPromptId, content,
ChunkVariableProvider.of(mapVariableName),
reduceVariableName
);
}
}forSimple()은 MapReduce 관련 필드가 전부 null입니다. 토큰이 적으면 SimpleChain만 쓰니까요. forMapReduce()는 Map 프롬프트, Reduce 프롬프트, 변수명까지 전부 지정합니다.
비동기 실행#
Adaptive Chain도 비동기를 지원합니다. MapReduce 경로에서는 mapAsync().thenApply(reduce) 체인으로 연결됩니다.
public CompletableFuture<ChainResult> executeAsync(AdaptiveChainInput input) {
int estimatedTokens = TokenEstimator.estimate(input.additionalContent());
if (estimatedTokens < TOKEN_THRESHOLD) {
return simpleChain.executeAsync(toSimpleInput(input));
}
List<String> chunks = splitIntoChunks(input.additionalContent());
List<MapReduceChunkInput> chunkInputs = buildChunkInputs(
chunks, input.mapPromptId(), input.chunkVariableProvider());
return mapReduceChain.mapAsync(chunkInputs)
.thenApply(mapResults -> mapReduceChain.reduceWithLLM(
mapResults,
new MapReduceContext(
input.promptRequest(),
input.reduceVariableName()
)
));
}붙여놓고 나면#
여기까지 구현한 걸 한 번에 보면 전체 호출 흐름은 이렇습니다.
실사용 예시#
실제 서비스 코드에서는 이렇게 씁니다.
@Service
@RequiredArgsConstructor
public class DocumentAnalysisService {
private final AdaptiveChainImpl adaptiveChain;
public AnalysisResult analyze(String documentText) {
AdaptiveChainInput input = AdaptiveChainInput.forMapReduce(
PromptRequestBuilder
.from(AnalysisPrompt.REDUCE_SUMMARY)
.build(),
AnalysisPrompt.CHUNK_ANALYSIS, // Map 프롬프트
documentText, // 분석할 텍스트
"document", // Map 변수명
"analysisResults" // Reduce 변수명
);
ChainResult result = adaptiveChain.execute(input);
log.info("분석 완료 - 체인: {}, 청크: {}, 소요: {}ms",
result.metadata().get("chainType"),
result.metadata().get("chunkCount"),
result.metadata().get("executionTimeMs"));
return parseResult(result.content());
}
}서비스 코드에서는 AdaptiveChainInput만 만들면 됩니다. 토큰이 적으면 SimpleChain으로, 많으면 MapReduceChain으로 넘어가고, 메타데이터에 어떤 체인을 썼는지, 청크가 몇 개였는지, 얼마나 걸렸는지가 남습니다. 실제로 붙여보면 이 메타데이터가 운영할 때 꽤 도움이 됩니다.
다음 글#
이번 글은 토큰 제한을 넘는 텍스트를 MapReduce 체인으로 쪼개 처리하고, Adaptive Chain으로 전략 선택을 자동화하는 데까지 왔습니다.
그런데 아직 하나 빠진 게 있습니다. 지금 구조는 OpenAI든 Anthropic이든 한 번에 하나의 Provider만 씁니다. 그 Provider가 흔들리면 체인 구조를 잘 짜놔도 결국 같이 멈춥니다. 다음 글은 멀티 Provider Fallback으로 OpenAI가 죽으면 Anthropic으로 넘기고, 어떤 Provider가 어떤 에러로 실패했는지 추적하는 쪽으로 이어집니다.
참고 자료