Fork me on GitHub

大数据增量同步实现方案

目前做的项目使用阿里 DataX 作为不同数据源数据同步的实现工具。数据的批量一次性导入比较简单,对于增量数据需要对不同场景设计不同的方案。

会变的数据增量同步

每天全量同步

如人员表、订单表一类的会发生变化的数据,根据数据仓库的4个特点里的反映历史变化的这个特点的要求,我们建议每天对数据进行全量同步。也就是说每天保存的都是数据的全量数据,这样历史的数据和当前的数据都可以很方便地获得。

设定日分区,每天同步全量数据。

1
2
3
4
5
6
7
8
--全量同步
create table ods_user_full(
uid bigint,
uname string,
deptno bigint,
gender string,
optime DATETIME
) partitioned by (ds string);

查询全量用 where 分区 语句where ds = "2017-10-19"

每天增量同步

真实场景中因为某些特殊情况,需要每天只做增量同步。又因为目前流行的大数据平台都不支持 Update 语句进行修改数据,只能用其他方法来实现。

两个表,结果表和增量表,用 full outer join 合并 + insert overwrite(阿里巴巴大数据实践中阿里平台使用方案)

操作如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
--结果表
create table dw_user_inc(
uid bigint,
uname string,
deptno bigint,
gender string,
optime DATETIME
);
--增量记录表
create table ods_user_inc(
uid bigint,
uname string,
deptno bigint,
gender string,
optime DATETIME
)

1
2
3
4
5
6
7
8
9
10
11
12
insert overwrite table dw_user_inc
select
--所有select操作,如果ODS表有数据,说明发生了变动,以ODS表为准
case when b.uid is not null then b.uid else a.uid end as uid,
case when b.uid is not null then b.uname else a.uname end as uname,
case when b.uid is not null then b.deptno else a.deptno end as deptno,
case when b.uid is not null then b.gender else a.gender end as gender,
case when b.uid is not null then b.optime else a.optime end as optime
from
dw_user_inc a
full outer join ods_user_inc b
on a.uid = b.uid ;

对比以上两种同步方式,可以很清楚看到两种同步方法的区别和优劣。第二种方法的优点是同步的增量数据量比较小,但是带来的缺点有可能有数据不一致的风险,而且还需要用额外的计算进行数据合并。如无必要,会变化的数据就使用方法一即可。如无必要,会变化的数据就使用方法一即可。如果对历史数据希望只保留一定的时间,超出时间的做自动删除,可以设置Lifecycle。

不变的数据增量同步

这个场景,由于数据生成后就不会发生变化,因此可以很方便地根据数据的生成规律进行分区,较常见的是根据日期进行分区,比如每天一个分区。
做法是按日期字段 where 过滤所需日期,增量 insert。

参考

  1. 阿里大数据平台文档
  2. 大数据之路-阿里巴巴大数据实践
------------- The endThanks for reading-------------