Kettle变量和自定义java代码的实例应用
1 kettle.properties参数配置数据源连接和FTP连接
由于测试环境和生产环境中数据库连接FTP等配置会在部署过程中变更,所以预先定义成配置项,在配置文件中修改,这样测试和发布将会变得简单,下面以数据库为例说明这类配置的使用。
(1) 首先要找到配置文件,不同的操作系统路径也不一样,本人用win7进行开发,配置文件的路径为“C:\Users\chenpeng\.kettle\kettle.properties”,如下:
(2) 配置文件中的具体配置如下:
还可以可视化设置:
(3) 具体使用示例
l 下方是数据库连接配置:
l 下方是FTP连接配置:
1.1.2 kettle.properties参数设置和路径及与正则配合使用
(1)在kettle.properties中设置变量值
(2)kettle.properties设置如下(win7下的路径为:C:\Users\chenpeng\.kettle)
(4) 在输出文件时使用参数中指定的路径:
注:如果路径出现错误,如把某个文件夹删除则会报错,只要是报错,即使把文件夹建好了也仍然会报目录错误(好像有记忆功能),这时必须重新启动kettle才能正常运行。
l 在输出文件时使用参数中和正则表达式混合使用场景:
1.1.3 kettle.properties参数在java代码中的应用
1.1.4 作业中变量使用并用javascript设置变量值
上面的例子都是kettle.properties中声明的变量的应用,这些都是全局范围内通用的,但很多时间,子作业需要有内部专用的变量参数,这时就不能使用kettle.properties中声明的变量了,需要在流程中声明变量,并把作用域设置为当前作业有效。以下应用场景业务如下:文件名要命名成当前日期格式的,所以在作业级别定义了一个变量,但无法给它赋值,如是采用了javascript脚本方式给该变量赋值,然后在输出文件名的位置应用该变量即可,后面文件的删除上传都是公用部分都需要用到这个变量做为接口参数来做处理。
(1) 主流程如下:
(2) 定义变量:
(3) 子过程中调用javascript脚本修改值:
Date.prototype.Format = function (fmt) { //author: meizz var o = { "M+": this.getMonth() + 1, //月份 "d+": this.getDate(), //日 "h+": this.getHours(), //小时 "m+": this.getMinutes(), //分 "s+": this.getSeconds(), //秒 "q+": Math.floor((this.getMonth() + 3) / 3), //季度 "S": this.getMilliseconds() //毫秒 }; if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length)); for (var k in o) if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr (("" + o[k]).length))); return fmt; } var dateTime = new Date().Format("yyyyMMdd"); // gives back today at yyyy/MM/dd HH:mm:ss.000 setVariable("curdate",dateTime,"s");
另外,如果是用上一节点的字段值,修改变量值则更为简单,如下:
(4) 使用变量(按常规使用):
1.1.5 Java代码访问变量调用jar包并生成验证文件
这是一个较为综合性的示例,首先定义了一条记录(可能理解为很多个变量),然后通过java代码来调用jar包,计算出记录数、文件大小、MD5值等,赋值给相应记录字段,并输出到文件,形成数据文件的校验文件。
(1) 主流程如下:
(2) 生成记录
(3) Java处理
//导入在eclipse中编辑好的包,主要用于计算文件行数、MD5值 import cgb.tools.KettleHelper; import java.io.File; import java.io.IOException; //kettle中已定义好的行处理方法,每行记录都会执行一次 public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { //(1)获取到上一个步骤的输入行 Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } r = createOutputRow(r, data.outputRowMeta.size()); //(2)读取出参数变量值 String kettleoutputdir = getVariable("kettleoutputdir", ""); String hbprovince_code = getVariable("hbprovince_code", ""); String item_code = getVariable("item_code", ""); String curdate = getVariable("curdate", ""); String filename = ""; String onlyfilename = ""; String recordcount = ""; String bytecount = ""; String md5code = ""; //(3)调用jar包计算出MD5值、行数、字节数 try { filename = kettleoutputdir + "\\" + hbprovince_code + "_" + item_code + "_day_" + curdate + ".csv"; onlyfilename = hbprovince_code + "_" + item_code + "_day_" + curdate + ".csv"; md5code = KettleHelper.getFileMD5String(filename); recordcount = String.valueOf(KettleHelper.getFileRecordCount(filename, true)); bytecount = String.valueOf(KettleHelper.getFileByteCount(filename)); } catch (IOException e) { e.printStackTrace(); } //(4)把计算好的值放入到输出记录中 get(Fields.Out, "filename").setValue(r, onlyfilename); get(Fields.Out, "recordcount").setValue(r, recordcount); get(Fields.Out, "byteCount").setValue(r, bytecount); get(Fields.Out, "md5code").setValue(r, md5code); //(5)输出到下一个节点做处理 putRow(data.outputRowMeta, r); return true; }
(4) Jar包的开发和导出
其中调用的jar包,可以选择用eclipse来生成,如下:
package XXX.tools; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; public class KettleHelper { /** * 默认的密码字符串组合,用来将字节转换成 16 进制表示的字符,apache校验下载的文件的正确性用的就是默认的这个组合 */ private static char hexDigits[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }; private static MessageDigest messagedigest = null; static { try { messagedigest = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException nsaex) { System.err.println("MD5Util.class.getName()" + "初始化失败,MessageDigest不支持MD5Util。"); nsaex.printStackTrace(); } } /** * 生成字符串的md5校验值 * * @param s * @return */ private static String getMD5String(String s) { return getMD5String(s.getBytes()); } /** * 判断字符串的md5校验码是否与一个已知的md5码相匹配 * * @param password 要校验的字符串 * @param md5PwdStr 已知的md5校验码 * @return */ public static boolean checkPassword(String password, String md5PwdStr) { String s = getMD5String(password); return s.equals(md5PwdStr); } /** * 生成文件的md5校验值 * * @param file * @return * @throws IOException */ public static String getFileMD5String(String fileName) throws IOException { File file = new File(fileName); InputStream fis; fis = new FileInputStream(file); byte[] buffer = new byte[1024]; int numRead = 0; while ((numRead = fis.read(buffer)) > 0) { messagedigest.update(buffer, 0, numRead); } fis.close(); return bufferToHex(messagedigest.digest()); } /** * 获取文件的记录条数 * * @param file * @return * @throws IOException */ public static int getFileRecordCount(String fileName,boolean hasHeadRow) throws IOException { File inFile = new File(fileName); // 读取的CSV文件 @SuppressWarnings("unused") String inString = ""; int count = 0; try { BufferedReader reader = new BufferedReader(new FileReader(inFile)); while((inString = reader.readLine())!= null){ count++; } reader.close(); } catch (FileNotFoundException ex) { System.out.println("没找到文件!"); } catch (IOException ex) { System.out.println("读写文件出错!"); } if(hasHeadRow) { count--; } return count; } /** * 生成文件的md5校验值 * * @param file * @return * @throws IOException */ public static String getFileMD5String(File file) throws IOException { InputStream fis; fis = new FileInputStream(file); byte[] buffer = new byte[1024]; int numRead = 0; while ((numRead = fis.read(buffer)) > 0) { messagedigest.update(buffer, 0, numRead); } fis.close(); return bufferToHex(messagedigest.digest()); } private static String getMD5String(byte[] bytes) { messagedigest.update(bytes); return bufferToHex(messagedigest.digest()); } private static String bufferToHex(byte bytes[]) { return bufferToHex(bytes, 0, bytes.length); } private static String bufferToHex(byte bytes[], int m, int n) { StringBuffer stringbuffer = new StringBuffer(2 * n); int k = m + n; for (int l = m; l < k; l++) { appendHexPair(bytes[l], stringbuffer); } return stringbuffer.toString(); } private static void appendHexPair(byte bt, StringBuffer stringbuffer) { char c0 = hexDigits[(bt & 0xf0) >> 4];// 取字节中高 4 位的数字转换, >>> 为逻辑右移,将符号位一起右移,此处未发现两种符号有何不同 char c1 = hexDigits[bt & 0xf];// 取字节中低 4 位的数字转换 stringbuffer.append(c0); stringbuffer.append(c1); } /** * 获取文个把的字节数 * @param fileName * @return * @throws IOException */ public static long getFileByteCount(String fileName) throws IOException { File file = new File(fileName); return file.length(); } }
导出jar包
放入到kettle的jar包目录,它会自加载,操作系统不同,目录也会不同,本人使用的是win7,目录如下:
(5) 文件输出
(6) 最终生成文件的效果如下:
1.1.6 SQL中使用变量
下面的查询语句用问号占位符,当开始日期(第一个?号)和结束日期(第二个?号)绑定到SQL的问号占位符,在查询入职日期在一定期间的总统信息:
SELECTname,took_office FROMpresidents WHEREtook_officeBETWEEN? AND?
示例中,首先使用生成行步骤(“Generdate Rows”)生成一行带有两个字段的记录,分别按顺序代替表输入SQL语句中的占位符。实际场景中,通常使用动态处理结果产生期望值代替生成行步骤。
接下来是表输入步骤,其中配置SQL查询语句,包含问号占位符,通过在“Insert Data Step”的下拉框中选择前一步骤,来替换问号的值。
通过传输不同的值多次执行查询
如果你想循环执行查询,使用不同值替换占位符;就需要占位符生产步骤生成多行数据,并把表输入的选项“Execute for each row”选中。本示例文件名称为placeholders_in_loop.ktr。
占位符的局限性
虽然通过给占位符绑定值查询非常有效,但也有一些场景不能使用,下面一些SQL不能使用占位符。这些示例都非常通用,但是不能使用占位符。
不能用占位符代替表名词,否则查询将不执行。
SELECT some_fieldFROM ?
不能使用占位符代替查询的字段名称,下面的查询可以成功绑定参数,但只是作为一个常量,而不是字段的名称。
SELECT ? asmy_field FROM table
不能使用占位符绑定逗号分隔的多个列表项值;如果你绑定“1,2,3″给下面的查询语句,将得到意外的结果。
SELECT * FROM testWHERE id IN(?)
你期望得到的结果是:
SELECT * FROM testWHERE id IN("1,2,3")
但是运行的结果却是这样,传输一个字符串,却得到三个值,而实际情况完全不确定有几个值传输进来。
SELECT * FROM testWHERE id IN(1,2,3)
为了解决这些场景的问题,需要使用kettle的变量动态构造查询文本,下面详细说明。
SQL查询中使用kettle变量
表输入步骤支持替换查询中的变量或参数,假设有一系列结构完全相关的表,分别是: mammals, birds, insects(动物、鸟、昆虫),可以使用kettle变量作为表的名称。假设我们有一个变量,名称为:ANIMALS_TABLE,赋值为birds,我们设置“Replace Vaiables”选项选中。如果我们写下面的查询:
SELECT name,population FROM${ANIMALS_TABLE}
在执行一定被成功的替换成:
SELECT name,population FROM birds
如果设置变量的值为“mammals”或“insects”,则将动态查询不同的表。当占位符不能胜任是,使用变量技术可以帮助我们解决。示例的名称为variables.ktr,运行时不要忘了给parameter(命名参数)赋值进行测试。
变量和占位符一起使用
如果有必要,我们可以混合这两种技术;本示例中使用变量作为表名词,同时使用占位符作为前面步骤的输入值。示例文件variables_and_placeholders.ktr。
SELECT name, population FROM${ANIMALS_TABLE}WHERE population > ?
版权声明:本文为博主原创文章,未经博主允许不得转载。