跳转至

Ch 13 连接器框架总览

面包屑

本书主页Part III 数据工程实践 › Ch 13

项目第 1 年 · 核心建设期——连接器框架


本章你将学到

  • 统一作业入口与源系统路由的设计思想
  • 五类连接器(文件/JDBC/API/SaaS/邮件)的定位与协作
  • 连接器框架的可扩展性设计

13.1 统一作业入口与源系统路由设计

平台有五类数据源,但不是一个源写一个 Glue Job——而是用一个统一作业入口,内部按源类型路由到不同的连接器分支。

C4Component
    title 连接器框架 — Component Diagram (管道+插件模式)

    Container_Boundary(framework, "连接器框架") {
        Component(entry, "统一作业入口", "Glue PySpark", "平台唯一 ETL 作业入口——所有数据源共用一个 Glue Job,配置驱动路由")
        Component(router, "源类型路由器", "Python Strategy Pattern", "根据 DynamoDB 配置中的 source_type 字段(file/jdbc/api/saas/email)动态分发到对应连接器分支")
        Component(file_conn, "文件连接器", "Glue Python Shell", "SFTP 下载→S3 落地——信号文件事件驱动 + 归档生命周期管理")
        Component(jdbc_conn, "JDBC 连接器", "Glue PySpark", "Spark JDBC 分区并行读取——增量水位追踪 + 全量快照 + 自定义 SQL")
        Component(api_conn, "API 连接器", "Glue Python Shell", "REST API 分页消费——OAuth/JWT/API Key 认证 + 速率限制 + 指数退避重试")
        Component(saas_conn, "SaaS 连接器", "Glue Python Shell", "Salesforce Bulk API 批量抽取——双向任务监控 + 变更数据捕获")
        Component(email_conn, "邮件连接器", "Glue Python Shell", "IMAP 监听→附件下载→S3 落地——定期扫描 + 已读标记去重")
        Component(post_proc, "公共后处理", "Glue PySpark", "所有连接器共享的后处理管道:标准化→质检(PyDeequ)→脱敏→代理键→入仓")
        Component(dlq, "DLQ 死信队列", "S3 + SNS", "连接器失败时写入 S3 dlq/ 路径 + 可选 Slack/Teams 通知 + 人工修复后重放接口")
    }

    Rel(entry, router, "注入 source_type 配置", "Python fn call")
    Rel(router, file_conn, "source_type=file →", "策略分发")
    Rel(router, jdbc_conn, "source_type=jdbc →", "策略分发")
    Rel(router, api_conn, "source_type=api →", "策略分发")
    Rel(router, saas_conn, "source_type=saas →", "策略分发")
    Rel(router, email_conn, "source_type=email →", "策略分发")
    Rel(file_conn, post_proc, "输出标准化 DataFrame", "Spark DataFrame")
    Rel(jdbc_conn, post_proc, "输出标准化 DataFrame", "Spark DataFrame")
    Rel(api_conn, post_proc, "输出标准化 DataFrame", "Spark DataFrame")
    Rel(saas_conn, post_proc, "输出标准化 DataFrame", "Spark DataFrame")
    Rel(email_conn, post_proc, "输出标准化 DataFrame", "Spark DataFrame")
    Rel(file_conn, dlq, "失败时写入错误上下文", "S3 PUT")
    Rel(jdbc_conn, dlq, "失败时写入错误上下文", "S3 PUT")
    Rel(api_conn, dlq, "失败时写入错误上下文", "S3 PUT")

    UpdateElementStyle(entry, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
    UpdateElementStyle(router, $bgColor="#fcf4d6", $fontColor="#161616", $borderColor="#f1c21b")
    UpdateElementStyle(file_conn, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
    UpdateElementStyle(jdbc_conn, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
    UpdateElementStyle(api_conn, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
    UpdateElementStyle(saas_conn, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
    UpdateElementStyle(email_conn, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
    UpdateElementStyle(post_proc, $bgColor="#defbe6", $fontColor="#161616", $borderColor="#198038")
    UpdateElementStyle(dlq, $bgColor="#fff1f1", $fontColor="#161616", $borderColor="#da1e28")

    UpdateRelStyle(router, file_conn, $textColor="#0f62fe", $lineColor="#0f62fe")
    UpdateRelStyle(router, jdbc_conn, $textColor="#0f62fe", $lineColor="#0f62fe")
    UpdateRelStyle(file_conn, dlq, $textColor="#da1e28", $lineColor="#da1e28")

    UpdateLayoutConfig($c4ShapeInRow="5", $c4BoundaryInRow="1")

图 13-1 统一作业入口与源系统路由设计

为什么用统一入口而非每源一个 Job

方案 优势 劣势
每源一个 Job 隔离性好 代码重复严重,维护 N 份几乎相同的后处理逻辑
统一入口 + 路由(本书) 公共逻辑只写一次,新增源类型只加分支 单个 Job 复杂度高

表 13-1 为什么用统一入口而非每源一个 Job

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
    subgraph 统一入口的价值["统一入口的价值"]
        V1[公共后处理只写一次<br/>标准化/质检/脱敏/入仓]
        V2[新增源类型只需加一个分支]
        V3[配置驱动:路由由配置决定]
        V4[统一监控:所有源走同一入口]
    end
classDef bpSuccess  fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616

class V1,V2,V3,V4 bpSuccess
linkStyle default stroke:#697077,stroke-width:2px

图 13-2 为什么用统一入口而非每源一个 Job

Trade-off

统一入口的代价是"单 Job 复杂度高"——随着源类型增多,入口代码会变大。应对策略是把每个连接器分支抽成独立模块(单一职责),入口只做路由和公共逻辑编排。这是"管道+插件"模式:入口是管道,连接器是插件。

我从"每源一个 Job"改到"统一入口",是企业征信的维护痛催生的。企业征信时每个数据源一个独立 ETL 脚本——最初 3 个源还好,到第 8 个源时,8 个脚本有 8 份几乎相同的后处理逻辑(标准化、质检、入仓)。有一次我要给所有源加一个"行数对账"质检步骤——改了 8 个脚本,每个改法略有不同(因为 8 个开发者的风格不同),花了两天还改出了两个 bug。到 Aurora 我发誓不再重复——统一入口让公共后处理只写一份,加"行数对账"只改一处,所有源同时生效。这个转变在第三个月就兑现了价值:新增一个 API 数据源,开发者只写了一个连接器分支(30 行代码),公共后处理零修改——整个接入花了半天。统一入口的本质是"把重复收敛到一处,把差异隔离到分支"——这是管道+插件模式(M2 分层)在连接器层的落地。


13.2 五类连接器:文件/JDBC/API/SaaS/邮件

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
    subgraph 五类连接器["五类连接器"]
        F@{ icon: "codicon:file-binary", form: "rounded", label: "文件连接器<br/>SFTP / 共享盘", pos: "b", h: 40 }
        J@{ icon: "devicon:microsoftsqlserver", form: "rounded", label: "JDBC 连接器<br/>SQL Server / PostgreSQL", pos: "b", h: 40 }
        A@{ icon: "codicon:globe", form: "rounded", label: "API 连接器<br/>REST API", pos: "b", h: 40 }
        S@{ icon: "logos:salesforce", form: "rounded", label: "SaaS 连接器<br/>Salesforce", pos: "b", h: 40 }
        E@{ icon: "codicon:mail", form: "rounded", label: "邮件连接器<br/>邮件附件", pos: "b", h: 40 }
    end
classDef bpProcess  fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616

class F,J,A,S,E bpProcess
linkStyle default stroke:#697077,stroke-width:2px

图 13-3 五类连接器:文件/JDBC/API/SaaS/邮件

连接器 摄取方式 触发方式 详细章节
文件连接器 SFTP 下载 → S3 落地 信号文件事件驱动 Ch 15
JDBC 连接器 Spark JDBC 拉取 定时调度 Ch 14
API 连接器 HTTP 请求 + 分页 定时调度 Ch 16
SaaS 连接器 SaaS 平台 API 批量抽取 定时 + 事件混合 Ch 16
邮件连接器 邮件监听 → 附件下载 邮件到达事件 Ch 16

表 13-2 五类连接器:文件/JDBC/API/SaaS/邮件

连接器的统一契约

每类连接器虽然摄取方式不同,但都遵循统一契约

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
    subgraph 统一契约["连接器统一契约"]
        S1[① 输入:配置参数<br/>源连接信息/加载模式]
        S2[② 输出:标准化 DataFrame<br/>统一 schema 供后处理]
        S3[③ 副作用:记录元数据<br/>批次标识/行数/水位]
    end

    S1 --> CONNECTOR[连接器实现]
    CONNECTOR --> S2
    CONNECTOR --> S3
classDef bpProcess  fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData     fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
classDef bpInfo     fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616

class S1,S3 bpInfo
class CONNECTOR bpProcess
class S2 bpData
linkStyle default stroke:#697077,stroke-width:2px

图 13-4 连接器的统一契约

无论哪个连接器,输出都是标准化的 DataFrame,交给公共后处理模块。这就是"统一入口"能工作的基础——连接器只管"取数据",后处理只管"加工数据",两者通过统一契约解耦。

把这个契约落到代码层面,就是一个抽象基类定义"输入配置 → 输出 DataFrame + 元数据"的统一接口,五类连接器各自实现:

# 示意:连接器统一契约的抽象基类(策略模式)
from abc import ABC, abstractmethod
from pyspark.sql import DataFrame

class Connector(ABC):
    """所有连接器遵循同一契约:输入配置,输出标准化 DataFrame + 元数据。"""

    @abstractmethod
    def invoke(self, config: dict) -> tuple[DataFrame, dict]:
        # 核心意图:连接器只管"取数据",后处理只管"加工数据",二者解耦
        # config 由 DynamoDB 任务配置注入(见 Ch 12),含源连接信息/加载模式/水位
        ...

class JdbcConnector(Connector):
    def invoke(self, config: dict) -> tuple[DataFrame, dict]:
        df = self._read_partitioned(config)          # 见 Ch 14:分区读取 + 水位
        meta = {"batch_id": config["batch_id"],
                "row_count": df.count(),
                "watermark": self._next_watermark()}  # 副作用:记录元数据
        return df, meta                               # 输出标准化 schema 供后处理

# 新增一类源(如未来加 Kafka 流式源),只需实现 invoke 契约,无需改后处理

引申

统一契约的设计灵感来自"策略模式"(Strategy Pattern)。连接器是可替换的策略,公共后处理是不变的上下文。新增一种数据源类型(比如未来加 Kafka 流式源),只需要实现统一契约的新策略,无需改动后处理逻辑。

连接器容错设计:DLQ 死信队列模式

上游数据源千差万别,连接器失败是常态而非例外——JDBC 连接超时、API 返回 5xx、SFTP 文件被占用、邮件附件格式异常。如果失败后直接抛错中断整个批次,一个连接器的偶发故障会阻塞其他正常连接器。所以连接器框架需要一套统一的容错契约,核心是死信队列(Dead Letter Queue, DLQ)模式:

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
    IN[连接器执行] --> TRY{try}
    TRY -->|成功| OK[标准化 DataFrame<br/>进入后处理]
    TRY -->|失败| CAP[捕获异常<br/>记录错误上下文]
    CAP --> DLQ[写入 DLQ<br/>s3://.../dlq/连接器/批次/]
    DLQ --> NOTIFY[可选:SNS<br/>→ Slack/Teams 通知]
    DLQ -.->|人工修复后| REPLAY[重放接口<br/>从 DLQ 路径重新触发]
classDef bpProcess  fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpSuccess  fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
classDef bpError    fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
classDef bpInfo     fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616

class IN bpProcess
class TRY bpDecision
class OK bpSuccess
class CAP,DLQ bpError
class NOTIFY bpInfo
class REPLAY bpProcess
linkStyle default stroke:#697077,stroke-width:2px
linkStyle 4 stroke:#8a3ffc,stroke-width:2px,stroke-dasharray:5 4

图 13-5 连接器容错设计:DLQ 死信队列模式

容错契约的四要素:

要素 做法 目的
① 入口捕获 连接器入口 try/except 包裹,捕获所有异常 单连接器失败不影响同批次其他连接器
② 错误持久化 失败记录 + 错误上下文(stack trace、配置快照、源标识)写入 S3 dlq/{connector_name}/{batch_id}/ 失败可追溯、可重放
③ 通知可选 通过 SNS → Slack/Teams 推送告警 关键失败即时感知,非关键失败仅落 DLQ
④ 重放接口 人工修复根因后,从 DLQ 路径重新触发同一批次 失败不丢数据,修复后可补跑

表 13-3 连接器容错设计:DLQ 死信队列模式

# 示意:连接器容错契约(DLQ 死信队列模式)
def safe_invoke(connector: Connector, config: dict, ctx) -> tuple[DataFrame, dict] | None:
    batch_id = config["batch_id"]
    try:
        return connector.invoke(config)                  # 核心意图:失败不中断批次
    except Exception as e:
        ctx.s3.put_object(
            Bucket=ctx.dlq_bucket,
            Key=f"dlq/{config['source_type']}/{batch_id}/error.json",
            Body=json.dumps({"error": str(e), "stack": traceback.format_exc(),
                             "config_snapshot": config, "ts": now()}))
        ctx.sns.notify(f"连接器 {config['source_type']} 批次 {batch_id} 失败:{e}")
        return None                                      # 返回 None,调度器继续下一个连接器

Trade-off

DLQ 模式提升了容错性,代价是失败变得"安静"——如果不配套告警和 DLQ 积压监控,失败会悄悄堆积成数据缺口。所以 DLQ 必须配合可观测性:DLQ 路径下的对象数量要纳入监控,超过阈值告警(详见 Ch 49)。这是"容错"与"可观测"不可分割的典型例证。


本章小结

  • 连接器框架采用"统一作业入口 + 源系统路由"设计:公共后处理只写一次,新增源类型只加分支
  • 五类连接器:文件 / JDBC / API / SaaS / 邮件,各有摄取方式和触发方式
  • 所有连接器遵循统一契约:输入配置参数 → 输出标准化 DataFrame → 记录元数据(策略模式实现)
  • 连接器容错采用 DLQ 死信队列模式:入口捕获 → 错误持久化 → 可选通知 → 重放接口,配合可观测性监控 DLQ 积压
  • 框架本质是"管道+插件"模式:入口是管道,连接器是可替换的策略插件

下一章

Ch 14 数据库与 JDBC 连接器 —— 接下来深入第一类连接器:关系型数据库的加载模式与增量水位追踪。

评论