Blog / AI
AI

使用 Bright Data 构建 LinkedIn 求职 AI 助手

了解如何使用 Bright Data 和 OpenAI 构建强大的 LinkedIn 求职助手,实现职位自动搜索、AI 匹配打分,全面提效你的求职流程。
2 分钟阅读
LinkedIn 求职 AI 助手(Bright Data 博客配图)

在本教程中,你将学到:

  1. AI 驱动的 LinkedIn 求职助手是如何工作的。
  2. 如何通过集成 Bright Data 的 LinkedIn 职位数据与 OpenAI 工作流来构建它。
  3. 如何改进并扩展该工作流,使其成为一个强健的求职助手。

你可以在此处查看最终项目文件。

开始吧!

LinkedIn 求职 AI 助手工作流解析

首先,没有 LinkedIn 职位列表数据,你无法构建一个 LinkedIn 求职 AI 助手。这正是 Bright Data 大显身手的地方!

借助 LinkedIn 职位抓取工具,你可以通过网页抓取获取 LinkedIn 的公开职位列表数据。体验就像在 LinkedIn Jobs 门户中搜索,但你拿到的不是网页,而是直接以 JSON 或 CSV 格式返回的结构化职位数据。

有了这些数据,你就可以让 AI 根据你的技能和目标职位为每个岗位打分。宏观来看,这正是 LinkedIn 求职 AI 助手为你做的事。

技术步骤

实现 LinkedIn 求职 AI 工作流所需的步骤如下:

  1. 加载 CLI 参数:解析命令行参数以获取运行时参数。这样无需改动代码即可灵活执行与个性化配置。
  2. 加载环境变量:从环境变量中读取 OpenAI 和 Bright Data 的 API Key。这些用于连接支撑此 AI 工作流的第三方集成。
  3. 加载配置文件:读取包含职位搜索参数、候选人资料详情和目标职位描述的 JSON 配置文件。该配置将指导职位抓取与 AI 打分。
  4. 从 LinkedIn 抓取职位:调用 LinkedIn Jobs Scraper API,按照配置过滤并获取职位列表。
  5. 通过 AI 为职位打分:将每一批职位发送给 OpenAI。AI 会基于你的资料与目标职位为每个岗位打出 0100 的分数,并附上简短说明,帮助理解匹配质量。
  6. 用 AI 评分与评论扩展职位数据:将 AI 生成的分数和评论并回合并到原始职位数据中,为每条职位记录新增这些 AI 字段。
  7. 导出带分数的职位数据:将富化后的职位数据导出为 CSV,便于进一步分析与处理。
  8. 打印最佳职位匹配:在控制台直接展示匹配度最高的职位要点,快速洞察最相关的机会。

看看如何用 Python 实现这个 AI 工作流!

如何使用 OpenAI 与 Bright Data 构建 LinkedIn 求职 AI 工作流

本教程将教你构建一个帮助你在 LinkedIn 上找工作的 AI 工作流。LinkedIn 职位数据来自 Bright Data,AI 能力由 OpenAI 提供。需要注意的是,你也可以使用其他任意 LLM。

完成本节后,你将拥有一个可在命令行运行的完整 Python AI 工作流。它会自动识别最合适的 LinkedIn 职位,让你把时间和精力聚焦在真正有希望的机会。

让我们开始构建一个 LinkedIn 求职 AI 助手!

前置条件

在继续之前,请确保你具备以下条件:

如果你还没有 Bright Data API Key,请创建 Bright Data 账号并按官方指南完成设置。类似地,按 OpenAI 官方说明获取你的 OpenAI API Key

步骤 #0:创建你的 Python 项目

打开终端,为 LinkedIn 求职 AI 助手创建一个新目录:

mkdir linkedin-job-hunting-ai-assistant/

linkedin-job-hunting-ai-assistant 文件夹将保存你的 AI 工作流的所有 Python 代码。

接着进入项目目录,并在其中初始化一个虚拟环境

cd linkedin-job-hunting-ai-assistant/
python -m venv venv

现在,用你喜欢的 Python IDE 打开项目。我们推荐安装 Python 扩展的 Visual Studio CodePyCharm Community Edition

在项目根目录下创建一个名为 assistant.py 的新文件。目录结构如下:

linkedin-job-hunting-ai-assistant/
├── venv/
└── assistant.py

在终端中激活虚拟环境。Linux 或 macOS:

source venv/bin/activate

Windows:

venv/Scripts/activate

接下来我们会引导你安装所需的 Python 包。如果你想现在一次性安装,在已激活的虚拟环境中运行:

pip install python-dotenv requests openai pydantic

具体依赖库包括:

  • python-dotenv:从 .env 文件加载环境变量,方便安全管理 API Key。
  • pydantic:帮助校验并将配置文件解析为结构化的 Python 对象。
  • requests:执行 HTTP 请求,调用 Bright Data 等 API 获取数据。
  • openai:提供与 OpenAI 语言模型交互的客户端,用于 AI 职位打分。

注意:这里我们安装了 openai 库,因为本教程使用 OpenAI 作为语言模型提供方。如果你计划使用其他 LLM,请安装相应的 SDK 或依赖。

一切就绪!你的 Python 开发环境已准备好使用 OpenAI 和 Bright Data 构建 AI 工作流。

步骤 #1:加载 CLI 参数

该 LinkedIn 求职 AI 脚本需要一些参数。为了在不修改代码的情况下保持复用与自定义能力,建议通过 CLI 读取。

你将需要以下命令行参数:

  • --config_file:包含职位搜索参数、候选人资料与目标职位描述的 JSON 配置文件路径。默认 config.json
  • --batch_size:每次发送给 AI 进行打分的职位数量。默认 5
  • --jobs_number:Bright Data LinkedIn Jobs Scraper 返回的职位条目最大数量。默认 20
  • --output_csv:包含 AI 打分与评论的富化职位数据输出 CSV 文件名。默认 jobs_scored.csv

使用如下函数从命令行读取这些参数:

def parse_cli_args():
    # Parse command-line arguments for config and runtime options
    parser = argparse.ArgumentParser(description="LinkedIn Job Hunting Assistant")
    parser.add_argument("--config_file", type=str, default="config.json", help="Path to config JSON file")
    parser.add_argument("--jobs_number", type=int, default=20, help="Limit the number of jobs returned by Bright Data Scraper API")
    parser.add_argument("--batch_size", type=int, default=5, help="Number of jobs to score in each batch")
    parser.add_argument("--output_csv", type=str, default="jobs_scored.csv", help="Output CSV filename")

    return parser.parse_args()

别忘了从 Python 标准库导入 argparse

import argparse

很好!你现在可以从 CLI 获取参数了。

步骤 #2:加载环境变量

将脚本配置为从环境变量读取密钥。为简化加载环境变量,使用 python-dotenv 包。在虚拟环境中安装:

pip install python-dotenv

然后在 assistant.py 中导入并调用 load_dotenv()

from dotenv import load_dotenv

load_dotenv()

现在你的助手可以从本地 .env 文件读取变量了。在项目根目录添加 .env 文件:

linkedin-job-hunting-ai-assistant/
├── venv/
├── .env         # <-----------
└── assistant.py

打开 .env,添加 OPENAI_API_KEYBRIGHT_DATA_API_KEY

OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"
BRIGHT_DATA_API_KEY="<YOUR_BRIGHT_DATA_API_KEY>"

<YOUR_OPENAI_API_KEY> 替换为你的 OpenAI 实际 API Key。同理,将 <YOUR_BRIGHT_DATA_API_KEY> 替换为你的Bright Data API Key

然后在脚本中添加如下函数以读取这两个环境变量:

def load_env_vars():
    # Read required API keys from environment and verify presence
    openai_api_key = os.getenv("OPENAI_API_KEY")
    brightdata_api_key = os.getenv("BRIGHT_DATA_API_KEY")

    missing = []
    if not openai_api_key:
        missing.append("OPENAI_API_KEY")
    if not brightdata_api_key:
        missing.append("BRIGHT_DATA_API_KEY")
    if missing:
        raise EnvironmentError(
            f"Missing required environment variables: {', '.join(missing)}\n"
            "Please set them in your .env or environment."
        )

    return openai_api_key, brightdata_api_key

需要从标准库导入:

import os

非常棒!你已通过环境变量安全地加载了第三方集成密钥。

步骤 #3:加载配置文件

现在,你需要一种可编程的方式告诉助手你感兴趣的职位。为了输出准确,助手还需要了解你的工作经验以及你在寻找什么样的岗位。

为避免将这些信息硬编码到代码里,我们将从一个 JSON 配置文件读取。该文件应包含:

  • location:你想搜索职位的地理位置,定义职位收集的主要区域。
  • keyword:与职位名称或角色相关的关键词,例如 “Python Developer”。使用引号可强制精确匹配。
  • country:两位国家代码(如 US 表示美国、FR 表示法国),用于限定国家范围。
  • time_range:职位发布的时间范围,用于筛选近期或相关职位(如 Past weekPast month 等)。
  • job_type:用来筛选的雇佣类型,如 Full-timePart-time 等。
  • experience_level:所需经验水平,如 Entry levelAssociate 等。
  • remote:按工作模式筛选(如 RemoteOn-siteHybrid)。
  • company:聚焦于特定公司或雇主的职位。
  • selective_search:启用后,会排除职位标题不包含指定关键词的职位,以得到更精确结果。
  • jobs_to_not_include:要从搜索结果中排除的特定职位 ID 列表,用于去重或过滤不需要的职位。
  • location_radius:定义围绕指定位置的搜索半径,包含附近区域。
  • profile_summary:你的职业概述。AI 将依据该信息评估职位匹配度。
  • desired_job_summary:你所期望岗位的简要描述,帮助 AI 依据契合程度进行评分。

这些字段与 Bright Data LinkedIn 职位列表“按关键词发现”API(属于 LinkedIn Jobs Scraper 解决方案)所需参数完全一致:

请注意 Bright Data LinkedIn 职位列表“按关键词发现”页面中的来源参数

关于这些字段及其可取值的更多信息,请参考官方文档

最后两个字段(profile_summarydesired_job_summary)描述了你的职业身份和目标岗位。它们会传给 AI,用于对 Bright Data 返回的每条职位进行评分。

为了在代码中更方便地处理配置文件,建议将其映射为一个 Pydantic 模型。首先在虚拟环境中安装 Pydantic:

pip install pydantic

然后定义映射 JSON 配置文件的 Pydantic 模型:

class JobSearchConfig(BaseModel):
    location: str
    keyword: Optional[str] = None
    country: Optional[str] = None
    time_range: Optional[str] = None
    job_type: Optional[str] = None
    experience_level: Optional[str] = None
    remote: Optional[str] = None
    company: Optional[str] = None
    selective_search: Optional[bool] = Field(default=False)
    jobs_to_not_include: Optional[List[str]] = Field(default_factory=list)
    location_radius: Optional[str] = None
    # Additional fields
    profile_summary: str  # Candidate's profile summary for AI scoring
    desired_job_summary: str  # Description of the desired job for AI scoring

注意,仅第一个字段和最后两个字段为必填。

接下来,创建一个函数,从 --config_file 指定的文件路径读取 JSON 配置,并反序列化为 JobSearchConfig 实例:

def load_and_validate_config(filename: str) -> JobSearchConfig:
    # Load JSON config file
    try:
        with open(filename, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        raise FileNotFoundError(f"Config file '{filename}' not found.")

    try:
        # Deserialize the input JSON data to a JobSearchConfig instance
        config = JobSearchConfig(**data)
    except ValidationError as e:
        raise ValueError(f"Config deserialization error:\n{e}")

    return config

这次需要导入以下模块:

from pydantic import BaseModel, Field, ValidationError
from typing import Optional, List
import json

很好!你的配置文件已按预期被正确读取并反序列化。

步骤 #4:从 LinkedIn 抓取职位

现在,是时候使用前面加载的配置来调用 Bright Data 的 LinkedIn Jobs Scraper API 了。

如果你还不熟悉 Bright Data 的 Web Scraper API,建议先查看文档

简单来说,Web Scraper API 提供可直接从特定站点检索公开数据的 API 端点。Bright Data 在后台初始化并运行一项现成的抓取任务。这些 API 处理 IP 轮换、验证码等措施,高效且合规地从网页收集公开数据。任务完成后,抓取数据会被解析为结构化格式并作为快照提供给你。

因此,一般工作流为:

  1. 触发 API 调用以启动网页抓取任务。
  2. 周期性检查包含抓取数据的快照是否就绪。
  3. 一旦可用,获取该快照中的数据。

你只需几行代码即可实现上述逻辑:

def trigger_and_poll_linkedin_jobs(config: JobSearchConfig, brightdata_api_key: str, jobs_number: int, polling_timeout=10):
    # Trigger the Bright Data LinkedIn job search
    url = "https://api.brightdata.com/datasets/v3/trigger"
    headers = {
        "Authorization": f"Bearer {brightdata_api_key}",
        "Content-Type": "application/json",
    }
    params = {
        "dataset_id": "gd_lpfll7v5hcqtkxl6l",  # Bright Data "Linkedin job listings information - discover by keyword" dataset ID
        "include_errors": "true",
        "type": "discover_new",
        "discover_by": "keyword",
        "limit_per_input": str(jobs_number),
    }

    # Prepare payload for Bright Data API based on user config
    data = [{
        "location": config.location,
        "keyword": config.keyword or "",
        "country": config.country or "",
        "time_range": config.time_range or "",
        "job_type": config.job_type or "",
        "experience_level": config.experience_level or "",
        "remote": config.remote or "",
        "company": config.company or "",
        "selective_search": config.selective_search,
        "jobs_to_not_include": config.jobs_to_not_include or "",
        "location_radius": config.location_radius or "",
    }]

    response = requests.post(url, headers=headers, params=params, json=data)
    if response.status_code != 200:
        raise RuntimeError(f"Trigger request failed: {response.status_code} - {response.text}")

    snapshot_id = response.json().get("snapshot_id")
    if not snapshot_id:
        raise RuntimeError("No snapshot_id returned from Bright Data trigger.")

    print(f"LinkedIn job search triggered! Snapshot ID: {snapshot_id}")

    # Poll snapshot endpoint until data is ready or timeout
    snapshot_url = f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}?format=json"
    headers = {"Authorization": f"Bearer {brightdata_api_key}"}

    print(f"Polling snapshot for ID: {snapshot_id}")

    while True:
        snap_resp = requests.get(snapshot_url, headers=headers)
        if snap_resp.status_code == 200:
            # Snapshot ready: return job postings JSON data
            print("Snapshot is ready")

            return snap_resp.json()
        elif snap_resp.status_code == 202:
            # Snapshot not ready yet: wait and retry
            print(f"Snapshot not ready yet. Retrying in {polling_timeout} seconds...")
            time.sleep(polling_timeout)
        else:
            raise RuntimeError(f"Snapshot polling failed: {snap_resp.status_code} - {snap_resp.text}")

该函数使用配置文件中的搜索参数触发 Bright Data 的 LinkedIn 职位抓取,确保你只获取符合条件的职位。随后轮询直到快照就绪,一旦可用即返回 JSON 格式的职位数据。请注意,认证由先前加载的 Bright Data API Key 处理。

由 LinkedIn Jobs Scraper 获取的快照将包含如下 JSON 格式的职位列表:

LinkedIn Jobs Scraper 生成的包含 LinkedIn 职位列表的 JSON 快照

注意:生成的 JSON 快照最多包含 --jobs_number 条职位。本例中为 20 条。

为使上述函数工作,需要安装 requests

pip install requests

关于其工作原理的更多信息,请参考我们的Python HTTP Requests 高级指南

然后记得一并导入它以及标准库中的 time

import requests
import time

太棒了!你已经集成 Bright Data 来获取最新且精确过滤的 LinkedIn 职位数据。

步骤 #5:通过 AI 为职位打分

现在,是时候让一个 LLM(例如 OpenAI 的模型)来评估每条抓取到的职位了。目标是基于以下因素给出 0100 的分数,并附上简短评论:

  1. 你的工作经验(profile_summary
  2. 你的目标职位(desired_job_summary

为减少 API 往返并加速处理,建议按批处理职位。具体来说,每次评估 --batch_size 条职位。

首先安装 openai 包:

pip install openai

然后导入 OpenAI 并初始化客户端:

from openai import OpenAI

# ...

# Initialize OpenAI client
client = OpenAI()

注意,无需手动将 API Key 传给 OpenAI 构造函数。该库会自动从 OPENAI_API_KEY 环境变量读取,你之前已经设置过。

继续创建 AI 驱动的职位打分函数:

def score_jobs_batch(jobs_batch: List[dict], profile_summary: str, desired_job_summary: str) -> List[JobScore]:
    # Construct prompt for AI to score job matches based on candidate profile
    prompt = f"""
        "You are an expert recruiter. Given the following candidate profile:\n"
        "{profile_summary}\n\n"
        "Desired job description:\n{desired_job_summary}\n\n"
        "Score each job posting accurately from 0 to 100 on how well it matches the profile and desired job.\n"
        "For each job, add a short comment (max 50 words) explaining the score and match quality.\n"
        "Return an array of objects with keys 'job_posting_id', 'score', and 'comment'.\n\n"
        "Jobs:\n{json.dumps(jobs_batch)}\n"
    """
    messages = [
        {"role": "system", "content": "You are a helpful job scoring assistant."},
        {"role": "user", "content": prompt},
    ]

    # Use OpenAI API to parse structured response into JobScoresResponse model
    response = client.responses.parse(
        model="gpt-5-mini",
        input=messages,
        text_format=JobScoresResponse,
    )

    # Return list of scored jobs
    return response.output_parsed.scores

这里使用了新版 gpt-5-mini 模型来为每条职位从 0100 打分,并给出简短解释性评论。

为确保响应始终符合所需的精确格式,我们调用了 parse() 方法。该方法强制结构化输出,其模型定义如下:

class JobScore(BaseModel):
    job_posting_id: str
    score: int = Field(..., ge=0, le=100)
    comment: str

class JobScoresResponse(BaseModel):
    scores: List[JobScore]

基本上,AI 会返回如下结构化 JSON 数据:

{
  "scores": [
    {
      "job_posting_id": "4271494891",
      "score": 80,
      "comment": "Strong SaaS product fit with end-to-end ownership, APIs, and cross-functional work—aligns with your startup PM and customer-first experience. Role targets 2–4 yrs, so it's slightly junior for your 7 years."
    },
    // omitted for brevity...
    {
      "job_posting_id": "4273328527",
      "score": 65,
      "comment": "Product role with heavy data/technical emphasis; agile and cross-functional responsibilities align, but it prefers quantitative/technical domain experience (finance/stat modeling) which may be a weaker fit."
    }
  ]
}

parse() 方法随后会将该 JSON 响应转换为 JobScoresResponse 实例。之后你可以在代码中以编程方式访问分数与评论。

注意:如果你偏好其他 LLM 提供方,请相应调整上述代码。

搞定!AI 职位评估完成。

步骤 #6:用 AI 评分与评论扩展职位数据

回看前面的 AI 原始 JSON 输出。你会发现每条职位评分包含一个 job_posting_id 字段。这对应 LinkedIn 用于标识职位的 ID。

这些 ID 也出现在 Bright Data LinkedIn Jobs Scraper 生成的快照数据中,因此你可以使用它们来:

  1. 在抓取到的职位数组中找到原始职位对象。
  2. 将 AI 生成的分数与评论合并到该职位对象中。

使用以下函数实现:

def extend_jobs_with_scores(jobs: List[dict], all_scores: List[JobScore]) -> List[dict]:
    # Where to store the enriched data
    extended_jobs = []

    # Combine original jobs with AI scores and comments
    for score_obj in all_scores:
        matched_job = None
        for job in jobs:
            if job.get("job_posting_id") == score_obj.job_posting_id:
                matched_job = job
                break
        if matched_job:
            job_with_score = dict(matched_job)
            job_with_score["ai_score"] = score_obj.score
            job_with_score["ai_comment"] = score_obj.comment
            extended_jobs.append(job_with_score)

    # Sort extended jobs by AI score (highest first)
    extended_jobs.sort(key=lambda j: j["ai_score"], reverse=True)

    return extended_jobs

如你所见,几个 for 循环就足以完成任务。在返回富化数据前,我们按 ai_score 降序排序,让最佳匹配的职位出现在列表顶部,便于快速定位。

很好!你的 LinkedIn 求职 AI 助手已接近完成!

步骤 #7:导出打分后的职位数据

使用 Python 内置的 csv 包将抓取并富化后的职位数据导出为 CSV 文件:

def export_extended_jobs(extended_jobs: List[dict], output_csv: str):
    # Dynamically get the field names from the first element in the array
    fieldnames = list(extended_jobs[0].keys())
    with open(output_csv, mode="w", newline="", encoding="utf-8") as csvfile:
         # Write extended job data with AI scores to CSV
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for job in extended_jobs:
            writer.writerow(job)

    print(f"Exported {len(extended_jobs)} jobs to {output_csv}")

上述函数会用 --output_csv CLI 参数指定的文件名来保存。

别忘了导入 csv

import csv

完美!LinkedIn 求职 AI 助手现在会将包含 AI 评分的数据导出为 CSV。

步骤 #8:打印最佳职位匹配

为了无需打开输出 CSV 就能在终端快速查看结果,编写一个函数打印匹配度最高的 3 条职位的关键信息:

def print_top_jobs(extended_jobs: List[dict], top: int = 3):
    print(f"\n*** Top {top} job matches ***")
    for job in extended_jobs[:3]:
        print(f"URL: {job.get('url', 'N/A')}")
        print(f"Title: {job.get('job_title', 'N/A')}")
        print(f"AI Score: {job.get('ai_score')}")
        print(f"AI Comment: {job.get('ai_comment', 'N/A')}")
        print("-" * 40)

步骤 #9:整合所有功能

将前面步骤中的所有函数组合到主逻辑中,形成完整的 LinkedIn 求职助手:

# Get runtime parameters from CLI
args = parse_cli_args()

try:
     # Load API keys from environment
    _, brightdata_api_key = load_env_vars()

     # Load job search config file
    config = load_and_validate_config(args.config_file)

    # Fetch jobs
    jobs_data = trigger_and_poll_linkedin_jobs(config, brightdata_api_key, args.jobs_number)

    print(f"{len(jobs_data)} jobs found!")
except Exception as e:
    print(f"[Error] {e}")
    return

all_scores = []
# Process jobs in batches to avoid overloading API and to handle large datasets
for i in range(0, len(jobs_data), args.batch_size):
    batch = jobs_data[i : i + args.batch_size]

    print(f"Scoring batch {i // args.batch_size + 1} with {len(batch)} jobs...")

    scores = score_jobs_batch(batch, config.profile_summary, config.desired_job_summary)
    all_scores.extend(scores)

    time.sleep(1) # To avoid triggering API rate limits

# Merge scores into scraped jobs
extended_jobs = extend_jobs_with_scores(jobs_data, all_scores)

# Save results to CSV
export_extended_jobs(extended_jobs, args.output_csv)

# Print top job matches with key info for quick review
print_top_jobs(extended_jobs)

太棒了!现在只剩下查看助手的完整代码并进行首次运行验证。

步骤 #10:完整代码与首次运行

你的最终 assistant.py 文件应包含:

# pip install python-dotenv requests openai pydantic

import argparse
from dotenv import load_dotenv
import os
from pydantic import BaseModel, Field, ValidationError
from typing import Optional, List
import json
import requests
import time
from openai import OpenAI
import csv

# Load environment variables from .env file
load_dotenv()

# Pydantic models supporting the project
class JobSearchConfig(BaseModel):
    # Source: https://docs.brightdata.com/api-reference/web-scraper-api/social-media-apis/linkedin#discover-by-keyword
    location: str
    keyword: Optional[str] = None
    country: Optional[str] = None
    time_range: Optional[str] = None
    job_type: Optional[str] = None
    experience_level: Optional[str] = None
    remote: Optional[str] = None
    company: Optional[str] = None
    selective_search: Optional[bool] = Field(default=False)
    jobs_to_not_include: Optional[List[str]] = Field(default_factory=list)
    location_radius: Optional[str] = None
    # Additional fields
    profile_summary: str  # Candidate's profile summary for AI scoring
    desired_job_summary: str  # Description of the desired job for AI scoring

class JobScore(BaseModel):
    job_posting_id: str
    score: int = Field(..., ge=0, le=100)
    comment: str

class JobScoresResponse(BaseModel):
    scores: List[JobScore]

def parse_cli_args():
    # Parse command-line arguments for config and runtime options
    parser = argparse.ArgumentParser(description="LinkedIn Job Hunting Assistant")
    parser.add_argument("--config_file", type=str, default="config.json", help="Path to config JSON file")
    parser.add_argument("--jobs_number", type=int, default=20, help="Limit the number of jobs returned by Bright Data Scraper API")
    parser.add_argument("--batch_size", type=int, default=5, help="Number of jobs to score in each batch")
    parser.add_argument("--output_csv", type=str, default="jobs_scored.csv", help="Output CSV filename")

    return parser.parse_args()

def load_env_vars():
    # Read required API keys from environment and verify presence
    openai_api_key = os.getenv("OPENAI_API_KEY")
    brightdata_api_key = os.getenv("BRIGHT_DATA_API_KEY")

    missing = []
    if not openai_api_key:
        missing.append("OPENAI_API_KEY")
    if not brightdata_api_key:
        missing.append("BRIGHT_DATA_API_KEY")
    if missing:
        raise EnvironmentError(
            f"Missing required environment variables: {', '.join(missing)}\n"
            "Please set them in your .env or environment."
        )

    return openai_api_key, brightdata_api_key

def load_and_validate_config(filename: str) -> JobSearchConfig:
    # Load JSON config file
    try:
        with open(filename, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        raise FileNotFoundError(f"Config file '{filename}' not found.")

    try:
        # Deserielizing the input JSON data to a JobSearchConfig instance
        config = JobSearchConfig(**data)
    except ValidationError as e:
        raise ValueError(f"Config deserialization error:\n{e}")

    return config

def trigger_and_poll_linkedin_jobs(config: JobSearchConfig, brightdata_api_key: str, jobs_number: int, polling_timeout=10):
    # Trigger the Bright Data LinkedIn job search
    url = "https://api.brightdata.com/datasets/v3/trigger"
    headers = {
        "Authorization": f"Bearer {brightdata_api_key}",
        "Content-Type": "application/json",
    }
    params = {
        "dataset_id": "gd_lpfll7v5hcqtkxl6l", # Bright Data "Linkedin job listings information - discover by keyword" dataset ID
        "include_errors": "true",
        "type": "discover_new",
        "discover_by": "keyword",
        "limit_per_input": str(jobs_number),
    }

    # Prepare payload for Bright Data API based on user config
    data = [{
        "location": config.location,
        "keyword": config.keyword or "",
        "country": config.country or "",
        "time_range": config.time_range or "",
        "job_type": config.job_type or "",
        "experience_level": config.experience_level or "",
        "remote": config.remote or "",
        "company": config.company or "",
        "selective_search": config.selective_search,
        "jobs_to_not_include": config.jobs_to_not_include or "",
        "location_radius": config.location_radius or "",
    }]

    response = requests.post(url, headers=headers, params=params, json=data)
    if response.status_code != 200:
        raise RuntimeError(f"Trigger request failed: {response.status_code} - {response.text}")

    snapshot_id = response.json().get("snapshot_id")
    if not snapshot_id:
        raise RuntimeError("No snapshot_id returned from Bright Data trigger.")

    print(f"LinkedIn job search triggered! Snapshot ID: {snapshot_id}")

    # Poll snapshot endpoint until data is ready or timeout
    snapshot_url = f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}?format=json"
    headers = {"Authorization": f"Bearer {brightdata_api_key}"}

    print(f"Polling snapshot for ID: {snapshot_id}")

    while True:
        snap_resp = requests.get(snapshot_url, headers=headers)
        if snap_resp.status_code == 200:
            # Snapshot ready: return job postings JSON data
            print("Snapshot is ready")

            return snap_resp.json()
        elif snap_resp.status_code == 202:
            # Snapshot not ready yet: wait and retry
            print(f"Snapshot not ready yet. Retrying in {polling_timeout} seconds...")
            time.sleep(polling_timeout)
        else:
            raise RuntimeError(f"Snapshot polling failed: {snap_resp.status_code} - {snap_resp.text}")

# Initialize OpenAI client
client = OpenAI()

def score_jobs_batch(jobs_batch: List[dict], profile_summary: str, desired_job_summary: str) -> List[JobScore]:
    # Construct prompt for AI to score job matches based on candidate profile
    prompt = f"""
        "You are an expert recruiter. Given the following candidate profile:\n"
        "{profile_summary}\n\n"
        "Desired job description:\n{desired_job_summary}\n\n"
        "Score each job posting accurately from 0 to 100 on how well it matches the profile and desired job.\n"
        "For each job, add a short comment (max 50 words) explaining the score and match quality.\n"
        "Return an array of objects with keys 'job_posting_id', 'score', and 'comment'.\n\n"
        "Jobs:\n{json.dumps(jobs_batch)}\n"
    """
    messages = [
        {"role": "system", "content": "You are a helpful job scoring assistant."},
        {"role": "user", "content": prompt},
    ]

    # Use OpenAI API to parse structured response into JobScoresResponse model
    response = client.responses.parse(
        model="gpt-5-mini",
        input=messages,
        text_format=JobScoresResponse,
    )

    # Return list of scored jobs
    return response.output_parsed.scores

def extend_jobs_with_scores(jobs: List[dict], all_scores: List[JobScore]) -> List[dict]:
    # Where to store the enriched data
    extended_jobs = []

    # Combine original jobs with AI scores and comments
    for score_obj in all_scores:
        matched_job = None
        for job in jobs:
            if job.get("job_posting_id") == score_obj.job_posting_id:
                matched_job = job
                break
        if matched_job:
            job_with_score = dict(matched_job)
            job_with_score["ai_score"] = score_obj.score
            job_with_score["ai_comment"] = score_obj.comment
            extended_jobs.append(job_with_score)

    # Sort extended jobs by AI score (highest first)
    extended_jobs.sort(key=lambda j: j["ai_score"], reverse=True)
    return extended_jobs

def export_extended_jobs(extended_jobs: List[dict], output_csv: str):
    # Dynamically get the field names from the first element in the array
    fieldnames = list(extended_jobs[0].keys())
    with open(output_csv, mode="w", newline="", encoding="utf-8") as csvfile:
         # Write extended job data with AI scores to CSV
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for job in extended_jobs:
            writer.writerow(job)

    print(f"Exported {len(extended_jobs)} jobs to {output_csv}")

def print_top_jobs(extended_jobs: List[dict], top: int = 3):
    print(f"\n*** Top {top} job matches ***")
    for job in extended_jobs[:3]:
        print(f"URL: {job.get('url', 'N/A')}")
        print(f"Title: {job.get('job_title', 'N/A')}")
        print(f"AI Score: {job.get('ai_score')}")
        print(f"AI Comment: {job.get('ai_comment', 'N/A')}")
        print("-" * 40)

def main():
    # Get runtime parameters from CLI
    args = parse_cli_args()

    try:
         # Load API keys from environment
        _, brightdata_api_key = load_env_vars()

         # Load job search config file
        config = load_and_validate_config(args.config_file)

        # Fetch jobs
        jobs_data = trigger_and_poll_linkedin_jobs(config, brightdata_api_key, args.jobs_number)

        print(f"{len(jobs_data)} jobs found!")
    except Exception as e:
        print(f"[Error] {e}")
        return

    all_scores = []
    # Process jobs in batches to avoid overloading API and to handle large datasets
    for i in range(0, len(jobs_data), args.batch_size):
        batch = jobs_data[i : i + args.batch_size]

        print(f"Scoring batch {i // args.batch_size + 1} with {len(batch)} jobs...")

        scores = score_jobs_batch(batch, config.profile_summary, config.desired_job_summary)
        all_scores.extend(scores)

        time.sleep(1) # To avoid triggering API rate limits

    # Merge scores into scraped jobs
    extended_jobs = extend_jobs_with_scores(jobs_data, all_scores)

    # Save results to CSV
    export_extended_jobs(extended_jobs, args.output_csv)

    # Print top job matches with key info for quick review
    print_top_jobs(extended_jobs)

if __name__ == "__main__":
    main()

假设你是拥有 7 年经验的产品经理,正在寻找纽约的混合办公岗位。你的 config.json 可以这样配置:

{
  "location": "New York",
  "keyword": "Product Manager",
  "country": "US",
  "time_range": "Past month",
  "job_type": "Full-time",
  "experience_level": "Mid-Senior level",
  "remote": "Hybrid",
  "profile_summary": "Experienced product manager with 7 years in tech startups, specializing in agile methodologies and cross-functional team leadership.",
  "desired_job_summary": "Looking for a full-time product manager role focusing on SaaS products and customer-centric development."
}

然后,运行该 LinkedIn 求职助手:

python assistant.py

可选:如果需要自定义运行:

python assistant.py --config_file=config.json --batch_size=10 --jobs_number=40 --output_csv=results.csv

该命令将使用你指定的 config.json 文件,以 10 的批量处理职位,从 Bright Data 获取最多 40 条职位,并将包含 AI 分数与评论的富化结果保存到 results.csv

使用默认 CLI 参数运行后,你在终端应能看到类似输出:

LinkedIn job search triggered! Snapshot ID: s_me6x0s3qldm9zz0wv
Polling snapshot for ID: s_me6x0s3qldm9zz0wv
Snapshot not ready yet. Retrying in 10 seconds...
# Omitted for brevity...
Snapshot not ready yet. Retrying in 10 seconds...
Snapshot is ready
20 jobs found!
Scoring batch 1 with 5 jobs...
Scoring batch 2 with 5 jobs...
Scoring batch 3 with 5 jobs...
Scoring batch 4 with 5 jobs...
Exported 20 jobs to jobs.csv

随后,前三个职位洞察可能类似如下:

*** Top 3 job matches ***
URL: https://www.linkedin.com/jobs/view/product-manager-growth-at-yext-4267903356?_l=en
Title: Product Manager, Growth
AI Score: 92
AI Comment: Excellent fit: SaaS-focused growth PM with customer-centric objectives, product-led growth, experimentation and cross-functional collaboration—direct match to candidate's experience and desired role.
----------------------------------------
URL: https://www.linkedin.com/jobs/view/product-manager-at-industrial-color-4271494891?_l=en
Title: Product Manager
AI Score: 90
AI Comment: Strong match: SaaS product, API/integrations, agile and cross-functional leadership emphasized. Only minor mismatch is the listed 2–4 years target (you have 7), which likely makes you overqualified but highly applicable.
----------------------------------------
URL: https://www.linkedin.com/jobs/view/product-manager-at-resourceful-talent-group-4277945862?_l=en
Title: Product Manager
AI Score: 88
AI Comment: Very similar SaaS/integrations role with agile practices and customer-driven iteration. Recruiter listing targets 2–4 years, but your 7 years of startup PM experience and cross-functional leadership map well.
----------------------------------------

打开生成的 jobs_scored.csv 文件。主要列大致如下:

jobs_scored.csv 文件中的输出结果

留意每条职位都被 AI 打分并附有评论。这能帮助你聚焦在真正有成功希望的岗位!

就这样!借助这个 AI 驱动的 LinkedIn 求职工作流,找工作从未如此轻松。

后续步骤

这里构建的 LinkedIn 求职助手以工作流形式运行,但仍有一些值得探索的增强点:

  1. 避免重复评估相同职位:为了每次运行都评估不同的职位,请在 config.json 中设置 jobs_to_not_include 数组。该数组应包含助手已分析过的 job_posting_id。例如,要排除当前抓取到的职位,你的配置可能如下:
{
 "location": "New York",
 "keyword": "Product Manager",
 "country": "US",
 "time_range": "Past month",
 "job_type": "Full-time",
 "experience_level": "Mid-Senior level",
 "remote": "Hybrid",
 "jobs_to_not_include": ["4267903356", "4271494891", "4277945862", "4267906118", "4255405781", "4267537560", "4245709356", "4265355147", "4277751182", "4256914967", "4281336197", "4232207277", "4273328527", "4277435772", "4253823512", "4279286518", "4224506933", "4250788498", "4256023955", "4252894407"], // <--- NOTE: The IDs of the jobs to exclude
 "profile_summary": "Experienced product manager with 7 years in tech startups, specializing in agile methodologies and cross-functional team leadership.",
 "desired_job_summary": "Looking for a full-time product manager role focusing on SaaS products and customer-centric development."
}
  1. 自动化定时运行脚本:使用如 Cron 等工具定期(例如每日)运行脚本。记得将 time_range 参数设置为合适值(如 “Past 24 hours”),并更新 jobs_to_not_include 列表以排除已评估的职位,确保聚焦于最新职位。
  2. 使用专门的 AI 评审模型:可考虑用一个专门为职位匹配与打分微调的模型替代通用 GPT-5 模型。这个简单调整就能显著提升职位评估的准确度与相关性。

结论

本文展示了如何利用 Bright Data 的 LinkedIn 职位抓取能力来构建一个 AI 驱动的求职助手。

该 AI 工作流非常适合正寻求新岗位、希望通过聚焦最佳机会来最大化成功率的人群。它帮你节省时间与精力,将申请集中在真正符合职业目标、成功概率更高的职位上。

若要构建更高级的工作流,欢迎探索 Bright Data AI 基础设施中获取、校验与转换实时网页数据的全套方案。

创建一个免费 Bright Data 账号,开始试用我们的 AI 就绪数据工具吧!

常见问题

为什么聚焦于 LinkedIn 求职,而不是 Indeed 或其他平台?

上述示例以 LinkedIn 作为数据来源,但你可以轻松扩展脚本以适配 Indeed 或任何其他通过 Bright Data 可用的职位来源。关于集成 Indeed 的更多详情,请参考 Indeed 职位抓取工具

为什么选择 OpenAI,而不是 Gemini 或其他 LLM 提供方?

本 AI 工作流选择 OpenAI,因其广泛采用与流行度。但你也可以轻松适配为其他 LLM 提供方,如 Gemini、Anthropic、Cohere,或任何提供 API 的大语言模型。

为何采用 AI 工作流而非专门的 AI Agent?

LinkedIn Jobs Scraper 返回的数据质量高、结构化程度好,你可以直接用 LLM 进行评分处理。因此不一定需要具备推理与决策能力的复杂自治智能体。
不过,如果你希望构建更高级的 LinkedIn 求职 AI Agent,可以考虑如下多智能体架构
职位获取 Agent:与 Bright Data 基础设施集成(通过工具集成或 MCP),连续调用 LinkedIn Jobs Scraper API 来获取并更新职位列表。
职位评分 Agent:基于候选人画像与偏好使用 LLM 对职位进行评估和打分的专业 Agent。
编排 Agent:顶层 Agent,负责协调上述两个 Agent,反复触发数据获取与评分循环,直到得到期望数量的高分、相关职位。
你甚至可以编程让该 Agent 自动为你投递这些职位。如果你考虑构建这样的 LinkedIn 求职系统,推荐使用 CrewAI 等多智能体平台

支持支付宝等多种支付方式

Antonello Zanini

技术写作

5.5 years experience

Antonello是一名软件工程师,但他更喜欢称自己为技术传教士。通过写作传播知识是他的使命。

Expertise
Web 开发 网页抓取 AI 集成