Commit 4b7fa72d authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

refactor(lead-stage): clean 2-agent pipeline - no hardcoded heuristics

- Classifier (1 LLM call): classify stage via STAGE_JSON + call tools
- Stylist (1 LLM call): format response with optional stage context
- Removed _TOOL_STAGE_MAP hardcoded heuristic (stage is optional)
- Added re import for STAGE_JSON regex parsing
- Cleaned up debug logging
parent 68319fd8
"""
Lead Stage Agent Controller — Entry point with Langfuse tracing.
Wraps LeadStageGraph (Classifier ⇄ Tools → Stylist) with:
- Langfuse trace + span context
- Conversation history loading
- User insight injection
- Background task for history persistence
Every call creates a named Langfuse trace "lead-stage-chat" so the full
Classifier → Tool → Stylist pipeline is visible in the Langfuse dashboard.
"""
import json
import logging
import time
import uuid
from fastapi import BackgroundTasks
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
from langfuse import Langfuse, get_client as get_langfuse
from common.conversation_manager import get_conversation_manager
from common.langfuse_client import async_flush_langfuse, get_callback_handler
from agent.controller_helpers import load_user_insight_from_redis
from agent.helper import handle_post_chat_async
from .graph import get_lead_stage_agent
logger = logging.getLogger(__name__)
async def lead_stage_chat_controller(
*,
query: str,
identity_key: str,
background_tasks: BackgroundTasks,
model_name: str | None = None,
conversation_id: str | None = None,
is_authenticated: bool = False,
device_id: str | None = None,
) -> dict:
"""
Controller cho Lead Stage Agent — full Langfuse tracing.
Flow:
1. Load user_insight từ Redis
2. Load chat history từ ConversationManager
3. Build Langfuse trace context
4. Run LeadStageGraph (Classifier ⇄ Tools → Stylist)
5. Save conversation background
6. Return response + pipeline + lead_stage metadata
Returns:
dict with: status, ai_response, products, lead_stage, pipeline, timing, trace_id
"""
start_time = time.time()
run_id = str(uuid.uuid4())
session_id = conversation_id or f"{identity_key}-lead-{run_id[:8]}"
logger.info(f"📥 [Lead Controller] identity={identity_key} | query={query[:80]}")
# ═══ 1. LOAD USER INSIGHT ═══
user_insight = None
if identity_key:
user_insight = await load_user_insight_from_redis(str(identity_key))
# ═══ 2. LOAD CHAT HISTORY ═══
memory = await get_conversation_manager()
history_dicts = await memory.get_chat_history(
str(identity_key),
limit=10,
include_product_ids=False,
conversation_id=conversation_id,
)
history_msgs = [
HumanMessage(content=m["message"]) if m["is_human"] else AIMessage(content=m["message"])
for m in history_dicts
][::-1]
# ═══ 3. BUILD LANGFUSE TRACE ═══
trace_id = Langfuse.create_trace_id()
tags = ["lead_stage", "experiment"]
if is_authenticated:
tags.append("user:authenticated")
else:
tags.append("user:anonymous")
langfuse = get_langfuse()
observation_ctx = None
if langfuse:
try:
observation_ctx = langfuse.start_as_current_observation(
as_type="span",
name="lead-stage-chat",
trace_context={"trace_id": trace_id},
)
except Exception as e:
logger.warning(f"⚠️ Langfuse span init failed: {e}")
# Enter observation context
span = None
if observation_ctx:
try:
span = observation_ctx.__enter__()
span.update_trace(
name="lead-stage-chat",
user_id=str(identity_key),
session_id=session_id,
tags=tags,
input={"query": query, "identity_key": identity_key},
metadata={
"device_id": device_id,
"customer_id": identity_key if is_authenticated else None,
"model": model_name,
"conversation_id": conversation_id,
"history_turns": len(history_msgs),
"has_user_insight": user_insight is not None,
},
)
except Exception as e:
logger.warning(f"⚠️ Langfuse trace update failed: {e}")
# Create CallbackHandler INSIDE observation context for proper nesting
langfuse_handler = get_callback_handler()
exec_config = RunnableConfig(
callbacks=[langfuse_handler] if langfuse_handler else [],
run_name="lead-stage-graph",
run_id=run_id,
metadata={
"langfuse_session_id": session_id,
"langfuse_user_id": str(identity_key),
"langfuse_tags": tags,
"trace_id": trace_id,
"conversation_id": session_id,
"customer_id": identity_key if is_authenticated else None,
"device_id": device_id,
},
)
# ═══ 4. RUN LEAD STAGE GRAPH ═══
try:
agent = get_lead_stage_agent(model_name)
chat_result = await agent.chat(
user_message=query,
user_insight=user_insight,
history=history_msgs,
config=exec_config,
)
ai_response = chat_result.get("response", "")
lead_stage = chat_result.get("lead_stage") or {}
pipeline = chat_result.get("pipeline", [])
products = chat_result.get("products", [])
total_elapsed = chat_result.get("elapsed_ms", 0)
# Timing breakdown from pipeline
classifier_ms = 0
stylist_ms = 0
tool_ms = 0
for p in pipeline:
step = p.get("step", "")
ms = p.get("elapsed_ms", 0)
if "classifier" in step:
classifier_ms += ms
elif step in ("stylist", "responder"):
stylist_ms += ms
elif step == "tool_result":
tool_ms += ms
# Update Langfuse trace output
if span:
try:
span.update_trace(
output={
"ai_response": (ai_response or "")[:500],
"stage": lead_stage.get("stage_name", "UNKNOWN"),
"stage_confidence": lead_stage.get("confidence"),
"product_count": len(products),
"elapsed_ms": round(total_elapsed),
"classifier_ms": classifier_ms,
"stylist_ms": stylist_ms,
},
)
except Exception:
pass
logger.info(
f"✅ [Lead Controller] Stage: {lead_stage.get('stage_name', 'N/A')} | "
f"Products: {len(products)} | "
f"Time: {total_elapsed:.0f}ms (C:{classifier_ms}ms S:{stylist_ms}ms)"
)
# ═══ 5. SAVE CONVERSATION (background) ═══
response_payload = {
"ai_response": ai_response,
"product_ids": products,
"lead_stage": lead_stage,
}
background_tasks.add_task(
handle_post_chat_async,
memory=memory,
identity_key=str(identity_key),
human_query=query,
ai_response=response_payload,
conversation_id=conversation_id,
)
# ═══ 6. RETURN ═══
return {
"status": "success",
"ai_response": ai_response,
"products": products,
"trace_id": trace_id,
"lead_stage": lead_stage,
"pipeline": pipeline,
"timing": {
"classifier_ms": classifier_ms,
"stylist_ms": stylist_ms,
"tool_ms": tool_ms,
"total_ms": round(total_elapsed),
},
}
except Exception as e:
elapsed = round((time.time() - start_time) * 1000)
logger.error(f"❌ [Lead Controller] Error after {elapsed}ms: {e}", exc_info=True)
# Log error to Langfuse
if span:
try:
span.update_trace(
output={"error": str(e)[:500], "elapsed_ms": elapsed},
level="ERROR",
)
except Exception:
pass
return {
"status": "error",
"error_code": "LEAD_STAGE_ERROR",
"message": f"Lead Stage Error: {str(e)[:200]}",
"trace_id": trace_id,
}
finally:
# Close Langfuse observation context
if observation_ctx:
try:
observation_ctx.__exit__(None, None, None)
except Exception:
pass
# Async flush Langfuse in background
background_tasks.add_task(async_flush_langfuse)
This diff is collapsed.
""" """
Prompts cho Lead Stage Agent. Prompts cho Lead Stage Agent — 2-Agent Architecture.
AI #1 — Lightweight classifier: CHỈ 2 PROMPTS:
Input: user_query + user_insight + history summary AI #1 — Classifier: Xác định lead stage + gọi tools (1 LLM call duy nhất)
Output: JSON { stage, stage_name, confidence, reasoning, tone_directive, behavioral_hints } AI #2 — Stylist: Nhận kết quả → format câu trả lời đẹp (1 LLM call)
""" """
# ═══════════════════════════════════════════════
# AI #1: Classifier — Classify stage + Call tools (1 call)
# ═══════════════════════════════════════════════
LEAD_STAGE_CLASSIFIER_PROMPT = """\ LEAD_STAGE_CLASSIFIER_PROMPT = """\
Bạn là Lead Stage Classifier cho chatbot bán hàng CANIFA. Bạn là AI Classifier & Planner cho chatbot bán hàng CANIFA — thương hiệu thời trang hàng đầu Việt Nam.
Nhiệm vụ DUY NHẤT: Phân tích tin nhắn khách hàng và xác định họ đang ở GIAI ĐOẠN MUA HÀNG nào.
## 5 GIAI ĐOẠN MUA HÀNG: ## 2 NHIỆM VỤ TRONG 1 LẦN TRẢ LỜI:
### NHIỆM VỤ 1: XÁC ĐỊNH LEAD STAGE
Phân tích tin nhắn và xác định khách đang ở giai đoạn nào:
| Stage | Tên | Trigger Signals | | Stage | Tên | Trigger Signals |
|-------|----------------|--------------------------------------------------------------------------| |-------|----------------|--------------------------------------------------------------------------|
...@@ -20,32 +25,56 @@ Nhiệm vụ DUY NHẤT: Phân tích tin nhắn khách hàng và xác định h ...@@ -20,32 +25,56 @@ Nhiệm vụ DUY NHẤT: Phân tích tin nhắn khách hàng và xác định h
| 4 | DECISION | "Lấy cái này", "chốt đơn", "mua luôn", "order", hỏi cách thanh toán | | 4 | DECISION | "Lấy cái này", "chốt đơn", "mua luôn", "order", hỏi cách thanh toán |
| 5 | RETENTION | Hỏi đổi trả, tracking, mua thêm, quay lại sau thời gian | | 5 | RETENTION | Hỏi đổi trả, tracking, mua thêm, quay lại sau thời gian |
## CÁCH PHÂN TÍCH: Quy tắc: Stage có thể nhảy/giảm. Không chắc → chọn thấp hơn.
1. Đọc tin nhắn HIỆN TẠI (quan trọng nhất)
2. Đọc USER_INSIGHT (nếu có) để hiểu context tích lũy ### NHIỆM VỤ 2: GỌI TOOLS HOẶC TRẢ LỜI
3. Đọc CHAT_HISTORY_SUMMARY (nếu có) để thấy hành trình - Cần tra cứu → gọi tool phù hợp
4. Quyết định stage dựa trên tổng hợp 3 nguồn trên - Câu hỏi đơn giản (chào hỏi, cảm ơn) → trả lời ngắn gọn luôn
## QUY TẮC: Tools:
- Stage CÓ THỂ GIẢM (VD: khách đã xem SP nhưng quay lại hỏi "còn gì khác?" → stage 2) - 🛍️ `tag_search_tool` — Tìm quần áo, váy, đồ đi làm, mặc nhà...
- Stage CÓ THỂ NHẢY (VD: khách vào thẳng "mua cái áo polo size L" → stage 4) - 📦 `check_is_stock` — Check size/màu còn hàng (cần SKU, color, size)
- Khi KHÔNG CHẮC → chọn stage THẤP hơn (ít aggressive hơn) - 🏬 `canifa_store_search` — Tìm cửa hàng (tỉnh/thành phố)
- confidence < 0.5 → đặt stage = stage hiện tại hoặc thấp hơn - ❓ `canifa_knowledge_search` — Chính sách (ship, đổi trả, VIP...)
- 🎁 `canifa_get_promotions` — Khuyến mãi, sale, event
## OUTPUT — JSON DUY NHẤT, KHÔNG có text thêm: - 📝 `collect_customer_info` — Đăng ký tư vấn, gửi SĐT/Email
{ - 🛒 `add_to_cart` — Chốt mua SP cụ thể (đã biết mã, màu, size)
"stage": <1-5>,
"stage_name": "<AWARENESS|INTEREST|CONSIDERATION|DECISION|RETENTION>", ## CÁCH OUTPUT:
"confidence": <0.0-1.0>, - Nếu CẦN gọi tool → gọi tool (hệ thống sẽ tự xử lý)
"reasoning": "<Giải thích NGẮN GỌN tại sao chọn stage này, 1-2 câu>", - Nếu KHÔNG cần tool → trả lời text ngắn gọn cho khách
"tone_directive": "<Welcomer|Consultant|Expert Stylist|Closer|Personal Shopper>", - Dù gọi tool hay trả lời text, HÃY bao gồm stage JSON ở CUỐI response:
"behavioral_hints": [
"<Gợi ý hành vi cụ thể cho AI Stylist, tối đa 3 hints>" STAGE_JSON: {"stage": <1-5>, "stage_name": "<tên>", "confidence": <0-1>, "reasoning": "<lý do>", "tone_directive": "<Friendly|Consultant|Expert|Closer|CareAgent>", "behavioral_hints": ["hint1"]}
] """
}
# ═══════════════════════════════════════════════
# AI #2: Stylist — Format response (NO tools)
# ═══════════════════════════════════════════════
STYLIST_SYSTEM_PROMPT = """\
Bạn là Trợ lý phong cách (AI Stylist) độc quyền của CANIFA — thương hiệu thời trang hàng đầu Việt Nam.
## VAI TRÒ DUY NHẤT CỦA BẠN:
Nhận kết quả từ AI Classifier (lead stage + tool results) và FORMAT thành câu trả lời tư vấn
đẹp, khéo léo, phù hợp với giai đoạn mua hàng của khách.
## BẠN KHÔNG ĐƯỢC:
- Gọi thêm tools
- Tự bịa sản phẩm
- Thay đổi lead stage
## QUY TẮC TƯ VẤN:
- Luôn khen ngợi hoặc đồng tình với gu thẩm mỹ của khách.
- Khi giới thiệu SP: Gợi ý cách phối đồ (mix & match), mô tả cảm giác khi mặc.
- Không trả lời cộc lốc danh sách SP. Nói như người tư vấn tại shop đang trò chuyện vui vẻ.
- Sản phẩm nào có link → bắt buộc đính kèm link để khách click xem.
{stage_injection}
""" """
# Template inject vào system prompt của AI #2 (Stylist) # ═══════════════════════════════════════════════
# Template inject stage context vào Stylist prompt
# ═══════════════════════════════════════════════
LEAD_STAGE_INJECTION_TEMPLATE = """\ LEAD_STAGE_INJECTION_TEMPLATE = """\
══════ LEAD STAGE CONTEXT ══════ ══════ LEAD STAGE CONTEXT ══════
🎯 Giai đoạn khách hàng: Stage {stage} — {stage_name} 🎯 Giai đoạn khách hàng: Stage {stage} — {stage_name}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment