dev notes

7+1개 에이전트가 하나의 SQL을 만드는 과정 [2]

2026-01-1521 min read
공유

왜 에이전트를 나눴나#

1편에서 14개 기업 사례를 보면서도 처음엔 솔직히 좀 과하다고 생각했다. SQL 하나 만드는 데 에이전트를 그렇게 많이 붙여야 하나 싶었다. 프롬프트 하나에 "이 스키마 보고 SQL 만들어"라고 던지면 되는 거 아닌가 하는 쪽이었다.

근데 직접 붙여보니 왜 다들 그 구조로 가는지 바로 보였다.

테이블 10개 정도까지는 한 번에 넣어도 그럭저럭 된다. 그런데 50개가 넘어가면 프롬프트가 수만 토큰이 되면서 LLM이 엉뚱한 테이블을 참조하기 시작한다. 게다가 의도 파악, 테이블 선택, SQL 생성, 검증을 한 번에 시키면 틀렸을 때 어디서 깨졌는지도 안 보인다.

에이전트를 나누고 나니 두 가지가 풀렸다:

  1. 각 단계의 입력이 작아져서 정확도가 올라갔다
  2. 어디서 틀렸는지 바로 보여서 디버깅이 쉬워졌다

전체 파이프라인#

구조는 LangGraph 기반으로 잡았습니다. 각 에이전트가 노드이고, 공유 상태(GraphState)를 통해 데이터를 주고받습니다.

사용자 질문
    ↓
[Tenant Resolve] → API Key로 테넌트 식별
    ↓
[Intent Agent] → 의도 분류 + 복잡도 판단
    ↓ (SQL_QUERY가 아니면 여기서 빠짐)
[Cache Check] → 동일 질문 캐시 히트 시 바로 반환
    ↓ (Miss)
[Table Agent] → Vector Search → Rerank → 관련 테이블 선별
    ↓
[Column Prune Agent] → 불필요한 컬럼 제거 (COMPLEX만)
    ↓
[Context Assembly] → 샘플 쿼리 + 대화 히스토리 + 용어 사전 조합
    ↓
[Schema Linking] → 테이블 간 관계(FK) 파악
    ↓
[SQL Agent] → SQL 생성 (Chain-of-Thought)
    ↓
[Guardrail] → 위험 구문 필터링
    ↓
[Validation + Self-Correction] → 검증 + 자동 수정 (최대 3회)
    ↓
[Executor] → READ-ONLY 실행 (30초 타임아웃)
    ↓
[Result Agent] → 결과 요약 + 인사이트 + 후속 질문 제안

7개 코어 에이전트 + 1개 선택적 에이전트(Decomposer) 구조입니다. 처음에 블로그 초안을 쓸 때는 12개라고 적어놨는데, 다시 보니 cache_check나 tenant_resolve 같은 non-LLM 노드까지 같이 세고 있었습니다. 실제로 LLM을 호출하는 쪽은 7+1개입니다.

Understand: 의도를 먼저 파악#

Intent Agent#

모든 질문이 SQL로 바꿀 수 있는 건 아닙니다. Intent Agent가 세 가지로 분류합니다:

  • SQL_QUERY: 데이터 조회 질문 → 파이프라인 진행
  • CLARIFICATION_NEEDED: 모호한 질문 → 사용자에게 되물음
  • GENERAL: 일반 대화 → SQL 파이프라인 태우지 않고 직접 응답

여기서 복잡도(SIMPLE / COMPLEX)도 같이 판단합니다. SIMPLE이면 Column Pruning을 건너뛰어서 토큰과 시간을 아낍니다.

Intent Agent에는 비용이 작은 모델(Haiku 4.5)을 씁니다. 의도 분류는 복잡한 추론이 필요 없기 때문에 작은 모델로도 충분하고, 매 요청마다 호출되는 만큼 비용을 줄여야 합니다. 나머지 에이전트는 Sonnet 4.5를 씁니다.

Decomposer Agent (선택적)#

복합 질문이 들어오면 하위 질문으로 분해합니다.

"부서별 주문 건수와 각 부서의 평균 매출을 비교해줘"
    ↓
Sub Q1: "부서별 주문 건수"
Sub Q2: "부서별 평균 매출"
    ↓
각각 Query Subgraph로 병렬 실행 → 결과 병합

항상 활성화되는 건 아니고, Intent Agent가 복합 질문으로 판단했을 때만 동작합니다. 단일 질문이면 건너뜁니다.

Retrieve: 수백 개 테이블에서 찾기#

Table Agent — Multi-stage Ranking#

테이블 선별이 전체 정확도의 핵심입니다. 잘못된 테이블을 선택하면 이후 단계에서 아무리 잘 해도 틀린 SQL이 나옵니다.

LinkedIn의 Multi-stage Ranking 패턴을 참고해서 2단계 파이프라인을 구성했습니다.

1단계: Vector Search (Qdrant)

질문을 OpenAI text-embedding-3-small(1536차원)로 임베딩하고, Qdrant에서 상위 20개 테이블 후보를 가져옵니다. 이 단계는 재현율(Recall)이 높고, 정밀도(Precision)는 낮습니다. 관련 없는 테이블도 섞여 들어옵니다.

테넌트별로 Qdrant 컬렉션을 분리해둬서 다른 테넌트의 스키마가 섞이지 않습니다. 테넌트당 3개 컬렉션: tables_{id}, columns_{id}, sample_queries_{id}.

2단계: Rerank (Cohere)

Cohere rerank-v3.5로 20개 후보를 재순위화해서 상위 5개를 선택합니다. Cross-encoder 기반이라 질문과 테이블 설명 사이의 관련성을 1단계보다 정밀하게 평가합니다.

blog-series-plan 초안에 "3단계(Vector → Rerank → LLM Ranking)"로 적혀있었는데, 코드를 확인해보니 LLM Ranking 단계는 구현하지 않았습니다. 2단계만으로도 충분한 정밀도가 나왔기 때문입니다.

Column Prune Agent#

선택된 테이블에서 질문과 무관한 컬럼을 제거합니다. 테이블 하나에 컬럼이 50개씩 있으면 SQL Agent에 전달되는 컨텍스트가 너무 길어집니다. 실제로 필요한 건 5-10개뿐인데 50개를 다 넣으면 토큰 낭비이고 LLM이 혼란스러워합니다.

SIMPLE 쿼리는 이 단계를 건너뜁니다. 단일 테이블이면 컬럼이 많아도 LLM이 잘 처리하기 때문입니다.

Context Assembly#

SQL 생성에 필요한 컨텍스트를 모아서 조립합니다:

  • 선택된 테이블의 DDL: 컬럼 타입, 제약조건 포함
  • 샘플 쿼리: 비슷한 질문에 대한 과거 성공 SQL (RAG에서 검색)
  • 대화 히스토리: 이전 질문에서 참조한 테이블 정보
  • 용어 사전: 비즈니스 용어 매핑 (예: "활성 사용자" = user_activity 테이블, 최근 30일)

용어 사전은 3계층 구조입니다: User 레벨 > Tenant 레벨 > Base 레벨. 사용자가 직접 정의한 용어가 최우선, 그 다음 테넌트 공통 용어, 마지막으로 시스템 기본 용어입니다.

Schema Linking#

선택된 테이블 간의 관계(FK)를 파악합니다. JOIN 조건을 정확하게 생성하려면 어떤 테이블이 어떤 키로 연결되는지 알아야 합니다. FK 1-hop 이웃까지 확장해서 직접 선택되지 않았지만 JOIN에 필요한 브릿지 테이블도 포함시킵니다.

Generate: SQL 생성 + 검증#

SQL Agent#

Chain-of-Thought 방식으로 SQL을 생성합니다. 단순히 SQL만 출력하는 게 아니라, 추론 과정(thinking), 사용한 테이블, 신뢰도(confidence), 필수 조건 누락 시 명확화 질문까지 함께 반환합니다.

python
class SQLGenerationResult(BaseModel):
    thinking: str          # 추론 과정
    sql: str               # 생성된 SQL
    tables_used: list[str] # 사용한 테이블
    confidence: float      # 0.0 ~ 1.0
    clarification_question: str | None  # 필수 조건 누락 시

confidence가 낮으면 SQL을 실행하지 않고 사용자에게 되묻습니다. 틀린 결과를 주는 것보다 미리 확인하는 게 낫습니다.

DB 방언(dialect)도 고려합니다. MySQL과 PostgreSQL은 함수명이나 문법이 다른 부분이 있어서, 테넌트의 DB 종류에 맞는 SQL을 생성하도록 방언별 프롬프트를 분리해뒀습니다.

Guardrail#

LLM이 생성한 SQL을 바로 실행하면 위험합니다. 4개 레이어로 안전장치를 뒀습니다:

  • SQL Filter: DROP, DELETE, UPDATE 같은 위험 구문 차단. 기본적으로 SELECT만 허용합니다
  • PII Masker: 결과에 개인정보(이름, 이메일 등)가 포함되면 마스킹
  • Audit Log: 모든 질문, 생성된 SQL, 실행 결과를 기록
  • Credential Manager: DB 접속 정보를 안전하게 관리

Validation + Self-Correction#

생성된 SQL을 3단계로 검증합니다:

  1. Syntax Check: sqlparse로 구문 파싱 가능한지 확인
  2. Schema Check: 참조하는 테이블과 컬럼이 실제로 존재하는지 확인 (Hallucination 방지)
  3. EXPLAIN: DB에서 실행 계획을 뽑아서 런타임 오류가 없는지 확인

검증에 실패하면 Self-Correction 서브그래프가 동작합니다. 오류 메시지와 원래 질문을 SQL Agent에 다시 넘겨서 수정된 SQL을 생성하고, 다시 검증합니다. 최대 3회까지 시도하고, 그래도 안 되면 UNFIXABLE로 판정합니다.

왜 3회인지 — 경험적으로 3회 안에 수정 안 되면 4, 5번째도 안 될 확률이 높습니다. 무한 루프를 방지하면서도 충분한 재시도 기회를 주는 타협점입니다.

Self-Correction을 LangGraph의 Subgraph로 분리한 이유는, 각 수정 시도를 Langfuse에서 별도 span으로 추적하기 위해서입니다. "3번째 시도에서 어떤 오류를 어떻게 수정했는지"를 나중에 확인할 수 있습니다.

Deliver: 실행 + 결과 전달#

Executor#

검증 통과한 SQL을 실행합니다. READ-ONLY 트랜잭션에서 30초 타임아웃으로 실행해서, 잘못된 쿼리가 DB에 부하를 주는 것을 방지합니다. 최대 결과 행 수도 1000개로 제한합니다.

Result Agent#

실행 결과를 사용자에게 보기 좋게 정리합니다. 단순히 테이블 데이터를 보여주는 게 아니라, 결과에 대한 인사이트와 후속 질문을 함께 제안합니다.

질문: "부서별 주문 건수?"
결과: 개발팀 45명, 영업팀 32명, 경영지원팀 18명
인사이트: "개발팀이 전체 대상자의 38%를 차지합니다"
후속 질문 제안: "개발팀의 월별 매출 추이를 볼까요?"

복잡도별 분기#

모든 질문에 풀 파이프라인을 태우면 낭비입니다. Intent Agent가 판단한 복잡도에 따라 경로가 달라집니다:

복잡도경로건너뛰는 단계
SIMPLEIntent → Cache → Table → Context → SQL → Validation → ExecuteColumn Prune, Decompose
COMPLEX풀 파이프라인없음

SIMPLE 쿼리는 단일 테이블, 단순 필터/집계입니다. Column Pruning을 건너뛰면 LLM 호출이 1회 줄고, 응답 시간도 줄어듭니다.

기술 스택 정리#

카테고리기술역할
LLMClaude Sonnet 4.5SQL 생성, 결과 요약
LLM (경량)Haiku 4.5Intent 분류
EmbeddingOpenAI text-embedding-3-small스키마 벡터화 (1536차원)
RerankingCohere rerank-v3.5테이블 선택 정밀도 향상
오케스트레이션LangGraph + LangChainMulti-Agent 상태머신
APIFastAPI + SSEREST + 실시간 스트리밍
Vector DBQdrant스키마 RAG 검색
CacheRedis쿼리 결과 캐싱
StoragePostgreSQL대화 히스토리, 테넌트 관리
ObservabilityLangfuse트레이싱, 프롬프트 버전관리, 비용 추적
보안SQL Filter, PII Masker, Read-Only TX안전 실행

숫자로 보는 시스템#

항목
코어 에이전트7개 + 1개 선택적 (Decomposer)
GraphState 필드44개
그래프 노드14개 (11 기본 + 3 선택적)
라우팅 함수7개 (복잡도/의도/캐시/검증 분기)
테스트 케이스315개
외부 서비스6개 (Claude, OpenAI, Cohere, Qdrant, Redis, PostgreSQL)
최대 수정 시도3회
쿼리 타임아웃30초
캐시 TTL3600초

파이프라인을 어떻게 테스트했나#

에이전트가 7+1개이고, GraphState 필드가 44개이고, 외부 서비스가 6개입니다. 이걸 어떻게 테스트하냐는 질문을 자주 받습니다.

통합 테스트 — 10개 의존성을 한꺼번에 mock#

파이프라인 전체가 올바르게 동작하는지 확인하는 통합 테스트가 있습니다. LLM, RAG, Executor, Cache, Feedback, Tracer 등 10개 이상의 외부 의존성을 mock으로 교체하고, 파이프라인에 질문을 넣어서 PipelineResult가 정상적으로 나오는지 검증합니다.

python
async def test_run_returns_pipeline_result_on_success(self):
    graph_state = make_graph_state()  # 44개 필드 기본값
 
    pipeline = MultiAgentPipeline()
    mock_graph = MagicMock()
    mock_graph.run = AsyncMock(return_value=graph_state)
 
    result = await pipeline.run(
        question="주문 수를 알려줘",
        tenant_id="test_tenant",
    )
 
    assert isinstance(result, PipelineResult)
    assert result.success is True
    assert result.sql == "SELECT COUNT(*) FROM orders\nLIMIT 1000"

make_graph_state() 헬퍼가 44개 필드에 기본값을 채워주고, 테스트마다 필요한 필드만 override합니다. 이 패턴 덕분에 테스트 코드가 짧으면서도 모든 필드를 커버합니다.

의도별 경로 분기 검증#

Intent가 SQL_QUERY / GENERAL / CLARIFICATION_NEEDED 세 가지라서, 각 의도별로 PipelineResult의 success 조건이 다릅니다:

python
# SQL_QUERY: validated_sql이 있으면 성공
state = make_state(intent="SQL_QUERY", validated_sql="SELECT 1 LIMIT 1000")
assert result.success is True
 
# CLARIFICATION_NEEDED: SQL 없어도 성공 (명확화 질문이 있으니까)
state = make_state(intent="CLARIFICATION_NEEDED", validated_sql=None,
                   clarification_question="어떤 데이터를 원하시나요?")
assert result.success is True
 
# GENERAL: SQL 없어도 성공 (일반 대화)
state = make_state(intent="GENERAL", validated_sql=None)
assert result.success is True

의도별로 "성공"의 정의가 다르다는 걸 테스트로 명시합니다. 이게 없으면 CLARIFICATION_NEEDED에서 SQL이 없다고 실패로 처리하는 버그가 생기기 쉽습니다.

직렬화 — DB가 반환하는 타입들#

SQL 실행 결과를 JSON으로 직렬화할 때, DB가 반환하는 타입이 문제가 됩니다. datetime, Decimal, bytes 같은 타입은 json.dumps()가 처리 못 합니다.

python
def test_datetime_serialized_to_isoformat(self):
    dt = datetime.datetime(2024, 1, 15, 10, 30, 0)
    rows = [{"created_at": dt}]
    result = _make_serializable(rows)
    assert result[0]["created_at"] == "2024-01-15T10:30:00"
 
def test_decimal_serialized_to_float(self):
    rows = [{"amount": Decimal("99.99")}]
    result = _make_serializable(rows)
    assert result[0]["amount"] == pytest.approx(99.99)
 
def test_bytes_serialized_to_string(self):
    rows = [{"data": b"hello"}]
    result = _make_serializable(rows)
    assert result[0]["data"] == "hello"

프로덕션에서 MySQL의 DECIMAL 컬럼이 Python Decimal로 오는데, 프론트엔드에서 처리 못 해서 빈 화면이 뜨는 버그를 실제로 겪었습니다. 이후에 이 테스트를 추가했습니다.

SSE 스트리밍 이벤트 순서#

실시간 스트리밍은 이벤트 순서가 중요합니다. start가 먼저, 중간에 단계별 이벤트, 마지막에 complete.

python
async def test_stream_yields_sse_events(self):
    events = [e async for e in pipeline.run_stream(question="주문 수는?", ...)]
 
    assert events[0]["event"] == "start"
    assert events[-1]["event"] == "complete"
    event_names = [e["event"] for e in events]
    assert "intent" in event_names or "sql_generation" in event_names

complete 이벤트에는 전체 timing breakdown도 포함됩니다. 각 에이전트별 소요 시간을 Tracer에서 수집해서 클라이언트에 전달합니다.

python
async def test_stream_complete_event_includes_timing(self):
    complete_event = [e for e in events if e["event"] == "complete"][0]
    assert "timing" in complete_event["data"]
    assert complete_event["data"]["timing"]["total_ms"] == 120

초기화 실패 시 안전한 에러 반환#

DB 연결이 끊기거나 API 키가 없는 상황에서 파이프라인이 크래시하면 안 됩니다. 예외가 발생해도 PipelineResult(success=False)를 반환하고, 스트리밍 모드에서는 error 이벤트를 yield해야 합니다.

python
async def test_run_returns_failure_when_initialization_raises(self):
    with patch.object(pipeline, "_ensure_initialized",
                      side_effect=RuntimeError("TARGET_DB_URL이 설정되지 않았습니다")):
        result = await pipeline.run(question="주문 수는?", tenant_id="t1")
 
    assert result.success is False
    assert "초기화" in result.error
 
async def test_stream_yields_error_event_on_init_failure(self):
    with patch.object(pipeline, "_ensure_initialized",
                      side_effect=RuntimeError("DB 연결 실패")):
        events = [e async for e in pipeline.run_stream(question="테스트", tenant_id="t1")]
 
    assert len(events) == 1
    assert events[0]["event"] == "error"

이 테스트들이 있어야 "에러가 나도 시스템이 그냥 죽지 않고, 사용자한테는 의미 있는 응답을 준다"는 걸 보장할 수 있습니다.

돌아보면 이 글에서 말한 구조의 핵심은 멋있는 멀티 에이전트 구성이 아닙니다. 질문을 작게 나누고, 각 단계를 따로 검증 가능하게 만들고, 실패했을 때 어디서 잘못됐는지 바로 보이게 만드는 쪽이 더 중요했습니다.

다음 글에서#

이 파이프라인에서 가장 까다로웠던 부분 — 수백 개 테이블에서 정확한 테이블을 찾아내는 RAG + Reranking 검색 아키텍처를 3편에서 자세히 다루겠습니다.

Connected Notes