Commit 25332352 authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

refactor(server): remove deadlock note api and setup feedback learning agent architecture

parent 5ca83a91
"""
Fashion Q&A Agent Package
"""
from .graph import build_graph
from .models import AgentConfig, AgentState, get_config
__all__ = [
"AgentConfig",
"AgentState",
"build_graph",
"get_config",
]
"""
Fashion Q&A Agent Controller
Langfuse will auto-trace via LangChain integration (no code changes needed).
"""
import json
import logging
import time
import uuid
from fastapi import BackgroundTasks
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
from common.cache import redis_cache
from common.conversation_manager import get_conversation_manager
from common.langfuse_client import get_callback_handler
from config import DEFAULT_MODEL, REDIS_CACHE_TURN_ON
from langfuse import propagate_attributes
from .graph import build_graph
from .helper import extract_product_ids, handle_post_chat_async, parse_ai_response
from .models import AgentState
logger = logging.getLogger(__name__)
async def chat_controller(
    query: str,
    user_id: str,
    background_tasks: BackgroundTasks,
    model_name: str = DEFAULT_MODEL,
    images: list[str] | None = None,
    identity_key: str | None = None,
) -> dict:
    """
    Controller main logic for non-streaming chat requests.

    Flow:
    1. Check cache (if enabled) → HIT: return cached response
    2. MISS: Call LLM → Save to cache → Return response

    Args:
        query: Current user message.
        user_id: Identifier of the requesting user.
        background_tasks: FastAPI background queue; used to persist the
            conversation turn after the response is sent.
        model_name: Model name — only logged here; the graph's LLM is fixed
            at build time.
        images: Optional image payloads forwarded to the graph via config.
        identity_key: Key for saving/loading history (identity.history_key)
            Guest: device_id, User: user_id

    Returns:
        dict with "ai_response", "product_ids" and a "cached" flag.
    """
    # Fall back to user_id when no explicit identity key was provided.
    effective_identity_key = identity_key or user_id
    logger.info(
        "chat_controller start: model=%s, user_id=%s, identity_key=%s",
        model_name, user_id, effective_identity_key
    )
    # ====================== CACHE LAYER ======================
    if REDIS_CACHE_TURN_ON:
        cached_response = await redis_cache.get_response(
            user_id=effective_identity_key, query=query
        )
        if cached_response:
            logger.info(f"⚡ CACHE HIT for identity_key={effective_identity_key}")
            memory = await get_conversation_manager()
            # The turn is still recorded in history even on a cache hit.
            background_tasks.add_task(
                handle_post_chat_async,
                memory=memory,
                identity_key=effective_identity_key,
                human_query=query,
                ai_response=cached_response,
            )
            return {**cached_response, "cached": True}
    # ====================== NORMAL LLM FLOW ======================
    logger.info("chat_controller: proceed with live LLM call")
    # Graph is a singleton — built once and reused across requests.
    # LLM instances are cached by LLMFactory, tools are static.
    graph = build_graph()
    # Init ConversationManager (Singleton)
    memory = await get_conversation_manager()
    # Load last 5 turns of history.
    history_dicts = await memory.get_chat_history(effective_identity_key, limit=5)
    history_messages: list[BaseMessage] = []
    for m in history_dicts:
        if m["is_human"]:
            history_messages.append(HumanMessage(content=m["message"]))
        else:
            # AI responses may be saved as JSON — extract readable text
            ai_content = m["message"]
            try:
                parsed = json.loads(ai_content)
                if isinstance(parsed, dict) and "ai_response" in parsed:
                    ai_content = parsed["ai_response"]
            except (json.JSONDecodeError, TypeError):
                # Plain-text history entry — keep as-is.
                pass
            history_messages.append(AIMessage(content=ai_content))
    # Prepare State
    # - history: previous conversation messages (for context)
    # - messages: starts with current query only (tool calls will be appended by LangGraph)
    # - user_query: current user message
    user_query_message: BaseMessage = HumanMessage(content=query)
    initial_state: AgentState = {
        "user_query": user_query_message,
        "messages": [user_query_message],
        "history": history_messages,
        "user_id": user_id,
        "images_embedding": [],
        "ai_response": None,
    }
    # One run id shared by LangChain (UUID object) and our metadata (str form).
    run_uuid = uuid.uuid4()
    run_id_str = str(run_uuid)
    langfuse_handler = get_callback_handler()
    exec_config = RunnableConfig(
        configurable={
            "user_id": user_id,
            "transient_images": images or [],
            "run_id": run_id_str,
        },
        run_id=run_uuid,
        metadata={"run_id": run_id_str, "tags": "chatbot,production"},
        callbacks=[langfuse_handler] if langfuse_handler else [],
    )
    # Execute Graph. propagate_attributes attaches user/session attributes to
    # every Langfuse span of this run (auto-traced via the LangChain callback).
    start_time = time.time()
    session_id = f"{user_id}-{run_id_str[:8]}"
    with propagate_attributes(user_id=user_id, session_id=session_id):
        result = await graph.ainvoke(initial_state, config=exec_config)
    duration = time.time() - start_time
    # Parse Response
    all_product_ids = extract_product_ids(result.get("messages", []))
    ai_raw_content = result.get("ai_response").content if result.get("ai_response") else ""
    logger.info("RAW LLM output (%d chars): %s", len(ai_raw_content), ai_raw_content[:500])
    ai_text_response, final_product_ids = parse_ai_response(ai_raw_content, all_product_ids)
    logger.info("PARSED ai_response (%d chars): %s", len(ai_text_response), ai_text_response[:300])
    response_payload = {
        "ai_response": ai_text_response,
        "product_ids": final_product_ids,
    }
    # ====================== SAVE TO CACHE ======================
    if REDIS_CACHE_TURN_ON:
        await redis_cache.set_response(
            user_id=effective_identity_key,
            query=query,
            response_data=response_payload,
            ttl=300
        )
        logger.debug(f"💾 Cached response for identity_key={effective_identity_key}")
    # Save to History (Background)
    background_tasks.add_task(
        handle_post_chat_async,
        memory=memory,
        identity_key=effective_identity_key,
        human_query=query,
        ai_response=response_payload,
    )
    logger.info("chat_controller finished in %.2fs", duration)
    return {**response_payload, "cached": False}
async def chat_controller_stream(
    query: str,
    user_id: str,
    model_name: str = DEFAULT_MODEL,
    images: list[str] | None = None,
    identity_key: str | None = None,
):
    """
    Streaming controller — yields SSE events with token chunks.

    Each yield is a JSON string:
    {"token": "partial text"} — during streaming
    {"done": true, "ai_response": "full text"} — final event

    History is saved after stream completes.

    Args:
        query: Current user message.
        user_id: Identifier of the requesting user.
        model_name: Model name — only logged here.
        images: Optional image payloads forwarded to the graph via config.
        identity_key: Key for saving/loading history; falls back to user_id.
    """
    effective_identity_key = identity_key or user_id
    logger.info(
        "chat_controller_stream start: model=%s, user_id=%s",
        model_name, user_id,
    )
    # ====================== CACHE LAYER ======================
    if REDIS_CACHE_TURN_ON:
        cached_response = await redis_cache.get_response(
            user_id=effective_identity_key, query=query
        )
        if cached_response:
            logger.info(f"⚡ CACHE HIT (stream) for identity_key={effective_identity_key}")
            # Stream cached response as one chunk
            ai_text = cached_response.get("ai_response", "")
            yield json.dumps({"token": ai_text}, ensure_ascii=False)
            yield json.dumps({"done": True, "ai_response": ai_text}, ensure_ascii=False)
            # Save history — awaited inline (no BackgroundTasks in this path).
            memory = await get_conversation_manager()
            await handle_post_chat_async(
                memory=memory,
                identity_key=effective_identity_key,
                human_query=query,
                ai_response=cached_response,
            )
            return
    # ====================== STREAM LLM FLOW ======================
    graph = build_graph()
    memory = await get_conversation_manager()
    # Load last 5 turns of history.
    history_dicts = await memory.get_chat_history(effective_identity_key, limit=5)
    history_messages: list[BaseMessage] = []
    for m in history_dicts:
        if m["is_human"]:
            history_messages.append(HumanMessage(content=m["message"]))
        else:
            # AI turns may be stored as JSON — extract the readable text.
            ai_content = m["message"]
            try:
                parsed = json.loads(ai_content)
                if isinstance(parsed, dict) and "ai_response" in parsed:
                    ai_content = parsed["ai_response"]
            except (json.JSONDecodeError, TypeError):
                pass
            history_messages.append(AIMessage(content=ai_content))
    user_query_message: BaseMessage = HumanMessage(content=query)
    initial_state: AgentState = {
        "user_query": user_query_message,
        "messages": [user_query_message],
        "history": history_messages,
        "user_id": user_id,
        "images_embedding": [],
        "ai_response": None,
    }
    # One run id shared by LangChain (UUID object) and our metadata (str form).
    run_uuid = uuid.uuid4()
    run_id_str = str(run_uuid)
    langfuse_handler = get_callback_handler()
    exec_config = RunnableConfig(
        configurable={
            "user_id": user_id,
            "transient_images": images or [],
            "run_id": run_id_str,
        },
        run_id=run_uuid,
        metadata={"run_id": run_id_str, "tags": "chatbot,production,stream"},
        callbacks=[langfuse_handler] if langfuse_handler else [],
    )
    # Stream using astream_events
    start_time = time.time()
    full_response = ""
    is_final_response = False
    session_id = f"{user_id}-{run_id_str[:8]}"
    with propagate_attributes(user_id=user_id, session_id=session_id):
        async for event in graph.astream_events(
            initial_state, config=exec_config, version="v2"
        ):
            kind = event.get("event", "")
            # Only stream tokens from the chat model (not tool calls)
            if kind == "on_chat_model_stream":
                chunk = event.get("data", {}).get("chunk")
                if chunk and hasattr(chunk, "content") and chunk.content:
                    # Skip if this is a tool_call chunk (no text content)
                    if hasattr(chunk, "tool_calls") and chunk.tool_calls:
                        continue
                    # Only stream the FINAL response (after tool execution)
                    # We detect this by tracking: if tools were called,
                    # the final agent response comes after tool results
                    token = chunk.content
                    full_response += token
                    is_final_response = True
                    yield json.dumps({"token": token}, ensure_ascii=False)
            # When the agent finishes and we got tokens, prepare to end
            elif kind == "on_chain_end" and event.get("name") == "agent":
                if is_final_response:
                    # Reset for potential next agent iteration
                    pass
    duration = time.time() - start_time
    logger.info("chat_controller_stream finished in %.2fs (%d chars)", duration, len(full_response))
    # Parse and yield final event
    ai_text_response, _ = parse_ai_response(full_response, [])
    yield json.dumps({"done": True, "ai_response": ai_text_response}, ensure_ascii=False)
    # Build response payload for caching and history.
    # NOTE(review): product_ids is always empty in the streaming path — tool
    # results are not scanned here, unlike chat_controller; confirm intended.
    response_payload = {
        "ai_response": ai_text_response,
        "product_ids": [],
    }
    # Save to cache
    if REDIS_CACHE_TURN_ON:
        await redis_cache.set_response(
            user_id=effective_identity_key,
            query=query,
            response_data=response_payload,
            ttl=300,
        )
    # Save to history
    await handle_post_chat_async(
        memory=memory,
        identity_key=effective_identity_key,
        human_query=query,
        ai_response=response_payload,
    )
"""
Fashion Q&A Agent Graph
LangGraph workflow với clean architecture.
Tất cả resources (LLM, Tools) khởi tạo trong __init__.
Sử dụng ConversationManager (Postgres) để lưu history thay vì checkpoint.
"""
import logging
from typing import Any
from langchain_core.language_models import BaseChatModel
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableConfig
from langgraph.cache.memory import InMemoryCache
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import ToolNode
from langgraph.types import CachePolicy
from common.llm_factory import create_llm
from .models import AgentConfig, AgentState, get_config
from .prompt import get_system_prompt
from .tools.get_tools import get_all_tools, get_collection_tools
logger = logging.getLogger(__name__)
class CANIFAGraph:
    """
    Fashion Q&A Agent Graph Manager.

    Owns the LLM, the bound tool set and the compiled LangGraph workflow.
    Resources are created once in __init__ and the compiled graph is cached
    (see build()), while the system prompt is re-fetched on every agent call
    so Langfuse prompt edits take effect without a restart.
    """

    def __init__(
        self,
        config: AgentConfig | None = None,
        llm: BaseChatModel | None = None,
        tools: list | None = None,
    ):
        # config / llm / tools are injectable (e.g. for tests); defaults come
        # from get_config() / the LLM factory / the tools package.
        self.config = config or get_config()
        self._compiled_graph: Any | None = None
        self.llm: BaseChatModel = llm or create_llm(
            model_name=self.config.model_name, api_key=self.config.openai_api_key, streaming=True
        )
        self.all_tools = tools or get_all_tools()
        self.collection_tools = get_collection_tools()  # kept separately so routing can match tool names
        self.retrieval_tools = self.all_tools
        self.llm_with_tools = self.llm.bind_tools(self.all_tools, strict=True)
        # NOTE: prompt is NOT cached here — fetched fresh each request
        # so Langfuse updates take effect immediately.
        self.cache = InMemoryCache()

    def _build_chain(self):
        """Build chain with fresh system prompt (from Langfuse or local fallback)."""
        system_prompt = get_system_prompt()
        prompt_template = ChatPromptTemplate.from_messages(
            [
                ("system", system_prompt),
                MessagesPlaceholder(variable_name="history"),
                MessagesPlaceholder(variable_name="user_query"),
                MessagesPlaceholder(variable_name="messages"),
            ]
        )
        return prompt_template | self.llm_with_tools

    async def _agent_node(self, state: AgentState, config: RunnableConfig) -> dict:
        """Agent node — rebuilds chain each call for realtime prompt updates."""
        messages = state.get("messages", [])
        history = state.get("history", [])
        user_query = state.get("user_query")
        chain = self._build_chain()
        response = await chain.ainvoke({
            "user_query": [user_query] if user_query else [],
            "history": history,
            "messages": messages
        })
        # ai_response mirrors the latest model message so callers can read the
        # final answer without scanning the messages list.
        return {"messages": [response], "ai_response": response}

    def _should_continue(self, state: AgentState) -> str:
        """Route after the agent node: to a tool node, or end the run."""
        last_message = state["messages"][-1]
        if not hasattr(last_message, "tool_calls") or not last_message.tool_calls:
            logger.info("🏁 Agent finished")
            return "end"
        tool_names = [tc["name"] for tc in last_message.tool_calls]
        collection_names = [t.name for t in self.collection_tools]
        # A single collection tool in the batch routes the whole batch there.
        if any(name in collection_names for name in tool_names):
            logger.info(f"🔄 → collect_tools: {tool_names}")
            return "collect_tools"
        logger.info(f"🔄 → retrieve_tools: {tool_names}")
        return "retrieve_tools"

    def build(self) -> Any:
        """Build and compile the LangGraph workflow (cached after first call)."""
        if self._compiled_graph is not None:
            return self._compiled_graph
        workflow = StateGraph(AgentState)
        # Nodes — retrieval node results are node-cached for one hour.
        workflow.add_node("agent", self._agent_node)
        workflow.add_node("retrieve_tools", ToolNode(self.retrieval_tools), cache_policy=CachePolicy(ttl=3600))
        workflow.add_node("collect_tools", ToolNode(self.collection_tools))
        # Edges — tools always loop back to the agent for the next decision.
        workflow.set_entry_point("agent")
        workflow.add_conditional_edges(
            "agent",
            self._should_continue,
            {"retrieve_tools": "retrieve_tools", "collect_tools": "collect_tools", "end": END},
        )
        workflow.add_edge("retrieve_tools", "agent")
        workflow.add_edge("collect_tools", "agent")
        self._compiled_graph = workflow.compile(cache=self.cache)  # No Checkpointer
        logger.info("✅ Graph compiled (Langfuse callback will be per-run)")
        return self._compiled_graph

    @property
    def graph(self) -> Any:
        # Convenience accessor; identical to calling build().
        return self.build()
# --- Singleton & Public API ---
# Single process-wide instance, held in a one-slot list.
_instance: list[CANIFAGraph | None] = [None]


def build_graph(config: AgentConfig | None = None, llm: BaseChatModel | None = None, tools: list | None = None) -> Any:
    """Return the compiled graph (process-wide singleton).

    The arguments only take effect on the very first call; afterwards the
    cached CANIFAGraph is reused and they are ignored.
    """
    manager = _instance[0]
    if manager is None:
        manager = CANIFAGraph(config, llm, tools)
        _instance[0] = manager
    return manager.build()
def get_graph_manager(
    config: AgentConfig | None = None, llm: BaseChatModel | None = None, tools: list | None = None
) -> CANIFAGraph:
    """Return the singleton CANIFAGraph, creating it on first use.

    Like build_graph(), the arguments only matter on first creation.
    """
    manager = _instance[0]
    if manager is None:
        manager = CANIFAGraph(config, llm, tools)
        _instance[0] = manager
    return manager
def reset_graph() -> None:
    """Reset the module-level singleton (intended for tests only)."""
    _instance[0] = None
"""
Agent Helper Functions
Các hàm tiện ích cho chat controller.
"""
import json
import logging
import uuid
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.runnables import RunnableConfig
from common.conversation_manager import MongoDBConversationManager
from common.langfuse_client import get_callback_handler
from .models import AgentState
logger = logging.getLogger(__name__)
def extract_product_ids(messages: list) -> list[dict]:
    """
    Extract full product info from tool messages (data_retrieval_tool results).

    Scans ToolMessage entries, parses their JSON payloads and keeps one entry
    per unique SKU (internal_ref_code), in first-seen order.

    Returns:
        List of product dicts with keys: sku, name, price, sale_price,
        url, thumbnail_image_url.
    """
    collected: list[dict] = []
    seen_skus: set = set()
    for message in messages:
        if not isinstance(message, ToolMessage):
            continue
        try:
            # Tool results arrive as JSON strings.
            payload = json.loads(message.content)
            if payload.get("status") != "success" or "products" not in payload:
                continue
            for item in payload["products"]:
                sku = item.get("internal_ref_code")
                if not sku or sku in seen_skus:
                    continue
                seen_skus.add(sku)
                collected.append({
                    "sku": sku,
                    "name": item.get("magento_product_name", ""),
                    "price": item.get("price_vnd", 0),
                    "sale_price": item.get("sale_price_vnd"),  # None when not on sale
                    "url": item.get("magento_url_key", ""),
                    "thumbnail_image_url": item.get("thumbnail_image_url", ""),
                })
        except (json.JSONDecodeError, KeyError, TypeError) as e:
            logger.debug(f"Could not parse tool message for products: {e}")
            continue
    return collected
def parse_ai_response(ai_raw_content: str, all_product_ids: list) -> tuple[str, list]:
    """
    Parse the AI response from raw LLM output.

    The LLM may answer with plain text or with a JSON object of the form
    {"ai_response": "...", "product_ids": [...]}, optionally wrapped in a
    markdown code fence. Plain text — or JSON that is not an object — is
    returned unchanged, together with the tool-derived product ids.

    Args:
        ai_raw_content: Raw content from the AI response.
        all_product_ids: Product ids extracted from tool messages; used as
            the fallback when the LLM gives no explicit ids.

    Returns:
        tuple: (ai_text_response, final_product_ids)
    """
    ai_text_response = ai_raw_content
    final_product_ids = all_product_ids
    try:
        # Strip markdown code blocks if present (```json ... ```)
        content_to_parse = ai_raw_content.strip()
        if content_to_parse.startswith("```"):
            # Drop the opening ```/```json line and any closing ``` fence.
            first_newline = content_to_parse.find("\n")
            if first_newline != -1:
                content_to_parse = content_to_parse[first_newline + 1:]
                if content_to_parse.endswith("```"):
                    content_to_parse = content_to_parse[:-3]
                content_to_parse = content_to_parse.strip()
        # Try to parse if it's a JSON string from LLM
        ai_json = json.loads(content_to_parse)
        # BUGFIX: json.loads may yield a non-dict value (e.g. "123" or
        # "[1, 2]"); calling .get on it raised an uncaught AttributeError
        # before. Treat any non-dict payload as plain text instead.
        if isinstance(ai_json, dict):
            ai_text_response = ai_json.get("ai_response", ai_raw_content)
            explicit_ids = ai_json.get("product_ids", [])
            if explicit_ids and isinstance(explicit_ids, list):
                # Explicit ids from the LLM win over tool-derived ones.
                final_product_ids = explicit_ids
    except (json.JSONDecodeError, TypeError, ValueError):
        # Not JSON — return the original content untouched.
        pass
    return ai_text_response, final_product_ids
def prepare_execution_context(query: str, user_id: str, history: list, images: list | None):
    """
    Prepare initial state and execution config for the graph run.

    Args:
        query: Current user message.
        user_id: Identifier of the requesting user.
        history: Previous conversation messages for context.
        images: Optional image payloads forwarded via the config.

    Returns:
        tuple: (initial_state, exec_config)
    """
    user_query_message = HumanMessage(content=query)
    initial_state: AgentState = {
        "user_query": user_query_message,
        "messages": [user_query_message],
        "history": history,
        "user_id": user_id,
        "images_embedding": [],
        "ai_response": None,
    }
    # FIX: RunnableConfig's run_id field is a UUID — the controllers pass the
    # uuid.UUID object and only use the string form for metadata; this helper
    # previously passed the string everywhere. Keep both forms, consistently.
    run_uuid = uuid.uuid4()
    run_id = str(run_uuid)
    # Metadata for LangChain (tags for logging/filtering)
    metadata = {
        "run_id": run_id,
        "tags": "chatbot,production",
    }
    langfuse_handler = get_callback_handler()
    exec_config = RunnableConfig(
        configurable={
            "user_id": user_id,
            "transient_images": images or [],
            "run_id": run_id,
        },
        run_id=run_uuid,
        metadata=metadata,
        callbacks=[langfuse_handler] if langfuse_handler else [],
    )
    return initial_state, exec_config
async def handle_post_chat_async(
    memory: MongoDBConversationManager,
    identity_key: str,
    human_query: str,
    ai_response: dict | None
):
    """
    Persist one conversation turn after the response has been sent.

    The AI response dict is serialized to a JSON string before being stored
    in the history TEXT field. A missing/empty response is skipped silently;
    storage errors are logged but never propagated (best-effort background job).
    """
    if not ai_response:
        return
    try:
        # dict -> JSON string, keeping non-ASCII characters readable.
        serialized = json.dumps(ai_response, ensure_ascii=False)
        await memory.save_conversation_turn(identity_key, human_query, serialized)
        logger.debug(f"Saved conversation for identity_key {identity_key}")
    except Exception as e:
        logger.error(f"Failed to save conversation for identity_key {identity_key}: {e}", exc_info=True)
from typing import Annotated, Any, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
from pydantic import BaseModel
import config as global_config
class QueryRequest(BaseModel):
    """API request model for the Fashion Q&A chat endpoint."""
    # Optional caller identity; may be absent (e.g. guests).
    user_id: str | None = None
    # The user's message — the only required field.
    user_query: str
    # Optional images attached to the message.
    images: list[str] | None = None
    # Optional pre-computed image analysis — schema defined by the caller;
    # not referenced by the code visible in this module.
    image_analysis: dict[str, Any] | None = None
class AgentState(TypedDict):
    """State carried through the LangGraph agent workflow."""
    # The current user message.
    user_query: BaseMessage
    # Prior conversation turns, provided for context only.
    history: list[BaseMessage]
    # Requesting user's id (None when anonymous).
    user_id: str | None
    # Final agent answer message; set by the agent node, None initially.
    ai_response: BaseMessage | None
    # Image embedding payloads; controllers initialize this to [].
    images_embedding: list[str] | None
    # Running message list; add_messages appends model/tool messages per step.
    messages: Annotated[list[BaseMessage], add_messages]
class AgentConfig:
    """Runtime configuration container for the agent.

    All values are optional keyword arguments; unset keys default to None,
    except model_name which falls back to the global DEFAULT_MODEL.
    """

    # Keys stored verbatim from kwargs (None when absent).
    _PASSTHROUGH_KEYS = (
        "openai_api_key",
        "google_api_key",
        "groq_api_key",
        "supabase_url",
        "supabase_key",
        "langfuse_public_key",
        "langfuse_secret_key",
        "langfuse_base_url",
    )

    def __init__(self, **kwargs):
        # model_name is special: a falsy value falls back to the global default.
        self.model_name = kwargs.get("model_name") or global_config.DEFAULT_MODEL
        for key in self._PASSTHROUGH_KEYS:
            setattr(self, key, kwargs.get(key))
def get_config() -> AgentConfig:
    """Build the AgentConfig from environment-backed global settings."""
    return AgentConfig(
        model_name=global_config.DEFAULT_MODEL,
        openai_api_key=global_config.OPENAI_API_KEY,
        google_api_key=global_config.GOOGLE_API_KEY,
        groq_api_key=global_config.GROQ_API_KEY,
        supabase_url=global_config.AI_SUPABASE_URL,
        supabase_key=global_config.AI_SUPABASE_KEY,
        langfuse_public_key=global_config.LANGFUSE_PUBLIC_KEY,
        langfuse_secret_key=global_config.LANGFUSE_SECRET_KEY,
        langfuse_base_url=global_config.LANGFUSE_BASE_URL,
    )
"""
Agent Nodes Package
"""
from .agent import agent_node
__all__ = ["agent_node"]
"""
CuCu Assistant - System Prompt
Supports two modes:
1. Langfuse prompt management (realtime, editable from Langfuse dashboard)
2. Local fallback (inline template)
"""
import logging
from datetime import datetime
from functools import lru_cache
from common.timezone_config import VIETNAM_TZ
logger = logging.getLogger(__name__)
# Vietnamese weekday names, keyed by datetime.weekday() (0 = Monday).
# Values are runtime strings injected into the prompt — do not translate.
_WEEKDAY_MAP = {
    0: "Thứ 2",
    1: "Thứ 3",
    2: "Thứ 4",
    3: "Thứ 5",
    4: "Thứ 6",
    5: "Thứ 7",
    6: "Chủ nhật",
}


def _get_weekday_str() -> str:
    """Return today's Vietnamese weekday name (Vietnam timezone)."""
    return _WEEKDAY_MAP[datetime.now(VIETNAM_TZ).weekday()]
# ──────────────────────────── Local template ────────────────────────────
# This is the SAME prompt pushed to Langfuse via scripts/push_prompt_to_langfuse.py
# {{date_str}} and {{weekday_str}} are the runtime variables, replaced at runtime
# (see get_system_prompt()). The Vietnamese text is LLM-facing — do not translate.
_PROMPT_TEMPLATE = """# VAI TRÒ
Bạn là **CuCu Assistant** - Trợ lý quản lý ghi chú cá nhân (Memos).
- Thông minh, ngắn gọn, đi thẳng vào vấn đề.
- NHIỆM VỤ DUY NHẤT: Giúp người dùng tìm kiếm và truy vấn lại các ghi chú (memos) họ đã lưu.
- Hôm nay: {{date_str}} ({{weekday_str}})
---
# QUY TẮC SỬ DỤNG TOOL "memo_retrieval_tool"
## 0. KHI NÀO GỌI TOOL vs KHÔNG GỌI
### KHÔNG gọi tool (chỉ chào lại):
- Câu CHỈ có lời chào, KHÔNG nhắc gì đến note/ghi chú/chủ đề: "hello", "hi bro", "chào em"
### CÓ gọi tool (ưu tiên tìm kiếm):
- Câu có nhắc đến **bất kỳ từ khóa nào** liên quan note/ghi chú/chủ đề, DÙ CÓ LỜI CHÀO đi kèm:
- "chào em, tao note kafka hôm nào ấy" → GỌI TOOL tìm kafka
- "hello, hôm qua tao note gì" → GỌI TOOL tìm theo ngày
- "ê bro, tìm note về meeting" → GỌI TOOL tìm meeting
**NGUYÊN TẮC: Nếu câu có chứa từ khóa/chủ đề/topic → LUÔN GỌI TOOL, bỏ qua phần chào hỏi.**
## 1. TỰ TÍNH TOÁN NGÀY THÁNG
Bạn PHẢI tự tính ngày cụ thể (YYYY-MM-DD) dựa trên "Hôm nay: {{date_str}} ({{weekday_str}})".
Quy ước thứ: Thứ 2 = Monday, Thứ 3 = Tuesday, Thứ 4 = Wednesday, Thứ 5 = Thursday, Thứ 6 = Friday, Thứ 7 = Saturday, Chủ nhật = Sunday.
### CÁC MỐC THỜI GIAN THÔNG DỤNG:
- "Hôm nay" → `start_date` = `end_date` = {{date_str}}
- "Hôm qua" → `start_date` = `end_date` = {{date_str}} - 1 ngày
- "Tuần trước" (không nói ngày cụ thể) → `start_date` = thứ 2 tuần trước, `end_date` = chủ nhật tuần trước
- "Tuần này" → `start_date` = thứ 2 tuần này, `end_date` = {{date_str}}
- "Tháng này" → `start_date` = ngày đầu tháng, `end_date` = {{date_str}}
- "Năm nay" → `start_date` = ngày 1/1, `end_date` = {{date_str}}
### CỰC KỲ QUAN TRỌNG — "THỨ X TUẦN TRƯỚC/NÀY" = ĐÚNG 1 NGÀY:
Khi user nói "thứ X tuần trước" hoặc "thứ X tuần này", tính ra ĐÚNG 1 NGÀY cụ thể:
- `start_date` = `end_date` = ngày đó (YYYY-MM-DD)
- Ví dụ: Nếu hôm nay là 2026-02-25 (Thứ 4), thì:
- "thứ 5 tuần trước" → 2026-02-19 (chỉ 1 ngày!)
- "thứ 2 tuần này" → 2026-02-23 (chỉ 1 ngày!)
- "thứ 6 tuần trước" → 2026-02-20 (chỉ 1 ngày!)
- **KHÔNG ĐƯỢC dùng range cả tuần** — user hỏi đúng 1 ngày thì trả đúng 1 ngày!
### KHI USER HỎI "HÔM NÀO / NGÀY NÀO / BAO GIỜ":
- "Kafka note hôm nào ấy nhỉ?" = User ĐANG HỎI ngày → tìm ALL dates
- "Tao note cái đó khi nào?" = User ĐANG HỎI ngày → tìm ALL dates
- → Dùng range rộng: `start_date` = "2020-01-01", `end_date` = {{date_str}}
- **KHÔNG ĐƯỢC HỎI LẠI "ngày nào?"** — vì user đang nhờ bot tìm ngày!
- → **CHỈ TRẢ VỀ NGÀY**, ví dụ: "Bạn note cái đó vào ngày **2026-02-05** (Thứ 5) nhé!"
- **KHÔNG cần trích dẫn toàn bộ nội dung** khi user hỏi "hôm nào/khi nào" — user chỉ cần biết NGÀY.
- Nếu tìm thấy nhiều memo khớp, liệt kê ngày của từng memo.
### KHI KHÔNG NHẮC THỜI GIAN:
- "Tìm note về X" → range rộng: `start_date` = "2020-01-01", `end_date` = {{date_str}}
## 2. PHÂN TÍCH PARAMETERS
### NGUYÊN TẮC: CHỈ THÊM PARAMETER KHI USER NÓI RÕ
- **`content_search`**: Khi user nhắc từ khóa: "về Kafka", "pass wifi", "meeting"
- **`tag`**: Khi user nhắc tag: "#work", "#idea"
- **KHÔNG THÊM** content_search/tag nếu user chỉ hỏi theo ngày
### VÍ DỤ:
- "Hôm qua note gì?" → ✅ date only
- "Note kafka hôm nào?" → ✅ `content_search="kafka"` + range rộng
- "Tìm note #work tuần này" → ✅ `tag="work"` + date tuần này
## 3. KHI NÀO HỎI LẠI USER
- **CHỈ hỏi lại khi THẬT SỰ không có thông tin gì**: "Tìm note", "Tìm cái đó"
- **KHÔNG HỎI LẠI** nếu có bất kỳ keyword nào: "note kafka hôm nào" → đủ rồi, GỌI TOOL
- **KHÔNG BAO GIỜ hỏi lại ngày** nếu user đang hỏi "hôm nào/khi nào" → dùng range rộng
---
# QUY TẮC TRẢ LỜI (CỰC KỲ QUAN TRỌNG)
1. **NGẮN GỌN DƯỚI 100 TỪ**: Trả lời súc tích, đi thẳng vấn đề. KHÔNG dài dòng.
2. **TÓM TẮT NỘI DUNG**: Mỗi memo chỉ hiển thị **tóm tắt ngắn gọn** (tối đa 15 từ), KHÔNG trích dẫn toàn bộ nội dung.
3. **FORMAT**:
- **📝 (YYYY-MM-DD):** [tóm tắt ngắn gọn nội dung]
- Nếu nhiều memo, liệt kê dạng danh sách bullet
4. **KHÔNG BỊA ĐẶT**: Không tự chế nội dung.
5. **NGÔN NGỮ**: Thân thiện, tự nhiên, như nói chuyện với bạn.
6. Nếu count=0: "Không tìm thấy ghi chú nào 🤷"
7. **Trả lời bằng text thuần/markdown**, KHÔNG wrap JSON.
8. **CHỈ HIỂN THỊ ĐẦY ĐỦ** khi user yêu cầu rõ: "cho xem chi tiết", "đọc full nội dung"."""
def get_system_prompt_template() -> str:
    """Return the raw prompt template with {{date_str}} placeholder.

    Used by the push script to upload to Langfuse.
    """
    return _PROMPT_TEMPLATE
def _fetch_langfuse_prompt() -> str | None:
    """
    Try to fetch the latest prompt from Langfuse.

    Returns the compiled prompt string, or None if unavailable (client not
    configured, prompt missing, network failure, ...). Any exception is
    deliberately swallowed so callers can fall back to the local template.

    Uses Langfuse's built-in caching (default TTL=60s).
    """
    try:
        # Imported lazily so this module loads even without Langfuse set up.
        from common.langfuse_client import get_langfuse_client
        client = get_langfuse_client()
        if not client:
            return None
        prompt = client.get_prompt(
            name="cucu-system-prompt",
            label="production",
            cache_ttl_seconds=60,  # Re-fetch every 60s
        )
        # Substitute the runtime variables ({{date_str}}, {{weekday_str}}).
        date_str = datetime.now(VIETNAM_TZ).strftime("%Y-%m-%d")
        weekday_str = _get_weekday_str()
        compiled = prompt.compile(date_str=date_str, weekday_str=weekday_str)
        logger.info("✅ Prompt fetched from Langfuse (version=%s)", prompt.version)
        return compiled
    except Exception as e:
        # Best-effort: log and let the caller use the local fallback.
        logger.warning("⚠️ Langfuse prompt fetch failed: %s — using local fallback", e)
        return None
def get_system_prompt() -> str:
    """
    Resolve the system prompt.

    Priority:
    1. Langfuse prompt management (realtime, editable)
    2. Local fallback template, with {{date_str}}/{{weekday_str}} substituted
    """
    # Prefer the remotely managed prompt when it is available and non-empty.
    remote_prompt = _fetch_langfuse_prompt()
    if remote_prompt:
        return remote_prompt
    # Fallback: render the local template for today's date.
    today = datetime.now(VIETNAM_TZ).strftime("%Y-%m-%d")
    weekday = _get_weekday_str()
    rendered = _PROMPT_TEMPLATE.replace("{{date_str}}", today).replace("{{weekday_str}}", weekday)
    logger.info("📝 Using local prompt fallback (date=%s, weekday=%s)", today, weekday)
    return rendered
"""
Tools Package
Hiện tại CuCu Agent chỉ dùng memo_retrieval_tool (MongoDB).
Các tool cũ cho StarRocks / CANIFA đã bỏ.
"""
from .get_tools import get_all_tools
__all__ = ["get_all_tools"]
import logging
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from common.embedding_service import create_embedding_async
from common.starrocks_connection import get_db_connection
logger = logging.getLogger(__name__)
# Input schema for the canifa_knowledge_search tool. The Field description is
# LLM-facing (exported into the tool's JSON schema), so it stays in Vietnamese;
# no class docstring is added for the same reason.
class KnowledgeSearchInput(BaseModel):
    query: str = Field(
        description="Câu hỏi hoặc nhu cầu tìm kiếm thông tin phi sản phẩm của khách hàng (ví dụ: tìm cửa hàng, hỏi chính sách, tra bảng size...)"
    )
@tool("canifa_knowledge_search", args_schema=KnowledgeSearchInput)
async def canifa_knowledge_search(query: str) -> str:
    # NOTE: the docstring below is consumed at runtime by @tool as the
    # LLM-facing tool description — intentionally left in Vietnamese, unchanged.
    """
    Tra cứu TOÀN BỘ thông tin về thương hiệu và dịch vụ của Canifa.
    Sử dụng tool này khi khách hàng hỏi về:
    1. THƯƠNG HIỆU & GIỚI THIỆU: Lịch sử hình thành, giá trị cốt lõi, sứ mệnh.
    2. HỆ THỐNG CỬA HÀNG: Tìm địa chỉ, số điện thoại, giờ mở cửa các cửa hàng tại các tỉnh thành (Hà Nội, HCM, Đà Nẵng, v.v.).
    3. CHÍNH SÁCH BÁN HÀNG: Quy định đổi trả, bảo hành, chính sách vận chuyển, phí ship.
    4. KHÁCH HÀNG THÂN THIẾT (KHTT): Điều kiện đăng ký thành viên, các hạng thẻ (Green, Silver, Gold, Diamond), quyền lợi tích điểm, thẻ quà tặng.
    5. HỖ TRỢ & FAQ: Giải đáp thắc mắc thường gặp, chính sách bảo mật, thông tin liên hệ văn phòng, tuyển dụng.
    6. TRA CỨU SIZE (BẢNG KÍCH CỠ): Hướng dẫn chọn size chuẩn cho nam, nữ, trẻ em dựa trên chiều cao, cân nặng.
    Ví dụ các câu hỏi phù hợp:
    - 'Canifa ở Cầu Giấy địa chỉ ở đâu?'
    - 'Chính sách đổi trả hàng trong bao nhiêu ngày?'
    - 'Làm sao để lên hạng thẻ Gold?'
    - 'Cho mình xem bảng size áo nam.'
    - 'Phí vận chuyển đi tỉnh là bao nhiêu?'
    - 'Canifa thành lập năm nào?'
    """
    logger.info(f"🔍 [Semantic Search] Brand Knowledge query: {query}")
    try:
        # 1. Embed the question (defaults to 1536 dimensions).
        query_vector = await create_embedding_async(query)
        if not query_vector:
            return "Xin lỗi, tôi gặp sự cố khi xử lý thông tin. Vui lòng thử lại sau."
        v_str = "[" + ",".join(str(v) for v in query_vector) + "]"
        # 2. Query StarRocks for the top-4 most similar chunks (no score
        #    threshold). The interpolated v_str is built purely from embedding
        #    floats — not raw user text — so this f-string SQL carries no
        #    direct injection risk.
        sql = f"""
        SELECT
            content,
            metadata
        FROM shared_source.chatbot_rsa_knowledge
        ORDER BY approx_cosine_similarity(embedding, {v_str}) DESC
        LIMIT 4
        """
        sr = get_db_connection()
        results = await sr.execute_query_async(sql)
        if not results:
            logger.warning(f"⚠️ No knowledge data found in DB for query: {query}")
            return "Hiện tại tôi chưa tìm thấy thông tin chính xác về nội dung này trong hệ thống kiến thức của Canifa. Bạn có thể liên hệ hotline 1800 6061 để được hỗ trợ trực tiếp."
        # 3. Aggregate results. NOTE(review): the metadata column is selected
        # but only content is used below — confirm whether metadata is needed.
        knowledge_texts = []
        for i, res in enumerate(results):
            content = res.get("content", "")
            knowledge_texts.append(content)
            # Log a preview of each retrieved chunk (content only).
            logger.info(f"📄 [Knowledge Chunk {i + 1}]: {content[:200]}...")
        final_response = "\n\n---\n\n".join(knowledge_texts)
        logger.info(f"✅ Found {len(results)} relevant knowledge chunks.")
        return final_response
    except Exception as e:
        # Best-effort tool: never raise into the agent loop.
        logger.error(f"❌ Error in canifa_knowledge_search: {e}")
        return "Tôi đang gặp khó khăn khi truy cập kho kiến thức. Bạn muốn hỏi về sản phẩm gì khác không?"
"""
Tool thu thập thông tin khách hàng (Tên, Số điện thoại, Email)
Dùng để đẩy data về CRM hoặc hệ thống lưu trữ khách hàng.
"""
import json
import logging
from langchain_core.tools import tool
logger = logging.getLogger(__name__)
@tool
async def collect_customer_info(name: str, phone: str, email: str | None) -> str:
    # NOTE: the docstring below is consumed at runtime by @tool as the
    # LLM-facing tool description — intentionally left in Vietnamese, unchanged.
    """
    Sử dụng tool này để ghi lại thông tin khách hàng khi họ muốn tư vấn sâu hơn,
    nhận khuyến mãi hoặc đăng ký mua hàng.
    Args:
        name: Tên của khách hàng
        phone: Số điện thoại của khách hàng
        email: Email của khách hàng (không bắt buộc)
    """
    try:
        print(f"\n[TOOL] --- 📝 Thu thập thông tin khách hàng: {name} - {phone} ---")
        logger.info(f"📝 Collecting customer info: {name}, {phone}, {email}")
        # Simulates pushing the data to a CRM/sheet; in production an
        # external API call would go here instead.
        db_record = {
            "customer_name": name,
            "phone_number": phone,
            "email_address": email,
            "status": "pending_consultation",
        }
        # Return a success payload (tools must return a text result).
        return json.dumps(
            {
                "status": "success",
                "message": (
                    f"Cảm ơn anh/chị {name}. CiCi đã ghi nhận thông tin và sẽ có nhân viên "
                    f"liên hệ tư vấn qua số điện thoại {phone} sớm nhất ạ!"
                ),
                "data_captured": db_record,
            },
            ensure_ascii=False,
        )
    except Exception as e:
        # Best-effort tool: report the failure as a JSON payload, never raise.
        logger.error(f"❌ Lỗi khi thu thập thông tin: {e}")
        return json.dumps(
            {
                "status": "error",
                "message": f"Xin lỗi, CiCi gặp sự cố khi lưu thông tin. Anh/chị vui lòng thử lại sau ạ. Lỗi: {e!s}",
            },
            ensure_ascii=False,
        )
"""
CANIFA Data Retrieval Tool - Tối giản cho Agentic Workflow.
Hỗ trợ Hybrid Search: Semantic (Vector) + Metadata Filter.
"""
import asyncio
import json
import logging
import time
from decimal import Decimal
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from agent.tools.product_search_helpers import build_starrocks_query
from common.embedding_service import create_embeddings_async
from common.starrocks_connection import get_db_connection
# from langsmith import traceable
logger = logging.getLogger(__name__)
class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that renders database ``Decimal`` values as floats."""

    def default(self, obj):
        # Only Decimal gets special treatment; defer everything else to the base
        # encoder so unsupported types still raise TypeError as usual.
        return float(obj) if isinstance(obj, Decimal) else super().default(obj)
class SearchItem(BaseModel):
    """
    Cấu trúc một mục tìm kiếm đơn lẻ trong Multi-Search.
    Lưu ý quan trọng về cách SINH QUERY:
    - Trường `query` KHÔNG phải câu hỏi thô của khách.
    - Phải là một đoạn text có cấu trúc giống hệt format trong cột `description_text_full` của DB,
      ví dụ (chỉ là 1 chuỗi duy nhất, nối các field bằng dấu chấm):
        product_name: Pack 3 đôi tất bé gái cổ thấp. master_color: Xanh da trời/ Blue.
        product_image_url: https://.... product_image_url_thumbnail: https://....
        product_web_url: https://.... description_text: ... material: ...
        material_group: Yarn - Sợi. gender_by_product: female. age_by_product: others.
        season: Year. style: Feminine. fitting: Slim. size_scale: 4/6.
        form_neckline: None. form_sleeve: None. product_line_vn: Tất.
        product_color_name: Blue Strip 449.
    - Khi khách chỉ nói “áo màu hồng”, hãy suy luận và sinh query dạng:
        product_name: Áo thun/áo sơ mi/áo ... màu hồng ... . master_color: Hồng/ Pink.
        product_image_url: None. product_image_url_thumbnail: None.
        product_web_url: None. description_text: ... (mô tả thêm nếu có).
        material: None. material_group: None. gender_by_product: ... (nếu đoán được).
        age_by_product: others. season: Year. style: ... (nếu đoán được).
        fitting: ... size_scale: None. form_neckline: None. form_sleeve: None.
        product_line_vn: Áo. product_color_name: Pink / Hồng (nếu hợp lý).
    - Nếu không suy luận được giá trị cho field nào thì để `None` hoặc bỏ trống phần text đó.
    """
    # NOTE(review): the docstring and every Field description below are prompt
    # content that the LLM reads through the @tool args_schema. They are kept in
    # Vietnamese deliberately; changing their wording changes agent behavior.
    # `query` must mimic the description_text_full column format so its embedding
    # lands close to the stored product embeddings.
    query: str = Field(
        ...,
        description=(
            "ĐOẠN TEXT CÓ CẤU TRÚC theo format của cột description_text_full trong DB, "
            "bao gồm các cặp key: product_name, master_color, product_image_url, "
            "product_image_url_thumbnail, product_web_url, description_text, material, "
            "material_group, gender_by_product, age_by_product, season, style, fitting, "
            "size_scale, form_neckline, form_sleeve, product_line_vn, product_color_name. "
            "Ví dụ: 'product_name: Pack 3 đôi tất bé gái cổ thấp. master_color: Xanh da trời/ Blue. "
            "product_image_url: https://.... product_web_url: https://.... description_text: ... "
            "material: None. material_group: Yarn - Sợi. gender_by_product: female. ...'"
        ),
    )
    # SKU/product-code lookup; when present it triggers the direct code-search path.
    magento_ref_code: str | None = Field(
        ..., description="Mã sản phẩm hoặc SKU (Ví dụ: 8TS24W001). CHỈ điền khi khách hỏi mã code cụ thể."
    )
    # Optional price bounds (VND) applied as SQL filters.
    price_min: float | None = Field(..., description="Giá thấp nhất (VD: 100000)")
    price_max: float | None = Field(..., description="Giá cao nhất (VD: 500000)")
    action: str = Field(..., description="Hành động: 'search' (tìm kiếm) hoặc 'visual_search' (phân tích ảnh)")
class MultiSearchParams(BaseModel):
    """Tham số cho Parallel Multi-Search."""
    # NOTE(review): docstring/description are LLM-facing schema text (Vietnamese
    # on purpose). Each SearchItem is executed concurrently by data_retrieval_tool.
    searches: list[SearchItem] = Field(..., description="Danh sách các truy vấn tìm kiếm chạy song song")
@tool(args_schema=MultiSearchParams)
# @traceable(run_type="tool", name="data_retrieval_tool")
async def data_retrieval_tool(searches: list[SearchItem]) -> str:
    """
    Siêu công cụ tìm kiếm sản phẩm CANIFA - Hỗ trợ Parallel Multi-Search (chạy song song nhiều truy vấn).
    Hướng dẫn dùng nhanh:
    - Trường 'query': mô tả chi tiết sản phẩm (tên, chất liệu, giới tính, màu sắc, phong cách, dịp sử dụng), không dùng câu hỏi thô.
    - Trường 'magento_ref_code': chỉ dùng khi khách hỏi mã sản phẩm/SKU cụ thể (vd: 8TS24W001).
    - Trường 'price_min' / 'price_max': dùng khi khách nói về khoảng giá (vd: dưới 500k, từ 200k đến 400k).
    """
    # NOTE(review): the docstring above is the LLM-facing tool description and is
    # deliberately Vietnamese; do not translate without re-testing tool selection.
    # Flow: batch-embed all queries once -> fan out one DB search per item via
    # asyncio.gather -> merge results into a single JSON payload.
    logger.info("data_retrieval_tool started, searches=%s", len(searches))
    try:
        # 0. Log a truncated overview of the input (avoid dumping long queries).
        for idx, item in enumerate(searches):
            short_query = (item.query[:60] + "...") if item.query and len(item.query) > 60 else item.query
            logger.debug(
                "search[%s] query=%r, code=%r, price_min=%r, price_max=%r",
                idx,
                short_query,
                item.magento_ref_code,
                item.price_min,
                item.price_max,
            )
        # 1. Embed every non-empty query in one batched call — cheaper and faster
        # than issuing one embedding request per search item.
        queries_to_embed = [s.query for s in searches if s.query]
        all_vectors = []
        if queries_to_embed:
            logger.info("batch embedding %s queries", len(queries_to_embed))
            emb_batch_start = time.time()
            all_vectors = await create_embeddings_async(queries_to_embed)
            logger.info(
                "batch embedding done in %.2f ms",
                (time.time() - emb_batch_start) * 1000,
            )
        # 2. Get DB connection (singleton)
        db = get_db_connection()
        tasks = []
        vector_idx = 0
        # Vectors come back in the same order as queries_to_embed, so walk them
        # with a cursor: only items that had a query consume a vector slot.
        for item in searches:
            current_vector = None
            if item.query:
                if vector_idx < len(all_vectors):
                    current_vector = all_vectors[vector_idx]
                vector_idx += 1
            tasks.append(_execute_single_search(db, item, query_vector=current_vector))
        # Run all single searches concurrently.
        results = await asyncio.gather(*tasks)
        # 3. Merge per-search results, echoing the originating criteria so the
        # agent can correlate products with the search that produced them.
        combined_results = []
        for i, products in enumerate(results):
            combined_results.append(
                {
                    "search_index": i,
                    "search_criteria": searches[i].dict(exclude_none=True),
                    "count": len(products),
                    "products": products,
                }
            )
        logger.info("data_retrieval_tool finished, results=%s", len(combined_results))
        # DecimalEncoder converts DB Decimal prices into JSON floats.
        return json.dumps(
            {"status": "success", "results": combined_results},
            ensure_ascii=False,
            cls=DecimalEncoder,
        )
    except Exception as e:
        logger.exception("Error in Multi-Search data_retrieval_tool: %s", e)
        return json.dumps({"status": "error", "message": str(e)})
async def _execute_single_search(db, item: SearchItem, query_vector: list[float] | None = None) -> list[dict]:
    """Run one search item end-to-end: build the SQL, query the DB, format rows."""
    try:
        preview = item.query
        if preview and len(preview) > 60:
            preview = preview[:60] + "..."
        logger.debug(
            "_execute_single_search started, query=%r, code=%r",
            preview,
            item.magento_ref_code,
        )
        # Build the SQL, reusing the pre-computed embedding when one was supplied.
        build_started = time.time()
        sql = await build_starrocks_query(item, query_vector=query_vector)
        build_ms = (time.time() - build_started) * 1000
        logger.debug("SQL built, length=%s, build_time_ms=%.2f", len(sql), build_ms)
        # Execute against StarRocks, timing the round trip separately.
        exec_started = time.time()
        rows = await db.execute_query_async(sql)
        exec_ms = (time.time() - exec_started) * 1000
        logger.info(
            "_execute_single_search done, products=%s, build_ms=%.2f, db_ms=%.2f, total_ms=%.2f",
            len(rows),
            build_ms,
            exec_ms,
            build_ms + exec_ms,
        )
        return _format_product_results(rows)
    except Exception as e:
        # A failed item returns [] so one bad search cannot sink the whole gather().
        logger.exception("Single search error for item %r: %s", item, e)
        return []
def _format_product_results(products: list[dict]) -> list[dict]:
"""Lọc và format kết quả trả về cho Agent."""
max_items = 15
formatted: list[dict] = []
for p in products[:max_items]:
formatted.append(
{
"internal_ref_code": p.get("internal_ref_code"),
# Chuỗi text dài, đã bao gồm: product_name, master_color, image, web_url, material, style, ...
"description_text": p.get("description_text_full"),
"sale_price": p.get("sale_price"),
"original_price": p.get("original_price"),
"discount_amount": p.get("discount_amount"),
"max_score": p.get("max_score"),
}
)
return formatted
"""
Tools Factory
Chỉ return 1 tool duy nhất: memo_retrieval_tool (MongoDB)
"""
from langchain_core.tools import BaseTool
from .memo_retrieval_tool import memo_retrieval_tool
def get_retrieval_tools() -> list[BaseTool]:
    """Read-only tools: these only query data, never write it."""
    tools: list[BaseTool] = [memo_retrieval_tool]
    return tools
def get_collection_tools() -> list[BaseTool]:
    """Write/collection tools — none are registered at the moment."""
    return list()
def get_all_tools() -> list[BaseTool]:
    """Aggregate every tool (read + write) exposed to the agent."""
    return [*get_retrieval_tools(), *get_collection_tools()]
"""
Memo Retrieval Tool - đọc ghi chú từ MongoDB cho Agent.
Dùng để trả lời các câu hỏi kiểu:
- "Hôm nay tôi đã note gì?"
- "Tuần trước tôi note những gì?"
- "Tìm memo có nội dung 'dự án A'" (dùng vector search)
- "Lọc memo theo tag #work"
"""
from __future__ import annotations
import json
import logging
import os
from datetime import datetime, time, timedelta, timezone
from typing import Optional, Any
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_openai import OpenAIEmbeddings
from pydantic import SecretStr
from bson import ObjectId
from common.mongo_client import mongodb_client, serialize_doc
logger = logging.getLogger(__name__)
# ---- Cached singletons to avoid re-init per request ----
# Lazily-created OpenAIEmbeddings client; stays None until first successful init.
_embedder: OpenAIEmbeddings | None = None
# Process-local snapshot of the memo_embeddings collection plus its load time;
# refreshed at most once per _CACHE_TTL seconds by _get_cached_embeddings().
_cached_embeddings: list[dict] | None = None
_cache_timestamp: float = 0
_CACHE_TTL = 300  # 5 minutes
def _get_embedder() -> OpenAIEmbeddings | None:
    """Return the process-wide embeddings client, or None when OPENAI_API_KEY is unset."""
    global _embedder
    if _embedder is None:
        key = os.getenv("OPENAI_API_KEY")
        if not key:
            # Caller interprets None as "semantic search unavailable".
            return None
        _embedder = OpenAIEmbeddings(
            model="text-embedding-3-small",
            api_key=SecretStr(key),
        )
    return _embedder
async def _get_cached_embeddings() -> list[dict]:
    """Return memo embeddings from MongoDB, served from an in-memory TTL cache."""
    import time as _time
    global _cached_embeddings, _cache_timestamp
    current = _time.time()
    cache_fresh = _cached_embeddings is not None and (current - _cache_timestamp) < _CACHE_TTL
    if cache_fresh:
        return _cached_embeddings
    # Cache miss or expired: reload the whole collection (capped at 5000 docs).
    _cached_embeddings = await mongodb_client.memo_embeddings.find({}).to_list(length=5000)
    _cache_timestamp = current
    logger.info("Refreshed embeddings cache: %d docs", len(_cached_embeddings))
    return _cached_embeddings
@tool
async def memo_retrieval_tool(
    start_date: str,
    end_date: Optional[str] = None,
    content_search: Optional[str] = None,
    tag: Optional[str] = None,
    *,
    config: RunnableConfig,
) -> str:
    """
    Truy vấn các memo từ database MongoDB (cuccu_memos).
    Args:
        start_date: Ngày bắt đầu (YYYY-MM-DD). Bắt buộc.
        end_date: Ngày kết thúc (YYYY-MM-DD). Optional. Nếu không có thì mặc định tìm trong start_date.
        content_search: Từ khóa tìm kiếm trong nội dung (dùng Regex). Optional.
        tag: Lọc theo tag chính xác (ví dụ: "work", "idea"). Optional.
    Returns:
        JSON string chứa danh sách memo tìm được.
    """
    # NOTE(review): the docstring above is the LLM-facing tool description and is
    # deliberately Vietnamese; translating it would change tool-selection behavior.
    # Strategy: cheap date/tag/creator filters run in MongoDB first; semantic
    # (vector) re-ranking is applied only on that narrowed candidate set, with
    # regex search as the fallback whenever embeddings are unavailable.
    try:
        # Extract user_id from RunnableConfig to filter by creator
        user_id = (config.get("configurable") or {}).get("user_id")
        logger.info(
            "memo_retrieval_tool started: user_id=%s, start=%s, end=%s, content=%s, tag=%s",
            user_id, start_date, end_date, content_search, tag
        )
        # ──────────────────────────────────────────────
        # Step 1: Parse dates
        # Inputs are interpreted in Vietnam local time, then the day bounds are
        # converted to UTC — presumably created_at is stored in UTC; verify.
        # ──────────────────────────────────────────────
        try:
            from common.timezone_config import VIETNAM_TZ
            dt_start = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=VIETNAM_TZ)
            dt_end = (
                datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=VIETNAM_TZ)
                if end_date else dt_start
            )
            begin_of_start_day = datetime.combine(dt_start, time.min).replace(tzinfo=VIETNAM_TZ).astimezone(timezone.utc)
            end_of_end_day = datetime.combine(dt_end, time.max).replace(tzinfo=VIETNAM_TZ).astimezone(timezone.utc)
        except ValueError as e:
            # Bad date string from the LLM — return a structured error it can recover from.
            return json.dumps({
                "status": "error",
                "message": f"Invalid date format. Use YYYY-MM-DD. Error: {e}"
            }, ensure_ascii=False)
        # ──────────────────────────────────────────────
        # Step 2: Query MongoDB FIRST (date + tag filter)
        # → Narrow down memos before any semantic search
        # ──────────────────────────────────────────────
        base_query: dict[str, Any] = {
            "created_at": {
                "$gte": begin_of_start_day,
                "$lte": end_of_end_day,
            }
        }
        # Filter by creator_id — only return current user's memos
        if user_id:
            base_query["creator_id"] = user_id
        if tag:
            base_query["payload.tags"] = tag
        # If NO content_search → just run date/tag query directly
        if not content_search:
            logger.info("MongoDB Query (date-only): %s", base_query)
            cursor = mongodb_client.memos.find(base_query).sort("created_at", -1).limit(20)
            docs = await cursor.to_list(length=20)
        # If content_search with short keywords (≤2 words) → regex on MongoDB
        # (short keywords are poor embedding queries; regex is also the fallback
        # when no OPENAI_API_KEY is configured).
        elif len(content_search.strip().split()) <= 2 or not _get_embedder():
            if not _get_embedder():
                logger.warning("⚠️ OPENAI_API_KEY not set, using regex search")
            logger.info(f"🔍 Using regex search for: '{content_search}'")
            base_query["content"] = {"$regex": content_search, "$options": "i"}
            logger.info("MongoDB Query (date+regex): %s", base_query)
            cursor = mongodb_client.memos.find(base_query).sort("created_at", -1).limit(20)
            docs = await cursor.to_list(length=20)
        # If content_search with longer queries → date query FIRST, then vector re-rank
        else:
            # Step 2a: Get all memos in date range (+ tag) from MongoDB
            logger.info("MongoDB Pre-filter Query: %s", base_query)
            cursor = mongodb_client.memos.find(base_query).sort("created_at", -1).limit(100)
            candidate_docs = await cursor.to_list(length=100)
            logger.info(f"📋 Pre-filter found {len(candidate_docs)} memos in date range")
            if not candidate_docs:
                docs = []
            else:
                # Step 2b: Get embeddings ONLY for these candidate memos
                candidate_ids = [str(doc["_id"]) for doc in candidate_docs]
                all_embeddings = await _get_cached_embeddings()
                # Filter embeddings to only candidates
                candidate_emb_map = {}
                for emb_doc in all_embeddings:
                    mid = str(emb_doc.get("memo_id", ""))
                    if mid in candidate_ids and emb_doc.get("embedding"):
                        candidate_emb_map[mid] = emb_doc["embedding"]
                logger.info(f"🧠 Found {len(candidate_emb_map)} embeddings for candidates")
                if not candidate_emb_map:
                    # No embeddings for these memos → fallback to regex
                    logger.info("No embeddings found, falling back to regex")
                    base_query["content"] = {"$regex": content_search, "$options": "i"}
                    cursor = mongodb_client.memos.find(base_query).sort("created_at", -1).limit(20)
                    docs = await cursor.to_list(length=20)
                else:
                    # Step 2c: Semantic re-rank using vector similarity
                    try:
                        embedder = _get_embedder()
                        query_embedding = await embedder.aembed_query(content_search)
                        import numpy as np
                        q = np.array(query_embedding, dtype=float)
                        scored: list[tuple[str, float]] = []
                        for mid, emb in candidate_emb_map.items():
                            v = np.array(emb, dtype=float)
                            # Skip stored vectors from a different embedding model/dimension.
                            if v.shape != q.shape:
                                continue
                            denom = np.linalg.norm(q) * np.linalg.norm(v)
                            sim = float(np.dot(q, v) / denom) if denom != 0 else 0.0
                            # 0.4 cosine similarity threshold filters unrelated memos.
                            if sim > 0.4:
                                scored.append((mid, sim))
                        scored.sort(key=lambda x: x[1], reverse=True)
                        top_ids = {mid for mid, _ in scored[:20]}
                        logger.info(f"✅ Semantic re-rank: {len(top_ids)} memos above threshold")
                        # Re-order candidate_docs by semantic score
                        # (NOTE: this keeps the original created_at ordering of
                        # candidate_docs, filtered to the semantic top-20 set.)
                        docs = [d for d in candidate_docs if str(d["_id"]) in top_ids]
                    except Exception as e:
                        logger.warning(f"⚠️ Vector search failed: {e}, falling back to regex")
                        base_query["content"] = {"$regex": content_search, "$options": "i"}
                        cursor = mongodb_client.memos.find(base_query).sort("created_at", -1).limit(20)
                        docs = await cursor.to_list(length=20)
        # ──────────────────────────────────────────────
        # Step 3: Format Result
        # ──────────────────────────────────────────────
        memos = [serialize_doc(doc) for doc in docs]
        return json.dumps(
            {
                "status": "success",
                "query": {
                    "start_date": start_date,
                    "end_date": end_date or start_date,
                    "content_search": content_search,
                    "tag": tag,
                },
                "count": len(memos),
                "memos": memos,
            },
            ensure_ascii=False,
            default=str,
        )
    except Exception as exc:
        # Any unexpected failure is returned as a structured error payload so the
        # agent can report it instead of crashing the run.
        logger.exception("memo_retrieval_tool error: %s", exc)
        return json.dumps(
            {
                "status": "error",
                "message": str(exc),
            },
            ensure_ascii=False,
        )
import logging
import time
from common.embedding_service import create_embedding_async
logger = logging.getLogger(__name__)
def _escape(val: str) -> str:
"""Thoát dấu nháy đơn để tránh SQL Injection cơ bản."""
return val.replace("'", "''")
def _get_where_clauses(params) -> list[str]:
    """Collect every filter condition derived from params (price, metadata, specials)."""
    return [
        *_get_price_clauses(params),
        *_get_metadata_clauses(params),
        *_get_special_clauses(params),
    ]
def _get_price_clauses(params) -> list[str]:
"""Lọc theo giá."""
clauses = []
p_min = getattr(params, "price_min", None)
if p_min is not None:
clauses.append(f"sale_price >= {p_min}")
p_max = getattr(params, "price_max", None)
if p_max is not None:
clauses.append(f"sale_price <= {p_max}")
return clauses
def _get_metadata_clauses(params) -> list[str]:
"""Xây dựng điều kiện lọc từ metadata (Phối hợp Exact và Partial)."""
clauses = []
# 1. Exact Match (Giới tính, Độ tuổi) - Các trường này cần độ chính xác tuyệt đối
exact_fields = [
("gender_by_product", "gender_by_product"),
("age_by_product", "age_by_product"),
]
for param_name, col_name in exact_fields:
val = getattr(params, param_name, None)
if val:
clauses.append(f"{col_name} = '{_escape(val)}'")
# 2. Partial Match (LIKE) - Giúp map text linh hoạt hơn (Chất liệu, Dòng SP, Phong cách...)
# Cái này giúp map: "Yarn" -> "Yarn - Sợi", "Knit" -> "Knit - Dệt Kim"
partial_fields = [
("season", "season"),
("material_group", "material_group"),
("product_line_vn", "product_line_vn"),
("style", "style"),
("fitting", "fitting"),
("form_neckline", "form_neckline"),
("form_sleeve", "form_sleeve"),
]
for param_name, col_name in partial_fields:
val = getattr(params, param_name, None)
if val:
v = _escape(val).lower()
# Dùng LOWER + LIKE để cân mọi loại ký tự thừa hoặc hoa/thường
clauses.append(f"LOWER({col_name}) LIKE '%{v}%'")
return clauses
def _get_special_clauses(params) -> list[str]:
"""Các trường hợp đặc biệt: Mã sản phẩm, Màu sắc."""
clauses = []
# Mã sản phẩm / SKU
m_code = getattr(params, "magento_ref_code", None)
if m_code:
m = _escape(m_code)
clauses.append(f"(magento_ref_code = '{m}' OR internal_ref_code = '{m}')")
# Màu sắc
color = getattr(params, "master_color", None)
if color:
c = _escape(color).lower()
clauses.append(f"(LOWER(master_color) LIKE '%{c}%' OR LOWER(product_color_name) LIKE '%{c}%')")
return clauses
async def build_starrocks_query(params, query_vector: list[float] | None = None) -> str:
    """
    Build the product-search SQL using one of two strategies:
    1. CODE SEARCH: when magento_ref_code is present, look the code up directly (NO vector).
    2. HYDE SEARCH: semantic search with a HyDE embedding vector (pure-vector approach).

    Args:
        params: SearchItem-like object; fields are read defensively via getattr.
        query_vector: Pre-computed embedding for params.query. When None and a
            query text exists, the embedding is created here.

    Returns:
        A StarRocks SQL string, or "" when no vector could be obtained.
    """
    # ============================================================
    # CASE 1: CODE SEARCH - direct lookup by product code (no vector)
    # ============================================================
    magento_code = getattr(params, "magento_ref_code", None)
    if magento_code:
        logger.info(f"🎯 [CODE SEARCH] Direct search by code: {magento_code}")
        code = _escape(magento_code)
        # Exact match on either code column; fetch every record (all colors/sizes)
        # of that style. Deduplication happens downstream (GROUP BY internal_ref_code).
        sql = f"""
        SELECT
            internal_ref_code,
            description_text_full,
            sale_price,
            original_price,
            discount_amount,
            1.0 as max_score
        FROM shared_source.magento_product_dimension_with_text_embedding
        WHERE (magento_ref_code = '{code}' OR internal_ref_code = '{code}')
        """
        print("✅ [CODE SEARCH] Query built - No vector search needed!")
        # Full-query debug logging disabled (background task kept for reference):
        # asyncio.create_task(save_query_to_log(sql))
        return sql
    # ============================================================
    # CASE 2: HYDE SEARCH - semantic vector search
    # ============================================================
    logger.info("🚀 [HYDE RETRIEVER] Starting semantic vector search...")
    # 1. Obtain the HyDE vector (embedding of the AI-generated hypothetical document).
    query_text = getattr(params, "query", None)
    if query_text and query_vector is None:
        emb_start = time.time()
        query_vector = await create_embedding_async(query_text)
        logger.info(f"⏱️ [TIMER] Single HyDE Embedding: {(time.time() - emb_start) * 1000:.2f}ms")
    if not query_vector:
        logger.warning("⚠️ No vector found, returning empty query.")
        return ""
    v_str = "[" + ",".join(str(v) for v in query_vector) + "]"
    # 2. Build the PRICE filter only — other attributes are deliberately left to
    # the vector's semantic matching.
    price_clauses = _get_price_clauses(params)
    where_filter = ""
    if price_clauses:
        where_filter = " AND " + " AND ".join(price_clauses)
        logger.info(f"💰 [PRICE FILTER] Applied: {where_filter}")
    # 3. Pure vector search + price filter: take the top-100 ANN candidates, then
    # collapse color/size variants per style keeping the best-scoring one.
    sql = f"""
    WITH top_matches AS (
        SELECT /*+ SET_VAR(ann_params='{{"ef_search":128}}') */
            internal_ref_code,
            product_color_code,
            description_text_full,
            sale_price,
            original_price,
            discount_amount,
            approx_cosine_similarity(vector, {v_str}) as similarity_score
        FROM shared_source.magento_product_dimension_with_text_embedding
        ORDER BY similarity_score DESC
        LIMIT 100
    )
    SELECT
        internal_ref_code,
        MAX_BY(description_text_full, similarity_score) as description_text_full,
        MAX_BY(sale_price, similarity_score) as sale_price,
        MAX_BY(original_price, similarity_score) as original_price,
        MAX_BY(discount_amount, similarity_score) as discount_amount,
        MAX(similarity_score) as max_score
    FROM top_matches
    WHERE 1=1 {where_filter}
    GROUP BY internal_ref_code
    ORDER BY max_score DESC
    LIMIT 20
    """
    return sql
# ============================================================
# TEMPORARILY COMMENTED OUT - save_query_to_log
# ============================================================
# async def save_query_to_log(sql: str):
# """Lưu query full vào file hyde_pure_query.txt."""
# import os
# log_path = r"D:\cnf\chatbot_canifa\backend\logs\hyde_pure_query.txt"
# try:
# log_dir = os.path.dirname(log_path)
# if not os.path.exists(log_dir):
# os.makedirs(log_dir)
# with open(log_path, "w", encoding="utf-8") as f:
# f.write(sql)
# print(f"💾 Full Query saved to: {log_path}")
# except Exception as e:
# print(f"Save query log failed: {e}")
# ============================================================
# TEMPORARILY COMMENTED OUT - save_preview_to_log
# ============================================================
# async def save_preview_to_log(search_query: str, products: list[dict]):
# """Lưu kết quả DB trả về vào db_preview.txt (Format đẹp cho AI)."""
# import os
# preview_path = r"D:\cnf\chatbot_canifa\backend\logs\db_preview.txt"
# try:
# log_dir = os.path.dirname(preview_path)
# if not os.path.exists(log_dir):
# os.makedirs(log_dir)
#
# with open(preview_path, "a", encoding="utf-8") as f:
# f.write(f"\n{'='*60}\n")
# f.write(f"⏰ TIME: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
# f.write(f"🔍 SEARCH: {search_query}\n")
# f.write(f"📊 RESULTS COUNT: {len(products)}\n")
# f.write(f"{'-'*60}\n")
#
# if not products:
# f.write("❌ NO PRODUCTS FOUND\n")
# else:
# for idx, p in enumerate(products[:5], 1):
# code = p.get("internal_ref_code", "N/A")
# sale = p.get("sale_price", "N/A")
# orig = p.get("original_price", "N/A")
# disc = p.get("discount_amount", "0")
# score = p.get("max_score", p.get("similarity_score", "N/A"))
# desc = p.get("description_text_full", "No Description")
#
# f.write(f"{idx}. [{code}] Score: {score}\n")
# f.write(f" 💰 Price: {sale} (Orig: {orig}, Disc: {disc}%)\n")
# f.write(f" 📝 Desc: {desc}\n")
#
# f.write(f"{'='*60}\n")
# print(f"💾 DB Preview (Results) saved to: {preview_path}")
# except Exception as e:
# print(f"Save preview log failed: {e}")
import logging
from common.embedding_service import create_embedding_async
logger = logging.getLogger(__name__)
def _escape(val: str) -> str:
"""Thoát dấu nháy đơn để tránh SQL Injection cơ bản."""
return val.replace("'", "''")
def _get_where_clauses(params) -> list[str]:
    """
    Assemble WHERE clauses ordered by real-world selectivity of the Canifa catalog.

    Tier 1 (SKU code, ~99% selective) short-circuits everything except price:
    a code already pins one product style, so gender/color/style filters would be
    redundant. Otherwise the tiers narrow progressively:
      Tier 2 — gender / age / product category (50-70% reduction each)
      Tier 3 — material group and price range (30-50%)
      Tier 4 — season / style / fitting / size (10-30%)
      Tier 5 — form details, then color LAST (many SKUs share a color, so it
               filters the least).
    """
    sku = _get_sku_clause(params)
    if sku:
        # SKU already identifies the product; keep only the price filter so a
        # budget constraint can still be verified. Skip everything else.
        return [sku, *_get_price_clauses(params)]
    clauses: list[str] = list(_get_high_selectivity_clauses(params))
    material = _get_material_clause(params)
    if material:
        clauses.append(material)
    clauses += _get_price_clauses(params)
    clauses += _get_attribute_clauses(params)
    clauses += _get_form_detail_clauses(params)
    color = _get_color_clause(params)
    if color:
        # Color is intentionally appended last — lowest selectivity of all.
        clauses.append(color)
    return clauses
def _get_sku_clause(params) -> str | None:
    """
    Tier 1: SKU / product code — the most selective filter (~99%).

    One code maps to one product style (a handful of color variants at most),
    so when it is present the caller can skip every other attribute filter and
    at most keep price to validate a budget constraint.
    """
    code = getattr(params, "magento_ref_code", None)
    if not code:
        return None
    safe = _escape(code)
    return f"(magento_ref_code = '{safe}' OR internal_ref_code = '{safe}')"
def _get_color_clause(params) -> str | None:
"""
TIER 5: Color (LOWEST selectivity - 5-10%)
Multiple SKUs share the same color (e.g., 50+ gray products)
ALWAYS filter color LAST after other constraints
"""
color = getattr(params, "master_color", None)
if color:
c = _escape(color).lower()
return f"(LOWER(master_color) LIKE '%{c}%' OR LOWER(product_color_name) LIKE '%{c}%')"
return None
def _get_high_selectivity_clauses(params) -> list[str]:
"""
TIER 2: High-level categorization (50-70% reduction per filter)
Order: Gender → Age → Product Category
"""
clauses = []
# Gender: Male/Female/Unisex split (50-70% reduction)
gender = getattr(params, "gender_by_product", None)
if gender:
clauses.append(f"gender_by_product = '{_escape(gender)}'")
# Age: Kids/Adults split (50% reduction of remaining)
age = getattr(params, "age_by_product", None)
if age:
clauses.append(f"age_by_product = '{_escape(age)}'")
# Product Category: Váy/Áo/Quần... (30-50% reduction)
product_line = getattr(params, "product_line_vn", None)
if product_line:
p = _escape(product_line).lower()
clauses.append(f"LOWER(product_line_vn) LIKE '%{p}%'")
return clauses
def _get_material_clause(params) -> str | None:
"""TIER 3: Material Group - Knit vs Woven (50% split)"""
material = getattr(params, "material_group", None)
if material:
m = _escape(material).lower()
return f"LOWER(material_group) LIKE '%{m}%'"
return None
def _get_price_clauses(params) -> list[str]:
"""TIER 3: Price Range - Numeric filtering (30-40% reduction)"""
clauses = []
p_min = getattr(params, "price_min", None)
if p_min is not None:
clauses.append(f"sale_price >= {p_min}")
p_max = getattr(params, "price_max", None)
if p_max is not None:
clauses.append(f"sale_price <= {p_max}")
return clauses
def _get_attribute_clauses(params) -> list[str]:
"""
TIER 4: Attributes (10-30% reduction)
Season, Style, Fitting
"""
clauses = []
# Season: 4 seasons (~25% each)
season = getattr(params, "season", None)
if season:
s = _escape(season).lower()
clauses.append(f"LOWER(season) LIKE '%{s}%'")
# Style: Basic/Feminine/Sporty... (~15-20% reduction)
style = getattr(params, "style", None)
if style:
st = _escape(style).lower()
clauses.append(f"LOWER(style) LIKE '%{st}%'")
# Fitting: Regular/Slim/Loose (~15% reduction)
fitting = getattr(params, "fitting", None)
if fitting:
f = _escape(fitting).lower()
clauses.append(f"LOWER(fitting) LIKE '%{f}%'")
# Size Scale: S, M, L, 29, 30... (Specific filtering)
size = getattr(params, "size_scale", None)
if size:
sz = _escape(size).lower()
clauses.append(f"LOWER(size_scale) LIKE '%{sz}%'")
return clauses
def _get_form_detail_clauses(params) -> list[str]:
"""
TIER 5: Granular form details (<10% reduction each)
Neckline, Sleeve type
"""
clauses = []
form_fields = [
("form_neckline", "form_neckline"),
("form_sleeve", "form_sleeve"),
]
for param_name, col_name in form_fields:
val = getattr(params, param_name, None)
if val:
v = _escape(val).lower()
clauses.append(f"LOWER({col_name}) LIKE '%{v}%'")
return clauses
async def build_starrocks_query(params, query_vector: list[float] | None = None) -> str:
    """
    Build the optimized hybrid product-search SQL:
    1. Pre-filtering in selectivity order (SKU → exact → price → partial LIKE)
    2. Vector search (HNSW index) — semantic understanding
    3. Flexible keyword search (OR + scoring) — fuzzy-matching fallback
    4. Grouping (collapse color variants into one row per style)

    Args:
        params: SearchItem-like object; fields are read defensively via getattr.
        query_vector: Pre-computed embedding for params.query; when None and a
            query text exists, the embedding is created here.

    Returns:
        A StarRocks SQL string — vector mode when an embedding is available,
        keyword-scoring mode otherwise.

    FIX: removed the unconditional debug dump of the SQL to a hardcoded local
    Windows path ("d:\\cnf\\...\\embedding.txt") — a dev-only leftover that did
    blocking file I/O in an async function and logged an error on every query
    when run on any other machine.
    """
    # --- Resolve the embedding for the free-text query (embed lazily) ---
    query_text = getattr(params, "query", None)
    if query_text and query_vector is None:
        query_vector = await create_embedding_async(query_text)
    # --- Build filter clauses (optimized selectivity order) ---
    # SECURITY NOTE: values are only single-quote escaped (via _escape) before
    # interpolation; prefer parameterized queries if the driver ever supports them.
    where_clauses = _get_where_clauses(params)
    where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
    # --- Build SQL ---
    if query_vector and len(query_vector) > 0:
        v_str = "[" + ",".join(str(v) for v in query_vector) + "]"
        # Vector mode: take the 50 closest rows that survive the filters, then
        # group color variants per style, keeping the best similarity score.
        sql = f"""
        WITH top_sku_candidates AS (
            SELECT
                internal_ref_code,
                product_name,
                sale_price,
                original_price,
                master_color,
                product_image_url,
                product_image_url_thumbnail,
                product_web_url,
                description_text,
                material,
                material_group,
                gender_by_product,
                age_by_product,
                season,
                style,
                fitting,
                form_neckline,
                form_sleeve,
                product_line_vn,
                product_color_name,
                cosine_similarity(vector, {v_str}) as similarity_score
            FROM shared_source.magento_product_dimension_with_text_embedding
            WHERE {where_sql} AND vector IS NOT NULL
            ORDER BY similarity_score DESC
            LIMIT 50
        )
        SELECT
            internal_ref_code,
            ANY_VALUE(product_name) as product_name,
            ANY_VALUE(sale_price) as sale_price,
            ANY_VALUE(original_price) as original_price,
            GROUP_CONCAT(DISTINCT master_color ORDER BY master_color SEPARATOR ', ') as available_colors,
            ANY_VALUE(product_image_url) as product_image_url,
            ANY_VALUE(product_image_url_thumbnail) as product_image_url_thumbnail,
            ANY_VALUE(product_web_url) as product_web_url,
            ANY_VALUE(description_text) as description_text,
            ANY_VALUE(material) as material,
            ANY_VALUE(material_group) as material_group,
            ANY_VALUE(gender_by_product) as gender_by_product,
            ANY_VALUE(age_by_product) as age_by_product,
            ANY_VALUE(season) as season,
            ANY_VALUE(style) as style,
            ANY_VALUE(fitting) as fitting,
            ANY_VALUE(form_neckline) as form_neckline,
            ANY_VALUE(form_sleeve) as form_sleeve,
            ANY_VALUE(product_line_vn) as product_line_vn,
            MAX(similarity_score) as max_score
        FROM top_sku_candidates
        GROUP BY internal_ref_code
        ORDER BY max_score DESC
        LIMIT 10
        """  # noqa: S608
    else:
        # ⚡ FALLBACK: FLEXIBLE KEYWORD SEARCH (OR + SCORING)
        # Solves e.g. user searching "áo khoác nỉ" while the DB has "Áo nỉ nam".
        # NOTE(review): SearchItem declares no 'keywords' field — confirm which
        # callers actually supply it; otherwise this stays a plain filter query.
        keywords = getattr(params, "keywords", None)
        keyword_score_sql = ""
        keyword_filter = ""
        if keywords:
            k_clean = _escape(keywords).lower().strip()
            if k_clean:
                words = k_clean.split()
                # Scoring: each matched word adds +1.
                # "áo khoác nỉ" (3 words): "Áo nỉ nam" matches 2/3 → score 2;
                # "Áo khoác nỉ hoodie" matches 3/3 → score 3.
                score_terms = [
                    f"(CASE WHEN LOWER(product_name) LIKE '%{w}%' THEN 1 ELSE 0 END)"
                    for w in words
                ]
                keyword_score_sql = f"({' + '.join(score_terms)}) as keyword_match_score"
                # Threshold: at least half of the words must match
                # (3 words → ≥2 matches, 2 words → ≥1 match).
                min_matches = max(1, len(words) // 2)
                keyword_filter = f" AND ({' + '.join(score_terms)}) >= {min_matches}"
        # Select clause with optional scoring column and matching sort order.
        select_score = f", {keyword_score_sql}" if keyword_score_sql else ""
        order_by = "keyword_match_score DESC, sale_price ASC" if keyword_score_sql else "sale_price ASC"
        sql = f"""
        SELECT
            internal_ref_code,
            ANY_VALUE(product_name) as product_name,
            ANY_VALUE(sale_price) as sale_price,
            ANY_VALUE(original_price) as original_price,
            GROUP_CONCAT(DISTINCT master_color ORDER BY master_color SEPARATOR ', ') as available_colors,
            ANY_VALUE(product_image_url) as product_image_url,
            ANY_VALUE(product_image_url_thumbnail) as product_image_url_thumbnail,
            ANY_VALUE(product_web_url) as product_web_url,
            ANY_VALUE(description_text) as description_text,
            ANY_VALUE(material) as material,
            ANY_VALUE(material_group) as material_group,
            ANY_VALUE(gender_by_product) as gender_by_product,
            ANY_VALUE(age_by_product) as age_by_product,
            ANY_VALUE(season) as season,
            ANY_VALUE(style) as style,
            ANY_VALUE(fitting) as fitting,
            ANY_VALUE(form_neckline) as form_neckline,
            ANY_VALUE(form_sleeve) as form_sleeve,
            ANY_VALUE(product_line_vn) as product_line_vn
            {select_score}
        FROM shared_source.magento_product_dimension_with_text_embedding
        WHERE {where_sql} {keyword_filter}
        GROUP BY internal_ref_code
        HAVING COUNT(*) > 0
        ORDER BY {order_by}
        LIMIT 10
        """  # noqa: S608
    # Log filter statistics (mode + which high-priority filters fired).
    filter_info = f"Mode: {'Vector' if query_vector else 'Keyword'}, Filters: {len(where_clauses)}"
    if where_clauses:
        has_sku = any('internal_ref_code' in c or 'magento_ref_code' in c for c in where_clauses)
        has_gender = any('gender_by_product' in c for c in where_clauses)
        has_category = any('product_line_vn' in c for c in where_clauses)
        priority_info = []
        if has_sku:
            priority_info.append("SKU")
        if has_gender:
            priority_info.append("Gender")
        if has_category:
            priority_info.append("Category")
        if priority_info:
            filter_info += f", Priority: {'+'.join(priority_info)}"
    logger.info(f"📊 {filter_info}")
    return sql
\ No newline at end of file
import logging
from typing import List
logger = logging.getLogger(__name__)
class LangfusePuller:
    """Fetches bad-feedback records from Langfuse (or the internal DB log).

    Until real auth keys are wired in, this returns seeded mock data so the
    downstream learning loop can be exercised end-to-end.
    """

    def __init__(self, public_key=None, secret_key=None):
        # Credentials reserved for the future real Langfuse / DB client.
        self.public_key = public_key
        self.secret_key = secret_key
        # TODO: instantiate the real db connection / langfuse client once auth keys exist

    def fetch_bad_feedbacks(self, limit: int = 10) -> List[dict]:
        """Pull logs of consultations users rated poorly (1 star / negative sentiment).

        The returned records are fed back into the agent for re-learning.
        Mock data is returned until real auth is connected; `limit` only
        affects the log line for now.
        """
        logging.getLogger(__name__).info(f"Đang kéo top {limit} bad feedbacks từ Langfuse...")
        # MOCK DATA used to test the pipeline flow.
        mocked: List[dict] = []
        mocked.append({
            "trace_id": "lf_tr_83x91A",
            "user_query": "Tìm đồ đi dạo phố mùa hè",
            "bot_suggestion": "Áo len cardigan mỏng + Quần nỉ",
            "feedback_note": "Mùa hè mà khuyên mặc áo nỉ với áo len?? Đồ dở hơi!",
            "score": 0.0
        })
        mocked.append({
            "trace_id": "lf_tr_42mL0P",
            "user_query": "Đi sự kiện công ty mặc gì trang trọng?",
            "bot_suggestion": "Áo thun T-shirt dáng hộp rộng + Quần ngố denim",
            "feedback_note": "Đi event công sở ai cho mặc quần ngố với áo thun rộng thế này?",
            "score": 0.0
        })
        return mocked


# Module-level singleton used by the API routes.
langfuse_puller = LangfusePuller()
import json
import logging
from typing import List
logger = logging.getLogger(__name__)
def generate_optimizer_prompt(feedbacks: list, current_rules: str) -> str:
    """Build the standard prompt module fed to the Gemini model.

    Frames the model as an expert Fashion QA agent whose job is to repair
    the current fashion_rules.json based on collected bad-feedback logs.

    Args:
        feedbacks: Records with 'user_query', 'bot_suggestion', 'feedback_note'.
        current_rules: Current rules file content as a JSON string.

    Returns:
        The fully rendered prompt text.
    """
    # Render each feedback as a numbered one-liner (1-based numbering).
    feedback_text = "".join(
        f"{pos}. User hỏi: '{fb['user_query']}' -> Bot trả lời: '{fb['bot_suggestion']}' -> User chửi: '{fb['feedback_note']}'\n"
        for pos, fb in enumerate(feedbacks, start=1)
    )
    return f"""
You are an expert AI Fashion Quality Assurance (QA) Agent.
Your task is to fix logical flaws in the current 'fashion_rules.json' based on user bad-feedback.
=========== EXPLICIT INSTRUCTIONS ===========
1. Check what the users are angry about in the feedback.
2. Review the CURRENT RULES JSON.
3. Modify specific weight arrays or negative pairs to satisfy the user feedback. (For example, explicitly banning 'sweatshirt' for 'summer', or 'shorts' for 'formal_event').
4. Provide the FINAL COMPLETE JSON EXACTLY in the output format, properly structured.
=========== BAD FEEDBACK LOGS ===========
{feedback_text}
=========== CURRENT RULES (JSON) ===========
{current_rules}
"""
def run_optimization_logic(feedbacks: list, current_rules_str: str) -> str:
    """Core reasoning step of the learning loop.

    In production this would invoke an LLM; in this PoC it returns mock JSON
    simulating Gemini having edited the rules file: the original rules are
    kept and new hard-negative rules are appended for the failing cases.

    Args:
        feedbacks: Bad-feedback records pulled from Langfuse (unused by the mock).
        current_rules_str: Current fashion_rules.json content as a string.

    Returns:
        The updated rules serialized as a pretty-printed JSON string.
    """
    logging.getLogger(__name__).info("Chạy Agent Learning Loop phân tích Rules vs Feedbacks...")
    # TODO: integrate langchain_google_genai or the real Gemini API here...
    # prompt = generate_optimizer_prompt(feedbacks, current_rules_str)
    # output = gemini_llm.invoke(prompt)
    # ----------------------------------------
    # MOCK DATA: pretend the LLM emitted JSON that fixes the "sweatshirt in
    # summer" complaint by adding banned pairings.
    try:
        old_data = json.loads(current_rules_str)
    except json.JSONDecodeError:
        # Fix: was a bare `except:` that swallowed every exception
        # (including KeyboardInterrupt); only a parse failure should
        # trigger the empty-rules fallback.
        old_data = {}
    if not isinstance(old_data, dict):
        # Valid JSON but not an object (e.g. "[]") — start from a clean slate
        # instead of crashing on key access below.
        old_data = {}
    # The agent appends negative-score rules (heavy penalties) for the failure cases.
    old_data.setdefault("negative_matches", []).extend([
        {"season_vi": "mùa hè", "banned_category": "áo nỉ", "reason": "Feedback tồi: nóng"},
        {"season_vi": "mùa hè", "banned_category": "áo len cardigan", "reason": "Feedback tồi: nóng"},
        {"occasion": "công sở", "banned_category": "quần ngố", "reason": "Feedback tồi: không lịch sự"}
    ])
    # Return the agent-patched rules as canonical JSON text.
    return json.dumps(old_data, ensure_ascii=False, indent=4)
import json
import os
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# Import 2 module gốc của Phòng ban Learning Agent
from agent.feedback_agent.langfuse_puller import langfuse_puller
from agent.feedback_agent.learning_loop import run_optimization_logic
router = APIRouter(prefix="/api/feedback-agent", tags=["Learning Agent"])
RULES_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../worker/fashion_rules.json")
class ApplyRuleRequest(BaseModel):
    # Full replacement rules proposed by the agent, as a raw JSON string
    # (re-parsed server-side before being written to fashion_rules.json).
    new_rules_json: str
@router.get("/sync-langfuse")
async def sync_langfuse_feedbacks():
    """Manually pull bad feedbacks from Langfuse for inspection.

    Intended for the admin (HTML) view; returns up to 5 records wrapped in a
    standard {"status", "data"} envelope.
    """
    feedbacks = langfuse_puller.fetch_bad_feedbacks(limit=5)
    return JSONResponse(status_code=200, content={"status": "success", "data": feedbacks})
@router.post("/analyze")
async def analyze_feedbacks():
    """Have the LLM learn from bad feedbacks and draft an updated rules file.

    Returns the old and the drafted rules side by side so the admin UI can
    review the diff — nothing is written to disk by this endpoint.
    """
    feedbacks = langfuse_puller.fetch_bad_feedbacks()
    # 1. Load the current "brain" (rules JSON); fall back to an empty object.
    current_rule_str = "{}"
    if os.path.exists(RULES_PATH):
        try:
            with open(RULES_PATH, "r", encoding="utf-8") as f:
                current_rule_str = f.read()
        except (OSError, UnicodeDecodeError):
            # Fix: was `except Exception as e: pass` with an unused `e`,
            # silently swallowing every error. Keep the best-effort fallback,
            # but only for expected read/decoding failures.
            pass
    # 2. Let the LLM reason over rules + feedbacks and produce a draft.
    draft_rule_str = run_optimization_logic(feedbacks, current_rule_str)
    # 3. Return the draft for the admin to compare against the live rules.
    return JSONResponse(status_code=200, content={
        "status": "success",
        "draft_json": draft_rule_str,
        "old_json": current_rule_str,
        "feedbacks": feedbacks
    })
@router.post("/apply")
async def apply_new_rules(req: ApplyRuleRequest):
    """Let the admin confirm the rules update proposed by the LLM.

    Overwrites fashion_rules.json directly, completing the self-learning loop.
    Returns 400 for malformed JSON and 500 for any write failure.
    """
    # Safety check: re-parse in case the LLM emitted garbage (non-JSON).
    try:
        parsed = json.loads(req.new_rules_json)
    except json.JSONDecodeError:
        return JSONResponse(status_code=400, content={"status": "error", "message": "Feedback Agent nhả ra chuỗi không phải File JSON chuẩn!"})
    # Overwrite the system rules file with the approved draft.
    try:
        with open(RULES_PATH, "w", encoding="utf-8") as f:
            json.dump(parsed, f, ensure_ascii=False, indent=4)
    except Exception as e:
        return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
    return {"status": "success", "message": "Updated fashion_rules.json successfully."}
......@@ -19,7 +19,6 @@ from api.n8n_api_route import router as n8n_router
from api.feedback_route import router as feedback_router
from api.text_to_sql_route import router as text_to_sql_router
from api.dashboard_route import router as dashboard_router
from api.notes_route import router as notes_router
from api.experiment_links_route import router as experiment_links_router
from api.product_route import router as product_router
from api.sql_chat_route import router as sql_chat_router
......@@ -83,13 +82,7 @@ async def startup_event():
# Start FastStream EventBus
await event_bus.start()
# Initialize Dashboard Note tables (Postgres)
try:
from common.note_service import init_note_tables
init_note_tables()
except Exception as e:
logger.warning("⚠️ Dashboard Note tables init skipped: %s", e)
# Cleaned up Note APIs
# Start report worker if Redis is available
if REDIS_CACHE_TURN_ON and redis_cache.get_client():
from agent.report_agent.report_queue import report_worker_loop
......@@ -180,7 +173,6 @@ app.include_router(n8n_router)
app.include_router(feedback_router)
app.include_router(text_to_sql_router) # Bản 2: Text-to-SQL
app.include_router(dashboard_router) # Dashboard overview
app.include_router(notes_router) # Dashboard team notes
app.include_router(experiment_links_router) # Experiment links sidebar
app.include_router(product_router) # Product performance dashboard
app.include_router(sql_chat_router) # AI Data Analyst (Text-to-SQL)
......@@ -214,13 +206,14 @@ from api.canifa_product_api import router as canifa_product_router
app.include_router(canifa_product_router) # Canifa Product Proxy (GraphQL)
from api.ai_diagram_route import router as diagram_router
app.include_router(diagram_router) # AI Diagram Agent
from api.dashboard_note_route import router as dashboard_note_router
app.include_router(dashboard_note_router) # Dashboard Note (Postgres)
from api.merge_history.merge_history_route import router as merge_history_router
app.include_router(merge_history_router) # Mock merge history endpoints
from api.mock_auth_route import router as mock_auth_router
app.include_router(mock_auth_router) # Mock Auth (identity linking test)
from api.feedback_agent_route import router as feedback_agent_router
app.include_router(feedback_agent_router) # Lõi Agent Rút Kinh Nghiệm (Langfuse -> Rules)
if __name__ == "__main__":
print("=" * 60)
......
<!DOCTYPE html>
<html lang="vi">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Feedback Learning Agent | Canifa AI</title>
<!-- Vue 3 -->
<script src="https://unpkg.com/vue@3/dist/vue.global.js"></script>
<!-- Tailwind CSS -->
<script src="https://cdn.tailwindcss.com"></script>
<!-- Lucide Icons -->
<script src="https://unpkg.com/lucide@latest"></script>
<style>
.json-diff pre { font-family: monospace; font-size: 13px; line-height: 1.5; white-space: pre-wrap; }
.bg-red-canifa { background-color: #E60012; }
</style>
</head>
<body class="bg-gray-50 min-h-screen text-gray-800">
<div id="app" class="max-w-7xl mx-auto p-6 font-sans">
<!-- Header -->
<header class="flex justify-between items-center mb-8 bg-white p-4 shadow-sm rounded-xl">
<div>
<h1 class="text-2xl font-bold flex items-center gap-2">
<i data-lucide="brain-circuit" class="text-red-600"></i>
Agent Rút Kinh Nghiệm (Continuous Learning)
</h1>
<p class="text-sm text-gray-500 mt-1">Đồng bộ Langfuse → Phân Tích Lỗi Tiềm Ẩn → Tự Động Vá Lỗi (JSON)</p>
</div>
<button @click="analyzing ? null : analyzeFeedbacks()"
:class="[analyzing ? 'bg-gray-400 cursor-not-allowed' : 'bg-red-canifa hover:bg-red-700']"
class="text-white px-6 py-2.5 rounded-lg font-semibold shadow-md flex items-center gap-2 transition-all">
<i v-if="!analyzing" data-lucide="zap"></i>
<i v-else data-lucide="loader" class="animate-spin"></i>
{{ analyzing ? 'Nạp năng lượng & Đang rút kinh nghiệm...' : 'Chạy Học Tập Ngay' }}
</button>
</header>
<div class="grid grid-cols-12 gap-6">
<!-- Cột 1: Danh sách Feedback Tệ -->
<div class="col-span-4 bg-white p-5 rounded-xl border border-gray-100 shadow-sm overflow-hidden flex flex-col h-[750px]">
<h2 class="text-lg font-bold mb-4 flex items-center gap-2 text-gray-700 border-b pb-3">
<i data-lucide="message-square-warning" class="text-amber-500"></i>
Nhật ký chửi rủa (Langfuse)
</h2>
<div class="flex-1 overflow-y-auto pr-2 space-y-4">
<div v-if="feedbacks.length === 0" class="text-center text-gray-400 mt-10">
Bấm chạy [Học tập] để lôi feedback về...
</div>
<div v-for="(fb, i) in feedbacks" :key="i" class="bg-red-50/50 p-4 rounded-lg border border-red-100">
<div class="flex justify-between items-start mb-2">
<span class="text-xs font-mono text-gray-400">#{{ fb.trace_id }}</span>
<span class="bg-red-100 text-red-700 text-xs px-2 py-0.5 rounded font-bold">1 Sao</span>
</div>
<p class="text-sm"><span class="font-semibold">User:</span> {{ fb.user_query }}</p>
<p class="text-sm mt-1"><span class="font-semibold text-gray-600">Bot xúi:</span> {{ fb.bot_suggestion }}</p>
<hr class="my-2 border-red-200">
<p class="text-sm text-red-700"><span class="font-bold">Khách chửi:</span> "{{ fb.feedback_note }}"</p>
</div>
</div>
</div>
<!-- Cột 2 & 3: So sánh Cũ và Mới -->
<div class="col-span-8 flex flex-col h-[750px] relative">
<div class="flex items-center justify-between mb-2">
<h2 class="text-lg font-bold flex items-center gap-2 text-gray-700">
<i data-lucide="git-compare" class="text-purple-600"></i>
Đề xuất vá lỗi file Rules
</h2>
<button v-if="draftJson" @click="applyRules"
class="bg-green-600 hover:bg-green-700 text-white px-4 py-1.5 rounded-md font-semibold text-sm shadow flex items-center gap-1">
<i data-lucide="check-circle" class="w-4 h-4"></i>
Approve & Ghi đè vào Hệ Thống
</button>
</div>
<div class="flex-1 grid grid-cols-2 gap-4">
<!-- Bản Gốc Thực Tại -->
<div class="bg-gray-800 rounded-xl overflow-hidden flex flex-col shadow-inner">
<div class="bg-gray-900 px-4 py-2 border-b border-gray-700 flex items-center gap-2">
<i data-lucide="file-json" class="text-gray-400 w-4 h-4"></i>
<span class="text-gray-300 text-sm font-semibold">fashion_rules.json (Đang chạy)</span>
</div>
<pre class="flex-1 p-4 overflow-auto text-green-300 text-xs font-mono">{{ oldJson || 'Load current...' }}</pre>
</div>
<!-- Bản Nháp Agent Vừa Sửa -->
<div class="bg-gray-800 rounded-xl overflow-hidden flex flex-col shadow-inner relative">
<div class="bg-gray-900 px-4 py-2 border-b border-gray-700 flex items-center gap-2">
<i data-lucide="sparkles" class="text-yellow-400 w-4 h-4"></i>
<span class="text-white text-sm font-semibold">Bản Vá Đề Xuất (Tự viết bởi Agent)</span>
</div>
<pre class="flex-1 p-4 overflow-auto text-yellow-300 text-xs font-mono json-diff">{{ draftJson || 'Đang chờ năng lượng chửi của khách...' }}</pre>
<!-- Lớp phủ lúc đang loading chay Agent -->
<div v-if="analyzing" class="absolute inset-0 bg-gray-900/80 backdrop-blur-sm flex flex-col items-center justify-center p-6 text-center">
<i data-lucide="cpu" class="w-12 h-12 text-blue-400 animate-pulse mb-3"></i>
<h3 class="text-white font-bold mb-1">Agent Đang Cày Hăng Say</h3>
<p class="text-gray-400 text-sm">LLM đang mở não đọc lại feedback của khách và tìm ra cách trừ điểm trong Model JSON...</p>
</div>
</div>
</div>
<!-- Toast Alert -->
<div v-if="successMsg" class="absolute bottom-4 left-1/2 transform -translate-x-1/2 bg-green-500 text-white px-6 py-3 rounded-full shadow-xl flex items-center gap-2 font-semibold">
<i data-lucide="check" class="w-5 h-5"></i>
{{ successMsg }}
</div>
</div>
</div>
</div>
<script>
const { createApp, ref, onMounted, nextTick } = Vue;
createApp({
    setup() {
        // Reactive UI state for the learning-agent admin page.
        const analyzing = ref(false);   // true while the /analyze request is in flight
        const feedbacks = ref([]);      // bad-feedback records pulled from Langfuse
        const oldJson = ref('');        // currently deployed fashion_rules.json
        const draftJson = ref('');      // agent-proposed replacement rules
        const successMsg = ref('');     // transient toast message after a successful apply

        // Re-render Lucide icons after the next DOM update.
        const updateIcons = () => nextTick(() => lucide.createIcons());

        // Run the learning pass: POST /analyze, then populate the diff panels.
        const analyzeFeedbacks = async () => {
            analyzing.value = true;
            // Reset previous results before starting a new run.
            feedbacks.value = [];
            oldJson.value = '';
            draftJson.value = '';
            try {
                const res = await fetch('/api/feedback-agent/analyze', { method: 'POST' });
                const data = await res.json();
                // Short artificial delay so the user perceives the agent "working".
                setTimeout(() => {
                    feedbacks.value = data.feedbacks;
                    oldJson.value = data.old_json;
                    draftJson.value = data.draft_json;
                    analyzing.value = false;
                    updateIcons();
                }, 1200);
            } catch (e) {
                alert("Lỗi khi kết nối Agent API!");
                analyzing.value = false;
            }
        };

        // Ask for confirmation, then POST the drafted rules to /apply
        // (overwrites the live fashion_rules.json on the server).
        const applyRules = async () => {
            if(!confirm("Bro có chắc muốn Ghi đè file rules của Hệ thống chứ? Tác dụng Tức thì lên con Chatbot mua sắm!")) return;
            try {
                const res = await fetch('/api/feedback-agent/apply', {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify({ new_rules_json: draftJson.value })
                });
                const data = await res.json();
                if(data.status === 'success') {
                    successMsg.value = "Ngon! Canifa Chatbot đã Khôn ra ngay lập tức!";
                    // Auto-hide the toast after 4 seconds.
                    setTimeout(() => { successMsg.value = ''; }, 4000);
                } else {
                    alert("Xịt! " + data.message);
                }
            } catch(e) {
                alert("Xịt mạng!");
            }
        }

        onMounted(() => {
            updateIcons();
        });

        return { analyzing, feedbacks, oldJson, draftJson, analyzeFeedbacks, applyRules, successMsg }
    }
}).mount('#app');
</script>
</body>
</html>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment