rabbitmq_spring类_纯java代码

主要使用org.springframework.amqp包下的代码
生产者  (消息json化)

生产者类的代码
 

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

import com.qbsea.common.properties.ConfigProperties;
import com.qbsea.modules.rabbitmqproduct.model.PersonModel;

public class JavaMainProduct {

	public static void main(String[] args) {
		ApplicationContext ac = new FileSystemXmlApplicationContext("classpath:config/springConfig/applicationContext.xml");
		ConfigProperties configProperties = (ConfigProperties)ac.getBean("configProperties");

		com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
		factory.setHost(configProperties.getRabbitmqAddress());
		factory.setUsername(configProperties.getRabbitmqUsername());
		factory.setPassword(configProperties.getRabbitmqPassword());
		factory.setVirtualHost(configProperties.getRabbitmqVirtualHost());
		ConnectionFactory cf = new CachingConnectionFactory(factory);
		
		//设置一下 queue exchange routingKey 之间的绑定关系 
		RabbitAdmin admin = new RabbitAdmin(cf);
		Queue queue = new Queue("myQueue");
		admin.declareQueue(queue);
		TopicExchange exchange = new TopicExchange("myExchange");
		admin.declareExchange(exchange);
		admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("myroutingKey"));//这个routingKey可以为*.orange.*
		
		RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
		rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
		//Dat
		PersonModel personModel = new PersonModel();
		personModel.setId("1");
		personModel.setName("zhangsan");
		personModel.setAge(14);
		rabbitTemplate.convertAndSend("myExchange", "myroutingKey", personModel);
		System.out.println("aaaa");
	}

}

消息端

具体的消费端的代码如下

 

import java.io.UnsupportedEncodingException;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
public class JavaMainConsumer {

	public static void main(String[] args) {
		 com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
		 factory.setHost("127.0.0.1");
		 factory.setUsername("admin");
		 factory.setPassword("admin");
		 factory.setVirtualHost("/vhost_sunlei");
		 ConnectionFactory cf = new CachingConnectionFactory(factory);
		 // set up the listener and container
		 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
		 Object listener = new Object() {
//			 	public void handleMessage(String message) {
//			 		System.out.println(message);
//			 	}
			 	
			 	public void handleMessage(byte[] bytes) {
			 		System.out.println("bytes="+bytes);
			 		String msg;
					try {
						msg = new String(bytes, "UTF-8");
						System.out.println("msg="+msg);
					} catch (UnsupportedEncodingException e) {
						e.printStackTrace();
					}
			 	}
		 };
          MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
          container.setMessageListener(adapter);
          container.setQueueNames("myQueue");
          container.start();
	}

}

猜你喜欢

转载自blog.csdn.net/maqingbin8888/article/details/81705669