
Building an AI Agent that Saves Data to a Database, Step by Step

Learn how to develop an intelligent AI agent that saves complete conversation history to a database, tracks entities, and integrates real-time web data.

In this article, you will learn:

  • How to build a production-ready AI agent that persists conversations to a database
  • How to implement intelligent data extraction and entity tracking
  • How to create robust error handling with automatic recovery
  • How to enhance your agent with real-time web data from Bright Data

Let's get started!

The Challenge of Stateless AI Conversations

Most AI agents today operate as stateless systems, treating every conversation as an isolated event. Without historical context, users are forced to repeat information, which creates operational inefficiency and frustration. Businesses also miss the opportunity to use long-term data for personalization and service improvement.

AI with data persistence solves this problem by recording every interaction to a database in a structured form. By keeping a continuous record, these systems can remember past context, track specific entities over time, and use previous interaction patterns to deliver consistent, personalized experiences.

What We're Building: A Database-Connected AI Agent System

We'll build a production-ready AI agent that processes messages with LangChain and GPT-4. It saves every conversation to PostgreSQL, extracts entities and insights in real time, maintains complete conversation history across sessions, manages errors with an automatic retry system, and provides monitoring through logs.

The system will handle:

  • A database schema with proper relationships and indexes
  • A LangChain agent with custom database tools
  • Automatic conversation persistence and entity extraction
  • A background processing pipeline for data handling
  • Error handling with transaction management
  • A query interface for retrieving historical data
  • RAG (retrieval-augmented generation) web intelligence through a Bright Data integration

Prerequisites

Set up your development environment with the following:

  • Python 3.10 or later, for modern async features and type annotations
  • PostgreSQL 14+ or SQLite 3.35+, as the database for data persistence
  • An OpenAI API key for access to GPT-4; get one from the OpenAI Platform
    [Screenshot: creating an OpenAI API key]
  • LangChain, the framework for building the AI agent (see the documentation)
  • A Python virtual environment to isolate dependencies (see the venv documentation)

Environment Setup

Create the project directory and install the dependencies:

mkdir database-agent
cd database-agent
python -m venv venv

# macOS/Linux: source venv/bin/activate
# Windows: venv\Scripts\activate

pip install langchain langchain-openai langchain-community chromadb sqlalchemy psycopg2-binary python-dotenv pydantic requests

Create a new file named agent.py and add the following imports:

import os
import json
import logging
import time
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from queue import Queue
from threading import Thread

# SQLAlchemy imports
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Float, JSON, ForeignKey, text
from sqlalchemy.orm import sessionmaker, relationship, Session, declarative_base
from sqlalchemy.pool import QueuePool
from sqlalchemy.exc import SQLAlchemyError

# LangChain imports
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import Tool
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.schema import HumanMessage, AIMessage, SystemMessage

# RAG imports
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
import requests

# Environment setup
from dotenv import load_dotenv
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

Create a .env file with your credentials:

# Database Configuration
DATABASE_URL="postgresql://username:password@localhost:5432/agent_db"
# Or for SQLite: DATABASE_URL="sqlite:///./agent_data.db"

# API Keys
OPENAI_API_KEY="your-openai-api-key"

# Optional: Bright Data (for Step 7)
BRIGHT_DATA_API_KEY="your-bright-data-api-key"

# Application Settings
AGENT_MODEL="gpt-4-turbo-preview"
CONNECTION_POOL_SIZE=5
MAX_RETRIES=3

You will need the following (a quick validation sketch follows the list):

  • Database URL: the connection string for PostgreSQL or SQLite
  • OpenAI API Key: powers the agent's intelligence through GPT-4
  • Bright Data API Key: optional, used in Step 7 to pull in real-time web data
    [Screenshot: creating a Bright Data API key]
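Before wiring anything else up, it helps to confirm the credentials actually load. A minimal sketch (purely illustrative, not part of agent.py above) that fails fast when a required variable is missing:

# Sketch: fail fast if required environment variables are missing.
import os
from dotenv import load_dotenv

load_dotenv()

# DATABASE_URL and OPENAI_API_KEY are required; the Bright Data key is optional.
required = ["DATABASE_URL", "OPENAI_API_KEY"]
missing = [name for name in required if not os.getenv(name)]
if missing:
    raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")

print("Environment OK, model:", os.getenv("AGENT_MODEL", "gpt-4-turbo-preview"))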

Building the Database-Connected AI Agent

Step 1: Design the Database Schema

Design tables for users, conversations, messages, and extracted entities. The schema uses foreign keys and relationships to maintain data integrity.

Base = declarative_base()


class User(Base):
    """User profile table - stores user information and preferences."""
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    user_id = Column(String(255), unique=True, nullable=False, index=True)
    name = Column(String(255))
    email = Column(String(255))
    preferences = Column(JSON, default={})
    created_at = Column(DateTime, default=datetime.utcnow)
    last_active = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    conversations = relationship("Conversation", back_populates="user", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<User(user_id='{self.user_id}', name='{self.name}')>"


class Conversation(Base):
    """Conversation session table - tracks individual conversation sessions."""
    __tablename__ = 'conversations'

    id = Column(Integer, primary_key=True)
    conversation_id = Column(String(255), unique=True, nullable=False, index=True)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    title = Column(String(500))
    summary = Column(Text)
    status = Column(String(50), default='active')  # active, archived, deleted
    meta_data = Column(JSON, default={})
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    user = relationship("User", back_populates="conversations")
    messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")
    entities = relationship("Entity", back_populates="conversation", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<Conversation(id='{self.conversation_id}', user='{self.user_id}')>"


class Message(Base):
    """Individual message table - stores each message in a conversation."""
    __tablename__ = 'messages'

    id = Column(Integer, primary_key=True)
    conversation_id = Column(Integer, ForeignKey('conversations.id'), nullable=False, index=True)
    role = Column(String(50), nullable=False)  # user, assistant, system
    content = Column(Text, nullable=False)
    tokens = Column(Integer)
    model = Column(String(100))
    meta_data = Column(JSON, default={})
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    conversation = relationship("Conversation", back_populates="messages")

    def __repr__(self):
        return f"<Message(role='{self.role}', conversation='{self.conversation_id}')>"


class Entity(Base):
    """Extracted entities table - stores named entities extracted from conversations."""
    __tablename__ = 'entities'

    id = Column(Integer, primary_key=True)
    conversation_id = Column(Integer, ForeignKey('conversations.id'), nullable=False, index=True)
    entity_type = Column(String(100), nullable=False, index=True)  # person, organization, location, etc.
    entity_value = Column(String(500), nullable=False)
    context = Column(Text)
    confidence = Column(Float, default=0.0)
    meta_data = Column(JSON, default={})
    extracted_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    conversation = relationship("Conversation", back_populates="entities")

    def __repr__(self):
        return f"<Entity(type='{self.entity_type}', value='{self.entity_value}')>"


class AgentLog(Base):
    """Agent operation logs table - stores operational logs for monitoring."""
    __tablename__ = 'agent_logs'

    id = Column(Integer, primary_key=True)
    conversation_id = Column(String(255), index=True)
    level = Column(String(50), nullable=False)  # INFO, WARNING, ERROR
    operation = Column(String(255), nullable=False)
    message = Column(Text, nullable=False)
    error_details = Column(JSON)
    execution_time = Column(Float)  # in seconds
    created_at = Column(DateTime, default=datetime.utcnow)

    def __repr__(self):
        return f"<AgentLog(level='{self.level}', operation='{self.operation}')>"

This schema defines five core tables. User stores preferences in a JSON field for flexible extension. Conversation tracks sessions and includes a status field. Message stores every conversation message, distinguishing user and assistant messages by role. Entity holds extracted entities along with a confidence score. AgentLog records operational logs for monitoring. Foreign keys guarantee referential integrity, indexes on frequently queried columns improve performance, and the cascade="all, delete-orphan" setting automatically removes child records when a parent record is deleted.
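To sanity-check the schema, here is a minimal sketch (assuming a throwaway SQLite file) that creates the tables and exercises the relationships and cascade delete; it is illustrative only and not part of the final agent:

# Quick schema check against a local SQLite file (illustrative only).
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite:///./schema_check.db")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

user = User(user_id="u_001", name="Alice")
conversation = Conversation(conversation_id="conv_001", user=user, title="First chat")
conversation.messages.append(Message(role="user", content="Hello!"))
session.add(user)          # the save-update cascade also adds the conversation and message
session.commit()

print(session.query(Message).count())        # -> 1

session.delete(user)       # delete-orphan cascade removes conversations and messages too
session.commit()
print(session.query(Conversation).count(), session.query(Message).count())  # -> 0 0
session.close()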

Step 2: Set Up the Database Connection Layer

Configure a database connection manager with SQLAlchemy. The manager handles connection pooling, health checks, and a retry budget for reliable operations.

class DatabaseManager:
    """
    Manages database connections and operations.

    Features:
    - Connection pooling for efficient resource usage
    - Health checks to verify database connectivity
    - Automatic table creation
    """

    def __init__(self, database_url: str, pool_size: int = 5, max_retries: int = 3):
        """
        Initialize database manager.

        Args:
            database_url: Database connection string (e.g., 'sqlite:///./agent_data.db')
            pool_size: Number of connections to maintain in the pool
            max_retries: Maximum number of retry attempts for failed operations
        """
        self.database_url = database_url
        self.max_retries = max_retries

        # Create engine with connection pooling
        self.engine = create_engine(
            database_url,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=10,
            pool_pre_ping=True,  # Verify connections before using
            echo=False  # Set to True for SQL debugging
        )

        # Create session factory
        self.SessionLocal = sessionmaker(
            bind=self.engine,
            autocommit=False,
            autoflush=False
        )

        logger.info(f"✓ Database engine created with {pool_size} connection pool")

    def initialize_database(self):
        """Create all tables in the database."""
        try:
            Base.metadata.create_all(bind=self.engine)
            logger.info("✓ Database tables created successfully")
        except Exception as e:
            logger.error(f"❌ Failed to create database tables: {e}")
            raise

    def get_session(self) -> Session:
        """Get a new database session for performing operations."""
        return self.SessionLocal()

    def health_check(self) -> bool:
        """
        Check database connectivity.

        Returns:
            bool: True if database is healthy, False otherwise
        """
        try:
            with self.engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            logger.info("✓ Database health check passed")
            return True
        except Exception as e:
            logger.error(f"❌ Database health check failed: {e}")
            return False

DatabaseManager builds on SQLAlchemy's connection pooling. pool_size=5 keeps five persistent connections for efficiency, and pool_pre_ping verifies connections before use to avoid stale-connection errors. The max_retries setting (three attempts by default) gives failed operations an exponential-backoff retry budget for transient network issues.
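The class stores max_retries, but the retry wrapper itself is not shown above. A minimal sketch of what such a helper could look like, assuming exponential backoff on transient SQLAlchemy OperationalError failures:

# Illustrative retry helper (not a method of DatabaseManager as written above).
import time
from sqlalchemy.exc import OperationalError

def run_with_retry(db_manager: DatabaseManager, operation, max_retries: int = 3):
    """Run `operation(session)` with exponential backoff on transient failures."""
    for attempt in range(1, max_retries + 1):
        session = db_manager.get_session()
        try:
            result = operation(session)
            session.commit()
            return result
        except OperationalError as e:
            session.rollback()
            if attempt == max_retries:
                raise
            wait = 2 ** attempt  # 2s, 4s, 8s, ...
            logger.warning(f"Attempt {attempt}/{max_retries} failed ({e}); retrying in {wait}s")
            time.sleep(wait)
        finally:
            session.close()

# Example: count users with automatic retries
# user_count = run_with_retry(db_manager, lambda s: s.query(User).count())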

Step 3: Build the LangChain Agent Core

Create the AI agent with LangChain and implement custom tools that interact with the database. The agent uses function calling to save information and retrieve conversation history.

class DataPersistentAgent:
    """
    AI Agent with database persistence capabilities.

    This agent:
    - Remembers conversations across sessions
    - Saves and retrieves user information
    - Extracts and stores important entities
    - Provides personalized responses based on history
    """

    def __init__(
        self,
        db_manager: DatabaseManager,
        model_name: str = "gpt-4-turbo-preview",
        temperature: float = 0.7
    ):
        """
        Initialize the data-persistent agent.

        Args:
            db_manager: Database manager instance
            model_name: LLM model to use (default: gpt-4-turbo-preview)
            temperature: Model temperature for response generation
        """
        self.db_manager = db_manager
        self.model_name = model_name

        # Initialize LLM
        self.llm = ChatOpenAI(
            model=model_name,
            temperature=temperature,
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )

        # Create tools for agent
        self.tools = self._create_agent_tools()

        # Create agent prompt
        self.prompt = self._create_agent_prompt()

        # Initialize memory
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )

        # Create agent
        self.agent = create_openai_functions_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=self.prompt
        )

        # Create agent executor
        self.agent_executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            memory=self.memory,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=5
        )

        logger.info(f"✓ Data-persistent agent initialized with {model_name}")

    def _create_agent_tools(self) -> List[Tool]:
        """Create custom tools for database operations."""

        def save_user_info(user_data: str) -> str:
            """Save user information to database."""
            try:
                data = json.loads(user_data)
                session = self.db_manager.get_session()

                user = session.query(User).filter_by(user_id=data['user_id']).first()
                if not user:
                    user = User(**data)
                    session.add(user)
                else:
                    for key, value in data.items():
                        setattr(user, key, value)

                session.commit()
                session.close()

                return f"✓ User information saved successfully"
            except Exception as e:
                logger.error(f"Failed to save user info: {e}")
                return f"❌ Error saving user info: {str(e)}"

        def retrieve_user_history(user_id: str) -> str:
            """Retrieve user's conversation history."""
            try:
                session = self.db_manager.get_session()

                user = session.query(User).filter_by(user_id=user_id).first()
                if not user:
                    return "No user found"

                conversations = session.query(Conversation).filter_by(user_id=user.id).order_by(Conversation.created_at.desc()).limit(5).all()

                history = []
                for conv in conversations:
                    messages = session.query(Message).filter_by(conversation_id=conv.id).all()
                    history.append({
                        'conversation_id': conv.conversation_id,
                        'created_at': conv.created_at.isoformat(),
                        'message_count': len(messages),
                        'summary': conv.summary
                    })

                session.close()
                return json.dumps(history, indent=2)
            except Exception as e:
                logger.error(f"Failed to retrieve history: {e}")
                return f"❌ Error retrieving history: {str(e)}"

        def extract_entities(text: str) -> str:
            """Extract entities from text and save to database."""

            try:
                entities = []
                # Simple keyword extraction (replace with proper NER)
                keywords = ['important', 'key', 'critical']
                for keyword in keywords:
                    if keyword in text.lower():
                        entities.append({
                            'entity_type': 'keyword',
                            'entity_value': keyword,
                            'confidence': 0.8
                        })

                return json.dumps(entities, indent=2)
            except Exception as e:
                logger.error(f"Failed to extract entities: {e}")
                return f"❌ Error extracting entities: {str(e)}"

        tools = [
            Tool(
                name="SaveUserInfo",
                func=save_user_info,
                description="Save user information to the database. Input should be a JSON string with user details."
            ),
            Tool(
                name="RetrieveUserHistory",
                func=retrieve_user_history,
                description="Retrieve a user's conversation history from the database. Input should be the user_id."
            ),
            Tool(
                name="ExtractEntities",
                func=extract_entities,
                description="Extract important entities from text and save to database. Input should be the text to analyze."
            )
        ]

        return tools

    def _create_agent_prompt(self) -> ChatPromptTemplate:
        """Create agent prompt template."""

        system_message = """You are a helpful AI assistant with the ability to remember and learn from conversations.

You have access to the following tools:
- SaveUserInfo: Save user information to remember for future conversations
- RetrieveUserHistory: Look up past conversations with a user
- ExtractEntities: Extract and save important information from conversations

Use these tools to provide personalized, context-aware responses. Always check if you have previous conversations with a user before responding.

Be proactive about saving important information for future conversations."""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_message),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad")
        ])

        return prompt

    def chat(self, user_id: str, message: str, conversation_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Process a chat message and persist to database.

        This method handles:
        1. Creating or retrieving conversations
        2. Saving user messages to database
        3. Generating agent responses
        4. Saving agent responses to database
        5. Logging operations for monitoring

        Args:
            user_id: Unique identifier for the user
            message: User's message text
            conversation_id: Optional conversation ID to continue existing conversation

        Returns:
            dict: Contains conversation_id, response, and execution_time
        """
        start_time = datetime.utcnow()

        try:
            # Get or create conversation
            session = self.db_manager.get_session()

            conversation = None
            if conversation_id:
                conversation = session.query(Conversation).filter_by(conversation_id=conversation_id).first()

            if not conversation:
                # Create a new conversation (also covers an unknown conversation_id)
                user = session.query(User).filter_by(user_id=user_id).first()
                if not user:
                    user = User(user_id=user_id, name=user_id)
                    session.add(user)
                    session.commit()

                conversation = Conversation(
                    conversation_id=f"conv_{user_id}_{datetime.utcnow().timestamp()}",
                    user_id=user.id,
                    title=message[:100]
                )
                session.add(conversation)
                session.commit()

            # Save user message
            user_message = Message(
                conversation_id=conversation.id,
                role="user",
                content=message,
                model=self.model_name
            )
            session.add(user_message)
            session.commit()

            # Get agent response
            response = self.agent_executor.invoke({
                "input": f"[User ID: {user_id}] {message}"
            })

            # Save assistant message
            assistant_message = Message(
                conversation_id=conversation.id,
                role="assistant",
                content=response['output'],
                model=self.model_name
            )
            session.add(assistant_message)
            session.commit()

            # Log operation
            execution_time = (datetime.utcnow() - start_time).total_seconds()
            log_entry = AgentLog(
                conversation_id=conversation.conversation_id,
                level="INFO",
                operation="chat",
                message="Chat processed successfully",
                execution_time=execution_time
            )
            session.add(log_entry)
            session.commit()

            # Extract conversation_id before closing session
            conversation_id_result = conversation.conversation_id

            session.close()

            logger.info(f"✓ Chat processed for user {user_id} in {execution_time:.2f}s")

            return {
                'conversation_id': conversation_id_result,
                'response': response['output'],
                'execution_time': execution_time
            }

        except Exception as e:
            logger.error(f"❌ Error processing chat: {e}")

            # Log error
            session = self.db_manager.get_session()
            error_log = AgentLog(
                conversation_id=conversation_id or "unknown",
                level="ERROR",
                operation="chat",
                message=str(e),
                error_details={'exception_type': type(e).__name__}
            )
            session.add(error_log)
            session.commit()
            session.close()

            raise

DataPersistentAgent wraps LangChain's function-calling agent with database tools. The SaveUserInfo tool persists user data by creating or updating User records, and the RetrieveUserHistory tool queries past conversations to give the current response context. The system prompt instructs the agent to proactively save important information and check for prior history. ConversationBufferMemory maintains short-term context within a single session, while database storage provides long-term memory across sessions.
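A minimal usage sketch, assuming the .env file from the setup section is in place; it creates the tables, instantiates the agent, and holds a two-turn conversation:

# Illustrative usage of DataPersistentAgent (assumes the module above).
db_manager = DatabaseManager(os.getenv("DATABASE_URL", "sqlite:///./agent_data.db"))
db_manager.initialize_database()

agent = DataPersistentAgent(db_manager)

first = agent.chat(user_id="alice", message="Hi! I'm Alice and I work on data pipelines.")
print(first["response"])

# Pass the conversation_id back in so both turns land in the same conversation
follow_up = agent.chat(
    user_id="alice",
    message="What did I say I work on?",
    conversation_id=first["conversation_id"],
)
print(follow_up["response"])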

[Screenshot: data-persistent AI agent output]

Step 3.5: Create the Data Collection Module

Build the utilities that extract and structure data from conversations. The collector uses the LLM to generate summaries, extract preferences, and identify entities.

class DataCollector:
    """
    Collects and structures data from agent conversations.

    This module:
    - Generates conversation summaries
    - Extracts user preferences from conversation history
    - Identifies and saves named entities
    """

    def __init__(self, db_manager: DatabaseManager, llm: ChatOpenAI):
        """
        Initialize data collector.

        Args:
            db_manager: Database manager instance
            llm: Language model for text analysis
        """
        self.db_manager = db_manager
        self.llm = llm
        logger.info("✓ Data collector initialized")

    def extract_conversation_summary(self, conversation_id: str) -> str:
        """
        Generate and save conversation summary using LLM.

        Args:
            conversation_id: ID of conversation to summarize

        Returns:
            str: Generated summary text
        """
        try:
            session = self.db_manager.get_session()

            conversation = session.query(Conversation).filter_by(conversation_id=conversation_id).first()
            if not conversation:
                return "Conversation not found"

            messages = session.query(Message).filter_by(conversation_id=conversation.id).all()

            # Build conversation text
            conv_text = "\n".join([
                f"{msg.role}: {msg.content}" for msg in messages
            ])

            # Generate summary using LLM
            summary_prompt = f"""Summarize the following conversation in 2-3 sentences, capturing the main topics and outcomes:

{conv_text}

Summary:"""

            summary_response = self.llm.invoke([HumanMessage(content=summary_prompt)])
            summary = summary_response.content

            # Update conversation with summary
            conversation.summary = summary
            session.commit()
            session.close()

            logger.info(f"✓ Generated summary for conversation {conversation_id}")
            return summary

        except Exception as e:
            logger.error(f"Failed to generate summary: {e}")
            return ""

    def extract_user_preferences(self, user_id: str) -> Dict[str, Any]:
        """
        Extract and save user preferences from conversation history.

        Args:
            user_id: ID of user to analyze

        Returns:
            dict: Extracted preferences
        """
        try:
            session = self.db_manager.get_session()

            user = session.query(User).filter_by(user_id=user_id).first()
            if not user:
                return {}

            # Get recent conversations
            conversations = session.query(Conversation).filter_by(user_id=user.id).order_by(Conversation.created_at.desc()).limit(10).all()

            all_messages = []
            for conv in conversations:
                messages = session.query(Message).filter_by(conversation_id=conv.id).all()
                all_messages.extend([msg.content for msg in messages if msg.role == "user"])

            if not all_messages:
                return {}

            # Analyze preferences using LLM
            analysis_prompt = f"""Analyze the following messages from a user and extract their preferences, interests, and communication style.

Messages:
{chr(10).join(all_messages[:20])}

Return a JSON object with the following structure:
{{
    "interests": ["interest1", "interest2"],
    "communication_style": "description",
    "preferred_topics": ["topic1", "topic2"],
    "language_preference": "language"
}}"""

            response = self.llm.invoke([HumanMessage(content=analysis_prompt)])

            try:
                # Extract JSON from response
                content = response.content
                if '```json' in content:
                    content = content.split('```json')[1].split('```')[0].strip()
                elif '```' in content:
                    content = content.split('```')[1].split('```')[0].strip()

                preferences = json.loads(content)

                # Update user preferences
                user.preferences = preferences
                session.commit()

                logger.info(f"✓ Extracted preferences for user {user_id}")
                return preferences

            except json.JSONDecodeError:
                logger.warning("Failed to parse preferences JSON")
                return {}
            finally:
                session.close()

        except Exception as e:
            logger.error(f"Failed to extract preferences: {e}")
            return {}

    def extract_entities_with_llm(self, conversation_id: str) -> List[Dict[str, Any]]:
        """
        Extract named entities using LLM.

        Args:
            conversation_id: ID of conversation to analyze

        Returns:
            list: List of extracted entities
        """
        try:
            session = self.db_manager.get_session()

            conversation = session.query(Conversation).filter_by(conversation_id=conversation_id).first()
            if not conversation:
                return []

            messages = session.query(Message).filter_by(conversation_id=conversation.id).all()
            conv_text = "\n".join([msg.content for msg in messages])

            # Extract entities using LLM
            entity_prompt = f"""Extract named entities from the following conversation. Identify:
- People (PERSON)
- Organizations (ORG)
- Locations (LOC)
- Dates (DATE)
- Products (PRODUCT)
- Technologies (TECH)

Conversation:
{conv_text}

Return a JSON array of entities with format:
[
    {{"type": "PERSON", "value": "John Doe", "context": "mentioned as team lead"}},
    {{"type": "ORG", "value": "Acme Corp", "context": "customer company"}}
]"""

            response = self.llm.invoke([HumanMessage(content=entity_prompt)])

            try:
                content = response.content
                if '```json' in content:
                    content = content.split('```json')[1].split('```')[0].strip()
                elif '```' in content:
                    content = content.split('```')[1].split('```')[0].strip()

                entities_data = json.loads(content)

                # Save entities to database
                saved_entities = []
                for entity_data in entities_data:
                    entity = Entity(
                        conversation_id=conversation.id,
                        entity_type=entity_data['type'],
                        entity_value=entity_data['value'],
                        context=entity_data.get('context', ''),
                        confidence=0.9  # LLM extraction has high confidence
                    )
                    session.add(entity)
                    saved_entities.append(entity_data)

                session.commit()
                session.close()

                logger.info(f"✓ Extracted {len(saved_entities)} entities from conversation {conversation_id}")
                return saved_entities

            except json.JSONDecodeError:
                logger.warning("Failed to parse entities JSON")
                return []

        except Exception as e:
            logger.error(f"Failed to extract entities: {e}")
            return []

DataCollector analyzes conversation content with the LLM. extract_conversation_summary generates a concise summary of a conversation; extract_user_preferences identifies interests and communication style by analyzing message patterns; and extract_entities_with_llm pulls out named entities such as people, organizations, and technologies via a structured prompt. All extracted data is saved to the database for later use.
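A short sketch of running all three collectors on a conversation that already exists in the database; db_manager and agent are the instances created earlier, and the conversation ID below is a placeholder for whatever chat() returned:

# Illustrative usage of DataCollector on a stored conversation.
collector = DataCollector(db_manager, agent.llm)

conversation_id = "conv_alice_1700000000.0"  # placeholder: use an ID returned by agent.chat()

print("Summary:", collector.extract_conversation_summary(conversation_id))
print("Entities:", collector.extract_entities_with_llm(conversation_id))
print("Preferences:", collector.extract_user_preferences("alice"))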

Step 4: Build the Intelligent Data Processing Pipeline

Implement background processing so that data collection is decoupled from the agent's main flow. The pipeline uses worker threads and queues to handle summaries and entity extraction asynchronously.

class DataProcessingPipeline:
    """
    Asynchronous data processing pipeline.

    This pipeline:
    - Processes conversations in the background
    - Generates summaries
    - Extracts entities without blocking main flow
    - Updates user preferences periodically
    """

    def __init__(self, db_manager: DatabaseManager, collector: DataCollector, batch_size: int = 10):
        """
        Initialize processing pipeline.

        Args:
            db_manager: Database manager instance
            collector: Data collector for processing operations
            batch_size: Number of items to process in each batch
        """
        self.db_manager = db_manager
        self.collector = collector
        self.batch_size = batch_size

        # Processing queues
        self.summary_queue = Queue()
        self.entity_queue = Queue()
        self.preference_queue = Queue()

        # Worker threads
        self.workers = []
        self.running = False

        logger.info("✓ Data processing pipeline initialized")

    def start(self):
        """Start background processing workers."""
        self.running = True

        # Create worker threads
        summary_worker = Thread(target=self._process_summaries, daemon=True)
        entity_worker = Thread(target=self._process_entities, daemon=True)
        preference_worker = Thread(target=self._process_preferences, daemon=True)

        summary_worker.start()
        entity_worker.start()
        preference_worker.start()

        self.workers = [summary_worker, entity_worker, preference_worker]

        logger.info("✓ Started 3 background processing workers")

    def stop(self):
        """Stop background processing workers."""
        self.running = False
        for worker in self.workers:
            worker.join(timeout=5)
        logger.info("✓ Stopped background processing workers")

    def queue_conversation_for_processing(self, conversation_id: str, user_id: str):
        """
        Add conversation to processing queues.

        Args:
            conversation_id: ID of conversation to process
            user_id: ID of user for preference extraction
        """
        self.summary_queue.put(conversation_id)
        self.entity_queue.put(conversation_id)
        self.preference_queue.put(user_id)

        logger.info(f"✓ Queued conversation {conversation_id} for processing")

    def _process_summaries(self):
        """Worker for processing conversation summaries."""
        while self.running:
            try:
                if not self.summary_queue.empty():
                    conversation_id = self.summary_queue.get()
                    self.collector.extract_conversation_summary(conversation_id)
                    self.summary_queue.task_done()
                else:
                    time.sleep(1)
            except Exception as e:
                logger.error(f"Error in summary worker: {e}")

    def _process_entities(self):
        """Worker for processing entity extraction."""
        while self.running:
            try:
                if not self.entity_queue.empty():
                    conversation_id = self.entity_queue.get()
                    self.collector.extract_entities_with_llm(conversation_id)
                    self.entity_queue.task_done()
                else:
                    time.sleep(1)
            except Exception as e:
                logger.error(f"Error in entity worker: {e}")

    def _process_preferences(self):
        """Worker for processing user preferences."""
        while self.running:
            try:
                if not self.preference_queue.empty():
                    user_id = self.preference_queue.get()
                    self.collector.extract_user_preferences(user_id)
                    self.preference_queue.task_done()
                else:
                    time.sleep(1)
            except Exception as e:
                logger.error(f"Error in preference worker: {e}")

    def get_queue_status(self) -> Dict[str, int]:
        """
        Get current queue sizes.

        Returns:
            dict: Queue sizes for each processing type
        """
        return {
            'summary_queue': self.summary_queue.qsize(),
            'entity_queue': self.entity_queue.qsize(),
            'preference_queue': self.preference_queue.qsize()
        }

DataProcessingPipeline decouples data collection from message handling. When a conversation finishes, it is only enqueued rather than processed immediately. Separate worker threads pull items off the queues and run in the background, so data processing never blocks the agent's responses. daemon=True ensures the workers exit automatically when the main process stops, and the queue status monitoring shows how much processing backlog has accumulated.
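In practice the pipeline sits next to the agent: after each chat() call you enqueue the conversation and let the workers catch up. A brief sketch, reusing the db_manager, collector, and agent instances from the earlier steps:

# Illustrative pipeline usage.
pipeline = DataProcessingPipeline(db_manager, collector)
pipeline.start()

result = agent.chat(user_id="alice", message="Let's talk about PostgreSQL indexing.")
pipeline.queue_conversation_for_processing(result["conversation_id"], "alice")

print(pipeline.get_queue_status())  # e.g. {'summary_queue': 1, 'entity_queue': 1, 'preference_queue': 1}

time.sleep(5)   # give the daemon workers a moment to drain the queues
pipeline.stop()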

[Screenshot: data processing pipeline output]

Step 5: Add Real-Time Monitoring and Logging

Create a monitoring system that tracks agent performance, detects errors, and generates reports. The monitor analyzes logs to provide operational insight.

class AgentMonitor:
    """
    Real-time monitoring and metrics collection.

    This module:
    - Tracks performance metrics
    - Monitors system health
    - Generates analytics reports
    """

    def __init__(self, db_manager: DatabaseManager):
        """
        Initialize agent monitor.

        Args:
            db_manager: Database manager instance
        """
        self.db_manager = db_manager
        logger.info("✓ Agent monitor initialized")

    def get_performance_metrics(self, hours: int = 24) -> Dict[str, Any]:
        """
        Get performance metrics for the specified time period.

        Args:
            hours: Number of hours to look back

        Returns:
            dict: Performance metrics including operation counts and error rates
        """
        try:
            session = self.db_manager.get_session()

            cutoff_time = datetime.utcnow() - timedelta(hours=hours)

            # Query logs
            logs = session.query(AgentLog).filter(
                AgentLog.created_at >= cutoff_time
            ).all()

            # Calculate metrics
            total_operations = len(logs)
            error_count = len([log for log in logs if log.level == "ERROR"])
            avg_execution_time = sum([log.execution_time or 0 for log in logs]) / max(total_operations, 1)

            # Get conversation counts
            conversations = session.query(Conversation).filter(
                Conversation.created_at >= cutoff_time
            ).count()

            messages = session.query(Message).join(Conversation).filter(
                Message.created_at >= cutoff_time
            ).count()

            session.close()

            metrics = {
                'time_period_hours': hours,
                'total_operations': total_operations,
                'error_count': error_count,
                'error_rate': error_count / max(total_operations, 1),
                'avg_execution_time': avg_execution_time,
                'conversations_created': conversations,
                'messages_processed': messages
            }

            logger.info(f"✓ Generated performance metrics for last {hours} hours")
            return metrics

        except Exception as e:
            logger.error(f"Failed to get performance metrics: {e}")
            return {}

    def health_check(self) -> Dict[str, Any]:
        """
        Perform health check.

        Returns:
            dict: Health status including database connectivity and error rates
        """
        try:
            # Check database connectivity
            db_healthy = self.db_manager.health_check()

            # Check recent error rate
            metrics = self.get_performance_metrics(hours=1)
            recent_errors = metrics.get('error_count', 0)

            # Determine overall health
            is_healthy = db_healthy and recent_errors < 10

            health_status = {
                'status': 'healthy' if is_healthy else 'degraded',
                'database_connected': db_healthy,
                'recent_errors': recent_errors,
                'timestamp': datetime.utcnow().isoformat()
            }

            logger.info(f"✓ Health check: {health_status['status']}")
            return health_status

        except Exception as e:
            logger.error(f"Health check failed: {e}")
            return {
                'status': 'unhealthy',
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            }

AgentMonitor provides observability into the running system. It queries the AgentLog table to compute metrics such as total operations, error rate, and average execution time. get_performance_metrics aggregates data over a configurable time window, and health_check combines database connectivity with the recent error count to report overall status. This monitoring lets you spot problems early and investigate them before they affect users.
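If you also want per-error detail for troubleshooting, a small standalone query over AgentLog (an illustrative extension, not a method of AgentMonitor above) could look like this:

# Illustrative error report over the last 24 hours.
from datetime import datetime, timedelta

def get_error_report(db_manager: DatabaseManager, hours: int = 24):
    session = db_manager.get_session()
    try:
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        errors = (
            session.query(AgentLog)
            .filter(AgentLog.level == "ERROR", AgentLog.created_at >= cutoff)
            .order_by(AgentLog.created_at.desc())
            .all()
        )
        return [
            {
                "at": log.created_at.isoformat(),
                "operation": log.operation,
                "message": log.message,
                "details": log.error_details,
            }
            for log in errors
        ]
    finally:
        session.close()

# for entry in get_error_report(db_manager):
#     print(entry["at"], entry["operation"], entry["message"])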

Step 6: Build the Query Interface

Implement query capabilities for retrieving and analyzing stored data. The interface below focuses on per-user analytics and can be extended with conversation search and entity tracking.

class DataQueryInterface:
    """
    Interface for querying stored agent data.

    This module provides methods to:
    - Query user analytics
    - Retrieve conversation history
    - Search for specific information
    """

    def __init__(self, db_manager: DatabaseManager):
        """
        Initialize query interface.

        Args:
            db_manager: Database manager instance
        """
        self.db_manager = db_manager
        logger.info("✓ Query interface initialized")

    def get_user_analytics(self, user_id: str) -> Dict[str, Any]:
        """
        Get analytics for a specific user.

        Args:
            user_id: ID of user to analyze

        Returns:
            dict: User analytics including conversation counts and preferences
        """
        try:
            session = self.db_manager.get_session()

            user = session.query(User).filter_by(user_id=user_id).first()
            if not user:
                return {}

            # Get conversation count
            conversation_count = session.query(Conversation).filter_by(user_id=user.id).count()

            # Get message count
            message_count = session.query(Message).join(Conversation).filter(
                Conversation.user_id == user.id
            ).count()

            # Get entity count
            entity_count = session.query(Entity).join(Conversation).filter(
                Conversation.user_id == user.id
            ).count()

            # Get time range
            first_conversation = session.query(Conversation).filter_by(
                user_id=user.id
            ).order_by(Conversation.created_at).first()

            last_conversation = session.query(Conversation).filter_by(
                user_id=user.id
            ).order_by(Conversation.created_at.desc()).first()

            session.close()

            analytics = {
                'user_id': user_id,
                'name': user.name,
                'conversation_count': conversation_count,
                'message_count': message_count,
                'entity_count': entity_count,
                'preferences': user.preferences,
                'first_interaction': first_conversation.created_at.isoformat() if first_conversation else None,
                'last_interaction': last_conversation.created_at.isoformat() if last_conversation else None,
                'avg_messages_per_conversation': message_count / max(conversation_count, 1)
            }

            logger.info(f"✓ Generated analytics for user {user_id}")
            return analytics

        except Exception as e:
            logger.error(f"Failed to get user analytics: {e}")
            return {}

DataQueryInterface provides access to the stored data. get_user_analytics aggregates per-user statistics: conversation and message counts, extracted entities, stored preferences, and the time span of interactions. From here you can add methods such as full-text conversation search with SQL ILIKE or entity-mention lookups across conversations (a sketch follows). These queries are the foundation for dashboards, reports, and more personalized experiences.
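Conversation search is a natural next step. A minimal sketch (an assumed extension, not part of DataQueryInterface above) using a case-insensitive ILIKE match on message content:

# Illustrative search across a user's stored messages.
def search_conversations(db_manager: DatabaseManager, user_id: str, query: str, limit: int = 10):
    session = db_manager.get_session()
    try:
        user = session.query(User).filter_by(user_id=user_id).first()
        if not user:
            return []
        rows = (
            session.query(Message)
            .join(Conversation)
            .filter(Conversation.user_id == user.id, Message.content.ilike(f"%{query}%"))
            .order_by(Message.created_at.desc())
            .limit(limit)
            .all()
        )
        return [{"role": m.role, "content": m.content, "at": m.created_at.isoformat()} for m in rows]
    finally:
        session.close()

# matches = search_conversations(db_manager, "alice", "neural network")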

Step 7: Build RAG with Real-Time Web Data from Bright Data

Add RAG capabilities to your database-backed agent using Bright Data's real-time web intelligence. This integration combines your conversation history with fresh web data to produce better answers.

class BrightDataRAGEnhancer:
    """
    Enhance data-persistent agent with Bright Data web intelligence.

    This module:
    - Fetches real-time web data from Bright Data
    - Ingests web data into vector store for RAG
    - Enhances agent with web-augmented knowledge
    """

    def __init__(self, api_key: str, db_manager: DatabaseManager):
        """
        Initialize RAG enhancer with Bright Data.

        Args:
            api_key: Bright Data API key
            db_manager: Database manager instance
        """
        self.api_key = api_key
        self.db_manager = db_manager
        self.base_url = "https://api.brightdata.com"

        # Initialize vector store for RAG
        self.embeddings = OpenAIEmbeddings()
        self.vector_store = Chroma(
            embedding_function=self.embeddings,
            persist_directory="./chroma_db"
        )

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

        logger.info("✓ Bright Data RAG enhancer initialized")

    def fetch_dataset_data(
        self,
        dataset_id: str,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 1000
    ) -> List[Dict[str, Any]]:
        """
        Fetch data from Bright Data Dataset Marketplace.

        Args:
            dataset_id: ID of dataset to fetch
            filters: Optional filters for data
            limit: Maximum number of records to fetch

        Returns:
            list: Retrieved dataset records
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        endpoint = f"{self.base_url}/datasets/v3/snapshot/{dataset_id}"

        params = {
            "format": "json",
            "limit": limit
        }

        if filters:
            params["filter"] = json.dumps(filters)

        try:
            response = requests.get(endpoint, headers=headers, params=params)
            response.raise_for_status()

            data = response.json()
            logger.info(f"✓ Retrieved {len(data)} records from Bright Data dataset {dataset_id}")
            return data

        except Exception as e:
            logger.error(f"Failed to fetch Bright Data dataset: {e}")
            return []

    def ingest_web_data_to_rag(
        self,
        dataset_records: List[Dict[str, Any]],
        text_fields: List[str],
        metadata_fields: Optional[List[str]] = None
    ) -> int:
        """
        Ingest web data into RAG vector store.

        Args:
            dataset_records: Records from Bright Data
            text_fields: Fields to use as text content
            metadata_fields: Fields to include in metadata

        Returns:
            int: Number of document chunks ingested
        """
        try:
            documents = []

            for record in dataset_records:
                # Combine text fields
                text_content = " ".join([
                    str(record.get(field, ""))
                    for field in text_fields
                    if record.get(field)
                ])

                if not text_content.strip():
                    continue

                # Build metadata
                metadata = {
                    "source": "bright_data",
                    "record_id": record.get("id", "unknown"),
                    "timestamp": datetime.utcnow().isoformat()
                }

                if metadata_fields:
                    for field in metadata_fields:
                        if field in record:
                            metadata[field] = record[field]

                # Split text into chunks
                chunks = self.text_splitter.split_text(text_content)

                for chunk in chunks:
                    documents.append({
                        "content": chunk,
                        "metadata": metadata
                    })

            # Add to vector store
            if documents:
                texts = [doc["content"] for doc in documents]
                metadatas = [doc["metadata"] for doc in documents]

                self.vector_store.add_texts(
                    texts=texts,
                    metadatas=metadatas
                )

                logger.info(f"✓ Ingested {len(documents)} document chunks into RAG")

            return len(documents)

        except Exception as e:
            logger.error(f"Failed to ingest web data to RAG: {e}")
            return 0

    def create_rag_enhanced_agent(
        self,
        base_agent: DataPersistentAgent
    ) -> DataPersistentAgent:
        """
        Enhance existing agent with RAG capabilities.

        Args:
            base_agent: Base agent to enhance

        Returns:
            DataPersistentAgent: Enhanced agent with RAG tool
        """
        def rag_search(query: str) -> str:
            """Search both conversation history and web data."""
            try:
                # Retrieve from conversation history
                session = self.db_manager.get_session()

                messages = session.query(Message).filter(
                    Message.content.ilike(f'%{query}%')
                ).order_by(Message.created_at.desc()).limit(5).all()

                results = []
                for msg in messages:
                    results.append({
                        'content': msg.content,
                        'source': 'conversation_history',
                        'relevance': 0.8
                    })

                session.close()

                # Retrieve from vector store (web data)
                try:
                    vector_results = self.vector_store.similarity_search_with_score(query, k=5)

                    for doc, score in vector_results:
                        results.append({
                            'content': doc.page_content,
                            'source': 'web_data',
                            'relevance': 1 - score
                        })
                except Exception as e:
                    logger.error(f"Failed to retrieve from vector store: {e}")

                if not results:
                    return "No relevant information found."

                # Format context
                context_text = "\n\n".join([
                    f"[{item['source']}] {item['content'][:200]}..."
                    for item in results[:5]
                ])

                return f"Retrieved context:\n{context_text}"

            except Exception as e:
                logger.error(f"RAG search failed: {e}")
                return f"Error performing search: {str(e)}"

        # Add RAG tool to agent
        rag_tool = Tool(
            name="SearchKnowledgeBase",
            func=rag_search,
            description="Search both conversation history and real-time web data for relevant information. Input should be a search query."
        )

        base_agent.tools.append(rag_tool)

        # Recreate agent with new tools
        base_agent.agent = create_openai_functions_agent(
            llm=base_agent.llm,
            tools=base_agent.tools,
            prompt=base_agent.prompt
        )

        base_agent.agent_executor = AgentExecutor(
            agent=base_agent.agent,
            tools=base_agent.tools,
            memory=base_agent.memory,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=5
        )

        logger.info("✓ Enhanced agent with RAG capabilities")
        return base_agent

BrightDataRAGEnhancer brings real-time web data into your agent. fetch_dataset_data pulls structured records from Bright Data's dataset marketplace, and ingest_web_data_to_rag splits and processes them into a Chroma vector store for semantic retrieval. create_rag_enhanced_agent defines a hybrid rag_search function that combines database history with vector-similarity results, wraps it in a SearchKnowledgeBase LangChain tool, and rebuilds the agent executor with the new tool, so the agent can answer questions using both its internal conversation history and fresh external data.
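Putting the pieces together, a brief sketch of wiring the enhancer into an existing agent; the dataset ID and field names below are placeholders for whatever dataset you actually use:

# Illustrative RAG enhancement flow (dataset ID and fields are placeholders).
enhancer = BrightDataRAGEnhancer(os.getenv("BRIGHT_DATA_API_KEY"), db_manager)

records = enhancer.fetch_dataset_data(dataset_id="your_dataset_id", limit=100)
if records:
    enhancer.ingest_web_data_to_rag(
        dataset_records=records,
        text_fields=["title", "content"],
        metadata_fields=["url"],
    )

agent = enhancer.create_rag_enhanced_agent(agent)

# The agent can now call the SearchKnowledgeBase tool during chat()
reply = agent.chat(user_id="alice", message="What does the latest web data say about vector databases?")
print(reply["response"])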

Running the Complete Data-Persistent Agent System

Bring all the components together into a runnable system.

def main():
    """Main execution flow demonstrating all components working together."""

    print("=" * 60)
    print("Data-Persistent AI Agent System - Initialization")
    print("=" * 60)

    # Step 1: Initialize database
    print("\n[Step 1] Setting up database connection...")
    db_manager = DatabaseManager(
        database_url=os.getenv("DATABASE_URL"),
        pool_size=5,
        max_retries=3
    )
    db_manager.initialize_database()

    # Step 2: Initialize core agent
    print("\n[Step 2] Building AI agent core...")
    agent = DataPersistentAgent(
        db_manager=db_manager,
        model_name=os.getenv("AGENT_MODEL", "gpt-4-turbo-preview")
    )

    # Step 3: Initialize data collector
    print("\n[Step 3] Creating data collection module...")
    collector = DataCollector(db_manager, agent.llm)

    # Step 4: Initialize processing pipeline
    print("\n[Step 4] Implementing data processing pipeline...")
    pipeline = DataProcessingPipeline(db_manager, collector)
    pipeline.start()

    # Step 5: Initialize monitoring
    print("\n[Step 5] Adding monitoring and logging...")
    monitor = AgentMonitor(db_manager)

    # Step 6: Initialize query interface
    print("\n[Step 6] Building query interface...")
    query_interface = DataQueryInterface(db_manager)

    # Step 7: Optional Bright Data RAG Enhancement
    print("\n[Step 7] RAG Enhancement (Optional)...")
    bright_data_key = os.getenv("BRIGHT_DATA_API_KEY")
    if bright_data_key and bright_data_key != "your-bright-data-api-key":
        print("Fetching real-time web data from Bright Data...")
        enhancer = BrightDataRAGEnhancer(bright_data_key, db_manager)

        # Example: Fetch and ingest web data
        web_data = enhancer.fetch_dataset_data(
            dataset_id="example_dataset_id",
            limit=100
        )

        if web_data:
            enhancer.ingest_web_data_to_rag(
                dataset_records=web_data,
                text_fields=["title", "content", "description"],
                metadata_fields=["url", "published_date"]
            )

        # Enhance agent with RAG
        agent = enhancer.create_rag_enhanced_agent(agent)
        print("✓ Agent enhanced with Bright Data RAG capabilities")
    else:
        print("⚠️ Bright Data API key not found - skipping web data integration")

    print("\n" + "=" * 60)
    print("Demo Conversations")
    print("=" * 60)

    # Demo user interactions
    test_user = "demo_user_001"

    # First conversation
    print("\n📝 Conversation 1:")
    response1 = agent.chat(
        user_id=test_user,
        message="Hi! I'm interested in learning about machine learning."
    )
    print(f"Agent: {response1['response']}\n")

    # Queue for processing
    pipeline.queue_conversation_for_processing(
        response1['conversation_id'],
        test_user
    )

    # Second conversation
    print("📝 Conversation 2:")
    response2 = agent.chat(
        user_id=test_user,
        message="Help me understand neural networks?",
        conversation_id=response1['conversation_id']
    )
    print(f"Agent: {response2['response']}\n")

    # Wait for background processing
    print("⏳ Processing data in background...")
    time.sleep(5)

    print("\n" + "=" * 60)
    print("Analytics & Monitoring")
    print("=" * 60)

    # Get performance metrics
    metrics = monitor.get_performance_metrics(hours=1)
    print(f"\n📊 Performance Metrics:")
    print(f"  - Total operations: {metrics.get('total_operations', 0)}")
    print(f"  - Error rate: {metrics.get('error_rate', 0):.2%}")
    print(f"  - Avg execution time: {metrics.get('avg_execution_time', 0):.2f}s")
    print(f"  - Conversations created: {metrics.get('conversations_created', 0)}")
    print(f"  - Messages processed: {metrics.get('messages_processed', 0)}")

    # Get user analytics
    analytics = query_interface.get_user_analytics(test_user)
    print(f"\n👤 User Analytics:")
    print(f"  - Conversation count: {analytics.get('conversation_count', 0)}")
    print(f"  - Message count: {analytics.get('message_count', 0)}")
    print(f"  - Entity count: {analytics.get('entity_count', 0)}")
    print(f"  - Avg messages/conversation: {analytics.get('avg_messages_per_conversation', 0):.1f}")

    # Health check
    health = monitor.health_check()
    print(f"\n🏥 System Health: {health['status']}")

    # Queue status
    queue_status = pipeline.get_queue_status()
    print(f"\n📋 Processing Queues:")
    print(f"  - Summary queue: {queue_status['summary_queue']}")
    print(f"  - Entity queue: {queue_status['entity_queue']}")
    print(f"  - Preference queue: {queue_status['preference_queue']}")

    # Stop pipeline
    pipeline.stop()

    print("\n" + "=" * 60)
    print("Data-Persistent Agent System - Complete")
    print("=" * 60)
    print("\n✓ All data persisted to database")
    print("✓ Background processing completed")
    print("✓ System ready for production use")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n⚠️ Shutting down gracefully...")
    except Exception as e:
        logger.error(f"System error: {e}")
        import traceback
        traceback.print_exc()

Run your database-backed agent system:

python agent.py

The system executes the full workflow: it initializes the database and creates all tables, builds the LangChain agent with its database tools, starts the background workers, processes the demo conversations and saves the data, extracts entities and generates summaries in the background, and finally displays live analytics and metrics.

You'll see detailed log output as each component initializes and processes data. The agent stores every message, extracts insights, and maintains full conversation context.

[Screenshot: demo run of the data-persistent agent system]

Typical Use Cases

1. Customer support with complete history

# Agent retrieves past interactions
support_agent = DataPersistentAgent(db_manager)
response = support_agent.chat(
    user_id="customer_123",
    message="I'm still having that connection issue"
)
# Agent sees previous conversations about connection problems

2. A personal AI assistant that keeps learning

# Agent learns preferences over time
query_interface = DataQueryInterface(db_manager)
analytics = query_interface.get_user_analytics("user_456")
# Shows interaction patterns, preferences, common topics

3. A research assistant with a knowledge base

# Combine conversation history with web data
enhancer = BrightDataRAGEnhancer(api_key, db_manager)
enhancer.ingest_web_data_to_rag(research_data, text_fields=["title", "abstract", "content"])
agent = enhancer.create_rag_enhanced_agent(agent)
# Agent references both past discussions and latest research

Benefits at a Glance

Feature          | Without a database          | With database persistence
Memory           | Lost on restart             | Stored permanently
Personalization  | Generic responses only      | Personalized from complete history
Analytics        | Not possible                | Full interaction data available
Error recovery   | Manual intervention         | Automatic retries and logging
Scalability      | Single instance             | Shared state across instances
Insights         | Lost when the session ends  | Extracted and tracked over time

Conclusion

You now have a production-ready AI agent system that persists conversations to a database. It saves every interaction, extracts entities and insights, maintains complete conversation history, and provides monitoring with automatic error recovery.

You can keep building on this foundation: add user authentication for secure access, build a dashboard for visual analytics, introduce vector embeddings for semantic search, expose an API for easy integration, or deploy with Docker for better scalability. The modular design makes it straightforward to customize as needed.

To go further, explore advanced AI agent patterns and Bright Data's web intelligence platform for additional capabilities.

Create a free account and start building intelligent systems that truly remember and learn.


Arindam Majumder

AI Content Creator

Arindam Majumder is a Developer Advocate, YouTuber, and technical writer with more than 5,000 followers, focused on making large language models (LLMs), agentic workflows, and AI content easy to understand.