From 99091a8cbb8e098575c75a7c640b568addbcc29d Mon Sep 17 00:00:00 2001 From: wzy <wzy19931122ai@163.com> Date: Sun, 09 Oct 2022 21:41:13 +0800 Subject: [PATCH] Merge branch 'score_shop' --- zq-erp/src/main/java/com/matrix/system/common/init/LocalCache.java | 235 ++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 209 insertions(+), 26 deletions(-) diff --git a/zq-erp/src/main/java/com/matrix/system/common/init/LocalCache.java b/zq-erp/src/main/java/com/matrix/system/common/init/LocalCache.java index 00aa8b5..454d95e 100644 --- a/zq-erp/src/main/java/com/matrix/system/common/init/LocalCache.java +++ b/zq-erp/src/main/java/com/matrix/system/common/init/LocalCache.java @@ -1,22 +1,79 @@ package com.matrix.system.common.init; +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.collection.CollUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.TypeReference; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.matrix.core.tools.LogUtil; +import com.matrix.core.tools.StringUtils; +import com.matrix.system.common.bean.SysCacheValue; +import com.matrix.system.common.dao.SysCacheValueDao; +import lombok.Data; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; +import java.lang.reflect.Type; +import java.util.*; import java.util.concurrent.*; +import java.util.stream.Collectors; -public class LocalCache { +@Component +public class LocalCache implements ApplicationRunner { /* * 清理线程运行状态 0 未启动,1 已启动 */ private static int CLEAR_THREAD_STATUS = 0; - private static ConcurrentMap<String, Value> localCache = new ConcurrentHashMap(60); + private static ConcurrentMap<String, CacheValue> localCache = new ConcurrentHashMap(60); + + private static ConcurrentLinkedQueue<Long> deadCache = new ConcurrentLinkedQueue<>(); + + @Autowired + private SysCacheValueDao sysCacheValueDao; + + @Override + public void run(ApplicationArguments args) { + //初始化缓存 + List<SysCacheValue> sysCacheValues = sysCacheValueDao.selectByMap(null); + if(CollUtil.isNotEmpty(sysCacheValues)){ + LogUtil.debug("初始化缓存"); + localCache.putAll(buildValues(sysCacheValues)); + } + startClearThread(); + startSaveStoreThread(); + } + + private Map<String,CacheValue> buildValues(List<SysCacheValue> sysCacheValues) { + Map<String,CacheValue> storeCache=new HashMap<>(); + sysCacheValues.forEach(e->{ + CacheValue cacheValue=new CacheValue(); + BeanUtil.copyProperties(e,cacheValue); + storeCache.put(cacheValue.getCacheKey(),cacheValue); + }); + return storeCache; + } + + /** + * 根据key匹配多个缓存值 + * + * @param key + * @param <T> + * @return + */ + public static <T> Map<String, T> getValues(String key) { + return localCache.entrySet().stream() + .filter(item -> StringUtils.isMatch(key, item.getKey())) + .map(Map.Entry::getValue) + .filter(item -> Objects.nonNull(item.cacheValue)) + .collect(Collectors.toMap(CacheValue::getCacheKey, item -> JSONObject.parseObject(item.cacheValue, new TypeReference<T>(){}))); + } /** * 获取本地缓存 @@ -24,9 +81,64 @@ * @param key * @param <T> * @return - */ + *//* public static <T> T get(String key) { - return (T) localCache.get(key); + CacheValue value = localCache.get(key); + if (Objects.nonNull(value)) { + return JSONObject.parseObject(value.cacheValue, new TypeReference<T>(){}); + } + return null; + } +*/ + /** + * 获取本地缓存,如果需要转换为List,Map类型的具体泛型使用本方法 + * @param key + * @param typeReference + * @param <T> + * @return + */ + public static <T> T get(String key,TypeReference typeReference) { + CacheValue value = localCache.get(key); + if (Objects.nonNull(value)) { + return (T)JSONObject.parseObject(value.cacheValue, typeReference); + } + return null; + } + + /** + * 删除缓存 + * + * @param key + * @param <T> + * @return + */ + public static <T> T remove(String key) { + CacheValue value = localCache.get(key); + if (Objects.nonNull(value)) { + deadCache.add(value.getId()); + return (T) value.cacheValue; + } + return null; + } + + /** + * 批量删除缓存 + * + * @param key + * @return + */ + public static int batchRemove(String key) { + int count = 0; + Set<Map.Entry<String, CacheValue>> entries = localCache.entrySet(); + Iterator<Map.Entry<String, CacheValue>> iterator = entries.iterator(); + while (iterator.hasNext()) { + Map.Entry<String, CacheValue> next = iterator.next(); + if (StringUtils.isMatch(key, next.getKey())) { + remove(next.getKey()); + count++; + } + } + return count; } /** @@ -36,7 +148,7 @@ * @param value */ public static void save(String key, Object value) { - if (null != localCache.put(key, buildValue(value))) { + if (null != localCache.put(key, buildValue(key, value))) { LogUtil.debug("覆盖原有缓存{}", key); } } @@ -49,16 +161,29 @@ * @param timeOut 毫秒 */ public static void save(String key, Object value, long timeOut) { - if (null != localCache.put(key, buildValue(value, timeOut))) { + if (null != localCache.put(key, buildValue(key, value, timeOut))) { LogUtil.debug("覆盖原有缓存{}", key); } - startClearThread(); + + } + + /** + * 重置缓存失效时间 + * + * @param key + */ + public static void resetExpire(String key) { + Objects.requireNonNull(key); + CacheValue value = localCache.get(key); + if (Objects.nonNull(value)) { + value.setCreateTime(System.currentTimeMillis()); + } } /** * 清理过期对象 */ - private synchronized static void startClearThread() { + private synchronized void startClearThread() { if (CLEAR_THREAD_STATUS == 0) { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); @@ -69,20 +194,25 @@ CLEAR_THREAD_STATUS = 1; while (true) { try { - Set<Map.Entry<String, Value>> entries = localCache.entrySet(); - Iterator<Map.Entry<String, Value>> iterator = entries.iterator(); + Set<Map.Entry<String, CacheValue>> entries = localCache.entrySet(); + Iterator<Map.Entry<String, CacheValue>> iterator = entries.iterator(); while (iterator.hasNext()) { - Map.Entry<String, Value> next = iterator.next(); + Map.Entry<String, CacheValue> next = iterator.next(); if (next.getValue().timeOut == 0) { continue; } - boolean isTimeOut = (System.currentTimeMillis() - next.getValue().createTime) > next.getValue().timeOut; + boolean isTimeOut = (System.currentTimeMillis() - next.getValue().getCreateTime().longValue()) > next.getValue().timeOut; if (isTimeOut) { - Value removed = localCache.remove(next.getKey()); - LogUtil.debug("清除过期对象:{}", removed.value); + CacheValue removed = remove(next.getKey()); + LogUtil.debug("清除过期对象:{}", removed.cacheValue); } + } + if(CollUtil.isNotEmpty(deadCache)){ + LogUtil.debug("删除数据库中的缓存:{}",deadCache); + sysCacheValueDao.deleteBatchIds(deadCache); + deadCache.clear(); } Thread.sleep(1000); } catch (InterruptedException e) { @@ -96,16 +226,59 @@ } } + /** + * 缓存对象写入磁盘 + */ + private synchronized void startSaveStoreThread() { - private static Value buildValue(Object value) { - return buildValue(value, 0); + ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() + .setNameFormat("startSaveStoreThread-pool-%d").build(); + ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(1), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); + singleThreadPool.execute(() -> { + try { + while (true){ + Collection<CacheValue> values = localCache.values(); + List<CacheValue> notSavedList = values.stream().filter(v -> !v.saved).collect(Collectors.toList()); + if(CollUtil.isNotEmpty(notSavedList)){ + List<String> collect = notSavedList.stream().map(e -> e.getCacheKey()).collect(Collectors.toList()); + sysCacheValueDao.delete(new LambdaQueryWrapper<SysCacheValue>().in(SysCacheValue::getCacheKey,collect)); + notSavedList.forEach(e->{ + e.setSaved(true); + SysCacheValue sysCacheValue = buildSysCacheValue(e); + sysCacheValueDao.insert(sysCacheValue); + e.setId(sysCacheValue.getId()); + LogUtil.debug("持久化缓存对象:{}",e.getCacheKey()); + }); + } + Thread.sleep(1000); + } + } catch (Exception e) { + LogUtil.error("存储缓存对象线程异常停止", e); + } + }); + + + } + + private SysCacheValue buildSysCacheValue(CacheValue e) { + SysCacheValue cacheValue=new SysCacheValue(); + BeanUtil.copyProperties(e,cacheValue); + return cacheValue; } - private static Value buildValue(Object value, long timeOut) { - Value instances = new Value(); + private static CacheValue buildValue(String key, Object value) { + return buildValue(key, value, 0); + } + + + private static CacheValue buildValue(String key, Object value, long timeOut) { + CacheValue instances = new CacheValue(); instances.createTime = System.currentTimeMillis(); - instances.value = value; + instances.cacheKey = key; + instances.cacheValue = JSON.toJSONString(value); instances.timeOut = timeOut; return instances; } @@ -114,23 +287,33 @@ /** * 缓存对象 */ - static class Value { + @Data + static class CacheValue { + + private Long id ; /** * 过期时间,0 表示不过期,单位毫秒 */ - private long timeOut = 0; + private Long timeOut = 0L; + + /** + * 缓存key + */ + private String cacheKey; /** * 缓存值 */ - private Object value; + private String cacheValue; /** * 缓存创建时间 */ - private long createTime; + private Long createTime; + private boolean saved=false; + private boolean live=true; } -- Gitblit v1.9.1