2024年3月5日发(作者:)

概览

Kettle也叫PDI(全称是Pentaho Data Integeration),是一款开源的ETL工具,项目开始于2003年,2006年加入了开源的 BI 组织 Pentaho, 正式命名为PDI。官方网站:/

术语

1. Transformation

转换步骤,可以理解为将一个或者多个不同的数据源组装成一条数据流水线。然后最终输出到某一个地方,文件或者数据库等。

2. Job

作业,可以调度设计好的转换,也可以执行一些文件处理(比较,删除等),还可以ftp上传,下载文件,发送邮件,执行shell命令等,

3. Hop连接转换步骤或者连接Job(实际上就是执行顺序)的连线

Transformation hop:主要表示数据的流向。从输入,过滤等转换操作,到输出。

Job hop:可设置执行条件:

1, 无条件执行

2, 当上一个Job执行结果为true时执行

3, 当上一个Job执行结果为false时执行

Kettle,etl设计及运行

1. Kettle整体结构图

Kettle整体结构图

2. 转换设计样例图

绿色线条为hop,流水线

转换设计样例

3. 运行方式

使用 java web start 方式运行的配置方法

命令行方式

1) Windows下执行,多个参数之间以“/”分隔,Key和value以”:”分隔

例如:

/file:F: /level:Basic

/log:

/file:指定转换文件的路径

/level:执行日志执行级别

/log: 执行日志文件路径

2) Linux下执行,多个参数之间以“-”分隔,Key和value以”=”分隔

-file=/home/ -level=Minimal

如果设计的转换,Job是保存在数据库中,则命令如下:

/rep:资源库名称 /user:admin /pass:admin /job:job名

4. Xml保存转换,job流程

设计用户定义的作业可以保存在(xml格式)中或某一个特定的数据库中

转换的设计文件以.ktr结尾(xml文格式),保存所有配置好的数据库连接,文件相对路径,

字段映射关系等信息。

Job的设计文件以.kjb结尾,下面是一个调用已经设计好的转换的job文件的一部分:

demo test

Transformation

TRANS

${ory}/

load customer data job

${ory}

N

N

N

N

N

………

5. 数据库保存转换,job流程

列出几个重要的表

1) r_job:保存job的id,name,status,执行时间,创建时间,修改时间等信息

建表语句:

DROP TABLE IF EXISTS `r_job`;

CREATE TABLE `r_job` (

`ID_JOB` bigint(20) NOT NULL,

`ID_DIRECTORY` int(11) DEFAULT NULL,

`NAME` varchar(255) DEFAULT NULL,

`DESCRIPTION` mediumtext,

`EXTENDED_DESCRIPTION` mediumtext,

`JOB_VERSION` varchar(255) DEFAULT NULL,

`JOB_STATUS` int(11) DEFAULT NULL,

`ID_DATABASE_LOG` int(11) DEFAULT NULL,

`TABLE_NAME_LOG` varchar(255) DEFAULT NULL,

`CREATED_USER` varchar(255) DEFAULT NULL,

`CREATED_DATE` datetime DEFAULT NULL,

`MODIFIED_USER` varchar(255) DEFAULT NULL,

`MODIFIED_DATE` datetime DEFAULT NULL,

`USE_BATCH_ID` char(1) DEFAULT NULL,

`PASS_BATCH_ID` char(1) DEFAULT NULL,

`USE_LOGFIELD` char(1) DEFAULT NULL,

`SHARED_FILE` varchar(255) DEFAULT NULL,

PRIMARY KEY (`ID_JOB`)

)

2) r_jobentry:设计界面上的一个实体对应一个entry,通过job的id关联

DROP TABLE IF EXISTS `r_jobentry`;

CREATE TABLE `r_jobentry` (

`ID_JOBENTRY` bigint(20) NOT NULL,

`ID_JOB` int(11) DEFAULT NULL,

`ID_JOBENTRY_TYPE` int(11) DEFAULT NULL,

`NAME` varchar(255) DEFAULT NULL,

`DESCRIPTION` mediumtext,

PRIMARY KEY (`ID_JOBENTRY`)

)

3) r_jobentry_attribute:job的详细信息表,包括job执行规则,执行过程中的参数来源,日志记录等

DROP TABLE IF EXISTS `r_jobentry_attribute`;

CREATE TABLE `r_jobentry_attribute` (

`ID_JOBENTRY_ATTRIBUTE` bigint(20) NOT NULL,

`ID_JOB` int(11) DEFAULT NULL,

`ID_JOBENTRY` int(11) DEFAULT NULL,

`NR` int(11) DEFAULT NULL,

`CODE` varchar(255) DEFAULT NULL,

`VALUE_NUM` double DEFAULT NULL,

`VALUE_STR` mediumtext,

PRIMARY KEY (`ID_JOBENTRY_ATTRIBUTE`)

)

4) r_step:保存转换的步骤id,名字等

r_step建表语句:

DROP TABLE IF EXISTS `r_step`;

CREATE TABLE `r_step` (

`ID_STEP` bigint(20) NOT NULL,

`ID_TRANSFORMATION` int(11) DEFAULT NULL,

`NAME` varchar(255) DEFAULT NULL,

`DESCRIPTION` mediumtext,

`ID_STEP_TYPE` int(11) DEFAULT NULL,

`DISTRIBUTE` char(1) DEFAULT NULL,

`COPIES` int(11) DEFAULT NULL,

`GUI_LOCATION_X` int(11) DEFAULT NULL,

`GUI_LOCATION_Y` int(11) DEFAULT NULL,

`GUI_DRAW` char(1) DEFAULT NULL,

PRIMARY KEY (`ID_STEP`)

)

5) r_step_attribute:转换步骤的详细信息,字段重命名,字段映射等。通过外键id_transformation与r_step的id_transformation关联

DROP TABLE IF EXISTS `r_step_attribute`;

CREATE TABLE `r_step_attribute` (

`ID_STEP_ATTRIBUTE` bigint(20) NOT NULL,

`ID_TRANSFORMATION` int(11) DEFAULT NULL,

`ID_STEP` int(11) DEFAULT NULL,

`NR` int(11) DEFAULT NULL,

`CODE` varchar(255) DEFAULT NULL, --操作编码譬如:truncate,也可以是字段映射等信息

`VALUE_NUM` bigint(20) DEFAULT NULL, --操作值

`VALUE_STR` mediumtext, --操作值,譬如truncate对应的是Y或者N

PRIMARY KEY (`ID_STEP_ATTRIBUTE`),

UNIQUE KEY `IDX_STEP_ATTRIBUTE_LOOKUP` (`ID_STEP`,`CODE`,`NR`)

)

说明:

如果有一个字段firtstname映射到name则在r_step_attribute中增加两条记录。

6. Kettle组成部分

Chef:是一个图形用户界面,使用SWT开发,用来设计一个作业,转换,SQL,FTP,邮件,检查表存在,检查文件存在,执行SHELL脚本

Kitchen:作业执行引擎,用来进行转换,校验,FTP上传。可以执行xml格式定义的任务以及保存在数据库上的。

/file:D: /level:Basic

-file=/PRD/ -level=Minimal

/rep:"Production Repository"

/job:"Update dimensions"

/dir:/Dimensions

/user:matt

/pass:somepassword123

/level:Basic

/file:F:javapdi-ce-3.2.0-st

/level:Basic /log:

Spoon:Spoon 是Kettle的另一个图形用户界面,用来设计数据转换过程

Pan:Pan是一个数据转换引擎,负责从不同的数据源读写和转换数据。

-file="/PRD/Customer " -level=Minimal

Transformation步骤

1. 输入类型:

1.1 Csv file input

读取csv文件,设置csv文件路径,可以设置csv文件的相对路径或者绝对路径,字段分隔符,文件读取的缓存大小等

1.2 Excel Input

读取excel文件,和csv文件读取类似,增加了表单,表头,出错(是否忽略错误,严格的类型判断等)的处理

1.3 Property Input

读取属性.properties文件

1.4 Table input

从数据库读数据,动态绑定参数的SQL语句,参数替换可以从上一个步骤从获取。

例如SELECT *FROM customerWHERE birthdate<’${current_date}’

这里的${current_date}在执行过程中会作为动态参数被替换掉。这个值是前一个转换步骤设置的。

注:但是测试过程中发现如果上一个步骤设置的变量,在table input里面获取不到,变量设置必须作为一个单独的转换先执行一次,然后才能获取到这个变量。

1.5 Text file input

主要是txt文件内容等,和csv差不多。

1.6 Generate Rows

生成一些固定字段的记录,主要用来模拟一些数据进行测试。

1.7 Get File Names

读取给定目录或者文件全路径的文件名

1.8 Get System Info

包括命令行输入的参数,操作系统时间,ip地址,一些特殊属性,kettle版本等

1.9 De-serialize from file

从二进制kettle cube文件中读取记录

1.10 Access input

读取access数据库

1.11

1.12

ESRI Shapefile Reader

Fixed file input

读取固定大小文件

1.13

1.14

Generate random value

Get Files Rows Count

获取文件内容的行数

1.15

1.16

Get SubFolder names

Get data from XML

从xml文件解析出数据

1.17 LDAP Input

从ldap库读取数据。

1.18 LDIF Input

读取ldap的ldif文件

1.19 Mondrian Input

MDX语言从Mondrian服务器上读取数据

1.20

1.21

1.22

1.23

RSS Input

S3 CSV Input

SalesForce Input

XBase input

读取XBase系列文件,如Foxpro文件,主要是数据库语言

2. 输出类型

2.1 Table output

将数据写入到数据库,可以指定是否truncate表,编辑前一步转换字段与现在表结构的字段映射关系。以及每次commit的记录数大小等。

2.2 Text file output

将数据写入到文本文件,通常是csv文件

2.3 Insert / Update

根据关键字找对应的记录,如果找不到则执行insert,否则执行update

2.4 Update

跟insert/update类似,只是没有insert操作

2.5 Delete

跟update类似,只是执行的是delete操作

2.6 Excel Output

输出到excel,格式可以采用excel模板

2.7 Serialize to file

将记录写到二进制文件中(cube文件)

2.8 Access Output

2.9 Properties Output

输出到properties文件

2.10

2.11

RSS Output

SQL File Output

将输出的sql insert语句保存到文件

2.12

2.13

Synchronize after merge

XML Output

输出到xml文件

3. Transform类型

3.1 Select values

用于选择列,重命名列,指定列长度或精度

3.2 Filter rows

通过使用一个表达式从输入行中过滤数据,将结果是TURE或FALSE的行输出到不同的节点。表达式是“”“OPERATOR”“” 的形式,其中OPERATOR可以是 =,<>, <, >,

<=, >=, REGEXP,IS NULL,IS NOT NULL, IN LIST, CONTAINS, STARTS WITH, ENDS WITH。用户可以增加多个表达式,并用AND或OR连接。

3.3 Sort rows

对指定的列以升序或降序排序,当排序的行数超过5000时需要临时表。

3.4 Add sequence

为数据流增加一个序列,可以使用ORACLE中某一序列的值或由用户指定值

3.5 Dummy

不做任何处理,一般作为流程的终点。

3.6 Join Rows (catesian product)

对所有输入流做笛卡儿乘积。

3.7 Aggregate Rows

聚集行数据,提供SUM,AVERAGE,COUNT,MIN,MAX,FIRST,LAST聚集函数,该类型不提倡使用,将来会被GROUP BY 类型替代。

3.8 Java Script value

使用mozilla的rhino作为脚本语言,并提供了很多函数,用户可以在脚本中使用这些函数。例如 var prev_row; if (prev_row == null) prev_row = row; ... String previousName =

prev_ing(“Name”, “-”); ... prev_row = row; 可以获得字段Name的前一条记录的值。

3.9 Row Normaliser

该步骤可以从透视表(PIVOT TABLE)中还原数据到事实表,如从表一转换成表二,需要使用该步骤。

3.10 Unique rows

去掉输入流中的重复行,在使用该节点前要先排序,否则只能删除连续的重复行。

3.11

分组

Group by

3.12 Calculator

提供了一组函数对列值进行运算,使用该方式比用户自定义JAVA SCRIPT脚本速度更快。

3.13 Add constants

增加常量值。

3.14 Row denormaliser

同正规化过程相反。

3.15 Row flattener

表扁平化处理除了上述基本节点类型外还定义了扩展节点类型,如SPLIT FIELDS,EXECUTE

SQL SCRIPT,CUBE INPUT,CUBE OUTPUT等。图一中创建了一个简单的数据流程示例,共包括5个节点,其中Table input节点使用了SQL SERVER数据库中的一张表(三条记录),Filter rows中定义了过滤条件,将符合条件的发送到file2节点,不符和条件的记录发送到Select values节点。 Select values节点中选择列,并对选择的列进行了设置,将结果发送到file1节点。file1,file2节点分别是两个文本文件节点,最终用来保存数据。该流程运行后,可以在Log View面板中查看运行结果,如图四所示从table input结点输入3条记录,经过滤后输出到file2节点2条记录(OUTPUT列中的3是指2条记录加1行列名),输出到file1节点1条记录(OUTPUT列中的2是指1条记录加1行列名)。

4. Flow

4.1 Abort

忽略上一步的输入流,通常用在错误处理中,譬如不处理X条记录后的所有记录

4.2 Append streams

主要用来处理步骤之间有优先级的问题。从两个步骤从读取数据流,指定步骤的读取顺序。

4.3 Blocking Step

阻塞所有的输出直到最后一条记录到达

4.4 Detect empty stream

当输入流为空的时候,输出一条空的记录

4.5 Dummy (do nothing)

空操作。是一个空操作的插件

4.6 Filter rows

通过设定过滤条件来过滤记录

4.7 Identify last row in a stream

4.8 Switch / Case

类似Java的switch语法,通过比较某一确定的字段值来将数据转发到不同的转化步骤

5. Joins

5.1 Merge join

合并两种不同输入流,连接方式有内连,左外连接等。需要注意的是记录需要先按关键字进行排序

5.2 Merge Rows (diff)

用于比较两组输入数据,一般用于更新后的数据重新导入到数据仓库中。两组数据中一组是引用流,一组是比较流,每次比较后只有最新版本的行数据被输出到下一步。比较结果包括:

idectical一致:两组流的主键一致,值一致

changed有变化:两组流的主键一致,值有一个或多个不同

new新行:引用流中有而比较流中没有某一主键

deleted被删除的行:比较流中有而引用流中没有某一主键

比较流里面的数据除了被标记为deleted都会进入下一个步骤里面

5.3 Sorted Merge

对记录按某个关键字进行排序

5.4 XML Join

将一个XML文作为节点添加到另一个XML里面

合并前的XML文

需要合并的XML

合并后的XML文

6. Scripting

6.1 Execute SQL script

执行SQL脚本,应该避免使用这一步骤,尽可能的使用“table input(select)”,”table

output(insert)”,”update”,”delete”等步骤来替代。

譬如动态创建表(表名是可变的,table1,table2,table3):

SQL脚本是:

CREATE TABLE?

(

ID INTEGER

);

6.2 Execute row SQL script

对Execute sql script的补充,增加了可以自定义sql语句的字段名

6.3 Formula

在数据流中执行公式

6.4 Modified Java Script Value

Modified Java Script Value应该说是转换步骤里最强大的一步,可以获取前一步的输入流的所有字段,调用Java api对数据做转换等操作,改变所有输出的值。还能通过设置转换状态常量对现有转换流程做改变,(忽略转换,设置为错误,继续转换)。

脚本是 Mozilla 的 Rhino,Rhino 是一个Java实现的Javascript解释器。现在已经加入到

JDK 1.6 的 包中了。

对数据流进行修改等操作

提供了常量,函数,输入字段,输出字段的列表显示

1)

2)

Transformation scripts已经创建的脚本

Transformation constantsTransformation functions

已经预先定义好的静态常量,不可更改,例如SKIP_TRANSFORMATION ,ERROR_TRANSFORMATION,CONTINUE_TRANSFORMATION

3)

类型转换,操作函数: 字符串,数字,日期之间的转换,字符串截取等

逻辑判断函数: isDate(var) ,isNumber(var)等

特殊函数:LoadScriptFile(var),getProcessCount(var),print(var),writeToLog(var),getVariable(var,var)

文件操作函数:createFolder(var),deleteFile(var),getLastModifiedTime(var,var),moveFile(var,var,var)

Input fields 获取输入流中字段的值 ber()

Output fields set输出流中字段的值 ue(99)

例子:

1) 过滤Null字段

var a;

if ( () ){

a = '0';

}else{

a = ing();

}

2) 字符串截取

将字符串“12345McDonalds”前面的数字部分分割出来

var str = ing();

var code = "";

var name = "";

for (i = 0; i < (); i++ ){

c = (i);

if ( ! t(c) ){

code = ing(0, i);

name = ing(i);

Alert("code="+code+", name="+name);

break;

}

}

3) 过滤记录行,控制转换流程

trans_Status = CONTINUE_TRANSFORMATION

if (ing()==’123’) trans_Status = SKIP_TRANSFORMATION

4) 使用java类库

var mydate = "20090723";

var parser = DateFormat("yyyyMMdd"); //Must use fully qualified java class

var dateObj = (mydate); //just like how you would do in java

Alert(dateObj);

6.5 Regex Evaluation

通过正则表达式验证输入字段

6.6 User Defined Java Expression

执行一些简单的java代码

譬如表达式:firstname+" "+name

也可以用Java代码:

new StringBuffer(firstname).append(" ").append(name).toString()

7. Lookup

7.1 Call DB Procedure

执行存储过程并获得返回值,返回值只有一个,参数可以多个。

7.2 Check if a column exists

检查数据库表是否存在某列

7.3 Database join

改步骤允许查询等操作利用上一步的数据,譬如参数动态绑定的查询语句,可以被上一步某个字段的值替换掉

7.4 Database lookup

和database join功能类似,从数据库查询数值,作为新的字段添加到数据流中。可将前面的输出流的值作为查询比较参数

7.5 Dynamic SQL row

动态SQL查询记录行数。

7.6 File exists

检查文件是否存在,文件名由上一步传来

7.7 HTTP Post

处理POST请求,url可以从上一步数据里获取,也可以在该步骤指定,可以指定编码,请求参数等。

7.8 HTTP client

仅仅是一个带参数的URL请求,url可以从上一步数据里获取,也可以在该步骤指定,不支持soap

例如?param1=value1¶m2=value2

7.9 Stream lookup

从其他转换步骤产生的数据流中查询数据

7.10 Table exists

判断数据库中某张表是否存在,表名由前面步骤传来

7.11 Web services lookup

处理SOAP请求,数据类型转换是在步骤内部处理,如果有日期,数字等类型需要转换,建议全部返回字符串,然后使用“Select values”步骤做转换

JOB类型

1. 常用

1.1 Start

指定job执行规则,是否循环,循环规则等。

1.2 Dummy

空操作,主要用来多数据源汇总

1.3 Abort job

终止,忽略一个Job

1.4 Display Msgbox Info

使用图形化界面执行Job的时候显示消息框

1.5 Dummy plugin

Job里面空操作,可以用来将执行循环操作

1.6 Job

执行已经定义好的Job。job可以嵌套job。

1.7 Ping a host

ping

1.8 Send SNMP trap

发送SNMP trap 报文

SNMP TRAP就是在SNMP设备发生状态变化的时候向管理器发出信号。不用管理器来检查。

1.9 Set variables

Set变量

1.10 Success

当出错后可以强制将该job置成功

1.11 Transformation

执行定义好的转换

1.12

Truncate tables

1.13 Wait for SQL

当检测表的记录数是否达到一定条件

1.14 Write To Log

job里面的日志记录,不同于程序自带的log4j等日志

2. 邮件

2.1 Get mails from POP

从POP服务器获取邮件并存储到文件夹中

2.2 Mail

发送文本或者HTML格式邮件,可添加附件。

2.3 Mail validator

通过发送SNMP TRAP到邮件服务器来验证EMAIL地址是否正确

3. 文件管理

3.1 Add filenames to result

将文件夹或者多个文件加入到数据流中,以便在下一个job步骤中使用

3.2 Compare folders

比较两个文件夹下面的文件是否一致,可以选择只比较文件大小,也可以选择比较文件内容

3.3 Copy Files

3.4 Copy or Move result filenames

根据上一步的执行结果得到的文件名,复制或者剪切文件

3.5 Create a folder

3.6 Create file

3.7 Delete file

3.8 Delete filenames from result

根据上一步的执行结果得到的文件名,复制或者剪切文件

3.9 Delete files

3.10

3.11

Delete folders

File Compare

比较两个文件的内容

3.12 HTTP

通过http协议从web服务器上获得文件

3.13

移动文件

Move Files

3.14 Unzip file

Unzip解压缩文件

3.15 Wait for file

循环检测文件是否存在,否则直到超时失败

3.16 Zip file

采用zip压缩文件

4. 文件传输

4.1 FTP Delete

删除FTP上的文件

4.2 Get a file with FTP

从FTP获取文件,可以设置编码方式,连接超时时间等。文件保存路径,文件名中是否包含日期,时间,时间日期是否需要特殊格式化,是否覆盖文件。

4.3 Get a file with SFTP

与ftp类似,只是采用Secure FTP protocol

4.4 Put a file with FTP

上传文件,本地文件路径,通过*等模糊匹配要上传的文件。并执行上传的远程目录。

4.5 Put a file with SFTP

4.6 SSH2 Get

4.7 SSH2 Put

5. Scripting

5.1 JavaScript

编辑javascript脚本并执行。

5.2 SQL

执行SQL脚本

可以执行写好的sql脚本,指定sql路径即可。

也可以插入编辑sql脚本。

5.3 Shell

执行SHELL脚本,

可以执行已经写好的shell脚本指定shell脚本路径即可,也可以自己插入,编辑shell脚本

可以把前一个步骤的执行结果当作参数传入

与现有应用程序集成

通过图形化界面设计出来的transformation ,job都是xml格式的文件。通过加载环境变量,初始化配置文件可以在servlet或者其他应用程序中中执行转换。

大致执行方式如下:

public static void runTransformation(String filename) {

try {

();

nmentInit();

TransMeta transMeta = new TransMeta(filename);

Trans trans = new Trans(transMeta);

e(null); // You can pass arguments instead of null.

tilFinished();

if ( ors() > 0 )

{

throw new RuntimeException( "There were errors during transformation execution." );

}

}

catch ( KettleException e ) {

n(e);

}

}

通过插件扩展现有功能

1) 至少实现4个接口

StepMetaInterface接口定义元数据,执行检查等

StepInterface接口集成自BaseStep,步骤

StepDataInterface接口处理游标,数据库结果集,文件等

StepDialogInterface接口编辑,设置变量的图形化界面

类名命名规则:

四个类的前缀保持一致,譬如实现StepMetaInterface的类是MyMeta

那么实现StepInterface的类是My,实现StepDataInterface的是MyData,实现StepDialogInterface的是MyDialog

2) 定义文件

id="DummyPlugin"

iconfile=""

description="Dummy Plugin"

tooltip="This is a dummy plugin test step"

category="Transform"

classname="luginMeta">

Transform

Transform

Transformation

Dummy plugin

Plugin voorbeeld

Exemple de plugiciel

This is a dummy plugin test step

Dit is een voorbeeld plugin ook wel 'dummy plugin' geheten

Exemple de plugiciel

小例子

文件批量入库:

获取一个目录下的文件名,使用一个正则表达式来指定文件名。

使用一个 javascript 脚本,读取文件内容,

如果文件是二进制文件,文件内容一般保存为 BLOB 、Binary 、Image 等类型

如果文件是字符型文件,文件内容一般保存为 CLOB 、varchar、Text 等类型

注意:因为该方法是一次性将文件内容都读到了内存中,因此只能处理比较小的文件

file = new (ing());