/**
 * Thin helper around a ZooKeeper-backed {@link InterProcessMutex}.
 *
 * <p>Each call builds its own Curator client, tries to take the mutex, keeps it
 * for a fixed hold window and then gives it back before returning.
 */
@Component
public class MutexConfig {

    /** ZooKeeper connection string, injected from {@code zookeeper.host}. */
    @Value(value = "${zookeeper.host}")
    private String zkHost;

    /**
     * Tries to acquire the mutex at the given path, holds it for {@code sleepTime}
     * milliseconds and releases it again.
     *
     * <p>The mutex is no longer held when this method returns; the result only
     * reports whether this process obtained it during the call.
     *
     * @param lock        ZooKeeper path of the mutex
     * @param acquireTime maximum time to wait for the mutex, in milliseconds
     * @param sleepTime   time to keep the calling thread asleep after the acquire
     *                    attempt, in milliseconds
     * @return the value returned by {@code InterProcessMutex.acquire}
     * @throws Exception on ZooKeeper errors or if the thread is interrupted
     */
    public Boolean getLock(String lock, Integer acquireTime, Integer sleepTime) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkHost, retryPolicy);
        try {
            client.start();
            InterProcessMutex mutex = new InterProcessMutex(client, lock);
            Boolean result = mutex.acquire(acquireTime, TimeUnit.MILLISECONDS);
            try {
                Thread.sleep(sleepTime);
            } finally {
                // Release even when the sleep is interrupted, so the mutex is never left held.
                if (mutex.isAcquiredInThisProcess()) {
                    mutex.release();
                }
            }
            return result;
        } finally {
            // Always close the client; previously any exception above leaked the connection.
            client.close();
        }
    }
}
在配置文件里配置
zookeeper.path.smsxxx.xxtemplate=/dmp/smsxxx/xxtemplate
在定时任务实现类编写定时任务
/** ZooKeeper path of the mutex guarding this job, injected from configuration. */
@Value(value = "${zookeeper.path.smsxxx.xxtemplate}")
private String jd;

/**
 * Periodically updates the SMS template state.
 *
 * <p>Scheduled by the cron expression below. The update itself only runs when
 * {@code mutexConfig.getLock} reports that the mutex was obtained; failures are
 * logged and never propagate out of the scheduled method.
 */
@Scheduled(cron = "0 0/10 * * * ?")
public void updateJdState() {
    try {
        logger.info("updateJdState start");
        Boolean result = mutexConfig.getLock(jd, 2000, 3000);
        if (Boolean.TRUE.equals(result)) {
            logger.info("updateJdState content start");
            smsTemplateService.updateJdState();
        }
        logger.info("end");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the scheduler thread can observe the interruption.
        Thread.currentThread().interrupt();
        logger.error("updateJdState error: {}", e.getMessage(), e);
    } catch (Exception e) {
        // Pass the exception as the last argument so the stack trace is kept in the log.
        logger.error("updateJdState error: {}", e.getMessage(), e);
    }
}
编写dao类
/**
 * Persistence operations for distributed lockers ({@link LockerPO}).
 */
public interface DistributeLockerDAO extends BaseDAO<LockerPO> {

    /**
     * Aggregation row: one acquirer together with the number of lockers it holds.
     */
    @Data
    @Accessors(chain = true)
    class LockerCountOfAcquirer {

        /** Acquirer name, mapped from the {@code _id} field of the result. */
        @Field("_id")
        String acquirer;

        /** Number of lockers counted for {@link #acquirer}. */
        long count;
    }

    /**
     * Stores a new locker.
     *
     * @param lockerPo the locker to store
     * @return {@code true} on success, {@code false} if the locker is duplicated
     */
    boolean insert(LockerPO lockerPo);

    /**
     * Removes the locker with the given id on behalf of the acquirer.
     *
     * @param id       locker id
     * @param acquirer the acquirer removing the locker
     * @return presumably {@code true} when a locker was removed; verify against the implementation
     */
    boolean remove(String id, String acquirer);

    /**
     * Updates the shaken time of a locker held by the acquirer.
     *
     * @param id       locker id
     * @param acquirer the acquirer holding the locker
     * @param time     the new shaken time
     * @return presumably {@code true} when a locker was updated; verify against the implementation
     */
    boolean updateShakenTime(String id, String acquirer, LocalDateTime time);

    /**
     * Looks up a locker by its id.
     *
     * @param lockerId locker id
     * @return the locker, if any
     */
    Optional<LockerPO> getLocker(String lockerId);

    /**
     * Lists lockers.
     *
     * @param offset paging offset
     * @param limit  paging limit
     * @return the lockers of the requested page
     */
    List<LockerPO> list(long offset, int limit);

    /**
     * Lists the lockers of one acquirer.
     *
     * @param acquirer the acquirer to filter by
     * @param offset   paging offset
     * @param limit    paging limit
     * @return the acquirer's lockers of the requested page
     */
    List<LockerPO> list(String acquirer, long offset, int limit);

    /**
     * Counts lockers grouped by acquirer.
     *
     * @return one {@link LockerCountOfAcquirer} per acquirer
     */
    List<LockerCountOfAcquirer> countByAcquirer();
}
锁的实体类
/**
 * Persistent form of a distributed locker, stored in the
 * {@code CollectionName.LOCKER} collection.
 */
@Data
@Accessors(chain = true)
@Document(CollectionName.LOCKER)
public class LockerPO implements IdPO {

    /** Locker id; also the document id. */
    @Id
    private String id;

    /** Name of the acquirer holding the locker. */
    private String acquirer;

    /** Time at which the locker was acquired. */
    private LocalDateTime acquiredAt;

    /** Time at which the locker was last shaken. */
    private LocalDateTime shakenAt;
}
锁的服务类
@Service public class DistributeLockerServiceImpl implements DistributeLockerService { private final DistributeLockerDAO distributeLockerDAO; private final Gauge gauge; private final List<String> lastSyncAcquires; public DistributeLockerServiceImpl( @Value(value = "${prometheus.namespace}") String namespace, @Value(value = "${prometheus.subsystem}") String subsystem, DistributeLockerDAO distributeLockerDAO, MeterRegistry registry ) { this.distributeLockerDAO = distributeLockerDAO; lastSyncAcquires = new ArrayList<>(); gauge = Gauge.build() .namespace(namespace) .subsystem(subsystem) .name("distribute_locker_total") .labelNames("user") .help("distribute locker statistic by the acquirer") .register(((PrometheusMeterRegistry)registry).getPrometheusRegistry()); } @Override public boolean tryLock(String lockerId, String user) { LocalDateTime dateTime = LocalDateTime.now(); boolean suc = distributeLockerDAO.insert( new LockerPO().setId(lockerId) .setAcquirer(user) .setAcquiredAt(dateTime) .setShakenAt(dateTime)); if (!suc) { suc = distributeLockerDAO.updateShakenTime(lockerId, user, dateTime); } return suc; } @Override public boolean release(String lockerId, String user) { return distributeLockerDAO.remove(lockerId, user); } @Override public boolean shake(String lockerId, String user) { return distributeLockerDAO.updateShakenTime(lockerId, user, LocalDateTime.now()); } @Override public Optional<LockerPO> getLocker(String lockerId) { return distributeLockerDAO.getLocker(lockerId); } @Override public List<LockerPO> list(long offset, int limit) { return distributeLockerDAO.list(offset, limit); } @Override public List<LockerPO> list(String user, long offset, int limit) { return distributeLockerDAO.list(user, offset, limit); } @Scheduled(fixedDelay = 500) public void monitorLockerCountOfAcquirer() { List<DistributeLockerDAO.LockerCountOfAcquirer> lockerCountOfAcquirers = distributeLockerDAO.countByAcquirer(); updateMonitorOfLockerCountOfAcquirer(lockerCountOfAcquirers); 
List<String> acquires = lockerCountOfAcquirers.stream() .map(DistributeLockerDAO.LockerCountOfAcquirer::getAcquirer) .collect(Collectors.toList()); resetLockerCountOfDeletedAcquire(acquires); updateLastSyncAcquires(acquires); } private void updateMonitorOfLockerCountOfAcquirer(List<DistributeLockerDAO.LockerCountOfAcquirer> lockerCountOfAcquirers) { lockerCountOfAcquirers.forEach(c -> gauge.labels(c.getAcquirer()).set(c.getCount())); } private void resetLockerCountOfDeletedAcquire(List<String> countOfUsers) { calcDeletedAcquirers(countOfUsers).forEach(a -> gauge.labels(a).set(0)); } private List<String> calcDeletedAcquirers(List<String> acquirers) { return lastSyncAcquires.stream() .filter(s -> !acquirers.contains(s)) .collect(Collectors.toList()); } private void updateLastSyncAcquires(List<String> acquirers) { lastSyncAcquires.clear(); lastSyncAcquires.addAll(acquirers); } }
分布式锁的基类
@Slf4j public class DistributeLockerBase implements LockerShakerScheduler { private final String acquirer; private final List<String> lockingIds; private final DistributeLockerService distributeLockerService; protected DistributeLockerBase(String acquirer, DistributeLockerService distributeLockerService) { Preconditions.checkArgument(!Strings.isNullOrEmpty(acquirer)); Preconditions.checkArgument(distributeLockerService != null); this.acquirer = acquirer; this.lockingIds = Collections.synchronizedList(new ArrayList<>()); this.distributeLockerService = distributeLockerService; } public final String getAcquirer() { return acquirer; } protected boolean lock(String lockerId) { boolean suc = distributeLockerService.tryLock(lockerId, acquirer); if (suc) { lockingIds.add(lockerId); } log.info("lock [{}], [{}]", lockerId, suc); return suc; } protected boolean release(String lockerId) { boolean suc = distributeLockerService.release(lockerId, acquirer); lockingIds.remove(lockerId); log.info("release [{}], [{}]", lockerId, suc); return suc; } /** * delay 1 second. after sync finished */ @Scheduled(fixedDelay = 1000) @Override public void shake() { List<String> lockingIdsCopied; synchronized (this.lockingIds) { lockingIdsCopied = new ArrayList<>(this.lockingIds); } lockingIdsCopied.forEach(id -> { if (!distributeLockerService.shake(id, acquirer)) { log.error("[{}] shakes [{}] FAILED", acquirer, id); } else { log.info("[{}] shakes [{}] SUCCESS", acquirer, id); } }); } } /** * the class used for defining the shake method * so that the <code>shake</code> method in the <code>DistributeLockerBase</code> can auto-scheduled * when some class inherits <code>DistributeLockerBase</code> and marks as a bean * * NOTE: it's dependent on the Spring boot version */ interface LockerShakerScheduler { void shake(); }
实现定时任务类
@Slf4j @Component public class BrandUVSyncher extends DistributeLockerBase { private static final String SPLITTER = "+"; private static final String TASK_ID_PREFIX = "lbi-openapi-brandUV:"; private static final long PERIOD = 10_000; private final MidPlatformClient midPlatformClient; private final BrandUVDAO brandUVDAO; private final AreaDAO areaDAO; private final List<AreaPO> top2LevelCities; /** * geo of banks in some city * cityName -> {bankName -> coordinate} */ private final Map<String, Map<String, List<double[]>>> geoOfBanks; BrandUVSyncher(MidPlatformClient midPlatformClient, BrandUVDAO brandUVDAO, DistributeLockerService distributeLockerService, AreaDAO areaDAO) { super("BrandUVSyncher:" + UuidGenerator.newBase64Uuid(), distributeLockerService); this.midPlatformClient = midPlatformClient; this.brandUVDAO = brandUVDAO; this.areaDAO = areaDAO; this.geoOfBanks = loadGeoOfBanks(); this.top2LevelCities = loadTop2LevelCity(); } /** * sync the living&working uv of the cities of level 1&2 */ @Scheduled(fixedDelay = PERIOD, initialDelay = 3000) public void syncCityLivingAndWorkingUV() { log.info("run sync city WORKING&LIVING UV"); // DO NOT MODIFY ME! final String lockerID = buildLockerID("syncCityLivingAndWorkingUV"); LocalDate now = LocalDate.now(); if (!isTimeToSync(now)) { return; } if (!lock(lockerID)) { log.error("locker [{}] failed", lockerID); return; } if (top2LevelCities.isEmpty()) { log.info("empty level 1 and level 2 cities"); return; } LocalDate preMonth = now.minusMonths(1); LocalDate month = LocalDate.of(preMonth.getYear(), preMonth.getMonth(), 1); try { Stream.of(Brand.values()).forEach( b -> syncCityLivingAndWorkingUV(b.getCode(), month, top2LevelCities) ); } finally { release(lockerID); } } }
https://www.jianshu.com/p/941416645606