如何批量消费队列中数据

如何批量消费队列中数据

队列的消费模式

在我们实际开发过程中经常会处理各种大批量数据入库,这个时候我们就会到队列,将数据先写入队列中,然后开启多个消费线程慢慢消费入库。从队列中消费数据有两种方式:

  • 单条消费
  • 批量消费

我们今天分别来实现这两种消费方式

存数据到队列

存数据相对比较简单,这里我推荐大家使用BlockingQueue,该队列为阻塞队列,非常好用!

//创建队列  
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1000000,true);  
  
/**  
 * 向队列中存放数据  
  * @param message  
 */public void saveQueueData(String message){  
  //存放数据  
  blockingQueue.offer("测试");  
}
ArrayBlockingQueue参数说明

ArrayBlockingQueue一共有三个重载方法

  • int capacity
    该参数表示当前定义队列的大小,也就是能存放多少条数据
  • boolean fair
    该参数表示访问该队列的策略是否公平。true:按照 FIFO 顺序访问插入或移除时受阻塞线程的队列;false:访问顺序是不确定的
  • Collection<? extends E> c
    该参数是一个集合,表示将一个集合的数据存入该阻塞队列,相当于给该队列一个初始数据

单条消费

    /**
     * 从队列中单条消费数据
     */
    public void consumerBySingle() {
        while (true) {
            try {
                String take = blockingQueue.take();
                log.info("消费到的数据是:{}", take);
            } catch (Exception e) {
                log.error("缓存队列单条消费异常:{}", e.getMessage());
            }
        }
    }

可能会有人问,为什么用while(true),这样不是一个死循环么,那不是一直都在执行?其实并不是这样的,这就是为什么我推荐大家用BlockingQueue的原因,他是一个阻塞队列,take()这个方法是阻塞的,一段队列中没有数据,那么就不会继续往下执行,而是阻塞到这个地方,等对队列中有数据的时候才会继续执行

批量消费

    /**
     * 从队列中批量消费数据
     */
    public void consumerByBatch() {
        while (true) {
            try {
                List<String> list = new ArrayList<>();
                Queues.drain(blockingQueue, list, 100, 1, TimeUnit.MINUTES);
                log.info("批量消费到的数据是:{}", list);
            } catch (Exception e) {
                log.error("缓存队列批量消费异常:{}", e.getMessage());
            }
        }
    }

这里面用到一个很重要的东西Guava的Queues
需要导入如Guava的包,maven项目只需要在pom文件中添加:

<dependency>   
    <groupId>com.google.guava</groupId>    
    <artifactId>guava</artifactId>    
    <version>26.0-jre</version> 
</dependency>

Queues.drain(blockingQueue, list, 100, 1, TimeUnit.MINUTES);这个方法一共有5个参数

  • 第一个:传入你需要批量消费的队列
  • 第二个:传入一个用来接收批量消费到的数据
  • 第三个:批量消费数据的大小,这里我们给100,即意味着每次消费100条数据
  • 第四个:批量消费的等待的最大间隔,什么意思呢?比如说,我先在队列中只有10条数据,它不到100条,那按道理就不会消费,但是这样显然不合理,所以需要指定当超多多长时间,即使当前队列中数据低于我们设定的阈值也会消费
  • 第五个,这个就很好理解,就是指定第四个参数的单位,是秒是分钟还是小时等等

所以我这里就表示:每次批量消费100条数据,如果队列当前数据不够100条,那么等待1分钟然后将数据全部消费

本次的批量消费队列中数据就介绍完了,是不是觉得很简单呢?

发布了12 篇原创文章 · 获赞 33 · 访问量 7004

猜你喜欢

转载自blog.csdn.net/qq_22331931/article/details/105120808