ActiveMQ系列(四)--------结合 Eclipse 操作

打开Eclipse 

 创建Maven工程

 

 项目创建完毕  需要配置一下 pom.xml

 

在pom.xml中 添加ActiveMQ架包

扫描二维码关注公众号,回复: 13612720 查看本文章

 添加一个com.tong文件夹  在里面创建一个名为HelloWordProducer的类

 如果javax.jms 报错可以在pom.xml里面添加架包

 

  编写消息的生产者 

package com.tong;

import javax.jms.ConnectionFactory;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Message;
import org.apache.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloWordProducer {

	/***
	 * 生产消息
	 */
	public void sendHelloWordActiveMQ(String msgTest)
	{
		//定义链接工厂
		ConnectionFactory  connectionFactory=null;
		
		//定义链接对象
		Connection connection=null;
		
		//定义会话
		Session session=null;
		
		//定义目的地
		Destination destination=null;
		
		//定义消息的发送者
		MessageProducer producer=null;
		
		//定义消息
		Message messag=null;
		
		try
		{
			/***
			 * username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
			 * password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
			 * brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号
			 */
			connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616");
		
			//创建连接对象
			connection =connectionFactory.createConnection();
		
			//启动连接对象
			connection.start();
			
			/**
			 * transacted:是否使用事务:true|false
			 *             true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED
			 *             false:不适用事务,设置次变量 则acknowledge参数必须设置
			 *             
			 * acknowledgeMode :
			 *   Session.AUTO_ACKNOWLEDGE:自动消息确认机制
			 *   Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制
			 *   Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列
			 */
			session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		
			//创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列
			destination=session.createQueue("helloword-destination");
			//创建消息的生产者
			producer=session.createProducer(destination);
			
			//创建消息对象
			messag =session.createTextMessage(msgTest);
			
			//发送消息
			producer.send(messag);
			
		}catch(Exception e)
		{
			e.printStackTrace();
		
		}finally
		{
			//回收消息发送者资源
			if(producer!=null)
			{
				try
				{
					producer.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
				
			}
			if(session!=null)
			{
				try
				{
					session.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
				
			}
			
			if(connection!=null)
			{
				try
				{
					connection.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
			}
		}
	}
}

编写消息的消费者 

           创建新的Maven项目

 pom.xml 文件和消息生产者  一致

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.tong</groupId>
  <artifactId>mq-consumer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  
  <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId> 
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>
      
            <!-- https://mvnrepository.com/artifact/javax.jms/javax.jms-api -->
    <dependency>
           <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
            <version>2.0.1</version>
    </dependency>
<!-- MQ -->
           <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
   <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>5.0.6.RELEASE</version>
   </dependency>
           <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core -->
   <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
      </dependency>
  </dependencies>
  
</project>
package com.tong;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloworldConsumer {
	
	/***
	 * 生产消息
	 */
	public void sendHelloWordActiveMQ()
	{
		//定义链接工厂
		ConnectionFactory  connectionFactory=null;
		
		//定义链接对象
		Connection connection=null;
		
		//定义会话
		Session session=null;
		
		//定义目的地
		Destination destination=null;
		
		//定义消息的消费者
		MessageConsumer consumer=null;
		
		//定义消息
		Message messag=null;
		
		try
		{
			/***
			 * username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
			 * password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
			 * brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号
			 */
			connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616");
		
			//创建连接对象
			connection =connectionFactory.createConnection();
		
			//启动连接对象
			connection.start();
			
			/**
			 * transacted:是否使用事务:true|false
			 *             true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED
			 *             false:不适用事务,设置次变量 则acknowledge参数必须设置
			 *             
			 * acknowledgeMode :
			 *   Session.AUTO_ACKNOWLEDGE:自动消息确认机制
			 *   Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制
			 *   Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列
			 */
			session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		
			//创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列
			destination=session.createQueue("helloword-destination");
			
			//创建消息的消费者
			 consumer=session.createConsumer(destination);
			
			//创建消息对象
			messag = consumer.receive();
			 
			//处理消息
			 String msg= ((TextMessage)messag).getText();
			 
			  System.out.println("从ActiveMQ服务中获取文本信息"+msg);
			
		}catch(Exception e)
		{
			e.printStackTrace();
		
		}finally
		{
			//回收消息发送者资源
			if(consumer!=null)
			{
				try
				{
					consumer.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
				
			}
			if(session!=null)
			{
				try
				{
					session.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
				
			}
			
			if(connection!=null)
			{
				try
				{
					connection.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
			}
		}
	}

}

创建Test.java当启动类

package com.tong;

import com.tong.HelloWordProducer;
public class Test {
	public static void main(String[] args)
	{
		HelloWordProducer producer=new HelloWordProducer();
		producer.sendHelloWordActiveMQ("HelloWord");
	}

}

处理对象消息 

关键必须实现 Serializable 接口

package com.tong;

import java.io.Serializable;

public class Users implements Serializable {
	private int userid;
	private String username;
	private int userage;
	
	@Override
	public String toString() {
		return "Users [userid=" + userid + ", username=" + username + ", userage=" + userage + "]";
	}
	public int getUserid() {
		return userid;
	}
	public void setUserid(int userid) {
		this.userid = userid;
	}
	public String getUsername() {
		return username;
	}
	public void setUsername(String username) {
		this.username = username;
	}
	public int getUserage() {
		return userage;
	}
	public void setUserage(int userage) {
		this.userage = userage;
	}
	
}

生产者

package com.tong;

import javax.jms.ConnectionFactory;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Message;
import org.apache.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloWordProducer2 {

	/***
	 * 生产消息
	 */
	public void sendHelloWordActiveMQ(Users user)
	{
		//定义链接工厂
		ConnectionFactory  connectionFactory=null;
		
		//定义链接对象
		Connection connection=null;
		
		//定义会话
		Session session=null;
		
		//定义目的地
		Destination destination=null;
		
		//定义消息的发送者
		MessageProducer producer=null;
		
		//定义消息
		Message messag=null;
		
		try
		{
			/***
			 * username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
			 * password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
			 * brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号
			 */
			connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616");
		
			//创建连接对象
			connection =connectionFactory.createConnection();
		
			//启动连接对象
			connection.start();
			
			/**
			 * transacted:是否使用事务:true|false
			 *             true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED
			 *             false:不适用事务,设置次变量 则acknowledge参数必须设置
			 *             
			 * acknowledgeMode :
			 *   Session.AUTO_ACKNOWLEDGE:自动消息确认机制
			 *   Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制
			 *   Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列
			 */
			session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		
			//创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列
			destination=session.createQueue("my-users");
			//创建消息的生产者
			producer=session.createProducer(destination);
			
			//创建消息对象
			messag =session.createObjectMessage(user);
			
			//发送消息
			producer.send(messag);
			
		}catch(Exception e)
		{
			e.printStackTrace();
		
		}finally
		{
			//回收消息发送者资源
			if(producer!=null)
			{
				try
				{
					producer.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
				
			}
			if(session!=null)
			{
				try
				{
					session.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
				
			}
			
			if(connection!=null)
			{
				try
				{
					connection.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
			}
		}
	}
}

消费者

package com.tong;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class HelloworldConsumer2 {
	
	/***
	 * 生产消息
	 */
	public void sendHelloWordActiveMQ()
	{
		//定义链接工厂
		ConnectionFactory  connectionFactory=null;
		
		//定义链接对象
		Connection connection=null;
		
		//定义会话
		Session session=null;
		
		//定义目的地
		Destination destination=null;
		
		//定义消息的消费者
		MessageConsumer consumer=null;
		
		//定义消息
		Message messag=null;
		
		try
		{
			/***
			 * username:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
			 * password:访问ActiveMQ服务的用户名 用户密码。 默认的为admin。用户名可以通过jetty-name.properties文件进行修改
			 * brokerURL:访问ActiveMQ服务的路径地址。路径地址结构为:协议名称://主机地址:端口号
			 */
			connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.100.133:61616");
		
			//创建连接对象
			connection =connectionFactory.createConnection();
		
			//启动连接对象
			connection.start();
			
			/**
			 * transacted:是否使用事务:true|false
			 *             true:使用事务 当设置次变量值.Session.SESSION_TRANSACTED
			 *             false:不适用事务,设置次变量 则acknowledge参数必须设置
			 *             
			 * acknowledgeMode :
			 *   Session.AUTO_ACKNOWLEDGE:自动消息确认机制
			 *   Seesion.CLIENT_ACKNOWLEDGE:客户端确认机制
			 *   Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息队列
			 */
			session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		
			//创建目的地,目的地名称即队列名称。消息的消费者需要此名称访问对应的队列
			destination=session.createQueue("my-users");
			
			//创建消息的消费者
			 consumer=session.createConsumer(destination);
			
			//创建消息对象
			messag = consumer.receive();
			 
			//处理消息
			ObjectMessage objMessage=(ObjectMessage) messag; 
			Users users=(Users)objMessage.getObject();
			System.out.println(users);
			
		}catch(Exception e)
		{
			e.printStackTrace();
		
		}finally
		{
			//回收消息发送者资源
			if(consumer!=null)
			{
				try
				{
					consumer.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
				
			}
			if(session!=null)
			{
				try
				{
					session.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
				
			}
			
			if(connection!=null)
			{
				try
				{
					connection.close();
				}catch(Exception e)
				{
					e.printStackTrace();
				}
			}
		}
	}

}

测试启动类

package com.tong;

import com.tong.HelloWordProducer;
public class Test {
	public static void main(String[] args)
	{
//		HelloWordProducer producer=new HelloWordProducer();
//		producer.sendHelloWordActiveMQ("HelloWord");
		Users users=new Users();
		users.setUserid(1);
		users.setUsername("童小纯");
		users.setUserage(20);
		HelloWordProducer2 producer=new HelloWordProducer2();
		producer.sendHelloWordActiveMQ(users);
	}

}

猜你喜欢

转载自blog.csdn.net/m0_58719994/article/details/121523129