OpenSearch 검색 파이프라인 소스코드 분석
코드를 읽다가 눈에 걸린 것#
OpenSearch 인덱싱 파이프라인을 파고 나서 검색 쪽을 보기 시작했습니다. TransportSearchAction부터 순서대로 따라가는데, SearchQueryThenFetchAsyncAction에서 이상한 코드가 나왔습니다.
// SearchQueryThenFetchAsyncAction.java:172
private ShardSearchRequest rewriteShardSearchRequest(ShardSearchRequest request) {
if (bottomSortCollector == null) {
return request;
}
// disable tracking total hits if we already reached the required estimation.
if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_ACCURATE && bottomSortCollector.getTotalHits() > trackTotalHitsUpTo) {
request.source(request.source().shallowCopy().trackTotalHits(false));
}
// set the current best bottom field doc
if (bottomSortCollector.getBottomSortValues() != null) {
request.setBottomSortValues(bottomSortCollector.getBottomSortValues());
}
return request;
}샤드에 쿼리를 보내기 직전에 요청을 다시 쓰고 있었습니다. bottomSortCollector에서 뭔가 가져와서 요청에 심는데, 이게 뭔지 궁금해졌습니다.
BottomSortValuesCollector#
이 이름이 말하는 건 "가장 바닥에 있는 정렬값"입니다. 무슨 바닥인지 봐야 했습니다.
// BottomSortValuesCollector.java:79
synchronized void consumeTopDocs(TopFieldDocs topDocs, DocValueFormat[] sortValuesFormat) {
totalHits += topDocs.totalHits.value();
if (validateShardSortFields(topDocs.fields) == false) {
return;
}
FieldDoc shardBottomDoc = extractBottom(topDocs);
if (shardBottomDoc == null) {
return;
}
if (bottomSortValues == null || compareValues(shardBottomDoc.fields, bottomSortValues.getRawSortValues()) < 0) {
bottomSortValues = new SearchSortValuesAndFormats(shardBottomDoc.fields, sortValuesFormat);
}
}extractBottom()은 샤드 응답의 TopDocs에서 가장 점수 낮은 문서를 꺼냅니다. 정렬 기준으로 보면 맨 끝에 있는 문서입니다.
// BottomSortValuesCollector.java:112
private FieldDoc extractBottom(TopFieldDocs topDocs) {
return topNSize > 0 && topDocs.scoreDocs.length == topNSize ? (FieldDoc) topDocs.scoreDocs[topNSize - 1] : null;
}샤드가 Top N을 보내왔으면 N번째(마지막) 문서의 정렬값을 뽑습니다. 그리고 여러 샤드에서 온 이 값들 중 가장 좋은 것을 bottomSortValues로 유지합니다. 더 나은 값이 오면 교체.
여기까지만 봐서는 "왜 이걸 추적하지?"가 풀리지 않았습니다. 위에서 본 rewriteShardSearchRequest로 돌아가야 했습니다.
샤드가 샤드한테 힌트를 준다#
코드를 다시 보면 순서가 이렇습니다.
- 코디네이팅 노드가 모든 샤드에 Query Phase 요청을 뿌린다
- 먼저 도착한 샤드 응답에서
bottomSortValues를 추출 - 아직 요청이 나가지 않은 샤드의 요청에 이 값을 심는다 (
setBottomSortValues) - 그 샤드는 "이 값보다 나쁜 점수는 볼 필요 없다"는 힌트를 가지고 Lucene 쿼리를 실행
즉, 샤드들이 순차적으로 서로 협력합니다. 첫 번째 샤드가 "내가 본 가장 나쁜 점수는 7.5야"라고 하면, 두 번째 샤드는 "그럼 난 7.5보다 낮은 건 스킵"하는 식입니다.
샤드가 많을수록, 뒤쪽 샤드는 더 많은 가지치기를 합니다. synchronized로 보호된 이유가 여기 있습니다. 여러 샤드 응답이 동시에 도착할 수 있고, 그 와중에도 다음 요청은 계속 나가니까요.
trackTotalHits도 같은 원리입니다. 추정 한도에 도달했으면 나머지 샤드한테는 "전체 카운트는 안 해도 된다"고 알려줍니다. Lucene이 일찍 종료할 수 있습니다.
이게 분산 검색 전체에 어떻게 맞물리는가#
이 최적화가 왜 나왔는지는 Query-Then-Fetch 전체를 봐야 이해됩니다.
인덱싱과 다르게 검색은 어느 샤드에 무슨 문서가 있는지 모릅니다. 문서 ID 해시로 타겟이 정해지는 게 아니니까요. 모든 샤드에 쿼리를 던져야 합니다.
근데 나이브하게 하면 망합니다.
샤드 5개 × 각 샤드 상위 100개 = 500개를 코디네이팅 노드로 끌고 와서 정렬?
10억 건에서 상위 10개 뽑겠다고 이렇게 하면 네트워크가 터집니다. 그래서 Query-Then-Fetch입니다.
Query Phase는 문서 ID와 점수만 받습니다. 가볍습니다. Fetch Phase에서 글로벌 정렬 후 진짜 상위 K개가 어느 샤드에 있는지 정해지면, 그 샤드에만 _source를 요청합니다.
BottomSortValuesCollector는 Query Phase 안에서 동작하는 최적화입니다. Phase 자체가 이미 가벼운데, 그 안에서도 더 가볍게 만드는 겁니다.
REST에서 Lucene까지#
검색 요청이 들어온 순서대로 봤습니다.
RestSearchAction#
// RestSearchAction.java:134-171
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
SearchRequest searchRequest = new SearchRequest();
IntConsumer setSize = size -> searchRequest.source().size(size);
request.withContentOrSourceParamParserOrNull(
parser -> parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize)
);
return channel -> {
RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
};
}parseSearchRequest()에서 Query DSL JSON이 MatchQueryBuilder 같은 객체 트리로 바뀝니다. RestCancellableNodeClient로 감싸는 이유는 HTTP 연결이 끊기면 검색 작업도 같이 취소하기 위해서입니다.
TransportSearchAction#
// TransportSearchAction.java:321
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
if (task instanceof CancellableTask) {
listener = TimeoutTaskCancellationUtility.wrapWithCancellationListener(
client, (CancellableTask) task,
clusterService.getClusterSettings().get(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING),
listener, e -> {}
);
}
executeRequest(task, searchRequest, this::searchAsyncAction, searchStatusStatsUpdateListener);
}executeRequest() 안에서 GroupShardsIterator<SearchShardIterator>를 만듭니다. 각 SearchShardIterator는 "이 샤드의 레플리카들 중 하나에 요청한다"는 의미입니다. 읽기니까 Primary든 Replica든 상관없습니다. preference 파라미터로 특정 레플리카를 고정하면 OS 페이지 캐시 히트율이 올라갑니다.
SearchType에 따라 분기됩니다.
switch (searchRequest.searchType()) {
case QUERY_THEN_FETCH:
return new SearchQueryThenFetchAsyncAction(...);
case DFS_QUERY_THEN_FETCH:
return new SearchDfsQueryThenFetchAsyncAction(...);
}DFS는 앞에 단계가 하나 더 있습니다. 각 샤드의 TF-IDF 통계가 달라서 스코어가 편향되는 문제를 막으려고, 먼저 분산 용어 빈도를 수집한 뒤 검색합니다. 라운드트립이 한 번 더 붙습니다.
샤드 병렬 요청#
// SearchQueryThenFetchAsyncAction.java:128
protected void executePhaseOnShard(
final SearchShardIterator shardIt,
final SearchShardTarget shard,
final SearchActionListener<SearchPhaseResult> listener
) {
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));
if (request != null) {
request.setInboundNetworkTime(System.currentTimeMillis());
}
getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener);
}처음에 본 그 rewriteShardSearchRequest가 여기서 호출됩니다. 샤드에 요청을 보내기 직전마다. 앞의 샤드에서 쌓인 bottomSortValues가 뒤의 샤드 요청에 주입되는 지점입니다.
데이터 노드 SearchService#
// SearchService.java:867
private SearchPhaseResult executeQueryPhase(
ShardSearchRequest request, SearchShardTask task,
boolean keepStatesInContext, boolean isStreamSearch,
ActionListener<SearchPhaseResult> listener
) throws Exception {
final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
try (
Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
SearchContext context = createContext(readerContext, request, task, true, isStreamSearch)
) {
final long afterQueryTime;
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
loadOrExecuteQueryPhase(request, context);
if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
freeReaderContext(readerContext.id());
}
afterQueryTime = executor.success();
}
if (request.numberOfShards() == 1) {
return executeFetchPhase(readerContext, context, afterQueryTime);
} else {
return context.queryResult();
}
}
}ReaderContext는 Lucene IndexReader를 감싸는 객체입니다. 일반 검색은 요청마다 새로 만들고 끝나면 해제합니다. 스크롤이나 PIT는 이걸 유지해서 같은 스냅샷을 여러 요청이 재사용합니다.
샤드가 1개면 Query Phase와 Fetch Phase를 합쳐서 한 번에 실행합니다. 네트워크 라운드트립이 필요 없으니까요.
QueryBuilder에서 Lucene Query로#
loadOrExecuteQueryPhase() 안에서 QueryBuilder가 Lucene Query로 변환됩니다. MatchQueryBuilder.doToQuery()를 보면:
MatchQuery matchQuery = new MatchQuery(context);
matchQuery.setOccur(operator);
matchQuery.setAnalyzer(analyzer);
// MatchQuery.parse():
// "hello" → Analyzer → Term("title:hello")
// 필드 타입별:
// Text → BooleanQuery { SHOULD: TermQuery(...) }
// Keyword → TermQuery
// PHRASE → PhraseQuery{"match": {"title": "hello world"}}는 이렇게 바뀝니다.
BooleanQuery {
SHOULD: TermQuery("title:hello")
SHOULD: TermQuery("title:world")
}
title이 Text 타입이면 Analyzer가 토큰화하고 각 토큰이 TermQuery가 됩니다. Keyword 타입이면 토큰화 없이 전체 문자열 그대로. 그래서 Text 필드와 Keyword 필드 검색 결과가 다릅니다.
QueryPhase.execute#
// QueryPhase.java:136
public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
if (searchContext.hasOnlySuggest()) {
// suggest only 분기
return;
}
final AggregationProcessor aggregationProcessor = queryPhaseSearcher.aggregationProcessor(searchContext);
aggregationProcessor.preProcess(searchContext);
boolean rescore = executeInternal(searchContext, queryPhaseSearcher);
if (rescore) {
rescoreProcessor.process(searchContext);
}
}executeInternal() 안에서 ContextIndexSearcher.search(query, collector)가 호출됩니다. 모든 세그먼트를 순회하며 TermQuery 매칭, BM25 스코어링, Collector로 TopDocs 수집. Aggregation이 있으면 같은 순회에서 계산합니다.
결과는 QuerySearchResult. 문서 ID와 점수만 담겨있습니다.
onShardResult와 bottomSortCollector의 갱신#
샤드에서 결과가 돌아올 때마다 onShardResult가 호출되고, 여기서 bottomSortCollector가 갱신됩니다.
// SearchQueryThenFetchAsyncAction.java:147
protected void onShardResult(SearchPhaseResult result, SearchShardIterator shardIt) {
QuerySearchResult queryResult = result.queryResult();
if (queryResult.isNull() == false
&& getRequest().scroll() == null
&& queryResult.topDocs() != null
&& queryResult.topDocs().topDocs.getClass() == TopFieldDocs.class) {
TopFieldDocs topDocs = (TopFieldDocs) queryResult.topDocs().topDocs;
if (bottomSortCollector == null) {
synchronized (this) {
if (bottomSortCollector == null) {
bottomSortCollector = new BottomSortValuesCollector(topDocsSize, topDocs.fields);
}
}
}
bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats());
}
super.onShardResult(result, shardIt);
}조건이 눈에 들어옵니다.
getRequest().scroll() == null— 스크롤이면 이 최적화 안 씁니다. 샤드별 상태를 따로 추적하니까요.topDocs.getClass() == TopFieldDocs.class— Sort 기반 쿼리에서만 작동합니다.TopFieldDocs는 정렬 필드가 있을 때만 나옵니다.
즉, _score로만 정렬하는 일반 full-text 검색은 이 혜택을 못 받습니다. 명시적 sort가 있어야 합니다.
Collector 초기화가 double-checked locking 패턴입니다. 여러 샤드 응답이 동시에 처음 도착할 수 있으니까요.
Fetch Phase#
모든 Query 응답이 모이면 FetchSearchPhase로 넘어갑니다.
// SearchQueryThenFetchAsyncAction.java:168
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
return new FetchSearchPhase(results, searchPhaseController, null, this);
}글로벌 Top K를 뽑고 샤드별로 그룹화해서 Fetch 요청을 보냅니다.
샤드 A: Top 100 (id=1, id=5, id=9, ...)
샤드 B: Top 100 (id=3, id=7, ...)
샤드 C: Top 100 (id=2, id=4, ...)
↓ 글로벌 정렬 → Top 10
상위 10개: id=5(A), id=3(B), id=2(C), id=7(B), id=1(A), ...
↓ 샤드별 그룹화
샤드 A: [id=5, id=1, ...] → Fetch
샤드 B: [id=3, id=7, ...] → Fetch
샤드 C: [id=2, ...] → Fetch
SearchService.executeFetchPhase() → FetchPhase.execute()가 Lucene StoredFields를 읽어서 _source를 복원합니다. 하이라이팅, 스크립트 필드, docvalue_fields도 여기서 처리됩니다.
응답#
모든 Fetch 결과가 코디네이팅 노드에 모이면 SearchPhaseController가 최종 응답을 만듭니다. Aggregation 결과도 여기서 리듀스됩니다. 각 샤드의 중간 집계값을 합쳐서 최종 버킷을 계산합니다.
{
"took": 5,
"hits": {
"total": { "value": 1, "relation": "eq" },
"max_score": 0.2876821,
"hits": [
{ "_index": "my-index", "_id": "1", "_score": 0.2876821,
"_source": { "title": "hello world" } }
]
}
}운영 관점#
명시적 sort가 있는 쿼리는 BottomSortValuesCollector의 혜택을 봅니다. _score 정렬만으로는 안 됩니다. 정렬 필드를 쓸 수 있다면 쓰는 게 샤드 수가 많을 때 성능에 유리합니다.
track_total_hits를 끄면 빨라집니다. 같은 코드 경로에서 같이 최적화됩니다. 10000까지만 카운트하고 멈추는 기본 설정은 이 최적화를 쓰기 위한 장치입니다. 정확한 전체 개수가 필요 없으면 false로.
from + size가 커지면 힙이 터집니다. 깊은 페이지네이션은 Scroll이나 PIT + search_after. 기본 상한 index.max_result_window가 10000.
preference로 샤드 라우팅을 고정하세요. 같은 사용자 세션의 연속 쿼리가 같은 레플리카로 가면 OS 페이지 캐시 히트율이 올라갑니다. 기본은 라운드로빈.
Aggregation은 Query Phase에서 계산됩니다. Fetch가 아닙니다. _source를 안 쓰더라도 무거운 집계는 각 샤드에서 전체 문서를 스캔할 수 있습니다. 필요하면 pipeline aggregation이나 transforms로 인덱싱 단계에서 precompute.
DFS_QUERY_THEN_FETCH는 기본이 아닙니다. 샤드별 TF-IDF 편향이 문제될 때만 씁니다. 라운드트립이 한 번 더 붙어서 느립니다.
뭘 배웠나#
처음에는 Query-Then-Fetch가 왜 2단계인지가 궁금했습니다. 검색 결과를 네트워크 효율적으로 가져오는 프로토콜 설명은 문서에도 있습니다.
소스를 보면서 알게 된 건 2단계 안에서도 더 줄일 게 있다는 점이었습니다. 앞 샤드의 응답으로 뒷 샤드의 요청을 동적으로 고쳐 쓰는 방식. 문서로는 잘 안 나오는 디테일이었습니다. 이런 건 소스를 직접 안 보면 모릅니다.
분석은 OpenSearch 3.7.0-SNAPSHOT (commit e1717ed, Lucene 10.4.0) 기준입니다.