Ch 13 连接器框架总览¶
面包屑
本书主页 › Part III 数据工程实践 › Ch 13
项目第 1 年 · 核心建设期——连接器框架
本章你将学到¶
- 统一作业入口与源系统路由的设计思想
- 五类连接器(文件/JDBC/API/SaaS/邮件)的定位与协作
- 连接器框架的可扩展性设计
13.1 统一作业入口与源系统路由设计¶
平台有五类数据源,但不是一个源写一个 Glue Job——而是用一个统一作业入口,内部按源类型路由到不同的连接器分支。
C4Component
title 连接器框架 — Component Diagram (管道+插件模式)
Container_Boundary(framework, "连接器框架") {
Component(entry, "统一作业入口", "Glue PySpark", "平台唯一 ETL 作业入口——所有数据源共用一个 Glue Job,配置驱动路由")
Component(router, "源类型路由器", "Python Strategy Pattern", "根据 DynamoDB 配置中的 source_type 字段(file/jdbc/api/saas/email)动态分发到对应连接器分支")
Component(file_conn, "文件连接器", "Glue Python Shell", "SFTP 下载→S3 落地——信号文件事件驱动 + 归档生命周期管理")
Component(jdbc_conn, "JDBC 连接器", "Glue PySpark", "Spark JDBC 分区并行读取——增量水位追踪 + 全量快照 + 自定义 SQL")
Component(api_conn, "API 连接器", "Glue Python Shell", "REST API 分页消费——OAuth/JWT/API Key 认证 + 速率限制 + 指数退避重试")
Component(saas_conn, "SaaS 连接器", "Glue Python Shell", "Salesforce Bulk API 批量抽取——双向任务监控 + 变更数据捕获")
Component(email_conn, "邮件连接器", "Glue Python Shell", "IMAP 监听→附件下载→S3 落地——定期扫描 + 已读标记去重")
Component(post_proc, "公共后处理", "Glue PySpark", "所有连接器共享的后处理管道:标准化→质检(PyDeequ)→脱敏→代理键→入仓")
Component(dlq, "DLQ 死信队列", "S3 + SNS", "连接器失败时写入 S3 dlq/ 路径 + 可选 Slack/Teams 通知 + 人工修复后重放接口")
}
Rel(entry, router, "注入 source_type 配置", "Python fn call")
Rel(router, file_conn, "source_type=file →", "策略分发")
Rel(router, jdbc_conn, "source_type=jdbc →", "策略分发")
Rel(router, api_conn, "source_type=api →", "策略分发")
Rel(router, saas_conn, "source_type=saas →", "策略分发")
Rel(router, email_conn, "source_type=email →", "策略分发")
Rel(file_conn, post_proc, "输出标准化 DataFrame", "Spark DataFrame")
Rel(jdbc_conn, post_proc, "输出标准化 DataFrame", "Spark DataFrame")
Rel(api_conn, post_proc, "输出标准化 DataFrame", "Spark DataFrame")
Rel(saas_conn, post_proc, "输出标准化 DataFrame", "Spark DataFrame")
Rel(email_conn, post_proc, "输出标准化 DataFrame", "Spark DataFrame")
Rel(file_conn, dlq, "失败时写入错误上下文", "S3 PUT")
Rel(jdbc_conn, dlq, "失败时写入错误上下文", "S3 PUT")
Rel(api_conn, dlq, "失败时写入错误上下文", "S3 PUT")
UpdateElementStyle(entry, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
UpdateElementStyle(router, $bgColor="#fcf4d6", $fontColor="#161616", $borderColor="#f1c21b")
UpdateElementStyle(file_conn, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
UpdateElementStyle(jdbc_conn, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
UpdateElementStyle(api_conn, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
UpdateElementStyle(saas_conn, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
UpdateElementStyle(email_conn, $bgColor="#edf5ff", $fontColor="#161616", $borderColor="#0f62fe")
UpdateElementStyle(post_proc, $bgColor="#defbe6", $fontColor="#161616", $borderColor="#198038")
UpdateElementStyle(dlq, $bgColor="#fff1f1", $fontColor="#161616", $borderColor="#da1e28")
UpdateRelStyle(router, file_conn, $textColor="#0f62fe", $lineColor="#0f62fe")
UpdateRelStyle(router, jdbc_conn, $textColor="#0f62fe", $lineColor="#0f62fe")
UpdateRelStyle(file_conn, dlq, $textColor="#da1e28", $lineColor="#da1e28")
UpdateLayoutConfig($c4ShapeInRow="5", $c4BoundaryInRow="1")
图 13-1 统一作业入口与源系统路由设计
为什么用统一入口而非每源一个 Job¶
| 方案 | 优势 | 劣势 |
|---|---|---|
| 每源一个 Job | 隔离性好 | 代码重复严重,维护 N 份几乎相同的后处理逻辑 |
| 统一入口 + 路由(本书) | 公共逻辑只写一次,新增源类型只加分支 | 单个 Job 复杂度高 |
表 13-1 为什么用统一入口而非每源一个 Job
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
subgraph 统一入口的价值["统一入口的价值"]
V1[公共后处理只写一次<br/>标准化/质检/脱敏/入仓]
V2[新增源类型只需加一个分支]
V3[配置驱动:路由由配置决定]
V4[统一监控:所有源走同一入口]
end
classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
class V1,V2,V3,V4 bpSuccess
linkStyle default stroke:#697077,stroke-width:2px
图 13-2 为什么用统一入口而非每源一个 Job
Trade-off
统一入口的代价是"单 Job 复杂度高"——随着源类型增多,入口代码会变大。应对策略是把每个连接器分支抽成独立模块(单一职责),入口只做路由和公共逻辑编排。这是"管道+插件"模式:入口是管道,连接器是插件。
我从"每源一个 Job"改到"统一入口",是企业征信的维护痛催生的。企业征信时每个数据源一个独立 ETL 脚本——最初 3 个源还好,到第 8 个源时,8 个脚本有 8 份几乎相同的后处理逻辑(标准化、质检、入仓)。有一次我要给所有源加一个"行数对账"质检步骤——改了 8 个脚本,每个改法略有不同(因为 8 个开发者的风格不同),花了两天还改出了两个 bug。到 Aurora 我发誓不再重复——统一入口让公共后处理只写一份,加"行数对账"只改一处,所有源同时生效。这个转变在第三个月就兑现了价值:新增一个 API 数据源,开发者只写了一个连接器分支(30 行代码),公共后处理零修改——整个接入花了半天。统一入口的本质是"把重复收敛到一处,把差异隔离到分支"——这是管道+插件模式(M2 分层)在连接器层的落地。
13.2 五类连接器:文件/JDBC/API/SaaS/邮件¶
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
subgraph 五类连接器["五类连接器"]
F@{ icon: "codicon:file-binary", form: "rounded", label: "文件连接器<br/>SFTP / 共享盘", pos: "b", h: 40 }
J@{ icon: "devicon:microsoftsqlserver", form: "rounded", label: "JDBC 连接器<br/>SQL Server / PostgreSQL", pos: "b", h: 40 }
A@{ icon: "codicon:globe", form: "rounded", label: "API 连接器<br/>REST API", pos: "b", h: 40 }
S@{ icon: "logos:salesforce", form: "rounded", label: "SaaS 连接器<br/>Salesforce", pos: "b", h: 40 }
E@{ icon: "codicon:mail", form: "rounded", label: "邮件连接器<br/>邮件附件", pos: "b", h: 40 }
end
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
class F,J,A,S,E bpProcess
linkStyle default stroke:#697077,stroke-width:2px
图 13-3 五类连接器:文件/JDBC/API/SaaS/邮件
| 连接器 | 摄取方式 | 触发方式 | 详细章节 |
|---|---|---|---|
| 文件连接器 | SFTP 下载 → S3 落地 | 信号文件事件驱动 | Ch 15 |
| JDBC 连接器 | Spark JDBC 拉取 | 定时调度 | Ch 14 |
| API 连接器 | HTTP 请求 + 分页 | 定时调度 | Ch 16 |
| SaaS 连接器 | SaaS 平台 API 批量抽取 | 定时 + 事件混合 | Ch 16 |
| 邮件连接器 | 邮件监听 → 附件下载 | 邮件到达事件 | Ch 16 |
表 13-2 五类连接器:文件/JDBC/API/SaaS/邮件
连接器的统一契约¶
每类连接器虽然摄取方式不同,但都遵循统一契约:
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
subgraph 统一契约["连接器统一契约"]
S1[① 输入:配置参数<br/>源连接信息/加载模式]
S2[② 输出:标准化 DataFrame<br/>统一 schema 供后处理]
S3[③ 副作用:记录元数据<br/>批次标识/行数/水位]
end
S1 --> CONNECTOR[连接器实现]
CONNECTOR --> S2
CONNECTOR --> S3
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
class S1,S3 bpInfo
class CONNECTOR bpProcess
class S2 bpData
linkStyle default stroke:#697077,stroke-width:2px
图 13-4 连接器的统一契约
无论哪个连接器,输出都是标准化的 DataFrame,交给公共后处理模块。这就是"统一入口"能工作的基础——连接器只管"取数据",后处理只管"加工数据",两者通过统一契约解耦。
把这个契约落到代码层面,就是一个抽象基类定义"输入配置 → 输出 DataFrame + 元数据"的统一接口,五类连接器各自实现:
# 示意:连接器统一契约的抽象基类(策略模式)
from abc import ABC, abstractmethod
from pyspark.sql import DataFrame
class Connector(ABC):
"""所有连接器遵循同一契约:输入配置,输出标准化 DataFrame + 元数据。"""
@abstractmethod
def invoke(self, config: dict) -> tuple[DataFrame, dict]:
# 核心意图:连接器只管"取数据",后处理只管"加工数据",二者解耦
# config 由 DynamoDB 任务配置注入(见 Ch 12),含源连接信息/加载模式/水位
...
class JdbcConnector(Connector):
def invoke(self, config: dict) -> tuple[DataFrame, dict]:
df = self._read_partitioned(config) # 见 Ch 14:分区读取 + 水位
meta = {"batch_id": config["batch_id"],
"row_count": df.count(),
"watermark": self._next_watermark()} # 副作用:记录元数据
return df, meta # 输出标准化 schema 供后处理
# 新增一类源(如未来加 Kafka 流式源),只需实现 invoke 契约,无需改后处理
引申
统一契约的设计灵感来自"策略模式"(Strategy Pattern)。连接器是可替换的策略,公共后处理是不变的上下文。新增一种数据源类型(比如未来加 Kafka 流式源),只需要实现统一契约的新策略,无需改动后处理逻辑。
连接器容错设计:DLQ 死信队列模式¶
上游数据源千差万别,连接器失败是常态而非例外——JDBC 连接超时、API 返回 5xx、SFTP 文件被占用、邮件附件格式异常。如果失败后直接抛错中断整个批次,一个连接器的偶发故障会阻塞其他正常连接器。所以连接器框架需要一套统一的容错契约,核心是死信队列(Dead Letter Queue, DLQ)模式:
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
IN[连接器执行] --> TRY{try}
TRY -->|成功| OK[标准化 DataFrame<br/>进入后处理]
TRY -->|失败| CAP[捕获异常<br/>记录错误上下文]
CAP --> DLQ[写入 DLQ<br/>s3://.../dlq/连接器/批次/]
DLQ --> NOTIFY[可选:SNS<br/>→ Slack/Teams 通知]
DLQ -.->|人工修复后| REPLAY[重放接口<br/>从 DLQ 路径重新触发]
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
class IN bpProcess
class TRY bpDecision
class OK bpSuccess
class CAP,DLQ bpError
class NOTIFY bpInfo
class REPLAY bpProcess
linkStyle default stroke:#697077,stroke-width:2px
linkStyle 4 stroke:#8a3ffc,stroke-width:2px,stroke-dasharray:5 4
图 13-5 连接器容错设计:DLQ 死信队列模式
容错契约的四要素:
| 要素 | 做法 | 目的 |
|---|---|---|
| ① 入口捕获 | 连接器入口 try/except 包裹,捕获所有异常 |
单连接器失败不影响同批次其他连接器 |
| ② 错误持久化 | 失败记录 + 错误上下文(stack trace、配置快照、源标识)写入 S3 dlq/{connector_name}/{batch_id}/ |
失败可追溯、可重放 |
| ③ 通知可选 | 通过 SNS → Slack/Teams 推送告警 | 关键失败即时感知,非关键失败仅落 DLQ |
| ④ 重放接口 | 人工修复根因后,从 DLQ 路径重新触发同一批次 | 失败不丢数据,修复后可补跑 |
表 13-3 连接器容错设计:DLQ 死信队列模式
# 示意:连接器容错契约(DLQ 死信队列模式)
def safe_invoke(connector: Connector, config: dict, ctx) -> tuple[DataFrame, dict] | None:
batch_id = config["batch_id"]
try:
return connector.invoke(config) # 核心意图:失败不中断批次
except Exception as e:
ctx.s3.put_object(
Bucket=ctx.dlq_bucket,
Key=f"dlq/{config['source_type']}/{batch_id}/error.json",
Body=json.dumps({"error": str(e), "stack": traceback.format_exc(),
"config_snapshot": config, "ts": now()}))
ctx.sns.notify(f"连接器 {config['source_type']} 批次 {batch_id} 失败:{e}")
return None # 返回 None,调度器继续下一个连接器
Trade-off
DLQ 模式提升了容错性,代价是失败变得"安静"——如果不配套告警和 DLQ 积压监控,失败会悄悄堆积成数据缺口。所以 DLQ 必须配合可观测性:DLQ 路径下的对象数量要纳入监控,超过阈值告警(详见 Ch 49)。这是"容错"与"可观测"不可分割的典型例证。
本章小结¶
- 连接器框架采用"统一作业入口 + 源系统路由"设计:公共后处理只写一次,新增源类型只加分支
- 五类连接器:文件 / JDBC / API / SaaS / 邮件,各有摄取方式和触发方式
- 所有连接器遵循统一契约:输入配置参数 → 输出标准化 DataFrame → 记录元数据(策略模式实现)
- 连接器容错采用 DLQ 死信队列模式:入口捕获 → 错误持久化 → 可选通知 → 重放接口,配合可观测性监控 DLQ 积压
- 框架本质是"管道+插件"模式:入口是管道,连接器是可替换的策略插件
下一章
Ch 14 数据库与 JDBC 连接器 —— 接下来深入第一类连接器:关系型数据库的加载模式与增量水位追踪。