2024年3月5日发(作者:)
概览
Kettle也叫PDI(全称是Pentaho Data Integeration),是一款开源的ETL工具,项目开始于2003年,2006年加入了开源的 BI 组织 Pentaho, 正式命名为PDI。官方网站:/
术语
1. Transformation
转换步骤,可以理解为将一个或者多个不同的数据源组装成一条数据流水线。然后最终输出到某一个地方,文件或者数据库等。
2. Job
作业,可以调度设计好的转换,也可以执行一些文件处理(比较,删除等),还可以ftp上传,下载文件,发送邮件,执行shell命令等,
3. Hop连接转换步骤或者连接Job(实际上就是执行顺序)的连线
Transformation hop:主要表示数据的流向。从输入,过滤等转换操作,到输出。
Job hop:可设置执行条件:
1, 无条件执行
2, 当上一个Job执行结果为true时执行
3, 当上一个Job执行结果为false时执行
Kettle,etl设计及运行
1. Kettle整体结构图
Kettle整体结构图
2. 转换设计样例图
绿色线条为hop,流水线
转换设计样例
3. 运行方式
使用 java web start 方式运行的配置方法
命令行方式
1) Windows下执行,多个参数之间以“/”分隔,Key和value以”:”分隔
例如:
/file:F: /level:Basic
/log:
/file:指定转换文件的路径
/level:执行日志执行级别
/log: 执行日志文件路径
2) Linux下执行,多个参数之间以“-”分隔,Key和value以”=”分隔
-file=/home/ -level=Minimal
如果设计的转换,Job是保存在数据库中,则命令如下:
/rep:资源库名称 /user:admin /pass:admin /job:job名
4. Xml保存转换,job流程
设计用户定义的作业可以保存在(xml格式)中或某一个特定的数据库中
转换的设计文件以.ktr结尾(xml文格式),保存所有配置好的数据库连接,文件相对路径,
字段映射关系等信息。
Job的设计文件以.kjb结尾,下面是一个调用已经设计好的转换的job文件的一部分:
………
5. 数据库保存转换,job流程
列出几个重要的表
1) r_job:保存job的id,name,status,执行时间,创建时间,修改时间等信息
建表语句:
DROP TABLE IF EXISTS `r_job`;
CREATE TABLE `r_job` (
`ID_JOB` bigint(20) NOT NULL,
`ID_DIRECTORY` int(11) DEFAULT NULL,
`NAME` varchar(255) DEFAULT NULL,
`DESCRIPTION` mediumtext,
`EXTENDED_DESCRIPTION` mediumtext,
`JOB_VERSION` varchar(255) DEFAULT NULL,
`JOB_STATUS` int(11) DEFAULT NULL,
`ID_DATABASE_LOG` int(11) DEFAULT NULL,
`TABLE_NAME_LOG` varchar(255) DEFAULT NULL,
`CREATED_USER` varchar(255) DEFAULT NULL,
`CREATED_DATE` datetime DEFAULT NULL,
`MODIFIED_USER` varchar(255) DEFAULT NULL,
`MODIFIED_DATE` datetime DEFAULT NULL,
`USE_BATCH_ID` char(1) DEFAULT NULL,
`PASS_BATCH_ID` char(1) DEFAULT NULL,
`USE_LOGFIELD` char(1) DEFAULT NULL,
`SHARED_FILE` varchar(255) DEFAULT NULL,
PRIMARY KEY (`ID_JOB`)
)
2) r_jobentry:设计界面上的一个实体对应一个entry,通过job的id关联
DROP TABLE IF EXISTS `r_jobentry`;
CREATE TABLE `r_jobentry` (
`ID_JOBENTRY` bigint(20) NOT NULL,
`ID_JOB` int(11) DEFAULT NULL,
`ID_JOBENTRY_TYPE` int(11) DEFAULT NULL,
`NAME` varchar(255) DEFAULT NULL,
`DESCRIPTION` mediumtext,
PRIMARY KEY (`ID_JOBENTRY`)
)
3) r_jobentry_attribute:job的详细信息表,包括job执行规则,执行过程中的参数来源,日志记录等
DROP TABLE IF EXISTS `r_jobentry_attribute`;
CREATE TABLE `r_jobentry_attribute` (
`ID_JOBENTRY_ATTRIBUTE` bigint(20) NOT NULL,
`ID_JOB` int(11) DEFAULT NULL,
`ID_JOBENTRY` int(11) DEFAULT NULL,
`NR` int(11) DEFAULT NULL,
`CODE` varchar(255) DEFAULT NULL,
`VALUE_NUM` double DEFAULT NULL,
`VALUE_STR` mediumtext,
PRIMARY KEY (`ID_JOBENTRY_ATTRIBUTE`)
)
4) r_step:保存转换的步骤id,名字等
r_step建表语句:
DROP TABLE IF EXISTS `r_step`;
CREATE TABLE `r_step` (
`ID_STEP` bigint(20) NOT NULL,
`ID_TRANSFORMATION` int(11) DEFAULT NULL,
`NAME` varchar(255) DEFAULT NULL,
`DESCRIPTION` mediumtext,
`ID_STEP_TYPE` int(11) DEFAULT NULL,
`DISTRIBUTE` char(1) DEFAULT NULL,
`COPIES` int(11) DEFAULT NULL,
`GUI_LOCATION_X` int(11) DEFAULT NULL,
`GUI_LOCATION_Y` int(11) DEFAULT NULL,
`GUI_DRAW` char(1) DEFAULT NULL,
PRIMARY KEY (`ID_STEP`)
)
5) r_step_attribute:转换步骤的详细信息,字段重命名,字段映射等。通过外键id_transformation与r_step的id_transformation关联
DROP TABLE IF EXISTS `r_step_attribute`;
CREATE TABLE `r_step_attribute` (
`ID_STEP_ATTRIBUTE` bigint(20) NOT NULL,
`ID_TRANSFORMATION` int(11) DEFAULT NULL,
`ID_STEP` int(11) DEFAULT NULL,
`NR` int(11) DEFAULT NULL,
`CODE` varchar(255) DEFAULT NULL, --操作编码譬如:truncate,也可以是字段映射等信息
`VALUE_NUM` bigint(20) DEFAULT NULL, --操作值
`VALUE_STR` mediumtext, --操作值,譬如truncate对应的是Y或者N
PRIMARY KEY (`ID_STEP_ATTRIBUTE`),
UNIQUE KEY `IDX_STEP_ATTRIBUTE_LOOKUP` (`ID_STEP`,`CODE`,`NR`)
)
说明:
如果有一个字段firtstname映射到name则在r_step_attribute中增加两条记录。
6. Kettle组成部分
Chef:是一个图形用户界面,使用SWT开发,用来设计一个作业,转换,SQL,FTP,邮件,检查表存在,检查文件存在,执行SHELL脚本
Kitchen:作业执行引擎,用来进行转换,校验,FTP上传。可以执行xml格式定义的任务以及保存在数据库上的。
/file:D: /level:Basic
-file=/PRD/ -level=Minimal
/rep:"Production Repository"
/job:"Update dimensions"
/dir:/Dimensions
/user:matt
/pass:somepassword123
/level:Basic
/file:F:javapdi-ce-3.2.0-st
/level:Basic /log:
Spoon:Spoon 是Kettle的另一个图形用户界面,用来设计数据转换过程
Pan:Pan是一个数据转换引擎,负责从不同的数据源读写和转换数据。
-file="/PRD/Customer " -level=Minimal
Transformation步骤
1. 输入类型:
1.1 Csv file input
读取csv文件,设置csv文件路径,可以设置csv文件的相对路径或者绝对路径,字段分隔符,文件读取的缓存大小等
1.2 Excel Input
读取excel文件,和csv文件读取类似,增加了表单,表头,出错(是否忽略错误,严格的类型判断等)的处理
1.3 Property Input
读取属性.properties文件
1.4 Table input
从数据库读数据,动态绑定参数的SQL语句,参数替换可以从上一个步骤从获取。
例如SELECT *FROM customerWHERE birthdate<’${current_date}’
这里的${current_date}在执行过程中会作为动态参数被替换掉。这个值是前一个转换步骤设置的。
注:但是测试过程中发现如果上一个步骤设置的变量,在table input里面获取不到,变量设置必须作为一个单独的转换先执行一次,然后才能获取到这个变量。
1.5 Text file input
主要是txt文件内容等,和csv差不多。
1.6 Generate Rows
生成一些固定字段的记录,主要用来模拟一些数据进行测试。
1.7 Get File Names
读取给定目录或者文件全路径的文件名
1.8 Get System Info
包括命令行输入的参数,操作系统时间,ip地址,一些特殊属性,kettle版本等
1.9 De-serialize from file
从二进制kettle cube文件中读取记录
1.10 Access input
读取access数据库
1.11
1.12
ESRI Shapefile Reader
Fixed file input
读取固定大小文件
1.13
1.14
Generate random value
Get Files Rows Count
获取文件内容的行数
1.15
1.16
Get SubFolder names
Get data from XML
从xml文件解析出数据
1.17 LDAP Input
从ldap库读取数据。
1.18 LDIF Input
读取ldap的ldif文件
1.19 Mondrian Input
MDX语言从Mondrian服务器上读取数据
1.20
1.21
1.22
1.23
RSS Input
S3 CSV Input
SalesForce Input
XBase input
读取XBase系列文件,如Foxpro文件,主要是数据库语言
2. 输出类型
2.1 Table output
将数据写入到数据库,可以指定是否truncate表,编辑前一步转换字段与现在表结构的字段映射关系。以及每次commit的记录数大小等。
2.2 Text file output
将数据写入到文本文件,通常是csv文件
2.3 Insert / Update
根据关键字找对应的记录,如果找不到则执行insert,否则执行update
2.4 Update
跟insert/update类似,只是没有insert操作
2.5 Delete
跟update类似,只是执行的是delete操作
2.6 Excel Output
输出到excel,格式可以采用excel模板
2.7 Serialize to file
将记录写到二进制文件中(cube文件)
2.8 Access Output
2.9 Properties Output
输出到properties文件
2.10
2.11
RSS Output
SQL File Output
将输出的sql insert语句保存到文件
2.12
2.13
Synchronize after merge
XML Output
输出到xml文件
3. Transform类型
3.1 Select values
用于选择列,重命名列,指定列长度或精度
3.2 Filter rows
通过使用一个表达式从输入行中过滤数据,将结果是TURE或FALSE的行输出到不同的节点。表达式是“”“OPERATOR”“” 的形式,其中OPERATOR可以是 =,<>, <, >,
<=, >=, REGEXP,IS NULL,IS NOT NULL, IN LIST, CONTAINS, STARTS WITH, ENDS WITH。用户可以增加多个表达式,并用AND或OR连接。
3.3 Sort rows
对指定的列以升序或降序排序,当排序的行数超过5000时需要临时表。
3.4 Add sequence
为数据流增加一个序列,可以使用ORACLE中某一序列的值或由用户指定值
3.5 Dummy
不做任何处理,一般作为流程的终点。
3.6 Join Rows (catesian product)
对所有输入流做笛卡儿乘积。
3.7 Aggregate Rows
聚集行数据,提供SUM,AVERAGE,COUNT,MIN,MAX,FIRST,LAST聚集函数,该类型不提倡使用,将来会被GROUP BY 类型替代。
3.8 Java Script value
使用mozilla的rhino作为脚本语言,并提供了很多函数,用户可以在脚本中使用这些函数。例如 var prev_row; if (prev_row == null) prev_row = row; ... String previousName =
prev_ing(“Name”, “-”); ... prev_row = row; 可以获得字段Name的前一条记录的值。
3.9 Row Normaliser
该步骤可以从透视表(PIVOT TABLE)中还原数据到事实表,如从表一转换成表二,需要使用该步骤。
3.10 Unique rows
去掉输入流中的重复行,在使用该节点前要先排序,否则只能删除连续的重复行。
3.11
分组
Group by
3.12 Calculator
提供了一组函数对列值进行运算,使用该方式比用户自定义JAVA SCRIPT脚本速度更快。
3.13 Add constants
增加常量值。
3.14 Row denormaliser
同正规化过程相反。
3.15 Row flattener
表扁平化处理除了上述基本节点类型外还定义了扩展节点类型,如SPLIT FIELDS,EXECUTE
SQL SCRIPT,CUBE INPUT,CUBE OUTPUT等。图一中创建了一个简单的数据流程示例,共包括5个节点,其中Table input节点使用了SQL SERVER数据库中的一张表(三条记录),Filter rows中定义了过滤条件,将符合条件的发送到file2节点,不符和条件的记录发送到Select values节点。 Select values节点中选择列,并对选择的列进行了设置,将结果发送到file1节点。file1,file2节点分别是两个文本文件节点,最终用来保存数据。该流程运行后,可以在Log View面板中查看运行结果,如图四所示从table input结点输入3条记录,经过滤后输出到file2节点2条记录(OUTPUT列中的3是指2条记录加1行列名),输出到file1节点1条记录(OUTPUT列中的2是指1条记录加1行列名)。
4. Flow
4.1 Abort
忽略上一步的输入流,通常用在错误处理中,譬如不处理X条记录后的所有记录
4.2 Append streams
主要用来处理步骤之间有优先级的问题。从两个步骤从读取数据流,指定步骤的读取顺序。
4.3 Blocking Step
阻塞所有的输出直到最后一条记录到达
4.4 Detect empty stream
当输入流为空的时候,输出一条空的记录
4.5 Dummy (do nothing)
空操作。是一个空操作的插件
4.6 Filter rows
通过设定过滤条件来过滤记录
4.7 Identify last row in a stream
4.8 Switch / Case
类似Java的switch语法,通过比较某一确定的字段值来将数据转发到不同的转化步骤
5. Joins
5.1 Merge join
合并两种不同输入流,连接方式有内连,左外连接等。需要注意的是记录需要先按关键字进行排序
5.2 Merge Rows (diff)
用于比较两组输入数据,一般用于更新后的数据重新导入到数据仓库中。两组数据中一组是引用流,一组是比较流,每次比较后只有最新版本的行数据被输出到下一步。比较结果包括:
idectical一致:两组流的主键一致,值一致
changed有变化:两组流的主键一致,值有一个或多个不同
new新行:引用流中有而比较流中没有某一主键
deleted被删除的行:比较流中有而引用流中没有某一主键
比较流里面的数据除了被标记为deleted都会进入下一个步骤里面
5.3 Sorted Merge
对记录按某个关键字进行排序
5.4 XML Join
将一个XML文作为节点添加到另一个XML里面
合并前的XML文
需要合并的XML
合并后的XML文
6. Scripting
6.1 Execute SQL script
执行SQL脚本,应该避免使用这一步骤,尽可能的使用“table input(select)”,”table
output(insert)”,”update”,”delete”等步骤来替代。
譬如动态创建表(表名是可变的,table1,table2,table3):
SQL脚本是:
CREATE TABLE?
(
ID INTEGER
);
6.2 Execute row SQL script
对Execute sql script的补充,增加了可以自定义sql语句的字段名
6.3 Formula
在数据流中执行公式
6.4 Modified Java Script Value
Modified Java Script Value应该说是转换步骤里最强大的一步,可以获取前一步的输入流的所有字段,调用Java api对数据做转换等操作,改变所有输出的值。还能通过设置转换状态常量对现有转换流程做改变,(忽略转换,设置为错误,继续转换)。
脚本是 Mozilla 的 Rhino,Rhino 是一个Java实现的Javascript解释器。现在已经加入到
JDK 1.6 的 包中了。
对数据流进行修改等操作
提供了常量,函数,输入字段,输出字段的列表显示
1)
2)
Transformation scripts已经创建的脚本
Transformation constantsTransformation functions
已经预先定义好的静态常量,不可更改,例如SKIP_TRANSFORMATION ,ERROR_TRANSFORMATION,CONTINUE_TRANSFORMATION
3)
类型转换,操作函数: 字符串,数字,日期之间的转换,字符串截取等
逻辑判断函数: isDate(var) ,isNumber(var)等
特殊函数:LoadScriptFile(var),getProcessCount(var),print(var),writeToLog(var),getVariable(var,var)
文件操作函数:createFolder(var),deleteFile(var),getLastModifiedTime(var,var),moveFile(var,var,var)
Input fields 获取输入流中字段的值 ber()
Output fields set输出流中字段的值 ue(99)
例子:
1) 过滤Null字段
var a;
if ( () ){
a = '0';
}else{
a = ing();
}
2) 字符串截取
将字符串“12345McDonalds”前面的数字部分分割出来
var str = ing();
var code = "";
var name = "";
for (i = 0; i < (); i++ ){
c = (i);
if ( ! t(c) ){
code = ing(0, i);
name = ing(i);
Alert("code="+code+", name="+name);
break;
}
}
3) 过滤记录行,控制转换流程
trans_Status = CONTINUE_TRANSFORMATION
if (ing()==’123’) trans_Status = SKIP_TRANSFORMATION
4) 使用java类库
var mydate = "20090723";
var parser = DateFormat("yyyyMMdd"); //Must use fully qualified java class
var dateObj = (mydate); //just like how you would do in java
Alert(dateObj);
6.5 Regex Evaluation
通过正则表达式验证输入字段
6.6 User Defined Java Expression
执行一些简单的java代码
譬如表达式:firstname+" "+name
也可以用Java代码:
new StringBuffer(firstname).append(" ").append(name).toString()
7. Lookup
7.1 Call DB Procedure
执行存储过程并获得返回值,返回值只有一个,参数可以多个。
7.2 Check if a column exists
检查数据库表是否存在某列
7.3 Database join
改步骤允许查询等操作利用上一步的数据,譬如参数动态绑定的查询语句,可以被上一步某个字段的值替换掉
7.4 Database lookup
和database join功能类似,从数据库查询数值,作为新的字段添加到数据流中。可将前面的输出流的值作为查询比较参数
7.5 Dynamic SQL row
动态SQL查询记录行数。
7.6 File exists
检查文件是否存在,文件名由上一步传来
7.7 HTTP Post
处理POST请求,url可以从上一步数据里获取,也可以在该步骤指定,可以指定编码,请求参数等。
7.8 HTTP client
仅仅是一个带参数的URL请求,url可以从上一步数据里获取,也可以在该步骤指定,不支持soap
例如
7.9 Stream lookup
从其他转换步骤产生的数据流中查询数据
7.10 Table exists
判断数据库中某张表是否存在,表名由前面步骤传来
7.11 Web services lookup
处理SOAP请求,数据类型转换是在步骤内部处理,如果有日期,数字等类型需要转换,建议全部返回字符串,然后使用“Select values”步骤做转换
JOB类型
1. 常用
1.1 Start
指定job执行规则,是否循环,循环规则等。
1.2 Dummy
空操作,主要用来多数据源汇总
1.3 Abort job
终止,忽略一个Job
1.4 Display Msgbox Info
使用图形化界面执行Job的时候显示消息框
1.5 Dummy plugin
Job里面空操作,可以用来将执行循环操作
1.6 Job
执行已经定义好的Job。job可以嵌套job。
1.7 Ping a host
ping
1.8 Send SNMP trap
发送SNMP trap 报文
SNMP TRAP就是在SNMP设备发生状态变化的时候向管理器发出信号。不用管理器来检查。
1.9 Set variables
Set变量
1.10 Success
当出错后可以强制将该job置成功
1.11 Transformation
执行定义好的转换
1.12
Truncate tables
1.13 Wait for SQL
当检测表的记录数是否达到一定条件
1.14 Write To Log
job里面的日志记录,不同于程序自带的log4j等日志
2. 邮件
2.1 Get mails from POP
从POP服务器获取邮件并存储到文件夹中
2.2 Mail
发送文本或者HTML格式邮件,可添加附件。
2.3 Mail validator
通过发送SNMP TRAP到邮件服务器来验证EMAIL地址是否正确
3. 文件管理
3.1 Add filenames to result
将文件夹或者多个文件加入到数据流中,以便在下一个job步骤中使用
3.2 Compare folders
比较两个文件夹下面的文件是否一致,可以选择只比较文件大小,也可以选择比较文件内容
3.3 Copy Files
3.4 Copy or Move result filenames
根据上一步的执行结果得到的文件名,复制或者剪切文件
3.5 Create a folder
3.6 Create file
3.7 Delete file
3.8 Delete filenames from result
根据上一步的执行结果得到的文件名,复制或者剪切文件
3.9 Delete files
3.10
3.11
Delete folders
File Compare
比较两个文件的内容
3.12 HTTP
通过http协议从web服务器上获得文件
3.13
移动文件
Move Files
3.14 Unzip file
Unzip解压缩文件
3.15 Wait for file
循环检测文件是否存在,否则直到超时失败
3.16 Zip file
采用zip压缩文件
4. 文件传输
4.1 FTP Delete
删除FTP上的文件
4.2 Get a file with FTP
从FTP获取文件,可以设置编码方式,连接超时时间等。文件保存路径,文件名中是否包含日期,时间,时间日期是否需要特殊格式化,是否覆盖文件。
4.3 Get a file with SFTP
与ftp类似,只是采用Secure FTP protocol
4.4 Put a file with FTP
上传文件,本地文件路径,通过*等模糊匹配要上传的文件。并执行上传的远程目录。
4.5 Put a file with SFTP
4.6 SSH2 Get
4.7 SSH2 Put
5. Scripting
5.1 JavaScript
编辑javascript脚本并执行。
5.2 SQL
执行SQL脚本
可以执行写好的sql脚本,指定sql路径即可。
也可以插入编辑sql脚本。
5.3 Shell
执行SHELL脚本,
可以执行已经写好的shell脚本指定shell脚本路径即可,也可以自己插入,编辑shell脚本
可以把前一个步骤的执行结果当作参数传入
与现有应用程序集成
通过图形化界面设计出来的transformation ,job都是xml格式的文件。通过加载环境变量,初始化配置文件可以在servlet或者其他应用程序中中执行转换。
大致执行方式如下:
public static void runTransformation(String filename) {
try {
();
nmentInit();
TransMeta transMeta = new TransMeta(filename);
Trans trans = new Trans(transMeta);
e(null); // You can pass arguments instead of null.
tilFinished();
if ( ors() > 0 )
{
throw new RuntimeException( "There were errors during transformation execution." );
}
}
catch ( KettleException e ) {
n(e);
}
}
通过插件扩展现有功能
1) 至少实现4个接口
StepMetaInterface接口定义元数据,执行检查等
StepInterface接口集成自BaseStep,步骤
StepDataInterface接口处理游标,数据库结果集,文件等
StepDialogInterface接口编辑,设置变量的图形化界面
类名命名规则:
四个类的前缀保持一致,譬如实现StepMetaInterface的类是MyMeta
那么实现StepInterface的类是My,实现StepDataInterface的是MyData,实现StepDialogInterface的是MyDialog
2) 定义文件
id="DummyPlugin" iconfile="" description="Dummy Plugin" tooltip="This is a dummy plugin test step" category="Transform" classname="luginMeta">
小例子
文件批量入库:
获取一个目录下的文件名,使用一个正则表达式来指定文件名。
使用一个 javascript 脚本,读取文件内容,
如果文件是二进制文件,文件内容一般保存为 BLOB 、Binary 、Image 等类型
如果文件是字符型文件,文件内容一般保存为 CLOB 、varchar、Text 等类型
注意:因为该方法是一次性将文件内容都读到了内存中,因此只能处理比较小的文件
file = new (ing());


发布评论