版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u011267841/article/details/78435932
图片等文件可以通过 BlobMessage 对象发送。需要注意:消费者使用的是事务型 Session,收到生产者发来的消息后必须调用 session.commit() 提交事务,否则该消息在 ActiveMQ 中会一直显示为未处理(待确认)状态。代码如下,消费者代码:
package org.aisino.mq;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.aisino.common.Constants;
import org.aisino.common.HttpClientUtil;
import org.aisino.common.RequestUtils;
import org.aisino.common.xutils.JsonUtil;
import org.aisino.common.xutils.PropertiesLoader;
import org.aisino.dlgs.cgsdzda.vo.FileVo;
import org.aisino.rpcclient.HttpClient;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.beans.factory.InitializingBean;
/**
 * ActiveMQ consumer that listens on {@code queue1} from a background thread.
 *
 * <p>A {@link BlobMessage} is downloaded to a local file named after its
 * {@code FILE.NAME} property, uploaded to the configured FTP server, and its
 * {@code json} property is re-packed with the file name and file URL. A
 * {@link TextMessage} is only printed.
 *
 * <p>The session is transacted: a message is committed only after it has been
 * handled successfully and rolled back (so the broker can redeliver it)
 * when handling fails.
 */
public class ThreadQueueConsumerN implements Runnable, InitializingBean {
    private static final String USER = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /** Broker URL used when global.properties has no usable mq_url_receive entry. */
    private static final String DEFAULT_URL = "tcp://127.0.0.1:61616?jms.blobTransferPolicy.defaultUploadUrl=http://127.0.0.1:8161/fileserver/";
    private static final String QUEUE_NAME = "queue1";
    private static final int COPY_BUFFER_SIZE = 8192;

    Connection connection;
    Session session;

    /**
     * Starts the consumer on its own thread once the bean has been configured.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        new Thread(this).start();
    }

    /**
     * Connects to the broker, registers the message listener and returns.
     * If setup fails, the partially opened connection is closed again.
     */
    @Override
    public void run() {
        try {
            PropertiesLoader loader = new PropertiesLoader("global.properties");
            String configuredUrl = loader.getProperty("mq_url_receive");
            String brokerUrl = (configuredUrl == null || configuredUrl.trim().isEmpty())
                    ? DEFAULT_URL
                    : configuredUrl;
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    USER, PASSWORD, brokerUrl);
            System.out.println("进入了内网...........................run");
            connection = connectionFactory.createConnection();
            // The acknowledge mode is ignored for a transacted session; say so explicitly.
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        if (message instanceof BlobMessage) {
                            handleBlobMessage((BlobMessage) message);
                        } else if (message instanceof TextMessage) {
                            handleTextMessage((TextMessage) message);
                        }
                        // Commit only after successful handling; without a commit the
                        // broker keeps showing the message as pending.
                        session.commit();
                    } catch (Exception e) {
                        e.printStackTrace();
                        rollbackQuietly();
                    }
                }
            });
            // Start delivery only once the listener is in place.
            connection.start();
        } catch (Exception e) {
            e.printStackTrace();
            closeConnectionQuietly();
        }
    }

    /**
     * Downloads the blob to a local file, uploads it to FTP and re-packs the json property.
     *
     * @param blobMessage the received blob message
     * @throws Exception if the download, the upload or the JSON handling fails;
     *         declared broadly because the project JSON helpers' signatures are not visible here
     */
    private void handleBlobMessage(BlobMessage blobMessage) throws Exception {
        // The file name is only available as a message property, not inside the json payload.
        String rawName = blobMessage.getStringProperty("FILE.NAME");
        if (rawName == null) {
            throw new IOException("BlobMessage has no FILE.NAME property");
        }
        // Drop any directory part so a sender cannot write outside the working directory.
        String fileName = new File(rawName).getName();
        if (fileName.isEmpty()) {
            throw new IOException("BlobMessage FILE.NAME is not a usable file name: " + rawName);
        }
        File file = new File(fileName);
        System.out.println("开始接收文件:" + fileName);
        try (InputStream in = blobMessage.getInputStream();
                OutputStream out = new FileOutputStream(file)) {
            if (in == null) {
                throw new IOException("BlobMessage carries no downloadable content: " + fileName);
            }
            byte[] buffer = new byte[COPY_BUFFER_SIZE];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
        String json = blobMessage.getStringProperty("json");
        System.out.println("完成文件接收:" + file.getName()
                + " 路径 :" + file.getPath());
        System.out.println("json:" + json);
        String fileUrl = uploadConfigFile(file);
        json = repackJson(json, file.getName(), fileUrl);
        // Forwarding to the backend is left to the business code: httpRequest(json);
    }

    /**
     * Prints the text and the json property of a text message.
     *
     * @param textMessage the received text message
     * @throws JMSException if the message content cannot be read
     */
    private void handleTextMessage(TextMessage textMessage) throws JMSException {
        String json = textMessage.getStringProperty("json");
        String text = textMessage.getText();
        // Forwarding to the backend is left to the business code: httpRequest(text);
        System.out.println("发送的文本是 " + text
                + " 发送的json数据是 " + json);
    }

    /**
     * Adds the file name and the file URL to the JSON object and serializes it again.
     *
     * @param json JSON object string taken from the message
     * @param fileName name of the uploaded file
     * @param fileUrl URL under which the uploaded file is expected to be reachable
     * @return the re-serialized JSON string
     */
    // The raw Map is kept because the generic type returned by JsonUtil is not visible here.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private String repackJson(String json, String fileName, String fileUrl) {
        Map map = JsonUtil.JsonObjStr2Map(json);
        map.put(Constants.FILENAME, fileName);
        map.put("fileurl", fileUrl);
        return JsonUtil.map2json(map);
    }

    /** Rolls the current transaction back, reporting but not propagating failures. */
    private void rollbackQuietly() {
        try {
            session.rollback();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /** Closes the connection if one was opened, reporting but not propagating failures. */
    private void closeConnectionQuietly() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Forwards the request according to the given parameters.
     *
     * @param json JSON payload to forward
     */
    public void httpRequest(String json) {
        // Insert business code here.
    }

    /**
     * Uploads the file to the FTP server described by {@code RequestUtils.loadFilePath()},
     * replacing a remote file of the same name.
     *
     * @param uploadFile local file to upload
     * @return the {@code http_picture} prefix from global.properties followed by the file name
     * @throws IOException if connecting, logging in, changing directory or storing the file fails
     */
    private String uploadConfigFile(File uploadFile) throws IOException {
        PropertiesLoader loader = new PropertiesLoader("global.properties");
        String httpPicture = loader.getProperty("http_picture");
        Map<String, Object> mapPath = RequestUtils.loadFilePath();
        FTPClient ftpClient = new FTPClient();
        try {
            ftpClient.connect(mapPath.get("ftp_host").toString(),
                    Integer.parseInt(mapPath.get("ftp_port").toString()));
            if (!ftpClient.login(mapPath.get("ftp_username").toString(),
                    mapPath.get("ftp_password").toString())) {
                throw new IOException("FTP login rejected: " + ftpClient.getReplyString());
            }
            ftpClient.enterLocalPassiveMode();
            ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
            String remoteDirectory = mapPath.get("ftp_filepath").toString();
            if (!ftpClient.changeWorkingDirectory(remoteDirectory)) {
                throw new IOException("Cannot change to FTP directory " + remoteDirectory
                        + ": " + ftpClient.getReplyString());
            }
            // Re-encode so the UTF-8 bytes of the name survive the ISO-8859-1 control channel.
            String remoteName = new String(
                    uploadFile.getName().getBytes("utf-8"), "iso-8859-1");
            try (InputStream in = new FileInputStream(uploadFile)) {
                // STOR replaces an existing remote file and completes the transfer itself.
                if (!ftpClient.storeFile(remoteName, in)) {
                    throw new IOException("FTP upload failed for " + uploadFile.getName()
                            + ": " + ftpClient.getReplyString());
                }
            }
            ftpClient.logout();
            // The URL uses the plain name, not the control-channel re-encoded one.
            return httpPicture + uploadFile.getName();
        } finally {
            if (ftpClient.isConnected()) {
                try {
                    ftpClient.disconnect();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
生产者:
package tk.mybatis.springboot.mq;
import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.aisino.common.xutils.PropertiesLoader;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
/**
 * Sends either a file (as an ActiveMQ {@link BlobMessage}) or a text message
 * to the {@code queue1} queue.
 *
 * @author wb-liufei
 */
public class MessageSender {
    private static final String USER = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /** Broker URL used when global.properties has no usable mq_url_sender entry. */
    private static final String DEFAULT_URL = "tcp://127.0.0.1:61616?jms.blobTransferPolicy.defaultUploadUrl=http://127.0.0.1:8161/fileserver/";
    private static final String QUEUE_NAME = "queue1";

    /**
     * Sends one message in its own connection and transaction.
     * Exactly one of {@code file} and {@code text} must be non-null.
     *
     * @param file file to send as a blob message, or {@code null} when sending text
     * @param text text to send as a text message, or {@code null} when sending a file
     * @param json JSON string attached to the message as its {@code json} property
     * @throws IllegalArgumentException if both or neither of {@code file} and {@code text} are given
     * @throws JMSException if connecting, sending or committing fails
     */
    public void sender(File file, String text, String json) throws JMSException {
        if ((file == null) == (text == null)) {
            throw new IllegalArgumentException(
                    "Exactly one of file and text must be non-null");
        }
        PropertiesLoader loader = new PropertiesLoader("global.properties");
        String configuredUrl = loader.getProperty("mq_url_sender");
        String brokerUrl = (configuredUrl == null || configuredUrl.trim().isEmpty())
                ? DEFAULT_URL
                : configuredUrl;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                USER, PASSWORD, brokerUrl);
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // The acknowledge mode is ignored for a transacted session; say so explicitly.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue destination = session.createQueue(QUEUE_NAME);
            MessageProducer producer = session.createProducer(destination);
            // Persistent delivery: the broker stores the message, so a receiver that is
            // offline at send time can still get it after it reconnects.
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            if (file != null) {
                BlobMessage blobMessage = ((ActiveMQSession) session).createBlobMessage(file);
                blobMessage.setStringProperty("json", json);
                blobMessage.setStringProperty("FILE.NAME", file.getName());
                System.out.println("开始发送文件:" + file.getName() + ",文件大小:"
                        + file.length() + " 字节");
                producer.send(blobMessage);
                System.out.println("完成文件发送:" + file.getName());
            } else {
                TextMessage textMessage = session.createTextMessage(text);
                if (json != null) {
                    // The consumer reads this property from text messages as well.
                    textMessage.setStringProperty("json", json);
                }
                producer.send(textMessage);
            }
            session.commit();
        } finally {
            // Always close the connection; otherwise its threads keep the program alive.
            // Closing without a commit discards the uncommitted send.
            connection.close();
        }
    }
}