使用最新网络数据采集构建 Kubeflow Pipelines

本指南展示了如何在 Kubeflow 流水线中添加实时网络数据采集组件,并使用 Bright Data 进行 TikTok 情感分析。
1 分钟阅读
使用最新网络数据采集构建 Kubeflow Pipelines

在这篇博客中,你将学习:

  • 为什么 Kubeflow Pipelines 应该包含专门的网络数据采集组件。
  • 该方法在一个具体的 TikTok 情感分析流水线中的应用。
  • 如何通过特定的抓取解决方案连接 TikTok 评论数据源来实现该流水线。

让我们开始吧!

为什么 Kubeflow Pipelines 能从结构化的网页抓取数据中获益

现代机器学习与 AI 工作流高度依赖 高质量数据。相反,传统流水线通常摄取静态数据集或预处理文件。然而,这些来源很快就会过时,导致模型在陈旧信息上训练。

这正是结构化网页抓取数据的价值所在!通过从网络中采集实时、具备上下文的数据,流水线可以持续对齐最新趋势、用户行为和新兴内容。

Kubeflow Pipelines 的设计目标是模块化、可复现与可扩展的 ML 工作流,因此将网络数据采集组件集成进去会带来巨大收益。这类组件提供最新、结构化的数据流,可在下游自动摄取、过滤与处理。

在你的流水线中加入一个 网络数据采集组件 无疑能提升模型准确性。因此,添加一个专用的数据采集组件——甚至针对不同数据源添加多个组件——在策略上非常合理。它使你的流水线能够持续适应、再训练,并近实时地产出洞察,为任何 AI 驱动项目打下坚实基础。

介绍用于 TikTok 情感分析的 Kubeflow 流水线

为更好理解网络数据采集组件如何增强 Kubeflow 流水线,我们来看一个真实案例。假设你想构建一个数据分析工作流:输入一组 TikTok 帖子,并对其内容进行情感分析。

你可以设计一个包含两个组件的流水线:

  1. TikTok 评论数据组件:通过网页抓取从 TikTok 帖子中获取结构化评论数据。
  2. 数据分析组件:为这些评论补充 情感洞察positivenegativeneutral)。

问题在于,抓取 TikTok(以及许多其他热门平台)一直以困难著称。这是因为存在 反爬措施,例如验证码(CAPTCHA)、JavaScript 挑战、IP 封锁以及速率限制。扩大规模只会增加复杂度,因为限流与封禁很容易中断数据采集。

为避免这些问题,使用一流的 网络数据服务(如 Bright Data) 来驱动网络数据采集组件是更合理的选择。Bright Data 依托可高度扩展的基础设施,在 195 个国家提供 1.5 亿代理 IP,成功率达 99.95%,正常运行时间达 99.99%,可实现大规模、可靠的抓取。

具体来说,我们将使用 TikTok Scraper,这是一款旨在简化从 TikTok 帖子中采集结构化数据的网页抓取 API。它是众多可用于从热门站点获取数据的 Web Scraping APIs 之一。同样地,你也可以使用 Filter Dataset APIBright Data 数据集 中拉取过滤后的数据,为你的 ML/AI 流水线提供开箱即用的数据。

如何构建带有动态网页抓取数据组件的 Kubeflow 流水线

在本引导章节中,你将看到如何构建前面介绍的用于 TikTok 情感分析的 Kubeflow 流水线。

按照以下步骤操作!

前置条件

要学习本教程,你需要:

Kubeflow Pipelines 的工作方式 有基本了解,也会帮助你理解下面的说明。

建议运行以下示例的操作系统为 Linux、macOS 或 WSL(Windows Subsystem for Linux)。

步骤 #1:项目初始化

首先打开终端,为 Kubeflow Pipelines 项目创建一个新目录:

mkdir kfp-bright-data-pipeline

进入项目目录,并在其中创建一个 Python 虚拟环境

cd kfp-bright-data-pipeline
python -m venv .venv

接着,在你偏好的 Python IDE 中打开该项目文件夹。我们推荐 安装了 Python 扩展的 Visual Studio CodePyCharm Community Edition

在项目根目录创建一个名为 tiktok_sentiment_analysis_kfp_pipeline.py 的新文件。结构应如下所示:

kfp-bright-data-pipeline/
├── .venv/
└── tiktok_sentiment_analysis_kfp_pipeline.py # <-----------

在 IDE 的终端中激活虚拟环境。Linux 或 macOS 上运行:

source venv/bin/activate

同样地,在 Windows 上执行:

venv/Scripts/activate

激活虚拟环境后,安装所需依赖:

pip install kfp

唯一必需的库是 kfp,它可用于构建与编译可移植、可扩展的机器学习流水线。

最后,打开 tiktok_sentiment_analysis_kfp_pipeline.py 并导入必要模块:

from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset

完成!现在你已经拥有一个可以用于构建 Kubeflow 流水线的 Python 开发环境。

步骤 #2:开始使用 Bright Data

流水线中的第一个组件将使用 Bright Data 的 Web Scraping APIs 获取实时网络数据。在实现它之前,你需要正确配置 Bright Data 账户。

由于我们将使用 Web Scraping APIs,建议你花几分钟 查看官方文档。简而言之,这些 API 会提供来自热门网站的结构化数据流,便于在 ML/AI 工作流(或任何其他 支持的使用场景)中直接消费。

如果你还没有账户,请 创建一个。否则,登录 并打开用户控制台。然后进入 “Web Scrapers” 区域:
进入“Web Scrapers”页面
转到 “Web Scrapers Library” 标签页。你将看到 120+ 个面向互联网热门平台的现成采集器。

在本教程中,搜索 “tiktok.com”,因为我们的目标是获取 TikTok 帖子的实时评论数据并对其进行情感分析。
搜索“tiktok.com”采集器

进入 TikTok scraper 页面 后,查看可用的抓取端点。
TikTok 采集器页面
在这里,你可以配置输入参数、检查请求/响应格式、查看示例 API 调用等。

对于本流水线,在 “TikTok – Comments” 下拉菜单中找到 “Collect by URL” 采集器:
“TikTok – Comments → Collect by URL”采集器
这就是你将在 Kubeflow 流水线的数据采集组件中使用的、由 Bright Data 驱动的端点。

记下它的 dataset ID:

gd_lkf2st302ap89utw5k

你将需要它来触发用于采集 TikTok 评论数据的特定 Web Scraping API。

另外,如你在右侧代码片段中所见,对 Web Scraping APIs 的 Bright Data API 调用使用 API_TOKEN 进行认证。该值应替换为你的 Bright Data API key,这是 认证 API 请求 的推荐方式。

按文档说明获取你的 API key 并妥善保管。下一步会用到它!

步骤 #3:定义网络数据采集组件

通过与 Bright Data Web Scraping API(用于 TikTok 抓取)集成,实现用于网络数据采集的 Kubeflow 流水线组件:

@dsl.component(
    base_image="python:3.10",
    packages_to_install=["requests"]
)
def collect_tiktok_comments(post_urls: list, output_dataset: Output[Dataset]):
    import requests, time, json, os

    BRIGHT_DATA_API_KEY = "<YOUR_BRIGHT_DATA_API_KEY>" # Replace with your Bright Data API key
    # The ID of the "TikTok – Comments → Collect by URL" Bright Data web scraping API
    TIKTOK_DATASET_ID = "gd_lkf2st302ap89utw5k"

    # The HTTP headers common to all requests to Bright Data
    headers = {"Authorization": f"Bearer {BRIGHT_DATA_API_KEY}", "Content-Type": "application/json"}

    # Trigger the Bright Data Web Scraping API on the input TikTok posts
    trigger = requests.post(
        f"https://api.brightdata.com/datasets/v3/trigger?dataset_id={TIKTOK_DATASET_ID}",
        headers=headers,
        json={"input": [{"url": u} for u in post_urls]},
    )
    trigger.raise_for_status()
    # Retrieve the data snaptshot ID
    snapshot_id = trigger.json()["snapshot_id"]

    # Poll the snapshot endpoint to check whether the snapshot
    # containing the data of interest has been produced
    scraped_data = []
    status = "running"
    while status in ["running", "building", "starting"]:
        progress = requests.get(f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}?format=json", headers=headers)
        progress.raise_for_status()

        # Access the JSON response data
        response_data = progress.json()
        # If the response does not include a status, it means it contains the scraped data
        if isinstance(response_data, dict) and "status" in response_data:
            # Extract the current snapshot status
            status = progress.json()["status"]
            # Waiting for 5 seconds for the next check
            time.sleep(5)
        else:
            scraped_data = response_data
            break

    # Store the scraped dataset
    with open(output_dataset.path, "w", encoding="utf-8") as f:
        json.dump(scraped_data, f, ensure_ascii=False, indent=2)

注意:请务必将 <YOUR_BRIGHT_DATA_API_KEY> 占位符替换为你前面获取的 Bright Data API key。在生产级流水线中,不要在组件里硬编码密钥。应改为安全地管理密钥,例如按文档所述 使用 Kubernetes Secret

在 Kubeflow Pipelines 中,组件是一个自包含单元(通过 dsl.component 注解定义),用于执行特定任务。本例中,该组件从 Bright Data 获取网络数据。每个组件都会被打包进一个 Docker 容器。

在该组件中,基础镜像为 Python 3.10 环境,然后加入 requests,用于向 Bright Data 的 API 端点发起 HTTP 请求。部署时构建组件时,会拉取 Python 3.10 镜像并自动安装 requests

Bright Data 的 Web Scraping APIs 同时支持 同步异步数据交付。同步方式适用于快速取数;异步方式更适合大规模数据集。对于生产级流水线,通常推荐使用异步方式。

在异步方式下,请求数据时可能无法立即返回。Bright Data 会生成所请求数据的快照(snapshot),可能需要几秒或更久。因此需要轮询机制:重复检查快照是否可用,然后再获取数据。

基于此,该网络数据组件的代码工作流程如下:

  1. 发送数据请求:组件向 Bright Data 发起 API 调用,开始生成所请求数据的快照。
  2. 轮询快照端点:组件反复调用快照端点检查状态。如果响应包含 status 字段且值为 “running”,则说明快照仍在准备中;如果 status 字段不存在,则表示快照已就绪且响应中包含抓取数据。
  3. 获取数据:快照就绪后,组件从 API 响应中提取数据,并提供给流水线下游组件使用。

太棒了!用于网络数据采集的 Kubeflow 流水线组件已经完成。

步骤 #4:构建情感分析组件

抓取到的 TikTok 数据将以 JSON 数组形式返回,结构如下:
Web Scraping API 响应结构
要对这些数据做情感分析,你可以将 comment_text 字段传给情感分析工具,例如 VADER Sentiment Analysis。VADER 是基于词典与规则的方法,专为捕捉社交媒体语境中的情绪表达而设计。当然,你也可以使用其他情感分析方法,包括基于 AI 的模型

VADER 被包含在 NLTK 中,这是最流行的 Python 自然语言处理工具包之一。典型工作流为:

  1. 从上一个组件读取输入 JSON 数组(抓取到的 TikTok 评论)。
  2. 使用 pandas 简化数据过滤与选择。
  3. 通过 nltk 将文本传入 VADER Sentiment Analyzer。
  4. 将分析结果保存下来供下游组件使用。

综合以上,情感分析组件可实现为:

@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "nltk"]
)
def sentiment_analysis(input_dataset: Input[Dataset], sentiment_output: Output[Dataset]):
    import pandas as pd
    from nltk.sentiment import SentimentIntensityAnalyzer
    import nltk

    # Download the VADER sentiment lexicon (used by NLTK for sentiment scoring)
    nltk.download("vader_lexicon")

    # Load the input dataset containing TikTok comments
    df = pd.read_json(input_dataset.path)

    # Initialize the sentiment analyzer
    sia = SentimentIntensityAnalyzer()

    # Apply sentiment analysis to each comment and classify as positive, negative, or neutral
    df["sentiment"] = df["comment_text"].apply(lambda t: (
        "positive" if sia.polarity_scores(str(t))["compound"] >= 0.05 else
        "negative" if sia.polarity_scores(str(t))["compound"] <= -0.05 else "neutral"
    ))

    # Save the results to the output dataset for downstream components
    df.to_json(sentiment_output.path, orient="records")

很好!流水线的两个主要组件(网络数据采集与情感分析)现在都已完整实现。

步骤 #5:完成 Kubeflow 流水线

现在两个组件都准备好了,你可以用带有 dsl.pipeline 注解的函数,将它们组合成一个 Kubeflow 流水线:

@dsl.pipeline(name="TikTok Sentiment Pipeline")
def tiktok_sentiment_pipeline():
    # List of TikTok post URLs to scrape comments from
    tiktok_post_urls = [
        "https://www.tiktok.com/@nike/video/7600211777267272991",
        "https://www.tiktok.com/@nike/video/7556252854294482189"
    ]

    # Collect TikTok comments using the Bright Data web scraping component
    collect_task = collect_tiktok_comments(post_urls=tiktok_post_urls)

    # Perform sentiment analysis on the collected comments
    sentiment_task = sentiment_analysis(
        input_dataset=collect_task.outputs["output_dataset"]
    )

该流水线会先对同一账号(@nike)的两个目标视频运行 TikTok 评论采集组件。具体来说,这两个 TikTok 视频之所以被选中,是因为它们展示了新鞋款。对其进行情感分析对于业务至关重要,有助于理解受众对新品发布的看法。
用于情感分析的 TikTok 源视频之一
通过 Bright Data Web Scraping API 生成的数据集会被传递给下游的情感分析组件。情感分析步骤会处理抓取到的评论,并生成一个包含情感标签(positivenegativeneutral)的新数据集。该输出可被更多下游组件使用,例如报表或可视化。

非常好!Kubeflow 流水线现在已完整定义。

步骤 #6:编译流水线

最后一步是将流水线编译成 Kubeflow YAML 流水线文件

if __name__ == "__main__":
    compiler.Compiler().compile(
        pipeline_func=tiktok_sentiment_pipeline,
        package_path="tiktok_sentiment_analysis_kfp_pipeline.yaml"
    )

当你运行 tiktok_sentiment_analysis_kfp_pipeline.py 脚本时,这段代码会生成名为 tiktok_sentiment_analysis_kfp_pipeline.yaml 的文件。该 YAML 文件包含 Kubeflow 部署所需的完整流水线规范。任务完成!

步骤 #7:最终代码

下面是你在 tiktok_sentiment_analysis_kfp_pipeline.py 文件中应得到的完整 Kubeflow 流水线:

# tiktok_sentiment_analysis_kfp_pipeline.py
# pip install kfp

from kfp import dsl, compiler
from kfp.dsl import Input, Output, Dataset


@dsl.component(
    base_image="python:3.10",
    packages_to_install=["requests"]
)
def collect_tiktok_comments(post_urls: list, output_dataset: Output[Dataset]):
    import requests, time, json, os

    BRIGHT_DATA_API_KEY = "<YOUR_BRIGHT_DATA_API_KEY>" # Replace with your Bright Data API key
    # The ID of the "TikTok – Comments → Collect by URL" Bright Data web scraping API
    TIKTOK_DATASET_ID = "gd_lkf2st302ap89utw5k"

    # The HTTP headers common to all requests to Bright Data
    headers = {"Authorization": f"Bearer {BRIGHT_DATA_API_KEY}", "Content-Type": "application/json"}

    # Trigger the Bright Data Web Scraping API on the input TikTok posts
    trigger = requests.post(
        f"https://api.brightdata.com/datasets/v3/trigger?dataset_id={TIKTOK_DATASET_ID}",
        headers=headers,
        json={"input": [{"url": u} for u in post_urls]},
    )
    trigger.raise_for_status()
    # Retrieve the data snaptshot ID
    snapshot_id = trigger.json()["snapshot_id"]

    # Poll the snapshot endpoint to check whether the snapshot
    # containing the data of interest has been produced
    scraped_data = []
    status = "running"
    while status in ["running", "building", "starting"]:
        progress = requests.get(f"https://api.brightdata.com/datasets/v3/snapshot/{snapshot_id}?format=json", headers=headers)
        progress.raise_for_status()

        # Access the JSON response data
        response_data = progress.json()
        # If the response does not include a status, it means it contains the scraped data
        if isinstance(response_data, dict) and "status" in response_data:
            # Extract the current snapshot status
            status = progress.json()["status"]
            # Waiting for 5 seconds for the next check
            time.sleep(5)
        else:
            scraped_data = response_data
            break

    # Store the scraped dataset
    with open(output_dataset.path, "w", encoding="utf-8") as f:
        json.dump(scraped_data, f, ensure_ascii=False, indent=2)


@dsl.component(
    base_image="python:3.10",
    packages_to_install=["pandas", "nltk"]
)
def sentiment_analysis(input_dataset: Input[Dataset], sentiment_output: Output[Dataset]):
    import pandas as pd
    from nltk.sentiment import SentimentIntensityAnalyzer
    import nltk

    # Download the VADER sentiment lexicon (used by NLTK for sentiment scoring)
    nltk.download("vader_lexicon")

    # Load the input dataset containing TikTok comments
    df = pd.read_json(input_dataset.path)

    # Initialize the sentiment analyzer
    sia = SentimentIntensityAnalyzer()

    # Apply sentiment analysis to each comment and classify as positive, negative, or neutral
    df["sentiment"] = df["comment_text"].apply(lambda t: (
        "positive" if sia.polarity_scores(str(t))["compound"] >= 0.05 else
        "negative" if sia.polarity_scores(str(t))["compound"] <= -0.05 else "neutral"
    ))

    # Save the results to the output dataset for downstream components
    df.to_json(sentiment_output.path, orient="records")


@dsl.pipeline(name="TikTok Sentiment Pipeline")
def tiktok_sentiment_pipeline():
    # List of TikTok post URLs to scrape comments from
    tiktok_post_urls = [
        "https://www.tiktok.com/@nike/video/7600211777267272991",
        "https://www.tiktok.com/@nike/video/7556252854294482189"
    ]

    # Collect TikTok comments using the Bright Data web scraping component
    collect_task = collect_tiktok_comments(post_urls=tiktok_post_urls)

    # Perform sentiment analysis on the collected comments
    sentiment_task = sentiment_analysis(
        input_dataset=collect_task.outputs["output_dataset"]
    )


if __name__ == "__main__":
    compiler.Compiler().compile(
        pipeline_func=tiktok_sentiment_pipeline,
        package_path="tiktok_sentiment_analysis_kfp_pipeline.yaml"
    )

用以下命令运行上述脚本:

python3 tiktok_sentiment_analysis_kfp_pipeline.py

运行后应生成名为 tiktok_sentiment_analysis_kfp_pipeline.yaml 的文件,如下所示:
“tiktok_sentiment_analysis_kfp_pipeline.yaml”文件
现在你可以将其部署到 Kubeflow 进行测试,或使用 Docker 在本地运行。本指南将重点介绍第二种方式。

步骤 #8:在本地测试 Kubeflow 流水线

要在本地运行 Kubeflow 流水线,你可以使用 DockerRunner 类。这要求你的机器已安装并运行 Docker。

DockerRunner 会在独立的 Docker 容器中执行每个流水线任务。换句话说,它模拟了流水线在真实 Kubeflow 环境中的运行方式。

在虚拟环境激活状态下,先安装所需的 docker 库:

pip install docker 

接着,在项目目录中添加一个 run_pipeline_local.py 文件:

kfp-bright-data-pipeline/
├── .venv/
├── run_pipeline_local.py # <-----------
├── tiktok_sentiment_analysis_kfp_pipeline.py
└── tiktok_sentiment_analysis_kfp_pipeline.yaml

内容如下:

# run_pipeline_local.py
# pip install docker

from kfp import local
from tiktok_sentiment_analysis_kfp_pipeline import tiktok_sentiment_pipeline

# initialize the local Docker runner
local.init(runner=local.DockerRunner())

# Run the pipeline as a Python function call
pipeline_task = tiktok_sentiment_pipeline()

该脚本从 tiktok_sentiment_analysis_kfp_pipeline.py 导入 tiktok_sentiment_pipeline(),并通过本地 Docker runner 运行它,使每个组件在自己的容器中执行。

测试流水线前,请确认 Docker 正在运行。然后执行:

python3 run_pipeline_local.py

执行日志应显示类似下方的成功信息:
注意“SUCCESS”状态
流水线输出将保存在 ./local_outputs 目录中。现在就来查看结果吧!

步骤 #9:查看流水线结果

运行流水线后,打开 ./local_outputs 文件夹。在其中,你会找到当前运行对应的子文件夹,包含所有生成的产物(artifacts)。

首先查看由 collect-tiktok-comments 组件生成的输出数据集:
由 collect-tiktok-comments 组件生成的数据集
如预期所示,该数据集包含通过 Bright Data 的 TikTok Scraper 从两个指定帖子返回的评论。

接着,查看情感分析输出数据集:
注意情感分析输出数据集中的“sentiment”标签
可以看到,每条评论都被情感分析组件标注为了 positivenegativeneutral

大功告成!你已经了解了如何构建一个 Kubeflow 流水线:使用 Bright Data 获取最新网页数据,并对其进行分析。

结论

在本教程中,你理解了为什么 Kubeflow 流水线会受益于通过网页抓取获取的新鲜数据。尤其是,你看到了在流水线中设置一个专门组件来采集来自网络的最新、具备上下文、结构化的数据的重要性。

Bright Data 通过丰富的 Web Scraping APIs 为此提供支持,它们可作为你的流水线的结构化数据源。正如本文演示的那样,借助 Bright Data 的抓取 API,在 Kubeflow 流水线中构建网络数据采集组件非常直接!

立即创建一个免费的 Bright Data 账户,开始探索我们的网络数据解决方案!

支持支付宝等多种支付方式

Antonello Zanini

技术写作

5.5 years experience

Antonello是一名软件工程师,但他更喜欢称自己为技术传教士。通过写作传播知识是他的使命。

Expertise
Web 开发 网页抓取 AI 集成