2018-09-28: 示例job已上傳至github,地址見文末
0. 前言本文介紹了使用Kettle 對一張業(yè)務(wù)表數(shù)據(jù)(500萬條數(shù)據(jù)以上)進(jìn)行實(shí)時(shí)(10秒)同步,采用了時(shí)間戳增量回滾同步的方法。關(guān)于ETL和Kettle的入門知識大家可以閱讀相關(guān)的blog和文檔學(xué)習(xí)。 1. 時(shí)間戳增量回滾同步假定在源數(shù)據(jù)表中有一個(gè)字段會記錄數(shù)據(jù)的新增或修改時(shí)間,可以通過它對數(shù)據(jù)在時(shí)間維度上進(jìn)行排序。通過中間表記錄每次更新的時(shí)間戳,在下一個(gè)同步周期時(shí),通過這個(gè)時(shí)間戳同步該時(shí)間戳以后的增量數(shù)據(jù)。這是時(shí)間戳增量同步。 但是時(shí)間戳增量同步不能對源數(shù)據(jù)庫中歷史數(shù)據(jù)的刪除操作進(jìn)行同步,我們可以通過在每次同步時(shí),把時(shí)間戳往前回滾一段時(shí)間,從而同步一定時(shí)間段內(nèi)的刪除操作。這就是時(shí)間戳增量回滾同步,這個(gè)名字是我自己給取得,意會即可,就是在時(shí)間戳增量同步的同時(shí)回滾一定的時(shí)間段。 說明: 2. 前期準(zhǔn)備在兩個(gè)數(shù)據(jù)庫中分別創(chuàng)建數(shù)據(jù)表,并通過腳本在源數(shù)據(jù)表中插入500萬條數(shù)據(jù),完成后再以每秒一條的速度插入新數(shù)據(jù),模擬生產(chǎn)環(huán)境。 源數(shù)據(jù)表結(jié)構(gòu)如下: CREATE TABLE `im_message` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`sender` varchar(45) COLLATE utf8_bin NOT NULL COMMENT '消息發(fā)送者:SYSTEM',
`send_time` datetime(6) NOT NULL,
`receiver` varchar(45) COLLATE utf8_bin NOT NULL COMMENT '消息接受者',
`content` varchar(255) COLLATE utf8_bin NOT NULL COMMENT '消息內(nèi)容',
`is_read` tinyint(4) NOT NULL COMMENT '消息是否被讀取:0-未讀;非0-已讀',
`read_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='消息表'
3. 作業(yè)流程開始組件 建時(shí)間戳中間表 獲取中間表的時(shí)間戳,并設(shè)置為全局變量 刪除目標(biāo)表中時(shí)間戳及時(shí)間戳以后的數(shù)據(jù) 抽取兩個(gè)數(shù)據(jù)表的時(shí)間戳及時(shí)間戳以后的數(shù)據(jù)進(jìn)行比對,并根據(jù)比對結(jié)果進(jìn)行刪除、新增或修改操作 更新時(shí)間戳
4. 創(chuàng)建作業(yè)作業(yè)的最終截圖如下:
4.1 創(chuàng)建作業(yè)和DB連接打開Spoon工具,新建作業(yè),然后在左側(cè)主對象樹DB連接中新建DB連接。創(chuàng)建連接并測試通過后可以在左側(cè)DB連接下右鍵共享出來。因?yàn)樵趩蝹€(gè)作業(yè)或者轉(zhuǎn)換中新建的DB連接都是局域數(shù)據(jù)源,在其他轉(zhuǎn)換和作業(yè)中是不能使用的,即使屬于同一個(gè)作業(yè)下的不同轉(zhuǎn)換,所以需要把他們共享,這樣DB連接就會成為全局?jǐn)?shù)據(jù)源,不用多次編輯。 4.2 建時(shí)間戳中間表這一步是為了在目標(biāo)數(shù)據(jù)庫建中間表etl_temp ,并插入初始的時(shí)間戳字段。因?yàn)樵撟鳂I(yè)在生產(chǎn)環(huán)境是循環(huán)調(diào)用的,該步驟在每一個(gè)同步周期中都會調(diào)用,所以在建表時(shí)需要判斷該表是否已經(jīng)存在,如果不存在才建表。 SQL代碼和組件配置截圖如下: CREATE TABLE IF NOT EXISTS etl_temp(id int primary key,time_stamp timestamp);
INSERT IGNORE INTO etl_temp (id,time_stamp) VALUES (1,'2018-05-22 00:00:00');
我把該作業(yè)時(shí)間戳的ID設(shè)為1,在接下來的步驟中也是通過這個(gè)ID查詢我們想要的時(shí)間戳
4.2 獲取時(shí)間戳并設(shè)為變量新建一個(gè)轉(zhuǎn)換,在轉(zhuǎn)換中使用表輸入和設(shè)置變量兩個(gè)組件 表輸入SQL 代碼和組件配置截圖如下
在Kettle 中設(shè)置的變量都是字符串類型,為了便于比較。我在SQL語句把查出的時(shí)間戳進(jìn)行了格式轉(zhuǎn)換 select date_format(time_stamp , '%Y-%m-%d %H:%i:%s') time_stamp from etl_temp where id='1'
設(shè)置變量變量活動類型可以為該變量設(shè)置四種有效活動范圍,分別是JVM、該Job、父Job和祖父Job 4.3 刪除目標(biāo)表中時(shí)間戳及時(shí)間戳以后的數(shù)據(jù)這樣做有兩個(gè)好處: 避免在同步中重復(fù)或者遺漏數(shù)據(jù)。例如當(dāng)時(shí)間戳在源數(shù)據(jù)表中不是唯一的,上一次同步周期最后一條數(shù)據(jù)的時(shí)間戳是2018-05-25 18:12:12 ,那么上一次同步周期結(jié)束后中間表中的時(shí)間戳就會更新為2018-05-25 18:12:12 。如果在下一個(gè)同步周期時(shí)源數(shù)據(jù)表中仍然有時(shí)間戳為2018-05-25 18:12:12 的新數(shù)據(jù),那么同步就會出現(xiàn)數(shù)據(jù)不一致。采用大于時(shí)間戳的方式同步就會遺漏數(shù)據(jù),采用等于時(shí)間戳的方式同步就會重復(fù)同步數(shù)據(jù)。 增加健壯性 當(dāng)作業(yè)異常結(jié)束后,不用做任何多余的操作就可以重啟。因?yàn)闀h除目標(biāo)表中時(shí)間戳及時(shí)間戳以后的數(shù)據(jù),所以不用擔(dān)心數(shù)據(jù)一致性問題
2018-09-29:對增加健壯性進(jìn)行補(bǔ)充:在一次同步周期中腳本異常中斷,這時(shí)候中間表的時(shí)間戳沒有更新,但是目標(biāo)表已經(jīng)同步了部分?jǐn)?shù)據(jù),當(dāng)再次啟動腳本就會出現(xiàn)數(shù)據(jù)重復(fù)的情況,而且在很多時(shí)候因?yàn)橹麈I的存在,腳本啟動會報(bào)錯(cuò)
在組件中使用了上一步驟設(shè)置的變量,所以必須勾選使用變量替換 delete from test_kettle.im_message where send_time>='${TIME_STAMP}'
4.4 抽取、比對和更新數(shù)據(jù)這一步才是真正的數(shù)據(jù)同步步驟,完成了數(shù)據(jù)的抽取、比對,并根據(jù)不同的比對結(jié)果刪除、更新、插入或不做任何操作。 正如前文所說,為了同步刪除操作,在原始表輸入和目標(biāo)表輸入步驟中回滾了一定時(shí)間段。其中回滾的時(shí)間段設(shè)置為了全局的參數(shù)。左右空白處右鍵即可設(shè)置參數(shù),該作業(yè)下的所有作業(yè)和轉(zhuǎn)換都能使用,設(shè)置如下圖 轉(zhuǎn)換截圖如下 原始表輸入SELECT
id
, sender
, send_time
, receiver
, content
, is_read
, read_time
FROM ueqcsd.im_message
where send_time>= date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d %H:%i:%s'), interval ${ROLL_BACK_DAYS} day);
目標(biāo)表輸入SELECT
id
, sender
, send_time
, receiver
, content
, is_read
, read_time
FROM test_kettle.im_message
where send_time>= date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d %H:%i:%s'), interval ${ROLL_BACK_DAYS} day);
注意兩個(gè)組件的數(shù)據(jù)庫鏈接是不同的,當(dāng)然它們也就這個(gè)和名字不同
比對記錄對兩個(gè)表輸入查出的數(shù)據(jù)進(jìn)行比對,并把比對的結(jié)果寫進(jìn)輸入流,傳遞給后面的組件。 比對的結(jié)果有三種: 標(biāo)注字段表示比對結(jié)果的字段名,后面有用。關(guān)鍵字段表示比對的字段,在這個(gè)作業(yè)中我們比較兩個(gè)的主鍵ID 。 Switch該步驟對上一步驟產(chǎn)生的標(biāo)注字段進(jìn)行路由,不同的結(jié)果路由到不同的步驟。其中目標(biāo)步驟表示下一步驟的名字。 插入Kettle 有一個(gè)插入/更新組件,但是據(jù)網(wǎng)友介紹這個(gè)組件性能低下,每秒最多只能同步幾百條數(shù)據(jù),所有我對插入和更新分別作了不同的處理。插入使用表輸出組件;更新使用更新組件。 為了進(jìn)一步提升同步效率,我在表輸出組件使用了多線程(右鍵>改變開始復(fù)制的數(shù)量),使同步速度達(dá)到每秒12000條。Switch組件和表輸出組件中間的虛擬組件(空操作)也是為了使用多線程添加的。
勾選批量插入,可以極大提高同步速度
更新和刪除4.5 更新時(shí)間戳set @new_etl_start_time_stamp = (SELECT SEND_TIME FROM test_kettle.im_message ORDER BY SEND_TIME DESC LIMIT 1);
update etl_temp set time_stamp=@new_etl_start_time_stamp where id='1';
4.6 發(fā)送郵箱關(guān)于發(fā)送郵件組件網(wǎng)上有很多資料,就不多做介紹。特別強(qiáng)調(diào)一點(diǎn),郵箱密碼是 單獨(dú)的授權(quán)碼,而不是郵箱登錄密碼。 運(yùn)行在開發(fā)環(huán)境點(diǎn)擊Spoon界面左上角三角符號運(yùn)行作業(yè)即可。 在第一次運(yùn)行時(shí),為了提高同步效率,可以先不創(chuàng)建目標(biāo)表的索引。在第一此同步完成后,再創(chuàng)建索引。然后在START組件中編輯調(diào)度邏輯,再次啟動。 如下圖所示 運(yùn)行日志如下圖
|