1.环境:
- jdk1.8
- rabbitmq_server-3.3.4
- win10
2.windows安装参考:https://www.cnblogs.com/wuzhiyuan/p/6845230.html
3.启动、关闭服务:启动用 net start RabbitMQ,关闭用 net stop RabbitMQ(需在管理员权限的命令行中执行)
4.管理页面访问地址:http://localhost:15672 (需先启用管理插件:rabbitmq-plugins enable rabbitmq_management),默认账号:guest,密码:guest
5.pom依赖(注意:下文生产者、消费者中使用的 org.apache.commons.lang.SerializationUtils 还需要额外引入 commons-lang 依赖)
<!-- RabbitMQ Java client library (provides the com.rabbitmq.client classes used below) -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
6.测试主类
package com.yb.rabbit;
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
/**
 * Demo entry point: publishes three sample messages to a queue and then
 * starts a consumer that reads them back from the same queue.
 */
public class Main {

    /**
     * Runs the demo against the queue named {@code "haha"}: first produces
     * the messages, then starts the consumer.
     *
     * @throws Exception if connecting to the broker or publishing fails
     */
    public Main() throws Exception {
        // Produce the messages.
        createProduce("haha");
        // Consume the messages.
        createConsumer("haha");
    }

    /**
     * Creates the consumer (the message receiver) and starts it on its own thread.
     * The consumer is intentionally left open so it can keep receiving deliveries.
     *
     * @param ss name of the queue to consume from
     * @throws Exception if the consumer cannot be created
     */
    private void createConsumer(String ss) throws Exception {
        QueueConsumer consumer = new QueueConsumer(ss);
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();
    }

    /**
     * Creates a producer (the message sender), publishes three sample
     * messages and then closes the producer.
     *
     * @param ss name of the queue to publish to
     * @throws Exception if the producer cannot be created or a publish fails
     */
    private void createProduce(String ss) throws Exception {
        Producer producer = new Producer(ss);
        try {
            // Send the messages in a loop.
            for (int i = 0; i < 3; i++) {
                // Declared as HashMap (not Map) because sendMessage requires a Serializable.
                HashMap<String, String> message = new HashMap<String, String>();
                message.put("phoneNumber", "130" + i);
                message.put("shopName", "phone" + i);
                message.put("address", "长安街" + i + "号");
                producer.sendMessage(message);
                System.out.println("Message hahaha " + i + " sent.");
            }
        } finally {
            // The producer is no longer needed once everything is sent;
            // close it so its connection is not leaked.
            producer.close();
        }
    }

    /**
     * Starts the demo.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the demo fails to connect, publish or consume
     */
    public static void main(String[] args) throws Exception {
        new Main();
    }
}
7.配置类
package com.yb.rabbit;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
//base 配置类
/**
 * Base configuration class shared by the producer and the consumer.
 * It opens a connection to the broker on {@code localhost}, creates a
 * channel on it and declares the queue the endpoint works with.
 */
public abstract class EndPoint {
    protected Channel channel;
    protected Connection connection;
    protected String endPointName;

    /**
     * Connects to the broker and declares the queue.
     *
     * @param endpointName name of the queue to declare and use
     * @throws IOException if the connection, the channel or the queue
     *         declaration fails, or if connecting times out
     */
    public EndPoint(String endpointName) throws IOException {
        this.endPointName = endpointName;

        // Create a connection factory pointing at the broker on this machine.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // Getting a connection. A timeout must abort construction: carrying on
        // with a null connection would only fail later with an unrelated
        // NullPointerException, so surface it as an IOException with the cause.
        try {
            connection = factory.newConnection();
        } catch (TimeoutException e) {
            throw new IOException("Timed out while connecting to the RabbitMQ broker on localhost", e);
        }

        // Creating a channel.
        channel = connection.createChannel();

        // Declaring a queue for this channel. If the queue does not exist,
        // it will be created on the server.
        // queueDeclare arguments: queue = queue name; durable = true makes it
        // persistent; exclusive = true means the queue can only be accessed by
        // this connection; autoDelete = true deletes the queue when the
        // connection goes away; arguments = extension parameters.
        channel.queueDeclare(endpointName, false, false, false, null);
    }

    /**
     * Closes the connection. Only the connection is closed explicitly here;
     * per the RabbitMQ client documentation, closing a connection also closes
     * the channels opened on it.
     *
     * @throws IOException if closing the connection fails
     */
    public void close() throws IOException {
        this.connection.close();
    }
}
8.生产者
package com.yb.rabbit;
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
//生产者
/**
 * Message sender: serializes objects and publishes them to the queue
 * this endpoint was created for.
 */
public class Producer extends EndPoint {

    /**
     * Creates a producer bound to the given queue.
     *
     * @param endPointName name of the queue to publish to
     * @throws IOException if the underlying endpoint cannot be set up
     */
    public Producer(String endPointName) throws IOException {
        super(endPointName);
    }

    /**
     * Serializes the given object and publishes the resulting bytes with an
     * empty exchange name, the endpoint name as routing key and no properties.
     *
     * @param object the message to send
     * @throws IOException if publishing fails
     */
    public void sendMessage(Serializable object) throws IOException {
        final byte[] payload = SerializationUtils.serialize(object);
        channel.basicPublish("", endPointName, null, payload);
    }
}
9.消费者
package com.yb.rabbit;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.SerializationUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
//消费者
/**
 * Message receiver: registers itself as the consumer of its queue and prints
 * the key/value pairs of every map-shaped message it receives.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

    /**
     * Creates a consumer bound to the given queue.
     *
     * @param endPointName name of the queue to consume from
     * @throws IOException if the underlying endpoint cannot be set up
     */
    public QueueConsumer(String endPointName) throws IOException {
        super(endPointName);
    }

    /**
     * Registers this object as the consumer of the queue. Failures are only
     * reported on the standard error stream.
     */
    public void run() {
        try {
            // Start consuming messages. Auto acknowledge messages.
            channel.basicConsume(endPointName, true, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when consumer is registered.
     *
     * @param consumerTag tag identifying this consumer
     */
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer " + consumerTag + " registered");
    }

    /**
     * Called when new message is available. Deserializes the body and prints
     * every entry if it is a map; any other payload is reported and skipped
     * instead of failing with a ClassCastException.
     *
     * @param consumerTag tag identifying this consumer
     * @param env         delivery envelope (unused)
     * @param props       message properties (unused)
     * @param body        serialized message payload
     * @throws IOException declared by the Consumer interface
     */
    public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body)
            throws IOException {
        // NOTE(review): this is native Java deserialization of bytes taken from
        // the queue; only safe while every publisher to this queue is trusted.
        Object payload = SerializationUtils.deserialize(body);
        if (!(payload instanceof Map)) {
            String typeName = (payload == null) ? "null" : payload.getClass().getName();
            System.err.println("Ignoring message with unexpected payload type: " + typeName);
            return;
        }
        Map<?, ?> map = (Map<?, ?>) payload;
        System.out.println("===============");
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            System.out.println("key:" + entry.getKey() + " , value:" + entry.getValue());
        }
    }

    /** Consumer callback; intentionally a no-op in this demo. */
    public void handleCancel(String consumerTag) {
    }

    /** Consumer callback; intentionally a no-op in this demo. */
    public void handleCancelOk(String consumerTag) {
    }

    /** Consumer callback; intentionally a no-op in this demo. */
    public void handleRecoverOk(String consumerTag) {
    }

    /** Consumer callback; intentionally a no-op in this demo. */
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {
    }
}
10.测试结果:Main类中main方法运行,发送3条,消费了3条