博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
消息队列积压问题处理
阅读量:3963 次
发布时间:2019-05-24

本文共 2205 字,大约阅读时间需要 7 分钟。

问题:做的项目中,出现了mq消息大量积压,一方面是入队数量很多,另一方面就是消费很慢,解决消费慢的问题,这边主要采用了两种方式

1、多线程消费
/*** 线程池*/public final class ThreadPoolUtil {
/** * 默认线程数 */ private static final int DEFAULT_POOL_SIZE = 20; /** * 默认阻塞队列数1024 */ private static final int DEFAULT_CAPACITY = 1 << 10; /** * 最大线程数 */ private static final int MAX_POOL_SIZE = 40; /** * 最大阻塞队列数2048 */ private static final int MAX_CAPACITY = 1 << 11; /** * 默认线程空闲时长(seconds) */ private static final long KEEP_ALIVE_TIME = 200L; /** * 获取JDK的线程池 * @return */ public static ThreadPoolExecutor getThreadPool(String threadPoolName) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadPoolName + "_thread").build(); return new ThreadPoolExecutor(DEFAULT_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue
(DEFAULT_CAPACITY), threadFactory, new ThreadPoolExecutor.DiscardPolicy()); }}/*** 处理消费*/@Testpublic void testThread() throws InterruptedException {
ThreadPoolExecutor executorService = ThreadPoolUtil.getThreadPool("executeMessage"); List
messages = listener(); //countDownLatch阻塞主线程 CountDownLatch countDownLatch = new CountDownLatch(messages.size()); for (Message message : messages) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getId() + " 线程消费消息: " + JacksonUtil.obj2json(message)); try {
Thread.sleep(1L); } catch (InterruptedException e) {
e.printStackTrace(); } finally {
countDownLatch.countDown(); } }); } countDownLatch.await();}

这里使用countdownLatch的目的有两个:一个是控制线程数量,避免将线程池打满,导致走阻塞队列或者拒绝策略;另一个是阻塞主线程,减轻机器压力。

mq消费的机制就是,这个机器从消息队列中抓取消息后,会加锁,当机器处理完毕返回一个ack标识,则解锁,继续抓取后面的消息。

如果我们不阻塞主线程,开线程的时候,主线程直接执行完毕返回ack,而实际上机器并没有处理完毕,随后就是一波又一波的消息涌入,从而将机器压垮。

2、压力转移到机器

如果机器性能完全hold的住,那么我们完全可以将mq的压力转移到机器上。

List
messageList = Lists.newArrayList();messageList.addAll(messages);//....我们可以将这个messageList开线程处理或存入缓存处理或再存入缓冲消息队列处理return; //直接返回ack

转载地址:http://ntgzi.baihongyu.com/

你可能感兴趣的文章
18-python之while循环,for循环与else的配合
查看>>
19-python之字符串简单介绍
查看>>
20-python之切片详细介绍
查看>>
P24-c++类继承-01详细的例子演示继承的好处
查看>>
P8-c++对象和类-01默认构造函数详解
查看>>
P1-c++函数详解-01函数的默认参数
查看>>
P3-c++函数详解-03函数模板详细介绍
查看>>
P4-c++函数详解-04函数重载,函数模板和函数模板重载,编译器选择使用哪个函数版本?
查看>>
P5-c++内存模型和名称空间-01头文件相关
查看>>
P6-c++内存模型和名称空间-02存储连续性、作用域和链接性
查看>>
P9-c++对象和类-02构造函数和析构函数总结
查看>>
P10-c++对象和类-03this指针详细介绍,详细的例子演示
查看>>
[心情] 如果有一天
查看>>
[Linux] 常用 linux 系统命令及维护备忘
查看>>
[Linux] 关于 Ext4 HowTo
查看>>
[杂记] 新年物语&关于Mysql引擎性能测试
查看>>
[HTML5] 关于HTML5(WebGL)的那点事
查看>>
自我反思
查看>>
初识网络编程
查看>>
东北赛选拔教训
查看>>