跳转至

Ch 17 Landing→Raw→Enriched 开发实战

面包屑

本书主页Part III 数据工程实践 › Ch 17

项目第 1 年 · 核心建设期——三层ETL开发


本章你将学到

  • Landing/Raw/Enriched 三层开发的职责与产物,以及三层 PySpark pipeline 的伪代码
  • 数据质量校验框架:约束声明(PyDeequ)与阈值治理
  • 代理键生成(哈希 UDF)与行数对账的设计
  • Schema 演进处理:防御式 Crawler diff vs 自适应 mergeSchema 的取舍

17.1 三层开发的职责与产物

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
    subgraph 三层ETL["三层 ETL 加工"]
        L2R@{ icon: "logos:aws-s3", form: "rounded", label: "Landing → Raw<br/>标准化层", pos: "b", h: 40 }
        R2E@{ icon: "logos:aws-s3", form: "rounded", label: "Raw → Enriched<br/>加工层", pos: "b", h: 40 }
        E2RS@{ icon: "logos:aws-redshift", form: "rounded", label: "Enriched → Redshift<br/>入仓层", pos: "b", h: 40 }
    end

    L2R -->|产物| R_OUT[统一格式 Parquet<br/>字段名/类型标准化]
    R2E -->|产物| E_OUT[清洗/关联/质检通过<br/>脱敏/代理键]
    E2RS -->|产物| RS_OUT[Redshift 维度/事实表<br/>分析就绪]
classDef bpProcess  fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData     fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616

class L2R,R2E bpData
class E2RS,RS_OUT bpProcess
class R_OUT,E_OUT bpData
linkStyle default stroke:#697077,stroke-width:2px

图 17-1 三层开发的职责与产物

层间转换 职责 不做什么
Landing → Raw 格式标准化、字段名/类型统一、编码处理 不做业务清洗、不做关联
Raw → Enriched 清洗、关联、质检、脱敏、代理键 不改变数据粒度(除非配置要求)
Enriched → Redshift COPY 入仓、建表/改表 DDL 不做数据加工

表 17-1 三层开发的职责与产物

Trade-off

三层分离的好处是"每层职责单一、可独立重跑"。代价是数据被写三次(Landing→Raw→Enriched),存储和计算成本更高。另一种方案是"两层"( 合并 Landing 和 Raw),代价是"原始数据和标准化数据混在一起,重跑困难"。对于医药合规场景,三层的可追溯性优势值得这个成本。

把三层职责用 PySpark 实现,就是三个转换阶段——每层只管自己的事,层间靠 Parquet 解耦,哪层挂了就重跑哪层:

# 示意:Landing→Raw→Enriched 三层 PySpark pipeline
def run_pipeline(spark, config):
    # ① Landing → Raw:只做格式标准化,不做业务清洗
    raw = (spark.read.parquet(f"s3://ap-aurora-cdp-landing/{config['domain']}/{config['table']}/")
               .withColumnRenamed("col_a", "hospital_id")      # 字段名统一
               .withColumn("biz_date", F.to_date("raw_ts")))   # 类型标准化
    (raw.write.mode("overwrite").partitionBy("biz_date")
         .parquet(f"s3://ap-aurora-cdp-raw/{config['domain']}/{config['table']}/"))

    # ② Raw → Enriched:清洗、关联、质检、脱敏、代理键
    enriched = (spark.read.parquet(f".../raw/{config['table']}/")
                    .dropDuplicates(["hospital_id", "biz_date"])      # 清洗去重
                    .withColumn("sk", surrogate_key_udf("hospital_id"))  # 代理键(见下)
                    .transform(lambda df: apply_masking(df, config))     # 脱敏(见 Ch 18)))
    run_quality_checks(enriched, config)                              # 质检门禁(PyDeequ)
    (enriched.write.mode("overwrite").partitionBy("biz_date")
              .parquet(f"s3://ap-aurora-cdp-enriched/{config['domain']}/{config['table']}/"))

    # ③ Enriched → Redshift:COPY 入仓,不做加工(见 Ch 8)
    redshift_copy(enriched, config)

17.2 数据质量校验框架:约束声明与阈值治理

质量校验架构

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart TB
    subgraph 质量校验框架["数据质量校验框架"]
        DECLARE[声明式约束<br/>在配置中声明质量规则]
        CHECK[校验引擎<br/>基于 PyDeequ 执行约束]
        THRESHOLD[阈值治理<br/>通过率低于阈值则告警/阻断]
        ACTION[处置动作<br/>告警 / 隔离 / 阻断入仓]
    end

    DECLARE --> CHECK --> THRESHOLD --> ACTION
classDef bpProcess  fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
class ACTION,CHECK,DECLARE,THRESHOLD bpProcess
linkStyle default stroke:#697077,stroke-width:2px

图 17-2 质量校验架构

声明式约束落到代码,就是用 PyDeequ 的 VerificationSuite 把上面表格里的约束逐行声明出来,引擎自己生成校验逻辑去跑——比手写 SQL 校验好维护:

# 示意:PyDeequ 声明式质量校验(约束即声明)
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite

def run_quality_checks(df, config):
    check = Check(spark, CheckLevel.Error, "enriched_quality_gate")   # 核心意图:约束即声明
    (check.isComplete("prescription_id")                              # 完整性
          .isUnique("prescription_id")                                # 唯一性
          .isInRange("quantity", (1, 10000))                          # 范围性
          .satisfies("end_date >= start_date", "date_consistency"))   # 一致性
    result = VerificationSuite(spark).onData(df).addCheck(check).run()
    pass_rate = result.checkMetrics.filter("constraint_status='Success'").count() / result.checkMetrics.count()
    if pass_rate < config["block_threshold"]:                         # 阈值治理:低于阈值阻断入仓
        raise QualityGateError(f"质检通过率 {pass_rate:.2%} 低于阻断阈值,隔离数据")

质量约束类型

约束类型 说明 举例
完整性 字段非空 prescription_id IS NOT NULL
唯一性 主键不重复 COUNT(prescription_id) = COUNT(DISTINCT prescription_id)
范围性 值在合理范围内 quantity BETWEEN 1 AND 10000
一致性 跨字段逻辑一致 end_date >= start_date
引用性 外键引用有效 hospital_id EXISTS IN dim_hospital

表 17-2 质量约束类型

阈值治理

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart TD
    CHECK[执行质量校验] --> RESULT{通过率}
    RESULT -->|≥ 99%| PASS[通过,继续入仓]
    RESULT -->|95% - 99%| WARN[告警,继续入仓,记录异常]
    RESULT -->|< 95%| BLOCK[阻断入仓,隔离数据]

    classDef bpProcess  fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
    classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
    classDef bpSuccess  fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
    classDef bpError    fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616
    classDef bpInfo     fill:#f6f2ff,stroke:#8a3ffc,stroke-width:2px,color:#161616

    class CHECK bpProcess
    class RESULT bpDecision
    class PASS bpSuccess
    class WARN bpInfo
    class BLOCK bpError
    linkStyle default stroke:#697077,stroke-width:2px

图 17-3 阈值治理

引申

质量校验框架基于 Amazon PyDeequ——一个构建在 Spark 上的数据质量库。它的核心理念是"约束即声明"——你声明期望的约束,引擎自动生成校验逻辑并执行。这比手写 SQL 校验更可维护、更可复用。如果今天重新选型,Great Expectations 也是优秀的开源替代。


17.3 代理键生成与行数对账

代理键生成

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart LR
    subgraph 代理键生成策略["代理键生成策略"]
        NAT[自然键<br/>业务系统原有 ID] --> HASH[哈希函数<br/>SHA-256]
        HASH --> SUR[代理键<br/>统一格式 Surrogate Key]
        CONCAT[复合自然键<br/>多字段拼接] --> HASH
    end
classDef bpProcess  fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
class CONCAT,HASH,NAT,SUR bpProcess
linkStyle default stroke:#697077,stroke-width:2px

图 17-4 代理键生成

策略 机制 优势 劣势
哈希代理键 SHA256(natural_key) 确定性、跨源可关联 哈希碰撞(概率极低)
自增序列 数据库自增 简单 跨系统不可关联
UUID 随机生成 无碰撞 不可追溯、索引差

表 17-3 代理键生成

平台用的方案是哈希代理键——对自然键(可能是复合键)做 SHA-256 哈希,出来的代理键是确定性的。同一实体在不同源系统里,只要自然键一样,代理键就一样,跨源关联直接能用。

落到代码就是个 UDF:把一个或多个自然键字段拼起来做 SHA-256,复合键拿分隔符拼接后收敛成一个哈希:

# 示意:哈希代理键 UDF(支持复合自然键)
from pyspark.sql.functions import udf, sha2, concat_ws
import pyspark.sql.functions as F

# 核心意图:复合自然键拼接后哈希,保证跨源同键同代理键
def make_surrogate_key(columns):
    return sha2(concat_ws("||", *columns), 256)     # 分隔符避免 "ab"+"c" 与 "a"+"bc" 撞键

enriched = enriched.withColumn("sk", make_surrogate_key(["hospital_id", "source_system"]))

行数对账

%%{init: {'theme':'base','themeVariables':{'primaryColor':'#edf5ff','primaryTextColor':'#161616','primaryBorderColor':'#0f62fe','lineColor':'#697077','secondaryColor':'#d9fbfb','tertiaryColor':'#f2f4f8','fontSize':'14px'}}}%%
flowchart TB
    SRC_COUNT@{ icon: "devicon:microsoftsqlserver", form: "rounded", label: "源系统行数", pos: "b", h: 36 } --> COMPARE{对账}
    LANDING_COUNT@{ icon: "logos:aws-s3", form: "rounded", label: "Landing 行数", pos: "b", h: 36 } --> COMPARE
    RAW_COUNT@{ icon: "logos:aws-s3", form: "rounded", label: "Raw 行数", pos: "b", h: 36 } --> COMPARE
    ENRICHED_COUNT@{ icon: "logos:aws-s3", form: "rounded", label: "Enriched 行数", pos: "b", h: 36 } --> COMPARE
    RS_COUNT@{ icon: "logos:aws-redshift", form: "rounded", label: "Redshift 行数", pos: "b", h: 36 } --> COMPARE

    COMPARE -->|一致| OK[对账通过]
    COMPARE -->|不一致| ALERT[告警<br/>定位丢失环节]
classDef bpProcess  fill:#edf5ff,stroke:#0f62fe,stroke-width:2px,color:#161616
classDef bpData     fill:#d9fbfb,stroke:#007d79,stroke-width:2px,color:#161616
classDef bpDecision fill:#fcf4d6,stroke:#f1c21b,stroke-width:2px,color:#161616
classDef bpSuccess  fill:#defbe6,stroke:#198038,stroke-width:2px,color:#161616
classDef bpError    fill:#fff1f1,stroke:#da1e28,stroke-width:2px,color:#161616

class SRC_COUNT,LANDING_COUNT,RAW_COUNT,ENRICHED_COUNT,RS_COUNT bpData
class COMPARE bpDecision
class OK bpSuccess
class ALERT bpError
linkStyle default stroke:#697077,stroke-width:2px

图 17-5 行数对账

行数对账是最简单但最有效的质量保障手段——每层记录行数,层间互相比较。Landing 有 1000 行但 Raw 只有 998 行?标准化过程丢了 2 行,得查。

代码上就是每层写入后抓一下行数,写进对账表,层间互相比较:

# 示意:三层行数对账
def reconcile(spark, config, counts: dict):
    # 核心意图:层间行数比对,丢失即告警
    ddb.put_item(Item={"table": config["table"], "batch_id": config["batch_id"], **counts})
    if counts["landing"] != counts["raw"]:
        alert(f"{config['table']} Landing→Raw 丢失 {counts['landing']-counts['raw']} 行")
    if counts["raw"] != counts["enriched"] and not config.get("dedup_enabled"):
        alert(f"{config['table']} Raw→Enriched 行数变化,检查是否预期去重")

# pipeline 各层末尾捕获行数
counts = {"landing": df_landing.count(), "raw": df_raw.count(), "enriched": df_enriched.count()}
reconcile(spark, config, counts)

Trade-off

行数对账不完美——它检测不到"行数不变但内容错误"的问题(如某行被错误覆盖)。但对于"数据丢失"这类最常见、最致命的问题,行数对账是性价比最高的检测手段。配合质量校验框架,两者互补。

Schema 演进处理

数据源是会变的——业务系统加字段、改类型、删列,这都是常事。三层 pipeline 如果对 schema 变化没预案,要么"上游加了列,Enriched 层静默丢掉",要么"类型从 INT 变 VARCHAR,质检门禁全红"。应对 schema 演进有两套方案,各有利弊:

方案 机制 优势 劣势
A 防御式(Crawler diff) Glue Crawler 检测 DDL 变更 → 生成 diff → Lambda 通知 → 人工确认 → 更新 target schema 可控、变更可审查 人工介入,延迟 schema 上线
B 自适应(mergeSchema) 启用 mergeSchema=true 自动合并新增列,旧分区 NULL 填充,监控告警删除列/类型变更 自动化、无延迟 静默合并可能掩盖非预期变更

表 17-4 Schema 演进处理

# 示意:方案 B 自适应 schema 合并
(spark.read.option("mergeSchema", True)                 # 核心意图:新增列自动合并
      .parquet(f"s3://ap-aurora-cdp-raw/{table}/")
      .write.mode("overwrite")
      .option("mergeSchema", True)
      .parquet(f"s3://ap-aurora-cdp-enriched/{table}/"))
# 删除列 / 类型变更仍需告警,mergeSchema 不处理这两类破坏性变更

Trade-off

平台默认走方案 A(防御式)——医药合规要求"任何 schema 变更可审查",静默合并的风险太大。方案 B 只用在非合规敏感域里快速迭代的场景。不管选哪个方案,删除列和类型变更这两类破坏性变更都必须告警——它们会直接让下游 Redshift COPY 失败或质检门禁全红。


本章小结

  • 三层 ETL:Landing→Raw(标准化)/ Raw→Enriched(加工)/ Enriched→Redshift(入仓),每层职责单一、可独立重跑,三层 PySpark pipeline 界限分明
  • 数据质量校验基于声明式约束(PyDeequ VerificationSuite)+ 阈值治理:完整性/唯一性/范围性/一致性/引用性约束,按通过率决定通过/告警/阻断
  • 代理键采用哈希策略(SHA-256 of 复合自然键),支持跨源关联;行数对账是最简单有效的数据丢失检测手段
  • Schema 演进两套方案:防御式 Crawler diff(医药合规默认,可审查)vs 自适应 mergeSchema(快速迭代),删除列/类型变更必须告警

下一章

Ch 18 数据脱敏与隐私治理 —— 数据加工好了,但敏感数据怎么保护?接下来看脱敏策略与 RLS/CLS 协同防护。

评论