taskModel :我的队列元素。
second:延时时间秒。
destinationName:队列名称,即 Redis 有序集合(sorted set)的 key。
public Object pushQueue(TaskModel taskModel, Integer second,String destinationName){ //mq 实现方式 /*return producer.pushVoiceToQueue(taskModel,second,destinationName);*/ //redis 方式处理延时 Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND, second); int second3later = (int) (calendar.getTimeInMillis() / 1000); return redisUtil.zadd(destinationName, second3later, SerializeUtils.serialize(taskModel),1 ); }
receiveRediesQue() 方法每次调用只轮询一次,因此需要在外层循环中反复调用。
比如: while(true){
receiveRediesQue();
//每次轮询之间还应该休眠,例如 Thread.sleep(1000);
}
// 这个方法目前每次只处理一条队列元素。
public void receiveRediesQue() { try { Set<Tuple> items = redisUtil.zrangeWithScores(Constant.VOICEQUE_NAME, 1); if(items == null || items.isEmpty()){ logger.info("获取延时队列,date:"+ Voice2TextDataUtils.getCurrentDate()+"暂无数据"); return; } logger.info("获取延时队列,有数据,开始处理 date:"+ Voice2TextDataUtils.getCurrentDate()+" items.size="+items.size()); int score = (int) ((Tuple)items.toArray()[0]).getScore(); Calendar cal = Calendar.getInstance(); int nowSecond = (int) (cal.getTimeInMillis() / 1000); if(nowSecond >= score){ String taskModelStr = ((Tuple)items.toArray()[0]).getElement(); logger.info("获取延时序列化话单对象 taskModelStr:"+ taskModelStr); TaskModel taskModel = null; try { taskModel = SerializeUtils.deserialize(taskModelStr); } catch (Exception e) { logger.error("获取延时反序列号化话单对象,发生异常"+e.getMessage() ); e.printStackTrace(); } logger.info("获取延时反序列号化话单对象 taskModel:"+ JSONObject.toJSONString(taskModel)); Long num = redisUtil.zrem(Constant.VOICEQUE_NAME, taskModelStr,1); if( num != null && num>0){ if(null != taskModel){ //获取到,延时到期的队列元素,开始干活。。。。 }else { logger.error("获取话单对象错误,数据为空!" ); } } } } catch (Exception e) { logger.error("获取话单对象错误, 严重错误,请及时排查"+e.getMessage()); e.printStackTrace(); } }
/**
 * Adds {@code value} to the sorted set {@code key} with score {@code seconds}.
 *
 * @param key     Redis key of the sorted set
 * @param seconds score to store for the member
 * @param value   member to add; a value that {@code StringUtils.isEmpty} reports as empty is
 *                stored as the empty string
 * @param indexDb index of the Redis database selected before the command is issued
 * @return {@code true} if the command completed; {@code false} if no connection could be
 *         obtained or an exception occurred
 */
public boolean zadd(String key, int seconds, String value, int indexDb) {
    Jedis connection = getJedis();
    if (connection == null) {
        return false;
    }
    try {
        String member = value;
        if (StringUtils.isEmpty(member)) {
            member = "";
        }
        connection.select(indexDb);
        connection.zadd(key, seconds, member);
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    } finally {
        // Always hand the connection back, whether or not the command succeeded.
        returnResource(connection);
    }
}

/**
 * Removes {@code value} from the sorted set {@code key}.
 *
 * @param key     Redis key of the sorted set
 * @param value   member to remove; a value that {@code StringUtils.isEmpty} reports as empty is
 *                replaced by the empty string
 * @param indexDb index of the Redis database selected before the command is issued
 * @return the count reported by the Redis client's {@code zrem}; {@code 0} if no connection
 *         could be obtained or an exception occurred
 */
public long zrem(String key, String value, int indexDb) {
    Jedis connection = getJedis();
    if (connection == null) {
        return 0;
    }
    long removedCount = 0;
    try {
        String member = value;
        if (StringUtils.isEmpty(member)) {
            member = "";
        }
        connection.select(indexDb);
        removedCount = connection.zrem(key, member);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Always hand the connection back, whether or not the command succeeded.
        returnResource(connection);
    }
    return removedCount;
}

/**
 * Reads the entries at ranks 0 through 1 of the sorted set {@code key}, with their scores.
 *
 * @param key     Redis key of the sorted set
 * @param indexDb index of the Redis database selected before the command is issued
 * @return the tuples returned by the Redis client for that range; an empty set if no connection
 *         could be obtained or an exception occurred
 */
public Set<Tuple> zrangeWithScores(String key, int indexDb) {
    Set<Tuple> fallback = new HashSet<>();
    Jedis connection = getJedis();
    if (connection == null) {
        return fallback;
    }
    try {
        connection.select(indexDb);
        return connection.zrangeWithScores(key, 0, 1);
    } catch (Exception e) {
        e.printStackTrace();
        return fallback;
    } finally {
        // Always hand the connection back, whether or not the command succeeded.
        returnResource(connection);
    }
}