package com.vincent.rsf.server.common.service;
|
|
import com.vincent.rsf.common.utils.Serialize;
|
import com.vincent.rsf.server.common.config.RedisProperties;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
import redis.clients.jedis.Jedis;
|
import redis.clients.jedis.JedisPool;
|
import redis.clients.jedis.JedisPoolConfig;
|
|
import java.util.Date;
|
import java.util.Set;
|
|
/**
|
* redis tools
|
* Created by vincent on 2023-03-13
|
*/
|
@Slf4j
|
@Service
|
public class RedisService {
|
|
private static final String LINK = ".";
|
private static final long MIN_DEGRADE_WINDOW_MS = 30000L;
|
|
protected volatile JedisPool pool;
|
|
Integer index = 0;
|
|
public volatile Boolean initialize = true;
|
|
private volatile long degradedUntilMillis = 0L;
|
|
@Autowired
|
private RedisProperties redisProperties;
|
|
public synchronized JedisPool getPool() {
|
if (this.pool == null) {
|
JedisPoolConfig config = new JedisPoolConfig();
|
config.setTestOnBorrow(false);
|
this.index = redisProperties.getIndex();
|
this.pool = new JedisPool(config,
|
redisProperties.getHost(),
|
redisProperties.getPort(),
|
redisProperties.getTimeout(),
|
redisProperties.getPassword()
|
);
|
}
|
return this.pool;
|
}
|
|
public Jedis getJedis() {
|
if (isDegradedNow()) {
|
return null;
|
}
|
try {
|
Jedis jedis = this.getPool().getResource();
|
if (this.index != jedis.getDB()) {
|
jedis.select(this.index);
|
}
|
markRecovered();
|
return jedis;
|
} catch (Exception e) {
|
markUnavailable(e);
|
}
|
return null;
|
}
|
|
// key - object ----------------------------------------------------------------------------------------------------------
|
|
public String set(String flag, String key, Object value) {
|
if (value == null) {
|
this.delete(flag, key);
|
return null;
|
}
|
return execute(null, jedis -> jedis.set((flag + LINK + key).getBytes(), Serialize.serialize(value)));
|
}
|
|
public String set(String flag, String key, Object value, Integer seconds) {
|
if (value == null) {
|
this.delete(flag, key);
|
return null;
|
}
|
return execute(null, jedis -> jedis.setex((flag + LINK + key).getBytes(), seconds, Serialize.serialize(value)));
|
}
|
|
public <T> T get(String flag, String key) {
|
return execute(null, jedis -> {
|
byte[] bytes = jedis.get((flag + LINK + key).getBytes());
|
if (bytes == null || bytes.length == 0) {
|
return null;
|
}
|
return (T) Serialize.unSerialize(bytes);
|
});
|
}
|
|
public Long delete(String flag, String key) {
|
return execute(null, jedis -> jedis.del((flag + LINK + key).getBytes()));
|
}
|
|
public Long clear(String flag) {
|
this.setValue(flag, "CLEARING", "true");
|
return execute(null, jedis -> {
|
Object returnValue = jedis.eval("local keys = redis.call('keys', ARGV[1]) for i=1,#keys,1000 do redis.call('del', unpack(keys, i, math.min(i+4999, #keys))) end return #keys", 0, flag + LINK + "*");
|
return Long.parseLong(String.valueOf(returnValue));
|
});
|
}
|
|
// 为已存在的key设置过期时间 - 秒
|
public void setExpire(String flag, String key, int seconds) {
|
executeVoid(jedis -> jedis.expire((flag + LINK + key).getBytes(), seconds));
|
}
|
|
// 为已存在的key设置过期时间 - 具体到时间戳 (秒)
|
public void setExpireAt(String flag, String key, Date toTime) {
|
executeVoid(jedis -> jedis.expireAt((flag + LINK + key).getBytes(), toTime.getTime() / 1000));
|
}
|
|
// 获取过期剩余时间(秒) ttl == -1 没有设置过期时间; ttl == -2 key不存在
|
public Long getExpire(String flag, String key) {
|
return execute(null, jedis -> jedis.ttl((flag + LINK + key).getBytes()));
|
}
|
|
|
// key - string ----------------------------------------------------------------------------------------------------------
|
|
public String setValue(String flag, String key, String value) {
|
return execute(null, jedis -> jedis.set(flag + LINK + key, value));
|
}
|
|
public String setValue(String flag, String key, String value, Integer seconds) {
|
return execute(null, jedis -> jedis.setex(flag + LINK + key, seconds, value));
|
}
|
|
public String getValue(String flag, String key) {
|
return execute(null, jedis -> jedis.get(flag + LINK + key));
|
}
|
|
public Long deleteValue(String flag, String... key) {
|
return execute(null, jedis -> {
|
String[] keys = new String[key.length];
|
for (int i = 0; i < key.length; i++) {
|
keys[i] = flag + LINK + key[i];
|
}
|
return jedis.del(keys);
|
});
|
}
|
|
public Long clearValue(String flag) {
|
this.setValue(flag, "CLEARING", "true");
|
return execute(null, jedis -> {
|
Object returnValue = jedis.eval("return redis.call('del', unpack(redis.call('keys', ARGV[1])))", 0, flag + LINK + "*");
|
return Long.parseLong(String.valueOf(returnValue));
|
});
|
}
|
|
public void setValueExpire(String flag, String key, int seconds) {
|
executeVoid(jedis -> jedis.expire((flag + LINK + key).getBytes(), seconds));
|
}
|
|
public void setValueExpireAt(String flag, String key, Date atTime) {
|
executeVoid(jedis -> jedis.expireAt((flag + LINK + key).getBytes(), atTime.getTime() / 1000));
|
}
|
|
|
// hash ----------------------------------------------------------------------------------------------------------
|
|
public Long setMap(String name, String key, Object value) {
|
if (value == null) {
|
deleteMap(name, key);
|
return null;
|
}
|
return execute(null, jedis -> jedis.hset(name.getBytes(), key.getBytes(), Serialize.serialize(value)));
|
}
|
|
public <T> T getMap(String name, String key) {
|
return execute(null, jedis -> {
|
byte[] bytes = jedis.hget(name.getBytes(), key.getBytes());
|
if (bytes == null || bytes.length == 0) {
|
return null;
|
}
|
return (T) Serialize.unSerialize(bytes);
|
});
|
}
|
|
public Set<String> getMapKeys(String name) {
|
return execute(null, jedis -> jedis.hkeys(name));
|
}
|
|
public Long deleteMap(String name, String... key) {
|
return execute(null, jedis -> {
|
String[] keys = new String[key.length];
|
System.arraycopy(key, 0, keys, 0, key.length);
|
return jedis.hdel(name, keys);
|
});
|
}
|
|
public Long clearMap(String name) {
|
return execute(null, jedis -> jedis.del(name));
|
}
|
|
public void setMapExpire(String name, int seconds) {
|
executeVoid(jedis -> jedis.expire(name.getBytes(), seconds));
|
}
|
|
public void setMapExpireAt(String name, Date atTime) {
|
executeVoid(jedis -> jedis.expireAt(name.getBytes(), atTime.getTime() / 1000));
|
}
|
|
|
// mq ----------------------------------------------------------------------------------------------------------
|
|
// 列表末尾添加元素
|
public Long push(String name, Object value) {
|
if (value == null) {
|
return null;
|
}
|
return execute(null, jedis -> jedis.rpush(name.getBytes(), Serialize.serialize(value)));
|
}
|
|
// 获取列表头部元素 && 删除
|
public <T> T pop(String name) {
|
return execute(null, jedis -> {
|
byte[] bytes = jedis.lpop(name.getBytes());
|
if (bytes == null || bytes.length == 0) {
|
return null;
|
}
|
return (T) Serialize.unSerialize(bytes);
|
});
|
}
|
|
// 删除
|
public Long deleteList(String name) {
|
return execute(null, jedis -> jedis.del(name));
|
}
|
|
public void setListExpire(String name, int seconds) {
|
executeVoid(jedis -> jedis.expire(name.getBytes(), seconds));
|
}
|
|
public void setListExpireAt(String name, Date atTime) {
|
executeVoid(jedis -> jedis.expireAt(name.getBytes(), atTime.getTime() / 1000));
|
}
|
|
// count ----------------------------------------------------------------------------------------------------------
|
|
public Long incr(String key) {
|
return execute(null, jedis -> jedis.incr("COUNT." + key));
|
}
|
|
public Long decr(String key) {
|
return execute(null, jedis -> jedis.decr("COUNT." + key));
|
}
|
|
private boolean isDegradedNow() {
|
return !Boolean.TRUE.equals(this.initialize) && System.currentTimeMillis() < this.degradedUntilMillis;
|
}
|
|
private void markRecovered() {
|
if (Boolean.TRUE.equals(this.initialize)) {
|
return;
|
}
|
synchronized (this) {
|
if (Boolean.TRUE.equals(this.initialize)) {
|
return;
|
}
|
this.initialize = true;
|
this.degradedUntilMillis = 0L;
|
log.info("Redis recovered, degrade mode cleared.");
|
}
|
}
|
|
private void markUnavailable(Exception e) {
|
long now = System.currentTimeMillis();
|
long nextRetryAt = now + getDegradeWindowMillis();
|
boolean shouldLog = Boolean.TRUE.equals(this.initialize) || now >= this.degradedUntilMillis;
|
synchronized (this) {
|
this.initialize = false;
|
this.degradedUntilMillis = nextRetryAt;
|
}
|
if (shouldLog) {
|
log.warn("Redis unavailable, degrade mode enabled until {}. message={}", new Date(nextRetryAt), e.getMessage());
|
}
|
}
|
|
private long getDegradeWindowMillis() {
|
return Math.max((long) redisProperties.getTimeout() * 2L, MIN_DEGRADE_WINDOW_MS);
|
}
|
|
private <T> T execute(T fallbackValue, RedisCallback<T> callback) {
|
if (isDegradedNow()) {
|
return fallbackValue;
|
}
|
Jedis jedis = getJedis();
|
if (jedis == null) {
|
return fallbackValue;
|
}
|
try (jedis) {
|
return callback.doWithJedis(jedis);
|
} catch (Exception e) {
|
markUnavailable(e);
|
return fallbackValue;
|
}
|
}
|
|
private void executeVoid(RedisVoidCallback callback) {
|
execute(null, jedis -> {
|
callback.doWithJedis(jedis);
|
return null;
|
});
|
}
|
|
@FunctionalInterface
|
private interface RedisCallback<T> {
|
T doWithJedis(Jedis jedis) throws Exception;
|
}
|
|
@FunctionalInterface
|
private interface RedisVoidCallback {
|
void doWithJedis(Jedis jedis) throws Exception;
|
}
|
}
|