當前位置:
首頁 > 知識 > 基於Redis的消息隊列之生產消費者模式(明天寫發布/訂閱模式)

基於Redis的消息隊列之生產消費者模式(明天寫發布/訂閱模式)

基於Redis的消息隊列之生產消費者模式(明天寫發布/訂閱模式)

基於Redis


1、pom.xml

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.leo.common

common-parent

1.0-SNAPSHOT

../common-parent

org.leo.redis

redis-procon

${redis-procon.version}

war

基於Redis的生產消費者模式

redis-procon

http://maven.apache.org

UTF-8

org.leo.common

common-config

${common-config.version}

junit

junit

test

org.springframework

spring-test

org.apache.logging.log4j

log4j-slf4j-impl

test

org.apache.logging.log4j

log4j-core

org.springframework

spring-core

org.springframework

spring-context-support

org.springframework

spring-web

org.apache.commons

commons-pool2

org.springframework.data

spring-data-redis

redis.clients

jedis

這裡引入了我的一些共通工程,大家做的時候按照自己的實際情況修改即可。

我做成了war包的形式放在Tomcat里跑,大家可以做成jar包。


2、web.xml

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance

http://www.springmodules.org/schema/cache/springmodules-cache.xsd

http://www.springmodules.org/schema/cache/springmodules-ehcache.xsd"

xsi:schemaLocation="http://java.sun.com/xml/ns/javaee

http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd

">

Redis生產消費者

contextConfigLocation

classpath:spring/applicationContext.xml

org.springframework.web.context.ContextLoaderListener

CharacterEncodingFilter

org.springframework.web.filter.CharacterEncodingFilter

encoding

UTF-8

CharacterEncodingFilter

/

注意applicationContext.xml配置的路徑,按照實際情況修改,我這裡是放在src/resources/spring下。


3、redis.properties

## redis

redis.host=192.168.56.104

redis.port=6379

redis.pwd=111111

redis.maxIdle=5

redis.maxTotal=10

redis.maxWaitMillis=10000

redis.testOnBorrow=true

裡面的配置請按照實際情況修改。


4、applicationContext.xml

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jdbc="http://www.springframework.org/schema/jdbc"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="

http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd

http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd

">

class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">

5、applicationContext-redis.xml

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"

xmlns:redis="http://www.springframework.org/schema/redis"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/redis

http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">

class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"

p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.pwd}"

p:pool-config-ref="poolConfig" />

class="org.springframework.data.redis.serializer.StringRedisSerializer" />

class="org.springframework.data.redis.serializer.StringRedisSerializer">

6、RedisQueueDaoImpl

package org.leo.ssm.redis.dao.impl;

import org.leo.ssm.redis.dao.RedisQueueDao;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.stereotype.Repository;

@Repository

public class RedisQueueDaoImpl implements RedisQueueDao {

@Autowired

protected RedisTemplate redisTemplate;

@Override

public void lpush(String key, String value) {

redisTemplate.opsForList().leftPush(key, value);

}

public String lpop(String key) {

return redisTemplate.opsForList().rightPop(key);

}

}

介面就不附上了。


7、RedisQueueServiceImpl

package org.leo.ssm.redis.service.impl;

import org.leo.ssm.redis.dao.RedisQueueDao;

import org.leo.ssm.redis.service.RedisQueueService;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class RedisQueueServiceImpl implements RedisQueueService {

@Autowired

private RedisQueueDao redisQueueDao;

@Override

public void lpush(String key, String value) {

redisQueueDao.lpush(key, value);

}

@Override

public String lpop(String key) {

return redisQueueDao.lpop(key);

}

}

介面就不附上了。


8、RedisRun

消費者具體執行類。UUID只是在本例中做標識用。


package org.leo.ssm.redis;

import java.util.UUID;

import org.leo.ssm.redis.service.RedisQueueService;

import org.springframework.beans.factory.annotation.Autowired;

public class RedisRun implements Runnable {

@Autowired

private RedisQueueService redisQueueService;

public volatile boolean exit = false;

public RedisRun(RedisQueueService redisQueueService) {

super();

this.redisQueueService = redisQueueService;

}

@Override

public void run() {

String runUUID = UUID.randomUUID().toString().toUpperCase();

int count = 0;

while (!exit) {

// tq是本消費者程序從Redis中取消息的key

String result = redisQueueService.lpop("tq");

if (null != result) {

// 取出消息之後進行業務處理

System.out.println(System.currentTimeMillis() + ":" + result + "--" + runUUID);

} else {

System.out.println("沒有取到信息,休息1秒");

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

System.out.println("=========InterruptedException");

}

count++;

if (count == 10) {

System.out.println("連續10次沒有取到信息,休息10秒");

try {

Thread.sleep(10000);

} catch (InterruptedException e) {

System.out.println("=========InterruptedException");

}

count = 0;

}

}

}

System.out.println("Thread Stop:" + runUUID);

}

}

消費者程序會不停地從Redis中取消息,考慮到如果長時間沒有消息進入隊列,這樣是蠻耗資源的,所以在程序後面加了休息的代碼,沒有從Redis取出消息,就休息1秒,連續10次沒有取出消息,就休息10秒。可以自己修改。

另外還要考慮任務處理失敗後消息的重發機制。代碼里不再贅述。


9、RedisInit

package org.leo.ssm.redis;

import java.util.UUID;

import org.leo.ssm.redis.service.RedisQueueService;

import org.springframework.beans.factory.DisposableBean;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import org.springframework.stereotype.Component;

@Component("redisInit")

public class RedisInit implements InitializingBean, DisposableBean {

@Autowired

private RedisQueueService redisQueueService;

@Autowired

private ThreadPoolTaskExecutor taskExecutor;

private RedisRun redisRun;

private String threadUUID;

public RedisInit() {

threadUUID = UUID.randomUUID().toString().toUpperCase();

System.out.println("------RedisInit-----Start:" + threadUUID);

}

@Override

public void afterPropertiesSet() throws Exception {

redisRun = new RedisRun(redisQueueService);

taskExecutor.execute(redisRun);

}

@Override

public void destroy() throws Exception {

System.out.println("------RedisInit-----Destroy:" + threadUUID);

redisRun.exit = true;

for (;;) {

int count = taskExecutor.getActiveCount();

System.out.println("活躍的線程數 : " + count);

if (count == 0) {

taskExecutor.getThreadPoolExecutor().remove(redisRun);

taskExecutor.shutdown();

break;

} else {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

}

本類實現了InitializingBean, DisposableBean介面,保證了程序在Tomcat中啟動後將RedisRun載入至線程池並運行,以及在銷毀時移除並關閉。


10、生產者

package org.leo.ssm;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.leo.ssm.redis.service.RedisQueueService;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration("classpath:spring/applicationContext.xml")

public class TestPush {

@Autowired

private RedisQueueService redisQueueService;

@Test

public void testPush(){

redisQueueService.lpush("tq", "testQueue");

}

}

這是我在工程中寫的一個測試類,實際情況中不要這麼寫,因為要載入applicationContext.xml這導致這個生產者程序在啟動的時候,一個消費者也會啟動。

這個程序應該寫在別的工程中,其實關鍵代碼就一句話:


redisQueueService.lpush("tq", "你要發的消息");

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Java個人學習心得 的精彩文章:

Keepalived+Nginx高可用安裝部署(含Nginx+Tomcat負載均衡)
Redis3.2.9單機版安裝
Java操作Linux命令分割合并文本文件及其他
Dubbo管理控制台安裝

TAG:Java個人學習心得 |

您可能感興趣

Open Biology開放獲取辦刊模式和 刊載文章分析
《老滾5》Nvidia Freestyle模式演示 濾鏡下美炸天
Hitman開發人員在批評之後調整發布模式 Parable創造者的新遊戲
Android端Firefox 59發布:支持 HLS播放 改進隱私瀏覽模式
《蒼翼默示錄:Cross Tag Battle》新角色及模式公布
Teradata向訂閱模式的轉型初顯成效
剖析關於-ansible配置文件和命令中ad-hoc模式使用參數詳解
Flutter 的編譯模式
谷歌發布 Android P Beta 3 暗黑模式迎來優化
Scott Brinker:數字改變營銷運作模式
谷歌Chrome瀏覽器將獲原生暗黑模式
《Avengers: Infinity War》將與《Fortnite》推出聯乘模式
如何快速進入微軟Edge瀏覽器InPrivate模式
谷歌發布新款Chrome瀏覽器:帶「影院模式」
谷歌發布新款 Chrome 瀏覽器:帶「影院模式」
雙HomePod立體聲模式叫做FullRoom
iOS 12 beta 5 開發者測試版暗示「iPhone X Plus」將有 iPad 的橫屏模式
Zynga的《CSR Racing 2》AR模式進入Android設備
Nacos發布 v0.2 版本,支持 Spring Cloud 微服務高可用集群模式
海康、UCLA、北理聯合提出3D DescriptorNet:可按條件生成3D形狀,克服模式崩潰