Ch 14 数据库与 JDBC 连接器¶
面包屑
本书主页 › Part III 数据工程实践 › Ch 14
项目第 1 年 · 核心建设期——JDBC连接器
本章你将学到¶
- 关系型数据库的三种加载模式设计:全量/增量/自定义
- 增量水位追踪与变更捕获策略,以及 PySpark JDBC 分区读取与水位管理的伪代码
- JDBC 性能调优(numPartitions/fetchSize/谓词下推)与迟到数据处理(回溯窗口 + 幂等覆盖)
- 驱动依赖治理与运行时类加载的设计
14.1 关系型数据库的加载模式设计:全量/增量/自定义¶
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart TB
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
classDef bpExternal fill:#f2f4f8,stroke:#697077,stroke-width:2px,color:#161616
classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
linkStyle default stroke:#697077,stroke-width:2px,fill:none
subgraph 三种加载模式["三种加载模式"]
FULL[全量加载<br/>每次拉取全表]
INCR[增量加载<br/>只拉取变更数据]
CUST[自定义加载<br/>按业务逻辑定义查询]
end
FULL -->|适合| F_S[数据量小 / 源表全量覆盖]
INCR -->|适合| I_S[数据量大 / 有可靠变更标识]
CUST -->|适合| C_S[特殊场景 / 复杂查询逻辑]
class FULL,INCR,CUST bpProcess
图 14-1 关系型数据库的加载模式设计:全量/增量/自定义
| 模式 | 机制 | 优势 | 劣势 |
|---|---|---|---|
| 全量 | SELECT * FROM table |
简单可靠、无需水位 | 大表性能差、源压力大 |
| 增量 | WHERE update_time > 上次水位 |
性能好、源压力小 | 依赖可靠水位、需处理删除 |
| 自定义 | 业务定义 SQL 查询 | 灵活、可复杂逻辑 | 维护成本高、难标准化 |
表 14-1 关系型数据库的加载模式设计:全量/增量/自定义
加载模式的选择决策¶
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart TD
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
classDef bpExternal fill:#f2f4f8,stroke:#697077,stroke-width:2px,color:#161616
classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
linkStyle default stroke:#697077,stroke-width:2px,fill:none
Q1{数据量?}
Q1 -->|<100万行| FULL[全量加载]
Q1 -->|>100万行| Q2{有可靠变更标识?}
Q2 -->|是| INCR[增量加载]
Q2 -->|否| Q3{能否接受全量?}
Q3 -->|源压力可接受| FULL
Q3 -->|不可接受| CUST[自定义加载<br/>或推动源系统改造]
class Q1,Q2,Q3 bpDecision
class FULL,INCR,CUST bpProcess
图 14-2 加载模式的选择决策
Trade-off
增量加载看似总是更优,但它引入了"水位管理"的复杂性——需要处理水位回退、源表删除、时间戳冲突等边界情况。对于小表(<100 万行),全量加载的简单性远胜于增量加载的性能优势。原则是:不要过早优化——当全量加载的运行时间可接受时,优先选全量。
这个"不要过早优化"原则是我从企业征信的过度优化教训里学的。企业征信时我给所有表都上了增量加载——包括一张只有 500 行的配置表。结果配置表的增量逻辑(水位读取、查询拼接、水位更新)比全量拉取本身还复杂,有次水位更新失败导致配置表增量漏数据,排障花了半天——而全量拉取 500 行只需 2 秒。到 Aurora 我定了一个简单规则:100 万行以下的表默认全量,超过才考虑增量。这个规则让 60% 的小表走全量(简单可靠),40% 的大表走增量(性能优先)——维护成本大幅下降,而性能损失可忽略(小表全量也很快)。优化的标准不是"能不能更快",而是"复杂度是否值得"。
14.2 增量水位追踪与变更捕获策略¶
水位追踪机制¶
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
sequenceDiagram
participant S as 源数据库
participant C as 水位表
participant G as Glue Job
activate G
G->>C: 读取上次水位 last_watermark
activate C
C-->>G: 返回 2026-06-17 00:00:00
deactivate C
G->>S: SELECT * WHERE updated_at > '2026-06-17 00:00:00'
activate S
S-->>G: 返回增量数据
deactivate S
G->>G: 处理数据
G->>C: 更新水位为本次最大 updated_at
activate C
deactivate C
deactivate G
图 14-3 水位追踪机制
上面的时序图描述了水位机制的数据流,落到 PySpark 代码层面,关键是两点:分区并行读取(大表不能单连接拉)和水位的读写要持久化(放 DynamoDB 而非内存):
# 示意:PySpark JDBC 分区读取 + 水位管理(增量加载)
import boto3, datetime
ddb = boto3.resource("dynamodb").Table("aurora_cdp_watermark")
def load_incremental(spark, config):
# 核心意图①:水位持久化在 DynamoDB,跨批次复用
last_wm = ddb.get_item(Key={"table_name": config["table"]}).get("Item", {}).get("watermark")
lower = last_wm or datetime.datetime(2000, 1, 1) # 首次全量
# 核心意图②:分区并行读取大表,避免单连接瓶颈
df = (spark.read.format("jdbc")
.option("url", config["jdbc_url"])
.option("dbtable", f"(SELECT * FROM {config['table']} WHERE updated_at > '{lower}') t")
.option("partitionColumn", "id") # 均匀分布的数值主键
.option("lowerBound", config["id_min"])
.option("upperBound", config["id_max"])
.option("numPartitions", 32) # 并发连接数
.option("fetchsize", 10000) # 每批拉取行数
.load())
# 核心意图③:读完后用本次最大 updated_at 更新水位(仅当本次有数据)
new_wm = df.agg({"updated_at": "max"}).collect()[0][0]
if new_wm:
ddb.put_item(Item={"table_name": config["table"], "watermark": str(new_wm)})
return df
JDBC 性能调优¶
上面这段伪代码里藏着几个对大表性能至关重要的参数,调优不当会让"10TB 全量迁移"从 24 小时拖成一周。下面是关键参数与调优建议:
| 参数 | 作用 | 调优建议 |
|---|---|---|
numPartitions |
JDBC 并发连接数 | 大表 16-32,小表 4-8;受源库 max_connections 限制,过高会打挂源库 |
partitionColumn |
分区列 | 选均匀分布的数值列(主键最佳),避免数据倾斜导致部分分区空转 |
lowerBound/upperBound |
分区区间 | 用 SELECT MIN(id), MAX(id) 预估,区间过窄导致分区不均 |
fetchsize |
每次网络往返拉取行数 | 大表设 10000-50000,减少 round-trip;过高则 Glue 端内存吃紧 |
| 谓词下推 | 把过滤推到源库执行 | 用 dbtable 包子查询(如上面伪代码),让 WHERE 在源库执行 |
表 14-2 JDBC 性能调优
Trade-off
numPartitions 调高能加速读取,但每个分区会占用源库一个连接——32 个分区 = 32 个并发查询打在源库上。对生产库(尤其医药企业的 SFE/CRM 这种业务在用库),必须与 DBA 协商可接受的并发窗口,常在业务低峰期(凌晨)跑大表全量。这是"读取速度"与"源库稳定"的经典权衡。
迟到数据处理¶
水位机制有一个隐含假设:数据的 updated_at 就是它真正发生变更的时间。但现实中存在迟到数据——源系统的批处理延迟、跨时区时钟漂移、事务提交晚于业务时间,都会导致一条本属于"上一个水位窗口"的数据,在本次才出现 updated_at。如果严格按 updated_at > last_watermark 过滤,这条数据会被永久漏掉。
应对策略是水位回溯窗口——查询时把水位往前推一个安全余量,配合 S3 分区的幂等覆盖:
# 示意:迟到数据处理——回溯窗口 + 分区幂等覆盖
def load_with_lookback(spark, config, lookback_days=3):
last_wm = ddb.get_item(Key={"table_name": config["table"]})["Item"]["watermark"]
# 核心意图:水位向前回溯 N 天,把迟到数据重新纳入
safe_lower = last_wm - datetime.timedelta(days=lookback_days)
df = read_jdbc(spark, config, where=f"updated_at > '{safe_lower}'")
# 核心意图:按业务日期分区幂等覆盖,迟到数据重跑对应分区即覆盖旧值
(df.write.mode("overwrite")
.partitionBy("biz_date") # 按业务日期分区
.parquet(f"s3://ap-aurora-cdp-enriched/{config['domain']}/{config['table']}/"))
return df
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
classDef bpExternal fill:#f2f4f8,stroke:#697077,stroke-width:2px,color:#161616
classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
linkStyle default stroke:#697077,stroke-width:2px,fill:none
L1@{ icon: "codicon:calendar", form: "rounded", label: "上次水位 06-15", pos: "b", h: 40 } --> LB[回溯窗口<br/>06-12 起查]
LB --> CATCH[捕获 06-13 的迟到数据]
CATCH --> PART@{ icon: "logos:aws-s3", form: "rounded", label: "按 biz_date=06-13 分区覆盖重写", pos: "b", h: 40 }
PART --> IDEM[幂等:重跑同一分区覆盖旧值<br/>不产生重复]
class L1,LB bpDecision
class CATCH,PART bpProcess
class IDEM bpSuccess
图 14-4 示意:迟到数据处理——回溯窗口 + 分区幂等覆盖
引申
回溯窗口的大小是一个权衡——太小会漏迟到数据,太大会让每次增量都重拉大量已处理数据,浪费算力。3 天是医药行业批处理场景的常见经验值(多数 SFE/CRM 的批处理延迟在 1-2 天内)。回溯窗口必须配合幂等写入(按分区 overwrite)才能工作——否则重拉的数据会和旧数据重复。幂等性是增量加载的基石,详见 Ch 17 的代理键与对账设计。
变更捕获策略对比¶
| 策略 | 机制 | 优势 | 劣势 |
|---|---|---|---|
| 时间戳水位 | WHERE updated_at > last |
简单通用 | 无法捕获硬删除 |
| CDC(变更数据捕获) | 读取数据库日志(如 binlog) | 能捕获 INSERT/UPDATE/DELETE | 需要源系统支持+权限 |
| 触发器+标志表 | 源表加触发器记录变更 | 不依赖日志 | 侵入源系统 |
| 全量比对 | 全量拉取后与上次比对 | 无需源改造 | 性能差 |
表 14-3 变更捕获策略对比
引申
理想的增量捕获是 CDC(Change Data Capture)——通过读取数据库事务日志,精确捕获每条 INSERT/UPDATE/DELETE。AWS DMS 的 CDC 模式、Debezium、Flink CDC 都是这一思路。但 CDC 需要源系统开放日志权限且配置复杂,在医药企业的遗留系统上往往不可行。因此平台以时间戳水位为主、定期全量校准为辅——这是务实选择。
我在项目第一月认真评估过 CDC——它确实能解决"硬删除捕获"这个时间戳水位的死穴(见 §14.2.5)。但评估后发现两个阻断性问题:一是 Aurora 的核心源系统 SQL Server 是遗留版本,binlog 读取需要 DBA 开权限且影响性能,DBA 明确拒绝;二是 Salesforce SaaS 源根本没有"数据库日志"这个概念,CDC 无从谈起。所以 CDC 对 Aurora 不可行不是"我不想用",而是"源系统不支持"。退而求其次选时间戳水位+定期全量校准——虽然不如 CDC 精确,但在遗留系统约束下是唯一可行方案。架构选型不能只看"哪个更先进",要看"源系统允许什么"——这是数据工程务实主义的体现。
处理删除的难题¶
增量加载最大的痛点是硬删除——数据被物理删除后,增量查询查不到它,但数仓里还留着。
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
classDef bpExternal fill:#f2f4f8,stroke:#697077,stroke-width:2px,color:#161616
classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
linkStyle default stroke:#697077,stroke-width:2px,fill:none
PROBLEM[问题:源表删除了一行<br/>增量查询查不到<br/>数仓仍保留该行] --> SOLUTION[解决方案]
subgraph SOLUTION["解决方案"]
S1[方案A:定期全量校准<br/>每周跑一次全量,覆盖增量差异]
S2[方案B:软删除标志<br/>推动源系统加 is_deleted 字段]
S3[方案C:CDC<br/>读取删除日志]
end
class PROBLEM bpError
class S1 bpSuccess
class S2 bpProcess
class S3 bpProcess
图 14-5 处理删除的难题
平台采用方案 A(定期全量校准)作为兜底——简单、无侵入、可靠。
14.3 驱动依赖治理与运行时类加载¶
问题¶
JDBC 连接器需要数据库驱动(如 SQL Server 的 mssql-jdbc、 PostgreSQL 的 pgjdbc)。Glue 是托管环境,不能随意安装系统级依赖。
这个问题我在项目第一周就踩了——JDBC 连接器代码写好了,本地测试通过,部署到 Glue 后报 ClassNotFoundException: com.microsoft.sqlserver.jdbc.SQLServerDriver。原因很简单:Glue 的运行时环境只预装了少量常见库,SQL Server 驱动不在其中。最初我尝试把驱动打进 Python wheel 包——但 Glue PySpark 的类加载机制和普通 Python 不同,wheel 里的 JAR 不会被 Spark 的 JVM 类加载器发现。折腾了两天后我才找到正确方案:JAR 注入——把驱动 JAR 放 S3,Glue Job 启动时通过 --extra-jars 参数加载到 Spark 类路径。Glue 的托管性是双刃剑——省了运维但限制了依赖管理自由度,JAR 注入是绕过这个限制的标准做法。
解决方案: JAR 注入¶
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
classDef bpExternal fill:#f2f4f8,stroke:#697077,stroke-width:2px,color:#161616
classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
linkStyle default stroke:#697077,stroke-width:2px,fill:none
JAR@{ icon: "codicon:package", form: "rounded", label: "数据库驱动 JAR<br/>打包在 CI 中", pos: "b", h: 40 } -->|上传| S3@{ icon: "logos:aws-s3", form: "rounded", label: "S3 tooling 桶", pos: "b", h: 40 }
S3 -->|Glue Job 启动时下载| GLUE@{ icon: "logos:aws-glue", form: "rounded", label: "Glue 运行时类路径", pos: "b", h: 40 }
GLUE -->|Spark 加载| JDBC@{ icon: "devicon:apachespark", form: "rounded", label: "Spark JDBC 连接源库", pos: "b", h: 40 }
class JAR bpProcess
class S3 bpData
class GLUE bpProcess
class JDBC bpSuccess
图 14-6 解决方案: J...
| 设计要点 | 说明 |
|---|---|
| JAR 版本化 | 每个驱动有版本号,CI 打包时记录 |
| 运行时注入 | Glue Job 启动时从 S3 下载所需 JAR 到类路径 |
| 依赖隔离 | 不同源可能需要不同版本驱动,通过配置指定 |
| 安全审计 | JAR 来源可控(非随意从公网下载) |
表 14-4 解决方案: JAR 注入
表里"依赖隔离"这一行是我踩过坑才加的。项目第二年接了一个 PostgreSQL 新源——开发者直接把 pgjdbc 的最新版 JAR 传上去了,结果和另一个旧版 PG 源的驱动冲突(两个版本的驱动类名相同但行为不同),导致旧源的连接器报诡异的 SSL 错误。排查了一天才发现是版本冲突。修复方案是给每个 JAR 加版本号后缀(mssql-jdbc-8.4.1.jar/mssql-jdbc-12.2.0.jar),配置里指定用哪个版本——不同源可以引用不同版本,互不冲突。依赖管理最怕"全局唯一版本"假设——现实是不同源需要不同版本,必须隔离。
Trade-off
JAR 注入比"把驱动打进 Glue Python 包"更灵活——可以按需加载不同版本驱动,不需要重新打包整个框架。代价是增加了运行时依赖管理的复杂度(需确保 S3 上的 JAR 可用)。我在第一年还踩过一个坑——S3 上的 JAR 被误删(生命周期策略配错),所有 JDBC 连接器同时挂。从那以后我把 JAR 路径加了"防删除"标记(S3 Object Lock),并纳入备份。运行时依赖的可用性也是 SLA 的一部分——JAR 丢了,连接器就停了,和数据库挂了一样严重。
本章小结¶
- 三种加载模式:全量(简单可靠)/ 增量(性能好但需水位)/ 自定义(灵活但难标准化)——小表优先全量
- 增量水位追踪以时间戳水位为主,水位持久化在 DynamoDB;PySpark 通过
numPartitions/partitionColumn分区并行读取大表 - JDBC 性能调优核心是
numPartitions/fetchsize/谓词下推,但需与源库 DBA 协商并发窗口避免打挂源库 - 迟到数据用回溯窗口 + 分区幂等覆盖处理,幂等性是增量加载的基石
- 定期全量校准兜底删除问题;CDC 是理想方案但遗留系统常不可行
- 驱动依赖通过 JAR 注入解决:JAR 版本化存 S3,Glue 启动时加载到类路径,实现按需加载和依赖隔离
下一章
Ch 15 文件与 S3 连接器 —— 接下来看第二类连接器:文件源的事件驱动摄取与 S3 归档策略。