利用 Kettle 的自定义 Java 步骤(User Defined Java Class)在 SQL Server 数据库之间复制表
Eclipse 中的 Java 代码:
package copyTable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
/**
 * Copies user tables from one SQL Server database to another.
 *
 * <p>The copy itself is executed on the <em>target</em> server as
 * {@code SELECT * INTO ... FROM OPENDATASOURCE(...)}, which is why every target connection first
 * enables the server option {@code Ad Hoc Distributed Queries}. Call {@link #close()} when done
 * to switch that option off again.
 *
 * <p>Limitations that are hard-coded in this class: both servers are reached on port 1433 and
 * only tables in the {@code dbo} schema are addressed.
 *
 * <p>Instances are mutable and not designed for use from several threads at once.
 */
public class CopyTable {
    /** Lists the names of all user tables of the connected database. */
    private static final String LIST_USER_TABLES_SQL =
            "SELECT Name FROM SysObjects Where XType='U' ORDER BY Name";

    /** Returns a row when a user table with the given name exists in the connected database. */
    private static final String TABLE_EXISTS_SQL =
            "SELECT 1 FROM SysObjects WHERE XType='U' AND Name = ?";

    /** Batch that makes OPENDATASOURCE usable on the target server. */
    private static final String ENABLE_AD_HOC_SQL =
            "EXEC sp_configure 'show advanced options',1 "
            + "RECONFIGURE "
            + "EXEC sp_configure 'Ad Hoc Distributed Queries',1 "
            + "RECONFIGURE";

    /** Batch that reverts {@link #ENABLE_AD_HOC_SQL}. */
    private static final String DISABLE_AD_HOC_SQL =
            "EXEC sp_configure 'Ad Hoc Distributed Queries',0 "
            + "RECONFIGURE "
            + "EXEC sp_configure 'show advanced options',0 "
            + "RECONFIGURE";

    String driverName = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
    String copyFromUrl;
    String copyFromDatabase;
    String copyFromUserName;
    String copyFromPassword;
    String copyToUrl;
    String copyToDatabase;
    String copyToUserName;
    String copyToPassword;

    /**
     * Creates an unconfigured instance; call
     * {@link #Init(String, String, String, String, String, String, String, String)} before use.
     */
    public CopyTable() {
        // Connection settings are supplied later through Init(...).
    }

    /**
     * Creates an instance that is ready to use.
     *
     * @param copyFromUrl source server host, e.g. {@code localhost}
     * @param copyFromDatabase source database name
     * @param copyFromUserName source login
     * @param copyFromPassword source password
     * @param copyToUrl target server host, e.g. {@code 127.0.0.1}
     * @param copyToDatabase target database name
     * @param copyToUserName target login
     * @param copyToPassword target password
     */
    public CopyTable(String copyFromUrl, String copyFromDatabase, String copyFromUserName,
            String copyFromPassword, String copyToUrl, String copyToDatabase,
            String copyToUserName, String copyToPassword) {
        applySettings(copyFromUrl, copyFromDatabase, copyFromUserName, copyFromPassword,
                copyToUrl, copyToDatabase, copyToUserName, copyToPassword);
    }

    /**
     * Sets (or replaces) the connection settings of both databases.
     *
     * @param copyFromUrl source server host, e.g. {@code 127.0.0.1}
     * @param copyFromDatabase source database name
     * @param copyFromUserName source login
     * @param copyFromPassword source password
     * @param copyToUrl target server host, e.g. {@code 127.0.0.1}
     * @param copyToDatabase target database name
     * @param copyToUserName target login
     * @param copyToPassword target password
     */
    public void Init(String copyFromUrl, String copyFromDatabase, String copyFromUserName,
            String copyFromPassword, String copyToUrl, String copyToDatabase,
            String copyToUserName, String copyToPassword) {
        applySettings(copyFromUrl, copyFromDatabase, copyFromUserName, copyFromPassword,
                copyToUrl, copyToDatabase, copyToUserName, copyToPassword);
    }

    /**
     * Opens a new connection to the source database. The caller owns it and must close it.
     *
     * @return an open connection to the source database
     * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
     * @throws SQLException if the connection cannot be established
     */
    public Connection getConFrom() throws ClassNotFoundException, SQLException {
        Class.forName(this.driverName);
        return DriverManager.getConnection(
                jdbcUrl(this.copyFromUrl, this.copyFromDatabase),
                this.copyFromUserName,
                this.copyFromPassword);
    }

    /**
     * Opens a new connection to the target database and enables
     * {@code Ad Hoc Distributed Queries} on that server. The caller owns the connection and must
     * close it.
     *
     * @return an open connection to the target database
     * @throws ClassNotFoundException if the JDBC driver class cannot be loaded
     * @throws SQLException if connecting or reconfiguring the server fails
     */
    public Connection getConTo() throws ClassNotFoundException, SQLException {
        Class.forName(this.driverName);
        Connection toConnection = DriverManager.getConnection(
                jdbcUrl(this.copyToUrl, this.copyToDatabase),
                this.copyToUserName,
                this.copyToPassword);
        boolean configured = false;
        try {
            runBatch(toConnection, ENABLE_AD_HOC_SQL);
            configured = true;
            return toConnection;
        } finally {
            // Do not hand out (or leak) a connection whose server could not be configured.
            if (!configured) {
                closeQuietly(null, null, toConnection);
            }
        }
    }

    /**
     * Reads the names of all user tables of the source database.
     *
     * <p>This method does not modify the configured connection settings.
     *
     * @return the table names ordered by name, or {@code null} when the source database could
     *         not be queried (the failure is printed to standard error); {@code null} rather
     *         than an empty list is kept for backward compatibility
     */
    public List<String> getTableName() {
        Connection fromConnection = null;
        Statement statement = null;
        ResultSet rSet = null;
        try {
            fromConnection = getConFrom();
            statement = fromConnection.createStatement();
            rSet = statement.executeQuery(LIST_USER_TABLES_SQL);
            List<String> tableNames = new ArrayList<String>();
            while (rSet.next()) {
                tableNames.add(rSet.getString("Name"));
            }
            return tableNames;
        } catch (Exception e) {
            // Public contract: never throw, signal failure with null.
            e.printStackTrace();
            return null;
        } finally {
            closeQuietly(rSet, statement, fromConnection);
        }
    }

    /**
     * Creates an empty copy (structure only) of every source user table that does not yet exist
     * in the target database.
     *
     * @return a human-readable summary with one entry per processed table; empty when nothing
     *         could be processed
     */
    public String copyAllTableStructure() {
        return copyTables(getTableName(), false, " 已创建");
    }

    /**
     * Copies structure and data of every source user table that does not yet exist in the
     * target database.
     *
     * @return a human-readable summary with one entry per processed table; empty when nothing
     *         could be processed
     */
    public String copyAllTable() {
        return copyTables(getTableName(), true, " 已复制");
    }

    /**
     * Creates an empty copy (structure only) of one source table in the target database unless
     * a table of that name already exists there.
     *
     * @param tableName name of the table in the {@code dbo} schema of the source database
     * @return a human-readable summary; empty when the copy failed
     */
    public String copyTableStructureFromTableName(String tableName) {
        return copyTables(singleTable(tableName), false, " 已复制");
    }

    /**
     * Copies structure and data of one source table into the target database unless a table of
     * that name already exists there.
     *
     * @param tableName name of the table in the {@code dbo} schema of the source database
     * @return a human-readable summary; empty when the copy failed
     */
    public String copyTableFromTableName(String tableName) {
        return copyTables(singleTable(tableName), true, " 已复制");
    }

    /**
     * Switches {@code Ad Hoc Distributed Queries} and {@code show advanced options} off on the
     * target server. Failures are printed to standard error and otherwise ignored.
     *
     * <p>Note that both options are set to 0 regardless of their value before this class was
     * used.
     */
    public void close() {
        Connection toConn = null;
        try {
            // getConTo() turns 'show advanced options' on, which is required before the
            // ad hoc option can be changed, so it is deliberately used here as well.
            toConn = this.getConTo();
            runBatch(toConn, DISABLE_AD_HOC_SQL);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(null, null, toConn);
        }
    }

    /** Stores the connection settings of both databases. */
    private void applySettings(String fromUrl, String fromDatabase, String fromUserName,
            String fromPassword, String toUrl, String toDatabase, String toUserName,
            String toPassword) {
        this.copyFromUrl = fromUrl;
        this.copyFromDatabase = fromDatabase;
        this.copyFromUserName = fromUserName;
        this.copyFromPassword = fromPassword;
        this.copyToUrl = toUrl;
        this.copyToDatabase = toDatabase;
        this.copyToUserName = toUserName;
        this.copyToPassword = toPassword;
    }

    /**
     * Copies the given tables on one target connection and builds the summary text.
     *
     * @param tableNames tables to copy; {@code null} means the list could not be determined
     * @param withData {@code true} to copy rows as well, {@code false} for structure only
     * @param successPrefix text placed in front of the table name for a successful copy
     * @return the summary, also printed to standard output
     */
    private String copyTables(List<String> tableNames, boolean withData, String successPrefix) {
        StringBuilder result = new StringBuilder();
        if (tableNames != null) {
            Connection conTo = null;
            Statement copyStatement = null;
            PreparedStatement existsStatement = null;
            try {
                // Reject unusable names before the target server is touched at all.
                for (String tableName : tableNames) {
                    requireTableName(tableName);
                }
                conTo = getConTo();
                copyStatement = conTo.createStatement();
                existsStatement = conTo.prepareStatement(TABLE_EXISTS_SQL);
                for (String tableName : tableNames) {
                    if (targetTableExists(existsStatement, tableName)) {
                        result.append(" 表").append(tableName).append("已存在");
                    } else {
                        copyStatement.execute(buildCopySql(tableName, withData));
                        result.append(successPrefix).append(tableName).append("表");
                    }
                }
            } catch (Exception e) {
                // Public contract: never throw; the summary covers the tables handled so far.
                e.printStackTrace();
            } finally {
                closeQuietly(null, existsStatement, null);
                closeQuietly(null, copyStatement, conTo);
            }
        }
        String summary = result.toString();
        System.out.println(summary);
        return summary;
    }

    /** Builds the SELECT ... INTO statement that pulls one table from the source server. */
    private String buildCopySql(String tableName, boolean withData) {
        String quotedTable = quoteIdentifier(tableName);
        StringBuilder sql = new StringBuilder();
        sql.append("SELECT * INTO ")
                .append(quoteIdentifier(this.copyToDatabase)).append(".dbo.").append(quotedTable)
                .append(" FROM opendatasource( 'SQLOLEDB', 'Data Source=")
                .append(escapeLiteral(this.copyFromUrl))
                .append(";User ID=").append(escapeLiteral(this.copyFromUserName))
                .append(";Password=").append(escapeLiteral(this.copyFromPassword))
                .append("').")
                .append(quoteIdentifier(this.copyFromDatabase)).append(".dbo.").append(quotedTable);
        if (!withData) {
            // A predicate that is never true copies the column definitions but no rows.
            sql.append(" WHERE 1=2");
        }
        return sql.toString();
    }

    /** Tells whether a user table of the given name exists in the target database. */
    private static boolean targetTableExists(PreparedStatement existsStatement, String tableName)
            throws SQLException {
        existsStatement.setString(1, tableName);
        ResultSet rs = existsStatement.executeQuery();
        try {
            return rs.next();
        } finally {
            closeQuietly(rs, null, null);
        }
    }

    /** Executes a SQL batch on the connection and releases the statement afterwards. */
    private static void runBatch(Connection connection, String sql) throws SQLException {
        Statement statement = connection.createStatement();
        try {
            statement.execute(sql);
        } finally {
            closeQuietly(null, statement, null);
        }
    }

    /** Builds the JDBC URL of a database on the given host (port 1433). */
    private static String jdbcUrl(String host, String database) {
        return "jdbc:sqlserver://" + host + ":1433;Database=" + database;
    }

    /** Wraps a single table name in a list. */
    private static List<String> singleTable(String tableName) {
        List<String> single = new ArrayList<String>(1);
        single.add(tableName);
        return single;
    }

    /** Rejects {@code null} or empty table names. */
    private static void requireTableName(String tableName) {
        if (tableName == null || tableName.isEmpty()) {
            throw new IllegalArgumentException("table name must not be null or empty");
        }
    }

    /** Quotes a T-SQL identifier with brackets, doubling any closing bracket inside it. */
    private static String quoteIdentifier(String identifier) {
        return "[" + identifier.replace("]", "]]") + "]";
    }

    /** Escapes a value for use inside a single-quoted T-SQL string literal. */
    private static String escapeLiteral(String value) {
        return String.valueOf(value).replace("'", "''");
    }

    /** Closes the given JDBC resources in order, skipping {@code null}s and ignoring failures. */
    private static void closeQuietly(ResultSet resultSet, Statement statement,
            Connection connection) {
        if (resultSet != null) {
            try {
                resultSet.close();
            } catch (SQLException ignored) {
                // Best effort: nothing useful can be done when closing fails.
            }
        }
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException ignored) {
                // Best effort: nothing useful can be done when closing fails.
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ignored) {
                // Best effort: nothing useful can be done when closing fails.
            }
        }
    }
}
Kettle 中的 Java 代码:
import copyTable.CopyTable;
/** Summary returned by the one-off table copy; stays null until the first row is processed. */
private String copyResult;

/**
 * Processes one input row: runs the table-structure copy once, on the first row, and writes
 * its summary into the {@code resultString} field of every row that passes through.
 *
 * @param smi step metadata supplied by the caller (not used here)
 * @param sdi step data supplied by the caller (not used here)
 * @return {@code false} when there are no more input rows, {@code true} otherwise
 * @throws KettleException declared by the method signature
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();
    if (r == null) {
        setOutputDone();
        return false;
    }
    if (first)
    {
        first = false;
        // The copy opens database connections and reconfigures the target server, so it is
        // done a single time instead of once per input row.
        CopyTable cTable = new CopyTable();
        // Author's note: a Java constructor did not seem to be usable here, so Init() is used.
        // NOTE(review): connection settings and credentials are hard-coded in the step.
        cTable.Init("127.0.0.1", "InfoReport", "sa","yutu@123", "127.0.0.1", "test", "sa", "123");
        copyResult = cTable.copyAllTableStructure();
        // NOTE(review): cTable.close() is never called, so 'Ad Hoc Distributed Queries' stays
        // enabled on the target server - confirm whether that is intended.
    }
    get(Fields.In, "resultString").setValue(r, copyResult);
    putRow(data.outputRowMeta, r);
    return true;
}