Ch 15 文件与 S3 连接器¶
面包屑
本书主页 › Part III 数据工程实践 › Ch 15
项目第 1 年 · 核心建设期——文件连接器
本章你将学到¶
- 文件源摄取与信号文件事件驱动触发机制
- S3 事件触发与归档生命周期策略
- 文件格式支持与扩展点设计(含 FormatParser 抽象基类 + 格式注册表伪代码)
- 编码检测与处理(UTF-8 vs GBK 中文数据痛点)
- 文件大小限制与多文件数据集处理
15.1 文件源摄取与信号文件事件驱动触发¶
信号文件协议¶
文件源(如 SFTP)的数据到达是异步的——供应商可能在任意时间推送文件。平台通过信号文件机制实现事件驱动触发:
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
sequenceDiagram
participant V as 供应商
participant SFTP as SFTP 服务器
participant S3 as S3 Landing
participant L as Lambda
participant SF as Step Functions
activate V
V->>SFTP: 上传数据文件 data.csv
activate SFTP
V->>SFTP: 上传信号文件 data.done
Note over SFTP: 信号文件表示"数据已就绪"
SFTP->>S3: 同步到 S3 Landing
activate S3
deactivate SFTP
S3->>L: S3 事件通知(信号文件到达)
deactivate S3
activate L
L->>L: 检测到信号文件 → 触发处理
L->>SF: 启动摄取状态机
deactivate L
activate SF
SF->>S3: 读取数据文件并处理
activate S3
deactivate SF
deactivate S3
deactivate V
图 15-1 信号文件协议
为什么用信号文件而非直接检测数据文件¶
| 方案 | 问题 |
|---|---|
| 直接检测数据文件到达 | 大文件上传中途会触发事件 → 处理不完整文件 → 失败 |
| 信号文件协议 | 数据文件先传完,再传一个小的信号文件 → 信号文件到达 = 数据完整就绪 |
表 15-1 为什么用信号文件而非直接检测数据文件
引申
信号文件是分布式系统中"两阶段提交"的轻量版——先用大文件传数据,再用小文件传"就绪信号"。这避免了"处理到一半的文件"问题。这种模式在 EDI(电子数据交换)和银行业数据交换中非常常见。
这个设计不是我从理论推导的,而是企业征信的教训逼出来的。企业征信时我直接检测数据文件到达——结果有次供应商传一个 2GB 的 CSV,S3 在文件传到 30% 时就触发了 ObjectCreated 事件,Lambda 读取了一个不完整的文件,处理到一半报"列数不对",整个批次失败。更糟的是 Lambda 重试时文件还没传完,连续失败三次进了 DLQ——等文件真正传完时,已经没人重新触发了。这次事故让我学会了信号文件协议——供应商传完数据文件后,再传一个空的 .done 信号文件,Lambda 只对信号文件触发。信号文件是"上传完成"的可靠语义——S3 的 ObjectCreated 事件只能告诉你"有东西在传",信号文件才能告诉你"传完了"。
归档与清理¶
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
classDef bpExternal fill:#f2f4f8,stroke:#697077,stroke-width:2px,color:#161616
classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
linkStyle default stroke:#697077,stroke-width:2px,fill:none
S3_LANDING@{ icon: "logos:aws-s3", form: "rounded", label: "S3 Landing", pos: "b", h: 48 } -->|处理成功后| ARCHIVE[归档目录<br/>按日期组织]
S3_LANDING -->|处理失败| QUARANTINE[隔离目录<br/>待人工介入]
ARCHIVE -->|生命周期规则| COLD[冷存储/删除<br/>按保留策略]
class S3_LANDING bpData
class ARCHIVE bpData
class QUARANTINE bpError
class COLD bpExternal
图 15-2 归档与清理
归档与清理这个设计,是企业征信"Landing 区塞满"教训催生的。企业征信时处理完的文件留在 Landing 区不清理——最初想着"留着排障方便",结果三个月后 Landing 区塞了上万个文件,S3 LIST 操作变慢(S3 的前缀扫描在海量文件下性能急剧下降),而且新文件和旧文件混在一起,排障时根本找不到"最近哪批数据"。到 Aurora 我定了"处理完立即归档"的规则——成功处理的文件移到 archive/ 目录按日期组织,失败的移到 quarantine/ 隔离目录待人工介入。Landing 区永远只保留"待处理"的文件——LIST 操作快了,排障也清晰了。
隔离目录(quarantine)是个容易被忽视但很重要的设计——处理失败的文件不能删(要留证据排障),也不能留在 Landing(会重复触发处理)。移到隔离目录既保留了文件供排障,又不会再触发 Lambda。失败文件的处理和成功文件一样需要明确策略——不能"失败了就不管了"。
15.2 S3 事件触发与归档策略¶
S3 事件通知机制¶
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart TB
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
classDef bpExternal fill:#f2f4f8,stroke:#697077,stroke-width:2px,color:#161616
classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
linkStyle default stroke:#697077,stroke-width:2px,fill:none
subgraph S3事件通知链路["S3 事件通知链路"]
S3@{ icon: "logos:aws-s3", form: "rounded", label: "S3 ObjectCreated 事件", pos: "b", h: 48 } --> SQS@{ icon: "logos:aws-sqs", form: "rounded", label: "SQS 队列<br/>缓冲事件", pos: "b", h: 48 }
SQS --> LAMBDA@{ icon: "logos:aws-lambda", form: "rounded", label: "Lambda 消费", pos: "b", h: 48 }
LAMBDA -->|过滤信号文件| TRIGGER@{ icon: "logos:aws-step-functions", form: "rounded", label: "触发 Step Functions", pos: "b", h: 48 }
LAMBDA -->|过滤非信号文件| IGNORE[忽略]
end
class S3 bpData
class SQS bpData
class LAMBDA bpProcess
class TRIGGER bpProcess
class IGNORE bpProcess
图 15-3 S3 事件通知机制
| 设计要点 | 说明 |
|---|---|
| 事件过滤 | Lambda 只对信号文件事件触发处理,忽略数据文件事件 |
| SQS 缓冲 | 用 SQS 缓冲 S3 事件,避免大量并发文件到达时 Lambda 被打爆 |
| 幂等处理 | 同一文件可能触发多次事件,处理逻辑需幂等 |
| 归档分离 | 处理完后移至归档目录,Landing 区只保留待处理文件 |
表 15-2 S3 事件通知机制
归档生命周期¶
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8'}}}%%
timeline
title 归档生命周期
section 0-30 天
热存储 : Landing 区 : 频繁访问 : 排障
section 30-90 天
温存储 : 归档区 : 偶尔回溯
section 90 天-1 年
冷存储 : Glacier : 合规归档 : 低成本
section 过期
自动删除 : 清理释放空间
图 15-4 归档生命周期
Trade-off
归档保留期是成本与合规的 trade-off。医药行业 GxP 要求"数据可追溯",但"可追溯"不等于"永久热存储"。我们的策略是:热 30 天(排障用)→ 温 90 天(回溯用)→ 冷 1 年(合规用)→ 删除。这比"全量永久热存"节省 80%+ 存储成本。
这四个周期(30天/90天/1年/删除)不是拍脑袋定的,而是基于排障数据定的。我跑了一个月的排障统计——95% 的排障在数据到达后 7 天内发生("上周的数据怎么不对"),30 天能覆盖 99%。所以热存储 30 天足够排障。温存储 90 天覆盖季度回溯("上季度的数据复核一下")。冷存储 1 年是 GxP 合规底线——审计员最多查 1 年内的原始数据。超过 1 年的,Raw 层的标准化副本已足够(不需要 Landing 的原始格式)。保留周期要用数据说话,不是"越久越好"——永久热存既浪费成本,也让排障时在海量旧文件里找不到了。
15.3 文件格式支持与扩展点设计¶
支持的文件格式¶
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart TB
subgraph 文件格式支持矩阵
direction TB
subgraph 结构化格式
CSV[CSV / TSV<br/>最常见]
JSON[JSON / JSONL<br/>结构化]
PARQUET[Parquet<br/>已标准化]
end
subgraph 电子表格格式
EXCEL[Excel<br/>电子表格]
end
subgraph 遗留格式
FIXED[定长文本<br/>遗留格式]
end
end
CSV -->|解析器| CSVP[CSV Parser]
JSON -->|解析器| JSONP[JSON Parser]
PARQUET -->|解析器| PARQUETP[Parquet Parser]
EXCEL -->|解析器| EXCELP[Excel Parser]
FIXED -->|解析器| FIXEDP[Fixed-width Parser]
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
class CSV,JSON,PARQUET,EXCEL,FIXED bpProcess
class CSVP,JSONP,PARQUETP,EXCELP,FIXEDP bpData
图 15-5 支持的文件格式
| 格式 | 扩展名 | 特点 | 适用场景 | 解析器 |
|---|---|---|---|---|
| CSV / TSV | .csv, .tsv |
文本、通用、人类可读 | 数据交换、日志导出 | CsvParser |
| JSON / JSONL | .json, .jsonl |
结构化、嵌套支持 | API 响应、事件流 | JsonParser |
| Excel | .xlsx, .xls |
电子表格、多 Sheet | 业务报表、手动数据 | ExcelParser |
| Parquet | .parquet |
列式存储、压缩高效 | 大数据处理、数据湖 | ParquetParser |
| 定长文本 | .txt, .dat |
固定宽度列、遗留格式 | 遗留系统导出 | FixedWidthParser |
表 15-3 文件格式支持矩阵
这五种格式不是我一开始就全支持的,而是按实际遇到的源系统逐步加的。CSV 是第一天就有的(最通用);JSON 是第二周加的(API 源的响应格式);Excel 是第一月底加的——因为市场部门习惯用 Excel 发报表,"为什么不能直接读 Excel"是他们最常问的问题;Parquet 是 Raw 层标准化后的输出格式(不是输入);定长文本是第二年才加的——有个遗留系统只能导出定长文本,被迫支持。格式支持是需求驱动的,不是"越多越好"——每加一种格式要实现解析器、测试、维护,只有真实需求才值得加。
扩展点设计¶
新增文件格式的扩展遵循开闭原则:
%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart TB
classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
classDef bpExternal fill:#f2f4f8,stroke:#697077,stroke-width:2px,color:#161616
classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
linkStyle default stroke:#697077,stroke-width:2px,fill:none
NEW[需要支持新格式?] --> IMPLEMENT[实现格式解析器<br/>遵循统一接口]
IMPLEMENT --> REGISTER[注册到格式注册表]
REGISTER --> CONFIG[在任务配置中指定新格式]
CONFIG --> RUNTIME[引擎自动路由到新解析器]
class NEW bpDecision
class IMPLEMENT,REGISTER,CONFIG bpProcess
class RUNTIME bpSuccess
图 15-6 扩展点设计
| 扩展步骤 | 说明 |
|---|---|
| ① 实现解析器 | 遵循统一接口:输入文件路径 → 输出标准化 DataFrame |
| ② 注册 | 在格式注册表中注册新格式标识 |
| ③ 配置 | 任务配置中指定新格式标识 |
| ④ 运行 | 引擎自动路由——无需改动公共后处理逻辑 |
表 15-4 扩展点设计
引申
扩展点设计的关键是"统一接口"。所有格式解析器实现同一个接口(输入文件→输出 DataFrame),引擎只依赖接口而非具体实现。这是"依赖倒置原则"的体现——高层模块(引擎)不依赖低层模块(具体格式解析器),两者都依赖抽象(统一接口)。
把这个统一接口落到代码,就是 FormatParser 抽象基类 + 各格式实现 + 格式注册表——引擎通过注册表按扩展名查找解析器,新增格式只需注册新实现:
# 示意:FormatParser 抽象基类 + 格式注册表
from abc import ABC, abstractmethod
from pyspark.sql import DataFrame
class FormatParser(ABC):
"""统一接口:输入文件路径 → 输出标准化 DataFrame。"""
@abstractmethod
def parse(self, file_path: str, spark) -> DataFrame: ...
class CsvParser(FormatParser):
def parse(self, file_path, spark):
return (spark.read.option("header", True).option("encoding", detect_encoding(file_path))
.csv(file_path)) # 核心意图:编码自动检测(见下)
class ExcelParser(FormatParser):
def parse(self, file_path, spark):
# Excel 需先转 CSV(pandas read_excel → pandas_df → spark_df)
import pandas as pd
pdf = pd.read_excel(file_path, sheet_name=0)
return spark.createDataFrame(pdf)
# 格式注册表:扩展名 → 解析器(新增格式只加一行)
FORMAT_REGISTRY = {".csv": CsvParser(), ".xlsx": ExcelParser(), ".json": JsonParser()}
def parse_file(file_path: str, spark) -> DataFrame:
ext = Path(file_path).suffix.lower()
return FORMAT_REGISTRY[ext].parse(file_path, spark) # 核心意图:引擎只依赖接口,不依赖具体实现
15.4 编码检测与处理:中文数据的隐形陷阱¶
文件连接器最隐蔽的坑是编码——上游系统导出的 CSV 可能是 UTF-8、GBK、GB2312,混用时会乱码或解析失败。这在医药行业尤其常见:医院系统的老旧导出工具常默认 GBK 编码,而平台默认按 UTF-8 读取,结果中文全部变成乱码。
# 示意:编码自动检测(chardet)
import chardet
def detect_encoding(file_path: str) -> str:
with open(file_path, "rb") as f:
raw = f.read(10000) # 读前 10KB 检测即可
result = chardet.detect(raw)
# 核心意图:自动检测编码,避免硬编码 UTF-8 导致中文乱码
return result["encoding"] or "utf-8" # 检测失败回退 UTF-8
| 编码场景 | 问题 | 应对 |
|---|---|---|
| 上游用 GBK,平台按 UTF-8 读 | 中文乱码 | detect_encoding 自动检测 |
| 同一批文件混用编码 | 部分行乱码 | 按文件检测,不假设同批一致 |
| BOM 头(UTF-8 with BOM) | 首列名多出不可见字符 | encoding="utf-8-sig" 处理 BOM |
| Excel 内部编码与导出不同 | pandas 读正常,Spark 读乱码 | 统一用 pandas 中转 + 显式 encoding |
表 15-5 示意:编码自动检测(chardet)
Trade-off
编码检测不是 100% 准确——chardet 对短文本或纯 ASCII 文件可能误判。关键数据应在 Landing 层记录原始编码,Raw 层统一转 UTF-8,后续层不再处理编码问题。这是"在最早层解决问题"的原则——编码问题越往后传,排查越难。
15.5 文件大小限制与多文件数据集处理¶
文件连接器还要处理两类现实问题:超大文件(内存溢出)和多文件数据集(一个逻辑数据集散落在多个文件)。
| 场景 | 问题 | 应对 |
|---|---|---|
| 超大文件(>1GB CSV) | 单次读取内存溢出 | Spark 天然分块读 CSV;非 Spark 场景用分块流式读取 |
| 多文件数据集 | 一个表的数据分成 part-001.csv、part-002.csv... |
Spark spark.read.csv("s3://.../dir/") 自动读目录下所有文件合并 |
| 异构文件混合 | 目录里混有 CSV 和 Excel | 信号文件协议(§15.1)按目录触发,解析器按扩展名分发 |
| 空文件/损坏文件 | 解析失败 | 连接器容错(Ch 13 DLQ) |
表 15-6 文件大小限制与多文件数据集处理
"超大文件"这一行是我在企业征信踩过 OOM 坑的。当时有个供应商传了一个 5GB 的 CSV,Python Shell 用 pandas 单机读——直接内存溢出(pandas 要把整个文件加载到内存)。换成 Spark 后,Spark 天然分块读 CSV(按行分割到多个 partition),5GB 文件轻松处理。Spark 的价值不只是"分布式计算",更是"分块读取"——任何超大数据集在单机内存装不下时,Spark 是默认选择。但 Spark 也不是万能——Excel 文件 Spark 不能直接读(要 pandas 中转),所以 Excel 超大文件是真正的难题。我的解法是:要求供应商把大 Excel 拆成多个 CSV(Excel 本来就不适合大数据量),这比"硬啃大 Excel"更务实。
"空文件/损坏文件"这一行也值得强调——这是文件源最常见的故障。我在第一年统计过,文件源的失败有 40% 是"空文件或损坏文件"(供应商传了空文件、CSV 被截断、Excel 格式损坏)。这些失败靠 Ch 13 DLQ 兜底——失败写入 DLQ,通知供应商重传。文件源的数据质量天然不如数据库源——数据库有 schema 约束,文件没有——所以文件连接器的容错设计要更厚。
# 示意:多文件数据集处理(Spark 自动合并目录下文件)
def parse_dataset(dir_path: str, spark) -> DataFrame:
# 核心意图:读目录而非单文件,Spark 自动合并多文件为一个 DataFrame
files = list_s3_objects(dir_path) # 列出目录下所有文件
if not files:
raise EmptyDatasetError(f"{dir_path} 无文件") # 空数据集告警([Ch 49](./49-日志-监控-审计与告警.md))
ext = Path(files[0]).suffix.lower() # 假设同目录同格式
if any(Path(f).suffix.lower() != ext for f in files):
raise HeterogeneousFilesError("目录内格式不一致") # 异构文件阻断
return FORMAT_REGISTRY[ext].parse(dir_path + "/*", spark) # Spark 通配读多文件
本章小结¶
- 文件源通过信号文件协议实现事件驱动触发:数据文件先传完,信号文件到达表示就绪——避免处理不完整文件
- S3 事件通知经 SQS 缓冲后由 Lambda 消费,过滤信号文件触发处理;归档分热/温/冷三级生命周期,平衡成本与合规
- 文件格式支持通过"统一接口 + 格式注册表"实现开闭原则扩展——新增格式只需实现解析器并注册
下一章
Ch 16 API、SaaS 与邮件连接器 —— 接下来看剩余三类连接器:API/SaaS/邮件的通用设计与实战。