OpenSearch 인덱싱 파이프라인 소스코드 분석
소스코드를 열게 된 이유#
OpenSearch를 컨트리뷰트 대상으로 잡고 소스코드를 읽기 시작했습니다. Elasticsearch 7.10.2에서 포크된 프로젝트고, Apache 2.0 라이선스라 누구나 기여할 수 있습니다.
근데 이런 프로젝트는 코드를 그냥 열면 막막합니다. server 모듈만 패키지가 30개 넘고, Node.java 한 파일이 2000줄입니다. 진입점이 필요했습니다.
그래서 가장 기본적인 질문부터 시작했습니다.
curl -X PUT localhost:9200/my-index/_doc/1 \
-H 'Content-Type: application/json' \
-d '{"title": "hello world"}'이 요청이 응답을 받기까지 내부에서 뭐가 벌어지는가?
직접 빌드하고 실행해서 확인했습니다. 분석 대상은 OpenSearch 3.7.0-SNAPSHOT(main 브랜치, Lucene 10.4.0)입니다.
전체 흐름#
12개 클래스를 거칩니다.
하나씩 봤습니다.
1단계: HTTP 수신과 URL 라우팅#
HTTP 서버는 Netty 4 기반입니다. Netty4HttpRequestHandler가 바이트를 읽어서 HTTP로 디코딩하고, RestController에 넘깁니다.
RestController는 URL 패턴을 PathTrie(접두사 트리)에 저장하고 있습니다.
// RestController.java:117
private final PathTrie<RestMethodHandlers> handlers = new PathTrie<>(RestUtils.REST_DECODER);PUT /my-index/_doc/1이 들어오면 /{index}/_doc/{id} 패턴과 매칭돼서 RestIndexAction을 찾습니다.
2단계: REST에서 IndexRequest로#
RestIndexAction이 HTTP 파라미터를 IndexRequest 객체로 바꿉니다.
// RestIndexAction.java:139-165
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
IndexRequest indexRequest = new IndexRequest(request.param("index"));
indexRequest.id(request.param("id"));
indexRequest.routing(request.param("routing"));
indexRequest.setPipeline(request.param("pipeline"));
indexRequest.source(request.requiredContent(), request.getMediaType());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
indexRequest.setRefreshPolicy(request.param("refresh"));
indexRequest.version(RestActions.parseVersion(request));
indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));
indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));
return channel -> client.index(
indexRequest,
new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing()))
);
}URL 파라미터, 바디, 버전 정보, 파이프라인 설정까지 전부 IndexRequest 하나에 담깁니다. client.index()가 호출되면 NodeClient가 레지스트리(DynamicActionRegistry)에서 IndexAction에 매핑된 TransportAction을 찾아서 실행합니다.
3단계: 모든 인덱싱은 Bulk이다#
단건 PUT /_doc/1도 내부적으로 Bulk 경로를 탑니다. TransportIndexAction은 TransportBulkAction에 위임하는 래퍼일 뿐입니다. PUT /_doc/1이든 POST /_bulk이든, 같은 코드에 도달합니다.
// TransportBulkAction.java:250-260
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressureService
.markCoordinatingOperationStarted(bulkRequest::ramBytesUsed, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
try {
doInternalExecute(task, bulkRequest, executorName, releasingListener);
} catch (Exception e) {
releasingListener.onFailure(e);
}
}첫 줄에서 IndexingPressureService가 배압을 겁니다. 요청의 RAM 사용량을 추적해서 임계값 넘으면 거부합니다. 운영 중 인덱싱 reject가 터지면 여기입니다.
시스템 인덱스면 SYSTEM_WRITE 풀, 일반 인덱스면 WRITE 풀에서 실행됩니다. SEARCH 풀과 분리돼 있어서 인덱싱이 몰려도 검색은 안 죽습니다.
왜 단건도 Bulk로 통합했을까. 분산 시스템에서 요청을 샤드별로 그룹화하는 코드는 단건이든 다건이든 똑같습니다. 한 곳에서 관리하는 게 맞고, 실제로 여러 요청이 같은 샤드로 갈 때 네트워크 라운드트립을 줄이는 배치 효과도 얻습니다.
// TransportBulkAction.java (doInternalExecute 내부)
boolean hasIndexRequestsWithPipelines = resolvePipelinesForActionRequests(bulkRequest.requests, metadata, minNodeVersion);
if (hasIndexRequestsWithPipelines) {
// 인제스트 파이프라인이 있으면 먼저 실행
// 파이프라인 처리 후 다시 doExecute()로 돌아옴
}인제스트 파이프라인이 설정돼 있으면 인덱싱 전에 먼저 실행합니다. 파이프라인 처리가 끝나면 다시 같은 doExecute()로 돌아옵니다.
4단계: 라우팅#
BulkOperation 내부에서 각 요청의 타겟 샤드를 정합니다.
// TransportBulkAction.java (BulkOperation 내부)
shardId = clusterService.operationRouting()
.indexShards(clusterState, concreteIndex.getName(), request.id(), request.routing())
.shardId();문서 ID를 해시해서 샤드 수로 나눈 나머지. _routing 파라미터로 이 로직을 오버라이드할 수 있습니다. 관련 문서를 같은 샤드에 모아야 할 때 씁니다.
같은 샤드로 가는 요청들은 Map<ShardId, List<BulkItemRequest>>로 묶어서 한 번에 보냅니다.
네트워크 라운드트립이 샤드 수만큼만 발생합니다. Bulk의 진짜 이점이 여기 있습니다.
5단계: Primary 샤드 실행#
그룹화된 요청은 TransportShardBulkAction을 통해 Primary 샤드가 있는 노드로 갑니다. 같은 노드면 TCP 안 타고 직접 호출합니다.
// TransportShardBulkAction.java:595-670
static boolean executeBulkItemRequest(
BulkPrimaryExecutionContext context, UpdateHelper updateHelper,
LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater,
Consumer<ActionListener<Void>> waitForMappingUpdate, ActionListener<Void> itemDoneListener
) throws Exception {
final DocWriteRequest.OpType opType = context.getCurrent().opType();
// UPDATE면 먼저 INDEX나 DELETE로 변환
if (opType == DocWriteRequest.OpType.UPDATE) {
final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
switch (updateResult.getResponseResult()) {
case CREATED:
case UPDATED:
IndexRequest indexRequest = updateResult.action();
context.setRequestToExecute(indexRequest);
break;
case DELETED:
context.setRequestToExecute(updateResult.action());
break;
case NOOP:
context.markOperationAsNoOp(updateResult.action());
return true;
}
} else {
context.setRequestToExecute(context.getCurrent());
}
// INDEX 실행
final IndexRequest request = context.getRequestToExecute();
result = primary.applyIndexOperationOnPrimary(
version, request.versionType(),
new SourceToParse(request.index(), request.id(), request.source(),
request.getContentType(), request.routing()),
request.ifSeqNo(), request.ifPrimaryTerm(),
request.getAutoGeneratedTimestamp(), request.isRetry()
);
}UPDATE는 독립적인 연산이 아닙니다. 기존 문서를 읽어서 변경사항을 적용한 뒤, INDEX나 DELETE로 변환합니다. Lucene에서 문서 수정은 "삭제 후 재삽입"이니까요. NOOP(변경 없음)이면 바로 리턴합니다.
매핑에 없는 새 필드가 발견되면 MappingUpdatePerformer가 클러스터 상태 업데이트를 트리거하고, 완료될 때까지 기다린 후 재시도합니다.
6단계: IndexShard — 문서 파싱#
applyIndexOperationOnPrimary()에서 JSON이 Lucene Document로 변환됩니다.
// IndexShard.java:1168-1192
public Engine.IndexResult applyIndexOperationOnPrimary(
long version, VersionType versionType, SourceToParse sourceToParse,
long ifSeqNo, long ifPrimaryTerm, long autoGeneratedTimestamp, boolean isRetry
) throws IOException {
assert versionType.validateVersionForWrites(version);
return applyIndexOperation(
getIndexer(),
UNASSIGNED_SEQ_NO, // 시퀀스 번호 아직 없음
getOperationPrimaryTerm(), // 현재 Primary Term
version, versionType,
ifSeqNo, ifPrimaryTerm,
autoGeneratedTimestamp, isRetry,
Engine.Operation.Origin.PRIMARY,
sourceToParse, null
);
}UNASSIGNED_SEQ_NO입니다. Primary는 이 시점에 시퀀스 번호가 없습니다. Engine에서 나중에 생성합니다.
내부의 applyIndexOperation()에서 DocumentMapper.parse()가 JSON을 파싱합니다. 필드 매핑에 따라 분석기(Analyzer)가 텍스트를 토큰화하고, Lucene Document 객체를 만듭니다.
Replica는 다릅니다. Primary가 할당한 시퀀스 번호를 그대로 받아 씁니다.
// IndexShard.java:1194-1217
public Engine.IndexResult applyIndexOperationOnReplica(
String id, long seqNo, long opPrimaryTerm, long version, ...
) throws IOException {
return applyIndexOperation(
getIndexer(), seqNo, opPrimaryTerm, version, null,
UNASSIGNED_SEQ_NO, 0, autoGeneratedTimeStamp, isRetry,
Engine.Operation.Origin.REPLICA,
sourceToParse, id
);
}Segment Replication이 켜진 레플리카는 아예 파싱을 건너뜁니다. Primary에서 Lucene 세그먼트를 통째로 복사하니까요.
// IndexShard.java:1236
if (indexSettings.isSegRepEnabledOrRemoteNode() && routingEntry().primary() == false) {
// 파싱 건너뛰고 최소한의 Engine.Index 생성
}7단계: InternalEngine#
이 파이프라인에서 가장 복잡한 메서드입니다. 버전 충돌 검사, 시퀀스 번호 생성, Lucene 기록, Translog 기록이 전부 여기서 일어납니다.
// InternalEngine.java:891-999
public IndexResult index(Index index) throws IOException {
final boolean doThrottle = index.origin().isRecovery() == false;
try (ReleasableLock releasableLock = readLock.acquire()) {
ensureOpen();
try (
Releasable ignored = versionMap.acquireLock(index.uid().bytes());
Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}
) {
lastWriteNanos = index.startTime();락이 두 개입니다.
readLock은 Engine 레벨 공유 락입니다. flush나 close가 진행 중이 아니면 여러 인덱싱이 동시에 잡을 수 있습니다. "read" lock이란 이름이지만 실제로는 "인덱싱은 동시에 허용하되, flush 같은 배타적 작업은 차단한다"는 의미입니다.
versionMap.acquireLock(uid)는 문서 ID 레벨 락입니다. 같은 문서에 대한 동시 수정만 차단합니다. 서로 다른 문서는 경합이 없습니다. 10000개의 서로 다른 문서를 동시에 인덱싱해도 락 경합이 발생하지 않습니다.
다음은 인덱싱 전략 결정입니다.
final IndexingStrategy plan = indexingStrategyForOperation(index);이 메서드가 VersionMap이나 Lucene을 조회해서 기존 문서 존재 여부를 확인합니다. 결과에 따라 세 가지 중 하나를 고릅니다:
addDocument— 새 문서softUpdateDocument— 기존 문서 교체addStaleOp— 오래된 작업(soft delete로 마킹)
시퀀스 번호 생성은 Primary에서만 일어납니다.
if (index.origin() == Operation.Origin.PRIMARY) {
index = new Index(
index.uid(), index.parsedDoc(),
generateSeqNoForOperationOnPrimary(index),
index.primaryTerm(), index.version(), index.versionType(),
index.origin(), index.startTime(),
index.getAutoGeneratedIdTimestamp(), index.isRetry(),
index.getIfSeqNo(), index.getIfPrimaryTerm()
);
final boolean toAppend = plan.executeOpOnEngine && plan.useUpdateDocument == false;
if (toAppend == false) {
advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo());
}
} else {
markSeqNoAsSeen(index.seqNo());
}시퀀스 번호는 샤드 내 단조 증가 숫자입니다. 모든 쓰기 작업에 순서를 부여합니다. 레플리카 동기화와 장애 복구에서 "어디까지 처리했는가"를 추적하는 기준이 됩니다.
advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary()는 업데이트/삭제 작업의 최대 시퀀스 번호를 기록합니다. Replica에서 이 값을 써서 오래된 작업을 감지합니다.
8단계: Lucene에 기록#
indexIntoLucene()이 Lucene IndexWriter를 직접 호출하는 마지막 지점입니다.
// InternalEngine.java:1046-1092
private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws IOException {
assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
assert plan.version >= 0 : "version must be set. got " + plan.version;
index.parsedDoc().updateSeqID(index.seqNo(), index.primaryTerm());
index.parsedDoc().version().setLongValue(plan.version);
try {
if (plan.addStaleOpToEngine) {
addStaleDocs(index.docs(), documentIndexWriter, index.uid());
} else if (plan.useUpdateDocument) {
updateDocs(index.uid(), index.docs(), documentIndexWriter,
plan.version, index.seqNo(), index.primaryTerm());
} else {
assert assertDocDoesNotExist(index, ...);
addDocs(index.docs(), documentIndexWriter, index.uid());
}
return new IndexResult(plan.version, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
} catch (Exception ex) {
if (documentIndexWriter.getTragicException() == null && treatDocumentFailureAsTragicError(index) == false) {
return new IndexResult(ex, Versions.MATCH_ANY, index.primaryTerm(), index.seqNo());
} else {
throw ex;
}
}
}문서에 시퀀스 번호와 버전을 기록한 뒤 Lucene에 넘깁니다. 에러 처리가 두 갈래로 나뉩니다. getTragicException()이 null이면 문서 레벨 에러(매핑 오류 등)라서 해당 문서만 실패 처리합니다. null이 아니면 IndexWriter 자체가 고장난 거라서 Engine을 fail 시킵니다.
세 가지 전략의 실제 Lucene API 호출:
// 새 문서 — IndexWriter.addDocument()
private void addDocs(List<Document> docs, DocumentIndexWriter indexWriter, Term uid) throws IOException {
if (docs.size() > 1) {
indexWriter.addDocuments(docs, uid);
} else {
indexWriter.addDocument(docs.get(0), uid);
}
numDocAppends.inc(docs.size());
}
// 기존 문서 교체 — soft delete
private void updateDocs(
final Term uid, final List<Document> docs, final DocumentIndexWriter indexWriter,
long version, long seqNo, long primaryTerm
) throws IOException {
if (engineConfig.getIndexSettings().getIndexMetadata().isAppendOnlyIndex()) {
failEngine("Failing shard as update operation is not allowed for append only index", ...);
}
if (docs.size() > 1) {
indexWriter.softUpdateDocuments(uid, docs, version, seqNo, primaryTerm, softDeletesField);
} else {
indexWriter.softUpdateDocument(uid, docs.get(0), version, seqNo, primaryTerm, softDeletesField);
}
numDocUpdates.inc(docs.size());
}
// 오래된 작업 — soft delete로 마킹하여 추가
private void addStaleDocs(List<Document> docs, DocumentIndexWriter indexWriter, Term uid) throws IOException {
for (Document doc : docs) {
doc.add(softDeletesField); // soft-deleted every document before adding to Lucene
}
// ...
}softUpdateDocument()는 기존 문서를 물리적으로 삭제하지 않고 soft-delete 플래그만 설정합니다. 레플리카 동기화에서 "이 문서가 수정되었다"는 히스토리가 필요하기 때문입니다. soft-delete된 문서는 나중에 Lucene 머지 과정에서 정리됩니다.
append-only 인덱스에 update가 들어오면 failEngine()을 호출해서 샤드를 고의로 죽입니다. 데이터 정합성을 깨는 것보다 낫다는 판단입니다.
이 시점에서 문서는 Lucene의 인메모리 버퍼에 있습니다. 디스크에 flush되지 않았습니다.
9단계: Translog#
Lucene 기록 직후에 Translog에도 씁니다.
// InternalEngine.java:972-998 (index() 메서드 계속)
if (index.origin().isFromTranslog() == false) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translogManager.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& indexResult.getFailure() != null) {
final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(),
index.origin(), index.startTime(), indexResult.getFailure().toString());
location = innerNoOp(noOp).getTranslogLocation();
} else {
location = null;
}
indexResult.setTranslogLocation(location);
}성공하면 Translog에 인덱스 작업을 기록합니다. 실패했는데 시퀀스 번호가 이미 할당된 경우에는 NoOp(빈 작업)을 기록합니다. 시퀀스 번호에 구멍이 생기면 레플리카 동기화가 꼬지기 때문입니다.
그 다음 VersionMap을 갱신합니다.
if (plan.executeOpOnEngine && indexResult.getResultType() == Result.Type.SUCCESS) {
versionMap.maybePutIndexUnderLock(
index.uid().bytes(),
new IndexVersionValue(translogLocation, plan.version, index.seqNo(), index.primaryTerm())
);
}Lucene flush(디스크 commit)는 비용이 커서 주기적으로만 합니다. 기준은 translog 크기입니다. index.translog.flush_threshold_size가 기본 512MB이고, 이 값을 넘으면 IndexShard.shouldPeriodicallyFlush()가 true를 반환하면서 flush가 트리거됩니다. 시간 기반 주기는 없습니다. 고트래픽이면 분 단위로 flush가 돌고, 저트래픽이면 며칠 동안 안 돌 수도 있습니다. 그 사이에 노드가 죽으면 인메모리 버퍼가 날아갑니다. Translog가 이걸 커버합니다. 노드 재시작 시 Translog를 replay해서 Lucene 상태를 복구합니다. DB의 WAL이랑 같은 패턴입니다.
VersionMap은 문서 ID → (버전, 시퀀스 번호, Translog 위치) 매핑을 인메모리에 캐싱합니다. 다음에 같은 문서가 인덱싱되면 Lucene을 조회하지 않고도 버전 충돌을 감지할 수 있습니다. 대신 힙 메모리를 먹습니다. 문서 수가 많고 업데이트가 잦으면 GC 압력이 올라갑니다.
마지막으로 로컬 체크포인트를 갱신합니다.
localCheckpointTracker.markSeqNoAsProcessed(index.seqNo());이 체크포인트가 "이 샤드에서 시퀀스 번호 N까지 처리 완료"를 의미합니다. 레플리카 동기화의 기준점입니다.
10단계: 레플리카 복제#
Primary 성공 후 ReplicationOperation이 in-sync 레플리카에 전파합니다.
Replica의 InternalEngine.index()는 같은 코드를 타지만, 시퀀스 번호를 생성하지 않고 Primary가 준 걸 씁니다. 모든 레플리카에서 작업 순서가 일치합니다.
Primary에서 실패한 작업은 Replica에서 markSeqNoAsNoop()으로 처리됩니다. 시퀀스 번호 연속성을 유지하면서 실제 작업은 건너뜁니다.
wait_for_active_shards 설정에 따라 응답 시점이 달라집니다. 기본값은 1(Primary만 성공하면 응답). all로 설정하면 모든 레플리카 ACK를 기다립니다.
11단계: 응답#
{
"_index": "my-index",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": { "total": 2, "successful": 1, "failed": 0 },
"_seq_no": 0,
"_primary_term": 1
}| 필드 | 생성 위치 |
|---|---|
_version | InternalEngine의 IndexingStrategy에서 할당 |
_seq_no | InternalEngine의 generateSeqNoForOperationOnPrimary() |
_primary_term | 클러스터 매니저 선출 시 결정된 term |
_shards.total: 2, successful: 1 | 레플리카 설정은 1이지만 단일 노드라 할당 불가 |
_shards.successful이 1인데 _shards.total이 2인 건 레플리카가 할당되지 않아서입니다. 단일 노드 환경에서는 정상이고, 인덱스 상태가 yellow로 표시됩니다.
운영에서 알아두면 좋은 것들#
단건보다 Bulk을 쓰세요. 내부적으로 Bulk 경로를 타긴 하지만, 외부에서 Bulk로 보내면 HTTP 오버헤드와 샤드 그룹화 비용이 줄어듭니다. 같은 샤드로 가는 요청을 묶어서 보내면 네트워크 라운드트립 횟수가 줍니다.
Translog durability 설정이 성능과 내구성 트레이드오프입니다. index.translog.durability의 기본값 request는 매 요청마다 fsync합니다. async로 바꾸면 빨라지지만 최대 sync_interval(기본 5초)만큼 유실될 수 있습니다.
refresh와 flush는 다릅니다. refresh(기본 1초)는 Lucene 인메모리 버퍼를 검색 가능한 세그먼트로 만드는 것이고, flush는 Translog를 비우고 Lucene commit을 수행하는 것입니다. flush는 시간 기반이 아니라 translog 크기 기반 (기본 index.translog.flush_threshold_size 512MB). "인덱싱 후 바로 검색이 안 된다"는 건 refresh 간격 때문입니다.
인덱싱 reject가 나면 IndexingPressureService의 메모리 임계값부터 보세요. 힙의 10%가 기본입니다. WRITE 스레드 풀 큐 크기(thread_pool.write.queue_size, 기본 10000)도 확인.
버전 충돌은 if_seq_no와 if_primary_term으로 낙관적 동시성 제어를 걸 수 있습니다. InternalEngine에서 VersionMap이나 Lucene을 조회해서 충돌을 감지합니다.
느낀 점#
curl 한 줄이 12개 클래스를 거쳐 Lucene에 닿습니다. 이 중 대부분은 Lucene이 아니라 Lucene을 여러 노드에서 안전하게 돌리기 위한 코드입니다. 라우팅, 복제, Translog, 배압 관리, 시퀀스 번호. 분산 시스템의 복잡도가 어디에 있는지 소스코드가 보여줍니다.
분석은 OpenSearch 3.7.0-SNAPSHOT (commit e1717ed, Lucene 10.4.0) 기준입니다.