Netty游戏服务器实战开发(10):Netty结合kafka实现分布式消息队列

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/baidu_23086307/article/details/83186145

在分布式游戏服务器系统中,消息处理队列主要解决问题就是解耦系统中的业务,使得每个系统看起来功能比较单一,而且解决一些全服数据共享等问题。

通常我们知道kafka是作为消息队列比较火的一种方式,其实还有(Active MQ,Rabbit MQ,Zero MQ)个人觉得kafka比较好用点,哈哈,习惯吧。

同样我们来复习kafka基础。
kafka基础。

的内容来介绍kafka的基本安装。

首先我们要去kafka官方网站上下载kafka依赖包。https://kafka.apache.org/
下载下来之后的到安装包解压。
在这里插入图片描述

当然我编写了一个启动脚本,内容很简单

@echo off
.\bin\windows\kafka-server-start.bat .\config\server.properties

运行kafka需要启动zookeeper,所以需要确保机器上有可用的zookeeper。关于zookeeper,可以阅读我上篇文章所写的内容。https://blog.csdn.net/baidu_23086307/article/details/82769234
进入conf目录。使用文本编译器编辑service.properties文件,修改log.dirs熟悉,修改为你自己的路径。
例如我的路径为这个我就修改成这了。
在这里插入图片描述

然后保存文件。
然后我们进入bin目录,里面有个Windows目录,我们执行如下命令,

kafka-topics.bat --create --zookeeper 0.0.0.0:2181 --replication-factor 1 --partitions 1 --topic defaultTopic
创建一个topic中心。有关kafka的topic,请查看这个大大的说明。这就不详细说明了。
https://blog.csdn.net/u013256816/article/details/79303825
不要关闭窗口,然后我们回到kafka目录,打开cmd执行命令:

@echo off
.\bin\windows\kafka-server-start.bat .\config\server.properties

当然我更喜欢把它编写成一个bat脚本。方便与下次直接使用。然后我们启动kafka服务

在这里插入图片描述

扫描二维码关注公众号,回复: 3681274 查看本文章

大概看到这样的说明kafka启动没有问题。否者启动出现各种问题,需要读者自己排查问题。

整合kafka实现分布式消息队列

万事具备,我们就要将kafka使用到我们的业务场景中来了。在游戏服务器中,我们通常用kafka做一些全局的东西。比如全服聊天,全服活动之类的任务。也可以用于任务系统,解耦每一部分的逻辑。
我们的系统采用netty+spring整合的游戏服务器系统。当然spring也对kafka进行封装了。所以我们在工程中需要添加kafka的依赖。

<!--kafka-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.0.0.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

添加完maven依赖后我们就可以使用spring提供的kafka进行封装了。首先我们需要一个kafka中心服务器的配置文件。
简单的配置了kafka的基础属性
game-kafka.properties

kafka.taskThreadSize=4
kafka.corePoolSize=4
kafka.maximumPoolSize=4
kafka.keepAliveTime=5000

然后我们添加kafka的consumer配置,application-kafka-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>

<!-- 定義consumer的參數 -->
<bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <!-- 配置kafka的broke -->
            <entry key="bootstrap.servers" value="0.0.0.0:9092"/>
            <!-- 配置組-->
            <entry key="group.id" value="0"/>
            <entry key="enable.auto.commit" value="true"/>
            <entry key="auto.commit.interval.ms" value="1000"/>
            <entry key="session.timeout.ms" value="30000"/>
            <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
        </map>
    </constructor-arg>
</bean>

<!-- 創建consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg ref="consumerProperties"/>
</bean>

<!-- 實際執行消息消費的類 -->
<bean id="messageListenerConsumerService"
      class="com.twjitm.core.common.kafka.NettyKafkaConsumerListener"/>

<!-- 消費者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <!-- 重要!配置topic -->
    <constructor-arg value="defaultTopic"/>
    <property name="messageListener" ref="messageListenerConsumerService"/>
</bean>

<!-- 創建kafka template bean,使用的時候,只需要注入這個bean,即可使用template的send消息方法 -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
      init-method="doStart">
    <constructor-arg ref="consumerFactory"/>
    <constructor-arg ref="containerProperties"/>
</bean>
然后在添加一个kafka的producer。application-kafka-producer.xml <?xml version="1.0" encoding="UTF-8"?>

<!--bootstrap.servers 消費者提供服務器:example:0.0.0.1:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092-->
<!-- 定義producer的參數 -->
<bean id="producerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>

            <entry key="bootstrap.servers"
                   value="0.0.0.0:9092"/>
            <entry key="group.id" value="0"/>
            <entry key="retries" value="1"/>
            <entry key="batch.size" value="16384"/>
            <entry key="linger.ms" value="1"/>
            <entry key="buffer.memory" value="33554432"/>
            <!--序列化方式-->
            <entry key="key.serializer"
                   value="org.apache.kafka.common.serialization.StringSerializer"/>
            <entry key="value.serializer"
                   value="org.apache.kafka.common.serialization.StringSerializer"/>
        </map>

    </constructor-arg>
</bean>

<!-- 創建kafka template需要使用的producer factory bean -->
<bean id="producerFactory"
      class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg ref="producerProperties"/>
</bean>

<!-- 創建kafka template bean,使用的時候,只需要注入這個bean,即可使用template的send消息方法 -->
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerFactory"/>
    <constructor-arg name="autoFlush" value="true"/>
    <property name="defaultTopic" value="defaultTopic"/>
    <!--<property name="producerListener" ref="producerListener"/>-->
</bean>
具体详细配置正如代码中的注释一样,配置好kafka的基础属性,整合到spring 的bean对象中,使用spring容器来管理这些bean对象。省去了对象管理的工作。

程序实现:
我们需要定义一个kafka抽象任务,每个任务其实就是想kafka里面发送消息。属于producer范畴。AbstractKafkaPushTask .java

package com.twjitm.core.common.kafka;

/**
 * @author twjitm - [Created on 2018-09-04 21:31]
 */
public abstract class AbstractKafkaPushTask {
    private KafkaTaskType taskType;
    public Object value;

    public AbstractKafkaPushTask(KafkaTaskType taskType) {
        this.taskType = taskType;
    }

    public KafkaTaskType getTaskType() {
        return taskType;
    }


    public Object getValue() {
        return value;
    }

    /**
     * 需要將消息值保存到這裏面,值如何獲得由子類自己實心
     *
     * @param value
     */
    public abstract void setValue(Object value);
}

kafka消息监听器:消息消费者.

package com.twjitm.core.common.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

/**
 * kafka 消息消費者
 *
 * @author twjitm- [Created on 2018-09-04 16:12]
 */
public class NettyKafkaConsumerListener implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord) {

        System.out.println(consumerRecord);
        consumerRecord.value();
    }
}

在这我们对消息做简单的处理,但是在实际项目开发中我们还得更具消息的具体类型做不同的业务处理,因此我们可以将消息的具体应用做出自己业务逻辑处理。

消息监听器继承MessageListener,而spring对这个接口做了处理,所以能够监听到消息的到来。
在这里插入图片描述

有了消息消费者,我们需要编写一个消息提供者,消息提供者负责将消息发布到kafka消息队列中。实现代码如下:

package com.twjitm.core.common.kafka;

import com.alibaba.fastjson.JSON;
import com.twjitm.core.common.config.global.GlobalConstants;
import com.twjitm.core.common.config.global.KafkaConfig;
import com.twjitm.core.common.service.IService;
import com.twjitm.core.spring.SpringServiceManager;
import com.twjitm.threads.thread.NettyThreadNameFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;
import java.util.concurrent.*;

/**
 * kafka 消息提供者
 *
 * @author twjitm- [Created on 2018-09-04 16:17]
 */
@Service
public class NettyKafkaProducerListener implements IService {
    private Logger logger = LoggerFactory.getLogger(NettyKafkaProducerListener.class);
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 消息處理線程池
     */
    private volatile ExecutorService executorService;

    private boolean run = true;
    /**
     * 消息隊列
     */
    private BlockingQueue<AbstractKafkaPushTask> queue;

    /**
     * 將消息發送給kafka中心
     *
     * @param abstractKafkaPushTask
     */
    private void sendMessage(AbstractKafkaPushTask abstractKafkaPushTask) {
        String type = abstractKafkaPushTask.getTaskType().getTypeName();
        String value = JSON.toJSONString(abstractKafkaPushTask.getValue());
        //本身send方法就是一個異步執行方法
        ListenableFuture<SendResult<String, String>> result =
                kafkaTemplate.sendDefault(type, value);
        /**
         * 添加回調監聽
         */
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("KAFKA MESSAGE SEND FAIL", throwable);
            }

            @Override
            public void onSuccess(SendResult<String, String> kafkaTaskTypeObjectSendResult) {
                logger.info("KAFKA MESSAGE SEND SUCCESS");
            }
        });


    }

    /**
     * 將任務存放到隊列中
     *
     * @param task
     */
    public void put(AbstractKafkaPushTask task) {
        try {
            queue.put(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String getId() {
        return NettyKafkaProducerListener.class.getSimpleName();
    }

    @Override
    public void startup() throws Exception {
        run = true;
        queue = new LinkedBlockingQueue<>();
        NettyThreadNameFactory factory = new NettyThreadNameFactory(GlobalConstants.Thread.GAME_KAFKA_TASK_EXECUTOR);
        //開啓線程數量
        KafkaConfig kafkaConfig = SpringServiceManager.getSpringLoadService().getNettyGameServiceConfigService().getKafkaConfig();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(kafkaConfig.getCorePoolSize(), kafkaConfig.getMaximumPoolSize(), kafkaConfig.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
        executorService = poolExecutor;
        for (int i = 0; i < 4; i++) {
            executorService.execute(new Worker());
        }
    }

    @Override
    public void shutdown() throws Exception {
        logger.info("STOP KAFKA EXECUTOR");
        run = false;
        executorService.shutdown();
    }

    private class Worker implements Runnable {

        @Override
        public void run() {
            try {
                while (run) {
                    sendMessage(queue.take());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


}

封装看一个消息本地消息队列,将上层调用过来的消息存放到队列中,再有队列tack到kafka中。这样保证每个进程进来的消息都是有顺序的。

测试:

使用spring整合kafka使用起来说是特别方便的。说以我们编写一个测试类

package com.twjitm.kafka;

import com.twjitm.TestSpring;
import com.twjitm.core.common.kafka.KafkaTaskType;
import com.twjitm.core.common.kafka.NettyKafkaProducerListener;

import javax.annotation.Resource;

/**
 * @author twjitm- [Created on 2018-09-05 12:25]
 */

public class TestKafka {
    @Resource
    static NettyKafkaProducerListener nettyKafkaProducerListener;

    public static void main(String[] args) {
        TestSpring.initSpring();
        test();
    }

    public static void test() {
        WordChatTask task = new WordChatTask(KafkaTaskType.WORLD_CHAT);
        task.setValue("hello,world");
        nettyKafkaProducerListener.put(task);
    }

}

创建线程成功:
在这里插入图片描述

发送消息成功:

在这里插入图片描述

接收消息成功:

在这里插入图片描述

相关代码已经提交到github,欢迎各位大大star

https://github.com/twjitm/twjitm-core

猜你喜欢

转载自blog.csdn.net/baidu_23086307/article/details/83186145