使用kettle来根据时间戳或者批次号来批量导入数据,达到增量的效果。

Posted on Posted in kettle示例

1、Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,数据抽取高效稳定。下载图形化界面的zip包格式的,直接解压缩使用即可。安装部署模式这里不说了,自己可以根据自己的需求安装为单机模式或者集群模式。    
Kettle的社区官网:https://community.hitachivantara.com/docs/DOC-1009855        
Kettle的下载地址:https://sourceforge.net/projects/pentaho/files/Data%20Integration/
kettle国内镜像下载:http://mirror.bit.edu.cn/pentaho/Data%20Integration/
2、由于这里只是演示了如何配置通过时间戳和批次号增量的导入数据,所以具体的操作不再叙述,具体的使用自己可以根据需求来使用。

Job如下所示:

复制代码
思路。
批次量将一批数据从一个数据库导入到另外一个数据库,而且每批次的数据量不能重复。
这里使用时间戳,你也可以使用批次号。原理基本一样,都是确定每一批次的数据量。
job步骤:
    第一步。start,可以设置定时或者手动点击启动job。
    第二步。执行转换。
    第三步。将start_time=next_time。
    第四步。成功。 1、Start,类型可以选择不需要定时,时间间隔,天,周,月。
默认不需要定时,如果需要定时的话,首先把重复的框勾选。
然后如果选择时间间隔的话,可以输入以分钟计算的间隔或者以秒计算的间隔。
如果按天,就选择天,然后选择每天几天的几分开始跑。
如果按照周,就选择每周的每天几点几分开始跑job。
如果是每月的话,就选择那一月的每天几点几分跑job。 2、转换的作业项名称,自己填自己的作业项名称,
在转换设置的tab然后自己填自己的转换文件名core_table_name_down。
高级tab,设置日志tab,位置参数tab,
命名参数tab,如果自己需要的话可以自己使用和研究。 3、作业项名称,自己填自己的,数据库连接,自己新建和编辑即可。
SQL脚本,自己填上自己的sql脚本。
这个主要是批次量导入数据,所以使用时间戳来实现批次量导入数据。
所以每次批次量导入数据结束,将start_time=next_time。这样下次
执行这个job,就是下一批的数据量了。
update 数据表名称 set start_time=next_time where table_name='数据表名称' and part=第几步
复制代码


111-34

转换如下所示:

复制代码
注意: 1)、由于是将上一步查询的值插入到下一步?的地方,所以一定要注意。
将带有?的步骤,替换SQL语句里面的变量,进行勾选。
从步骤插入数据,进行选择上一步的名称。 2)、步骤名称,自己起自己的名字。
    数据库连接,自己新建和编辑。
    目标模式,如果是Greenplum或者postgresql要输入自己的模式。
    目标表就是自己的数据表。
    提交记录数量,一般1000或者2000。下面主选项使用批量插入进行勾选。
    数据库字段,自己获取字段和映射,
    更新,用来查询的关键字和更新字段。自行配置。
    Switch/Case,Switch字段和Case值数据类型和Case值。
        自己根据自己的字段和类型进行填写。

change步骤:
    第一步。在数据源的库表里面查询出这批数据的最大时间或者最大的批次号。
    第二步。然后在自己的数据表里面获取到开始时间或者最小的批次号
    (此数据表自己初始化好起始时间start_time或者最小批次号和查询条件,比如第几步和那一张表)。
    将第一步获取到的最大时间或者最大的批次号传递到第二步。
    第三步。更新自己的初始化好的数据表,将自己初始化好的数据表的最大时间或者最大批次号字段修改。
        同时进行表输入进行查询出数据。然后将这一步查询的数据传递到Switch/Case。
    第四步。Switch/Case。将上一步的数据根据Switch/Case。进行传递。
    第五步。进行各种数据表的输出。        
        
第一步: 1)、select '数据表名称' as table_name, 第几步 as part, COALESCE(max(update_time), now()) as next_time from 数据表名称 2)、postgresql,COALESCE()函数 
    主流数据库系统都支持COALESCE()函数,这个函数主要用来进行空值处理,其参数格
式如下: COALESCE ( expression,value1,value2……,valuen) 。
    COALESCE()函数的第一个参数expression为待检测的表达式,而其后的参数个数不定。
COALESCE()函数将会返回包括expression在内的所有参数中的第一个非空表达式。如果
expression不为空值则返回expression;否则判断value1是否是空值,如果value1不为空值则返
回value1;否则判断value2是否是空值,如果value2不为空值则返回value3;……以此类推,
如果所有的表达式都为空值,则返回NULL。 3)、MySQL,IFNULL函数是MySQL控制流函数之一,它接受两个参数,如果不是NULL,则返回第一个参数。 
    否则,IFNULL函数返回第二个参数。IFNULL(expression_1,expression_2);
        
第二步: 1)、select ? as table_name, ? as part, start_time, ? as next_time from 数据表名称 where table_name='数据表名称' and part=第几步 2)、将第一步的三个参数,传递到第二步的三个问好的地方。
        
第三步:查询出每个case所需要的值的数据。同时修改next_time最大时间或者最大批次号。 select *, CASE  WHEN "字段"='标识位' THEN (select to_char(to_number(COALESCE(max("Cd_batch"), to_char(now(), 'yyyyMMdd')||'00000'), '9999999999999') + 1, '9999999999999') from core_data.dn_data_reconciliation where "TableName"='数据表名称' and "字段"='标识位') END AS "Cd_batch", 'I' as "Cd_operation" from 数据表名称 where '数据表名称'=? and 第几步=?  and update_time>? and update_time<=?


2224