现在网上java 调用kettle api操作的很少(本人环境MYSQL kettle5.2),我来做下好人。
创建表语法:
CREATE TABLE `t_lzfx_base_syonline` ( `ID` varchar(36) NOT NULL, `CREATEDATETIME` datetime DEFAULT NULL, `IP` varchar(100) DEFAULT NULL, `LOGINNAME` varchar(100) DEFAULT NULL, `TYPE` varchar(1) DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `syonline` ( `ID` varchar(36) NOT NULL, `CREATEDATETIME` datetime DEFAULT NULL, `IP` varchar(100) DEFAULT NULL, `LOGINNAME` varchar(100) DEFAULT NULL, `TYPE` varchar(1) DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
JAVA代码:
/** * 两个库中的表名 */ public static String bjdt_tablename = "t_lzfx_base_syonline"; public static String kettle_tablename = "syonline"; public static String kettle_log = "t_lzfx_data_log"; /** * 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml) */ public static final String[] databasesXML = { "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<connection>" + "<name>bjdt</name>" + "<server>127.0.0.1</server>" + "<type>MYSQL</type>" + "<access>Native</access>" + "<database>zjdata</database>" + "<port>3306</port>" + "<username>root</username>" + "<password>root</password>" + "</connection>", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<connection>" + "<name>kettle</name>" + "<server>127.0.0.1</server>" + "<type>MYSQL</type>" + "<access>Native</access>" + "<database>kettledb</database>" + "<port>3306</port>" + "<username>root</username>" + "<password>root</password>" + "</connection>" }; /** * @param args */ public static void main(String[] args) { try { KettleEnvironment.init(); TransMeta transMeta = new TransMeta(); //设置转化的名称 transMeta.setName("转换名称"); //添加转换的数据库连接 for (int i=0;i<databasesXML.length;i++){ DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]); transMeta.addDatabase(databaseMeta); } VariableSpace space = new Variables(); //将step日志数据库配置名加入到变量集中 space.setVariable("kettle_log","bjdt"); space.initializeVariablesFrom(null); StepLogTable stepLogTable = StepLogTable.getDefault(space,transMeta); //StepLogTable使用的数据库连接名(上面配置的变量名)。 stepLogTable.setConnectionName("bjdt"); //设置Step日志的表名 stepLogTable.setTableName(kettle_log); //设置TransMeta的StepLogTable transMeta.setStepLogTable(stepLogTable); //registry是给每个步骤生成一个标识Id用 PluginRegistry registry = PluginRegistry.getInstance(); //****************************************************************** //第一个表输入步骤(TableInputMeta) TableInputMeta tableInput = new TableInputMeta(); String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput); //给表输入添加一个DatabaseMeta连接数据库 DatabaseMeta database_bjdt = transMeta.findDatabase("bjdt"); tableInput.setDatabaseMeta(database_bjdt); String select_sql = "SELECT ID,IP,CREATEDATETIME,LOGINNAME,TYPE FROM "+bjdt_tablename; tableInput.setSQL(select_sql); //添加TableInputMeta到转换中 StepMeta tableInputMetaStep = new StepMeta("INPUTTABLE_"+bjdt_tablename,tableInput); //给步骤添加在spoon工具中的显示位置 tableInputMetaStep.setDraw(true); tableInputMetaStep.setLocation(100, 100); transMeta.addStep(tableInputMetaStep); //****************************************************************** //****************************************************************** //第二个步骤插入与更新 InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta(); insertUpdateMeta.setCommitSize(10000); //性能设置 String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class,insertUpdateMeta); //添加数据库连接 DatabaseMeta database_kettle = transMeta.findDatabase("kettle"); insertUpdateMeta.setDatabaseMeta(database_kettle); //设置操作的表 insertUpdateMeta.setTableName(kettle_tablename); //设置用来查询的关键字 insertUpdateMeta.setKeyLookup(new String[]{"ID"}); insertUpdateMeta.setKeyStream(new String[]{"ID"}); insertUpdateMeta.setKeyStream2(new String[]{""});//一定要加上 insertUpdateMeta.setKeyCondition(new String[]{"="}); //String select_sql = "SELECT ID,IP,CREATEDATETIME,LOGINNAME,TYPE FROM "+bjdt_tablename; //设置要更新的字段 String[] updatelookup = {"ID","IP","CREATEDATETIME","LOGINNAME","TYPE"} ; String [] updateStream = {"ID","IP","CREATEDATETIME","LOGINNAME","TYPE"}; Boolean[] updateOrNot = {false,true,true,true,true}; insertUpdateMeta.setUpdateLookup(updatelookup); insertUpdateMeta.setUpdateStream(updateStream); insertUpdateMeta.setUpdate(updateOrNot); String[] lookup = insertUpdateMeta.getUpdateLookup(); //System.out.println("******:"+lookup[1]); //System.out.println("insertUpdateMetaXMl:"+insertUpdateMeta.getXML()); //添加步骤到转换中 StepMeta insertUpdateStep = new StepMeta("INSERTUPDATE_"+kettle_tablename,insertUpdateMeta); insertUpdateStep.setDraw(true); insertUpdateStep.setLocation(250,100); transMeta.addStep(insertUpdateStep); //****************************************************************** //****************************************************************** //添加hop把两个步骤关联起来 transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep)); Trans trans = new Trans(transMeta); trans.execute(null); // You can pass arguments instead of null. trans.waitUntilFinished(); if ( trans.getErrors() > 0 ) { throw new RuntimeException( "There were errors during transformation execution." ); } System.out.println("***********the end************"); } catch (Exception e) { e.printStackTrace(); return; } }
运行结果:
2015/04/18 16:36:51 - 转换名称 - 为了转换解除补丁开始 [转换名称] 2015/04/18 16:37:00 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 50000 2015/04/18 16:37:02 - INSERTUPDATE_syonline.0 - linenr 50000 2015/04/18 16:37:08 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 100000 2015/04/18 16:37:09 - INSERTUPDATE_syonline.0 - linenr 100000 2015/04/18 16:37:15 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 150000 2015/04/18 16:37:16 - INSERTUPDATE_syonline.0 - linenr 150000 2015/04/18 16:37:22 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 200000 2015/04/18 16:37:23 - INSERTUPDATE_syonline.0 - linenr 200000 2015/04/18 16:37:29 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 250000 2015/04/18 16:37:31 - INSERTUPDATE_syonline.0 - linenr 250000 2015/04/18 16:37:36 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 300000 2015/04/18 16:37:38 - INSERTUPDATE_syonline.0 - linenr 300000 2015/04/18 16:37:43 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 350000 2015/04/18 16:37:45 - INSERTUPDATE_syonline.0 - linenr 350000 2015/04/18 16:37:51 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 400000 2015/04/18 16:37:52 - INSERTUPDATE_syonline.0 - linenr 400000 2015/04/18 16:37:58 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 450000 2015/04/18 16:37:59 - INSERTUPDATE_syonline.0 - linenr 450000 2015/04/18 16:38:05 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 500000 2015/04/18 16:38:05 - INPUTTABLE_t_lzfx_base_syonline.0 - Finished reading query, closing connection. 2015/04/18 16:38:05 - INPUTTABLE_t_lzfx_base_syonline.0 - 完成处理 (I=500184, O=0, R=0, W=500184, U=0, E=0 2015/04/18 16:38:06 - INSERTUPDATE_syonline.0 - linenr 500000 2015/04/18 16:38:06 - INSERTUPDATE_syonline.0 - 完成处理 (I=500184, O=0, R=500184, W=500184, U=0, E=0