WebService学习总结(八)——利用websevice接口封装activemq的生产者,并监听到消息

1.本系列第五篇,利用myeclipse生成基于jax-ws的接口,并能成功访问wsdl后,接下其他操作

1.service接口:

/**
 * 用于外部系统调用接口
 * 
 * @author Administrator
 * 
 */
public interface IUserService {
 
    public Users getUsersByUserCode(String userCode);
 
    public Users selectByPrimaryKey(Integer id);
 
    public int deleteByPrimaryKey(Integer id);
 
    public int insert(Users record);
 
    public int insertSelective(Users record);
 
    public int updateByPrimaryKeySelective(Users record);
 
    public int updateByPrimaryKey(Users record);
 
}
2.UserService类代码

/**
 * 用户信息操作服务
 */
@Service
public class UserService implements IUserService {
    private UsersDao usersDao;
    
    @Resource
    private ProducerServiceUser producerServiceUser;
    
    @Resource(name = "userQueueDestination")
    private Destination destination;
 
    //查询方法
    public Users getUsersByUserCode(String userCode) {
        return usersDao.getUsersByUserCode(userCode);
    }
 
    public Users selectByPrimaryKey(Integer id) {
        return usersDao.selectByPrimaryKey(id);
    }
 
    // 增删改的方法,都需要向消息队列中发送数据
    /**
     * 删除方法,对外暴露接口
     * 
     * @param id
     */    
    public void deleteByPrimaryKeyPort(Integer id) {
        // 将传入的参数放入队列中
        StringBuffer buf = new StringBuffer();
        buf.append(Constant.DATA_DELETE);
        buf.append(":");
        buf.append(String.valueOf(id));
        // 调用对应的生产者的sendMessage方法,向队列发送消息
        producerServiceUser.sendMessage(destination, buf.toString());
    }
 
    public int deleteByPrimaryKey(Integer id) {
        return usersDao.deleteByPrimaryKey(id);
    }
 
    /**
     * 添加方法,对外暴露接口
     * 
     * @param id
     */
 
    public void insertPort(Users record) {
        // 将传入的参数放入队列中
        StringBuffer buf = new StringBuffer();
        buf.append(Constant.DATA_INSERT);
        buf.append(":");
        buf.append(String.valueOf(record.toString()));
        // 调用对应的生产者的sendMessage方法,向队列发送消息
        producerServiceUser.sendMessage(destination, buf.toString());
    }
 
    public int insert(Users record) {
        return usersDao.insert(record);
    }
 
    /**
     * 添加方法,对外暴露接口
     * 
     * @param id
     */
    public void insertSelectivePort(Users record) {
        // 将传入的参数放入队列中
        StringBuffer buf = new StringBuffer();
        buf.append(Constant.DATA_INSERT);
        buf.append(":");
        buf.append(String.valueOf(record.toString()));
        // 调用对应的生产者的sendMessage方法,向队列发送消息
        producerServiceUser.sendMessage(destination, buf.toString());
    }
 
    public int insertSelective(Users record) {
        return usersDao.insertSelective(record);
    }
 
    /**
     * 更新方法,对外暴露接口
     * 
     * @param id
     */
 
    public void updateByPrimaryKeySelectivePort(Users record) {
        // 将传入的参数放入队列中
        StringBuffer buf = new StringBuffer();
        buf.append(Constant.DATA_UPDATE);
        buf.append(":");
        buf.append(String.valueOf(record.toString()));
        // 调用对应的生产者的sendMessage方法,向队列发送消息
        producerServiceUser.sendMessage(destination, buf.toString());
    }
 
    public int updateByPrimaryKeySelective(Users record) {
        return usersDao.updateByPrimaryKeySelective(record);
    }
 
    /**
     * 更新方法,对外暴露接口
     * 
     * @param id
     */
 
    public void updateByPrimaryKeyPort(Users record) {
        // 将传入的参数放入队列中
        StringBuffer buf = new StringBuffer();
        buf.append(Constant.DATA_UPDATE);
        buf.append(":");
        buf.append(String.valueOf(record.toString()));
        // 调用对应的生产者的sendMessage方法,向队列发送消息
        producerServiceUser.sendMessage(destination, buf.toString());
    }
 
    public int updateByPrimaryKey(Users record) {
        return usersDao.updateByPrimaryKey(record);
    }
 
}
3.UserServiceDelegate类代码

@javax.jws.WebService(targetNamespace = "http://impl.service.store.yundao.com/", serviceName = "UserServiceService", portName = "UserServicePort")
public class UserServiceDelegate {
 
    com.yundao.store.service.impl.UserService userService = new com.yundao.store.service.impl.UserService();
 
    /**
     * 对外暴露的接口
     */
    public Users getUsersByUserCode(String userCode) {
        return userService.getUsersByUserCode(userCode);
    }
 
    public Users selectByPrimaryKey(Integer id) {
        return userService.selectByPrimaryKey(id);
    }
 
    /**
     * 对外暴露的接口
     */
    public void deleteByPrimaryKeyPort(Integer id) {
        userService.deleteByPrimaryKeyPort(id);
    }
 
    @WebMethod(exclude = true)
    public int deleteByPrimaryKey(Integer id) {
        return userService.deleteByPrimaryKey(id);
    }
 
    /**
     * 对外暴露的接口
     */
    public void insertPort(Users record) {
        userService.insertPort(record);
    }
 
    @WebMethod(exclude = true)
    public int insert(Users record) {
        return userService.insert(record);
    }
 
    /**
     * 对外暴露的接口
     */
    public void insertSelectivePort(Users record) {
        userService.insertSelectivePort(record);
    }
 
    @WebMethod(exclude = true)
    public int insertSelective(Users record) {
        return userService.insertSelective(record);
    }
 
    /**
     * 对外暴露的接口
     */
    public void updateByPrimaryKeySelectivePort(Users record) {
        userService.updateByPrimaryKeySelectivePort(record);
    }
 
    @WebMethod(exclude = true)
    public int updateByPrimaryKeySelective(Users record) {
        return userService.updateByPrimaryKeySelective(record);
    }
 
    /**
     * 对外暴露的接口
     */
    public void updateByPrimaryKeyPort(Users record) {
        userService.updateByPrimaryKeyPort(record);
    }
 
    @WebMethod(exclude = true)
    public int updateByPrimaryKey(Users record) {
        return userService.updateByPrimaryKey(record);
    }
 
}
4.ProducerServiceUser类

**
 * 生产者服务类
 * 
 * @author  2018-3-26下午4:38:22
 *         将消息生产者做成一个服务,当我们需要发送消息的时候,只需要调用ProducerService实例中的sendMessage
 *         方法就可以向默认目的发送一个消息。
 */
@Service
public class ProducerServiceUser {
    @Autowired
    private JmsTemplate jmsTemplate;
 
    /**
     * 向指定队列发送消息
     */
    public void sendMessage(Destination destination, final String msg) {
        System.out.println(Thread.currentThread().getName() + " 向队列"
                + destination.toString() + "发送消息---------------------->" + msg);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
 
    /**
     * 向默认队列发送消息
     */
    public void sendMessage(final String msg) {
        String destination = jmsTemplate.getDefaultDestinationName();
        System.out.println(Thread.currentThread().getName() + " 向队列"
                + destination + "发送消息---------------------->" + msg);
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
 
}
5.对应的生产者配置文件spring-activemq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
 
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
 
    <!-- 生产者 -->
    <!-- 扫描activemq包 -->
    <context:component-scan base-package="com.yundao.store.activemq.*" />
 
 
    <!-- 1.ActiveMQ的连接工厂 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>
 
    <!-- 2.Spring Caching连接工厂 -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>
 
    <!-- 3.1.定义用户消息队列(Queue) -->
    <bean id="userQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息队列的名字 -->
        <constructor-arg index="0" value="queue-user" />
    </bean>
    <!-- 3.2.定义日志消息队列(Queue) -->
    <bean id="logQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
       <!-- 设置消息队列的名字 -->
        <constructor-arg index="0" value="queue-log" />
    </bean>
 
    <!-- 4.配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
    <!-- 4.1.队列模板 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="receiveTimeout" value="10000" />
    </bean>
 
 
</beans>
6.对应的消费者配置文件spring-activemq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
 
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
 
    <!-- 消费者 -->
    <!-- 扫描activemq包 -->
    <context:component-scan base-package="com.yundao.store.activemq.*" />
 
    <!-- 1.ActiveMQ的连接工厂 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>
 
    <!-- 2.Spring Caching连接工厂 -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>
 
 
 
    <!-- 5.第一个消息监听器(Queue) -->
    <bean id="userQueueMessageListener"
        class="com.yundao.store.activemq.listener.userQueueMessageListener"></bean>
    <!-- 第二个消息监听器(Queue) -->
    <bean id="logQueueMessageListener"
        class="com.yundao.store.activemq.listener.logQueueMessageListener">
    </bean>
 
    <!-- 6.第一个消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 --><!-- 自动获取 -->
    <bean id="sessionAwareListenerContainer01"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="userQueueDestination" />
        <property name="messageListener" ref="userQueueMessageListener" />
    </bean>
 
    <!-- 6.第二个消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 --><!-- 自动获取 -->
    <bean id="sessionAwareListenerContainer02"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="logQueueDestination" />
        <property name="messageListener" ref="logQueueMessageListener" />
    </bean>
 
 
</beans>
7.监听器的类

**
 * 配置消息队列监听者(实际项目中采用的是这种方式)
 * 
 * @author  2018-3-26下午4:46:44
 */
public class userQueueMessageListener implements MessageListener {
    private UserService userService;    
    // 当收到消息后,自动调用该方法
    @Override
    public void onMessage(Message message) {
        TextMessage tm = (TextMessage) message;
        try {
            System.out.println("userQueueMessageListener监听到了文本消息:\t" + tm.getText());
            //处理业务逻辑
        } catch (JMSException e) {
            e.printStackTrace();
        }
 
    }
 
}
8.测试方法

/**
     * 用户登录
     */
    public String login() {
        setUtfEncoding();
        String userCode = getRequest().getParameter("userCode");
        String password = getRequest().getParameter("password");
 
//        String param = "登录成功";
//        producerServiceUser.sendMessage(destination, param);
//
//        String param1 = "日志成功";
//        producerServiceLog.sendMessage(destination1, param1);
        int id=1;
        userService.deleteByPrimaryKeyPort(id);
        return SUCCESS;
    }
最后能成功监听到调用接口封装的方法传输过来的数据,真实的的接口调用,可以参考我soap调接口那篇

简单的记录下来,毕竟是自己做项目遇到的难点......
--------------------- 
作者:changhenshui1990 
来源:CSDN 
原文:https://blog.csdn.net/changhenshui1990/article/details/79805332 
版权声明:本文为博主原创文章,转载请附上博文链接!

猜你喜欢

转载自blog.csdn.net/smxjant/article/details/89399202