
Build an Enterprise-Grade TPRM Agent with Bright Data and the OpenHands SDK

This guide shows you how to build a scalable TPRM agent with the Bright Data APIs and the OpenHands SDK to automate vendor risk screening.
20 min read
OpenHands Agent SDK with Bright Data

In this guide, you will learn:

  • What third-party risk management (TPRM) is, and why manual screening breaks down
  • How to build an AI agent that autonomously investigates adverse media about vendors
  • How to integrate Bright Data's SERP API and Web Unlocker for reliable, up-to-date web data
  • How to use the OpenHands SDK for agentic script generation and OpenAI for risk analysis
  • How to extend the agent with the Browser API for complex scenarios such as court registries

Let's get started!

The Problem with Manual Vendor Screening

Enterprise compliance teams face a nearly impossible task: continuously monitoring hundreds of third-party vendors for risk signals across the entire web. The traditional approach typically involves:

  • Manual Google searches: querying each vendor name combined with keywords like "lawsuit," "bankruptcy," and "fraud"
  • Hitting paywalls and CAPTCHAs when trying to access news articles and court records
  • Inconsistent documentation, with no standardized process for recording findings
  • No continuous monitoring: vendors are screened once at onboarding and never re-checked

This approach fails in three critical ways:

  1. Scale: an analyst can thoroughly investigate at most 5–10 vendors per day
  2. Access: protected resources such as court registries and paywalled news sites block automated access
  3. Continuity: a one-time onboarding assessment cannot catch risks that emerge later

The Solution: An Autonomous TPRM Agent

The TPRM agent automates the entire vendor investigation workflow through three specialized layers:

  • Discovery layer (SERP API): the agent searches Google for risk signals such as lawsuits, regulatory penalties, and financial distress
  • Access layer (Web Unlocker): when relevant results sit behind paywalls or CAPTCHAs, the agent uses Web Unlocker to bypass those barriers and extract the full content
  • Action layer (OpenAI + OpenHands SDK): the agent uses OpenAI to analyze content for risk severity, then uses the OpenHands SDK to generate Python monitoring scripts that check for new adverse media daily

The system turns hours of manual research into minutes of automated analysis.

Prerequisites

Before you begin, make sure you have:

  • Python 3.12 or later (required by the OpenHands SDK)
  • A Bright Data account with API access (the free trial works)
  • An OpenAI API key for risk analysis
  • An OpenHands Cloud account, or your own LLM API key, for agentic script generation
  • Basic familiarity with Python and REST APIs

Project Architecture

The TPRM agent follows a three-stage pipeline:

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   DISCOVERY     │────▶│     ACCESS      │────▶│     ACTION      │
│   (SERP API)    │     │ (Web Unlocker)  │     │ (OpenAI + SDK)  │
└─────────────────┘     └─────────────────┘     └─────────────────┘
        │                       │                       │
   Search Google          Bypass paywalls         Analyze risks
   for red flags          and CAPTCHAs           Generate scripts

Create the following project structure:

tprm-agent/
├── src/
│   ├── __init__.py
│   ├── config.py         # Configuration
│   ├── discovery.py      # SERP API integration
│   ├── access.py         # Web Unlocker integration
│   ├── actions.py        # OpenAI + OpenHands SDK
│   ├── agent.py          # Main orchestration
│   └── browser.py        # Browser API (enhancement)
├── api/
│   └── main.py           # FastAPI endpoints
├── scripts/
│   └── generated/        # Auto-generated monitoring scripts
├── .env
├── requirements.txt
└── README.md

Environment Setup

Create a virtual environment and install the dependencies:

python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

pip install requests fastapi uvicorn python-dotenv pydantic openai beautifulsoup4 playwright openhands-sdk openhands-tools

Create a .env file to hold your API credentials:

# Bright Data API Token (for SERP API)
BRIGHT_DATA_API_TOKEN=your_api_token

# Bright Data SERP Zone
BRIGHT_DATA_SERP_ZONE=your_serp_zone_name

# Bright Data Web Unlocker credentials
BRIGHT_DATA_CUSTOMER_ID=your_customer_id
BRIGHT_DATA_UNLOCKER_ZONE=your_unlocker_zone_name
BRIGHT_DATA_UNLOCKER_PASSWORD=your_zone_password

# OpenAI (for risk analysis)
OPENAI_API_KEY=your_openai_api_key

# OpenHands (for agentic script generation)
# Use OpenHands Cloud: openhands/claude-sonnet-4-5-20250929
# Or bring your own: anthropic/claude-sonnet-4-5-20250929
LLM_API_KEY=your_llm_api_key
LLM_MODEL=openhands/claude-sonnet-4-5-20250929

Setting Up Bright Data

Step 1: Create a Bright Data Account

Sign up for Bright Data and open the dashboard.

Step 2: Configure a SERP API Zone

  1. Go to Proxies & Scraping Infrastructure
  2. Click Add and select SERP API
  3. Name your zone (e.g., tprm_serp)
  4. Copy the zone name and note your API token under Settings > API tokens

The SERP API returns structured search results from Google without getting blocked. Append brd_json=1 to the search URL to get parsed JSON output.
Bright Data dashboard: SERP API
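For a quick sanity check before writing any module code, you can call the SERP API directly. A minimal sketch, assuming your zone is named tprm_serp and BRIGHT_DATA_API_TOKEN is exported in your shell:

import os
import requests

# Minimal SERP API call: brd_json=1 asks Bright Data to return parsed JSON
# instead of raw Google HTML.
resp = requests.post(
    "https://api.brightdata.com/request",
    headers={"Authorization": f"Bearer {os.environ['BRIGHT_DATA_API_TOKEN']}"},
    json={
        "zone": "tprm_serp",  # your SERP zone name
        "url": "https://www.google.com/search?q=%22Acme+Corp%22+lawsuit&brd_json=1",
        "format": "raw",
    },
    timeout=30,
)
resp.raise_for_status()
for item in resp.json().get("organic", [])[:3]:
    print(item.get("title"), "->", item.get("link"))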

Step 3: Configure a Web Unlocker Zone

  1. Click Add and select Web Unlocker
  2. Name your zone (e.g., tprm_unlocker)
  3. Copy the zone credentials (username format: brd-customer-CUSTOMER_ID-zone-ZONE_NAME)

Web Unlocker automatically handles CAPTCHAs, fingerprinting, and IP rotation behind a single endpoint.
Bright Data dashboard: Web Unlocker API

Building the Discovery Layer (SERP API)

The discovery layer uses the SERP API to search Google for adverse media about a vendor. Create src/discovery.py:

import requests
from typing import Optional
from dataclasses import dataclass
from urllib.parse import quote_plus
from config import settings


@dataclass
class SearchResult:
    title: str
    url: str
    snippet: str
    source: str


class DiscoveryClient:
    """Search for adverse media using Bright Data SERP API (Direct API)."""

    RISK_CATEGORIES = {
        "litigation": ["lawsuit", "litigation", "sued", "court case", "legal action"],
        "financial": ["bankruptcy", "insolvency", "debt", "financial trouble", "default"],
        "fraud": ["fraud", "scam", "investigation", "indictment", "scandal"],
        "regulatory": ["violation", "fine", "penalty", "sanctions", "compliance"],
        "operational": ["recall", "safety issue", "supply chain", "disruption"],
    }

    def __init__(self):
        self.api_url = "https://api.brightdata.com/request"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {settings.BRIGHT_DATA_API_TOKEN}",
        }

    def _build_queries(self, vendor_name: str, categories: Optional[list] = None) -> list[str]:
        """Build search queries for each risk category."""
        categories = categories or list(self.RISK_CATEGORIES.keys())
        queries = []

        for category in categories:
            keywords = self.RISK_CATEGORIES.get(category, [])
            keyword_str = " OR ".join(keywords)
            query = f'"{vendor_name}" ({keyword_str})'
            queries.append(query)

        return queries

    def search(self, query: str) -> list[SearchResult]:
        """Execute a single search query using Bright Data SERP API."""
        try:
            # Build Google search URL with brd_json=1 for parsed JSON
            encoded_query = quote_plus(query)
            google_url = f"https://www.google.com/search?q={encoded_query}&hl=en&gl=us&brd_json=1"

            payload = {
                "zone": settings.BRIGHT_DATA_SERP_ZONE,
                "url": google_url,
                "format": "raw",
            }

            response = requests.post(
                self.api_url,
                headers=self.headers,
                json=payload,
                timeout=30,
            )
            response.raise_for_status()
            data = response.json()

            results = []
            organic = data.get("organic", [])

            for item in organic:
                results.append(
                    SearchResult(
                        title=item.get("title", ""),
                        url=item.get("link", ""),
                        snippet=item.get("description", ""),
                        source=item.get("displayed_link", ""),
                    )
                )
            return results

        except Exception as e:
            print(f"Search error: {e}")
            return []

    def discover_adverse_media(
        self,
        vendor_name: str,
        categories: Optional[list] = None,
    ) -> dict[str, list[SearchResult]]:
        """Search for adverse media across all risk categories."""
        queries = self._build_queries(vendor_name, categories)
        category_names = categories or list(self.RISK_CATEGORIES.keys())

        categorized_results = {}
        for category, query in zip(category_names, queries):
            print(f"  Searching: {category}...")
            results = self.search(query)
            categorized_results[category] = results

        return categorized_results

    def filter_relevant_results(
        self, results: dict[str, list[SearchResult]], vendor_name: str
    ) -> dict[str, list[SearchResult]]:
        """Filter out irrelevant results."""
        filtered = {}
        vendor_lower = vendor_name.lower()

        for category, items in results.items():
            relevant = []
            for item in items:
                if (
                    vendor_lower in item.title.lower()
                    or vendor_lower in item.snippet.lower()
                ):
                    relevant.append(item)
            filtered[category] = relevant

        return filtered

The SERP API returns structured JSON containing the organic results, making it easy to parse each result's title, URL, and snippet.
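To smoke-test the discovery layer on its own (run from the src/ directory so the imports resolve, with your .env populated), a short sketch using an illustrative vendor name:

from discovery import DiscoveryClient

client = DiscoveryClient()
# Search a single category to keep the test fast.
raw = client.discover_adverse_media("Acme Corp", categories=["litigation"])
relevant = client.filter_relevant_results(raw, "Acme Corp")
for item in relevant["litigation"][:3]:
    print(item.title, "->", item.url)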

Building the Access Layer (Web Unlocker)

Once the discovery layer finds relevant URLs, the access layer fetches the full content using the Web Unlocker API. Create src/access.py:

import requests
from bs4 import BeautifulSoup
from dataclasses import dataclass
from typing import Optional
from config import settings


@dataclass
class ExtractedContent:
    url: str
    title: str
    text: str
    publish_date: Optional[str]
    author: Optional[str]
    success: bool
    error: Optional[str] = None


class AccessClient:
    """Access protected content using Bright Data Web Unlocker (API-based)."""

    def __init__(self):
        self.api_url = "https://api.brightdata.com/request"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {settings.BRIGHT_DATA_API_TOKEN}",
        }

    def fetch_url(self, url: str) -> ExtractedContent:
        """Fetch and extract content from a URL using Web Unlocker API."""
        try:
            payload = {
                "zone": settings.BRIGHT_DATA_UNLOCKER_ZONE,
                "url": url,
                "format": "raw",
            }

            response = requests.post(
                self.api_url,
                headers=self.headers,
                json=payload,
                timeout=60,
            )
            response.raise_for_status()

            # Web Unlocker API returns the HTML directly
            html_content = response.text
            content = self._extract_content(html_content, url)
            return content

        except requests.Timeout:
            return ExtractedContent(
                url=url,
                title="",
                text="",
                publish_date=None,
                author=None,
                success=False,
                error="Request timed out",
            )
        except Exception as e:
            return ExtractedContent(
                url=url,
                title="",
                text="",
                publish_date=None,
                author=None,
                success=False,
                error=str(e),
            )

    def _extract_content(self, html: str, url: str) -> ExtractedContent:
        """Extract article content from HTML."""
        soup = BeautifulSoup(html, "html.parser")

        # Remove unwanted elements
        for element in soup(["script", "style", "nav", "footer", "header", "aside"]):
            element.decompose()

        # Extract title
        title = ""
        if soup.title:
            title = soup.title.string or ""
        elif soup.find("h1"):
            title = soup.find("h1").get_text(strip=True)

        # Extract main content
        article = soup.find("article") or soup.find("main") or soup.find("body")
        text = article.get_text(separator="\n", strip=True) if article else ""

        # Limit text length
        text = text[:10000] if len(text) > 10000 else text

        # Try to extract publish date
        publish_date = None
        date_meta = soup.find("meta", {"property": "article:published_time"})
        if date_meta:
            publish_date = date_meta.get("content")

        # Try to extract author
        author = None
        author_meta = soup.find("meta", {"name": "author"})
        if author_meta:
            author = author_meta.get("content")

        return ExtractedContent(
            url=url,
            title=title,
            text=text,
            publish_date=publish_date,
            author=author,
            success=True,
        )

    def fetch_multiple(self, urls: list[str]) -> list[ExtractedContent]:
        """Fetch multiple URLs sequentially."""
        results = []
        for url in urls:
            print(f"  Fetching: {url[:60]}...")
            content = self.fetch_url(url)
            if not content.success:
                print(f"  Error: {content.error}")
            results.append(content)
        return results

Web Unlocker handles CAPTCHAs, browser fingerprinting, and IP rotation automatically: you send each request to the API endpoint and it takes care of the rest.
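A quick way to try the access layer in isolation (the URL below is purely illustrative):

from access import AccessClient

client = AccessClient()
content = client.fetch_url("https://www.example.com/news/acme-corp-lawsuit")  # placeholder URL
if content.success:
    print(content.title)
    print(content.text[:500])  # first 500 characters of the extracted body
else:
    print("Failed:", content.error)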

Building the Action Layer (OpenAI + OpenHands SDK)

The action layer uses OpenAI to analyze risk severity and the OpenHands SDK to generate monitoring scripts that run on the Bright Data Web Unlocker API. The OpenHands SDK provides agentic capabilities: the agent can reason, edit files, and execute commands until the script runs reliably in production.

Create src/actions.py:

import os
import json
from datetime import datetime, UTC
from dataclasses import dataclass, asdict
from openai import OpenAI
from pydantic import SecretStr
from openhands.sdk import LLM, Agent, Conversation, Tool
from openhands.tools.terminal import TerminalTool
from openhands.tools.file_editor import FileEditorTool
from config import settings


@dataclass
class RiskAssessment:
    vendor_name: str
    category: str
    severity: str
    summary: str
    key_findings: list[str]
    sources: list[str]
    recommended_actions: list[str]
    assessed_at: str


@dataclass
class MonitoringScript:
    vendor_name: str
    script_path: str
    urls_monitored: list[str]
    check_frequency: str
    created_at: str


class ActionsClient:
    """Analyze risks using OpenAI and generate monitoring scripts using OpenHands SDK."""

    def __init__(self):
        # OpenAI for risk analysis
        self.openai_client = OpenAI(api_key=settings.OPENAI_API_KEY)

        # OpenHands for agentic script generation
        self.llm = LLM(
            model=settings.LLM_MODEL,
            api_key=SecretStr(settings.LLM_API_KEY),
        )

        self.workspace = os.path.join(os.getcwd(), "scripts", "generated")
        os.makedirs(self.workspace, exist_ok=True)

    def analyze_risk(
        self,
        vendor_name: str,
        category: str,
        content: list[dict],
    ) -> RiskAssessment:
        """Analyze extracted content for risk severity using OpenAI."""
        content_summary = "\n\n".join(
            [f"Source: {c['url']}\nTitle: {c['title']}\nContent: {c['text'][:2000]}" for c in content]
        )

        prompt = f"""Analyze the following content about "{vendor_name}" for third-party risk assessment.

Category: {category}

Content:
{content_summary}

Provide a JSON response with:
{{
    "severity": "low|medium|high|critical",
    "summary": "2-3 sentence summary of findings",
    "key_findings": ["finding 1", "finding 2", ...],
    "recommended_actions": ["action 1", "action 2", ...]
}}

Consider:
- Severity should be based on potential business impact
- Critical = immediate action required (active fraud, bankruptcy filing)
- High = significant risk requiring investigation
- Medium = notable concern worth monitoring
- Low = minor issue or historical matter
"""

        response = self.openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )

        response_text = response.choices[0].message.content
        try:
            result = json.loads(response_text)
        except (json.JSONDecodeError, ValueError):
            result = {
                "severity": "medium",
                "summary": "Unable to parse risk assessment",
                "key_findings": [],
                "recommended_actions": ["Manual review required"],
            }

        return RiskAssessment(
            vendor_name=vendor_name,
            category=category,
            severity=result.get("severity", "medium"),
            summary=result.get("summary", ""),
            key_findings=result.get("key_findings", []),
            sources=[c["url"] for c in content],
            recommended_actions=result.get("recommended_actions", []),
            assessed_at=datetime.now(UTC).isoformat(),
        )

    def generate_monitoring_script(
        self,
        vendor_name: str,
        urls: list[str],
        check_keywords: list[str],
    ) -> MonitoringScript:
        """Generate a Python monitoring script using OpenHands SDK agent."""
        script_name = f"monitor_{vendor_name.lower().replace(' ', '_')}.py"
        script_path = os.path.join(self.workspace, script_name)

        prompt = f"""Create a Python monitoring script at {script_path} that:

1. Checks these URLs daily for new content: {urls[:5]}
2. Looks for these keywords: {check_keywords}
3. Sends an alert (print to console) if new relevant content is found
4. Logs all checks to a JSON file named 'monitoring_log.json'

The script MUST use Bright Data Web Unlocker API to bypass paywalls and CAPTCHAs:
- API endpoint: https://api.brightdata.com/request
- Use environment variable BRIGHT_DATA_API_TOKEN for the Bearer token
- Use environment variable BRIGHT_DATA_UNLOCKER_ZONE for the zone name
- Make POST requests with JSON payload: {{"zone": "zone_name", "url": "target_url", "format": "raw"}}
- Add header: "Authorization": "Bearer <token>"
- Add header: "Content-Type": "application/json"

The script should:
- Load Bright Data credentials from environment variables using python-dotenv
- Use the Bright Data Web Unlocker API for all HTTP requests (NOT plain requests.get)
- Handle errors gracefully with try/except
- Include a main() function that can be run directly
- Support being scheduled via cron
- Store content hashes to detect changes

Write the complete script to {script_path}.
"""

        # Create OpenHands agent with terminal and file editor tools
        agent = Agent(
            llm=self.llm,
            tools=[
                Tool(name=TerminalTool.name),
                Tool(name=FileEditorTool.name),
            ],
        )

        # Run the agent to generate the script
        conversation = Conversation(agent=agent, workspace=self.workspace)
        conversation.send_message(prompt)
        conversation.run()

        return MonitoringScript(
            vendor_name=vendor_name,
            script_path=script_path,
            urls_monitored=urls[:5],
            check_frequency="daily",
            created_at=datetime.now(UTC).isoformat(),
        )

    def export_assessment(self, assessment: RiskAssessment, output_path: str) -> None:
        """Export risk assessment to JSON file."""
        with open(output_path, "w") as f:
            json.dump(asdict(assessment), f, indent=2)

The advantage of the OpenHands SDK over simple one-shot prompt-to-code generation is that the agent can iterate on its own work: running the script, fixing errors, and refining it until it actually works.
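To exercise just the risk-analysis half of this layer, you can call analyze_risk directly with hand-built content (the URL, title, and text below are illustrative):

from actions import ActionsClient

client = ActionsClient()
assessment = client.analyze_risk(
    vendor_name="Acme Corp",
    category="litigation",
    content=[{
        "url": "https://www.example.com/news/acme-lawsuit",  # placeholder source
        "title": "Acme Corp faces class action",
        "text": "Plaintiffs allege that Acme Corp ...",
    }],
)
print(assessment.severity, "-", assessment.summary)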

Agent Orchestration

Now wire all the modules together. Create src/agent.py:

from dataclasses import dataclass
from datetime import datetime, UTC
from typing import Optional

from discovery import DiscoveryClient, SearchResult
from access import AccessClient, ExtractedContent
from actions import ActionsClient, RiskAssessment, MonitoringScript


@dataclass
class InvestigationResult:
    vendor_name: str
    started_at: str
    completed_at: str
    total_sources_found: int
    total_sources_accessed: int
    risk_assessments: list[RiskAssessment]
    monitoring_scripts: list[MonitoringScript]
    errors: list[str]


class TPRMAgent:
    """Autonomous agent for Third-Party Risk Management investigations."""

    def __init__(self):
        self.discovery = DiscoveryClient()
        self.access = AccessClient()
        self.actions = ActionsClient()

    def investigate(
        self,
        vendor_name: str,
        categories: Optional[list[str]] = None,
        generate_monitors: bool = True,
    ) -> InvestigationResult:
        """Run a complete vendor investigation."""
        started_at = datetime.now(UTC).isoformat()
        errors = []
        risk_assessments = []
        monitoring_scripts = []

        # Stage 1: Discovery (SERP API)
        print(f"[Discovery] Searching for adverse media about {vendor_name}...")
        try:
            raw_results = self.discovery.discover_adverse_media(vendor_name, categories)
            filtered_results = self.discovery.filter_relevant_results(raw_results, vendor_name)
        except Exception as e:
            errors.append(f"Discovery failed: {str(e)}")
            return InvestigationResult(
                vendor_name=vendor_name,
                started_at=started_at,
                completed_at=datetime.now(UTC).isoformat(),
                total_sources_found=0,
                total_sources_accessed=0,
                risk_assessments=[],
                monitoring_scripts=[],
                errors=errors,
            )

        total_sources = sum(len(results) for results in filtered_results.values())
        print(f"[Discovery] Found {total_sources} relevant sources")

        # Stage 2: Access (Web Unlocker)
        print(f"[Access] Extracting content from sources...")
        all_urls = []
        url_to_category = {}
        for category, results in filtered_results.items():
            for result in results:
                all_urls.append(result.url)
                url_to_category[result.url] = category

        try:
            extracted_content = self.access.fetch_multiple(all_urls)
            successful_extractions = [c for c in extracted_content if c.success]
        except Exception as e:
            error_msg = f"Access failed: {str(e)}"
            print(f"[Access] {error_msg}")
            errors.append(error_msg)
            successful_extractions = []

        print(f"[Access] Successfully extracted {len(successful_extractions)} sources")

        # Stage 3: Action - Analyze risks (OpenAI)
        print(f"[Action] Analyzing risks...")
        category_content = {}
        for content in successful_extractions:
            category = url_to_category.get(content.url, "unknown")
            if category not in category_content:
                category_content[category] = []
            category_content[category].append({
                "url": content.url,
                "title": content.title,
                "text": content.text,
            })

        for category, content_list in category_content.items():
            if not content_list:
                continue
            try:
                assessment = self.actions.analyze_risk(vendor_name, category, content_list)
                risk_assessments.append(assessment)
            except Exception as e:
                errors.append(f"Risk analysis failed for {category}: {str(e)}")

        # Stage 3: Action - Generate monitoring scripts
        if generate_monitors and successful_extractions:
            print(f"[Action] Generating monitoring scripts...")
            try:
                urls_to_monitor = [c.url for c in successful_extractions[:10]]
                keywords = [vendor_name, "lawsuit", "bankruptcy", "fraud"]
                script = self.actions.generate_monitoring_script(
                    vendor_name, urls_to_monitor, keywords
                )
                monitoring_scripts.append(script)
            except Exception as e:
                errors.append(f"Script generation failed: {str(e)}")

        completed_at = datetime.now(UTC).isoformat()
        print(f"[Complete] Investigation finished")

        return InvestigationResult(
            vendor_name=vendor_name,
            started_at=started_at,
            completed_at=completed_at,
            total_sources_found=total_sources,
            total_sources_accessed=len(successful_extractions),
            risk_assessments=risk_assessments,
            monitoring_scripts=monitoring_scripts,
            errors=errors,
        )


def main():
    """Example usage."""
    agent = TPRMAgent()
    result = agent.investigate("Acme Corp")

    print(f"\n{'='*50}")
    print(f"Investigation Complete: {result.vendor_name}")
    print(f"Sources Found: {result.total_sources_found}")
    print(f"Sources Accessed: {result.total_sources_accessed}")
    print(f"Risk Assessments: {len(result.risk_assessments)}")
    print(f"Monitoring Scripts: {len(result.monitoring_scripts)}")

    for assessment in result.risk_assessments:
        print(f"\n[{assessment.category.upper()}] Severity: {assessment.severity}")
        print(f"Summary: {assessment.summary}")


if __name__ == "__main__":
    main()

The agent orchestrates all three layers, handles errors gracefully, and returns a complete investigation result.

Configuration

Create src/config.py to hold the keys and credentials the application needs:

import os
from dotenv import load_dotenv

load_dotenv()


class Settings:
    # SERP API
    BRIGHT_DATA_API_TOKEN: str = os.getenv("BRIGHT_DATA_API_TOKEN", "")
    BRIGHT_DATA_SERP_ZONE: str = os.getenv("BRIGHT_DATA_SERP_ZONE", "")

    # Web Unlocker
    BRIGHT_DATA_CUSTOMER_ID: str = os.getenv("BRIGHT_DATA_CUSTOMER_ID", "")
    BRIGHT_DATA_UNLOCKER_ZONE: str = os.getenv("BRIGHT_DATA_UNLOCKER_ZONE", "")
    BRIGHT_DATA_UNLOCKER_PASSWORD: str = os.getenv("BRIGHT_DATA_UNLOCKER_PASSWORD", "")

    # OpenAI (for risk analysis)
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")

    # OpenHands (for agentic script generation)
    LLM_API_KEY: str = os.getenv("LLM_API_KEY", "")
    LLM_MODEL: str = os.getenv("LLM_MODEL", "openhands/claude-sonnet-4-5-20250929")


settings = Settings()
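Optionally, you can fail fast when credentials are missing instead of discovering it mid-investigation. This helper is not part of the original Settings class; a minimal sketch you could append to src/config.py:

REQUIRED = [
    "BRIGHT_DATA_API_TOKEN",
    "BRIGHT_DATA_SERP_ZONE",
    "BRIGHT_DATA_UNLOCKER_ZONE",
    "OPENAI_API_KEY",
    "LLM_API_KEY",
]

def validate_settings() -> None:
    """Raise early if any required credential is unset."""
    missing = [name for name in REQUIRED if not getattr(settings, name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {missing}")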

Building the API Layer

Create api/main.py with FastAPI to expose the agent through a REST interface:

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import uuid
import sys
sys.path.insert(0, 'src')

from agent import TPRMAgent, InvestigationResult

app = FastAPI(
    title="TPRM Agent API",
    description="Autonomous Third-Party Risk Management Agent",
    version="1.0.0",
)

investigations: dict[str, InvestigationResult] = {}
agent = TPRMAgent()


class InvestigationRequest(BaseModel):
    vendor_name: str
    categories: Optional[list[str]] = None
    generate_monitors: bool = True


class InvestigationResponse(BaseModel):
    investigation_id: str
    status: str
    message: str


@app.post("/investigate", response_model=InvestigationResponse)
def start_investigation(
    request: InvestigationRequest,
    background_tasks: BackgroundTasks,
):
    """Start a new vendor investigation."""
    investigation_id = str(uuid.uuid4())

    def run_investigation():
        result = agent.investigate(
            vendor_name=request.vendor_name,
            categories=request.categories,
            generate_monitors=request.generate_monitors,
        )
        investigations[investigation_id] = result

    background_tasks.add_task(run_investigation)

    return InvestigationResponse(
        investigation_id=investigation_id,
        status="started",
        message=f"Investigation started for {request.vendor_name}",
    )


@app.get("/investigate/{investigation_id}")
def get_investigation(investigation_id: str):
    """Get investigation results."""
    if investigation_id not in investigations:
        raise HTTPException(status_code=404, detail="Investigation not found or still in progress")

    return investigations[investigation_id]


@app.get("/reports/{vendor_name}")
def get_reports(vendor_name: str):
    """Get all reports for a vendor."""
    vendor_reports = [
        result
        for result in investigations.values()
        if result.vendor_name.lower() == vendor_name.lower()
    ]

    if not vendor_reports:
        raise HTTPException(status_code=404, detail="No reports found for this vendor")

    return vendor_reports


@app.get("/health")
def health_check():
    """Health check endpoint."""
    return {"status": "healthy"}

Run the API locally:

python -m uvicorn api.main:app --reload

Visit http://localhost:8000/docs for interactive API documentation.
FastAPI docs

Enhancing the Agent with the Browser API (Scraping Browser)

For sites that require form submission, such as court registries, or that rely heavily on JavaScript rendering, you can enhance the agent with Bright Data's Browser API (Scraping Browser). Setup is similar to the Web Unlocker API and SERP API.
Bright Data dashboard: Browser API

The Browser API provides cloud-hosted browsers you control through Playwright and the Chrome DevTools Protocol (CDP), well suited to:

  • Court registry searches that require form filling and multi-step navigation
  • JavaScript-heavy sites, such as those that load content dynamically
  • Multi-step authentication flows
  • Screenshot capture for compliance evidence

Configuration

Add the Browser API credentials to .env:

# Browser API
BRIGHT_DATA_BROWSER_USER=your_browser_username
BRIGHT_DATA_BROWSER_PASSWORD=your_browser_password

Then expose them in the Settings class in src/config.py:

    # Browser API
    BRIGHT_DATA_BROWSER_USER: str = os.getenv("BRIGHT_DATA_BROWSER_USER", "")
    BRIGHT_DATA_BROWSER_PASSWORD: str = os.getenv("BRIGHT_DATA_BROWSER_PASSWORD", "")

Browser Client Implementation

Create src/browser.py:

import asyncio
from playwright.async_api import async_playwright
from dataclasses import dataclass
from typing import Optional
from config import settings


@dataclass
class BrowserContent:
    url: str
    title: str
    text: str
    screenshot_path: Optional[str]
    success: bool
    error: Optional[str] = None


class BrowserClient:
    """Access dynamic content using Bright Data Browser API (Scraping Browser).

    Use this for:
    - JavaScript-heavy sites that require full rendering
    - Multi-step forms (e.g., court registry searches)
    - Sites requiring clicks, scrolling, or interaction
    - Capturing screenshots for compliance documentation
    """

    def __init__(self):
        # Build WebSocket endpoint for CDP connection
        auth = f"{settings.BRIGHT_DATA_BROWSER_USER}:{settings.BRIGHT_DATA_BROWSER_PASSWORD}"
        self.endpoint_url = f"wss://{auth}@brd.superproxy.io:9222"

    async def fetch_dynamic_page(
        self,
        url: str,
        wait_for_selector: Optional[str] = None,
        take_screenshot: bool = False,
        screenshot_path: Optional[str] = None,
    ) -> BrowserContent:
        """Fetch content from a dynamic page using Browser API."""
        async with async_playwright() as playwright:
            try:
                print(f"Connecting to Bright Data Scraping Browser...")
                browser = await playwright.chromium.connect_over_cdp(self.endpoint_url)

                try:
                    page = await browser.new_page()
                    print(f"Navigating to {url}...")
                    await page.goto(url, timeout=120000)

                    # Wait for specific selector if provided
                    if wait_for_selector:
                        await page.wait_for_selector(wait_for_selector, timeout=30000)

                    # Get page content
                    title = await page.title()

                    # Extract text
                    text = await page.evaluate("() => document.body.innerText")

                    # Take screenshot if requested
                    if take_screenshot and screenshot_path:
                        await page.screenshot(path=screenshot_path, full_page=True)

                    return BrowserContent(
                        url=url,
                        title=title,
                        text=text[:10000],
                        screenshot_path=screenshot_path if take_screenshot else None,
                        success=True,
                    )

                finally:
                    await browser.close()

            except Exception as e:
                return BrowserContent(
                    url=url,
                    title="",
                    text="",
                    screenshot_path=None,
                    success=False,
                    error=str(e),
                )

    async def fill_and_submit_form(
        self,
        url: str,
        form_data: dict[str, str],
        submit_selector: str,
        result_selector: str,
    ) -> BrowserContent:
        """Fill a form and get results - useful for court registries."""
        async with async_playwright() as playwright:
            try:
                browser = await playwright.chromium.connect_over_cdp(self.endpoint_url)

                try:
                    page = await browser.new_page()
                    await page.goto(url, timeout=120000)

                    # Fill form fields
                    for selector, value in form_data.items():
                        await page.fill(selector, value)

                    # Submit form
                    await page.click(submit_selector)

                    # Wait for results
                    await page.wait_for_selector(result_selector, timeout=30000)

                    title = await page.title()
                    text = await page.evaluate("() => document.body.innerText")

                    return BrowserContent(
                        url=url,
                        title=title,
                        text=text[:10000],
                        screenshot_path=None,
                        success=True,
                    )

                finally:
                    await browser.close()

            except Exception as e:
                return BrowserContent(
                    url=url,
                    title="",
                    text="",
                    screenshot_path=None,
                    success=False,
                    error=str(e),
                )

    async def scroll_and_collect(
        self,
        url: str,
        scroll_count: int = 5,
        wait_between_scrolls: float = 1.0,
    ) -> BrowserContent:
        """Handle infinite scroll pages."""
        async with async_playwright() as playwright:
            try:
                browser = await playwright.chromium.connect_over_cdp(self.endpoint_url)

                try:
                    page = await browser.new_page()
                    await page.goto(url, timeout=120000)

                    # Scroll down multiple times
                    for i in range(scroll_count):
                        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                        await asyncio.sleep(wait_between_scrolls)

                    title = await page.title()
                    text = await page.evaluate("() => document.body.innerText")

                    return BrowserContent(
                        url=url,
                        title=title,
                        text=text[:10000],
                        screenshot_path=None,
                        success=True,
                    )

                finally:
                    await browser.close()

            except Exception as e:
                return BrowserContent(
                    url=url,
                    title="",
                    text="",
                    screenshot_path=None,
                    success=False,
                    error=str(e),
                )


# Example usage for court registry search
async def example_court_search():
    client = BrowserClient()

    # Example: Search a court registry
    result = await client.fill_and_submit_form(
        url="https://example-court-registry.gov/search",
        form_data={
            "#party-name": "Acme Corp",
            "#case-type": "civil",
        },
        submit_selector="#search-button",
        result_selector=".search-results",
    )

    if result.success:
        print(f"Found court records: {result.text[:500]}")
    else:
        print(f"Error: {result.error}")


if __name__ == "__main__":
    asyncio.run(example_court_search())

When to Use the Browser API vs. Web Unlocker

Scenario                            Choice
Simple HTTP requests                Web Unlocker
Static HTML pages                   Web Unlocker
CAPTCHA triggered on page load      Web Unlocker
JavaScript-rendered content         Browser API
Form submission required            Browser API
Multi-step navigation flows         Browser API
Screenshots required                Browser API
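In practice you can encode this table as a simple fallback: try the cheaper Web Unlocker first, and escalate to the Browser API only when the response looks JavaScript-gated. A hedged sketch, run from the src/ directory (the 500-character threshold is an arbitrary heuristic, and the two clients return different dataclasses that happen to share success/title/text fields):

import asyncio

from access import AccessClient
from browser import BrowserClient


async def fetch_with_fallback(url: str):
    # Web Unlocker first: cheaper and faster for static HTML.
    content = AccessClient().fetch_url(url)
    if content.success and len(content.text) > 500:
        return content
    # Thin or failed responses often mean the page needs JS rendering;
    # escalate to the cloud browser.
    return await BrowserClient().fetch_dynamic_page(url)


if __name__ == "__main__":
    result = asyncio.run(fetch_with_fallback("https://www.example.com"))
    print(result.success, result.title)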

Deploying with Railway

You can deploy the TPRM agent to production with Railway or Render; both platforms handle dependency-heavy Python applications.

Railway is the simplest deployment option for Python apps with large dependencies such as the OpenHands SDK. Sign up and create an account first.

Step 1: Install the Railway CLI globally

npm i -g @railway/cli

Step 2: Add a Procfile

Create a Procfile in the project root with the following start command:

web: uvicorn api.main:app --host 0.0.0.0 --port $PORT

Step 3: Log in and initialize Railway in the project directory

railway login
railway init

Step 4: Deploy

railway up
Railway init and deploy

Step 5: Add environment variables

In the Railway project dashboard, go to Settings → Shared Variables and add the following environment variables with their values:

BRIGHT_DATA_API_TOKEN
BRIGHT_DATA_SERP_ZONE
BRIGHT_DATA_UNLOCKER_ZONE
OPENAI_API_KEY
LLM_API_KEY
LLM_MODEL
Adding environment variables to the Railway app

Railway detects the change and prompts you in the dashboard to redeploy. Click Deploy to update the app with the new secrets.
Redeploying the Railway app after adding variables

Once the redeploy completes, click the service card and select Settings to find the domain controls. Click Generate domain to get a public URL.
Generating a domain for the Railway app

Running a Full Investigation

Run locally with curl

Start the FastAPI server:

# Activate your virtual environment
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Run the server
python -m uvicorn api.main:app --reload

Visit http://localhost:8000/docs to explore the interactive API documentation.

Making API requests

  • Start an investigation:
curl -X POST "http://localhost:8000/investigate" \
  -H "Content-Type: application/json" \
  -d '{
    "vendor_name": "Acme Corp",
    "categories": ["litigation", "fraud"],
    "generate_monitors": true
  }'
  • The response includes an investigation ID:
{
  "investigation_id": "f6af2e0f-991a-4cb7-949e-2f316e677b5c",
  "status": "started",
  "message": "Investigation started for Acme Corp"
}
  • Check the investigation status (you can also poll it from a script, as shown below):
curl http://localhost:8000/investigate/f6af2e0f-991a-4cb7-949e-2f316e677b5c
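Because the investigation runs as a background task, the status endpoint returns 404 until the result has been stored. A minimal polling sketch in Python:

import time
import requests

resp = requests.post(
    "http://localhost:8000/investigate",
    json={"vendor_name": "Acme Corp", "categories": ["litigation", "fraud"]},
)
investigation_id = resp.json()["investigation_id"]

# Poll until the background task finishes (404 means still in progress).
while True:
    status = requests.get(f"http://localhost:8000/investigate/{investigation_id}")
    if status.status_code == 200:
        print(status.json())
        break
    time.sleep(5)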

Running the Agent as a Script

Create run_investigation.py in the project root:

import sys
sys.path.insert(0, 'src')

from agent import TPRMAgent

def investigate_vendor():
    """Run a complete vendor investigation."""
    agent = TPRMAgent()

    # Run investigation
    result = agent.investigate(
        vendor_name="Acme Corp",
        categories=["litigation", "financial", "fraud"],
        generate_monitors=True,
    )

    # Print summary
    print(f"\n{'='*60}")
    print(f"Investigation Complete: {result.vendor_name}")
    print(f"{'='*60}")
    print(f"Sources Found: {result.total_sources_found}")
    print(f"Sources Accessed: {result.total_sources_accessed}")
    print(f"Risk Assessments: {len(result.risk_assessments)}")
    print(f"Monitoring Scripts: {len(result.monitoring_scripts)}")

    # Print risk assessments
    for assessment in result.risk_assessments:
        print(f"\n{'─'*60}")
        print(f"[{assessment.category.upper()}] Severity: {assessment.severity.upper()}")
        print(f"{'─'*60}")
        print(f"Summary: {assessment.summary}")
        print("\nKey Findings:")
        for finding in assessment.key_findings:
            print(f"  • {finding}")
        print("\nRecommended Actions:")
        for action in assessment.recommended_actions:
            print(f"  → {action}")

    # Print monitoring script info
    for script in result.monitoring_scripts:
        print(f"\n{'='*60}")
        print(f"Generated Monitoring Script")
        print(f"{'='*60}")
        print(f"Path: {script.script_path}")
        print(f"Monitoring {len(script.urls_monitored)} URLs")
        print(f"Frequency: {script.check_frequency}")

    # Print errors if any
    if result.errors:
        print(f"\n{'='*60}")
        print("Errors:")
        for error in result.errors:
            print(f"  ⚠️  {error}")

if __name__ == "__main__":
    investigate_vendor()

Run the investigation script in a new terminal:

# Activate your virtual environment
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Run the investigation script
python run_investigation.py

The agent will:

  1. Search Google for adverse media using the SERP API
  2. Access the source content using Web Unlocker
  3. Assess risk severity with OpenAI
  4. Generate a cron-schedulable Python monitoring script with the OpenHands SDK
Running the tprm-agent app in the terminal

Running the Auto-Generated Monitoring Script

After the investigation completes, you will find the generated monitoring script in the scripts/generated directory:

cd scripts/generated
python monitor_acme_corp.py

The monitoring script checks every monitored URL through the Bright Data Web Unlocker API and prints the results of each run:
Running the generated monitoring script in the terminal

Schedule the script with cron or a similar tool to keep your view of vendor risk current; an example entry follows.
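For instance, a crontab entry that runs the generated script every morning at 6 AM might look like this (paths are illustrative):

0 6 * * * cd /path/to/tprm-agent/scripts/generated && /path/to/tprm-agent/venv/bin/python monitor_acme_corp.py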

Conclusion

You now have a complete framework for building an enterprise-grade TPRM agent that automates adverse-media investigations of vendors. The system can:

  • Discover risk signals across Google with the SERP API
  • Access paywalled and CAPTCHA-protected sources with Web Unlocker
  • Assess risk severity with OpenAI
  • Generate ongoing monitoring scripts with the OpenHands SDK

The modular architecture makes it easy to extend:

  • Add new risk categories by updating the RISK_CATEGORIES dictionary
  • Extend the API layer to integrate with your existing GRC platform
  • Scale to thousands of vendors with a background task queue
  • Add advanced lookups, such as court registries, through the Browser API

Next Steps

To take the agent further, consider:

  • Integrating more data sources: SEC filings, OFAC sanctions lists, corporate registries, and similar
  • Adding database persistence: store investigation history in PostgreSQL or MongoDB
  • Implementing webhook notifications: push Slack or Teams alerts when a high-risk vendor is found
  • Building a dashboard: a React (or similar) frontend to visualize vendor risk scores
  • Scheduling automated scans: use Celery or APScheduler for periodic vendor monitoring (see the sketch after this list)
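As a starting point for scheduled scans, a minimal APScheduler sketch (assuming APScheduler is installed and the vendor list is illustrative):

import sys
sys.path.insert(0, "src")

from apscheduler.schedulers.blocking import BlockingScheduler
from agent import TPRMAgent

scheduler = BlockingScheduler()

@scheduler.scheduled_job("cron", hour=6)  # run daily at 06:00
def daily_scan():
    agent = TPRMAgent()
    for vendor in ["Acme Corp", "Globex"]:  # replace with your vendor list
        agent.investigate(vendor, generate_monitors=False)

scheduler.start()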


Arindam Majumder

AI Content Creator

Arindam Majumder is a Developer Advocate, YouTuber, and technical writer with more than 5,000 followers, focused on making large language models (LLMs), agentic workflows, and AI content easy to understand.
