7+1개 에이전트가 하나의 SQL을 만드는 과정 [2]
왜 에이전트를 나눴나#
1편에서 14개 기업 사례를 보면서도 처음엔 솔직히 좀 과하다고 생각했다. SQL 하나 만드는 데 에이전트를 그렇게 많이 붙여야 하나 싶었다. 프롬프트 하나에 "이 스키마 보고 SQL 만들어"라고 던지면 되는 거 아닌가 하는 쪽이었다.
근데 직접 붙여보니 왜 다들 그 구조로 가는지 바로 보였다.
테이블 10개 정도까지는 한 번에 넣어도 그럭저럭 된다. 그런데 50개가 넘어가면 프롬프트가 수만 토큰이 되면서 LLM이 엉뚱한 테이블을 참조하기 시작한다. 게다가 의도 파악, 테이블 선택, SQL 생성, 검증을 한 번에 시키면 틀렸을 때 어디서 깨졌는지도 안 보인다.
에이전트를 나누고 나니 두 가지가 풀렸다:
- 각 단계의 입력이 작아져서 정확도가 올라갔다
- 어디서 틀렸는지 바로 보여서 디버깅이 쉬워졌다
전체 파이프라인#
구조는 LangGraph 기반으로 잡았습니다. 각 에이전트가 노드이고, 공유 상태(GraphState)를 통해 데이터를 주고받습니다.
사용자 질문
↓
[Tenant Resolve] → API Key로 테넌트 식별
↓
[Intent Agent] → 의도 분류 + 복잡도 판단
↓ (SQL_QUERY가 아니면 여기서 빠짐)
[Cache Check] → 동일 질문 캐시 히트 시 바로 반환
↓ (Miss)
[Table Agent] → Vector Search → Rerank → 관련 테이블 선별
↓
[Column Prune Agent] → 불필요한 컬럼 제거 (COMPLEX만)
↓
[Context Assembly] → 샘플 쿼리 + 대화 히스토리 + 용어 사전 조합
↓
[Schema Linking] → 테이블 간 관계(FK) 파악
↓
[SQL Agent] → SQL 생성 (Chain-of-Thought)
↓
[Guardrail] → 위험 구문 필터링
↓
[Validation + Self-Correction] → 검증 + 자동 수정 (최대 3회)
↓
[Executor] → READ-ONLY 실행 (30초 타임아웃)
↓
[Result Agent] → 결과 요약 + 인사이트 + 후속 질문 제안
7개 코어 에이전트 + 1개 선택적 에이전트(Decomposer) 구조입니다. 처음에 블로그 초안을 쓸 때는 12개라고 적어놨는데, 다시 보니 cache_check나 tenant_resolve 같은 non-LLM 노드까지 같이 세고 있었습니다. 실제로 LLM을 호출하는 쪽은 7+1개입니다.
Understand: 의도를 먼저 파악#
Intent Agent#
모든 질문이 SQL로 바꿀 수 있는 건 아닙니다. Intent Agent가 세 가지로 분류합니다:
- SQL_QUERY: 데이터 조회 질문 → 파이프라인 진행
- CLARIFICATION_NEEDED: 모호한 질문 → 사용자에게 되물음
- GENERAL: 일반 대화 → SQL 파이프라인 태우지 않고 직접 응답
여기서 복잡도(SIMPLE / COMPLEX)도 같이 판단합니다. SIMPLE이면 Column Pruning을 건너뛰어서 토큰과 시간을 아낍니다.
Intent Agent에는 비용이 작은 모델(Haiku 4.5)을 씁니다. 의도 분류는 복잡한 추론이 필요 없기 때문에 작은 모델로도 충분하고, 매 요청마다 호출되는 만큼 비용을 줄여야 합니다. 나머지 에이전트는 Sonnet 4.5를 씁니다.
Decomposer Agent (선택적)#
복합 질문이 들어오면 하위 질문으로 분해합니다.
"부서별 주문 건수와 각 부서의 평균 매출을 비교해줘"
↓
Sub Q1: "부서별 주문 건수"
Sub Q2: "부서별 평균 매출"
↓
각각 Query Subgraph로 병렬 실행 → 결과 병합
항상 활성화되는 건 아니고, Intent Agent가 복합 질문으로 판단했을 때만 동작합니다. 단일 질문이면 건너뜁니다.
Retrieve: 수백 개 테이블에서 찾기#
Table Agent — Multi-stage Ranking#
테이블 선별이 전체 정확도의 핵심입니다. 잘못된 테이블을 선택하면 이후 단계에서 아무리 잘 해도 틀린 SQL이 나옵니다.
LinkedIn의 Multi-stage Ranking 패턴을 참고해서 2단계 파이프라인을 구성했습니다.
1단계: Vector Search (Qdrant)
질문을 OpenAI text-embedding-3-small(1536차원)로 임베딩하고, Qdrant에서 상위 20개 테이블 후보를 가져옵니다. 이 단계는 재현율(Recall)이 높고, 정밀도(Precision)는 낮습니다. 관련 없는 테이블도 섞여 들어옵니다.
테넌트별로 Qdrant 컬렉션을 분리해둬서 다른 테넌트의 스키마가 섞이지 않습니다. 테넌트당 3개 컬렉션: tables_{id}, columns_{id}, sample_queries_{id}.
2단계: Rerank (Cohere)
Cohere rerank-v3.5로 20개 후보를 재순위화해서 상위 5개를 선택합니다. Cross-encoder 기반이라 질문과 테이블 설명 사이의 관련성을 1단계보다 정밀하게 평가합니다.
blog-series-plan 초안에 "3단계(Vector → Rerank → LLM Ranking)"로 적혀있었는데, 코드를 확인해보니 LLM Ranking 단계는 구현하지 않았습니다. 2단계만으로도 충분한 정밀도가 나왔기 때문입니다.
Column Prune Agent#
선택된 테이블에서 질문과 무관한 컬럼을 제거합니다. 테이블 하나에 컬럼이 50개씩 있으면 SQL Agent에 전달되는 컨텍스트가 너무 길어집니다. 실제로 필요한 건 5-10개뿐인데 50개를 다 넣으면 토큰 낭비이고 LLM이 혼란스러워합니다.
SIMPLE 쿼리는 이 단계를 건너뜁니다. 단일 테이블이면 컬럼이 많아도 LLM이 잘 처리하기 때문입니다.
Context Assembly#
SQL 생성에 필요한 컨텍스트를 모아서 조립합니다:
- 선택된 테이블의 DDL: 컬럼 타입, 제약조건 포함
- 샘플 쿼리: 비슷한 질문에 대한 과거 성공 SQL (RAG에서 검색)
- 대화 히스토리: 이전 질문에서 참조한 테이블 정보
- 용어 사전: 비즈니스 용어 매핑 (예: "활성 사용자" = user_activity 테이블, 최근 30일)
용어 사전은 3계층 구조입니다: User 레벨 > Tenant 레벨 > Base 레벨. 사용자가 직접 정의한 용어가 최우선, 그 다음 테넌트 공통 용어, 마지막으로 시스템 기본 용어입니다.
Schema Linking#
선택된 테이블 간의 관계(FK)를 파악합니다. JOIN 조건을 정확하게 생성하려면 어떤 테이블이 어떤 키로 연결되는지 알아야 합니다. FK 1-hop 이웃까지 확장해서 직접 선택되지 않았지만 JOIN에 필요한 브릿지 테이블도 포함시킵니다.
Generate: SQL 생성 + 검증#
SQL Agent#
Chain-of-Thought 방식으로 SQL을 생성합니다. 단순히 SQL만 출력하는 게 아니라, 추론 과정(thinking), 사용한 테이블, 신뢰도(confidence), 필수 조건 누락 시 명확화 질문까지 함께 반환합니다.
class SQLGenerationResult(BaseModel):
thinking: str # 추론 과정
sql: str # 생성된 SQL
tables_used: list[str] # 사용한 테이블
confidence: float # 0.0 ~ 1.0
clarification_question: str | None # 필수 조건 누락 시confidence가 낮으면 SQL을 실행하지 않고 사용자에게 되묻습니다. 틀린 결과를 주는 것보다 미리 확인하는 게 낫습니다.
DB 방언(dialect)도 고려합니다. MySQL과 PostgreSQL은 함수명이나 문법이 다른 부분이 있어서, 테넌트의 DB 종류에 맞는 SQL을 생성하도록 방언별 프롬프트를 분리해뒀습니다.
Guardrail#
LLM이 생성한 SQL을 바로 실행하면 위험합니다. 4개 레이어로 안전장치를 뒀습니다:
- SQL Filter: DROP, DELETE, UPDATE 같은 위험 구문 차단. 기본적으로 SELECT만 허용합니다
- PII Masker: 결과에 개인정보(이름, 이메일 등)가 포함되면 마스킹
- Audit Log: 모든 질문, 생성된 SQL, 실행 결과를 기록
- Credential Manager: DB 접속 정보를 안전하게 관리
Validation + Self-Correction#
생성된 SQL을 3단계로 검증합니다:
- Syntax Check: sqlparse로 구문 파싱 가능한지 확인
- Schema Check: 참조하는 테이블과 컬럼이 실제로 존재하는지 확인 (Hallucination 방지)
- EXPLAIN: DB에서 실행 계획을 뽑아서 런타임 오류가 없는지 확인
검증에 실패하면 Self-Correction 서브그래프가 동작합니다. 오류 메시지와 원래 질문을 SQL Agent에 다시 넘겨서 수정된 SQL을 생성하고, 다시 검증합니다. 최대 3회까지 시도하고, 그래도 안 되면 UNFIXABLE로 판정합니다.
왜 3회인지 — 경험적으로 3회 안에 수정 안 되면 4, 5번째도 안 될 확률이 높습니다. 무한 루프를 방지하면서도 충분한 재시도 기회를 주는 타협점입니다.
Self-Correction을 LangGraph의 Subgraph로 분리한 이유는, 각 수정 시도를 Langfuse에서 별도 span으로 추적하기 위해서입니다. "3번째 시도에서 어떤 오류를 어떻게 수정했는지"를 나중에 확인할 수 있습니다.
Deliver: 실행 + 결과 전달#
Executor#
검증 통과한 SQL을 실행합니다. READ-ONLY 트랜잭션에서 30초 타임아웃으로 실행해서, 잘못된 쿼리가 DB에 부하를 주는 것을 방지합니다. 최대 결과 행 수도 1000개로 제한합니다.
Result Agent#
실행 결과를 사용자에게 보기 좋게 정리합니다. 단순히 테이블 데이터를 보여주는 게 아니라, 결과에 대한 인사이트와 후속 질문을 함께 제안합니다.
질문: "부서별 주문 건수?"
결과: 개발팀 45명, 영업팀 32명, 경영지원팀 18명
인사이트: "개발팀이 전체 대상자의 38%를 차지합니다"
후속 질문 제안: "개발팀의 월별 매출 추이를 볼까요?"
복잡도별 분기#
모든 질문에 풀 파이프라인을 태우면 낭비입니다. Intent Agent가 판단한 복잡도에 따라 경로가 달라집니다:
| 복잡도 | 경로 | 건너뛰는 단계 |
|---|---|---|
| SIMPLE | Intent → Cache → Table → Context → SQL → Validation → Execute | Column Prune, Decompose |
| COMPLEX | 풀 파이프라인 | 없음 |
SIMPLE 쿼리는 단일 테이블, 단순 필터/집계입니다. Column Pruning을 건너뛰면 LLM 호출이 1회 줄고, 응답 시간도 줄어듭니다.
기술 스택 정리#
| 카테고리 | 기술 | 역할 |
|---|---|---|
| LLM | Claude Sonnet 4.5 | SQL 생성, 결과 요약 |
| LLM (경량) | Haiku 4.5 | Intent 분류 |
| Embedding | OpenAI text-embedding-3-small | 스키마 벡터화 (1536차원) |
| Reranking | Cohere rerank-v3.5 | 테이블 선택 정밀도 향상 |
| 오케스트레이션 | LangGraph + LangChain | Multi-Agent 상태머신 |
| API | FastAPI + SSE | REST + 실시간 스트리밍 |
| Vector DB | Qdrant | 스키마 RAG 검색 |
| Cache | Redis | 쿼리 결과 캐싱 |
| Storage | PostgreSQL | 대화 히스토리, 테넌트 관리 |
| Observability | Langfuse | 트레이싱, 프롬프트 버전관리, 비용 추적 |
| 보안 | SQL Filter, PII Masker, Read-Only TX | 안전 실행 |
숫자로 보는 시스템#
| 항목 | 값 |
|---|---|
| 코어 에이전트 | 7개 + 1개 선택적 (Decomposer) |
| GraphState 필드 | 44개 |
| 그래프 노드 | 14개 (11 기본 + 3 선택적) |
| 라우팅 함수 | 7개 (복잡도/의도/캐시/검증 분기) |
| 테스트 케이스 | 315개 |
| 외부 서비스 | 6개 (Claude, OpenAI, Cohere, Qdrant, Redis, PostgreSQL) |
| 최대 수정 시도 | 3회 |
| 쿼리 타임아웃 | 30초 |
| 캐시 TTL | 3600초 |
파이프라인을 어떻게 테스트했나#
에이전트가 7+1개이고, GraphState 필드가 44개이고, 외부 서비스가 6개입니다. 이걸 어떻게 테스트하냐는 질문을 자주 받습니다.
통합 테스트 — 10개 의존성을 한꺼번에 mock#
파이프라인 전체가 올바르게 동작하는지 확인하는 통합 테스트가 있습니다. LLM, RAG, Executor, Cache, Feedback, Tracer 등 10개 이상의 외부 의존성을 mock으로 교체하고, 파이프라인에 질문을 넣어서 PipelineResult가 정상적으로 나오는지 검증합니다.
async def test_run_returns_pipeline_result_on_success(self):
graph_state = make_graph_state() # 44개 필드 기본값
pipeline = MultiAgentPipeline()
mock_graph = MagicMock()
mock_graph.run = AsyncMock(return_value=graph_state)
result = await pipeline.run(
question="주문 수를 알려줘",
tenant_id="test_tenant",
)
assert isinstance(result, PipelineResult)
assert result.success is True
assert result.sql == "SELECT COUNT(*) FROM orders\nLIMIT 1000"make_graph_state() 헬퍼가 44개 필드에 기본값을 채워주고, 테스트마다 필요한 필드만 override합니다. 이 패턴 덕분에 테스트 코드가 짧으면서도 모든 필드를 커버합니다.
의도별 경로 분기 검증#
Intent가 SQL_QUERY / GENERAL / CLARIFICATION_NEEDED 세 가지라서, 각 의도별로 PipelineResult의 success 조건이 다릅니다:
# SQL_QUERY: validated_sql이 있으면 성공
state = make_state(intent="SQL_QUERY", validated_sql="SELECT 1 LIMIT 1000")
assert result.success is True
# CLARIFICATION_NEEDED: SQL 없어도 성공 (명확화 질문이 있으니까)
state = make_state(intent="CLARIFICATION_NEEDED", validated_sql=None,
clarification_question="어떤 데이터를 원하시나요?")
assert result.success is True
# GENERAL: SQL 없어도 성공 (일반 대화)
state = make_state(intent="GENERAL", validated_sql=None)
assert result.success is True의도별로 "성공"의 정의가 다르다는 걸 테스트로 명시합니다. 이게 없으면 CLARIFICATION_NEEDED에서 SQL이 없다고 실패로 처리하는 버그가 생기기 쉽습니다.
직렬화 — DB가 반환하는 타입들#
SQL 실행 결과를 JSON으로 직렬화할 때, DB가 반환하는 타입이 문제가 됩니다. datetime, Decimal, bytes 같은 타입은 json.dumps()가 처리 못 합니다.
def test_datetime_serialized_to_isoformat(self):
dt = datetime.datetime(2024, 1, 15, 10, 30, 0)
rows = [{"created_at": dt}]
result = _make_serializable(rows)
assert result[0]["created_at"] == "2024-01-15T10:30:00"
def test_decimal_serialized_to_float(self):
rows = [{"amount": Decimal("99.99")}]
result = _make_serializable(rows)
assert result[0]["amount"] == pytest.approx(99.99)
def test_bytes_serialized_to_string(self):
rows = [{"data": b"hello"}]
result = _make_serializable(rows)
assert result[0]["data"] == "hello"프로덕션에서 MySQL의 DECIMAL 컬럼이 Python Decimal로 오는데, 프론트엔드에서 처리 못 해서 빈 화면이 뜨는 버그를 실제로 겪었습니다. 이후에 이 테스트를 추가했습니다.
SSE 스트리밍 이벤트 순서#
실시간 스트리밍은 이벤트 순서가 중요합니다. start가 먼저, 중간에 단계별 이벤트, 마지막에 complete.
async def test_stream_yields_sse_events(self):
events = [e async for e in pipeline.run_stream(question="주문 수는?", ...)]
assert events[0]["event"] == "start"
assert events[-1]["event"] == "complete"
event_names = [e["event"] for e in events]
assert "intent" in event_names or "sql_generation" in event_namescomplete 이벤트에는 전체 timing breakdown도 포함됩니다. 각 에이전트별 소요 시간을 Tracer에서 수집해서 클라이언트에 전달합니다.
async def test_stream_complete_event_includes_timing(self):
complete_event = [e for e in events if e["event"] == "complete"][0]
assert "timing" in complete_event["data"]
assert complete_event["data"]["timing"]["total_ms"] == 120초기화 실패 시 안전한 에러 반환#
DB 연결이 끊기거나 API 키가 없는 상황에서 파이프라인이 크래시하면 안 됩니다. 예외가 발생해도 PipelineResult(success=False)를 반환하고, 스트리밍 모드에서는 error 이벤트를 yield해야 합니다.
async def test_run_returns_failure_when_initialization_raises(self):
with patch.object(pipeline, "_ensure_initialized",
side_effect=RuntimeError("TARGET_DB_URL이 설정되지 않았습니다")):
result = await pipeline.run(question="주문 수는?", tenant_id="t1")
assert result.success is False
assert "초기화" in result.error
async def test_stream_yields_error_event_on_init_failure(self):
with patch.object(pipeline, "_ensure_initialized",
side_effect=RuntimeError("DB 연결 실패")):
events = [e async for e in pipeline.run_stream(question="테스트", tenant_id="t1")]
assert len(events) == 1
assert events[0]["event"] == "error"이 테스트들이 있어야 "에러가 나도 시스템이 그냥 죽지 않고, 사용자한테는 의미 있는 응답을 준다"는 걸 보장할 수 있습니다.
돌아보면 이 글에서 말한 구조의 핵심은 멋있는 멀티 에이전트 구성이 아닙니다. 질문을 작게 나누고, 각 단계를 따로 검증 가능하게 만들고, 실패했을 때 어디서 잘못됐는지 바로 보이게 만드는 쪽이 더 중요했습니다.
다음 글에서#
이 파이프라인에서 가장 까다로웠던 부분 — 수백 개 테이블에서 정확한 테이블을 찾아내는 RAG + Reranking 검색 아키텍처를 3편에서 자세히 다루겠습니다.