方式一:
最主要的是这三行代码,将程序打成jar包,上传到Linux服务器上运行,即可以执行想要执行的命令。
Runtime r = Runtime.getRuntime();
Process p = r.exec(" ");
p.waitFor();
代码示例:
Java调用sqoop命令,实现MySQL数据导入Hive。
public static void main(String[] args) throws IOException, InterruptedException {
//执行一个命令需要展示返回结果的
Runtime r = Runtime.getRuntime();
Process p = r.exec("/export/servers/sqoop/bin/sqoop import --connect jdbc:mysql://192.168.XXX.XXX:3306/test11 --username root --password root --table table12 --hive-table default.table12 --hive-import --m 1");
p.waitFor();
//打印日志信息
BufferedReader b = new BufferedReader(new InputStreamReader(p.getInputStream()));
BufferedReader e = new BufferedReader(new InputStreamReader(p.getErrorStream()));
String line = "";
StringBuffer sb=new StringBuffer();
while ((line = b.readLine()) != null) {
sb.append(line).append("\n");
}
while ((line = e.readLine()) != null) {
sb.append(line).append("\n");
}
System.out.println("result: "+sb.toString());
b.close();
e.close();
}
方式二:
第一种方法需要将jar包上传到Linux服务器上运行,不方便本地测试。
可以使用Java代码远程访问Linux服务器,调用执行Linux命令。
导入依赖:
<dependency>
<groupId>ch.ethz.ganymed</groupId>
<artifactId>ganymed-ssh2</artifactId>
<version>build210</version>
</dependency>
代码示例:
Java调用sqoop命令,实现MySQL数据导入Hive。
public static void main(String[] args) throws IOException {
Connection conn = new Connection("192.168.XXX.XXX");//参数传服务器的地址
conn.connect();
boolean isAuthenticated=conn.authenticateWithPassword("root", "1a2b3c4d");//返回true代表可正常登录服务器
if(!isAuthenticated) {
System.out.println("用户名或密码错误");
throw new IOException("Authentication failed!");
}
Session session=conn.openSession();
//执行linux指令。
String orderStr = "/export/servers/sqoop/bin/sqoop import --connect jdbc:mysql://192.168.XXX.XXX:3306/test11 --username root --password root --table table12 --hive-table default.table12 --hive-import --m 1";
session.execCommand(orderStr);
//StreamGobbler的作用是把session的标准输出包装成InputStream,用于接收目标服务器上的控制台返回结果。
InputStream stdOut=new StreamGobbler(session.getStdout());
//正常IO流读取输入流数据的操作
StringBuffer sb=new StringBuffer();
byte[] bys=new byte[1024];
int len=0;
while((len=stdOut.read(bys))!=-1) sb.append(new String(bys,0,len));
String res=sb.toString();
System.out.println(res);
//这是接收日志信息
InputStream stdErr=new StreamGobbler(session.getStderr());
sb=new StringBuffer();
bys=new byte[1024];
len=0;
while((len=stdErr.read(bys))!=-1) sb.append(new String(bys,0,len));
String err=sb.toString();
System.out.println(err);
//session.waitForCondition(ChannelCondition.CLOSED | ChannelCondition.EOF | ChannelCondition.EXIT_STATUS , 30000);
// 等待,除非1.连接关闭;2.输出数据传送完毕;3.进程状态为退出;4.超时
//调用这个方法在执行linux命令时,会避免环境变量读取不全的问题,这里有许多标识可以用,比如当exit命令执行后或者超过了timeout时间,则session关闭
session.waitForCondition(ChannelCondition.EXIT_STATUS, 3000);//在调用getExitStatus时,要先调用WaitForCondition方法
//一般情况下shell脚本正常执行完毕,getExitStatus方法返回0。
//此方法通过远程命令取得Exit Code/status。
//但并不是每个server设计时都会返回这个值,如果没有则会返回null。
//getExitStatus的返回值,可以认为是此次执行是否OK的标准。
int ret=session.getExitStatus()==null?0:session.getExitStatus();
if (ret == 0){
System.out.println("导入成功");
}else{
System.out.println("导入失败");
}
if(stdOut!=null) stdOut.close();
if(stdErr!=null) stdErr.close();
if(session!=null) session.close();
if(conn!=null) conn.close();
}