这个方法还有待研究,目前还有如下几个疑点:
1. ActiveMQ 报出这样的信息:
- INFO | Usage Manager memory limit (1048576) reached for topic://EXCHANGE.FILE. Producers will be throttled to the rate at which messages are removed from this
- destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info
2. 这种以异步方式传送资料的做法,能保证客户端以正确的顺序接收到各个文件段吗?
使用ActiveMQ传送文件时,发送端必须将文件拆成若干段,每段封装在独立的Message中,依次发送到客户端。在下面的例子中,Producer通过消息的COMMAND属性发送命令,告知客户端文件传送处于开始(start)、发送中(sending)还是结束(end)阶段。客户端接收到这些命令之后,就知道如何接收资料了。
客户端收到内容后,根据命令将内容合并到一个文件中。
- package org.apache.activemq.exchange.file;
- import java.io.BufferedOutputStream;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.Session;
- import javax.jms.StreamMessage;
- import org.apache.activemq.ActiveMQConnectionFactory;
- public class Consumer {
- /**
- * @param args
- */
- public static void main(String[] args) throws JMSException, IOException {
- ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic("EXCHANGE.FILE");
- MessageConsumer consumer = session.createConsumer(destination);
- boolean appended = false;
- try {
- while (true) {
- Message message = consumer.receive(5000);
- if (message == null) {
- continue;
- }
- if (message instanceof StreamMessage) {
- StreamMessage streamMessage = (StreamMessage) message;
- String command = streamMessage.getStringProperty("COMMAND");
- if ("start".equals(command)) {
- appended = false;
- continue;
- }
- if ("sending".equals(command)) {
- byte[] content = new byte[4096];
- String file_name = message.getStringProperty("FILE_NAME");
- BufferedOutputStream bos = null;
- bos = new BufferedOutputStream(new FileOutputStream("c:/" + file_name, appended));
- if (!appended) {
- appended = true;
- }
- while (streamMessage.readBytes(content) > 0) {
- bos.write(content);
- }
- bos.close();
- continue;
- }
- if ("end".equals(command)) {
- appended = false;
- continue;
- }
- }
- }
- } catch (JMSException e) {
- throw e;
- } finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
- }
发送端将文件分包,逐次发送到客户端
- package org.apache.activemq.exchange.file;
- import java.io.BufferedInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.StreamMessage;
- import org.apache.activemq.ActiveMQConnectionFactory;
- public class Publisher {
- public static String FILE_NAME = "01.mp3";
- public static void main(String[] args) throws JMSException, IOException {
- ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic("EXCHANGE.FILE");
- MessageProducer producer = session.createProducer(destination);
- long time = System.currentTimeMillis();
- //通知客户端开始接受文件
- StreamMessage message = session.createStreamMessage();
- message.setStringProperty("COMMAND", "start");
- producer.send(message);
- //开始发送文件
- byte[] content = new byte[4096];
- InputStream ins = Publisher.class.getResourceAsStream(FILE_NAME);
- BufferedInputStream bins = new BufferedInputStream(ins);
- while (bins.read(content) > 0) {
- //
- message = session.createStreamMessage();
- message.setStringProperty("FILE_NAME", FILE_NAME);
- message.setStringProperty("COMMAND", "sending");
- message.clearBody();
- message.writeBytes(content);
- producer.send(message);
- }
- bins.close();
- ins.close();
- //通知客户端发送完毕
- message = session.createStreamMessage();
- message.setStringProperty("COMMAND", "end");
- producer.send(message);
- connection.close();
- System.out.println("Total Time costed : " + (System.currentTimeMillis() - time) + " mili seconds");
- }
- }