
Building an AI BDR/SDR Agent with the Bright Data Web MCP

Learn how to build a multi-agent AI BDR/SDR system that automates prospect discovery and personalized outreach using real-time web intelligence from Bright Data.
24 min read
An AI SDR agent built on Bright Data

The system uses real-time web intelligence to discover new prospects, automatically detect buying signals, and generate personalized outreach based on real business events. You can try it hands-on directly on GitHub.

What you'll learn:

  • How to build a multi-agent system with CrewAI that performs specialized prospecting tasks
  • How to use the Bright Data MCP to gather real-time company and contact intelligence
  • How to automatically detect trigger events such as hiring, funding rounds, and leadership changes
  • How to generate personalized outreach from real-time business intelligence
  • How to create an automated pipeline from lead discovery to CRM integration

Let's get started!

The Challenge of Modern Sales Development

Traditional sales development relies on manual research: jumping between LinkedIn profiles, company websites, and news articles to identify prospects. This approach is slow and error-prone, and it often yields stale contact lists and poorly targeted messaging.

Integrating CrewAI with Bright Data automates the entire prospecting workflow, compressing hours of manual work into minutes.

What We'll Build: An Intelligent Sales Development System

You will build a multi-agent AI system that finds companies matching your ideal customer profile (ICP). The system tracks trigger events that indicate buying intent, gathers verified decision-maker information, and creates personalized outreach grounded in real business intelligence. It connects directly to your CRM to keep your sales pipeline healthy.

Prerequisites

Set up your development environment with the following requirements:

Environment Setup

Create a project directory and install the dependencies. Start with a clean virtual environment to avoid conflicts with other Python projects.

python -m venv ai_bdr_env
source ai_bdr_env/bin/activate  # Windows: ai_bdr_env\Scripts\activate
pip install crewai "crewai-tools[mcp]" openai pandas python-dotenv streamlit requests

Create the environment configuration in a .env file:

BRIGHT_DATA_API_TOKEN="your_bright_data_api_token"
OPENAI_API_KEY="your_openai_api_key"
HUBSPOT_API_KEY="your_hubspot_api_key"

Building the AI BDR System

Now let's build the AI agents for our AI BDR system.

Step 1: Bright Data MCP Setup

Create the web scraping infrastructure that collects real-time data from multiple sources. The MCP client handles all communication with Bright Data's scraping network.

Create an mcp_client.py file in the project root and add the following code:

import os

from mcp import StdioServerParameters
from crewai_tools import MCPServerAdapter

def create_mcp_client():
    """Launch the Bright Data Web MCP server over stdio and expose its tools.

    Minimal sketch: the server is started with `npx @brightdata/mcp` and
    authenticated with the API token from .env. The exact set of tools
    exposed depends on your Bright Data account configuration; the agents
    below reach them through the safe_mcp_call helper in agents/utils.py.
    """
    server_params = StdioServerParameters(
        command="npx",
        args=["-y", "@brightdata/mcp"],
        env={**os.environ, "API_TOKEN": os.getenv("BRIGHT_DATA_API_TOKEN", "")},
    )
    return MCPServerAdapter(server_params)

The MCP client manages all web scraping through Bright Data's AI-ready infrastructure. It provides reliable access to LinkedIn company pages, corporate websites, funding databases, and news sources, handling connection pooling and automatically working around anti-scraping protections.

Step 2: Company Discovery Agent

Turn your ideal customer profile (ICP) into an intelligent discovery system that finds companies matching specific requirements. The agent searches multiple sources and enriches company data with business intelligence.

First, create an agents folder in the project root. Then create an agents/company_discovery.py file and add the following code:

from crewai import Agent, Task
from crewai.tools import BaseTool
from typing import Any
from pydantic import BaseModel, Field
from .utils import validate_companies_input, safe_mcp_call, deduplicate_by_key, extract_domain_from_url

class CompanyDiscoveryInput(BaseModel):
    industry: str = Field(description="Target industry for company discovery")
    size_range: str = Field(description="Company size range (startup, small, medium, enterprise)")
    location: str = Field(default="", description="Geographic location or region")

class CompanyDiscoveryTool(BaseTool):
    name: str = "discover_companies"
    description: str = "Find companies matching ICP criteria using web scraping"
    args_schema: type[BaseModel] = CompanyDiscoveryInput
    mcp: Any = None

    def __init__(self, mcp_client):
        super().__init__()
        self.mcp = mcp_client

    def _run(self, industry: str, size_range: str, location: str = "") -> list:
        companies = []

        search_terms = [
            f"{industry} companies {size_range}",
            f"{industry} startups {location}",
            f"{industry} technology companies"
        ]

        for term in search_terms:
            results = self._search_companies(term)
            for company in results:
                enriched = self._enrich_company_data(company)
                if self._matches_icp(enriched, industry, size_range):
                    companies.append(enriched)

        return deduplicate_by_key(companies, lambda c: c.get('domain') or c['name'].lower())

    def _search_companies(self, term):
        """Search for companies using real web search through Bright Data."""
        try:
            companies = []

            search_queries = [
                f"{term} directory",
                f"{term} list",
                f"{term} news"
            ]

            for query in search_queries:
                try:
                    results = self._perform_company_search(query)
                    companies.extend(results)

                    if len(companies) >= 10:
                        break

                except Exception as e:
                    print(f"Error in search query '{query}': {str(e)}")
                    continue

            return self._filter_unique_companies(companies)

        except Exception as e:
            print(f"Error searching companies for '{term}': {str(e)}")
            return []

    def _enrich_company_data(self, company):
        linkedin_data = safe_mcp_call(self.mcp, 'scrape_company_linkedin', company['name'])
        website_data = safe_mcp_call(self.mcp, 'scrape_company_website', company.get('domain', ''))

        employee_count = linkedin_data.get('employee_count') or 150

        return {
            **company,
            'linkedin_intelligence': linkedin_data,
            'website_intelligence': website_data,
            'employee_count': employee_count,
            'icp_score': 0
        }

    def _matches_icp(self, company, industry, size_range):
        score = 0
        if industry.lower() in company.get('industry', '').lower():
            score += 30
        if self._check_size_range(company.get('employee_count', 0), size_range):
            score += 25

        if company.get('name') and company.get('domain'):
            score += 20

        company['icp_score'] = score
        return score >= 20

    def _check_size_range(self, count, size_range):
        ranges = {'startup': (1, 50), 'small': (51, 200), 'medium': (201, 1000)}
        min_size, max_size = ranges.get(size_range, (0, 999999))
        return min_size <= count <= max_size

    def _perform_company_search(self, query):
        """Perform company search using Bright Data MCP."""
        search_result = safe_mcp_call(self.mcp, 'search_company_news', query)

        if search_result and search_result.get('results'):
            return self._extract_companies_from_mcp_results(search_result['results'], query)
        else:
            print(f"No MCP results for: {query}")
            return []

    def _filter_unique_companies(self, companies):
        """Filter out duplicate companies."""
        seen_names = set()
        unique_companies = []

        for company in companies:
            name_key = company.get('name', '').lower()
            if name_key and name_key not in seen_names:
                seen_names.add(name_key)
                unique_companies.append(company)

        return unique_companies

    def _extract_companies_from_mcp_results(self, mcp_results, original_query):
        """Extract company information from MCP search results."""
        companies = []

        for result in mcp_results[:10]:
            try:
                title = result.get('title', '')
                url = result.get('url', '')

                company_name = self._extract_company_name_from_result(title, url)

                if company_name and len(company_name) > 2:
                    companies.append({
                        'name': company_name,
                        'domain': extract_domain_from_url(url),
                        'industry': self._extract_industry_from_query(original_query)
                    })

            except Exception as e:
                print(f"Error extracting company from MCP result: {str(e)}")
                continue

        return companies

    def _extract_company_name_from_result(self, title, url):
        """Extract company name from search result title or URL."""
        import re

        if title:
            # Strip everything after a separator, then common company suffixes
            title_clean = re.sub(r'[|\-—–].*$', '', title).strip()
            title_clean = re.sub(r'\s+(Inc|Corp|LLC|Ltd|Solutions|Systems|Technologies|Software|Platform|Company)$', '', title_clean, flags=re.IGNORECASE)

            if len(title_clean) > 2 and len(title_clean) < 50:
                return title_clean

        if url:
            domain_parts = url.split('/')[2].split('.')
            if len(domain_parts) > 1:
                return domain_parts[0].title()

        return None

    def _extract_industry_from_query(self, query):
        """Extract industry from search query."""
        query_lower = query.lower()

        industry_mappings = {
            'saas': 'SaaS',
            'fintech': 'FinTech',
            'ecommerce': 'E-commerce',
            'healthcare': 'Healthcare',
            'ai': 'AI/ML',
            'machine learning': 'AI/ML',
            'artificial intelligence': 'AI/ML'
        }

        for keyword, industry in industry_mappings.items():
            if keyword in query_lower:
                return industry

        return 'Technology'

def create_company_discovery_agent(mcp_client):
    return Agent(
        role='Company Discovery Specialist',
        goal='Find high-quality prospects matching ICP criteria',
        backstory='Expert at identifying potential customers using real-time web intelligence.',
        tools=[CompanyDiscoveryTool(mcp_client)],
        verbose=True
    )

The discovery agent searches multiple data sources for companies that match your ICP and enriches each one with business intelligence from LinkedIn and its website, then filters the results against your scoring criteria. Deduplication keeps the lead list clean and free of repeated entries.
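The agents/utils.py module is referenced throughout but not listed in this article; a minimal sketch of its deduplicate_by_key helper, as the discovery agent uses it, might look like this (assumed implementation):

```python
def deduplicate_by_key(items, key_fn):
    """Keep only the first occurrence of each item, keyed by key_fn."""
    seen = set()
    unique = []
    for item in items:
        key = key_fn(item)
        if key not in seen:
            seen.add(key)
            unique.append(item)
    return unique

companies = [
    {'name': 'Acme', 'domain': 'acme.com'},
    {'name': 'ACME Inc', 'domain': 'acme.com'},  # same domain -> dropped
    {'name': 'Globex', 'domain': ''},            # no domain -> keyed by name
]
# Same key function the discovery agent passes in _run
unique = deduplicate_by_key(companies, lambda c: c.get('domain') or c['name'].lower())
```

Keying on domain first means a company listed under slightly different names still collapses to one entry, while companies without a domain fall back to a lowercased name key.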

Step 3: Trigger Event Detection Agent

Monitor business events that reveal buying intent and the best time to reach out. The agent watches hiring activity, funding announcements, leadership changes, and expansion signals so you can prioritize prospects.

Create an agents/trigger_detection.py file and add the following code:

from crewai import Agent, Task
from crewai.tools import BaseTool
from datetime import datetime, timedelta
from typing import Any, List
from pydantic import BaseModel, Field
from .utils import validate_companies_input, safe_mcp_call

class TriggerDetectionInput(BaseModel):
    companies: List[dict] = Field(description="List of companies to analyze for trigger events")

class TriggerDetectionTool(BaseTool):
    name: str = "detect_triggers"
    description: str = "Find hiring signals, funding news, leadership changes"
    args_schema: type[BaseModel] = TriggerDetectionInput
    mcp: Any = None

    def __init__(self, mcp_client):
        super().__init__()
        self.mcp = mcp_client

    def _run(self, companies) -> list:
        companies = validate_companies_input(companies)
        if not companies:
            return []

        for company in companies:

            triggers = []

            hiring_signals = self._detect_hiring_triggers(company)
            triggers.extend(hiring_signals)

            funding_signals = self._detect_funding_triggers(company)
            triggers.extend(funding_signals)

            leadership_signals = self._detect_leadership_triggers(company)
            triggers.extend(leadership_signals)

            expansion_signals = self._detect_expansion_triggers(company)
            triggers.extend(expansion_signals)

            company['trigger_events'] = triggers
            company['trigger_score'] = self._calculate_trigger_score(triggers)

        return sorted(companies, key=lambda x: x.get('trigger_score', 0), reverse=True)

    def _detect_hiring_triggers(self, company):
        """Detect hiring triggers using LinkedIn data."""
        linkedin_data = safe_mcp_call(self.mcp, 'scrape_company_linkedin', company['name'])
        triggers = []

        if linkedin_data:
            hiring_posts = linkedin_data.get('hiring_posts', [])
            recent_activity = linkedin_data.get('recent_activity', [])

            if hiring_posts:
                triggers.append({
                    'type': 'hiring_spike',
                    'severity': 'high',
                    'description': f"Active hiring detected at {company['name']} - {len(hiring_posts)} open positions",
                    'date_detected': datetime.now().isoformat(),
                    'source': 'linkedin_api'
                })

            if recent_activity:
                triggers.append({
                    'type': 'company_activity',
                    'severity': 'medium',
                    'description': f"Increased LinkedIn activity at {company['name']}",
                    'date_detected': datetime.now().isoformat(),
                    'source': 'linkedin_api'
                })

        return triggers


    def _detect_funding_triggers(self, company):
        """Detect funding triggers using news search."""
        funding_data = safe_mcp_call(self.mcp, 'search_funding_news', company['name'])
        triggers = []

        if funding_data and funding_data.get('results'):
            triggers.append({
                'type': 'funding_round',
                'severity': 'high',
                'description': f"Recent funding activity detected at {company['name']}",
                'date_detected': datetime.now().isoformat(),
                'source': 'news_search'
            })

        return triggers


    def _detect_leadership_triggers(self, company):
        """Detect leadership changes using news search."""
        return self._detect_keyword_triggers(
            company, 'leadership_change', 'medium',
            ['ceo', 'cto', 'vp', 'hired', 'joins', 'appointed'],
            f"Leadership changes detected at {company['name']}"
        )

    def _detect_expansion_triggers(self, company):
        """Detect business expansion using news search."""
        return self._detect_keyword_triggers(
            company, 'expansion', 'medium',
            ['expansion', 'new office', 'opening', 'market'],
            f"Business expansion detected at {company['name']}"
        )

    def _detect_keyword_triggers(self, company, trigger_type, severity, keywords, description):
        """Generic method to detect triggers based on keywords in news."""
        news_data = safe_mcp_call(self.mcp, 'search_company_news', company['name'])
        triggers = []

        if news_data and news_data.get('results'):
            for result in news_data['results']:
                if any(keyword in str(result).lower() for keyword in keywords):
                    triggers.append({
                        'type': trigger_type,
                        'severity': severity,
                        'description': description,
                        'date_detected': datetime.now().isoformat(),
                        'source': 'news_search'
                    })
                    break

        return triggers

    def _calculate_trigger_score(self, triggers):
        severity_weights = {'high': 15, 'medium': 10, 'low': 5}
        return sum(severity_weights.get(t.get('severity', 'low'), 5) for t in triggers)

def create_trigger_detection_agent(mcp_client):
    return Agent(
        role='Trigger Event Analyst',
        goal='Identify buying signals and optimal timing for outreach',
        backstory='Expert at detecting business events that indicate readiness to buy.',
        tools=[TriggerDetectionTool(mcp_client)],
        verbose=True
    )

The trigger detection system monitors multiple business signals that indicate buying intent and optimal outreach timing. It analyzes hiring patterns from LinkedIn job posts, tracks funding announcements in news sources, watches for leadership changes, and identifies expansion activity. Each trigger event receives a severity score, helping you prioritize prospects by urgency and opportunity size.
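With the weights in _calculate_trigger_score, a company's trigger score is simply the sum of its events' severity weights. For example:

```python
# Mirrors _calculate_trigger_score above: sum severity weights over all triggers.
severity_weights = {'high': 15, 'medium': 10, 'low': 5}

triggers = [
    {'type': 'funding_round', 'severity': 'high'},
    {'type': 'hiring_spike', 'severity': 'high'},
    {'type': 'expansion', 'severity': 'medium'},
]
score = sum(severity_weights.get(t.get('severity', 'low'), 5) for t in triggers)
# 15 + 15 + 10 = 40
```

A company with two high-severity events therefore outranks one with several low-severity signals, which is what drives the sorted() ordering in _run.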

Step 4: Contact Research Agent

Find and verify decision-makers' contact details, weighing confidence scores from different data sources. The agent prioritizes contacts by role and data quality.

Create an agents/contact_research.py file and add the following code:

from crewai import Agent, Task
from crewai.tools import BaseTool
from typing import Any, List
from pydantic import BaseModel, Field
import re
from .utils import validate_companies_input, safe_mcp_call, validate_email, deduplicate_by_key

class ContactResearchInput(BaseModel):
    companies: List[dict] = Field(description="List of companies to research contacts for")
    target_roles: List[str] = Field(description="List of target roles to find contacts for")

class ContactResearchTool(BaseTool):
    name: str = "research_contacts"
    description: str = "Find and verify decision-maker contact information using MCP"
    args_schema: type[BaseModel] = ContactResearchInput
    mcp: Any = None

    def __init__(self, mcp_client):
        super().__init__()
        self.mcp = mcp_client

    def _run(self, companies, target_roles) -> list:
        companies = validate_companies_input(companies)
        if not companies:
            return []

        if not isinstance(target_roles, list):
            target_roles = [target_roles] if target_roles else []

        for company in companies:

            contacts = []

            for role in target_roles:
                role_contacts = self._search_contacts_by_role(company, role)
                for contact in role_contacts:
                    enriched = self._enrich_contact_data(contact, company)
                    if self._validate_contact(enriched):
                        contacts.append(enriched)

            company['contacts'] = self._deduplicate_contacts(contacts)
            company['contact_score'] = self._calculate_contact_quality(contacts)

        return companies

    def _search_contacts_by_role(self, company, role):
        """Search for contacts by role using MCP."""
        contacts = []

        search_query = f"{company['name']} {role} LinkedIn contact"
        search_result = safe_mcp_call(self.mcp, 'search_company_news', search_query)

        if search_result and search_result.get('results'):
            contacts.extend(self._extract_contacts_from_mcp_results(search_result['results'], role))

        if not contacts:
            contact_query = f"{company['name']} {role} email contact"
            contact_result = safe_mcp_call(self.mcp, 'search_company_news', contact_query)
            if contact_result and contact_result.get('results'):
                contacts.extend(self._extract_contacts_from_mcp_results(contact_result['results'], role))

        return contacts[:3]

    def _extract_contacts_from_mcp_results(self, results, role):
        """Extract contact information from MCP search results."""
        contacts = []

        for result in results:
            try:
                title = result.get('title', '')
                snippet = result.get('snippet', '')
                url = result.get('url', '')

                names = self._extract_names_from_text(title + ' ' + snippet)

                for name_parts in names:
                    if len(name_parts) >= 2:
                        first_name, last_name = name_parts[0], ' '.join(name_parts[1:])

                        contacts.append({
                            'first_name': first_name,
                            'last_name': last_name,
                            'title': role,
                            'linkedin_url': url if 'linkedin' in url else '',
                            'data_sources': 1,
                            'source': 'mcp_search'
                        })

                        if len(contacts) >= 2:
                            break

            except Exception as e:
                print(f"Error extracting contact from result: {str(e)}")
                continue

        return contacts

    def _extract_names_from_text(self, text):
        """Extract likely names from text."""
        import re

        name_patterns = [
            r'\b([A-Z][a-z]+)\s+([A-Z][a-z]+)\b',
            r'\b([A-Z][a-z]+)\s+([A-Z]\.?\s*[A-Z][a-z]+)\b',
            r'\b([A-Z][a-z]+)\s+([A-Z][a-z]+)\s+([A-Z][a-z]+)\b'
        ]

        names = []
        for pattern in name_patterns:
            matches = re.findall(pattern, text)
            for match in matches:
                if isinstance(match, tuple):
                    names.append(list(match))

        return names[:3]

    def _enrich_contact_data(self, contact, company):
        if not contact.get('email'):
            contact['email'] = self._generate_email(
                contact['first_name'],
                contact['last_name'],
                company.get('domain', '')
            )

        contact['email_valid'] = validate_email(contact.get('email', ''))

        contact['confidence_score'] = self._calculate_confidence(contact)

        return contact

    def _generate_email(self, first, last, domain):
        if not all([first, last, domain]):
            return ""
        return f"{first.lower()}.{last.lower()}@{domain}"


    def _calculate_confidence(self, contact):
        score = 0
        if contact.get('linkedin_url'): score += 30
        if contact.get('email_valid'): score += 25
        if contact.get('data_sources', 0) > 1: score += 20
        if all(contact.get(f) for f in ['first_name', 'last_name', 'title']): score += 25
        return score

    def _validate_contact(self, contact):
        required = ['first_name', 'last_name', 'title']
        return (all(contact.get(f) for f in required) and
                contact.get('confidence_score', 0) >= 50)

    def _deduplicate_contacts(self, contacts):
        unique = deduplicate_by_key(
            contacts,
            lambda c: c.get('email', '') or f"{c.get('first_name', '')}_{c.get('last_name', '')}"
        )
        return sorted(unique, key=lambda x: x.get('confidence_score', 0), reverse=True)

    def _calculate_contact_quality(self, contacts):
        if not contacts:
            return 0
        avg_confidence = sum(c.get('confidence_score', 0) for c in contacts) / len(contacts)
        high_quality = sum(1 for c in contacts if c.get('confidence_score', 0) >= 75)
        return min(avg_confidence + (high_quality * 5), 100)

def create_contact_research_agent(mcp_client):
    return Agent(
        role='Contact Intelligence Specialist',
        goal='Find accurate contact information for decision-makers using MCP',
        backstory='Expert at finding and verifying contact information using advanced MCP search tools.',
        tools=[ContactResearchTool(mcp_client)],
        verbose=True
    )

The contact research system identifies decision-makers by searching LinkedIn and company websites by title, generates email addresses from common corporate email patterns, and validates contact details through multiple checks. It assigns confidence scores based on data source quality, deduplicates to keep the contact list clean, and ranks contacts by verification confidence.
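The pattern-based email generation can be sketched in isolation; validate_email here is a simple syntactic stand-in for the helper in agents/utils.py, which is not shown in this article:

```python
import re

def validate_email(email):
    """Stand-in for the agents/utils.py helper: basic syntactic check only."""
    return bool(re.match(r'^[\w.+-]+@[\w-]+\.[\w.]+$', email or ''))

def generate_email(first, last, domain):
    """first.last@domain, mirroring _generate_email above."""
    if not all([first, last, domain]):
        return ""
    return f"{first.lower()}.{last.lower()}@{domain}"

email = generate_email("Jane", "Doe", "acme.com")
# -> "jane.doe@acme.com", which passes the syntactic check
```

A syntactic check only confirms the address is well-formed; in production you would pair it with deliverability verification, which is why the tool also tracks a separate confidence_score per contact.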

Step 5: Intelligent Message Generation

Turn business intelligence into personalized outreach messages that reference specific trigger events and demonstrate your research. The generator produces multiple message formats for different channels.

Create an agents/message_generation.py file and add the following code:

from crewai import Agent, Task
from crewai.tools import BaseTool
from typing import Any, List
from pydantic import BaseModel, Field
import openai
import os

class MessageGenerationInput(BaseModel):
    companies: List[dict] = Field(description="List of companies with contacts to generate messages for")
    message_type: str = Field(default="cold_email", description="Type of message to generate (cold_email, linkedin_message, follow_up)")

class MessageGenerationTool(BaseTool):
    name: str = "generate_messages"
    description: str = "Create personalized outreach based on company intelligence"
    args_schema: type[BaseModel] = MessageGenerationInput
    client: Any = None

    def __init__(self):
        super().__init__()
        self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    def _run(self, companies, message_type="cold_email") -> list:
        # Ensure companies is a list
        if not isinstance(companies, list):
            print(f"Warning: Expected list of companies, got {type(companies)}")
            return []

        if not companies:
            print("No companies provided for message generation")
            return []

        for company in companies:
            if not isinstance(company, dict):
                print(f"Warning: Expected company dict, got {type(company)}")
                continue

            for contact in company.get('contacts', []):
                if not isinstance(contact, dict):
                    continue

                message = self._generate_personalized_message(contact, company, message_type)
                contact['generated_message'] = message
                contact['message_quality_score'] = self._calculate_message_quality(message, company)
        return companies

    def _generate_personalized_message(self, contact, company, message_type):
        context = self._build_message_context(contact, company)

        if message_type == "cold_email":
            return self._generate_cold_email(context)
        elif message_type == "linkedin_message":
            return self._generate_linkedin_message(context)
        else:
            return self._generate_cold_email(context)

    def _build_message_context(self, contact, company):
        triggers = company.get('trigger_events', [])
        primary_trigger = triggers[0] if triggers else None

        return {
            'contact_name': contact.get('first_name', ''),
            'contact_title': contact.get('title', ''),
            'company_name': company.get('name', ''),
            'industry': company.get('industry', ''),
            'primary_trigger': primary_trigger,
            'trigger_count': len(triggers)
        }

    def _generate_cold_email(self, context):
        trigger_text = ""
        if context['primary_trigger']:
            trigger_text = f"I noticed {context['company_name']} {context['primary_trigger']['description'].lower()}."

        prompt = f"""Write a personalized cold email:

Contact: {context['contact_name']}, {context['contact_title']} at {context['company_name']}
Industry: {context['industry']}
Context: {trigger_text}

Requirements:
- Subject line that references the trigger event
- Personal greeting with first name
- Opening that demonstrates research
- Brief value proposition
- Clear call-to-action
- Maximum 120 words

Format as:
SUBJECT: [subject line]
BODY: [email body]"""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=300
        )

        return self._parse_email_response(response.choices[0].message.content)

    def _generate_linkedin_message(self, context):
        prompt = f"""Write a LinkedIn connection request (max 300 chars):

Contact: {context['contact_name']} at {context['company_name']}
Context: {context.get('primary_trigger', {}).get('description', '')}

Be professional, reference their company activity, no direct sales pitch."""

        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=100
        )

        return {
            'subject': 'LinkedIn Connection Request',
            'body': response.choices[0].message.content.strip()
        }

    def _parse_email_response(self, response):
        lines = response.strip().split('\n')
        subject = ""
        body_lines = []

        for line in lines:
            if line.startswith('SUBJECT:'):
                subject = line.replace('SUBJECT:', '').strip()
            elif line.startswith('BODY:'):
                body_lines.append(line.replace('BODY:', '').strip())
            elif body_lines:
                body_lines.append(line)

        return {
            'subject': subject,
            'body': '\n'.join(body_lines).strip()
        }

    def _calculate_message_quality(self, message, company):
        score = 0
        body = message.get('body', '').lower()

        if company.get('name', '').lower() in message.get('subject', '').lower():
            score += 25
        if company.get('trigger_events') and any(t.get('type', '') in body for t in company['trigger_events']):
            score += 30
        if len(body.split()) <= 120:
            score += 20
        if any(word in body for word in ['call', 'meeting', 'discuss', 'connect']):
            score += 25

        return score

def create_message_generation_agent():
    return Agent(
        role='Personalization Specialist',
        goal='Create compelling personalized outreach that gets responses',
        backstory='Expert at crafting messages that demonstrate research and provide value.',
        tools=[MessageGenerationTool()],
        verbose=True
    )

The message generation system converts business intelligence into personalized outreach. It references specific trigger events and reflects detailed research, producing context-aware subject lines, personalized greetings, and value propositions matched to each prospect's situation. The system outputs multiple message formats for different channels while keeping personalization quality consistent.
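The model is prompted to reply in a fixed SUBJECT:/BODY: layout, so parsing is deterministic. The logic of _parse_email_response above can be exercised on a sample reply:

```python
def parse_email_response(response):
    """Mirrors _parse_email_response above: split a SUBJECT:/BODY: reply."""
    subject, body_lines = "", []
    for line in response.strip().split('\n'):
        if line.startswith('SUBJECT:'):
            subject = line.replace('SUBJECT:', '').strip()
        elif line.startswith('BODY:'):
            body_lines.append(line.replace('BODY:', '').strip())
        elif body_lines:
            # Continuation lines after BODY: belong to the body
            body_lines.append(line)
    return {'subject': subject, 'body': '\n'.join(body_lines).strip()}

sample = """SUBJECT: Congrats on the Series B
BODY: Hi Jane,
Saw the funding news - worth a quick chat?"""
message = parse_email_response(sample)
# message['subject'] == 'Congrats on the Series B'
```

Pinning the output format in the prompt is what makes this parser reliable; if the model omits the markers, subject and body simply come back empty and the quality score stays low.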

Step 6: Lead Scoring and Pipeline Manager

Score prospects on multiple intelligence factors and automatically export qualified leads to your CRM. The manager prioritizes by fit, timing, and data quality.

Create an agents/pipeline_manager.py file and add the following code:

from crewai import Agent, Task
from crewai.tools import BaseTool
from datetime import datetime
from typing import List
from pydantic import BaseModel, Field
import requests
import os
from .utils import validate_companies_input

class LeadScoringInput(BaseModel):
    companies: List[dict] = Field(description="List of companies to score")

class LeadScoringTool(BaseTool):
    name: str = "score_leads"
    description: str = "Score leads based on multiple intelligence factors"
    args_schema: type[BaseModel] = LeadScoringInput

    def _run(self, companies) -> list:
        companies = validate_companies_input(companies)
        if not companies:
            return []

        for company in companies:

            score_breakdown = self._calculate_lead_score(company)
            company['lead_score'] = score_breakdown['total_score']
            company['score_breakdown'] = score_breakdown
            company['lead_grade'] = self._assign_grade(score_breakdown['total_score'])

        return sorted(companies, key=lambda x: x.get('lead_score', 0), reverse=True)

    def _calculate_lead_score(self, company):
        breakdown = {
            'icp_score': min(company.get('icp_score', 0) * 0.3, 25),
            'trigger_score': min(company.get('trigger_score', 0) * 2, 30),
            'contact_score': min(company.get('contact_score', 0) * 0.2, 20),
            'timing_score': self._assess_timing(company),
            'company_health': self._assess_health(company)
        }
        breakdown['total_score'] = sum(breakdown.values())
        return breakdown

    def _assess_timing(self, company):
        triggers = company.get('trigger_events', [])
        if not triggers:
            return 0

        recent_triggers = sum(1 for t in triggers if 'high' in t.get('severity', ''))
        return min(recent_triggers * 8, 15)

    def _assess_health(self, company):
        score = 0
        if company.get('trigger_events'):
            score += 5
        if company.get('employee_count', 0) > 50:
            score += 5
        return score

    def _assign_grade(self, score):
        if score >= 80: return 'A'
        elif score >= 65: return 'B'
        elif score >= 50: return 'C'
        else: return 'D'

class CRMIntegrationInput(BaseModel):
    companies: List[dict] = Field(description="List of companies to export to CRM")
    min_grade: str = Field(default="B", description="Minimum lead grade to export (A, B, C, D)")

class CRMIntegrationTool(BaseTool):
    name: str = "crm_integration"
    description: str = "Export qualified leads to HubSpot CRM"
    args_schema: type[BaseModel] = CRMIntegrationInput

    def _run(self, companies, min_grade='B') -> dict:
        companies = validate_companies_input(companies)
        if not companies:
            return {"message": "No companies provided for CRM export", "success": 0, "errors": 0}

        # Honor min_grade: 'B' exports grades A and B, 'C' exports A through C, etc.
        grade_order = ['A', 'B', 'C', 'D']
        allowed = grade_order[:grade_order.index(min_grade) + 1] if min_grade in grade_order else ['A', 'B']
        qualified = [c for c in companies if isinstance(c, dict) and c.get('lead_grade', 'D') in allowed]

        if not os.getenv("HUBSPOT_API_KEY"):
            return {"error": "HubSpot API key not configured", "success": 0, "errors": 0}

        results = {"success": 0, "errors": 0, "details": []}

        for company in qualified:
            for contact in company.get('contacts', []):
                if not isinstance(contact, dict):
                    continue

                result = self._create_hubspot_contact(contact, company)
                if result.get('success'):
                    results['success'] += 1
                else:
                    results['errors'] += 1
                results['details'].append(result)

        return results

    def _create_hubspot_contact(self, contact, company):
        api_key = os.getenv("HUBSPOT_API_KEY")
        if not api_key:
            return {"success": False, "error": "HubSpot API key not configured"}

        url = "https://api.hubapi.com/crm/v3/objects/contacts"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        trigger_summary = "; ".join([
            f"{t.get('type', '')}: {t.get('description', '')}"
            for t in company.get('trigger_events', [])
        ])

        email = contact.get('email', '').strip()
        if not email:
            return {"success": False, "error": "Contact email is required", "contact": contact.get('first_name', 'Unknown')}

        properties = {
            "email": email,
            "firstname": contact.get('first_name', ''),
            "lastname": contact.get('last_name', ''),
            "jobtitle": contact.get('title', ''),
            "company": company.get('name', ''),
            "website": f"https://{company.get('domain', '')}" if company.get('domain') else "",
            "hs_lead_status": "NEW",
            "lifecyclestage": "lead"
        }

        if company.get('lead_score'):
            properties["lead_score"] = str(company.get('lead_score', 0))
        if company.get('lead_grade'):
            properties["lead_grade"] = company.get('lead_grade', 'D')
        if trigger_summary:
            properties["trigger_events"] = trigger_summary[:1000]
        if contact.get('confidence_score'):
            properties["contact_confidence"] = str(contact.get('confidence_score', 0))

        properties["ai_discovery_date"] = datetime.now().isoformat()

        try:
            response = requests.post(url, json={"properties": properties}, headers=headers, timeout=30)

            if response.status_code == 201:
                return {
                    "success": True,
                    "contact": contact.get('first_name', ''),
                    "company": company.get('name', ''),
                    "hubspot_id": response.json().get('id')
                }
            elif response.status_code == 409:
                existing_contact = response.json()
                return {
                    "success": True,
                    "contact": contact.get('first_name', ''),
                    "company": company.get('name', ''),
                    "hubspot_id": existing_contact.get('id'),
                    "note": "Contact already exists"
                }
            else:
                error_detail = response.text if response.text else f"HTTP {response.status_code}"
                return {
                    "success": False,
                    "contact": contact.get('first_name', ''),
                    "company": company.get('name', ''),
                    "error": f"API Error: {error_detail}"
                }
        except requests.exceptions.RequestException as e:
            return {
                "success": False,
                "contact": contact.get('first_name', ''),
                "company": company.get('name', ''),
                "error": f"Network error: {str(e)}"
            }
        except Exception as e:
            return {
                "success": False,
                "contact": contact.get('first_name', ''),
                "company": company.get('name', ''),
                "error": f"Unexpected error: {str(e)}"
            }

def create_pipeline_manager_agent():
    return Agent(
        role='Pipeline Manager',
        goal='Score leads and manage CRM integration for qualified prospects',
        backstory='Expert at evaluating prospect quality and managing sales pipeline.',
        tools=[LeadScoringTool(), CRMIntegrationTool()],
        verbose=True
    )

The lead scoring system evaluates prospects across several dimensions: fit with the ideal customer profile, urgency of trigger events, contact data quality, and timing. It produces a detailed score breakdown for data-driven prioritization and automatically assigns letter grades for quick qualification. The CRM integration tool exports qualified leads directly to HubSpot, delivering all the intelligence in a format ready for sales follow-up.
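To make the weighting concrete, here is the capped breakdown from `_calculate_lead_score` applied to one sample company (the timing and health factors are omitted for brevity, and the input numbers are illustrative):

```python
def assign_grade(score: float) -> str:
    """Same thresholds as LeadScoringTool._assign_grade."""
    if score >= 80:
        return 'A'
    if score >= 65:
        return 'B'
    if score >= 50:
        return 'C'
    return 'D'

# Weighted, capped components matching _calculate_lead_score
company = {'icp_score': 70, 'trigger_score': 12, 'contact_score': 80}
breakdown = {
    'icp': min(company['icp_score'] * 0.3, 25),           # 21.0 (cap 25)
    'triggers': min(company['trigger_score'] * 2, 30),    # 24   (cap 30)
    'contacts': min(company['contact_score'] * 0.2, 20),  # 16.0 (cap 20)
}
total = sum(breakdown.values())
print(total, assign_grade(total))  # 61.0 C
```

The caps keep any single factor from dominating: even a perfect ICP match contributes at most 25 of the roughly 100 available points.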

Step 6.1: Shared Utilities

Before creating the main application, create an agents/utils.py file with the utility functions shared across the agents:

"""
Shared utility functions for all agent modules.
"""
from typing import List, Dict, Any
import re

def validate_companies_input(companies: Any) -> List[Dict]:
    """Validate and normalize companies input across all agents."""
    if isinstance(companies, dict) and 'companies' in companies:
        companies = companies['companies']

    if not isinstance(companies, list):
        print(f"Warning: Expected list of companies, got {type(companies)}")
        return []

    if not companies:
        print("No companies provided")
        return []

    valid_companies = []
    for company in companies:
        if isinstance(company, dict):
            valid_companies.append(company)
        else:
            print(f"Warning: Expected company dict, got {type(company)}")

    return valid_companies

def safe_mcp_call(mcp_client, method_name: str, *args, **kwargs) -> Dict:
    """Safely call MCP methods with consistent error handling."""
    try:
        method = getattr(mcp_client, method_name)
        result = method(*args, **kwargs)
        return result if result and not result.get('error') else {}
    except Exception as e:
        print(f"Error calling MCP {method_name}: {str(e)}")
        return {}

def validate_email(email: str) -> bool:
    """Validate email format."""
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))

def deduplicate_by_key(items: List[Dict], key_func) -> List[Dict]:
    """Remove duplicates from list of dicts using a key function."""
    seen = set()
    unique_items = []

    for item in items:
        key = key_func(item)
        if key and key not in seen:
            seen.add(key)
            unique_items.append(item)

    return unique_items

def extract_domain_from_url(url: str) -> str:
    """Extract domain from URL with fallback parsing."""
    if not url:
        return ""

    try:
        from urllib.parse import urlparse
        parsed = urlparse(url)
        return parsed.netloc
    except Exception:
        if '//' in url:
            return url.split('//')[1].split('/')[0]
        return ""

You also need to create an empty agents/__init__.py file so the agents folder is recognized as a Python package.
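A quick sanity check of the input-normalization helpers (the two functions are inlined here from agents/utils.py so the snippet runs standalone):

```python
from typing import Any, Dict, List

def validate_companies_input(companies: Any) -> List[Dict]:
    """Accept a bare list or a {'companies': [...]} wrapper; keep only dicts."""
    if isinstance(companies, dict) and 'companies' in companies:
        companies = companies['companies']
    if not isinstance(companies, list):
        return []
    return [c for c in companies if isinstance(c, dict)]

def deduplicate_by_key(items, key_func):
    """Keep the first item seen for each non-empty key."""
    seen, unique = set(), []
    for item in items:
        key = key_func(item)
        if key and key not in seen:
            seen.add(key)
            unique.append(item)
    return unique

raw = {'companies': [{'name': 'Acme'}, 'bad-entry', {'name': 'Acme'}]}
cleaned = validate_companies_input(raw)                        # drops the non-dict entry
unique = deduplicate_by_key(cleaned, lambda c: c.get('name'))  # collapses the duplicate
print(len(cleaned), len(unique))  # 2 1
```

Because every agent tool calls `validate_companies_input` first, a malformed payload from an upstream step degrades to an empty result instead of raising mid-pipeline.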

Step 7: System Orchestration

Create the main Streamlit application to coordinate all agents in an intelligent workflow. The interface provides real-time feedback and lets users customize parameters for different prospecting scenarios.

Create an ai_bdr_system.py file in the project root and add the following code:

import streamlit as st
import os
from dotenv import load_dotenv
from crewai import Crew, Process, Task
import pandas as pd
from datetime import datetime
import json
from mcp_client import BrightDataMCP
from agents.company_discovery import create_company_discovery_agent
from agents.trigger_detection import create_trigger_detection_agent
from agents.contact_research import create_contact_research_agent
from agents.message_generation import create_message_generation_agent
from agents.pipeline_manager import create_pipeline_manager_agent

load_dotenv()

st.set_page_config(
    page_title="AI BDR/SDR System",
    page_icon="🤖",
    layout="wide"
)

st.title("🤖 AI BDR/SDR Agent System")
st.markdown("**Real-time prospecting with multi-agent intelligence and trigger-based personalization**")

if 'workflow_results' not in st.session_state:
    st.session_state.workflow_results = None

with st.sidebar:
    try:
        st.image("bright-data-logo.png", width=200)
        st.markdown("---")
    except Exception:
        st.markdown("**🌐 Powered by Bright Data**")
        st.markdown("---")

    st.header("⚙️ Configuration")

    st.subheader("Ideal Customer Profile")
    industry = st.selectbox("Industry", ["SaaS", "FinTech", "E-commerce", "Healthcare", "AI/ML"])
    size_range = st.selectbox("Company Size", ["startup", "small", "medium", "enterprise"])
    location = st.text_input("Location (optional)", placeholder="San Francisco, NY, etc.")
    max_companies = st.slider("Max Companies", 5, 50, 20)

    st.subheader("Target Decision Makers")
    all_roles = ["CEO", "CTO", "VP Engineering", "Head of Product", "VP Sales", "CMO", "CFO"]
    target_roles = st.multiselect("Roles", all_roles, default=["CEO", "CTO", "VP Engineering"])

    st.subheader("Outreach Configuration")
    message_types = st.multiselect(
        "Message Types",
        ["cold_email", "linkedin_message", "follow_up"],
        default=["cold_email"]
    )

    with st.expander("Advanced Intelligence"):
        enable_competitive = st.checkbox("Competitive Intelligence", value=True)
        enable_validation = st.checkbox("Multi-source Validation", value=True)
        min_lead_grade = st.selectbox("Min CRM Export Grade", ["A", "B", "C"], index=1)

    st.divider()

    st.subheader("🔗 API Status")

    apis = [
        ("Bright Data", "BRIGHT_DATA_API_TOKEN", "🌐"),
        ("OpenAI", "OPENAI_API_KEY", "🧠"),
        ("HubSpot CRM", "HUBSPOT_API_KEY", "📊")
    ]

    for name, env_var, icon in apis:
        if os.getenv(env_var):
            st.success(f"{icon} {name} Connected")
        else:
            if name == "HubSpot CRM":
                st.warning(f"⚠️ {name} Required for CRM export")
            elif name == "Bright Data":
                st.error(f"❌ {name} Missing")
                if st.button("🔧 Configuration Help", key="bright_data_help"):
                    st.info("""
                    **Bright Data Setup Required:**

                    1. Get credentials from Bright Data dashboard
                    2. Update .env file with:
                       ```
                       BRIGHT_DATA_API_TOKEN=your_password
                       WEB_UNLOCKER_ZONE=lum-customer-username-zone-zonename
                       ```
                    3. See BRIGHT_DATA_SETUP.md for detailed guide

                    **Current Error**: 407 Invalid Auth = Wrong credentials
                    """)
            else:
                st.error(f"❌ {name} Missing")

col1, col2 = st.columns([3, 1])

with col1:
    st.subheader("🚀 AI Prospecting Workflow")

    if st.button("Start Multi-Agent Prospecting", type="primary", use_container_width=True):
        required_keys = ["BRIGHT_DATA_API_TOKEN", "OPENAI_API_KEY"]
        missing_keys = [key for key in required_keys if not os.getenv(key)]

        if missing_keys:
            st.error(f"Missing required API keys: {', '.join(missing_keys)}")
            st.stop()

        progress_bar = st.progress(0)
        status_text = st.empty()

        try:
            mcp_client = BrightDataMCP()

            discovery_agent = create_company_discovery_agent(mcp_client)
            trigger_agent = create_trigger_detection_agent(mcp_client)
            contact_agent = create_contact_research_agent(mcp_client)
            message_agent = create_message_generation_agent()
            pipeline_agent = create_pipeline_manager_agent()

            status_text.text("🔍 Discovering companies matching ICP...")
            progress_bar.progress(15)

            discovery_task = Task(
                description=f"Find {max_companies} companies in {industry} ({size_range} size) in {location}",
                expected_output="List of companies with ICP scores and intelligence",
                agent=discovery_agent
            )

            discovery_crew = Crew(
                agents=[discovery_agent],
                tasks=[discovery_task],
                process=Process.sequential
            )

            companies = discovery_agent.tools[0]._run(industry, size_range, location)

            st.success(f"✅ Discovered {len(companies)} companies")

            status_text.text("🎯 Analyzing trigger events and buying signals...")
            progress_bar.progress(30)

            trigger_task = Task(
                description="Detect hiring spikes, funding rounds, leadership changes, and expansion signals",
                expected_output="Companies with trigger events and scores",
                agent=trigger_agent
            )

            trigger_crew = Crew(
                agents=[trigger_agent],
                tasks=[trigger_task],
                process=Process.sequential
            )

            companies_with_triggers = trigger_agent.tools[0]._run(companies)

            total_triggers = sum(len(c.get('trigger_events', [])) for c in companies_with_triggers)

            st.success(f"✅ Detected {total_triggers} trigger events")
            progress_bar.progress(45)

            status_text.text("👥 Finding decision-maker contacts...")

            contact_task = Task(
                description=f"Find verified contacts for roles: {', '.join(target_roles)}",
                expected_output="Companies with decision-maker contact information",
                agent=contact_agent
            )

            contact_crew = Crew(
                agents=[contact_agent],
                tasks=[contact_task],
                process=Process.sequential
            )

            companies_with_contacts = contact_agent.tools[0]._run(companies_with_triggers, target_roles)

            total_contacts = sum(len(c.get('contacts', [])) for c in companies_with_contacts)

            st.success(f"✅ Found {total_contacts} verified contacts")
            progress_bar.progress(60)

            status_text.text("✍️ Generating personalized outreach messages...")

            message_task = Task(
                description=f"Generate {', '.join(message_types)} for each contact using trigger intelligence",
                expected_output="Companies with personalized messages",
                agent=message_agent
            )

            message_crew = Crew(
                agents=[message_agent],
                tasks=[message_task],
                process=Process.sequential
            )

            companies_with_messages = message_agent.tools[0]._run(companies_with_contacts, message_types[0])

            total_messages = sum(len(c.get('contacts', [])) for c in companies_with_messages)

            st.success(f"✅ Generated {total_messages} personalized messages")
            progress_bar.progress(75)

            status_text.text("📊 Scoring leads and updating CRM...")

            pipeline_task = Task(
                description=f"Score leads and export Grade {min_lead_grade}+ to HubSpot CRM",
                expected_output="Scored leads with CRM integration results",
                agent=pipeline_agent
            )

            pipeline_crew = Crew(
                agents=[pipeline_agent],
                tasks=[pipeline_task],
                process=Process.sequential
            )

            final_companies = pipeline_agent.tools[0]._run(companies_with_messages)
            qualified_leads = [c for c in final_companies if c.get('lead_grade', 'D') in ['A', 'B']]

            crm_results = {"success": 0, "errors": 0}
            if os.getenv("HUBSPOT_API_KEY"):
                crm_results = pipeline_agent.tools[1]._run(final_companies, min_lead_grade)

            progress_bar.progress(100)
            status_text.text("✅ Workflow completed successfully!")

            st.session_state.workflow_results = {
                'companies': final_companies,
                'total_companies': len(final_companies),
                'total_triggers': total_triggers,
                'total_contacts': total_contacts,
                'qualified_leads': len(qualified_leads),
                'crm_results': crm_results,
                'timestamp': datetime.now()
            }

        except Exception as e:
            st.error(f"❌ Workflow failed: {str(e)}")
            st.write("Please check your API configurations and try again.")

if st.session_state.workflow_results:
    results = st.session_state.workflow_results

    st.markdown("---")
    st.subheader("📊 Workflow Results")

    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Companies Analyzed", results['total_companies'])
    with col2:
        st.metric("Trigger Events", results['total_triggers'])
    with col3:
        st.metric("Contacts Found", results['total_contacts'])
    with col4:
        st.metric("Qualified Leads", results['qualified_leads'])

    if results['crm_results']['success'] > 0 or results['crm_results']['errors'] > 0:
        st.subheader("🔄 HubSpot CRM Integration")
        col1, col2 = st.columns(2)
        with col1:
            st.metric("Exported to CRM", results['crm_results']['success'], delta="contacts")
        with col2:
            if results['crm_results']['errors'] > 0:
                st.metric("Export Errors", results['crm_results']['errors'], delta_color="inverse")

    st.subheader("🏢 Company Intelligence")

    for company in results['companies'][:10]:
        with st.expander(f"📋 {company.get('name', 'Unknown')} - Grade {company.get('lead_grade', 'D')} (Score: {company.get('lead_score', 0):.0f})"):

            col1, col2 = st.columns(2)

            with col1:
                st.write(f"**Industry:** {company.get('industry', 'Unknown')}")
                st.write(f"**Domain:** {company.get('domain', 'Unknown')}")
                st.write(f"**ICP Score:** {company.get('icp_score', 0)}")

                triggers = company.get('trigger_events', [])
                if triggers:
                    st.write("**🎯 Trigger Events:**")
                    for trigger in triggers:
                        severity_emoji = {"high": "🔥", "medium": "⚡", "low": "💡"}.get(trigger.get('severity', 'low'), '💡')
                        st.write(f"{severity_emoji} {trigger.get('description', 'Unknown trigger')}")

            with col2:
                contacts = company.get('contacts', [])
                if contacts:
                    st.write("**👥 Decision Makers:**")
                    for contact in contacts:
                        confidence = contact.get('confidence_score', 0)
                        confidence_color = "🟢" if confidence >= 75 else "🟡" if confidence >= 50 else "🔴"

                        st.write(f"{confidence_color} **{contact.get('first_name', '')} {contact.get('last_name', '')}**")
                        st.write(f"   {contact.get('title', 'Unknown title')}")
                        st.write(f"   📧 {contact.get('email', 'No email')}")
                        st.write(f"   Confidence: {confidence}%")

                        message = contact.get('generated_message', {})
                        if message.get('subject'):
                            st.write(f"   **Subject:** {message['subject']}")
                        if message.get('body'):
                            preview = message['body'][:100] + "..." if len(message['body']) > 100 else message['body']
                            st.write(f"   **Preview:** {preview}")
                        st.write("---")

    st.subheader("📥 Export & Actions")

    col1, col2, col3 = st.columns(3)

    with col1:
        export_data = []
        for company in results['companies']:
            for contact in company.get('contacts', []):
                export_data.append({
                    'Company': company.get('name', ''),
                    'Industry': company.get('industry', ''),
                    'Lead Grade': company.get('lead_grade', ''),
                    'Lead Score': company.get('lead_score', 0),
                    'Trigger Count': len(company.get('trigger_events', [])),
                    'Contact Name': f"{contact.get('first_name', '')} {contact.get('last_name', '')}",
                    'Title': contact.get('title', ''),
                    'Email': contact.get('email', ''),
                    'Confidence': contact.get('confidence_score', 0),
                    'Subject Line': contact.get('generated_message', {}).get('subject', ''),
                    'Message': contact.get('generated_message', {}).get('body', '')
                })

        if export_data:
            df = pd.DataFrame(export_data)
            csv = df.to_csv(index=False)

            st.download_button(
                label="📄 Download Full Report (CSV)",
                data=csv,
                file_name=f"ai_bdr_prospects_{datetime.now().strftime('%Y%m%d_%H%M')}.csv",
                mime="text/csv",
                use_container_width=True
            )

    with col2:
        if st.button("🔄 Sync to HubSpot CRM", use_container_width=True):
            if not os.getenv("HUBSPOT_API_KEY"):
                st.warning("HubSpot API key required for CRM export")
            else:
                with st.spinner("Syncing to HubSpot..."):
                    pipeline_agent = create_pipeline_manager_agent()
                    new_crm_results = pipeline_agent.tools[1]._run(results['companies'], min_lead_grade)
                    st.session_state.workflow_results['crm_results'] = new_crm_results
                st.rerun()

    with col3:
        if st.button("🗑️ Clear Results", use_container_width=True):
            st.session_state.workflow_results = None
            st.rerun()

if __name__ == "__main__":
    pass

The Streamlit orchestration layer coordinates all agents in an efficient workflow, with real-time progress tracking and customizable settings. It presents results clearly, with summary metrics, detailed company intelligence, and export options. The dashboard makes complex multi-agent operations manageable, so sales teams can use the system without a technical background.

The AI SDR agent interface rendered with Streamlit

Running Your AI BDR System

Run the application to launch the intelligent prospecting workflow. In a terminal, navigate to your project directory.

streamlit run ai_bdr_system.py
SDR agent demo

You'll see how the system works through the intelligent workflow:

  1. Discovers companies matching your ideal customer profile, validated with real-time data.
  2. Tracks trigger events from multiple sources to catch the right timing.
  3. Looks up decision-maker contacts across multiple sources and scores them for reliability.
  4. Generates personalized messages that reference specific business insights.
  5. Automatically scores leads and adds qualified prospects to your CRM pipeline.
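The five stages chain output to input. A stub sketch of that data flow (the stage functions below are placeholders standing in for the real tool `_run` methods, not the actual implementations):

```python
# Stub stages illustrating the pipeline's data flow only.
def discover(industry):
    return [{'name': 'Acme', 'industry': industry}]

def detect_triggers(companies):
    # Each stage enriches the company dicts it receives.
    return [{**c, 'trigger_events': [{'type': 'funding', 'severity': 'high'}]} for c in companies]

def find_contacts(companies, roles):
    return [{**c, 'contacts': [{'first_name': 'Ada', 'title': roles[0]}]} for c in companies]

pipeline = find_contacts(detect_triggers(discover('SaaS')), ['CEO'])
print(pipeline[0]['contacts'][0]['title'])  # CEO
```

Each stage only adds keys to the company dicts, which is why the Streamlit app can pass the same list from one agent's tool to the next.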

Conclusion

This AI BDR system shows how automation streamlines prospecting and lead qualification. To further strengthen your sales pipeline, consider Bright Data products such as our LinkedIn datasets for accurate contact and company data, along with other datasets and automation tools built for BDR and sales teams.

Bright Data 文档中探索更多解决方案。

Create a free Bright Data account and start building your automated BDR workflow.
