基於Redis的消息隊列之生產消費者模式(明天寫發布/訂閱模式)
基於Redis
1、pom.xml
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
org.leo.common
common-parent
1.0-SNAPSHOT
../common-parent
org.leo.redis
redis-procon
${redis-procon.version}
war
基於Redis的生產消費者模式
redis-procon
http://maven.apache.org
UTF-8
org.leo.common
common-config
${common-config.version}
junit
junit
test
org.springframework
spring-test
org.apache.logging.log4j
log4j-slf4j-impl
test
org.apache.logging.log4j
log4j-core
org.springframework
spring-core
org.springframework
spring-context-support
org.springframework
spring-web
org.apache.commons
commons-pool2
org.springframework.data
spring-data-redis
redis.clients
jedis
這裡引入了我的一些共通工程,大家做的時候按照自己的實際情況修改即可。
我做成了war包的形式放在Tomcat里跑,大家可以做成jar包。
2、web.xml
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance
http://www.springmodules.org/schema/cache/springmodules-cache.xsd
http://www.springmodules.org/schema/cache/springmodules-ehcache.xsd"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd
">
Redis生產消費者
contextConfigLocation
classpath:spring/applicationContext.xml
org.springframework.web.context.ContextLoaderListener
CharacterEncodingFilter
org.springframework.web.filter.CharacterEncodingFilter
encoding
UTF-8
CharacterEncodingFilter
/
注意applicationContext.xml配置的路徑,按照實際情況修改,我這裡是放在src/resources/spring下。
3、redis.properties
## redis
redis.host=192.168.56.104
redis.port=6379
redis.pwd=111111
redis.maxIdle=5
redis.maxTotal=10
redis.maxWaitMillis=10000
redis.testOnBorrow=true
裡面的配置請按照實際情況修改。
4、applicationContext.xml
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
">
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
5、applicationContext-redis.xml
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:redis="http://www.springframework.org/schema/redis"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/redis
http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
p:host-name="${redis.host}" p:port="${redis.port}" p:password="${redis.pwd}"
p:pool-config-ref="poolConfig" />
class="org.springframework.data.redis.serializer.StringRedisSerializer" />
class="org.springframework.data.redis.serializer.StringRedisSerializer">
6、RedisQueueDaoImpl
package org.leo.ssm.redis.dao.impl;
import org.leo.ssm.redis.dao.RedisQueueDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Repository;
@Repository
public class RedisQueueDaoImpl implements RedisQueueDao {
@Autowired
protected RedisTemplate
redisTemplate; @Override
public void lpush(String key, String value) {
redisTemplate.opsForList().leftPush(key, value);
}
public String lpop(String key) {
return redisTemplate.opsForList().rightPop(key);
}
}
介面就不附上了。
7、RedisQueueServiceImpl
package org.leo.ssm.redis.service.impl;
import org.leo.ssm.redis.dao.RedisQueueDao;
import org.leo.ssm.redis.service.RedisQueueService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RedisQueueServiceImpl implements RedisQueueService {
@Autowired
private RedisQueueDao redisQueueDao;
@Override
public void lpush(String key, String value) {
redisQueueDao.lpush(key, value);
}
@Override
public String lpop(String key) {
return redisQueueDao.lpop(key);
}
}
介面就不附上了。
8、RedisRun
消費者具體執行類。UUID只是在本例中做標識用。
package org.leo.ssm.redis;
import java.util.UUID;
import org.leo.ssm.redis.service.RedisQueueService;
import org.springframework.beans.factory.annotation.Autowired;
public class RedisRun implements Runnable {
@Autowired
private RedisQueueService redisQueueService;
public volatile boolean exit = false;
public RedisRun(RedisQueueService redisQueueService) {
super();
this.redisQueueService = redisQueueService;
}
@Override
public void run() {
String runUUID = UUID.randomUUID().toString().toUpperCase();
int count = 0;
while (!exit) {
// tq是本消費者程序從Redis中取消息的key
String result = redisQueueService.lpop("tq");
if (null != result) {
// 取出消息之後進行業務處理
System.out.println(System.currentTimeMillis() + ":" + result + "--" + runUUID);
} else {
System.out.println("沒有取到信息,休息1秒");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("=========InterruptedException");
}
count++;
if (count == 10) {
System.out.println("連續10次沒有取到信息,休息10秒");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("=========InterruptedException");
}
count = 0;
}
}
}
System.out.println("Thread Stop:" + runUUID);
}
}
消費者程序會不停地從Redis中取消息,考慮到如果長時間沒有消息進入隊列,這樣是蠻耗資源的,所以在程序後面加了休息的代碼,沒有從Redis取出消息,就休息1秒,連續10次沒有取出消息,就休息10秒。可以自己修改。
另外還要考慮任務處理失敗後消息的重發機制。代碼里不再贅述。
9、RedisInit
package org.leo.ssm.redis;
import java.util.UUID;
import org.leo.ssm.redis.service.RedisQueueService;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
@Component("redisInit")
public class RedisInit implements InitializingBean, DisposableBean {
@Autowired
private RedisQueueService redisQueueService;
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
private RedisRun redisRun;
private String threadUUID;
public RedisInit() {
threadUUID = UUID.randomUUID().toString().toUpperCase();
System.out.println("------RedisInit-----Start:" + threadUUID);
}
@Override
public void afterPropertiesSet() throws Exception {
redisRun = new RedisRun(redisQueueService);
taskExecutor.execute(redisRun);
}
@Override
public void destroy() throws Exception {
System.out.println("------RedisInit-----Destroy:" + threadUUID);
redisRun.exit = true;
for (;;) {
int count = taskExecutor.getActiveCount();
System.out.println("活躍的線程數 : " + count);
if (count == 0) {
taskExecutor.getThreadPoolExecutor().remove(redisRun);
taskExecutor.shutdown();
break;
} else {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
本類實現了InitializingBean, DisposableBean介面,保證了程序在Tomcat中啟動後將RedisRun載入至線程池並運行,以及在銷毀時移除並關閉。
10、生產者
package org.leo.ssm;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.leo.ssm.redis.service.RedisQueueService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring/applicationContext.xml")
public class TestPush {
@Autowired
private RedisQueueService redisQueueService;
@Test
public void testPush(){
redisQueueService.lpush("tq", "testQueue");
}
}
這是我在工程中寫的一個測試類,實際情況中不要這麼寫,因為要載入applicationContext.xml這導致這個生產者程序在啟動的時候,一個消費者也會啟動。
這個程序應該寫在別的工程中,其實關鍵代碼就一句話:
redisQueueService.lpush("tq", "你要發的消息");


※Keepalived+Nginx高可用安裝部署(含Nginx+Tomcat負載均衡)
※Redis3.2.9單機版安裝
※Java操作Linux命令分割合并文本文件及其他
※Dubbo管理控制台安裝
TAG:Java個人學習心得 |
※Open Biology開放獲取辦刊模式和 刊載文章分析
※《老滾5》Nvidia Freestyle模式演示 濾鏡下美炸天
※Hitman開發人員在批評之後調整發布模式 Parable創造者的新遊戲
※Android端Firefox 59發布:支持 HLS播放 改進隱私瀏覽模式
※《蒼翼默示錄:Cross Tag Battle》新角色及模式公布
※Teradata向訂閱模式的轉型初顯成效
※剖析關於-ansible配置文件和命令中ad-hoc模式使用參數詳解
※Flutter 的編譯模式
※谷歌發布 Android P Beta 3 暗黑模式迎來優化
※Scott Brinker:數字改變營銷運作模式
※谷歌Chrome瀏覽器將獲原生暗黑模式
※《Avengers: Infinity War》將與《Fortnite》推出聯乘模式
※如何快速進入微軟Edge瀏覽器InPrivate模式
※谷歌發布新款Chrome瀏覽器:帶「影院模式」
※谷歌發布新款 Chrome 瀏覽器:帶「影院模式」
※雙HomePod立體聲模式叫做FullRoom
※iOS 12 beta 5 開發者測試版暗示「iPhone X Plus」將有 iPad 的橫屏模式
※Zynga的《CSR Racing 2》AR模式進入Android設備
※Nacos發布 v0.2 版本,支持 Spring Cloud 微服務高可用集群模式
※海康、UCLA、北理聯合提出3D DescriptorNet:可按條件生成3D形狀,克服模式崩潰