跳转至

Ch 15 文件与 S3 连接器

面包屑

本书主页Part III 数据工程实践 › Ch 15

项目第 1 年 · 核心建设期——文件连接器


本章你将学到

  • 文件源摄取与信号文件事件驱动触发机制
  • S3 事件触发与归档生命周期策略
  • 文件格式支持与扩展点设计(含 FormatParser 抽象基类 + 格式注册表伪代码)
  • 编码检测与处理(UTF-8 vs GBK 中文数据痛点)
  • 文件大小限制与多文件数据集处理

15.1 文件源摄取与信号文件事件驱动触发

信号文件协议

文件源(如 SFTP)的数据到达是异步的——供应商可能在任意时间推送文件。平台通过信号文件机制实现事件驱动触发:

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
sequenceDiagram
    participant V as 供应商
    participant SFTP as SFTP 服务器
    participant S3 as S3 Landing
    participant L as Lambda
    participant SF as Step Functions

    activate V
    V->>SFTP: 上传数据文件 data.csv
    activate SFTP
    V->>SFTP: 上传信号文件 data.done
    Note over SFTP: 信号文件表示"数据已就绪"
    SFTP->>S3: 同步到 S3 Landing
    activate S3
    deactivate SFTP
    S3->>L: S3 事件通知(信号文件到达)
    deactivate S3
    activate L
    L->>L: 检测到信号文件 → 触发处理
    L->>SF: 启动摄取状态机
    deactivate L
    activate SF
    SF->>S3: 读取数据文件并处理
    activate S3
    deactivate SF
    deactivate S3
    deactivate V

图 15-1 信号文件协议

为什么用信号文件而非直接检测数据文件

方案 问题
直接检测数据文件到达 大文件上传中途会触发事件 → 处理不完整文件 → 失败
信号文件协议 数据文件先传完,再传一个小的信号文件 → 信号文件到达 = 数据完整就绪

表 15-1 为什么用信号文件而非直接检测数据文件

引申

信号文件是分布式系统中"两阶段提交"的轻量版——先用大文件传数据,再用小文件传"就绪信号"。这避免了"处理到一半的文件"问题。这种模式在 EDI(电子数据交换)和银行业数据交换中非常常见。

这个设计不是我从理论推导的,而是企业征信的教训逼出来的。企业征信时我直接检测数据文件到达——结果有次供应商传一个 2GB 的 CSV,S3 在文件传到 30% 时就触发了 ObjectCreated 事件,Lambda 读取了一个不完整的文件,处理到一半报"列数不对",整个批次失败。更糟的是 Lambda 重试时文件还没传完,连续失败三次进了 DLQ——等文件真正传完时,已经没人重新触发了。这次事故让我学会了信号文件协议——供应商传完数据文件后,再传一个空的 .done 信号文件,Lambda 只对信号文件触发。信号文件是"上传完成"的可靠语义——S3 的 ObjectCreated 事件只能告诉你"有东西在传",信号文件才能告诉你"传完了"。

归档与清理

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
    classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
    classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
    classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
    classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
    classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
    classDef bpExternal fill:#f2f4f8,stroke:#697077,stroke-width:2px,color:#161616
    classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
    linkStyle default stroke:#697077,stroke-width:2px,fill:none

    S3_LANDING@{ icon: "logos:aws-s3", form: "rounded", label: "S3 Landing", pos: "b", h: 48 } -->|处理成功后| ARCHIVE[归档目录<br/>按日期组织]
    S3_LANDING -->|处理失败| QUARANTINE[隔离目录<br/>待人工介入]
    ARCHIVE -->|生命周期规则| COLD[冷存储/删除<br/>按保留策略]

    class S3_LANDING bpData
    class ARCHIVE bpData
    class QUARANTINE bpError
    class COLD bpExternal

图 15-2 归档与清理

归档与清理这个设计,是企业征信"Landing 区塞满"教训催生的。企业征信时处理完的文件留在 Landing 区不清理——最初想着"留着排障方便",结果三个月后 Landing 区塞了上万个文件,S3 LIST 操作变慢(S3 的前缀扫描在海量文件下性能急剧下降),而且新文件和旧文件混在一起,排障时根本找不到"最近哪批数据"。到 Aurora 我定了"处理完立即归档"的规则——成功处理的文件移到 archive/ 目录按日期组织,失败的移到 quarantine/ 隔离目录待人工介入。Landing 区永远只保留"待处理"的文件——LIST 操作快了,排障也清晰了。

隔离目录(quarantine)是个容易被忽视但很重要的设计——处理失败的文件不能删(要留证据排障),也不能留在 Landing(会重复触发处理)。移到隔离目录既保留了文件供排障,又不会再触发 Lambda。失败文件的处理和成功文件一样需要明确策略——不能"失败了就不管了"。


15.2 S3 事件触发与归档策略

S3 事件通知机制

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart TB
    classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
    classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
    classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
    classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
    classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
    classDef bpExternal fill:#f2f4f8,stroke:#697077,stroke-width:2px,color:#161616
    classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
    linkStyle default stroke:#697077,stroke-width:2px,fill:none

    subgraph S3事件通知链路["S3 事件通知链路"]
        S3@{ icon: "logos:aws-s3", form: "rounded", label: "S3 ObjectCreated 事件", pos: "b", h: 48 } --> SQS@{ icon: "logos:aws-sqs", form: "rounded", label: "SQS 队列<br/>缓冲事件", pos: "b", h: 48 }
        SQS --> LAMBDA@{ icon: "logos:aws-lambda", form: "rounded", label: "Lambda 消费", pos: "b", h: 48 }
        LAMBDA -->|过滤信号文件| TRIGGER@{ icon: "logos:aws-step-functions", form: "rounded", label: "触发 Step Functions", pos: "b", h: 48 }
        LAMBDA -->|过滤非信号文件| IGNORE[忽略]
    end

    class S3 bpData
    class SQS bpData
    class LAMBDA bpProcess
    class TRIGGER bpProcess
    class IGNORE bpProcess

图 15-3 S3 事件通知机制

设计要点 说明
事件过滤 Lambda 只对信号文件事件触发处理,忽略数据文件事件
SQS 缓冲 用 SQS 缓冲 S3 事件,避免大量并发文件到达时 Lambda 被打爆
幂等处理 同一文件可能触发多次事件,处理逻辑需幂等
归档分离 处理完后移至归档目录,Landing 区只保留待处理文件

表 15-2 S3 事件通知机制

归档生命周期

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8'}}}%%
timeline
    title 归档生命周期
    section 0-30 天
        热存储 : Landing 区 : 频繁访问 : 排障
    section 30-90 天
        温存储 : 归档区 : 偶尔回溯
    section 90 天-1 年
        冷存储 : Glacier : 合规归档 : 低成本
    section 过期
        自动删除 : 清理释放空间

图 15-4 归档生命周期

Trade-off

归档保留期是成本与合规的 trade-off。医药行业 GxP 要求"数据可追溯",但"可追溯"不等于"永久热存储"。我们的策略是:热 30 天(排障用)→ 温 90 天(回溯用)→ 冷 1 年(合规用)→ 删除。这比"全量永久热存"节省 80%+ 存储成本。

这四个周期(30天/90天/1年/删除)不是拍脑袋定的,而是基于排障数据定的。我跑了一个月的排障统计——95% 的排障在数据到达后 7 天内发生("上周的数据怎么不对"),30 天能覆盖 99%。所以热存储 30 天足够排障。温存储 90 天覆盖季度回溯("上季度的数据复核一下")。冷存储 1 年是 GxP 合规底线——审计员最多查 1 年内的原始数据。超过 1 年的,Raw 层的标准化副本已足够(不需要 Landing 的原始格式)。保留周期要用数据说话,不是"越久越好"——永久热存既浪费成本,也让排障时在海量旧文件里找不到了。


15.3 文件格式支持与扩展点设计

支持的文件格式

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart TB
    subgraph 文件格式支持矩阵
        direction TB
        subgraph 结构化格式
            CSV[CSV / TSV<br/>最常见]
            JSON[JSON / JSONL<br/>结构化]
            PARQUET[Parquet<br/>已标准化]
        end
        subgraph 电子表格格式
            EXCEL[Excel<br/>电子表格]
        end
        subgraph 遗留格式
            FIXED[定长文本<br/>遗留格式]
        end
    end

    CSV -->|解析器| CSVP[CSV Parser]
    JSON -->|解析器| JSONP[JSON Parser]
    PARQUET -->|解析器| PARQUETP[Parquet Parser]
    EXCEL -->|解析器| EXCELP[Excel Parser]
    FIXED -->|解析器| FIXEDP[Fixed-width Parser]

    classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
    classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616

    class CSV,JSON,PARQUET,EXCEL,FIXED bpProcess
    class CSVP,JSONP,PARQUETP,EXCELP,FIXEDP bpData

图 15-5 支持的文件格式

格式 扩展名 特点 适用场景 解析器
CSV / TSV .csv, .tsv 文本、通用、人类可读 数据交换、日志导出 CsvParser
JSON / JSONL .json, .jsonl 结构化、嵌套支持 API 响应、事件流 JsonParser
Excel .xlsx, .xls 电子表格、多 Sheet 业务报表、手动数据 ExcelParser
Parquet .parquet 列式存储、压缩高效 大数据处理、数据湖 ParquetParser
定长文本 .txt, .dat 固定宽度列、遗留格式 遗留系统导出 FixedWidthParser

表 15-3 文件格式支持矩阵

这五种格式不是我一开始就全支持的,而是按实际遇到的源系统逐步加的。CSV 是第一天就有的(最通用);JSON 是第二周加的(API 源的响应格式);Excel 是第一月底加的——因为市场部门习惯用 Excel 发报表,"为什么不能直接读 Excel"是他们最常问的问题;Parquet 是 Raw 层标准化后的输出格式(不是输入);定长文本是第二年才加的——有个遗留系统只能导出定长文本,被迫支持。格式支持是需求驱动的,不是"越多越好"——每加一种格式要实现解析器、测试、维护,只有真实需求才值得加。

扩展点设计

新增文件格式的扩展遵循开闭原则

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart TB
    classDef bpProcess fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
    classDef bpData fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
    classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
    classDef bpSuccess fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
    classDef bpError fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
    classDef bpExternal fill:#f2f4f8,stroke:#697077,stroke-width:2px,color:#161616
    classDef bpInfo fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616
    linkStyle default stroke:#697077,stroke-width:2px,fill:none

    NEW[需要支持新格式?] --> IMPLEMENT[实现格式解析器<br/>遵循统一接口]
    IMPLEMENT --> REGISTER[注册到格式注册表]
    REGISTER --> CONFIG[在任务配置中指定新格式]
    CONFIG --> RUNTIME[引擎自动路由到新解析器]

    class NEW bpDecision
    class IMPLEMENT,REGISTER,CONFIG bpProcess
    class RUNTIME bpSuccess

图 15-6 扩展点设计

扩展步骤 说明
① 实现解析器 遵循统一接口:输入文件路径 → 输出标准化 DataFrame
② 注册 在格式注册表中注册新格式标识
③ 配置 任务配置中指定新格式标识
④ 运行 引擎自动路由——无需改动公共后处理逻辑

表 15-4 扩展点设计

引申

扩展点设计的关键是"统一接口"。所有格式解析器实现同一个接口(输入文件→输出 DataFrame),引擎只依赖接口而非具体实现。这是"依赖倒置原则"的体现——高层模块(引擎)不依赖低层模块(具体格式解析器),两者都依赖抽象(统一接口)。

把这个统一接口落到代码,就是 FormatParser 抽象基类 + 各格式实现 + 格式注册表——引擎通过注册表按扩展名查找解析器,新增格式只需注册新实现:

# 示意:FormatParser 抽象基类 + 格式注册表
from abc import ABC, abstractmethod
from pyspark.sql import DataFrame

class FormatParser(ABC):
    """统一接口:输入文件路径 → 输出标准化 DataFrame。"""
    @abstractmethod
    def parse(self, file_path: str, spark) -> DataFrame: ...

class CsvParser(FormatParser):
    def parse(self, file_path, spark):
        return (spark.read.option("header", True).option("encoding", detect_encoding(file_path))
                    .csv(file_path))                          # 核心意图:编码自动检测(见下)

class ExcelParser(FormatParser):
    def parse(self, file_path, spark):
        # Excel 需先转 CSV(pandas read_excel → pandas_df → spark_df)
        import pandas as pd
        pdf = pd.read_excel(file_path, sheet_name=0)
        return spark.createDataFrame(pdf)

# 格式注册表:扩展名 → 解析器(新增格式只加一行)
FORMAT_REGISTRY = {".csv": CsvParser(), ".xlsx": ExcelParser(), ".json": JsonParser()}

def parse_file(file_path: str, spark) -> DataFrame:
    ext = Path(file_path).suffix.lower()
    return FORMAT_REGISTRY[ext].parse(file_path, spark)        # 核心意图:引擎只依赖接口,不依赖具体实现

15.4 编码检测与处理:中文数据的隐形陷阱

文件连接器最隐蔽的坑是编码——上游系统导出的 CSV 可能是 UTF-8、GBK、GB2312,混用时会乱码或解析失败。这在医药行业尤其常见:医院系统的老旧导出工具常默认 GBK 编码,而平台默认按 UTF-8 读取,结果中文全部变成乱码。

# 示意:编码自动检测(chardet)
import chardet

def detect_encoding(file_path: str) -> str:
    with open(file_path, "rb") as f:
        raw = f.read(10000)                                    # 读前 10KB 检测即可
    result = chardet.detect(raw)
    # 核心意图:自动检测编码,避免硬编码 UTF-8 导致中文乱码
    return result["encoding"] or "utf-8"                       # 检测失败回退 UTF-8
编码场景 问题 应对
上游用 GBK,平台按 UTF-8 读 中文乱码 detect_encoding 自动检测
同一批文件混用编码 部分行乱码 按文件检测,不假设同批一致
BOM 头(UTF-8 with BOM) 首列名多出不可见字符 encoding="utf-8-sig" 处理 BOM
Excel 内部编码与导出不同 pandas 读正常,Spark 读乱码 统一用 pandas 中转 + 显式 encoding

表 15-5 示意:编码自动检测(chardet)

Trade-off

编码检测不是 100% 准确——chardet 对短文本或纯 ASCII 文件可能误判。关键数据应在 Landing 层记录原始编码,Raw 层统一转 UTF-8,后续层不再处理编码问题。这是"在最早层解决问题"的原则——编码问题越往后传,排查越难。

15.5 文件大小限制与多文件数据集处理

文件连接器还要处理两类现实问题:超大文件(内存溢出)和多文件数据集(一个逻辑数据集散落在多个文件)。

场景 问题 应对
超大文件(>1GB CSV) 单次读取内存溢出 Spark 天然分块读 CSV;非 Spark 场景用分块流式读取
多文件数据集 一个表的数据分成 part-001.csvpart-002.csv... Spark spark.read.csv("s3://.../dir/") 自动读目录下所有文件合并
异构文件混合 目录里混有 CSV 和 Excel 信号文件协议(§15.1)按目录触发,解析器按扩展名分发
空文件/损坏文件 解析失败 连接器容错(Ch 13 DLQ)

表 15-6 文件大小限制与多文件数据集处理

"超大文件"这一行是我在企业征信踩过 OOM 坑的。当时有个供应商传了一个 5GB 的 CSV,Python Shell 用 pandas 单机读——直接内存溢出(pandas 要把整个文件加载到内存)。换成 Spark 后,Spark 天然分块读 CSV(按行分割到多个 partition),5GB 文件轻松处理。Spark 的价值不只是"分布式计算",更是"分块读取"——任何超大数据集在单机内存装不下时,Spark 是默认选择。但 Spark 也不是万能——Excel 文件 Spark 不能直接读(要 pandas 中转),所以 Excel 超大文件是真正的难题。我的解法是:要求供应商把大 Excel 拆成多个 CSV(Excel 本来就不适合大数据量),这比"硬啃大 Excel"更务实。

"空文件/损坏文件"这一行也值得强调——这是文件源最常见的故障。我在第一年统计过,文件源的失败有 40% 是"空文件或损坏文件"(供应商传了空文件、CSV 被截断、Excel 格式损坏)。这些失败靠 Ch 13 DLQ 兜底——失败写入 DLQ,通知供应商重传。文件源的数据质量天然不如数据库源——数据库有 schema 约束,文件没有——所以文件连接器的容错设计要更厚。

# 示意:多文件数据集处理(Spark 自动合并目录下文件)
def parse_dataset(dir_path: str, spark) -> DataFrame:
    # 核心意图:读目录而非单文件,Spark 自动合并多文件为一个 DataFrame
    files = list_s3_objects(dir_path)                          # 列出目录下所有文件
    if not files:
        raise EmptyDatasetError(f"{dir_path} 无文件")          # 空数据集告警([Ch 49](./49-日志-监控-审计与告警.md))
    ext = Path(files[0]).suffix.lower()                        # 假设同目录同格式
    if any(Path(f).suffix.lower() != ext for f in files):
        raise HeterogeneousFilesError("目录内格式不一致")      # 异构文件阻断
    return FORMAT_REGISTRY[ext].parse(dir_path + "/*", spark)  # Spark 通配读多文件

本章小结

  • 文件源通过信号文件协议实现事件驱动触发:数据文件先传完,信号文件到达表示就绪——避免处理不完整文件
  • S3 事件通知经 SQS 缓冲后由 Lambda 消费,过滤信号文件触发处理;归档分热/温/冷三级生命周期,平衡成本与合规
  • 文件格式支持通过"统一接口 + 格式注册表"实现开闭原则扩展——新增格式只需实现解析器并注册

下一章

Ch 16 API、SaaS 与邮件连接器 —— 接下来看剩余三类连接器:API/SaaS/邮件的通用设计与实战。

评论