這個實驗主要思想是在創(chuàng)建數據庫表的時候, 通過增加一個額外的字段,也就是時間戳字段, 例如在同步表 tt1 和表 tt2 的時候, 通過檢查那個表是最新更新的,那個表就作為新表,而另外的表最為舊表被新表中的數據進行更新。 實驗數據如下: mysql database 5.1 test.tt1( id int primary key , name varchar(50) ); mysql.tt2( id int primary key, name varchar(50) );
快照表,可以將其存放在test數據庫中, 同樣可以為了簡便,可以將其創(chuàng)建為temporary 表類型。
數據如圖 kettle-1 kettle-1 ============================================================
主流程如圖 kettle-2 kettle-2
在prepare中,向 tt1,tt2 表中增加 時間戳字段, 由于tt1,tt2所在的數據庫是不同的,所以分別創(chuàng)建兩個數據庫的連接。 prepare
kettle-3
在執(zhí)行這個job之后,就會在數據庫查詢的時候看到下面的字段:
kettle-4 然后, 我們來對tt1表做一個 insert 操作 一個update操作吧~ kettle-5 在原表上無論是insert操作還是update操作,對應的updateTime都會發(fā)生變更。 如果tt1 表 和 tt2 表中 updateTime 字段為最新時間的話,則說明該表是新表 。
下面只要是對應main_thread的截圖: kettle-6
在這里介紹一下Main的層次: Main START Main.prepare Main.main_thread { START main_thread.create_tempTable main_thread.insert_tempTable main_thread.tt1_tt2_syn SUCCESS } Main.finish SUCCESS
在main_thread中的過程是這樣的: 作為一個局部的整體,使它每隔200s內進行一次循環(huán), 這樣的話,如果在其中有指定的表 tt1 或是 tt2 對應被更新或是插入的話, 該表中的updateTime字段就會被捕捉到,并且進行同步。 如果沒有更新出現(xiàn),則會走switch的 default 路線對應的是write to log. 繼續(xù)循環(huán)。
首先創(chuàng)建一個快照表,然后將tt1,tt2表中的最大(最新)時間戳的值插入到快照表中。 然后,通過一個transformation來判斷那個表的updateTime值最新, 來選擇對應是 tt1表來更新 tt2 還是 tt2 表來更新 tt1 表;
main_thread.create_tempTable.JOB:
main_thread.insert_tempTable.Job: PS: 對于第二個SQL 應該改成(不修改會出錯的) set @var1 = ( select MAX(updatetime) from tt2); insert into test.temp values ( 2 , @var1 ) ; 因為conn對應的是連接mysql(數據庫實例名稱), 但是我們把快照表和tt1 表都存到了test(數據庫實例名稱)里面。
在上面這個圖中對應的語句是想實現(xiàn),在temp表中插入兩行記錄元組。 其中id為1 的元組對應的temp.lastTime 字段 是 從tt1 表中選出的 updateTime 值為最新的, id 為2的元組對應的 temp.lastTime 字段 是 從 tt2 表中選出的 updateTime 值為最新的 字段。 當然 , id 是用來給后續(xù) switch 操作提供參考的,用于標示最新 updateTime 是來自 tt1 還是 tt2, 同樣也可以使用 tableName varchar(50) 這種字段 來存放 最新updateTime 對應的 數據庫.數據表的名稱也可以的。
main_thread.tt1_tt2_syn.Transformation: 首先,創(chuàng)建連接 test 數據庫的 temp 表的連接, 選擇 temp表中 對應 lastTime 值最新的所在的記錄 所對應的 id 號碼。 首先將temp中 lastTime 字段進行 降序排列, 然后選擇id , 并且將選擇記錄僅限定成一行。 然后根據id的值進行 switch選擇。 在這里LZ很想使用,SQL Executor, 但是它無法返回對應的id值。 但是表輸入可以返回對應的id值, 并被switch接收到。
下圖是對應 switch id = 1 的時候:即 tt1 更新 tt2 注意合并行比較 的新舊數據源 的選擇 和Insert/Update 中的Target table的選擇 下圖是對應 switch id = 2 的時候:即 tt2 更新 tt1 注意合并行比較 的新舊數據源 的選擇 和Insert/Update 中的Target table的選擇 但是考慮到增加一個 column 會浪費很多的空間, 所以咋最終結束同步之后使用 finish操作步驟來將該 updateTime這個字段進行刪除操作即可。 這個與Main中的prepare的操作是相對應的。 Main.finish 這樣的話,實驗環(huán)境已經搭建好了, 接下來進行,實驗的數據測試了,寫到下一個博客中。 當然,觸發(fā)器也是一種同步的好方法,寫到后續(xù)博客中吧~ 時間戳的方式相比于觸發(fā)器,較為簡單并且通用, 但是 數據庫表中的時間戳字段,很費空間,并且無法對應刪除操作, 也就是說 表中刪除一行記錄, 該表應該作為新表來更新其余表,但是由于記錄刪除 時間戳無所依附所以無法記錄到。 |
|
來自: 日月桃子 > 《kettle(ETL工具)》