消息队列之 (4)-将消息写入到文件中

前言

我们之前的博客已经完成了将队列,交换机,绑定写入到了数据库中,并且完成了一次对数据库的封装. 接下来我们将消息写入到文件中.实现消息的持久化存储

设计思想

我们往硬盘中存放的消息, 只是简单的 存 和 取 还有删除 , 并不涉及 复杂的增删改查, 而且消息的数量可能会非常多, 往数据库中存放效率不高

我们之前说过, 消息 是 依附于队列来存在的, 所以存储的时候 就要按照队列的分类来存储 ,在一个虚拟主机中, 会有多个队列 , 每个队列下面, 放着一个文件, 文件中存储我们的消息, 那一个文件不可能只存储一个消息, 而将多个消息存储到一个文件中, 我们需要知道每个消息 的开头 与结尾, 这里我们每个队列下面 存放 俩个文件, 一个文件是单纯的消息文件 , 一个文件是用来存储 这个消息文件中的消息的总数目, 和有效文件的数目, 方便我们后续进行垃圾回收算法的设计

消息文件的格式

我们对存储多个消息的文件 , 进行消息格式的设计 ,我们约定 每个消息的前4个字节用来存放, 这个消息的长度 , 然后跟上 消息本体, 依此内推 , 如图
在这里插入图片描述

对于我们的Broker server 服务器来说, 消息是需要新增 和删除的, 因此 在文件中采取逻辑删除的形式来删除一个消息

垃圾回收的思想

对于一个文件中无效消息过多的情况 , 我们采取垃圾回收 来 清楚这些无效消息, 我们约定 ,加入一个文件中的总消息数量大于2000 , 且无效消息 >50% ,时我们进行垃圾回收, 垃圾回收具体实现为: 我们采取复制算法, 也就是 假如一个文件触发了垃圾回收, 我们就将这个文件中的有效消息, 挪到一个新文件中, 在将旧文件删除掉 , 并将这个新文件的名称改回旧文件的名称

其他情况

我们上面说了 , 垃圾消息过多的情况, 还有一种情况是 ,假如一个文件全部都是有效消息, 并且还不断的持续添加中, 这种状况势必会让文件越来越大, 当文件过大的时候, 我们想要查找消息 就显得有点困难, 因此我们势必要针对这种情况作出处理, 我们约定: 当一个文件过大的时候 ,我们就将它一份为2 , 分成俩个具体的小文件, 同理当一个文件垃圾回收后过小的时候, 我们就将这俩个小文件合并成一个文件

具体思路: 用一个队列来存放每个文件的大小是多少,消息数目的多少, 无效消息是多少,

序列化和反序列化

因为我们存储的是二进制文件 , 所以 势必要序列化和反序列化 消息的, 序列化常用的方式 有 JSON 格式, 但是由于 JSON格式序列化之后产生的是文本文件, 这里我们想要存储的是二进制文件 ,所以我们采用二进制化的序列化方式 , 使用JAVA标准库提供的 ObjectInputStream 和 ObjectOutputStream 的方式来序列化 和反序列化

package com.example.demo.Common;

import java.io.*;
// 下列的逻辑, 并不仅仅是 Message, 其他的 Java 中的对象, 也是可以通过这样的逻辑进行序列化和反序列化的.
// 如果要想让这个对象能够序列化或者反序列化, 需要让这个类能够实现 Serializable 接口.
public class BinaryTool {
    
    

    //  序列化一个对象
    public static byte[] toBytes(Object object) throws IOException {
    
    
        // 这个流对象相当于一个变长的字节数组.
        // 就可以把 object 序列化的数据给逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]
        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
    
    
            try (ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)){
    
    
                // 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到
                // ObjectOutputStream 中.
                // 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream, 最终结果就写入到 ByteArrayOutputStream 里了
                outputStream.writeObject(object);
            }
            // 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来, 转成 byte[]
            return byteArrayOutputStream.toByteArray();
        }
    }
    // 反序列化一个对象
    public static Object toObject (byte[] bytes) throws IOException, ClassNotFoundException {
    
    
        Object object = null;
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)){
    
    
            try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){
    
    
                // 此处的 readObject, 就是从 data 这个 byte[] 中读取数据并进行反序列化.
                object =objectInputStream.readObject();

            }

        }
        return object;
    }
}

具体代码实现

该类设计方法
在这里插入图片描述
首先我们定义一个内部来,用来标识消息的有效消息数目和总的消息数目,方便我们后序对消息文件就行写入

package com.example.demo.mqServer.dataCenter;

import com.example.demo.Common.BinaryTool;
import com.example.demo.Common.MqException;
import com.example.demo.mqServer.core.MSGQueue;
import com.example.demo.mqServer.core.Message;

import java.io.*;
import java.util.LinkedList;
import java.util.Scanner;

//通过这个类, 来针对硬盘上的消息进行管理
public class MessageFileManager {
    
    
    // 定义一个内部类, 来表示该队列的统计信息
    // 有限考虑使用 static, 静态内部类.
    static public class Stat {
    
    
        // 此处直接定义成 public, 就不再搞 get set 方法了.
        // 对于这样的简单的类, 就直接使用成员, 类似于 C 的结构体了.
        public int totalCount;  // 总消息数量
        public int validCount;  // 有效消息数量
    }
    // 预定消息文件所在的目录和文件名
    // 这个方法, 用来获取到指定队列对应的消息文件所在路径
    private String getQueueDir(String queueName){
    
    
        return "./data/"+queueName;
    }

    // 这个方法用来获取该队列的消息数据文件路径
    private String getQueueDataPath(String queueName){
    
    
        return getQueueDir(queueName)+"/queue_data.txt";
    }
    // 这个方法用来获取该队列的消息统计文件路径
    private String getQueueStatPath(String queueName){
    
    
        return getQueueDir(queueName)+"/queue_stat.txt";
    }
    public void init(){
    
    
        // 暂时不需要单独初始化方法, 只是列出来
    }
    //    读写 消息统计文件
    private Stat readStat(String queueName){
    
    
        Stat stat = new Stat();
        try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))){
    
    
            Scanner scanner = new Scanner(inputStream);
            stat.totalCount = scanner.nextInt();
            stat.validCount = scanner.nextInt();
            return stat;

        } catch (IOException e) {
    
    
            e.printStackTrace();
        }
        return null;
    }

    private void writeStat(String queueName , Stat stat){
    
    
        // 使用 PrintWrite 来写文件.
        // OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.
        try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))){
    
    
            PrintWriter printWriter = new PrintWriter(outputStream);
            printWriter.write(stat.totalCount +"\t"+stat.validCount);
            printWriter.flush();
        }catch (IOException e) {
    
    
            e.printStackTrace();
        }
    }

//    创建文件目录与相对应的文件
    public void createQueueFiles(String queueName) throws IOException {
    
    
        File baseDir = new File(getQueueDir(queueName));
        if (!baseDir.exists()){
    
    
            // 如果不存在就去创建它
            boolean ok = baseDir.mkdirs();
            if (!ok){
    
    
                throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath());
            }
        }
        // 2. 创建队列数据文件
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()) {
    
    
            boolean ok = queueDataFile.createNewFile();
            if (!ok) {
    
    
                throw new IOException("创建文件失败! queueDataFile=" + queueDataFile.getAbsolutePath());
            }
        }
        // 3. 创建消息统计文件
        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()) {
    
    
            boolean ok = queueStatFile.createNewFile();
            if (!ok) {
    
    
                throw new IOException("创建文件失败! queueStatFile=" + queueStatFile.getAbsolutePath());
            }
        }
        // 4. 给消息统计文件设置初始值
        Stat stat = new Stat();
        stat.validCount = 0;
        stat.totalCount =0;
        writeStat(queueName,stat);
    }

    // 删除队列的目录和文件.
    // 队列也是可以被删除的. 当队列删除之后, 对应的消息文件啥的, 自然也要随之删除.
    public void destroyQueueFiles(String queueName) throws IOException {
    
    
        //先删除里面的文件, 在删除目录
        File  queueDataFile = new File(getQueueDataPath(queueName));
        boolean isQueueData = queueDataFile.delete();
        File queueStatFile = new File(getQueueStatPath(queueName));
        boolean isQueueStat = queueStatFile.delete();
        File queueDir = new File(getQueueDir(queueName));
        boolean isQueueDir = queueDir.delete();
        if (!isQueueData || !isQueueStat || !isQueueDir){
    
    
            // 只要有一个没有删除成功, 说明整体删除失败,
            throw new IOException("删除队列目录和文件失败! baseDir=" + queueDir.getAbsolutePath());
        }
    }

    // 检查队列的目录 与 文件是否存在
    // 比如后续有生产者给 broker server 生产消息了, 这个消息就可能需要记录到文件上(取决于消息是否要持久化)
    public boolean checkFilesExits(String queueName) {
    
    
        // 判定队列的数据文件和统计文件是否都存在!!
        File queueDataFile = new File(getQueueDataPath(queueName));
        if (!queueDataFile.exists()) {
    
    
            return false;
        }
        File queueStatFile = new File(getQueueStatPath(queueName));
        if (!queueStatFile.exists()) {
    
    
            return false;
        }
        return true;
    }



    // 这个方法用来把一个新的消息, 放到队列对应的文件中.
    // queue 表示要把消息写入的队列. message 则是要写的消息.
    public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
    
    
        // 先检查队列是否合法
        if (!checkFilesExits(queue.getName())){
    
    
            throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName=" + queue.getName());
        }
        // 2. 把 Message 对象, 进行序列化, 转成二进制的字节数组.
        byte[] messageBinary = BinaryTool.toBytes(message);
        synchronized (queue){
    
    
            // 3. 先获取到当前的队列数据文件的长度, 用这个来计算出该 Message 对象的 offsetBeg 和 offsetEnd
            // 把新的 Message 数据, 写入到队列数据文件的末尾. 此时 Message 对象的 offsetBeg , 就是当前文件长度 + 4
            // offsetEnd 就是当前文件长度 + 4 + message 自身长度.
            File queueDataFile = new File(getQueueDataPath(queue.getName()));
            // 设置 beg 和 end 值
            message.setOffsetBeg(queueDataFile.length()+4);
            message.setOffsetEnd(queueDataFile.length()+4+messageBinary.length);
            // 4. 写入消息到数据文件, 注意, 是追加写入到数据文件末尾.
            try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {
    
    
                // DataOutputStream 封装了 将int 一个个字节写入 方法
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
    
    
                    // 接下来要先写当前消息的长度, 占据 4 个字节的~~
                    dataOutputStream.writeInt(messageBinary.length);
                    // 写入消息本体
                    dataOutputStream.write(messageBinary);
                }
            }
            // 5. 更新消息的统计文件
            Stat stat = readStat(queue.getName());
            stat.totalCount+=1;
            stat.validCount+=1;
            writeStat(queue.getName(),stat);
        }
    }

    // 这个是删除消息的方法.
    // 这里的删除是逻辑删除, 也就是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置成 0
    // 1. 先把文件中的这一段数据, 读出来, 还原回 Message 对象;
    // 2. 把 isValid 改成 0;
    // 3. 把上述数据重新写回到文件.
    // 此处这个参数中的 message 对象, 必须得包含有效的 offsetBeg 和 offsetEnd
    public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {
    
    
        synchronized (queue) {
    
    
            try (RandomAccessFile accessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {
    
    
                accessFile.seek(message.getOffsetBeg());
                // 1. 先从文件中读取对应的 Message 数据.
                byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];
                accessFile.read(bufferSrc);
                // 2. 把当前读出来的二进制数据, 转换回成 Message 对象
                Message message1 = (Message) BinaryTool.toObject(bufferSrc);
                // 3. 把 isValid 设置为无效.
                message1.setIsValid((byte) 0x0);
                // 此处不需要给参数的这个 message 的 isValid 设为 0, 因为这个参数代表的是内存中管理的 Message 对象
                // 而这个对象马上也要被从内存中销毁了.
                // 4. 重新写入文件
                byte[] bufferDest = BinaryTool.toBytes(message1);
                // 虽然上面已经 seek 过了, 但是上面 seek 完了之后, 进行了读操作, 这一读, 就导致, 文件光标往后移动, 移动到
                // 下一个消息的位置了. 因此要想让接下来的写入, 能够刚好写回到之前的位置, 就需要重新调整文件光标.
                accessFile.seek(message.getOffsetBeg());
                accessFile.write(bufferDest);
                // 通过上述这通折腾, 对于文件来说, 只是有一个字节发生改变而已了~~
            }
            // 不要忘了, 更新统计文件!! 把一个消息设为无效了, 此时有效消息个数就需要 - 1
            Stat stat = readStat(queue.getName());
            if (stat.validCount > 0) {
    
    
                stat.validCount -= 1;
            }
            writeStat(queue.getName(), stat);
        }
    }

    // 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)
    // 这个方法, 准备在程序启动的时候, 进行调用.
    // 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.
    // 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.
    // 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.
    public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {
    
    
        LinkedList<Message> messages = new LinkedList<>();
        try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){
    
    
            try (DataInputStream dataInputStream = new DataInputStream(inputStream)){
    
    
                // 这个变量记录当前文件光标.
                long currentOffset = 0;
                // 一个文件中包含了很多消息, 此处势必要循环读取.
                while (true){
    
    
                    // 1. 读取当前消息的长度, 这里的 readInt 可能会读到文件的末尾(EOF)
                    //    readInt 方法, 读到文件末尾, 会抛出 EOFException 异常. 这一点和之前的很多流对象不太一样.
                    int messageSize = dataInputStream.readInt();
                    // 2 将消息按照这个长度读取到数组中
                    byte[] buffer =  new byte[messageSize];
                    int actualSize = dataInputStream.read(buffer);
                    if (messageSize != actualSize) {
    
    
                        // 如果不匹配, 说明文件有问题, 格式错乱了!!
                        throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);
                    }
                    // 3 将读到的数据 反序列化 成 message 对象
                    Message message =(Message) BinaryTool.toObject(buffer);
                    // 4 判断当前对象是否 有效
                    if (message.getIsValid() != 0x1){
    
    
                        // 说明是无效数据, 直接跳过
                        // 虽然消息是无效数据, 但是 offset 不要忘记更新.
                        currentOffset += (4 + messageSize);
                        continue;
                    }
                    // 5. 有效数据, 则需要把这个 Message 对象加入到链表中. 加入之前还需要填写 offsetBeg 和 offsetEnd
                    //    进行计算 offset 的时候, 需要知道当前文件光标的位置的. 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置
                    //    因此就需要手动计算下文件光标.
                    message.setOffsetBeg(currentOffset + 4);
                    message.setOffsetEnd(currentOffset + 4 + messageSize);
                    currentOffset += (4 + messageSize);
                    messages.add(message);
                }
            }catch (EOFException e) {
    
    
                // 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.
                // 这个 catch 语句中也不需要做啥特殊的事情
                System.out.println("[MessageFileManager] 恢复 Message 数据完成!");
            }
        }
        return messages;
    }

    // 检查当前是否要针对该队列的消息数据文件进行 GC
    public boolean checkGC(String queueName) {
    
    
        // 判断是否要GC , 是根据总消息总数 和 有效 消息总数, 这俩个值都是在消息统计文件中的
        Stat stat = readStat(queueName);
        if (stat.totalCount > 2000 && (double)stat.totalCount / (double) stat.validCount <0.5){
    
    
            return true;
        }
        return false;
    }
    private String getQueueDataNewPath(String queueName) {
    
    
        return getQueueDir(queueName) + "/queue_data_new.txt";
    }
    // 通过这个方法, 真正执行消息数据文件的垃圾回收操作.
    // 使用复制算法来完成.
    // 创建一个新的文件, 名字就是 queue_data_new.txt
    // 把之前消息数据文件中的有效消息都读出来, 写到新的文件中.
    // 删除旧的文件, 再把新的文件改名回 queue_data.txt
    // 同时要记得更新消息统计文件.
    public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
    
    
        // 进行 gc 的时候, 是针对消息数据文件进行大洗牌. 在这个过程中, 其他线程不能针对该队列的消息文件做任何修改.
        synchronized (queue){
    
    
            // 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间.
            long gcBeg = System.currentTimeMillis();
            // 1. 创建一个新的文件
            File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));
            if (queueDataNewFile.exists()) {
    
    
                // 正常情况下, 这个文件不应该存在. 如果存在, 就是意外~~ 说明上次 gc 了一半, 程序意外崩溃了.
                throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName=" + queue.getName());
            }
            // 如果没有新文件, 就去创建新文件
            boolean ok = queueDataNewFile.createNewFile();
            if (!ok) {
    
    
                throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());
            }
            //2 . 从旧的文件中读取所有有效对象 , 直接调用
            LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());
            //3 . 将有效对象 写入到新文件中

            try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {
    
    
                try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
    
    
                    for (Message message : messages) {
    
    
                        byte[] buffer = BinaryTool.toBytes(message);
                        // 先写四个字节消息的长度
                        dataOutputStream.writeInt(buffer.length);
                        dataOutputStream.write(buffer);
                    }
                }
            }

            // 4. 删除旧的数据文件, 并且把新的文件进行重命名
            File queueDataOldFile = new File(getQueueDataPath(queue.getName()));
            ok = queueDataOldFile.delete();
            if (!ok) {
    
    
                throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
            }
            // 把 queue_data_new.txt => queue_data.txt
            ok = queueDataNewFile.renameTo(queueDataOldFile);
            if (!ok) {
    
    
                throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()
                        + ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());
            }
            // 5. 更新统计文件
            Stat stat = readStat(queue.getName());
            stat.totalCount = messages.size();
            stat.validCount = messages.size();
            writeStat(queue.getName(), stat);

            long gcEnd = System.currentTimeMillis();
            System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time="
                    + (gcEnd - gcBeg) + "ms");


        }
    }
}

测试消息文件

package com.example.demo.mqServer.dataCenter;

import com.example.demo.Common.MqException;
import com.example.demo.mqServer.core.MSGQueue;
import com.example.demo.mqServer.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;

import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.*;

@SpringBootTest
class MessageFileManagerTest {
    
    

    private MessageFileManager messageFileManager = new MessageFileManager();

    private static final String queueName1 = "testQueue1";
    private static final String queueName2 = "testQueue2";

    // 这个方法是每个用例执行之前的准备工作
    @BeforeEach
    public void setUp() throws IOException {
    
    
        // 准备阶段, 创建出两个队列, 以备后用
        messageFileManager.createQueueFiles(queueName1);
        messageFileManager.createQueueFiles(queueName2);
    }

    // 这个方法就是每个用例执行完毕之后的收尾工作
    @AfterEach
    public void tearDown() throws IOException {
    
    
        // 收尾阶段, 就把刚才的队列给干掉.
        messageFileManager.destroyQueueFiles(queueName1);
        messageFileManager.destroyQueueFiles(queueName2);
    }

    @Test
    public void testCreateFiles() {
    
    
        // 创建队列文件已经在上面 setUp 阶段执行过了. 此处主要是验证看看文件是否存在.
        File queueDataFile1 = new File("./data/" + queueName1 + "/queue_data.txt");
        Assertions.assertEquals(true, queueDataFile1.isFile());
        File queueStatFile1 = new File("./data/" + queueName1 + "/queue_stat.txt");
        Assertions.assertEquals(true, queueStatFile1.isFile());

        File queueDataFile2 = new File("./data/" + queueName2 + "/queue_data.txt");
        Assertions.assertEquals(true, queueDataFile2.isFile());
        File queueStatFile2 = new File("./data/" + queueName2 + "/queue_stat.txt");
        Assertions.assertEquals(true, queueStatFile2.isFile());
    }
    @Test
    public void testReadWriteStat() {
    
    
        MessageFileManager.Stat stat = new MessageFileManager.Stat();
        stat.totalCount = 100;
        stat.validCount = 50;
        // 此处就需要使用反射的方式, 来调用 writeStat 和 readStat 了.
        // Java 原生的反射 API 其实非常难用~~
        // 此处使用 Spring 帮我们封装好的 反射 的工具类.
        ReflectionTestUtils.invokeMethod(messageFileManager, "writeStat", queueName1, stat);

        // 写入完毕之后, 再调用一下读取, 验证读取的结果和写入的数据是一致的.
        MessageFileManager.Stat newStat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);
        Assertions.assertEquals(100, newStat.totalCount);
        Assertions.assertEquals(50, newStat.validCount);
        System.out.println("测试 readStat 和 writeStat 完成!");
    }

    private MSGQueue createTestQueue(String queueName) {
    
    
        MSGQueue queue = new MSGQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        queue.setAutoDelete(false);
        queue.setExclusive(false);
        return queue;
    }

    private Message createTestMessage(String content) {
    
    
        Message message = Message.createMessageWithId("testRoutingKey", null, content.getBytes());
        return message;
    }

    @Test
    public void testSendMessage() throws IOException, MqException, ClassNotFoundException {
    
    
        // 构造出消息, 并且构造出队列.
        Message message = createTestMessage("testMessage");
        // 此处创建的 queue 对象的 name, 不能随便写, 只能用 queueName1 和 queueName2. 需要保证这个队列对象
        // 对应的目录和文件啥的都存在才行.
        MSGQueue queue = createTestQueue(queueName1);

        // 调用发送消息方法
        messageFileManager.sendMessage(queue, message);

        // 检查 stat 文件.
        MessageFileManager.Stat stat = ReflectionTestUtils.invokeMethod(messageFileManager, "readStat", queueName1);
        Assertions.assertEquals(1, stat.totalCount);
        Assertions.assertEquals(1, stat.validCount);

        // 检查 data 文件 对比 插入获取到的消息与 准备插入的消息是否一致
        LinkedList<Message> messages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(1, messages.size());
        Message curMessage = messages.get(0);
        Assertions.assertEquals(message.getMessageId(), curMessage.getMessageId());
        Assertions.assertEquals(message.getRoutingKey(), curMessage.getRoutingKey());
        Assertions.assertEquals(message.getDeliverMode(), curMessage.getDeliverMode());
        // 比较两个字节数组的内容是否相同, 不能直接使用 assertEquals 了.
        Assertions.assertArrayEquals(message.getBody(), curMessage.getBody());

        System.out.println("message: " + curMessage);
    }


    @Test
    public void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {
    
    
        // 往队列中插入 100 条消息, 然后验证看看这 100 条消息从文件中读取之后, 是否和最初是一致的.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 100; i++) {
    
    
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 读取所有消息
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(expectedMessages.size(), actualMessages.size());
        for (int i = 0; i < expectedMessages.size(); i++) {
    
    
            Message expectedMessage = expectedMessages.get(i);
            Message actualMessage = actualMessages.get(i);
            System.out.println("[" + i + "] actualMessage=" + actualMessage);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
    }

    @Test
    public void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {
    
    
        // 创建队列, 写入 10 个消息. 删除其中的几个消息. 再把所有消息读取出来, 判定是否符合预期.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 10; i++) {
    
    
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 删除其中的三个消息
        messageFileManager.deleteMessage(queue, expectedMessages.get(7));
        messageFileManager.deleteMessage(queue, expectedMessages.get(8));
        messageFileManager.deleteMessage(queue, expectedMessages.get(9));

        // 对比这里的内容是否正确.
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(7, actualMessages.size());
        for (int i = 0; i < actualMessages.size(); i++) {
    
    
            Message expectedMessage = expectedMessages.get(i);
            Message actualMessage = actualMessages.get(i);
            System.out.println("[" + i + "] actualMessage=" + actualMessage);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
    }


    @Test
    public void testGC() throws IOException, MqException, ClassNotFoundException {
    
    
        // 先往队列中写 100 个消息. 获取到文件大小.
        // 再把 100 个消息中的一半, 都给删除掉(比如把下标为偶数的消息都删除)
        // 再手动调用 gc 方法, 检测得到的新的文件的大小是否比之前缩小了.
        MSGQueue queue = createTestQueue(queueName1);
        List<Message> expectedMessages = new LinkedList<>();
        for (int i = 0; i < 100; i++) {
    
    
            Message message = createTestMessage("testMessage" + i);
            messageFileManager.sendMessage(queue, message);
            expectedMessages.add(message);
        }

        // 获取 gc 前的文件大小
        File beforeGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
        long beforeGCLength = beforeGCFile.length();

        // 删除偶数下标的消息
        for (int i = 0; i < 100; i += 2) {
    
    
            messageFileManager.deleteMessage(queue, expectedMessages.get(i));
        }

        // 手动调用 gc
        messageFileManager.gc(queue);

        // 重新读取文件, 验证新的文件的内容是不是和之前的内容匹配
        LinkedList<Message> actualMessages = messageFileManager.loadAllMessageFromQueue(queueName1);
        Assertions.assertEquals(50, actualMessages.size());
        for (int i = 0; i < actualMessages.size(); i++) {
    
    
            // 把之前消息偶数下标的删了, 剩下的就是奇数下标的元素了.
            // actual 中的 0 对应 expected 的 1
            // actual 中的 1 对应 expected 的 3
            // actual 中的 2 对应 expected 的 5
            // actual 中的 i 对应 expected 的 2 * i + 1
            Message expectedMessage = expectedMessages.get(2 * i + 1);
            Message actualMessage = actualMessages.get(i);

            Assertions.assertEquals(expectedMessage.getMessageId(), actualMessage.getMessageId());
            Assertions.assertEquals(expectedMessage.getRoutingKey(), actualMessage.getRoutingKey());
            Assertions.assertEquals(expectedMessage.getDeliverMode(), actualMessage.getDeliverMode());
            Assertions.assertArrayEquals(expectedMessage.getBody(), actualMessage.getBody());
            Assertions.assertEquals(0x1, actualMessage.getIsValid());
        }
        // 获取新的文件的大小
        File afterGCFile = new File("./data/" + queueName1 + "/queue_data.txt");
        long afterGCLength = afterGCFile.length();
        System.out.println("before: " + beforeGCLength);
        System.out.println("after: " + afterGCLength);
        Assertions.assertTrue(beforeGCLength > afterGCLength);
    }


}

猜你喜欢

转载自blog.csdn.net/qq_56454895/article/details/132050963