数据架构演进
数据架构演进历程:
┌─────────────────────────────────────────┐
│ 第一代:数据仓库(Data Warehouse) │
│ - 结构化数据 │
│ - Schema-on-Write │
│ - 批处理ETL │
│ - 代表:Oracle、Teradata、Redshift │
│ │
│ 第二代:数据湖(Data Lake) │
│ - 非结构化/半结构化数据 │
│ - Schema-on-Read │
│ - 原始数据存储 │
│ - 代表:HDFS、S3、Azure Data Lake │
│ │
│ 第三代:湖仓一体(Lakehouse) │
│ - 结合仓库和湖的优势 │
│ - ACID事务支持 │
│ - 实时+离线统一 │
│ - 代表:Delta Lake、Iceberg、Hudi │
│ │
│ 第四代:实时数仓(Real-time Warehouse) │
│ - 流批一体 │
│ - 秒级延迟 │
│ - 代表:ClickHouse、StarRocks、Doris │
└─────────────────────────────────────────┘
维度建模
星型模型
-- Fact table: order facts, keyed by order_id.
-- NOTE(review): the primary key is order_id while the row also carries a single
-- product_key/quantity, so the grain is one product per order -- confirm that
-- multi-line orders are not expected here.
CREATE TABLE fact_orders (
    order_id        BIGINT,
    -- Foreign keys into the dimension tables
    order_date_key  INT,            -- date dimension
    user_key        BIGINT,         -- user dimension
    product_key     BIGINT,         -- product dimension
    store_key       INT,            -- store dimension
    -- Measures
    quantity        INT,
    unit_price      DECIMAL(10,2),
    discount_amount DECIMAL(10,2),
    total_amount    DECIMAL(10,2),
    -- Descriptive attributes stored directly on the fact row
    -- (the original labels these "degenerate dimensions")
    order_status    VARCHAR(20),
    payment_method  VARCHAR(20),
    PRIMARY KEY (order_id)
);
-- Date dimension, keyed by the integer date_key that fact_orders.order_date_key joins to.
-- NOTE(review): date_key is presumably a yyyyMMdd integer -- confirm against the loader.
CREATE TABLE dim_date (
    date_key    INT,
    date        DATE,
    year        INT,
    quarter     INT,
    month       INT,
    week        INT,
    day_of_week INT,
    is_holiday  BOOLEAN,
    PRIMARY KEY (date_key)
);
-- User dimension (slowly changing dimension).
-- user_key is the surrogate primary key; user_id is the source-system identifier
-- and is not declared unique, so several versions of one user can coexist.
CREATE TABLE dim_user (
    user_key       BIGINT,
    user_id        BIGINT,
    username       VARCHAR(50),
    email          VARCHAR(100),
    register_date  DATE,
    vip_level      INT,
    -- SCD Type 2 bookkeeping columns
    effective_date DATE,
    expiry_date    DATE,
    is_current     BOOLEAN,
    PRIMARY KEY (user_key)
);
-- Typical query: monthly sales and distinct buyers for 2026.
--
-- unique_users counts the natural key dim_user.user_id rather than the
-- surrogate fact_orders.user_key: dim_user is an SCD Type 2 dimension, so one
-- real user owns a new user_key for every attribute version and would be
-- counted several times otherwise.
-- The join to dim_user is a LEFT JOIN on its primary key, so it neither drops
-- nor duplicates fact rows and total_sales is unaffected; orders whose user_key
-- has no dimension row simply do not contribute to unique_users.
SELECT
    d.year,
    d.month,
    SUM(f.total_amount)       AS total_sales,
    COUNT(DISTINCT u.user_id) AS unique_users
FROM fact_orders AS f
INNER JOIN dim_date AS d
    ON f.order_date_key = d.date_key
LEFT JOIN dim_user AS u
    ON f.user_key = u.user_key
WHERE d.year = 2026
GROUP BY d.year, d.month
ORDER BY d.year, d.month;
雪花模型
-- Product dimension (normalized / snowflaked): category and brand attributes
-- live in their own tables and are referenced by key.
CREATE TABLE dim_product (
    product_key  BIGINT,
    product_id   BIGINT,
    product_name VARCHAR(200),
    category_key INT,             -- references the category dimension
    brand_key    INT,             -- references the brand dimension
    price        DECIMAL(10,2),
    PRIMARY KEY (product_key)
);
-- Category dimension; parent_key and level describe a category hierarchy.
CREATE TABLE dim_category (
    category_key  INT,
    category_name VARCHAR(100),
    parent_key    INT,            -- hierarchy link
    level         INT,
    PRIMARY KEY (category_key)
);
-- Brand dimension referenced by dim_product.brand_key.
CREATE TABLE dim_brand (
    brand_key  INT,
    brand_name VARCHAR(100),
    country    VARCHAR(50),
    PRIMARY KEY (brand_key)
);
-- Query: total sales per brand and category.
-- Walks the snowflake: fact -> product -> (brand, category).
SELECT
    b.brand_name,
    c.category_name,
    SUM(f.total_amount) AS sales
FROM fact_orders AS f
INNER JOIN dim_product AS p
    ON f.product_key = p.product_key
INNER JOIN dim_brand AS b
    ON p.brand_key = b.brand_key
INNER JOIN dim_category AS c
    ON p.category_key = c.category_key
GROUP BY
    b.brand_name,
    c.category_name;
ETL流程设计
批处理ETL
# etl/daily_etl.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = (
    SparkSession.builder
    .appName("DailyETL")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

# 1. Extract
# Pull yesterday's orders with a half-open range on created_at.
# The previous predicate `DATE(created_at) = CURRENT_DATE - 1` is integer
# arithmetic in MySQL (e.g. 20260401 - 1 = 20260400, not a valid date), so it
# matched nothing on the first day of a month, and wrapping the column in
# DATE() prevented index use. `CURRENT_DATE - INTERVAL 1 DAY` is real date math.
# NOTE(review): credentials are hard-coded below; move them to a secret store.
orders_df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://source-db:3306/ecommerce")
    .option(
        "dbtable",
        "(SELECT * FROM orders"
        " WHERE created_at >= CURRENT_DATE - INTERVAL 1 DAY"
        " AND created_at < CURRENT_DATE) as orders",
    )
    .option("user", "etl_user")
    .option("password", "password")
    .load()
)

# Full snapshot of the source users table.
users_df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://source-db:3306/ecommerce")
    .option("dbtable", "users")
    .option("user", "etl_user")
    .option("password", "password")
    .load()
)

# 2. Transform
# Clean: keep only paid/shipped/completed orders that have a user and an amount.
orders_clean = (
    orders_df
    .filter(col("status").isin("PAID", "SHIPPED", "COMPLETED"))
    .dropna(subset=["user_id", "total_amount"])
)

# Enrich with user attributes; a left join keeps orders whose user is missing.
orders_with_user = orders_clean.join(
    users_df.select("user_id", "vip_level", "register_date"),
    on="user_id",
    how="left",
)

# Derived columns.
# discount_rate is left NULL when total_amount is 0 instead of dividing by
# zero (which raises under ANSI mode and is NULL otherwise).
orders_transformed = (
    orders_with_user
    .withColumn("order_date_key", date_format(col("created_at"), "yyyyMMdd").cast("int"))
    .withColumn("is_new_user", datediff(col("created_at"), col("register_date")) <= 7)
    .withColumn(
        "discount_rate",
        when(col("total_amount") != 0, col("discount_amount") / col("total_amount")),
    )
)

# 3. Load
# Append to the partitioned fact dataset.
# NOTE(review): append mode is not idempotent -- re-running the job for the same
# day writes the day's rows twice. Consider dynamic partition overwrite.
(
    orders_transformed.write
    .format("parquet")
    .mode("append")
    .partitionBy("order_date_key")
    .save("/data/warehouse/fact_orders")
)
# Maintain a dimension table as SCD Type 2
def update_slowly_changing_dimension(new_data, table_name, key_column):
    """Apply SCD Type 2 versioning to the parquet dimension `table_name`.

    For every key whose tracked attributes (`vip_level`, `email`) differ between
    `new_data` and the current dimension row, the current row is closed
    (`is_current = False`, `expiry_date = today`) and a new current row is added
    with a fresh `user_key`. Nothing is written when no key changed.

    Args:
        new_data: DataFrame with the latest source rows; must contain
            `key_column`, `vip_level` and `email`.
        table_name: directory name under /data/warehouse holding the dimension.
        key_column: natural (business) key column shared by both frames.

    Fixes relative to the previous version:
      * only the *current* row of a changed key is closed; historical rows keep
        their original expiry_date;
      * the new rows are built from the incoming columns only and aligned to the
        dimension schema (the joined frame carried duplicate columns and could
        not be unioned);
      * change detection is NULL-safe, so NULL -> value transitions count;
      * surrogate keys continue from max(user_key) instead of
        monotonically_increasing_id(), which can collide with existing keys;
      * changed keys are matched with a join rather than collected to the driver;
      * the result is staged before replacing the table, because Spark cannot
        overwrite a path that the same plan is still reading.

    NOTE(review): keys present in `new_data` but absent from the dimension are
    still ignored (as before); brand-new members need a separate insert.
    """
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    table_path = f"/data/warehouse/{table_name}"
    staging_path = f"{table_path}__staging"

    existing = spark.read.parquet(table_path)
    incoming = new_data.alias("incoming")
    active = existing.filter(F.col("is_current")).alias("active")

    # NULL-safe inequality on the tracked attributes.
    attribute_changed = (
        ~F.col("incoming.vip_level").eqNullSafe(F.col("active.vip_level"))
        | ~F.col("incoming.email").eqNullSafe(F.col("active.email"))
    )
    changed = (
        incoming.join(
            active,
            F.col(f"incoming.{key_column}") == F.col(f"active.{key_column}"),
            "inner",
        )
        .filter(attribute_changed)
        .select("incoming.*")  # keep only the incoming side's columns
        .cache()
    )

    try:
        if changed.limit(1).count() == 0:
            return

        changed_keys = (
            changed.select(key_column)
            .distinct()
            .withColumn("_changed", F.lit(True))
        )

        # Close only the current version of each changed key.
        closing = F.col("_changed").isNotNull() & F.col("is_current")
        versioned = (
            existing.join(changed_keys, on=key_column, how="left")
            .withColumn(
                "expiry_date",
                F.when(closing, F.current_date()).otherwise(F.col("expiry_date")),
            )
            .withColumn(
                "is_current",
                F.when(closing, F.lit(False)).otherwise(F.col("is_current")),
            )
            .drop("_changed")
            .select(*existing.columns)
        )

        # New current versions, with surrogate keys continuing after the max.
        max_key = existing.agg(F.max("user_key")).first()[0] or 0
        new_versions = (
            changed
            .withColumn(
                "user_key",
                F.lit(max_key) + F.row_number().over(Window.orderBy(key_column)),
            )
            .withColumn("effective_date", F.current_date())
            .withColumn("expiry_date", F.lit(None).cast("date"))
            .withColumn("is_current", F.lit(True))
        )

        # Align to the dimension schema: drop extra source columns, NULL-fill
        # missing ones, and cast to the stored types so the union is valid.
        available = set(new_versions.columns)
        aligned = new_versions.select(*[
            (
                F.col(field.name) if field.name in available else F.lit(None)
            ).cast(field.dataType).alias(field.name)
            for field in existing.schema.fields
        ])

        final = versioned.unionByName(aligned)

        # Stage first, then replace the table from the staged copy.
        final.write.mode("overwrite").parquet(staging_path)
        spark.read.parquet(staging_path) \
            .write \
            .mode("overwrite") \
            .parquet(table_path)
    finally:
        changed.unpersist()
# Run SCD Type 2 maintenance on dim_user from the users extract, keyed by user_id.
update_slowly_changing_dimension(users_df, "dim_user", "user_id")
# Shut down the Spark session once the batch steps have run.
spark.stop()
实时ETL
# etl/realtime_etl.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = (
    SparkSession.builder
    .appName("RealtimeETL")
    .config("spark.sql.streaming.checkpointLocation", "/checkpoint/realtime")
    .getOrCreate()
)

# Schema of the JSON payload on the order-events topic, as a DDL string.
# It was previously referenced but never defined. Only the fields this job
# reads are guaranteed to be needed (store_id, total_amount, created_at).
# NOTE(review): field names/types are assumed -- confirm against the producer.
order_schema = (
    "order_id BIGINT, store_id INT, "
    "total_amount DECIMAL(10,2), created_at TIMESTAMP"
)

# Read order events from Kafka.
orders_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092")
    .option("subscribe", "order-events")
    .option("startingOffsets", "latest")
    .load()
)

# Parse the JSON value into typed columns.
orders_parsed = orders_stream.select(
    from_json(col("value").cast("string"), order_schema).alias("data")
).select("data.*")

# Windowed aggregation: 5-minute tumbling windows per store,
# tolerating events up to 10 minutes late.
windowed_agg = (
    orders_parsed
    .withWatermark("created_at", "10 minutes")
    .groupBy(
        window(col("created_at"), "5 minutes"),
        col("store_id"),
    )
    .agg(
        count("*").alias("order_count"),
        sum("total_amount").alias("total_sales"),
        avg("total_amount").alias("avg_order_value"),
    )
)

# The Kafka sink only writes a `value` column (plus optional `key`), so the
# aggregate row is serialized to JSON; store_id is used as the message key.
kafka_payload = windowed_agg.select(
    col("store_id").cast("string").alias("key"),
    to_json(struct(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("store_id"),
        col("order_count"),
        col("total_sales"),
        col("avg_order_value"),
    )).alias("value"),
)

# Publish to Kafka for downstream consumers.
query = (
    kafka_payload.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092")
    .option("topic", "order-aggregations")
    .outputMode("update")
    .start()
)
query.awaitTermination()
湖仓一体架构
Apache Iceberg
// iceberg/TableOperations.java
import java.util.Date;
import java.util.Map;

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
public class IcebergTableOperations {
private final SparkSession spark;
private final SparkCatalog catalog;
public IcebergTableOperations(SparkSession spark) {
this.spark = spark;
this.catalog = (SparkCatalog) spark.sessionState().catalogManager().catalog("iceberg_catalog");
}
// 创建Iceberg表
public void createTable() {
spark.sql("""
CREATE TABLE iceberg_catalog.db.orders (
order_id BIGINT,
user_id BIGINT,
order_date DATE,
total_amount DECIMAL(10,2),
status STRING
)
USING iceberg
PARTITIONED BY (days(order_date))
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'zstd'
)
""");
}
// 写入数据(支持ACID事务)
public void appendData(Dataset<Row> df) {
df.writeTo("iceberg_catalog.db.orders")
.tableProperty("write.wap.enabled", "true") // Write-Audit-Publish
.append();
}
// 更新数据(Merge-on-Read)
public void updateStatus(long orderId, String newStatus) {
spark.sql(f"""
UPDATE iceberg_catalog.db.orders
SET status = '{newStatus}', updated_at = current_timestamp()
WHERE order_id = {orderId}
""");
}
// 时间旅行查询
public Dataset<Row> queryAtTimestamp(long timestamp) {
return spark.sql(f"""
SELECT * FROM iceberg_catalog.db.orders
FOR TIMESTAMP AS OF {timestamp}
""");
}
// 快照管理
public void manageSnapshots() {
Table table = catalog.loadTable(TableIdentifier.of("db", "orders"));
// 列出快照
for (Snapshot snapshot : table.snapshots()) {
System.out.printf("Snapshot ID: %d, Timestamp: %s%n",
snapshot.snapshotId(),
new Date(snapshot.timestampMillis()));
}
// 回滚到指定快照
table.manageSnapshots()
.rollbackTo(snapshotId)
.commit();
// 清理旧快照
table.expireSnapshots()
.expireOlderThan(System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000)
.commit();
}
}
ClickHouse实时数仓
表引擎选择
-- MergeTree: the base engine.
-- Partitioned by month of event_date; rows are sorted by
-- (event_type, user_id, event_time).
CREATE TABLE events_merge_tree
(
    event_date Date,
    event_time DateTime,
    user_id    UInt64,
    event_type String,
    properties String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_type, user_id, event_time);
-- ReplacingMergeTree: deduplication.
-- Rows sharing the same sorting key (user_id) are collapsed during merges,
-- keeping the row with the highest `version`.
CREATE TABLE users_replacing
(
    user_id    UInt64,
    username   String,
    email      String,
    updated_at DateTime,
    version    UInt32
)
ENGINE = ReplacingMergeTree(version)
ORDER BY user_id;
-- AggregatingMergeTree: pre-aggregation.
-- Stores partial aggregate states per (date, store_id); read them back with
-- the matching -Merge combinators (sumMerge / countMerge).
CREATE TABLE daily_sales_agg
(
    date        Date,
    store_id    UInt32,
    sales       AggregateFunction(sum, Decimal(10,2)),
    order_count AggregateFunction(count)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(date)
ORDER BY (date, store_id);
-- Materialized view: automatic aggregation.
-- Aggregates rows selected from `orders` into daily_sales_agg as partial
-- states, one per (day, store).
CREATE MATERIALIZED VIEW daily_sales_mv
TO daily_sales_agg
AS
SELECT
    toDate(created_at)     AS date,
    store_id,
    sumState(total_amount) AS sales,
    countState()           AS order_count
FROM orders
GROUP BY
    date,
    store_id;
查询优化
-- PREWHERE optimization: users with more than 10 purchase events since 2026-01-01.
SELECT
    user_id,
    count() AS event_count
FROM events
PREWHERE event_date >= '2026-01-01'   -- filter first to reduce the data read
WHERE event_type = 'purchase'
GROUP BY user_id
HAVING event_count > 10;
-- Sampling for a fast approximate answer: estimated distinct users on 2026-04-08.
-- NOTE(review): SAMPLE only works on tables declared with a SAMPLE BY clause;
-- confirm that `events` has one.
SELECT
    uniq(user_id) * 10 AS estimated_users   -- scale up by the inverse of the sample rate
FROM events
SAMPLE 0.1                                   -- scan only 10% of the data
WHERE event_date = '2026-04-08';
-- Dictionary for fast dimension lookups by user_id.
--
-- dim_user is a slowly changing (Type 2) dimension, so one user_id can have
-- several rows. A dictionary needs a unique key; without a filter the loaded
-- version per user is arbitrary. The WHERE clause restricts the source to the
-- current version of each user.
-- NOTE(review): assumes warehouse.dim_user exposes the is_current flag of the
-- SCD Type 2 design -- confirm the column exists in the ClickHouse table.
CREATE DICTIONARY dim_user_dict
(
    user_id   UInt64,
    username  String,
    vip_level UInt8
)
PRIMARY KEY user_id
SOURCE(CLICKHOUSE(
    HOST 'localhost' PORT 9000 DB 'warehouse' TABLE 'dim_user'
    WHERE 'is_current = 1'
))
LAYOUT(HASHED())
LIFETIME(MIN 60 MAX 120);   -- refresh at a random interval between 60 and 120 seconds
-- Dictionary lookup in place of a join: events per user, labelled with the
-- username fetched from dim_user_dict.
SELECT
    dictGet('dim_user_dict', 'username', user_id) AS username,
    count()                                       AS event_count
FROM events
GROUP BY user_id;
总结
数据架构的选择应基于业务需求:
- 数据仓库:适合结构化数据的复杂分析
- 数据湖:适合海量原始数据的灵活探索
- 湖仓一体:兼顾两者优势,支持ACID事务
- 实时数仓:满足秒级延迟的分析需求
关键原则:
- 选择合适的建模方法(星型/雪花)
- 设计健壮的ETL流程
- 利用分区和索引优化查询
- 实现数据质量监控
- 考虑数据治理和血缘追踪
延伸阅读
- The Data Warehouse Toolkit - Ralph Kimball
- Apache Iceberg文档
- ClickHouse官方文档
- Delta Lake vs Iceberg vs Hudi
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。