package com.matrix.system.common.init;
|
|
import cn.hutool.core.bean.BeanUtil;
|
import cn.hutool.core.collection.CollUtil;
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.TypeReference;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.matrix.core.tools.LogUtil;
|
import com.matrix.core.tools.StringUtils;
|
import com.matrix.system.common.bean.SysCacheValue;
|
import com.matrix.system.common.dao.SysCacheValueDao;
|
import lombok.Data;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.boot.ApplicationArguments;
|
import org.springframework.boot.ApplicationRunner;
|
import org.springframework.stereotype.Component;
|
|
import java.lang.reflect.Type;
|
import java.util.*;
|
import java.util.concurrent.*;
|
import java.util.stream.Collectors;
|
|
@Component
|
public class LocalCache implements ApplicationRunner {
|
|
/*
|
* 清理线程运行状态 0 未启动,1 已启动
|
*/
|
private static int CLEAR_THREAD_STATUS = 0;
|
|
|
private static ConcurrentMap<String, CacheValue> localCache = new ConcurrentHashMap(60);
|
|
private static ConcurrentLinkedQueue<Long> deadCache = new ConcurrentLinkedQueue<>();
|
|
@Autowired
|
private SysCacheValueDao sysCacheValueDao;
|
|
@Override
|
public void run(ApplicationArguments args) {
|
//初始化缓存
|
List<SysCacheValue> sysCacheValues = sysCacheValueDao.selectByMap(null);
|
if(CollUtil.isNotEmpty(sysCacheValues)){
|
LogUtil.debug("初始化缓存");
|
localCache.putAll(buildValues(sysCacheValues));
|
}
|
startClearThread();
|
startSaveStoreThread();
|
}
|
|
private Map<String,CacheValue> buildValues(List<SysCacheValue> sysCacheValues) {
|
Map<String,CacheValue> storeCache=new HashMap<>();
|
sysCacheValues.forEach(e->{
|
CacheValue cacheValue=new CacheValue();
|
BeanUtil.copyProperties(e,cacheValue);
|
storeCache.put(cacheValue.getCacheKey(),cacheValue);
|
});
|
return storeCache;
|
}
|
|
/**
|
* 根据key匹配多个缓存值
|
*
|
* @param key
|
* @param <T>
|
* @return
|
*/
|
public static <T> Map<String, T> getValues(String key) {
|
return localCache.entrySet().stream()
|
.filter(item -> StringUtils.isMatch(key, item.getKey()))
|
.map(Map.Entry::getValue)
|
.filter(item -> Objects.nonNull(item.cacheValue))
|
.collect(Collectors.toMap(CacheValue::getCacheKey, item -> JSONObject.parseObject(item.cacheValue, new TypeReference<T>(){})));
|
}
|
|
/**
|
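* Gets a local cache value by key. NOTE: this overload is currently disabled - the block comment opened at the end of this Javadoc wraps the whole method.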
|
*
|
* @param key
|
* @param <T>
|
* @return
|
*//*
|
public static <T> T get(String key) {
|
CacheValue value = localCache.get(key);
|
if (Objects.nonNull(value)) {
|
return JSONObject.parseObject(value.cacheValue, new TypeReference<T>(){});
|
}
|
return null;
|
}
|
*/
|
/**
|
* 获取本地缓存,如果需要转换为List,Map类型的具体泛型使用本方法
|
* @param key
|
* @param typeReference
|
* @param <T>
|
* @return
|
*/
|
public static <T> T get(String key,TypeReference typeReference) {
|
CacheValue value = localCache.get(key);
|
if (Objects.nonNull(value)) {
|
return (T)JSONObject.parseObject(value.cacheValue, typeReference);
|
}
|
return null;
|
}
|
|
/**
|
* 删除缓存
|
*
|
* @param key
|
* @param <T>
|
* @return
|
*/
|
public static <T> T remove(String key) {
|
CacheValue value = localCache.get(key);
|
if (Objects.nonNull(value)) {
|
deadCache.add(value.getId());
|
return (T) value.cacheValue;
|
}
|
return null;
|
}
|
|
/**
|
* 批量删除缓存
|
*
|
* @param key
|
* @return
|
*/
|
public static int batchRemove(String key) {
|
int count = 0;
|
Set<Map.Entry<String, CacheValue>> entries = localCache.entrySet();
|
Iterator<Map.Entry<String, CacheValue>> iterator = entries.iterator();
|
while (iterator.hasNext()) {
|
Map.Entry<String, CacheValue> next = iterator.next();
|
if (StringUtils.isMatch(key, next.getKey())) {
|
remove(next.getKey());
|
count++;
|
}
|
}
|
return count;
|
}
|
|
/**
|
* 保存一个本地缓存
|
*
|
* @param key
|
* @param value
|
*/
|
public static void save(String key, Object value) {
|
if (null != localCache.put(key, buildValue(key, value))) {
|
LogUtil.debug("覆盖原有缓存{}", key);
|
}
|
}
|
|
/**
|
* 设置含过期时间的缓存
|
*
|
* @param key
|
* @param value
|
* @param timeOut 毫秒
|
*/
|
public static void save(String key, Object value, long timeOut) {
|
if (null != localCache.put(key, buildValue(key, value, timeOut))) {
|
LogUtil.debug("覆盖原有缓存{}", key);
|
}
|
|
}
|
|
/**
|
* 重置缓存失效时间
|
*
|
* @param key
|
*/
|
public static void resetExpire(String key) {
|
Objects.requireNonNull(key);
|
CacheValue value = localCache.get(key);
|
if (Objects.nonNull(value)) {
|
value.setCreateTime(System.currentTimeMillis());
|
}
|
}
|
|
/**
|
* 清理过期对象
|
*/
|
private synchronized void startClearThread() {
|
if (CLEAR_THREAD_STATUS == 0) {
|
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
|
.setNameFormat("demo-pool-%d").build();
|
ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
|
0L, TimeUnit.MILLISECONDS,
|
new LinkedBlockingQueue<Runnable>(1), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
|
singleThreadPool.execute(() -> {
|
CLEAR_THREAD_STATUS = 1;
|
while (true) {
|
try {
|
Set<Map.Entry<String, CacheValue>> entries = localCache.entrySet();
|
Iterator<Map.Entry<String, CacheValue>> iterator = entries.iterator();
|
while (iterator.hasNext()) {
|
Map.Entry<String, CacheValue> next = iterator.next();
|
|
if (next.getValue().timeOut == 0) {
|
continue;
|
}
|
|
boolean isTimeOut = (System.currentTimeMillis() - next.getValue().getCreateTime().longValue()) > next.getValue().timeOut;
|
if (isTimeOut) {
|
CacheValue removed = remove(next.getKey());
|
LogUtil.debug("清除过期对象:{}", removed.cacheValue);
|
}
|
}
|
if(CollUtil.isNotEmpty(deadCache)){
|
LogUtil.debug("删除数据库中的缓存:{}",deadCache);
|
sysCacheValueDao.deleteBatchIds(deadCache);
|
deadCache.clear();
|
}
|
Thread.sleep(1000);
|
} catch (InterruptedException e) {
|
LogUtil.error("清理缓存线程异常停止", e);
|
CLEAR_THREAD_STATUS = 0;
|
}
|
}
|
});
|
|
|
}
|
}
|
|
/**
|
* 缓存对象写入磁盘
|
*/
|
private synchronized void startSaveStoreThread() {
|
|
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
|
.setNameFormat("startSaveStoreThread-pool-%d").build();
|
ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
|
0L, TimeUnit.MILLISECONDS,
|
new LinkedBlockingQueue<Runnable>(1), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
|
singleThreadPool.execute(() -> {
|
try {
|
while (true){
|
Collection<CacheValue> values = localCache.values();
|
List<CacheValue> notSavedList = values.stream().filter(v -> !v.saved).collect(Collectors.toList());
|
if(CollUtil.isNotEmpty(notSavedList)){
|
List<String> collect = notSavedList.stream().map(e -> e.getCacheKey()).collect(Collectors.toList());
|
sysCacheValueDao.delete(new LambdaQueryWrapper<SysCacheValue>().in(SysCacheValue::getCacheKey,collect));
|
notSavedList.forEach(e->{
|
e.setSaved(true);
|
SysCacheValue sysCacheValue = buildSysCacheValue(e);
|
sysCacheValueDao.insert(sysCacheValue);
|
e.setId(sysCacheValue.getId());
|
LogUtil.debug("持久化缓存对象:{}",e.getCacheKey());
|
});
|
}
|
Thread.sleep(1000);
|
}
|
} catch (Exception e) {
|
LogUtil.error("存储缓存对象线程异常停止", e);
|
}
|
});
|
|
|
}
|
|
private SysCacheValue buildSysCacheValue(CacheValue e) {
|
SysCacheValue cacheValue=new SysCacheValue();
|
BeanUtil.copyProperties(e,cacheValue);
|
return cacheValue;
|
}
|
|
|
private static CacheValue buildValue(String key, Object value) {
|
return buildValue(key, value, 0);
|
}
|
|
|
private static CacheValue buildValue(String key, Object value, long timeOut) {
|
CacheValue instances = new CacheValue();
|
instances.createTime = System.currentTimeMillis();
|
instances.cacheKey = key;
|
instances.cacheValue = JSON.toJSONString(value);
|
instances.timeOut = timeOut;
|
return instances;
|
}
|
|
|
/**
|
* 缓存对象
|
*/
|
@Data
|
static class CacheValue {
|
|
private Long id ;
|
|
/**
|
* 过期时间,0 表示不过期,单位毫秒
|
*/
|
private Long timeOut = 0L;
|
|
/**
|
* 缓存key
|
*/
|
private String cacheKey;
|
/**
|
* 缓存值
|
*/
|
private String cacheValue;
|
|
/**
|
* 缓存创建时间
|
*/
|
private Long createTime;
|
|
private boolean saved=false;
|
|
private boolean live=true;
|
}
|
|
|
}
|