dev notes

OpenSearch 검색 파이프라인 소스코드 분석

2026-04-1016 min read
공유

코드를 읽다가 눈에 걸린 것#

OpenSearch 인덱싱 파이프라인을 파고 나서 검색 쪽을 보기 시작했습니다. TransportSearchAction부터 순서대로 따라가는데, SearchQueryThenFetchAsyncAction에서 이상한 코드가 나왔습니다.

java
// SearchQueryThenFetchAsyncAction.java:172
private ShardSearchRequest rewriteShardSearchRequest(ShardSearchRequest request) {
    if (bottomSortCollector == null) {
        return request;
    }
 
    // disable tracking total hits if we already reached the required estimation.
    if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_ACCURATE && bottomSortCollector.getTotalHits() > trackTotalHitsUpTo) {
        request.source(request.source().shallowCopy().trackTotalHits(false));
    }
 
    // set the current best bottom field doc
    if (bottomSortCollector.getBottomSortValues() != null) {
        request.setBottomSortValues(bottomSortCollector.getBottomSortValues());
    }
    return request;
}

샤드에 쿼리를 보내기 직전에 요청을 다시 쓰고 있었습니다. bottomSortCollector에서 뭔가 가져와서 요청에 심는데, 이게 뭔지 궁금해졌습니다.

BottomSortValuesCollector#

이 이름이 말하는 건 "가장 바닥에 있는 정렬값"입니다. 무슨 바닥인지 봐야 했습니다.

java
// BottomSortValuesCollector.java:79
synchronized void consumeTopDocs(TopFieldDocs topDocs, DocValueFormat[] sortValuesFormat) {
    totalHits += topDocs.totalHits.value();
    if (validateShardSortFields(topDocs.fields) == false) {
        return;
    }
 
    FieldDoc shardBottomDoc = extractBottom(topDocs);
    if (shardBottomDoc == null) {
        return;
    }
    if (bottomSortValues == null || compareValues(shardBottomDoc.fields, bottomSortValues.getRawSortValues()) < 0) {
        bottomSortValues = new SearchSortValuesAndFormats(shardBottomDoc.fields, sortValuesFormat);
    }
}

extractBottom()은 샤드 응답의 TopDocs에서 가장 점수 낮은 문서를 꺼냅니다. 정렬 기준으로 보면 맨 끝에 있는 문서입니다.

java
// BottomSortValuesCollector.java:112
private FieldDoc extractBottom(TopFieldDocs topDocs) {
    return topNSize > 0 && topDocs.scoreDocs.length == topNSize ? (FieldDoc) topDocs.scoreDocs[topNSize - 1] : null;
}

샤드가 Top N을 보내왔으면 N번째(마지막) 문서의 정렬값을 뽑습니다. 그리고 여러 샤드에서 온 이 값들 중 가장 좋은 것bottomSortValues로 유지합니다. 더 나은 값이 오면 교체.

여기까지만 봐서는 "왜 이걸 추적하지?"가 풀리지 않았습니다. 위에서 본 rewriteShardSearchRequest로 돌아가야 했습니다.

샤드가 샤드한테 힌트를 준다#

코드를 다시 보면 순서가 이렇습니다.

  1. 코디네이팅 노드가 모든 샤드에 Query Phase 요청을 뿌린다
  2. 먼저 도착한 샤드 응답에서 bottomSortValues를 추출
  3. 아직 요청이 나가지 않은 샤드의 요청에 이 값을 심는다 (setBottomSortValues)
  4. 그 샤드는 "이 값보다 나쁜 점수는 볼 필요 없다"는 힌트를 가지고 Lucene 쿼리를 실행

즉, 샤드들이 순차적으로 서로 협력합니다. 첫 번째 샤드가 "내가 본 가장 나쁜 점수는 7.5야"라고 하면, 두 번째 샤드는 "그럼 난 7.5보다 낮은 건 스킵"하는 식입니다.

Loading diagram...

샤드가 많을수록, 뒤쪽 샤드는 더 많은 가지치기를 합니다. synchronized로 보호된 이유가 여기 있습니다. 여러 샤드 응답이 동시에 도착할 수 있고, 그 와중에도 다음 요청은 계속 나가니까요.

trackTotalHits도 같은 원리입니다. 추정 한도에 도달했으면 나머지 샤드한테는 "전체 카운트는 안 해도 된다"고 알려줍니다. Lucene이 일찍 종료할 수 있습니다.

이게 분산 검색 전체에 어떻게 맞물리는가#

이 최적화가 왜 나왔는지는 Query-Then-Fetch 전체를 봐야 이해됩니다.

인덱싱과 다르게 검색은 어느 샤드에 무슨 문서가 있는지 모릅니다. 문서 ID 해시로 타겟이 정해지는 게 아니니까요. 모든 샤드에 쿼리를 던져야 합니다.

근데 나이브하게 하면 망합니다.

샤드 5개 × 각 샤드 상위 100개 = 500개를 코디네이팅 노드로 끌고 와서 정렬?

10억 건에서 상위 10개 뽑겠다고 이렇게 하면 네트워크가 터집니다. 그래서 Query-Then-Fetch입니다.

Loading diagram...

Query Phase는 문서 ID와 점수만 받습니다. 가볍습니다. Fetch Phase에서 글로벌 정렬 후 진짜 상위 K개가 어느 샤드에 있는지 정해지면, 그 샤드에만 _source를 요청합니다.

BottomSortValuesCollector는 Query Phase 안에서 동작하는 최적화입니다. Phase 자체가 이미 가벼운데, 그 안에서도 더 가볍게 만드는 겁니다.

REST에서 Lucene까지#

검색 요청이 들어온 순서대로 봤습니다.

RestSearchAction#

java
// RestSearchAction.java:134-171
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest();
    IntConsumer setSize = size -> searchRequest.source().size(size);
    request.withContentOrSourceParamParserOrNull(
        parser -> parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize)
    );
 
    return channel -> {
        RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
        cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
    };
}

parseSearchRequest()에서 Query DSL JSON이 MatchQueryBuilder 같은 객체 트리로 바뀝니다. RestCancellableNodeClient로 감싸는 이유는 HTTP 연결이 끊기면 검색 작업도 같이 취소하기 위해서입니다.

TransportSearchAction#

java
// TransportSearchAction.java:321
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
    if (task instanceof CancellableTask) {
        listener = TimeoutTaskCancellationUtility.wrapWithCancellationListener(
            client, (CancellableTask) task,
            clusterService.getClusterSettings().get(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING),
            listener, e -> {}
        );
    }
    executeRequest(task, searchRequest, this::searchAsyncAction, searchStatusStatsUpdateListener);
}

executeRequest() 안에서 GroupShardsIterator<SearchShardIterator>를 만듭니다. 각 SearchShardIterator는 "이 샤드의 레플리카들 중 하나에 요청한다"는 의미입니다. 읽기니까 Primary든 Replica든 상관없습니다. preference 파라미터로 특정 레플리카를 고정하면 OS 페이지 캐시 히트율이 올라갑니다.

SearchType에 따라 분기됩니다.

java
switch (searchRequest.searchType()) {
    case QUERY_THEN_FETCH:
        return new SearchQueryThenFetchAsyncAction(...);
    case DFS_QUERY_THEN_FETCH:
        return new SearchDfsQueryThenFetchAsyncAction(...);
}

DFS는 앞에 단계가 하나 더 있습니다. 각 샤드의 TF-IDF 통계가 달라서 스코어가 편향되는 문제를 막으려고, 먼저 분산 용어 빈도를 수집한 뒤 검색합니다. 라운드트립이 한 번 더 붙습니다.

샤드 병렬 요청#

java
// SearchQueryThenFetchAsyncAction.java:128
protected void executePhaseOnShard(
    final SearchShardIterator shardIt,
    final SearchShardTarget shard,
    final SearchActionListener<SearchPhaseResult> listener
) {
    ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));
    if (request != null) {
        request.setInboundNetworkTime(System.currentTimeMillis());
    }
    getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener);
}

처음에 본 그 rewriteShardSearchRequest가 여기서 호출됩니다. 샤드에 요청을 보내기 직전마다. 앞의 샤드에서 쌓인 bottomSortValues가 뒤의 샤드 요청에 주입되는 지점입니다.

데이터 노드 SearchService#

java
// SearchService.java:867
private SearchPhaseResult executeQueryPhase(
    ShardSearchRequest request, SearchShardTask task,
    boolean keepStatesInContext, boolean isStreamSearch,
    ActionListener<SearchPhaseResult> listener
) throws Exception {
    final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
    try (
        Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
        SearchContext context = createContext(readerContext, request, task, true, isStreamSearch)
    ) {
        final long afterQueryTime;
        try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
            loadOrExecuteQueryPhase(request, context);
            if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
                freeReaderContext(readerContext.id());
            }
            afterQueryTime = executor.success();
        }
        if (request.numberOfShards() == 1) {
            return executeFetchPhase(readerContext, context, afterQueryTime);
        } else {
            return context.queryResult();
        }
    }
}

ReaderContext는 Lucene IndexReader를 감싸는 객체입니다. 일반 검색은 요청마다 새로 만들고 끝나면 해제합니다. 스크롤이나 PIT는 이걸 유지해서 같은 스냅샷을 여러 요청이 재사용합니다.

샤드가 1개면 Query Phase와 Fetch Phase를 합쳐서 한 번에 실행합니다. 네트워크 라운드트립이 필요 없으니까요.

QueryBuilder에서 Lucene Query로#

loadOrExecuteQueryPhase() 안에서 QueryBuilder가 Lucene Query로 변환됩니다. MatchQueryBuilder.doToQuery()를 보면:

java
MatchQuery matchQuery = new MatchQuery(context);
matchQuery.setOccur(operator);
matchQuery.setAnalyzer(analyzer);
 
// MatchQuery.parse():
// "hello" → Analyzer → Term("title:hello")
// 필드 타입별:
//   Text    → BooleanQuery { SHOULD: TermQuery(...) }
//   Keyword → TermQuery
//   PHRASE  → PhraseQuery

{"match": {"title": "hello world"}}는 이렇게 바뀝니다.

BooleanQuery {
  SHOULD: TermQuery("title:hello")
  SHOULD: TermQuery("title:world")
}

title이 Text 타입이면 Analyzer가 토큰화하고 각 토큰이 TermQuery가 됩니다. Keyword 타입이면 토큰화 없이 전체 문자열 그대로. 그래서 Text 필드와 Keyword 필드 검색 결과가 다릅니다.

QueryPhase.execute#

java
// QueryPhase.java:136
public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
    if (searchContext.hasOnlySuggest()) {
        // suggest only 분기
        return;
    }
 
    final AggregationProcessor aggregationProcessor = queryPhaseSearcher.aggregationProcessor(searchContext);
    aggregationProcessor.preProcess(searchContext);
    boolean rescore = executeInternal(searchContext, queryPhaseSearcher);
 
    if (rescore) {
        rescoreProcessor.process(searchContext);
    }
}

executeInternal() 안에서 ContextIndexSearcher.search(query, collector)가 호출됩니다. 모든 세그먼트를 순회하며 TermQuery 매칭, BM25 스코어링, Collector로 TopDocs 수집. Aggregation이 있으면 같은 순회에서 계산합니다.

결과는 QuerySearchResult. 문서 ID와 점수만 담겨있습니다.

onShardResult와 bottomSortCollector의 갱신#

샤드에서 결과가 돌아올 때마다 onShardResult가 호출되고, 여기서 bottomSortCollector가 갱신됩니다.

java
// SearchQueryThenFetchAsyncAction.java:147
protected void onShardResult(SearchPhaseResult result, SearchShardIterator shardIt) {
    QuerySearchResult queryResult = result.queryResult();
    if (queryResult.isNull() == false
        && getRequest().scroll() == null
        && queryResult.topDocs() != null
        && queryResult.topDocs().topDocs.getClass() == TopFieldDocs.class) {
        TopFieldDocs topDocs = (TopFieldDocs) queryResult.topDocs().topDocs;
        if (bottomSortCollector == null) {
            synchronized (this) {
                if (bottomSortCollector == null) {
                    bottomSortCollector = new BottomSortValuesCollector(topDocsSize, topDocs.fields);
                }
            }
        }
        bottomSortCollector.consumeTopDocs(topDocs, queryResult.sortValueFormats());
    }
    super.onShardResult(result, shardIt);
}

조건이 눈에 들어옵니다.

  • getRequest().scroll() == null — 스크롤이면 이 최적화 안 씁니다. 샤드별 상태를 따로 추적하니까요.
  • topDocs.getClass() == TopFieldDocs.classSort 기반 쿼리에서만 작동합니다. TopFieldDocs는 정렬 필드가 있을 때만 나옵니다.

즉, _score로만 정렬하는 일반 full-text 검색은 이 혜택을 못 받습니다. 명시적 sort가 있어야 합니다.

Collector 초기화가 double-checked locking 패턴입니다. 여러 샤드 응답이 동시에 처음 도착할 수 있으니까요.

Fetch Phase#

모든 Query 응답이 모이면 FetchSearchPhase로 넘어갑니다.

java
// SearchQueryThenFetchAsyncAction.java:168
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
    return new FetchSearchPhase(results, searchPhaseController, null, this);
}

글로벌 Top K를 뽑고 샤드별로 그룹화해서 Fetch 요청을 보냅니다.

샤드 A: Top 100 (id=1, id=5, id=9, ...)
샤드 B: Top 100 (id=3, id=7, ...)
샤드 C: Top 100 (id=2, id=4, ...)

  ↓ 글로벌 정렬 → Top 10

상위 10개: id=5(A), id=3(B), id=2(C), id=7(B), id=1(A), ...

  ↓ 샤드별 그룹화

샤드 A: [id=5, id=1, ...]  → Fetch
샤드 B: [id=3, id=7, ...]  → Fetch
샤드 C: [id=2, ...]        → Fetch

SearchService.executeFetchPhase()FetchPhase.execute()가 Lucene StoredFields를 읽어서 _source를 복원합니다. 하이라이팅, 스크립트 필드, docvalue_fields도 여기서 처리됩니다.

응답#

모든 Fetch 결과가 코디네이팅 노드에 모이면 SearchPhaseController가 최종 응답을 만듭니다. Aggregation 결과도 여기서 리듀스됩니다. 각 샤드의 중간 집계값을 합쳐서 최종 버킷을 계산합니다.

json
{
  "took": 5,
  "hits": {
    "total": { "value": 1, "relation": "eq" },
    "max_score": 0.2876821,
    "hits": [
      { "_index": "my-index", "_id": "1", "_score": 0.2876821,
        "_source": { "title": "hello world" } }
    ]
  }
}

운영 관점#

명시적 sort가 있는 쿼리는 BottomSortValuesCollector의 혜택을 봅니다. _score 정렬만으로는 안 됩니다. 정렬 필드를 쓸 수 있다면 쓰는 게 샤드 수가 많을 때 성능에 유리합니다.

track_total_hits를 끄면 빨라집니다. 같은 코드 경로에서 같이 최적화됩니다. 10000까지만 카운트하고 멈추는 기본 설정은 이 최적화를 쓰기 위한 장치입니다. 정확한 전체 개수가 필요 없으면 false로.

from + size가 커지면 힙이 터집니다. 깊은 페이지네이션은 Scroll이나 PIT + search_after. 기본 상한 index.max_result_window가 10000.

preference로 샤드 라우팅을 고정하세요. 같은 사용자 세션의 연속 쿼리가 같은 레플리카로 가면 OS 페이지 캐시 히트율이 올라갑니다. 기본은 라운드로빈.

Aggregation은 Query Phase에서 계산됩니다. Fetch가 아닙니다. _source를 안 쓰더라도 무거운 집계는 각 샤드에서 전체 문서를 스캔할 수 있습니다. 필요하면 pipeline aggregation이나 transforms로 인덱싱 단계에서 precompute.

DFS_QUERY_THEN_FETCH는 기본이 아닙니다. 샤드별 TF-IDF 편향이 문제될 때만 씁니다. 라운드트립이 한 번 더 붙어서 느립니다.

뭘 배웠나#

처음에는 Query-Then-Fetch가 왜 2단계인지가 궁금했습니다. 검색 결과를 네트워크 효율적으로 가져오는 프로토콜 설명은 문서에도 있습니다.

소스를 보면서 알게 된 건 2단계 안에서도 더 줄일 게 있다는 점이었습니다. 앞 샤드의 응답으로 뒷 샤드의 요청을 동적으로 고쳐 쓰는 방식. 문서로는 잘 안 나오는 디테일이었습니다. 이런 건 소스를 직접 안 보면 모릅니다.

분석은 OpenSearch 3.7.0-SNAPSHOT (commit e1717ed, Lucene 10.4.0) 기준입니다.

Connected Notes