本文共 2205 字,大约阅读时间需要 7 分钟。
问题:做的项目中,出现了mq消息大量积压,一方面是入队数量很多,另一方面就是消费很慢,解决消费慢的问题,这边主要采用了两种方式
/*** 线程池*/public final class ThreadPoolUtil { /** * 默认线程数 */ private static final int DEFAULT_POOL_SIZE = 20; /** * 默认阻塞队列数1024 */ private static final int DEFAULT_CAPACITY = 1 << 10; /** * 最大线程数 */ private static final int MAX_POOL_SIZE = 40; /** * 最大阻塞队列数2048 */ private static final int MAX_CAPACITY = 1 << 11; /** * 默认线程空闲时长(seconds) */ private static final long KEEP_ALIVE_TIME = 200L; /** * 获取JDK的线程池 * @return */ public static ThreadPoolExecutor getThreadPool(String threadPoolName) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadPoolName + "_thread").build(); return new ThreadPoolExecutor(DEFAULT_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue(DEFAULT_CAPACITY), threadFactory, new ThreadPoolExecutor.DiscardPolicy()); }}/*** 处理消费*/@Testpublic void testThread() throws InterruptedException { ThreadPoolExecutor executorService = ThreadPoolUtil.getThreadPool("executeMessage"); List messages = listener(); //countDownLatch阻塞主线程 CountDownLatch countDownLatch = new CountDownLatch(messages.size()); for (Message message : messages) { executorService.execute(() -> { System.out.println(Thread.currentThread().getId() + " 线程消费消息: " + JacksonUtil.obj2json(message)); try { Thread.sleep(1L); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }); } countDownLatch.await();}
这里使用countdownLatch
的目的有两个:一个是控制线程数量,避免将线程池打满,导致走阻塞队列或者拒绝策略;另一个是阻塞主线程,减轻机器压力。
mq消费的机制就是,这个机器从消息队列中抓取消息后,会加锁,当机器处理完毕返回一个ack标识,则解锁,继续抓取后面的消息。
如果我们不阻塞主线程,开线程的时候,主线程直接执行完毕返回ack,而实际上机器并没有处理完毕,随后就是一波又一波的消息涌入,从而将机器压垮。
如果机器性能完全hold的住,那么我们完全可以将mq的压力转移到机器上。
ListmessageList = Lists.newArrayList();messageList.addAll(messages);//....我们可以将这个messageList开线程处理或存入缓存处理或再存入缓冲消息队列处理return; //直接返回ack
转载地址:http://ntgzi.baihongyu.com/