677807540 java开发交流群
1:controller进入
@Scheduled(cron="0 22 */1 * * ?")
public void FtpToEs(){
System.out.println("FTP开始运行");
ftpToEsService.getFtptoEs();
System.out.println("FTP结束运行");
}
2:调用service
package com.kuaifa.dianxiaomao.web.service.impl;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipFile;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.kuaifa.dianxiaomao.web.service.FtpToEsService;
import com.kuaifa.dianxiaomao.web.util.CreateFileUtils;
import com.kuaifa.dianxiaomao.web.util.DateUtil;
import com.kuaifa.dianxiaomao.web.util.DownloadFtpFileUtils;
import com.kuaifa.dianxiaomao.web.util.FileImpBXiddsUtils;
import com.kuaifa.dianxiaomao.web.util.HttpClient;
import com.kuaifa.dianxiaomao.web.util.TraverseFolder2Utils;
@Service
public class FtpToEsServiceImpl implements FtpToEsService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
public void getFtptoEs() {
// TODO Auto-generated method stub
//读取文本获取内容
ArrayList<String> arrayList =FileImpBXiddsUtils.FileImpBXidds();
//没有文本内容直接返回不做处理
if (arrayList.size()==0) {
return;
}
String type1 = "";
String date1 = "";
String fileName1 = "";
for(String p:arrayList){
String [] tt=p.split("@");
//获得时间
date1 = tt[0];
//文件名字(设备mac)
fileName1 = tt[1];
//设备类型 1:dxm 2:ssp
type1 = tt[2];
if("1".equals(type1)){
String path ="C:/Users/Administrator/Desktop/Ftp/dxm"+"/"+date1+"/"+fileName1;
//创建文件夹 日期+emac
CreateFileUtils.createFile(path);
}else if("2".equals(type1)){
String path ="C:/Users/Administrator/Desktop/Ftp/ssp"+"/"+date1+"/"+fileName1;
//创建文件夹 日期+emac
CreateFileUtils.createFile(path);
}
try {
//1:去dxm下载指定文件到指定位置
if("1".equals(type1)){
//下载Ftp压缩文件到指定路径
DownloadFtpFileUtils.downloadFtpFile("119.188.98.7","ftpdxm","kuaifa4006299139",41431,"/"+date1+"/"+fileName1+"/","C:/Users/Administrator/Desktop/Ftp/dxm"+"/"+date1+"/"+fileName1,"data.zip");
}else if("2".equals(type1)){
//下载Ftp压缩文件到指定路径
DownloadFtpFileUtils.downloadFtpFile("119.188.98.7","ftpssp","kuaifa4006299139",41431,"/"+date1+"/"+fileName1+"/","C:/Users/Administrator/Desktop/Ftp/ssp"+"/"+date1+"/"+fileName1,"data.zip");
}
//1:去dxm处理指定文件
if("1".equals(type1)){
//dxm处理压缩文件
TraverseFolder2Utils.traverseFolder2("C:/Users/Administrator/Desktop/Ftp/dxm"+"/"+date1+"/"+fileName1, type1);
}else if("2".equals(type1)){
//ssp处理压缩文件
TraverseFolder2Utils.traverseFolder2("C:/Users/Administrator/Desktop/Ftp/ssp"+"/"+date1+"/"+fileName1, type1);
}
} catch (Exception e) {
e.printStackTrace();
// TODO: handle exception
}
}
}
}
3:实现下载ftp文件
package com.kuaifa.dianxiaomao.web.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketException;
import org.apache.commons.net.ftp.FTPClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DownloadFtpFileUtils {
private final static Logger logger = LoggerFactory.getLogger(DownloadFtpFileUtils.class);
/*
* 从FTP服务器下载文件
*
* @param ftpHost FTP IP地址
*
* @param ftpUserName FTP 用户名
*
* @param ftpPassword FTP用户名密码
*
* @param ftpPort FTP端口
*
* @param ftpPath FTP服务器中文件所在路径 格式: ftptest/aa
*
* @param localPath 下载到本地的位置 格式:H:/download
*
* @param fileName 文件名称
*/
public static void downloadFtpFile(String ftpHost, String ftpUserName,
String ftpPassword, int ftpPort, String ftpPath, String localPath,
String fileName) {
FTPClient ftpClient = null;
try {
ftpClient = GetFTPClientUtils.getFTPClient(ftpHost, ftpUserName, ftpPassword, ftpPort);
ftpClient.setControlEncoding("UTF-8"); // 中文支持
ftpClient.setFileType(FTPClient.BINARY_FILE_TYPE);
ftpClient.enterLocalPassiveMode();
ftpClient.changeWorkingDirectory(ftpPath);
File localFile = new File(localPath + File.separatorChar + fileName);
OutputStream os = new FileOutputStream(localFile);
ftpClient.retrieveFile(fileName, os);
os.close();
ftpClient.logout();
} catch (FileNotFoundException e) {
logger.info("没有找到" + ftpPath + "文件");
//System.out.println("没有找到" + ftpPath + "文件");
e.printStackTrace();
} catch (SocketException e) {
logger.info("连接FTP失败.");
// System.out.println("连接FTP失败.");
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
logger.info("文件读取错误。");
// System.out.println("文件读取错误。");
e.printStackTrace();
}
}
}
5:ftp实现连接
package com.kuaifa.dianxiaomao.web.util;
import java.io.IOException;
import java.net.SocketException;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GetFTPClientUtils {
private final static Logger logger = LoggerFactory.getLogger(GetFTPClientUtils.class);
public static FTPClient getFTPClient(String ftpHost, String ftpUserName,
String ftpPassword, int ftpPort) {
FTPClient ftpClient = new FTPClient();
try {
ftpClient = new FTPClient();
ftpClient.connect(ftpHost, ftpPort);// 连接FTP服务器
ftpClient.login(ftpUserName, ftpPassword);// 登陆FTP服务器
if (!FTPReply.isPositiveCompletion(ftpClient.getReplyCode())) {
logger.info("未连接到FTP,用户名或密码错误。");
//System.out.println("未连接到FTP,用户名或密码错误。");
ftpClient.disconnect();
} else {
logger.info("FTP连接成功。。");
//System.out.println("FTP连接成功。");
}
} catch (SocketException e) {
e.printStackTrace();
logger.info("FTP的IP地址可能错误,请正确配置。");
// System.out.println("FTP的IP地址可能错误,请正确配置。");
} catch (IOException e) {
e.printStackTrace();
logger.info("FTP的端口错误,请正确配置。");
//System.out.println("FTP的端口错误,请正确配置。");
}
return ftpClient;
}
}
6:解压压缩文件
package com.kuaifa.dianxiaomao.web.util;
import java.io.File;
import java.io.IOException;
import java.util.zip.ZipException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TraverseFolder2Utils {
private final static Logger logger = LoggerFactory.getLogger(TraverseFolder2Utils.class);
public static void main(String[] args) throws InterruptedException {
String path = "C:/Users/Administrator/Desktop/Ftp测试";
traverseFolder2(path,"2");
}
public static void traverseFolder2(String path, String type) throws InterruptedException {
//JSONObject data=new JSONObject();
// is = ftp.getFileStream(ftpFile);
File file = new File(path);
if (file.exists()) {
File[] files = file.listFiles();
if (files.length == 0) {
// System.out.println("文件夹是空的!");
logger.info("文件夹是空的!");
return ;
} else {
for (File file2 : files) {
if (file2.isDirectory()) {
//System.out.println("文件夹:" + file2.getAbsolutePath());
traverseFolder2(file2.getAbsolutePath(),type);
} else {
//System.out.println("文件:" + file2.getAbsolutePath());
logger.info("处理的文件: --------》"+file2.getAbsolutePath());
//System.out.println("处理的文件: --------》"+file2.getAbsolutePath());
File fil = new File(file2.getAbsolutePath());
try {
GetZipFileContentUtils.getZipFileContent(fil, "mac.txt",type);
Thread.sleep(600);
} catch (ZipException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
} else {
// System.out.println("文件不存在!");
logger.info("文件不存在!");
return ;
}
}
}
7:调用的类去发送flume
package com.kuaifa.dianxiaomao.web.util;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipFile;
import org.apache.ibatis.annotations.Delete;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
public class GetZipFileContentUtils {
/*
* public static void main(String[] args) { long string=
* System.currentTimeMillis(); System.out.println(string); }
*/
public static void getZipFileContent(File zipFile, String readFileName,
String type) throws ZipException, IOException, InterruptedException {
// StringBuilder content = new StringBuilder();
JSONObject json = new JSONObject();
ZipFile zip = new ZipFile(zipFile);
Enumeration<ZipEntry> entries = (Enumeration<ZipEntry>) zip.entries();
String pmac = "";
String[] split =null;
List<String> list = new ArrayList<String>();
ZipEntry ze;
// 枚举zip文件内的文件/
while (entries.hasMoreElements()) {
ze = entries.nextElement();
// 读取目标对象
if (ze.getName().equals(readFileName)) {
Scanner scanner = new Scanner(zip.getInputStream(ze));
List<Map<String, Object>> data = new ArrayList<Map<String, Object>>();
while (scanner.hasNextLine()) {
Map<String, Object> tmpMap = new HashMap<String, Object>();
// System.out.println(scanner.nextLine());
String msg = scanner.nextLine();
//pmac += msg + ",";
// list.add(msg);
tmpMap.put("headers", new HashMap<>());
tmpMap.put("body", msg);
data.add(tmpMap);
}
String jsonString = JSON.toJSONString(data);
// System.out.println(jsonString);
try {
PostUtils.appadd(jsonString,type);
} catch (Exception e) {
Thread.sleep(3000);
}
/*split = pmac.split(",");
// 获得设备mac
split[0].replace(":", "");*/
/* // 遍历删除指定元素
String emac = list.get(0).replace(":", "").toLowerCase();
for (int i = 1; i < list.size(); i++) {
List<String> SendList = new ArrayList<String>();
SendList.add(list.get(i));
String[] newStr = SendList.toArray(new String[1]); // 返回一个包含所有对象的指定类型的数组
System.out.println(emac + " emac");
json.put("emac", emac);
json.put("pmacs", newStr);
System.out.println(json);
// 拼接参数进行数据发送 1:dxm 2:ssp 当前时间戳
long timestamp = System.currentTimeMillis();
int leng = json.toString().length();
String httpUrl = "";
String param = "" ;
if ("1".equals(type)) {
//dxm加密字符串
param = "1316" + timestamp + leng
+ "993e7a6ffc6e47f78c276d9806c5cd04";
System.out.println(param + "没加密之前");
} else if ("2".equals(type)) {
//ssp加密字符串
param = "1314" + timestamp + leng
+ "97c9c694d99f729e1a48940d0b386a9b";
System.out.println(param + "没加密之前");
}
//md5加密
String result = EncryptUtils.md5(param);
System.out.println(result + "===========加密字符串");
if ("1".equals(type)) {
httpUrl = "http://119.188.98.7:18182/Kuaifa/macs/Mac?token="
+ result + "&appid=1316" + "×tamp=" + timestamp;
String response = HttpClient.sendPostJson(httpUrl,
json.toString());
System.out.println(response);
} else if ("2".equals(type)) {
httpUrl = "http://119.188.98.7:18182/Kuaifa/macs/Mac?token="
+ result
+ "&appid=1314"
+ "×tamp="
+ timestamp;
System.out.println(httpUrl);
String response = HttpClient.sendPostJson(httpUrl,
json.toString());
System.out.println(response);
}
}*/
//list.remove(0);
scanner.close();
}
}
zip.close();
}
}