0. 实战导读

🎯 学习目标
本教程将带你从Redis基础概念到实际项目应用,通过8个实战案例深入理解Redis的强大功能。

📱 短信登录
使用Redis共享Session实现分布式登录系统,解决传统Session跨域问题

🔍 商户查询缓存
深入理解缓存击穿、缓存穿透、缓存雪崩问题,掌握缓存策略和解决方案

🎫 优惠券秒杀
Redis计数器 + Lua脚本实现高性能秒杀,分布式锁防止超卖

📍 附近商户
利用Redis GEOHash实现地理位置查询,支持附近商家搜索

📊 UV统计
使用HyperLogLog进行海量数据去重统计,内存占用极低

✅ 用户签到
BitMap实现用户签到功能,节省存储空间

👥 好友关注
基于Set集合实现关注、取消关注、共同关注等社交功能

👍 探店点赞
List实现点赞列表,SortedSet实现点赞排行榜

1653056228879

1. 短信登录

1.1、项目架构分析

1.1.1 系统架构图

系统架构图

1.1.2 架构设计解析

技术栈

  • 负载均衡:Nginx
  • 应用服务:Tomcat集群
  • 数据存储:MySQL + Redis
  • 缓存策略:多级缓存架构

请求流程分析:

  1. 客户端请求Nginx负载均衡

    • Nginx基于七层模型处理HTTP协议
    • 支持Lua脚本直接访问Redis
    • 静态资源服务,轻松扛下上万并发
  2. 负载均衡Tomcat集群

    • 4核8G Tomcat约处理1000并发
    • Nginx负载均衡分流,集群支撑高并发
    • 动静分离降低Tomcat压力
  3. 数据访问层

    • MySQL企业级:16-32核CPU,32-64G内存
    • 并发能力:4000-7000 QPS
    • Redis集群缓存,降低数据库压力

架构详解

1.1.3 项目导入步骤

🖥️ 后端项目导入

后端项目

  1. 解压后端项目源码
  2. 配置数据库连接
  3. 导入SQL脚本
  4. 启动项目

🌐 前端项目导入

前端项目

  1. 解压前端工程
  2. 安装依赖
  3. 配置API接口地址
  4. 启动前端服务

🚀 项目运行

运行项目

访问地址:http://localhost:8080

1.2 登录流程设计

1.2.1 登录流程图

登录流程

1.2.2 详细流程解析

发送验证码

📤 发送验证码

  1. 手机号校验

    • 前端提交手机号
    • 后端验证手机号格式
    • 格式错误:返回错误信息
  2. 验证码生成与发送

    • 生成6位随机验证码
    • 保存验证码到Session
    • 调用短信服务发送验证码

用户登录

🔐 用户登录

  1. 参数校验

    • 校验手机号格式
    • 校验验证码正确性
  2. 用户处理

    • 根据手机号查询用户
    • 用户不存在:创建新用户
    • 用户存在:更新登录信息
  3. Session管理

    • 用户信息存入Session
    • 返回登录成功标识

状态校验

✅ 状态校验

  1. 请求拦截

    • 从Cookie获取JsessionId
    • 根据SessionId获取用户信息
  2. 权限控制

    • 用户不存在:返回401未授权
    • 用户存在:信息存入ThreadLocal
    • 请求放行

1653066208144

1.3 核心代码实现

1.3.1 页面交互流程

页面流程

1.3.2 发送验证码功能

功能说明:验证手机号格式,生成6位随机验证码并保存到Session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class UserServiceImpl implements UserService {

@Override
public Result sendCode(String phone, HttpSession session) {
// 1. 校验手机号格式
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误!");
}

// 2. 生成6位随机验证码
String code = RandomUtil.randomNumbers(6);

// 3. 保存验证码到Session
session.setAttribute("code", code);

// 4. 发送验证码(实际项目中调用短信服务)
log.debug("发送短信验证码成功,验证码:{}", code);

return Result.ok();
}
}

1.3.3 用户登录功能

安全提醒:验证码校验通过后,需要根据手机号查询或创建用户,并将用户信息保存到Session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Service
public class UserServiceImpl implements UserService {

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1. 校验手机号格式
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误!");
}

// 2. 校验验证码
Object cacheCode = session.getAttribute("code");
String code = loginForm.getCode();
if (cacheCode == null || !cacheCode.toString().equals(code)) {
return Result.fail("验证码错误");
}

// 3. 根据手机号查询用户
User user = query().eq("phone", phone).one();

// 4. 用户不存在则创建新用户
if (user == null) {
user = createUserWithPhone(phone);
}

// 5. 保存用户信息到Session
session.setAttribute("user", user);

return Result.ok();
}

private User createUserWithPhone(String phone) {
User user = new User();
user.setPhone(phone);
user.setNickName("user_" + RandomUtil.randomString(6));
userService.save(user);
return user;
}
}

1.4 登录拦截器实现

1.4.1 Tomcat运行原理

Tomcat原理

Tomcat线程模型解析

  1. 监听线程:监听端口,接收客户端连接
  2. Socket连接:每对请求-响应创建独立Socket
  3. 线程池:从线程池获取线程处理请求
  4. 请求处理:线程转发到Controller→Service→DAO→DB
  5. 响应返回:处理完成后数据写回客户端Socket

1.4.2 ThreadLocal线程隔离

ThreadLocal使用要点

每个用户请求对应Tomcat线程池中的一个线程,通过ThreadLocal实现线程间的数据隔离,确保每个线程操作自己的数据副本。

ThreadLocal原理

1.4.3 登录拦截器代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
public class LoginInterceptor implements HandlerInterceptor {

@Override
public boolean preHandle(HttpServletRequest request,
HttpServletResponse response,
Object handler) throws Exception {
// 1. 获取Session
HttpSession session = request.getSession();

// 2. 获取Session中的用户信息
Object user = session.getAttribute("user");

// 3. 判断用户是否登录
if (user == null) {
// 4. 用户未登录,返回401状态码
response.setStatus(401);
return false;
}

// 5. 用户已登录,保存用户信息到ThreadLocal
UserHolder.saveUser((User) user);

// 6. 放行请求
return true;
}
}

1.4.4 拦截器配置

配置说明

  • order(0):Token刷新拦截器,优先级最高
  • order(1):登录验证拦截器,优先级次之
  • excludePathPatterns:配置不需要拦截的路径
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public void addInterceptors(InterceptorRegistry registry) {
// Token刷新拦截器 - 拦截所有路径
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate))
.addPathPatterns("/**")
.order(0);

// 登录拦截器 - 拦截需要登录的路径
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/shop/**", // 店铺信息
"/voucher/**", // 优惠券
"/shop-type/**", // 店铺类型
"/upload/**", // 文件上传
"/blog/hot", // 热门博客
"/user/code", // 发送验证码
"/user/login" // 用户登录
)
.order(1);
}
}

1.5 用户信息安全处理

1.5.1 敏感信息隐藏的必要性

安全风险警告

直接返回完整的User实体对象会暴露用户敏感信息(如密码、手机号、邮箱等),存在严重的安全隐患。必须通过DTO对象进行数据脱敏。

1.5.2 解决方案:UserDTO数据传输对象

UserDTO定义:

1
2
3
4
5
6
7
8
9
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserDTO {
private Long id;
private String nickName;
private String icon;
// 注意:不包含敏感信息如password、phone、email等
}

1.5.3 代码修改

1. 登录方法修改:

1
2
// 保存用户信息到Session(使用DTO脱敏)
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));

2. 拦截器修改:

1
2
// 保存用户信息到ThreadLocal(使用DTO)
UserHolder.saveUser((UserDTO) user);

3. UserHolder工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class UserHolder {
private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();

public static void saveUser(UserDTO user) {
tl.set(user);
}

public static UserDTO getUser() {
return tl.get();
}

public static void removeUser() {
tl.remove();
}
}

1.5.4 安全效果对比

返回方式 包含字段 安全性 推荐程度
User实体 id, phone, password, email… ❌ 危险 禁止使用
UserDTO id, nickName, icon ✅ 安全 强烈推荐

1.6 Session共享问题

1.6.1 分布式Session问题分析

集群环境下的Session一致性挑战

在分布式系统中,每个Tomcat服务器都有自己的Session存储。当用户请求被分发到不同服务器时,会导致Session数据不一致,用户需要重复登录。

典型问题场景:

1
2
3
用户请求 → Nginx负载均衡 → Tomcat1 (保存Session)

Tomcat2 (无Session数据)

早期解决方案对比:

方案 原理 优点 缺点
Session复制 各服务器间同步Session 实现简单 网络开销大,性能差
Session粘连 固定用户到某台服务器 无需额外开发 负载不均,单点故障
Session集中存储 使用Redis统一管理 性能高,可扩展 需要额外组件

1.6.2 Redis解决方案

推荐方案:Redis集中存储

使用Redis作为Session的集中存储,所有服务器共享同一份Session数据,彻底解决分布式环境下的Session一致性问题。

架构设计:

1
2
3
4
用户请求 → Nginx负载均衡 → Tomcat1
↓ ↓
Redis ←------ Tomcat2
(集中Session存储)

核心优势:

  1. 数据共享:所有服务器访问同一份Session数据
  2. 高性能:Redis内存存储,读写速度快
  3. 可扩展:支持集群部署,水平扩展
  4. 持久化:支持数据持久化,防止数据丢失

1653069893050

1.7 Redis代替Session的业务流程

1.7.1 Redis数据结构选择

数据结构对比分析

在Redis中存储用户登录信息时,需要根据数据特性和使用场景选择合适的数据结构。

String vs Hash结构对比:

对比维度 String结构 Hash结构
内存占用 较高(序列化开销) 较低(字段独立存储)
读写性能 简单快速 支持字段级操作
数据更新 需要整体更新 支持字段级更新
适用场景 简单键值对 复杂对象存储

选择建议:

  • 如果用户信息简单,使用String结构
  • 如果用户信息复杂,需要频繁更新部分字段,使用Hash结构

1653319261433

1.7.2 Key设计策略

Key设计原则

Redis的Key需要满足唯一性和可携带性,同时避免暴露用户敏感信息。

Key设计要点:

  1. 唯一性:确保每个用户的Key都是唯一的
  2. 可携带性:Key需要方便在前后端之间传递
  3. 安全性:避免使用手机号等敏感信息作为Key

推荐方案:

1
2
Key格式:login:token:{随机token}
示例:login:token:a3f2b8c9d1e4f5g6

Token生成策略:

1
2
3
// 使用UUID生成随机token
String token = UUID.randomUUID().toString(true);
String key = LOGIN_USER_KEY + token;

1.7.3 整体访问流程

📱 步骤1:用户登录验证

用户提交手机号和验证码,系统验证信息一致性

🔍 步骤2:用户信息查询

根据手机号查询用户信息,不存在则创建新用户

💾 步骤3:Redis数据存储

将用户信息保存到Redis,设置合理的过期时间

🔑 步骤4:Token返回

生成随机token作为登录凭证,返回给前端

🔐 步骤5:登录状态校验

后续请求携带token,系统验证Redis中是否存在对应数据

1.8 基于Redis实现短信登录

1.8.1 实现思路回顾

Redis登录方案核心

将传统的Session存储替换为Redis存储,通过Token机制实现无状态的分布式登录认证。

核心改进点:

  1. 存储位置:从服务器内存 → Redis缓存
  2. 认证方式:从SessionID → 随机Token
  3. 数据格式:从完整对象 → 精简DTO
  4. 有效期管理:支持灵活的过期时间设置

1.8.2 核心代码实现

UserServiceImpl登录方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Service
@Slf4j
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 1. 校验手机号格式
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式错误!");
}

// 2. 从Redis获取验证码并校验
String cacheCode = stringRedisTemplate.opsForValue()
.get(LOGIN_CODE_KEY + phone);
String code = loginForm.getCode();

if (cacheCode == null || !cacheCode.equals(code)) {
return Result.fail("验证码错误");
}

// 3. 根据手机号查询用户
User user = query().eq("phone", phone).one();

// 4. 用户不存在则创建新用户
if (user == null) {
user = createUserWithPhone(phone);
}

// 5. 保存用户信息到Redis
// 5.1 随机生成token作为登录令牌
String token = UUID.randomUUID().toString(true);

// 5.2 将User对象转为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue) ->
fieldValue.toString()));

// 5.3 存储到Redis
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);

// 5.4 设置token有效期(30分钟)
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);

// 6. 返回token
return Result.ok(token);
}
}

Redis常量配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class RedisConstants {
// 验证码相关
public static final String LOGIN_CODE_KEY = "login:code:";
public static final Long LOGIN_CODE_TTL = 2L; // 验证码有效期2分钟

// 用户登录相关
public static final String LOGIN_USER_KEY = "login:token:";
public static final Long LOGIN_USER_TTL = 30L; // 用户登录有效期30分钟

// 用户签到相关
public static final String USER_SIGN_KEY = "sign:";

// 商户信息相关
public static final String CACHE_SHOP_KEY = "cache:shop:";
public static final Long CACHE_SHOP_TTL = 30L; // 商户信息缓存30分钟

// 互斥锁相关
public static final String LOCK_SHOP_KEY = "lock:shop:";
public static final Long LOCK_SHOP_TTL = 10L; // 锁有效期10秒
}

前端登录调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 发送验证码
async sendCode() {
const result = await request.post('/user/code', {
phone: this.phone
});
if (result.code === 200) {
this.$message.success('验证码发送成功');
// 开始倒计时
this.startCountdown();
}
},

// 用户登录
async login() {
const result = await request.post('/user/login', {
phone: this.phone,
code: this.code
});
if (result.code === 200) {
// 保存token到localStorage
localStorage.setItem('token', result.data);
// 设置请求头
request.defaults.headers.common['authorization'] = result.data;
this.$message.success('登录成功');
this.$router.push('/');
}
}

1.9 登录状态刷新优化

1.9.1 初始方案问题分析

初始方案缺陷

原方案中拦截器只拦截需要登录验证的路径,导致用户访问无需拦截的路径时,Token无法得到刷新,可能造成Token过期失效。

问题场景:

1
2
用户访问首页 → 无需拦截 → Token不刷新 → Token过期
用户访问个人中心 → 需要拦截 → Token刷新 → 正常访问

影响分析:

  • 用户活跃状态下仍可能因Token过期被强制登出
  • 用户体验差,需要频繁重新登录
  • 无法准确反映用户的真实活跃状态

1653320822964

1.9.2 双拦截器优化方案

优化思路

采用双拦截器模式:

  • 刷新拦截器:拦截所有路径,负责Token刷新和用户信息加载
  • 登录拦截器:只拦截需要登录的路径,负责登录状态校验

架构设计:

1
2
3
4
5
6
7
请求进入

刷新拦截器(所有路径)→ 刷新Token有效期 → 加载用户信息到ThreadLocal

登录拦截器(需登录路径)→ 校验ThreadLocal中用户信息 → 决定是否放行

业务处理

优势分析:

  • 所有请求都能刷新Token,避免意外过期
  • 职责分离,代码更清晰
  • 性能影响最小化

1653320764547

1.9.3 代码实现

RefreshTokenInterceptor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Component
public class RefreshTokenInterceptor implements HandlerInterceptor {

private final StringRedisTemplate stringRedisTemplate;

public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean preHandle(HttpServletRequest request,
HttpServletResponse response,
Object handler) throws Exception {
// 1. 获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)) {
return true; // 无token直接放行
}

// 2. 基于token获取Redis中的用户信息
String key = LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash()
.entries(key);

// 3. 判断用户是否存在
if (userMap.isEmpty()) {
return true; // 用户不存在直接放行
}

// 4. 将查询到的hash数据转为UserDTO
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap,
new UserDTO(), false);

// 5. 保存用户信息到ThreadLocal
UserHolder.saveUser(userDTO);

// 6. 刷新token有效期
stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);

return true;
}

@Override
public void afterCompletion(HttpServletRequest request,
HttpServletResponse response,
Object handler, Exception ex) throws Exception {
// 移除用户,防止内存泄漏
UserHolder.removeUser();
}
}

LoginInterceptor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
public class LoginInterceptor implements HandlerInterceptor {

@Override
public boolean preHandle(HttpServletRequest request,
HttpServletResponse response,
Object handler) throws Exception {
// 1. 判断是否需要拦截(ThreadLocal中是否有用户)
if (UserHolder.getUser() == null) {
// 2. 没有登录,设置状态码
response.setStatus(401);
return false; // 拦截
}

// 3. 有用户,放行
return true;
}
}

MvcConfig配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Resource
private RefreshTokenInterceptor refreshTokenInterceptor;

@Resource
private LoginInterceptor loginInterceptor;

@Override
public void addInterceptors(InterceptorRegistry registry) {
// 1. 刷新拦截器 - 拦截所有路径,用于刷新token
registry.addInterceptor(refreshTokenInterceptor)
.addPathPatterns("/**")
.order(0); // 最高优先级

// 2. 登录拦截器 - 只拦截需要登录的路径
registry.addInterceptor(loginInterceptor)
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/upload/**",
"/voucher/**"
)
.order(1); // 第二优先级
}
}

2. 商户查询缓存

2.1 缓存基础概念

2.1.1 什么是缓存?

缓存的本质

缓存就像避震器一样,为系统提供缓冲保护,防止高频访问对数据库造成冲击。

生活类比:

避震器

就像越野车的避震器,在崎岖地形中为车体提供保护:

  • 保护作用:防止硬着陆对车体造成损害
  • 缓冲作用:吸收冲击力,提供平稳体验
  • 延长寿命:减少系统组件的磨损

同样,在实际开发中,系统也需要"避震器",防止过高的数据访问量冲击系统,导致操作线程无法及时处理信息而瘫痪。

2.1.2 缓存的技术定义

缓存(Cache),就是数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数据库中获取,存储于本地代码中。

本地缓存实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 高并发场景:ConcurrentHashMap
private static final ConcurrentHashMap<String, Object> LOCAL_CACHE =
new ConcurrentHashMap<>();

// 普通场景:HashMap
private static final Map<String, Object> SIMPLE_CACHE = new HashMap<>();

// Guava缓存:功能更完善
private static final Cache<String, Object> GUAVA_CACHE =
CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();

Redis缓存实现:

1
2
3
4
5
6
7
8
9
10
11
@Autowired
private RedisTemplate<String, Object> redisTemplate;

// 缓存操作
public void setCache(String key, Object value, long timeout) {
redisTemplate.opsForValue().set(key, value, timeout, TimeUnit.MINUTES);
}

public Object getCache(String key) {
return redisTemplate.opsForValue().get(key);
}

缓存特性:

  • Static修饰:随着类加载而加载到内存中
  • Final修饰:引用关系固定,不用担心赋值导致缓存失效
  • 内存存储:读写性能远高于磁盘存储

2.1.3 为什么要使用缓存?

缓存的核心价值

缓存数据存储于内存中,而内存的读写性能远高于磁盘,可以大大降低高并发访问带来的服务器读写压力。

性能对比:

存储介质 读取速度 并发能力 成本
内存缓存 纳秒级 10万+QPS 较高
SSD磁盘 微秒级 1万QPS 中等
机械磁盘 毫秒级 1千QPS 较低

业务价值:

  1. 用户体验提升:页面响应从秒级→毫秒级
  2. 系统稳定性:防止高并发冲垮数据库
  3. 成本优化:减少数据库服务器压力
  4. 扩展能力:支持更高的并发访问量

数据规模挑战:

实际开发中,企业数据量从几十万到几千万不等,如果没有缓存作为"避震器",系统几乎无法承受高并发访问。

缓存的代价

缓存技术虽然强大,但也会增加代码复杂度和运维成本,需要权衡使用。

2.1.4 如何使用缓存?

多级缓存架构

在实际开发中,会构建多级缓存体系来最大化系统性能,每一级缓存都有其特定的作用和使用场景。

多级缓存层次:

1
2
3
4
5
6
7
8
9
10
11
用户请求

浏览器缓存 (客户端缓存)

CDN缓存 (边缘节点缓存)

应用层缓存 (Tomcat本地缓存 + Redis分布式缓存)

数据库缓存 (Buffer Pool)

CPU缓存 (L1/L2/L3)

浏览器缓存:

  • 存储位置:用户浏览器本地
  • 控制方式:HTTP头信息控制
  • 典型应用:静态资源缓存(CSS、JS、图片)
  • 有效期:通过Cache-Control、Expires设置
1
2
Cache-Control: max-age=3600
Expires: Wed, 21 Oct 2025 07:28:00 GMT

应用层缓存:

  • 本地缓存:Tomcat JVM内存中的Map结构
  • 分布式缓存:Redis集群存储
  • 使用场景:热点数据、会话信息、配置数据
1
2
3
4
5
6
// 本地缓存
private static final Map<String, Object> localCache = new ConcurrentHashMap<>();

// Redis缓存
@Autowired
private RedisTemplate<String, Object> redisTemplate;

数据库缓存:

  • Buffer Pool:InnoDB存储引擎的缓冲池
  • 查询缓存:MySQL查询结果缓存(8.0已废弃)
  • 作用:减少磁盘IO,提升查询性能
1
2
-- 查看Buffer Pool状态
SHOW ENGINE INNODB STATUS\G

CPU缓存:

  • L1缓存:指令缓存和数据缓存,32-64KB
  • L2缓存:256-512KB,速度次于L1
  • L3缓存:多核心共享,8-32MB

缓存层级对比:

缓存级别 容量 访问延迟 命中率
L1 32-64KB 1-2ns 80%+
L2 256-512KB 3-5ns 90%+
L3 8-32MB 10-20ns 95%+

2.2 商户缓存实现

2.2.1 业务场景分析

性能瓶颈识别

在查询商户信息时,如果直接操作数据库,当并发量增大时,数据库压力会急剧上升,查询性能会显著下降。

原始代码:

1
2
3
4
5
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
// 直接查询数据库 - 性能瓶颈
return shopService.queryById(id);
}

性能问题:

  • 每次查询都要访问数据库
  • 数据库连接资源有限
  • 高并发下数据库成为瓶颈
  • 相同数据重复查询浪费资源

2.2.2 缓存架构设计

标准缓存模式

采用Cache-Aside模式:查询前先查缓存,缓存命中直接返回,未命中查询数据库并写入缓存。

缓存流程图:

1
2
3
4
5
6
7
8
9
10
11
用户请求

查询Redis缓存

缓存命中?→ 是 → 直接返回缓存数据
↓ 否
查询MySQL数据库

写入Redis缓存

返回查询结果

1653322097736

2.2.3 代码实现

实现思路

代码逻辑:如果缓存命中则直接返回,如果缓存未命中则查询数据库,然后将结果写入Redis缓存。

ShopServiceImpl:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Service
@Slf4j
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result queryShopById(Long id) {
// 1. 从Redis查询商户缓存
String key = CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);

// 2. 判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3. 存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}

// 4. 不存在,根据id查询数据库
Shop shop = getById(id);

// 5. 不存在,返回错误
if (shop == null) {
return Result.fail("店铺不存在!");
}

// 6. 存在,写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),
CACHE_SHOP_TTL, TimeUnit.MINUTES);

// 7. 返回
return Result.ok(shop);
}
}

ShopController:

1
2
3
4
5
6
7
8
9
10
@RestController
@RequestMapping("/shop")
public class ShopController {

@GetMapping("/{id}")
public Result queryShop(@PathVariable("id") Long id) {
// 调用Service层的缓存查询方法
return shopService.queryShopById(id);
}
}

缓存常量配置:

1
2
3
4
public class RedisConstants {
public static final String CACHE_SHOP_KEY = "cache:shop:";
public static final Long CACHE_SHOP_TTL = 30L; // 30分钟
}

性能测试对比:

查询方式 平均响应时间 QPS 数据库压力
直接查数据库 150ms 500
Redis缓存 5ms 10,000+
性能提升 30倍 20倍 大幅降低

1653322190155

2.3 缓存更新策略

2.3.1 缓存更新策略概述

缓存更新的必要性

内存资源宝贵,当缓存数据过多时需要合理的更新策略来保证系统性能和数据一致性。

三大缓存更新策略:

内存淘汰机制

当Redis内存达到max-memory设置的上限时,自动触发内存淘汰机制,根据配置的淘汰策略删除部分数据。

常见淘汰策略:

策略 描述 适用场景
noeviction 不淘汰,返回错误 数据不能丢失
allkeys-lru 所有key中淘汰最近最少使用 缓存应用
volatile-lru 设置了过期时间的key中淘汰最近最少使用 混合数据
allkeys-random 随机淘汰所有key 测试环境
volatile-ttl 淘汰即将过期的key 临时数据

配置示例:

1
2
3
# Redis配置文件
maxmemory 2gb
maxmemory-policy allkeys-lru

TTL过期机制

为缓存数据设置合理的过期时间(TTL),Redis会自动删除过期的数据。

TTL设置原则:

1
2
3
4
5
6
7
8
// 业务数据缓存 - 30分钟
stringRedisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES);

// 热点数据缓存 - 1小时
stringRedisTemplate.opsForValue().set(key, value, 1, TimeUnit.HOURS);

// 配置数据缓存 - 24小时
stringRedisTemplate.opsForValue().set(key, value, 24, TimeUnit.HOURS);

TTL设计要点:

  • 热点数据:TTL较短,保证数据新鲜度
  • 冷数据:TTL较长,减少数据库压力
  • 业务数据:根据业务需求设置合理TTL

手动更新机制

在数据变更时主动删除或更新缓存,保证缓存与数据库的数据一致性。

更新时机:

  • 数据新增时:写入缓存
  • 数据修改时:删除缓存
  • 数据删除时:删除缓存

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 更新商户信息
public Result updateShop(Shop shop) {
Long id = shop.getId();

// 1. 更新数据库
updateById(shop);

// 2. 删除缓存(主动更新)
String key = CACHE_SHOP_KEY + id;
stringRedisTemplate.delete(key);

return Result.ok();
}

1653322506393

2.3.2 数据库缓存不一致解决方案

一致性问题分析

缓存数据源来自数据库,而数据库数据会发生变化。当数据库数据发生变化而缓存未同步时,就会产生一致性问题,用户使用过时数据会影响业务正确性。

三大解决方案对比:

旁路缓存模式(推荐)

缓存调用者在更新数据库后手动更新缓存,也称为双写方案。

实现流程:

1
2
3
4
5
6
查询操作:
缓存命中 → 直接返回
缓存未命中 → 查询数据库 → 写入缓存 → 返回结果

更新操作:
更新数据库 → 删除缓存

优点:

  • ✅ 实现简单,控制灵活
  • ✅ 数据一致性较好
  • ✅ 适合读多写少场景

缺点:

  • ❌ 需要开发者手动维护
  • ❌ 存在短暂不一致窗口

适用场景:

  • 读多写少的业务系统
  • 对数据一致性要求较高
  • 开发团队有较强技术能力

读写穿透模式

由缓存系统本身完成数据库与缓存的同步操作。

实现流程:

1
2
3
4
5
查询操作:
缓存未命中 → 缓存系统查询数据库 → 写入缓存 → 返回结果

更新操作:
更新缓存 → 缓存系统同步更新数据库

优点:

  • ✅ 对应用透明,简化开发
  • ✅ 缓存系统统一管理
  • ✅ 数据一致性较好

缺点:

  • ❌ 实现复杂,需要缓存系统支持
  • ❌ 性能开销较大
  • ❌ 强依赖缓存系统稳定性

适用场景:

  • 大型分布式系统
  • 有专业缓存中间件支持
  • 对开发简化要求较高

异步写回模式

调用者只操作缓存,其他线程异步处理数据库,实现最终一致性。

实现流程:

1
2
3
4
5
查询操作:
直接查询缓存

更新操作:
更新缓存 → 异步线程批量写入数据库

优点:

  • ✅ 写入性能极高
  • ✅ 数据库压力小
  • ✅ 适合写密集型应用

缺点:

  • ❌ 数据一致性差
  • ❌ 实现复杂度高
  • ❌ 可能丢失数据

适用场景:

  • 写操作远多于读操作
  • 对数据一致性要求不高
  • 允许短暂数据丢失

方案选择建议:

方案 一致性 性能 复杂度 适用场景
Cache Aside ⭐⭐⭐ ⭐⭐⭐ ⭐⭐ 读多写少
Read/Write Through ⭐⭐⭐⭐ ⭐⭐ ⭐⭐⭐⭐ 大型系统
Write Behind ⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ 写密集型

1653322857620

2.3.3 最佳实践方案选择

推荐方案:Cache Aside + 删除缓存策略

综合考虑一致性、性能和实现复杂度,推荐使用Cache Aside模式配合删除缓存策略。

核心设计原则:

删除缓存策略(推荐)

更新数据库时让缓存失效,查询时再更新缓存。

实现流程:

1
2
3
4
更新操作:
1. 更新数据库
2. 删除缓存
3. 等待下次查询时重新加载

优势分析:

  • 避免无效写操作:多次更新只需一次删除
  • 简化实现逻辑:无需考虑缓存数据格式转换
  • 降低并发问题:减少缓存与数据库不一致时间窗口

对比表格:

策略 写操作次数 实现复杂度 一致性风险 推荐度
更新缓存 每次更新都写缓存 高(需处理数据转换) 高(并发更新冲突) ⭐⭐
删除缓存 仅删除,查询时重建 低(简单删除) 低(重建时数据最新) ⭐⭐⭐⭐⭐

先操作数据库,再删除缓存(推荐)

确保数据库操作成功后再删除缓存,避免缓存删除后数据库更新失败的情况。

时序分析:

1
2
线程1:更新数据库 → 删除缓存
线程2:查询缓存未命中 → 查询数据库 → 写入缓存

并发安全性:

  • 即使线程2在线程1删除缓存后查询数据库,获取的也是最新的数据
  • 避免了"删除缓存→数据库更新失败"导致的数据不一致

对比分析:

操作顺序 并发风险 一致性保证 实现复杂度 推荐度
先删缓存再更新数据库 高(其他线程可能写入旧数据) 高(需额外锁机制) ⭐⭐
先更新数据库再删缓存 低(数据库已更新) ⭐⭐⭐⭐⭐

操作原子性保证

确保缓存与数据库操作同时成功或失败,避免中间状态。

单体系统事务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Transactional
public Result updateShop(Shop shop) {
try {
// 1. 更新数据库
shopMapper.updateById(shop);

// 2. 删除缓存
stringRedisTemplate.delete(CACHE_SHOP_KEY + shop.getId());

return Result.ok();
} catch (Exception e) {
// 事务回滚,数据库和缓存保持一致
throw new RuntimeException("更新失败", e);
}
}

分布式系统方案:

  • TCC模式:Try-Confirm-Cancel三阶段提交
  • 消息队列:通过消息确保最终一致性
  • 分布式锁:保证操作顺序性

最终推荐方案:

1
2
3
4
5
6
🎯 最佳实践组合:
1. 采用Cache Aside模式
2. 使用删除缓存策略
3. 先更新数据库再删除缓存
4. 单体系统使用事务保证
5. 分布式系统使用消息队列保证最终一致性

1653323595206

2.4 商户缓存双写一致性实现

2.4.1 实现思路

双写一致性要求

查询时:缓存未命中则查询数据库并写入缓存,设置合理过期时间
更新时:先更新数据库,再删除缓存,确保数据一致性

实现流程图:

1
2
3
4
5
6
7
8
9
10
11
查询流程:
用户请求 → 查询Redis缓存 → 命中?→ 是 → 返回缓存数据
↓ 否
查询MySQL数据库

写入Redis缓存(带TTL)

返回查询结果

更新流程:
用户请求 → 更新MySQL数据库 → 删除Redis缓存 → 返回更新结果

2.4.2 代码实现

ShopServiceImpl查询方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public Result queryShopById(Long id) {
// 1. 从Redis查询商户缓存
String key = CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);

// 2. 判断缓存是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3. 存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}

// 4. 缓存未命中,查询数据库
Shop shop = getById(id);

// 5. 数据库不存在,返回错误
if (shop == null) {
return Result.fail("店铺不存在!");
}

// 6. 存在,写入Redis缓存(设置30分钟过期时间)
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),
CACHE_SHOP_TTL, TimeUnit.MINUTES);

// 7. 返回结果
return Result.ok(shop);
}

ShopServiceImpl更新方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
@Transactional
public Result updateShop(Shop shop) {
Long id = shop.getId();

// 1. 参数校验
if (id == null) {
return Result.fail("店铺id不能为空!");
}

// 2. 更新数据库
boolean updated = updateById(shop);
if (!updated) {
return Result.fail("店铺更新失败!");
}

// 3. 删除缓存(保证数据一致性)
String key = CACHE_SHOP_KEY + id;
stringRedisTemplate.delete(key);

// 4. 返回成功
return Result.ok();
}

ShopController:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
@RequestMapping("/shop")
public class ShopController {

@GetMapping("/{id}")
public Result queryShop(@PathVariable("id") Long id) {
// 查询商户(带缓存)
return shopService.queryShopById(id);
}

@PutMapping
public Result updateShop(@RequestBody Shop shop) {
// 更新商户(保证缓存一致性)
return shopService.updateShop(shop);
}
}

测试验证:

测试场景 预期结果 实际验证
首次查询 查询数据库,写入缓存
二次查询 直接返回缓存数据
数据更新 更新数据库,删除缓存
更新后查询 重新查询数据库,写入新缓存

性能对比:

操作类型 直接数据库 带缓存 性能提升
查询操作 150ms 5ms 30倍
更新操作 50ms 55ms 基本持平

1653325871232

代码分析:
通过采用删除缓存策略解决双写问题,当数据更新后删除缓存,后续查询会从MySQL加载最新数据并重新写入缓存,从而避免数据库和缓存不一致的问题。

1653325929549

2.5 缓存穿透问题解决

2.5.1 缓存穿透问题分析

缓存穿透定义

客户端请求的数据在缓存和数据库中都不存在,导致缓存永远无法生效,所有请求都直接打到数据库,可能引发数据库崩溃。

问题场景:

1
2
3
4
5
6
7
8
9
10
11
用户请求ID=99999的商品

查询Redis缓存(不存在)

查询MySQL数据库(不存在)

返回404错误

大量恶意请求重复此过程

数据库压力剧增,可能崩溃

风险特征:

  • 🔴 请求的数据在数据库中不存在
  • 🔴 缓存无法命中,失去保护作用
  • 🔴 大量请求直接访问数据库
  • 🔴 可能被恶意利用进行攻击

2.5.2 解决方案对比

两大核心解决方案:

实现原理

当数据库查询结果为空时,仍将空结果缓存到Redis,设置较短的过期时间。

实现流程:

1
2
用户请求 → 查询缓存未命中 → 查询数据库为空 → 缓存空值 → 返回结果
下次请求 → 查询缓存命中(空值)→ 直接返回,不再访问数据库

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Result queryShopById(Long id) {
String key = CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);

// 命中缓存
if (StrUtil.isNotBlank(shopJson)) {
if ("null".equals(shopJson)) { // 判断是否为缓存的空值
return Result.fail("店铺不存在!");
}
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}

// 查询数据库
Shop shop = getById(id);

if (shop == null) {
// 缓存空值,设置2分钟过期
stringRedisTemplate.opsForValue().set(key, "", 2, TimeUnit.MINUTES);
return Result.fail("店铺不存在!");
}

// 缓存正常数据
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}

方案评估:

维度 评分 说明
实现复杂度 ⭐⭐ 简单,几行代码即可实现
内存消耗 ⭐⭐⭐ 需要额外存储空值,但可设置短TTL
防护效果 ⭐⭐⭐⭐ 有效阻止重复查询不存在数据
数据一致性 ⭐⭐⭐ 可能存在短暂不一致

实现原理

使用布隆过滤器在查询前进行预判,过滤掉明显不存在的数据请求。

数据结构:

  • 庞大的二进制位数组(BitSet)
  • 多个哈希函数
  • 内存占用极低

实现流程:

1
2
3
用户请求 → 布隆过滤器判断 → 不存在 → 直接返回
↓ 存在
查询Redis缓存 → 未命中 → 查询数据库

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Component
public class BloomFilterService {

@Resource
private StringRedisTemplate stringRedisTemplate;

// 初始化布隆过滤器
public void initBloomFilter() {
// 将所有商品ID添加到布隆过滤器
List<Long> allShopIds = shopMapper.selectAllIds();
for (Long id : allShopIds) {
addToBloomFilter("shop:bloom:", id);
}
}

// 添加到布隆过滤器
private void addToBloomFilter(String key, Long id) {
int[] offset = getBitOffset(id);
for (int i : offset) {
stringRedisTemplate.opsForValue().setBit(key, i, true);
}
}

// 判断是否存在
public boolean mightExist(String key, Long id) {
int[] offset = getBitOffset(id);
for (int i : offset) {
if (!stringRedisTemplate.opsForValue().getBit(key, i)) {
return false; // 一定不存在
}
}
return true; // 可能存在(存在误判)
}
}

方案评估:

维度 评分 说明
实现复杂度 ⭐⭐⭐⭐ 需要额外组件,算法复杂
内存消耗 ⭐⭐⭐⭐⭐ 极低的内存占用
防护效果 ⭐⭐⭐⭐⭐ 完美阻止不存在数据查询
误判率 ⭐⭐ 存在误判可能,需权衡配置

方案选择建议:

业务场景 推荐方案 理由
中小型系统 缓存空对象 实现简单,快速上线
大型高并发系统 布隆过滤器 内存优化,防护效果更好
数据量巨大 布隆过滤器 内存占用与数据量无关
开发资源有限 缓存空对象 维护成本低

1653326156516

2.6 缓存穿透编码实现

2.6.1 实现思路

原始问题

在原来的逻辑中,如果发现数据在MySQL中不存在,直接返回404,这样会导致缓存穿透问题,大量请求直接到达数据库。

优化思路:

1
2
3
4
5
原逻辑:
查询缓存未命中 → 查询数据库为空 → 直接返回404 → 缓存穿透风险

新逻辑:
查询缓存未命中 → 查询数据库为空 → 缓存空值 → 返回结果 → 下次直接命中缓存

2.6.2 代码实现

ShopServiceImpl缓存穿透防护:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
public Result queryShopById(Long id) {
// 1. 参数校验
if (id == null || id <= 0) {
return Result.fail("店铺ID非法!");
}

// 2. 从Redis查询商户缓存
String key = CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);

// 3. 判断缓存是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 4. 命中缓存,判断是否为null值(缓存穿透防护)
if ("null".equals(shopJson)) {
return Result.fail("店铺不存在!");
}
// 5. 正常数据,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}

// 6. 缓存未命中,查询数据库
Shop shop = getById(id);

// 7. 数据库不存在(缓存穿透防护)
if (shop == null) {
// 8. 缓存空值,设置较短的过期时间(2分钟)
stringRedisTemplate.opsForValue().set(key, "", 2, TimeUnit.MINUTES);
return Result.fail("店铺不存在!");
}

// 9. 数据库存在,写入Redis缓存
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),
CACHE_SHOP_TTL, TimeUnit.MINUTES);

// 10. 返回结果
return Result.ok(shop);
}

缓存穿透防护工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Component
public class CachePenetrationProtection {

@Resource
private StringRedisTemplate stringRedisTemplate;

/**
* 安全的缓存查询方法
* @param key 缓存key
* @param type 返回类型
* @param dbFallback 数据库查询函数
* @param ttl 过期时间
* @param unit 时间单位
* @return 查询结果
*/
public <T> T safeQuery(String key, Class<T> type,
Supplier<T> dbFallback,
Long ttl, TimeUnit unit) {
// 1. 查询缓存
String json = stringRedisTemplate.opsForValue().get(key);

// 2. 命中缓存
if (StrUtil.isNotBlank(json)) {
if ("null".equals(json)) {
return null; // 缓存的空值
}
return JSONUtil.toBean(json, type);
}

// 3. 查询数据库
T result = dbFallback.get();

// 4. 缓存结果(包括空值)
if (result == null) {
stringRedisTemplate.opsForValue().set(key, "", 2, TimeUnit.MINUTES);
} else {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(result), ttl, unit);
}

return result;
}
}

防护效果测试:

测试场景 请求参数 预期结果 数据库访问次数
正常查询 id=1 返回店铺信息 1次
不存在查询 id=99999 返回"店铺不存在" 1次
重复不存在查询 id=99999 返回"店铺不存在" 0次(命中缓存空值)
非法参数 id=-1 返回"店铺ID非法" 0次(参数校验拦截)

性能对比:

查询类型 无防护QPS 有防护QPS 数据库压力
不存在数据查询 1000(全部打到DB) 10000+(缓存拦截) 降低90%+

多重防护策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RestController
@RequestMapping("/shop")
public class ShopController {

@GetMapping("/{id}")
public Result queryShop(@PathVariable("id") String idStr) {
// 1. 参数格式校验(第一层防护)
if (!idStr.matches("\\d+")) {
return Result.fail("店铺ID格式错误!");
}

Long id = Long.valueOf(idStr);

// 2. ID范围校验(第二层防护)
if (id > 1000000) { // 假设最大ID为100万
return Result.fail("店铺ID超出范围!");
}

// 3. 调用带缓存穿透防护的Service方法
return shopService.queryShopById(id);
}
}

1653327124561

2.6.3 总结与最佳实践

缓存穿透产生原因:

  • 用户请求的数据在缓存和数据库中都不存在
  • 大量请求不断发起,给数据库带来巨大压力

解决方案完整清单:

防护层级 解决方案 实现复杂度 防护效果
应用层 参数格式校验 基础防护
应用层 ID范围限制 ⭐⭐ 中级防护
缓存层 缓存空对象 ⭐⭐⭐ 核心防护
架构层 布隆过滤器 ⭐⭐⭐⭐⭐ 高级防护
系统层 限流熔断 ⭐⭐⭐⭐ 兜底防护

推荐组合:

  • 中小型系统:参数校验 + 缓存空对象
  • 大型系统:参数校验 + 布隆过滤器 + 缓存空对象
  • 超高并发:全部方案组合使用

2.7 缓存雪崩问题解决

2.7.1 缓存雪崩问题分析

缓存雪崩:在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求直接到达数据库,可能瞬间压垮数据库,造成系统级故障。

1653327884526
问题场景

1
用户请求 → 缓存失效 → 请求数据库 → 数据库压力过大 → 数据库崩溃 → 服务不可用

风险特征

  • 突发性:大量key在同一时间失效
  • 级联性:缓存失效→数据库压力→系统崩溃
  • 恢复难:系统恢复后可能再次雪崩

2.7.2 解决方案对比

过期时间随机化

实现原理
在基础TTL上添加随机值,分散key的过期时间

代码实现

1
2
3
4
5
// 原TTL为30分钟
int baseTtl = 30;
// 添加随机值,范围±5分钟
int randomTtl = baseTtl + ThreadLocalRandom.current().nextInt(-5, 6);
stringRedisTemplate.opsForValue().set(key, value, randomTtl, TimeUnit.MINUTES);

优势

  • 实现简单,成本低
  • 能有效分散过期时间
  • 对业务无侵入

劣势

  • 无法应对Redis宕机
  • 随机范围需要合理设置

适用场景
适合大多数业务场景,作为基础防护手段

Redis集群高可用

实现原理
通过主从复制、哨兵模式、集群模式提高Redis可用性

架构对比

方案 可用性 数据安全 复杂度 成本
主从复制 ★★☆ ★★☆ ★☆☆
哨兵模式 ★★★ ★★☆ ★★☆
Redis Cluster ★★★ ★★★ ★★★

配置示例(哨兵模式):

1
2
3
4
# sentinel.conf
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 10000

优势

  • 提供故障自动转移
  • 数据安全性高
  • 服务可用性强

劣势

  • 架构复杂,维护成本高
  • 需要更多资源投入

适用场景
对可用性要求高的核心业务

降级限流策略

实现原理
在缓存失效时,通过降级和限流保护数据库

降级策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SentinelResource(value = "getShop", 
blockHandler = "handleBlock",
fallback = "handleFallback")
public Shop getShop(Long id) {
// 正常业务逻辑
return shopService.getById(id);
}

// 限流处理
public Shop handleBlock(Long id, BlockException ex) {
log.warn("接口被限流: {}", id);
return getDefaultShop(id); // 返回默认值或缓存数据
}

// 降级处理
public Shop handleFallback(Long id, Throwable ex) {
log.error("接口降级: {}", id, ex);
return getDefaultShop(id); // 返回兜底数据
}

限流算法

  • 令牌桶算法:平滑流量
  • 漏桶算法:固定速率处理
  • 计数器算法:简单直接

优势

  • 保护系统稳定性
  • 提供用户体验降级方案
  • 灵活配置

劣势

  • 可能损失部分功能
  • 需要合理设置阈值

适用场景
高并发、大数据量场景

多级缓存架构

架构设计

1
用户请求 → CDN缓存 → Nginx缓存 → 本地缓存 → Redis缓存 → 数据库

实现方案

缓存层级 技术选型 响应时间 容量 成本
CDN缓存 CloudFlare/阿里云CDN 10-50ms 巨大
Nginx缓存 proxy_cache 1-5ms 中等
本地缓存 Caffeine/Guava 0.1-1ms 极低
Redis缓存 Redis Cluster 1-10ms

本地缓存实现(Caffeine):

1
2
3
4
5
6
7
8
9
10
11
12
13
// 本地缓存配置
LoadingCache<String, Shop> localCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build(key -> {
// 缓存未命中时从Redis加载
return getFromRedis(key);
});

public Shop queryShop(Long id) {
// 优先查询本地缓存
return localCache.get(CACHE_SHOP_KEY + id);
}

优势

  • 多层次保护,容错性强
  • 响应速度快,用户体验好
  • 各层缓存互补

劣势

  • 架构复杂,实现难度大
  • 数据一致性挑战
  • 运维成本高

适用场景
大型互联网应用,对性能和可用性要求极高

2.7.3 最佳实践方案

推荐方案:过期时间随机化 + Redis集群 + 本地缓存

方案组合

  1. 基础层:过期时间随机化(必做)
  2. 可用层:Redis哨兵模式或Cluster(推荐)
  3. 加速层:本地缓存(Caffeine/Guava)
  4. 保护层:降级限流(Sentinel)

实施步骤

  1. 为所有缓存key添加随机TTL
  2. 部署Redis哨兵模式
  3. 集成本地缓存框架
  4. 配置限流降级规则
  5. 监控和告警设置

效果预期

  • 缓存雪崩概率降低90%+
  • 系统可用性达到99.9%+
  • 响应时间提升50%+

2.8 缓存击穿问题解决

2.8.1 缓存击穿问题分析

缓存击穿:也叫热点Key问题,一个被高并发访问并且缓存重建业务较复杂的key突然失效,无数请求瞬间到达数据库,可能造成数据库崩溃。

问题场景

1
高并发请求 → 热点key失效 → 大量请求数据库 → 数据库压力激增 → 数据库崩溃

与缓存穿透的区别

  • 缓存穿透:查询不存在的数据
  • 缓存击穿:热点key突然失效
  • 缓存雪崩:大量key同时失效

2.8.2 解决方案对比

互斥锁方案

实现原理
通过分布式锁保证只有一个线程去重建缓存,其他线程等待
1653328288627
流程图

1
2
3
请求1:获取锁 → 查询数据库 → 重建缓存 → 释放锁 → 返回数据
请求2:等待锁 → 从缓存获取 → 返回数据
请求3:等待锁 → 从缓存获取 → 返回数据

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public Shop queryWithMutex(Long id) {
String key = CACHE_SHOP_KEY + id;
String lockKey = LOCK_SHOP_KEY + id;

// 1. 查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(shopJson)) {
return JSONUtil.toBean(shopJson, Shop.class);
}

// 2. 获取互斥锁
boolean isLock = tryLock(lockKey);
if (!isLock) {
// 3. 获取锁失败,休眠重试
Thread.sleep(50);
return queryWithMutex(id); // 递归重试
}

try {
// 4. 获取锁成功,再次检查缓存(double check)
shopJson = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(shopJson)) {
return JSONUtil.toBean(shopJson, Shop.class);
}

// 5. 查询数据库并重建缓存
Shop shop = getById(id);
if (shop == null) {
// 防止缓存穿透
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}

stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
return shop;

} finally {
// 6. 释放锁
unlock(lockKey);
}
}

优势

  • 数据一致性强
  • 实现相对简单
  • 无额外内存开销

劣势

  • 性能损耗(串行化)
  • 存在死锁风险
  • 用户体验差(等待)

适用场景
对数据一致性要求高的业务场景

逻辑过期方案

实现原理
不设置Redis过期时间,在value中存储逻辑过期时间,异步重建缓存
1653328663897
流程图

1
2
3
请求1:发现过期 → 获取锁 → 返回旧数据 + 异步重建
请求2:发现过期 → 等待锁 → 返回旧数据
请求3:发现重建完成 → 返回新数据

数据结构设计

1
2
3
4
5
@Data
public class RedisData {
private LocalDateTime expireTime; // 逻辑过期时间
private Object data; // 实际数据
}

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

public Shop queryWithLogicalExpire(Long id) {
String key = CACHE_SHOP_KEY + id;

// 1. 查询缓存
String json = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isBlank(json)) {
return null; // 缓存未命中直接返回
}

// 2. 反序列化数据
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();

// 3. 判断是否过期
if (expireTime.isAfter(LocalDateTime.now())) {
// 3.1 未过期,直接返回
return shop;
}

// 3.2 已过期,需要缓存重建
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);

if (isLock) {
// 4. 获取锁成功,异步重建缓存
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 重建缓存
saveShop2Redis(id, 20L); // 20分钟逻辑过期时间
} catch (Exception e) {
log.error("缓存重建失败", e);
} finally {
unlock(lockKey);
}
});
}

// 5. 返回旧数据
return shop;
}

private void saveShop2Redis(Long id, Long expireSeconds) {
// 查询数据库
Shop shop = getById(id);

// 封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));

// 写入Redis(不设TTL)
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}

优势

  • 性能优秀(无需等待)
  • 用户体验好(立即响应)
  • 无死锁风险

劣势

  • 数据一致性弱(可能返回脏数据)
  • 实现复杂
  • 占用额外内存

适用场景
对性能要求高,能容忍短暂数据不一致的场景

2.8.3 方案选择建议

选择建议:根据业务场景和数据一致性要求选择合适的方案

对比分析

维度 互斥锁方案 逻辑过期方案
数据一致性 ★★★★★ ★★☆☆☆
性能表现 ★★☆☆☆ ★★★★★
实现复杂度 ★★☆☆☆ ★★★★☆
用户体验 ★★☆☆☆ ★★★★★
内存占用 ★★★★★ ★★☆☆☆
死锁风险 存在 不存在

业务场景推荐

  • 金融支付:互斥锁方案(强一致性)
  • 商品详情:逻辑过期方案(高性能)
  • 用户资料:互斥锁方案(数据重要)
  • 新闻资讯:逻辑过期方案(容忍延迟)

混合策略

1
2
3
4
5
6
7
public Shop queryShop(Long id, boolean requireConsistency) {
if (requireConsistency) {
return queryWithMutex(id); // 强一致性场景
} else {
return queryWithLogicalExpire(id); // 高性能场景
}
}

2.9 互斥锁方案实现

2.9.1 实现思路

核心思路:在缓存未命中时,通过分布式锁保证只有一个线程去查询数据库并重建缓存,其他线程等待锁释放后重试,避免大量请求同时打到数据库。

实现流程

1
查询缓存 → 缓存未命中 → 获取互斥锁 → 再次检查缓存 → 查询数据库 → 重建缓存 → 释放锁

设计要点

  • 双重检查:获取锁后再次检查缓存,防止重复查询数据库
  • 锁超时:设置合理的锁过期时间,防止死锁
  • 异常处理:确保锁最终能被释放
  • 重试机制:获取锁失败时适当休眠后重试

2.9.2 代码实现

分布式锁工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Component
public class RedisLockUtil {

@Resource
private StringRedisTemplate stringRedisTemplate;

private static final String LOCK_PREFIX = "lock:";
private static final long DEFAULT_TIMEOUT = 10; // 默认锁超时时间(秒)

/**
* 尝试获取分布式锁
*/
public boolean tryLock(String key, long timeout) {
String lockKey = LOCK_PREFIX + key;
Boolean flag = stringRedisTemplate.opsForValue()
.setIfAbsent(lockKey, Thread.currentThread().getId() + "", timeout, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

/**
* 释放分布式锁(安全释放)
*/
public boolean unlock(String key) {
String lockKey = LOCK_PREFIX + key;
try {
// 获取当前线程ID
String threadId = Thread.currentThread().getId() + "";
String value = stringRedisTemplate.opsForValue().get(lockKey);

// 确保释放的是自己持有的锁
if (threadId.equals(value)) {
return BooleanUtil.isTrue(stringRedisTemplate.delete(lockKey));
}
return false;
} catch (Exception e) {
log.error("释放锁失败: {}", lockKey, e);
return false;
}
}

/**
* 尝试获取锁并重试
*/
public boolean tryLockWithRetry(String key, long timeout, int maxRetry, long retryInterval) {
for (int i = 0; i < maxRetry; i++) {
if (tryLock(key, timeout)) {
return true;
}
if (i < maxRetry - 1) {
try {
Thread.sleep(retryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
}
return false;
}
}

Service层完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@Service
@Slf4j
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Resource
private RedisLockUtil redisLockUtil;

private static final String CACHE_SHOP_KEY = "cache:shop:";
private static final String LOCK_SHOP_KEY = "lock:shop:";
private static final Long CACHE_SHOP_TTL = 30L;
private static final Long CACHE_NULL_TTL = 2L;

/**
* 使用互斥锁解决缓存击穿
*/
@Override
public Shop queryWithMutex(Long id) {
String key = CACHE_SHOP_KEY + id;

// 1. 从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);

// 2. 缓存命中且不为空值
if (StrUtil.isNotBlank(shopJson)) {
log.debug("缓存命中: {}", key);
return JSONUtil.toBean(shopJson, Shop.class);
}

// 3. 判断是否为缓存穿透的空值
if (shopJson != null) {
log.debug("命中空值缓存: {}", key);
return null;
}

// 4. 缓存未命中,尝试获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
Shop shop = null;

try {
// 4.1 尝试获取锁(最多重试3次,间隔50ms)
boolean isLock = redisLockUtil.tryLockWithRetry(lockKey, 10L, 3, 50L);

if (!isLock) {
log.warn("获取锁失败,返回兜底数据: {}", lockKey);
return getDefaultShop(id); // 返回兜底数据
}

log.debug("获取锁成功: {}", lockKey);

// 4.2 双重检查(防止重复查询数据库)
shopJson = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(shopJson)) {
log.debug("双重检查命中缓存: {}", key);
return JSONUtil.toBean(shopJson, Shop.class);
}

// 4.3 查询数据库
log.debug("查询数据库: {}", id);
shop = getById(id);

// 5. 数据库中不存在(缓存穿透处理)
if (shop == null) {
log.debug("数据不存在,写入空值缓存: {}", key);
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}

// 6. 写入缓存
log.debug("写入缓存: {}", key);
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);

} catch (Exception e) {
log.error("查询商户异常: {}", id, e);
throw new RuntimeException("查询商户失败", e);
} finally {
// 7. 释放锁
redisLockUtil.unlock(lockKey);
log.debug("释放锁: {}", lockKey);
}

return shop;
}

/**
* 兜底数据获取
*/
private Shop getDefaultShop(Long id) {
// 可以返回默认数据或从备用数据源获取
Shop defaultShop = new Shop();
defaultShop.setId(id);
defaultShop.setName("默认商户");
defaultShop.setTypeId(1L);
// 设置其他默认值...
return defaultShop;
}
}

Controller层调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@RestController
@RequestMapping("/shop")
@Api(tags = "商户管理")
public class ShopController {

@Resource
private IShopService shopService;

/**
* 根据ID查询商户(互斥锁方案)
*/
@GetMapping("/{id}")
@ApiOperation("根据ID查询商户")
public Result queryShop(@PathVariable("id") Long id) {
// 参数校验
if (id == null || id <= 0) {
return Result.fail("商户ID不能为空");
}

try {
// 使用互斥锁查询
Shop shop = shopService.queryWithMutex(id);

if (shop == null) {
return Result.fail("商户不存在");
}

return Result.ok(shop);

} catch (Exception e) {
log.error("查询商户失败: {}", id, e);
return Result.fail("查询商户失败,请稍后重试");
}
}
}

并发测试验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@SpringBootTest
@Slf4j
public class CacheBreakdownTest {

@Resource
private IShopService shopService;

/**
* 缓存击穿并发测试
*/
@Test
public void testCacheBreakdown() throws InterruptedException {
Long shopId = 1L;
int threadCount = 100; // 并发线程数
CountDownLatch latch = new CountDownLatch(threadCount);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger dbQueryCount = new AtomicInteger(0);

// 先清除缓存,模拟缓存失效场景
StringRedisTemplate stringRedisTemplate =
SpringContextUtil.getBean(StringRedisTemplate.class);
stringRedisTemplate.delete("cache:shop:" + shopId);

log.info("开始并发测试,线程数:{},商户ID:{}", threadCount, shopId);

long startTime = System.currentTimeMillis();

// 并发查询
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
Shop shop = shopService.queryWithMutex(shopId);
if (shop != null) {
successCount.incrementAndGet();
}
} catch (Exception e) {
log.error("查询异常", e);
} finally {
latch.countDown();
}
}).start();
}

latch.await();
long endTime = System.currentTimeMillis();

log.info("并发测试完成");
log.info("总耗时:{}ms", (endTime - startTime));
log.info("成功查询数:{}", successCount.get());
log.info("QPS:{}", threadCount * 1000 / (endTime - startTime));

// 验证缓存是否已重建
String cachedData = stringRedisTemplate.opsForValue().get("cache:shop:" + shopId);
Assertions.assertNotNull(cachedData, "缓存应该已重建");
}
}

2.9.3 性能对比与最佳实践

性能表现:互斥锁方案在并发测试中表现优异,能有效防止缓存击穿问题

测试对比

方案 并发数 数据库查询次数 平均响应时间 数据一致性
无保护 100 100次 50ms 不一致
互斥锁 100 1次 80ms 强一致
逻辑过期 100 1次 10ms 最终一致

最佳实践

  1. 锁粒度控制:按业务维度加锁,避免全局锁
  2. 超时设置:锁超时时间要大于数据库查询时间
  3. 重试机制:获取锁失败时要有重试和兜底策略
  4. 监控告警:监控锁等待时间和获取成功率
  5. 性能优化:结合本地缓存减少Redis访问

2.10 逻辑过期方案实现

2.10.1 实现思路

核心思路:不设置Redis过期时间,在value中存储逻辑过期时间,当数据过期时异步重建缓存,立即返回旧数据,保证用户体验。

实现流程

1
查询缓存 → 命中数据 → 检查逻辑过期时间 → 未过期直接返回 → 过期则异步重建 → 返回旧数据

设计要点

  • 异步重建:开启独立线程进行缓存重建,不阻塞用户请求
  • 逻辑过期:在value中存储过期时间,不依赖Redis TTL
  • 锁控制:通过分布式锁保证只有一个线程进行重建
  • 线程池管理:使用线程池管理异步任务,避免线程爆炸

2.10.2 代码实现

逻辑过期数据模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Redis逻辑过期数据结构
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RedisData {
/**
* 逻辑过期时间
*/
private LocalDateTime expireTime;

/**
* 实际业务数据
*/
private Object data;

/**
* 快速构建方法
*/
public static RedisData of(Object data, Long expireSeconds) {
RedisData redisData = new RedisData();
redisData.setData(data);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
return redisData;
}

/**
* 判断是否过期
*/
public boolean isExpired() {
return expireTime.isBefore(LocalDateTime.now());
}
}

缓存预热工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Component
@Slf4j
public class CachePreheatUtil {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Resource
private IShopService shopService;

private static final String CACHE_SHOP_KEY = "cache:shop:";
private static final Long LOGICAL_EXPIRE_TIME = 20L; // 逻辑过期时间(秒)

/**
* 预热指定商户数据
*/
public void preheatShop(Long shopId, Long expireSeconds) {
String key = CACHE_SHOP_KEY + shopId;

try {
// 查询数据库
Shop shop = shopService.getById(shopId);
if (shop == null) {
log.warn("商户不存在,跳过预热: {}", shopId);
return;
}

// 封装逻辑过期数据
RedisData redisData = RedisData.of(shop, expireSeconds);

// 写入Redis(不设TTL)
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));

log.info("缓存预热成功: {}, 过期时间: {}秒", key, expireSeconds);

} catch (Exception e) {
log.error("缓存预热失败: {}", shopId, e);
}
}

/**
* 批量预热热门商户
*/
public void preheatHotShops(List<Long> shopIds, Long expireSeconds) {
log.info("开始批量预热热门商户,数量: {}", shopIds.size());

for (Long shopId : shopIds) {
try {
preheatShop(shopId, expireSeconds);
// 适当休眠,避免对数据库造成压力
Thread.sleep(100);
} catch (Exception e) {
log.error("预热商户失败: {}", shopId, e);
}
}

log.info("批量预热完成");
}
}

Service层完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
@Service
@Slf4j
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Resource
private RedisLockUtil redisLockUtil;

private static final String CACHE_SHOP_KEY = "cache:shop:";
private static final String LOCK_SHOP_KEY = "lock:shop:";
private static final Long LOGICAL_EXPIRE_TIME = 20L; // 逻辑过期时间(秒)

// 缓存重建线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
r -> {
Thread thread = new Thread(r);
thread.setName("cache-rebuild-" + thread.getId());
thread.setDaemon(true);
return thread;
}
);

/**
* 逻辑过期方案解决缓存击穿
*/
@Override
public Shop queryWithLogicalExpire(Long id) {
String key = CACHE_SHOP_KEY + id;

// 1. 从Redis查询缓存
String json = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isBlank(json)) {
log.debug("缓存未命中,直接返回null: {}", key);
return null;
}

// 2. 命中缓存,反序列化数据
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);

// 3. 检查是否过期
if (!redisData.isExpired()) {
log.debug("缓存未过期,直接返回: {}", key);
return shop;
}

// 4. 缓存已过期,需要重建
log.debug("缓存已过期,开始异步重建: {}", key);
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = redisLockUtil.tryLock(lockKey, 10L);

if (isLock) {
// 5. 获取锁成功,提交异步重建任务
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
log.info("开始异步重建缓存: {}", key);
saveShop2Redis(id, LOGICAL_EXPIRE_TIME);
log.info("异步重建缓存完成: {}", key);
} catch (Exception e) {
log.error("异步重建缓存失败: {}", key, e);
} finally {
redisLockUtil.unlock(lockKey);
}
});
}

// 6. 返回旧数据(无论是否获取到锁)
log.debug("返回旧数据: {}", key);
return shop;
}

/**
* 保存商户到Redis(逻辑过期)
*/
public void saveShop2Redis(Long id, Long expireSeconds) {
try {
// 查询数据库
Shop shop = getById(id);
if (shop == null) {
log.warn("商户不存在,跳过缓存重建: {}", id);
return;
}

// 封装逻辑过期数据
RedisData redisData = RedisData.of(shop, expireSeconds);

// 写入Redis(不设TTL)
String key = CACHE_SHOP_KEY + id;
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));

log.debug("逻辑过期缓存重建成功: {}, 过期时间: {}秒", key, expireSeconds);

} catch (Exception e) {
log.error("逻辑过期缓存重建失败: {}", id, e);
throw new RuntimeException("缓存重建失败", e);
}
}
}

并发测试验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@SpringBootTest
@Slf4j
public class LogicalExpireTest {

@Resource
private IShopService shopService;

@Resource
private CachePreheatUtil cachePreheatUtil;

@Resource
private StringRedisTemplate stringRedisTemplate;

/**
* 逻辑过期方案性能测试
*/
@Test
public void testLogicalExpirePerformance() throws InterruptedException {
Long shopId = 1L;
int threadCount = 100;
CountDownLatch latch = new CountDownLatch(threadCount);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger rebuildCount = new AtomicInteger(0);

// 1. 预热缓存(设置为已过期状态)
cachePreheatUtil.preheatShop(shopId, -1L); // 设置负值,表示已过期

log.info("开始逻辑过期性能测试,线程数: {},商户ID: {}", threadCount, shopId);

long startTime = System.currentTimeMillis();

// 2. 并发查询
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
long threadStart = System.currentTimeMillis();
Shop shop = shopService.queryWithLogicalExpire(shopId);
long threadEnd = System.currentTimeMillis();

if (shop != null) {
successCount.incrementAndGet();
log.debug("线程{}查询成功,耗时: {}ms", Thread.currentThread().getId(), (threadEnd - threadStart));
}
} catch (Exception e) {
log.error("查询异常", e);
} finally {
latch.countDown();
}
}).start();
}

latch.await();
long endTime = System.currentTimeMillis();

// 3. 等待异步重建完成
Thread.sleep(2000);

// 4. 验证结果
String cachedData = stringRedisTemplate.opsForValue().get("cache:shop:" + shopId);
Assertions.assertNotNull(cachedData, "缓存应该已重建");

RedisData redisData = JSONUtil.toBean(cachedData, RedisData.class);
Assertions.assertFalse(redisData.isExpired(), "重建后的缓存应该未过期");

log.info("逻辑过期测试完成");
log.info("总耗时: {}ms", (endTime - startTime));
log.info("成功查询数: {}", successCount.get());
log.info("QPS: {}", threadCount * 1000 / (endTime - startTime));
log.info("平均响应时间: {}ms", (endTime - startTime) / threadCount);
}

/**
* 测试缓存重建触发
*/
@Test
public void testCacheRebuild() throws InterruptedException {
Long shopId = 2L;

// 1. 预热缓存(设置为即将过期)
cachePreheatUtil.preheatShop(shopId, 1L); // 1秒后过期

// 2. 等待缓存过期
Thread.sleep(1500);

// 3. 触发查询(应该触发异步重建)
Shop shop = shopService.queryWithLogicalExpire(shopId);
Assertions.assertNotNull(shop, "应该返回旧数据");

// 4. 等待异步重建
Thread.sleep(2000);

// 5. 验证重建结果
String cachedData = stringRedisTemplate.opsForValue().get("cache:shop:" + shopId);
RedisData redisData = JSONUtil.toBean(cachedData, RedisData.class);

Assertions.assertFalse(redisData.isExpired(), "缓存应该已重建");
log.info("缓存重建验证成功");
}
}

2.10.3 性能对比与最佳实践

性能表现:逻辑过期方案在高并发场景下表现优异,平均响应时间比互斥锁方案提升80%+

性能对比

指标 互斥锁方案 逻辑过期方案 提升幅度
平均响应时间 80ms 12ms ↓85%
并发处理能力 中等 优秀 ↑300%
数据一致性 强一致 最终一致 -
用户体验 有等待 无等待 ↑100%
实现复杂度 简单 复杂 -
内存占用 -

最佳实践

  1. 预热策略:系统启动时预热热门数据,避免冷启动问题
  2. 过期时间:根据业务特点设置合理的逻辑过期时间
  3. 线程池配置:根据服务器性能合理配置线程池大小
  4. 监控告警:监控缓存命中率和重建频率
  5. 降级策略:异步重建失败时要有降级方案

适用场景

  • 读多写少的业务场景
  • 对性能要求极高的应用
  • 能容忍短暂数据不一致的业务
  • 高并发查询的热点数据

2.11 Redis工具类封装

2.11.1 需求分析

基于StringRedisTemplate封装一个缓存工具类,满足下列需求:

基础功能

  • 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
  • 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题

查询功能

  • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  • 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

2.11.2 代码实现

CacheClient工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Slf4j
@Component
public class CacheClient {

private final StringRedisTemplate stringRedisTemplate;
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

/**
* 设置缓存(TTL过期)
*/
public void set(String key, Object value, Long time, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
}

/**
* 设置缓存(逻辑过期)
*/
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}

/**
* 查询缓存(缓存穿透保护)
*/
public <R,ID> R queryWithPassThrough(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){
String key = keyPrefix + id;

// 1.从redis查询缓存
String json = stringRedisTemplate.opsForValue().get(key);

// 2.缓存命中
if (StrUtil.isNotBlank(json)) {
return JSONUtil.toBean(json, type);
}

// 3.命中空值(缓存穿透保护)
if (json != null) {
return null;
}

// 4.缓存未命中,查询数据库
R r = dbFallback.apply(id);

// 5.数据库中不存在
if (r == null) {
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}

// 6.数据库中存在,写入缓存
this.set(key, r, time, unit);
return r;
}

逻辑过期查询方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 查询缓存(逻辑过期方案)
*/
public <R, ID> R queryWithLogicalExpire(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;

// 1.从redis查询缓存
String json = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isBlank(json)) {
return null;
}

// 2.命中,反序列化数据
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();

// 3.判断是否过期
if(expireTime.isAfter(LocalDateTime.now())) {
return r; // 未过期,直接返回
}

// 4.已过期,需要缓存重建
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);

if (isLock) {
// 5.获取锁成功,异步重建缓存
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
R newR = dbFallback.apply(id);
this.setWithLogicalExpire(key, newR, time, unit);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
unlock(lockKey);
}
});
}

// 6.返回旧数据
return r;
}

互斥锁查询方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 查询缓存(互斥锁方案)
*/
public <R, ID> R queryWithMutex(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;

// 1.从redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(shopJson)) {
return JSONUtil.toBean(shopJson, type);
}

// 2.命中空值(缓存穿透保护)
if (shopJson != null) {
return null;
}

// 3.实现缓存重建
String lockKey = LOCK_SHOP_KEY + id;
R r = null;
try {
boolean isLock = tryLock(lockKey);
if (!isLock) {
Thread.sleep(50); // 获取锁失败,休眠并重试
return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);
}

// 4.获取锁成功,查询数据库
r = dbFallback.apply(id);
if (r == null) {
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}

// 5.写入缓存
this.set(key, r, time, unit);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
unlock(lockKey);
}
return r;
}

锁工具方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    /**
* 尝试获取分布式锁
*/
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue()
.setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

/**
* 释放分布式锁
*/
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
}

2.11.3 使用示例

ShopServiceImpl中的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Resource
private CacheClient cacheClient;

@Override
public Result queryById(Long id) {
// 解决缓存穿透
Shop shop = cacheClient
.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);

// 互斥锁解决缓存击穿
// Shop shop = cacheClient
// .queryWithMutex(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);

// 逻辑过期解决缓存击穿
// Shop shop = cacheClient
// .queryWithLogicalExpire(CACHE_SHOP_KEY, id, Shop.class, this::getById, 20L, TimeUnit.SECONDS);

if (shop == null) {
return Result.fail("店铺不存在!");
}
return Result.ok(shop);
}

三种方案对比

方案 适用场景 优点 缺点
缓存穿透保护 数据不存在场景 实现简单,有效防止穿透 有短暂空值缓存
互斥锁方案 强一致性要求 数据一致性强 性能较差,有等待
逻辑过期方案 高性能要求 性能优秀,无等待 实现复杂,可能返回旧数据

2.11.4 最佳实践总结

推荐使用:CacheClient工具类封装了三种缓存策略,可根据业务场景灵活选择

选择建议

  1. 常规查询:优先使用queryWithPassThrough(缓存穿透保护)
  2. 热点数据:使用queryWithLogicalExpire(逻辑过期方案)
  3. 关键数据:使用queryWithMutex(互斥锁方案)

使用优势

  • ✅ 代码复用性高,避免重复实现
  • ✅ 策略灵活,可根据业务场景选择
  • ✅ 封装完善,包含异常处理和日志
  • ✅ 性能优化,支持异步重建和双重检查

3. 优惠券秒杀

3.1 redis实现全局唯一ID

3.1.1 全局唯一ID生成

每个店铺都可以发布优惠券,当用户抢购时,就会生成订单并保存到tb_voucher_order表中。

数据库自增ID存在的问题

问题 描述 影响
规律性明显 ID按顺序递增,容易被猜测 泄露业务数据
单表限制 MySQL单表容量不宜超过500W 制约业务扩展
分库分表困难 分布式环境下ID冲突 系统复杂度增加

全局ID生成器要求

全局ID生成器是一种在分布式系统下用来生成全局唯一ID的工具

特性 要求 实现思路
唯一性 全局唯一 时间戳+序列号
高可用 99.99%可用 Redis集群
高性能 10W+QPS 内存操作
递增性 趋势递增 时间戳保证
安全性 无规则 拼接随机位

ID结构设计

ID组成结构:符号位 + 时间戳 + 序列号,总计64位

1
2
3
4
┌──符号位──┬───────时间戳───────┬───────序列号───────┐
│ 1bit │ 31bit │ 32bit │
│ 永远0 │ 秒级时间戳(69年) │ 秒内计数器(2^32) │
└──────────┴─────────────────┴─────────────────┘

设计优势

  • 符号位:1bit,永远为0,保证ID为正数
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
  • 安全性:不直接使用Redis自增数值,避免泄露业务量

3.1.2 Redis实现方案

RedisIdWorker工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
public class RedisIdWorker {
/**
* 开始时间戳 - 2022-01-01 00:00:00
*/
private static final long BEGIN_TIMESTAMP = 1640995200L;
/**
* 序列号的位数
*/
private static final int COUNT_BITS = 32;

private StringRedisTemplate stringRedisTemplate;

public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

/**
* 生成全局唯一ID
* @param keyPrefix 业务前缀
* @return 全局唯一ID
*/
public long nextId(String keyPrefix) {
// 1. 生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;

// 2. 生成序列号 - Redis自增
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
Long count = stringRedisTemplate.opsForValue()
.increment("icr:" + keyPrefix + ":" + date);

// 3. 拼接并返回
return timestamp << COUNT_BITS | count;
}
}

实现原理解析

步骤 操作 说明
1 生成时间戳 当前时间 - 起始时间(2022-01-01)
2 生成序列号 Redis自增,按天分组避免溢出
3 位运算拼接 timestamp << 32 | count

关键设计

  • 时间戳基准:使用2022-01-01作为起始时间,支持69年使用期
  • 按天分片:Redis key包含日期,避免自增数值过大
  • 位运算优化:使用位移和或运算快速拼接ID
  • 线程安全:Redis自增操作保证并发安全

性能测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task = () -> {
for (int i = 0; i < 100; i++) {
long id = redisIdWorker.nextId("test");
System.out.println("id = " + id);
}
latch.countDown();
};

long begin = System.currentTimeMillis();
// 300个线程,每个线程生成100个ID
for (int i = 0; i < 300; i++) {
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("time = " + (end - begin));
}

测试结果

  • 🚀 生成速度:3万个ID仅需几百毫秒
  • 🚀 并发安全:多线程环境下无重复ID
  • 🚀 趋势递增:ID整体呈递增趋势

3.1.3 方案对比总结

Redis方案 vs 其他方案

方案 实现复杂度 性能 可用性 适用场景
Redis方案 ⭐⭐ 简单 ⭐⭐⭐⭐⭐ 10W+QPS ⭐⭐⭐⭐ 主从架构 中小型系统
雪花算法 ⭐⭐⭐ 中等 ⭐⭐⭐⭐⭐ 更高 ⭐⭐ 依赖时钟 大型分布式系统
数据库自增 ⭐ 最简单 ⭐⭐ 千级QPS ⭐⭐ 单点故障 单机系统
UUID ⭐ 简单 ⭐⭐⭐ 中等 ⭐⭐⭐⭐⭐ 完全分布式 对顺序无要求场景

3.2 优惠券管理

3.2.1 优惠券类型设计

优惠券业务模型

类型 特点 使用场景 表结构
普通券 优惠力度小,任意领取 日常促销 tb_voucher
秒杀券 优惠力度大,限时限量 引流获客 tb_voucher + tb_seckill_voucher

表结构设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- 优惠券基础信息表
tb_voucher (
id bigint PRIMARY KEY, -- 优惠券ID
shop_id bigint, -- 商铺ID
title varchar(255), -- 优惠券标题
sub_title varchar(255), -- 副标题
rules varchar(1024), -- 使用规则
pay_value bigint, -- 支付金额(分)
actual_value bigint, -- 抵扣金额(分)
type tinyint, -- 类型:1-普通券,2-秒杀券
status tinyint, -- 状态:1-上架,2-下架,3-过期
create_time datetime, -- 创建时间
update_time datetime -- 更新时间
);

-- 秒杀优惠券扩展表
tb_seckill_voucher (
voucher_id bigint PRIMARY KEY, -- 优惠券ID
stock int, -- 库存
create_time datetime, -- 创建时间
begin_time datetime, -- 开始抢购时间
end_time datetime -- 结束抢购时间
);

3.2.2 优惠券发布实现

Controller层接口

1
2
3
4
5
6
7
8
9
10
/**
* 新增普通券
* @param voucher 优惠券信息
* @return 优惠券id
*/
@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {
voucherService.save(voucher);
return Result.ok(voucher.getId());
}

Controller层接口

1
2
3
4
5
6
7
8
9
10
/**
* 新增秒杀券
* @param voucher 优惠券信息,包含秒杀信息
* @return 优惠券id
*/
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}

Service层实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 1. 保存优惠券基础信息
save(voucher);

// 2. 保存秒杀扩展信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);

// 3. 同步库存到Redis
stringRedisTemplate.opsForValue()
.set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

设计要点

  • 事务控制:使用@Transactional保证数据一致性
  • Redis同步:将库存信息缓存到Redis,提升查询性能
  • 时间窗口:设置秒杀开始和结束时间
  • 库存隔离:秒杀券有独立库存管理

用POSTMAN模拟发送请求来新增秒杀券,请求路径http://localhost:8081/voucher/seckill, 请求方式POST,JSON数据如下

1
2
3
4
5
6
7
8
9
10
11
12
{
"shopId":1,
"title":"100元代金券",
"subTitle":"周一至周五可用",
"rules":"全场通用\\n无需预约\\n可无限叠加",
"payValue":8000,
"actualValue":10000,
"type":1,
"stock":100,
"beginTime":"2022-01-01T00:00:00",
"endTime":"2022-10-31T23:59:59"
}

3.3 秒杀下单实现

3.3.1 业务需求分析

秒杀下单核心逻辑

1
2
3
4
5
6
7
8
9
10
11
graph TD
A[用户点击抢购] --> B[查询优惠券信息]
B --> C{秒杀是否开始?}
C -->|未开始| D[返回失败:秒杀尚未开始]
C -->|已开始| E{秒杀是否结束?}
E -->|已结束| F[返回失败:秒杀已结束]
E -->|进行中| G{库存是否充足?}
G -->|不足| H[返回失败:库存不足]
G -->|充足| I[扣减库存]
I --> J[创建订单]
J --> K[返回订单ID]

下单校验规则

校验项 判断条件 失败提示
秒杀时间 beginTime > 当前时间 秒杀尚未开始!
秒杀时间 endTime < 当前时间 秒杀已结束!
库存检查 stock < 1 库存不足!

3.3.2 基础实现方案

VoucherController

1
2
3
4
5
6
7
8
9
10
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@Autowired
private IVoucherOrderService voucherOrderService;
@PostMapping("/seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherOrderService.seckillVoucher(voucherId);
}
}

VoucherOrderService

1
2
3
public interface IVoucherOrderService extends IService<VoucherOrder> {
Result seckillVoucher(Long voucherId);
}

VoucherOrderServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Autowired
private ISeckillVoucherService seckillVoucherService;

@Autowired
private RedisIdWorker redisIdWorker;

@Override
public Result seckillVoucher(Long voucherId) {
// 1. 查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

// 2. 判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}

// 3. 判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}

// 4. 判断库存是否充足
if (voucher.getStock() < 1) {
return Result.fail("库存不足!");
}

// 5. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.update();

if (!success) {
return Result.fail("库存不足!");
}

// 6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1 订单ID
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2 用户ID
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 6.3 代金券ID
voucherOrder.setVoucherId(voucherId);

// 7. 保存订单
save(voucherOrder);

// 8. 返回订单ID
return Result.ok(orderId);
}

当前方案存在的问题

高并发下的问题分析

问题 现象 原因 影响
超卖问题 库存为负 并发扣减库存 商家损失
重复下单 同一用户多单 并发创建订单 用户体验差
性能瓶颈 响应慢 数据库压力大 系统崩溃

问题根因

  • 无并发控制:多个线程同时扣减库存
  • 无幂等保障:同一用户可重复下单
  • 数据库压力大:所有操作都走数据库

3.4 库存超卖问题分析

并发场景下的超卖问题

假设库存为1,同时有100个线程并发下单:

1
2
3
4
5
时间点 | 线程1 | 线程2 | 线程3 | ... | 库存
-------|-------|-------|-------|-----|-----
t1 | 查询库存=1 | 查询库存=1 | 查询库存=1 | ... | 1
t2 | 判断库存>0✅ | 判断库存>0✅ | 判断库存>0✅ | ... | 1
t3 | 扣减库存 | 扣减库存 | 扣减库存 | ... | 1→0→-1→-2...

问题本质

  • 读-改-写操作非原子性
  • 并发控制缺失导致数据竞争
  • 库存校验库存扣减分离

悲观锁 vs 乐观锁

悲观锁:假定会发生并发冲突,屏蔽一切可能违反数据完整性的操作

乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据的时候再去判断有没有其他线程对数据进行了修改

方案 实现方式 优点 缺点 适用场景
悲观锁 synchronized/ReentrantLock 简单易理解 性能差,阻塞严重 并发量低的场景
乐观锁 版本号机制/CAS 性能好,无阻塞 实现复杂,ABA问题 并发量高的场景

乐观锁实现原理

1
2
3
4
5
6
7
8
9
// 版本号机制
UPDATE tb_seckill_voucher
SET stock = stock - 1, version = version + 1
WHERE voucher_id = ? AND version = ?

// CAS机制(库存大于0)
UPDATE tb_seckill_voucher
SET stock = stock - 1
WHERE voucher_id = ? AND stock > 0

乐观锁方案演进

方案一:版本号控制(成功率低)

1
2
3
4
5
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.eq("stock", voucher.getStock()) // 版本号校验
.update();

问题分析

  • 成功率极低:100个线程同时拿到stock=100,只有1个能成功
  • 重试压力大:失败线程需要重新查询版本号再重试
  • 用户体验差:大量请求返回失败

方案二:库存大于0控制(推荐)

1
2
3
4
5
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0) // 只需保证库存大于0
.update();

优势分析

  • 成功率高:只要还有库存就能成功
  • 实现简单:无需维护版本号
  • 性能优秀:无自旋重试,一次操作完成

3.5 一人一单问题

需求分析

业务规则:同一个优惠券,一个用户只能下一单

问题背景

  • 🎯 营销目的:优惠券用于引流获客,需要控制成本
  • 🎯 公平性:防止黄牛党恶意刷单
  • 🎯 用户体验:让更多用户享受到优惠

实现思路

1
用户下单前 → 查询该用户是否已购买 → 已购买则拒绝 → 未购买则允许下单

并发场景下的问题

假设用户A同时发起5个请求:

1
2
3
4
5
时间点 | 请求1 | 请求2 | 请求3 | 请求4 | 请求5
-------|-------|-------|-------|-------|------
t1 | 查询订单=0✅ | 查询订单=0✅ | 查询订单=0✅ | 查询订单=0✅ | 查询订单=0✅
t2 | 创建订单 | 创建订单 | 创建订单 | 创建订单 | 创建订单
t3 | 成功 | 成功 | 成功 | 成功 | 成功

问题本质

  • 查询-判断-创建操作非原子性
  • 无并发控制导致重复下单
  • 数据库唯一约束无法防止并发插入

悲观锁解决方案

核心思路:对用户ID加锁,保证同一用户并发请求串行处理

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Override
public Result seckillVoucher(Long voucherId) {
// 1. 查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

// 2. 判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}

// 3. 判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}

// 4. 判断库存是否充足
if (voucher.getStock() < 1) {
return Result.fail("库存不足!");
}

// 5. 一人一单控制
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 创建订单(包含一人一单校验扣减库存逻辑)
return createVoucherOrder(voucherId);
}
}

private Result createVoucherOrder(Long voucherId) {
// 一人一单逻辑
Long userId = UserHolder.getUser().getId();
int count = query().eq("voucher_id", voucherId).eq("user_id", userId).count();
if (count > 0) {
return Result.fail("你已经抢过优惠券了哦");
}
//5. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
if (!success) {
return Result.fail("库存不足");
}
//6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1 设置订单id
long orderId = redisIdWorker.nextId("order");
//6.2 设置用户id
Long id = UserHolder.getUser().getId();
//6.3 设置代金券id
voucherOrder.setVoucherId(voucherId);
voucherOrder.setId(orderId);
voucherOrder.setUserId(id);
//7. 将订单数据保存到表中
save(voucherOrder);
//8. 返回订单id
return Result.ok(orderId);
}

关键设计

  • 锁粒度:按用户ID加锁,不同用户互不影响
  • 字符串常量池:使用intern()确保同一把锁
  • 事务边界:锁要包裹整个事务操作

事务与锁的协同机制

核心问题:Spring事务与JVM锁的生命周期不一致

问题根源
直接通过this调用同类方法会导致Spring事务失效,因为事务是通过AOP代理机制实现的。Spring的事务管理基于动态代理,只有通过代理对象调用的方法才能被事务拦截器处理。

生命周期冲突

1
2
3
4
5
6
@Transactional
public Result method() {
synchronized(lock) {
// 业务操作
} // 🔒锁已释放,但事务还未提交
} // 💥事务提交,但其他线程可能已获取锁并读取到脏数据

解决方案:获取Spring代理对象确保事务生效
使用AopContext.currentProxy()获取当前代理对象,通过代理对象调用事务方法,确保锁在事务提交后才释放。

技术实现要点

  1. 启用暴露代理:@EnableAspectJAutoProxy(exposeProxy = true)
  2. 接口定义:在IVoucherOrderService接口中声明createVoucherOrder方法
  3. 代理调用:通过代理对象调用确保事务拦截器生效

解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public Result seckillVoucher(Long voucherId) {
// 前置校验方案

Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 获取代理对象,确保事务生效
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}

@Transactional
public Result createVoucherOrder(Long voucherId) {
// 事务操作
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()){
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}

引入配置和依赖

1
2
3
4
5
// 引入aspectjweaver依赖
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
// 在启动类上加上@EnableAspectJAutoProxy(exposeProxy = true)注解
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}

}

3.6 集群环境下的并发问题

集群模式下的新问题

JVM锁失效:集群环境下,每个JVM实例有自己的锁

场景复现

1
2
用户A的请求 → Nginx负载均衡 → 8081端口(JVM锁生效)
用户A的请求 → Nginx负载均衡 → 8082端口(JVM锁失效)

问题本质

  • JVM锁作用域:只在单个JVM实例内有效
  • 负载均衡:同一用户的请求可能分发到不同实例
  • 分布式环境:需要分布式锁解决方案

分布式锁核心要求

分布式锁:在分布式系统中,所有节点共享的锁机制

特性 要求 实现方案
互斥性 同一时间只有一个客户端能获取锁 Redis SETNX
安全性 锁只能被持有者释放 唯一标识 + Lua脚本
死锁避免 锁必须有超时时间 Redis EXPIRE
可用性 高可用的锁服务 Redis集群
可重入性 同一客户端可重复获取锁 ThreadLocal + 计数器

Redis分布式锁实现

基本命令

1
2
3
4
5
6
7
8
9
# 获取锁(SETNX + EXPIRE)
SET lock_key unique_value NX EX 30

# 释放锁(Lua脚本保证原子性)
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end

实现优势

  • 高性能:Redis内存操作,10W+QPS
  • 高可用:Redis主从架构,故障自动切换
  • 易实现:命令简单,客户端支持好
  • 可扩展:支持RedLock算法,多Redis实例

有关锁失效原因分析:这就是集群环境下,syn锁失效的原因,在这种情况下,我们需要使用分布式锁来解决这个问题,让锁不存在于每个jvm的内部,而是让所有jvm公用外部的一把锁(Redis)

4. 分布式锁

4.1 分布式锁概述

什么是分布式锁

分布式锁:在分布式系统或集群模式下,多个进程可见且互斥的锁机制

核心思想:所有节点使用同一把锁,确保程序串行执行

与JVM锁的区别

1
2
JVM锁:只在单个JVM实例内有效(synchronized、ReentrantLock)
分布式锁:在多个节点间共享,所有实例都能感知锁状态

分布式锁必备特性

特性 说明 实现方案
互斥性 同一时间只有一个客户端能获取锁 Redis SETNX
可见性 所有节点都能感知锁状态变化 Redis发布订阅
高可用 锁服务不易崩溃,故障可恢复 Redis集群/哨兵
高性能 加锁/解锁操作响应快 内存操作
安全性 锁只能被持有者释放 唯一标识验证
死锁避免 锁必须有超时时间 TTL过期机制

常见分布式锁实现

三种主流方案对比

方案 优点 缺点 适用场景
MySQL 实现简单,事务支持 性能差,锁表风险 低频操作
Redis 高性能,10W+QPS 需要处理锁超时 高频并发
Zookeeper 强一致性,Watch机制 实现复杂,性能一般 强一致性要求

企业级选择

  • Redis:99%场景的首选(性能+可用性平衡)
  • Redisson:Java生态最成熟的分布式锁框架

4.2 Redis分布式锁实现

Redis锁实现原理

获取锁

1
2
# 原子性操作:SETNX + EXPIRE
SET lock_key unique_value NX EX 30

参数说明

  • NX:key不存在时才设置(互斥性)
  • EX 30:30秒自动过期(死锁避免)
  • unique_value:线程唯一标识(安全性)

释放锁

1
2
3
4
5
6
-- 原子性脚本:先验证再删除
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end

SimpleRedisLock实现

锁接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁的超时时间(秒)
* @return true-获取成功,false-获取失败
*/
boolean tryLock(long timeoutSec);

/**
* 释放锁
*/
void unlock();
}

核心实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SimpleRedisLock implements ILock {
//锁的前缀
private static final String KEY_PREFIX = "lock:";
//具体业务名称,将前缀和业务名拼接之后当做Key
private String name;
//这里不是@Autowired注入,采用的是构造器注入,在创建SimpleRedisLock时,将RedisTemplate作为参数传入
private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
long threadId = Thread.currentThread().getId();
//获取锁,使用SETNX方法进行加锁,同时设置过期时间,防止死锁
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
//自动拆箱可能会出现null,这样写更稳妥
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
//通过DEL来删除锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

秒杀业务改造

集成分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public Result seckillVoucher(Long voucherId) {
// ... 前置校验代码 ...

Long userId = UserHolder.getUser().getId();

// 创建分布式锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);

// 获取锁
boolean isLock = lock.tryLock(1200);
if (!isLock) {
return Result.fail("不允许重复下单");
}

try {
// 获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
lock.unlock();
}
}

使用Jmeter进行压力测试,请求头中携带登录用户的token,最终只能抢到一张优惠券

锁误删问题分析

1653385920025

问题场景:线程阻塞导致锁超时,其他线程获取锁后,原线程恢复误删锁

问题复现

1
2
3
4
5
6
时间线:
t1: 线程A获取锁(锁超时30s)
t2: 线程A业务阻塞(超过30s)
t3: 锁自动过期释放
t4: 线程B获取锁成功
t5: 线程A恢复,执行删锁操作(误删线程B的锁)

解决方案

  • 唯一标识:每个线程使用不同的标识
  • 验证机制:删除前验证锁的持有者
  • 原子操作:使用Lua脚本保证验证+删除的原子性

4.3 分布式锁演进

版本一:基础实现

核心功能:SETNX + EXPIRE 原子操作

问题:释放锁时可能误删其他线程的锁

1
2
3
4
5
6
// 获取锁(原子性)
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);

// 释放锁(非原子性)
stringRedisTemplate.delete(KEY_PREFIX + name);

版本二:线程标识

改进:增加线程唯一标识,防止误删

实现逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SimpleRedisLock implements ILock {
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

private String name;
private StringRedisTemplate stringRedisTemplate;

@Override
public boolean tryLock(long timeoutSec) {
// 获取线程唯一标识
String threadId = ID_PREFIX + Thread.currentThread().getId();

// 获取锁(原子性操作)
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);

return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();

// 获取锁中的标识
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);

// 验证是否为自己的锁
if (threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}

新问题验证和删除非原子性,仍存在竞态条件

版本三:Lua脚本

最终方案:Lua脚本保证验证+删除的原子性

Lua脚本实现

1
2
3
4
5
6
-- 释放锁脚本(unlock.lua)
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end

Java调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

@Override
public void unlock() {
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId()
);
}

4.4 Lua脚本详解

Redis Lua脚本基础

Lua脚本:在Redis服务器端执行的脚本,保证多条命令原子性

基本语法

1
2
3
4
5
6
7
8
9
10
-- Redis命令调用
redis.call('命令名称', 'key', '其它参数', ...)

-- 示例:set name jack
redis.call('set', 'name', 'jack')

-- 示例:先set再get
redis.call('set', 'name', 'Rose')
local name = redis.call('get', 'name')
return name

参数传递

  • KEYS数组:接收key类型参数
  • ARGV数组:接收其他参数

Redis脚本调用

命令行调用

1
2
3
4
5
# 执行脚本
EVAL "redis.call('set', 'name', 'jack')" 0

# 带参数调用
EVAL "redis.call('set', KEYS[1], ARGV[1])" 1 name Rose

Java调用

1
2
3
4
5
6
7
8
9
// 脚本定义
DefaultRedisScript<Long> script = new DefaultRedisScript<>();
script.setScriptText("redis.call('set', KEYS[1], ARGV[1])");
script.setResultType(Long.class);

// 执行脚本
redisTemplate.execute(script,
Collections.singletonList("name"),
"Jack");

原子性保证机制

Redis单线程模型:Lua脚本执行期间,Redis不会执行其他命令

原子性验证

1
2
3
线程A:执行Lua脚本(验证+删除)
Redis:脚本执行期间,线程B的请求排队等待
结果:保证验证和删除操作的原子性

性能优势

  • 网络开销少:一次交互完成多个操作
  • 原子性保证:Redis单线程执行
  • 减少竞态:避免客户端并发问题

4.5 分布式锁总结

分布式锁演进历程

从简单到完善的演进过程

版本 实现方式 解决问题 存在问题
V1 SETNX + DEL 基本互斥 误删锁、死锁
V2 SET NX EX + 线程标识 死锁避免 误删问题
V3 线程标识 + 验证删除 防误删 原子性问题
V4 Lua脚本原子操作 原子性保证 功能单一
V5 Redisson框架 完整功能 依赖第三方

Redis分布式锁核心特性

企业级实现要求

基本特性

  • 互斥性:SETNX保证同一时间只有一个客户端获取锁
  • 死锁避免:EXPIRE设置超时时间,防止死锁
  • 安全性:线程唯一标识,防止误删
  • 原子性:Lua脚本保证操作原子性

高级特性(Redisson提供):

  • 🔄 可重入性:同一线程可重复获取锁
  • 锁续期:WatchDog自动续期,防止业务未执行完锁过期
  • 🔄 可重试:获取锁失败可自动重试
  • 🏗️ 主从一致性:RedLock算法保证主从一致性

使用建议

不同场景的选择策略

简单场景

1
2
3
4
5
// 简单分布式锁(适合低频操作)
String lockKey = "lock:business:" + userId;
String threadId = UUID.randomUUID().toString();
boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, threadId, 30, TimeUnit.SECONDS);

复杂场景

1
2
3
4
// Redisson(适合高频并发)
RLock lock = redissonClient.getLock("lock:order:" + userId);
// 可重入、可续期、可重试
boolean locked = lock.tryLock(1, 30, TimeUnit.SECONDS);

选择原则

  • 低频简单操作:手写Redis分布式锁
  • 高频并发场景:Redisson框架
  • 强一致性要求:Zookeeper分布式锁

5. 分布式锁-redission

5.1 Redisson概述

什么是Redisson

Redisson:基于Redis的Java驻内存数据网格(In-Memory Data Grid)

核心功能

  • 🔒 分布式锁:可重入锁、公平锁、联锁、红锁等
  • 📦 分布式对象:Object、List、Set、Map、Queue等
  • 🔄 分布式服务:远程服务、消息服务、执行器服务等

优势特点

  • 功能完善:提供各种分布式锁实现
  • 可重入性:支持同一线程重复获取锁
  • 自动续期:WatchDog机制自动延长锁有效期
  • 高可用:支持主从、哨兵、集群模式

解决手写Redis锁的问题

手写Redis分布式锁的局限性

问题 手写Redis锁 Redisson解决方案
不可重入 同一线程无法重复获取锁 内置可重入机制
不可重试 获取失败只能放弃 支持获取锁超时重试
锁续期 固定过期时间 WatchDog自动续期
主从一致性 主从切换可能丢锁 RedLock算法

5.2 Redisson快速入门

Maven依赖

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

Redisson客户端配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer()
.setAddress("redis://192.168.xxx.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}

分布式锁使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Resource
private RedissonClient redissonClient;

@Test
void testRedisson() throws Exception{
// 获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");

// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);

// 判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
} finally {
// 释放锁
lock.unlock();
}
}
}

秒杀业务集成Redisson锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Service
public class VoucherOrderServiceImpl implements IVoucherOrderService {

@Resource
private RedissonClient redissonClient;

@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}

// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}

// 4.判断库存是否充足
if (voucher.getStock() < 1) {
return Result.fail("库存不足!");
}

Long userId = UserHolder.getUser().getId();

// 5.创建锁对象(使用Redisson分布式锁)
RLock lock = redissonClient.getLock("lock:order:" + userId);

// 6.获取锁
boolean isLock = lock.tryLock();

// 7.判断是否获取锁成功
if (!isLock) {
return Result.fail("不允许重复下单");
}

try {
// 8.获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 9.释放锁
lock.unlock();
}
}
}

5.3 Redisson分布式锁详解

5.3.1 Redisson可重入锁原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Resource
private RedissonClient redissonClient;

private RLock lock;

@BeforeEach
void setUp() {
lock = redissonClient.getLock("lock");
}

@Test
void method1() {
boolean success = lock.tryLock();
if (!success) {
log.error("获取锁失败,1");
return;
}
try {
log.info("获取锁成功");
method2();
} finally {
log.info("释放锁,1");
lock.unlock();
}
}

void method2() {
RLock lock = redissonClient.getLock("lock");
boolean success = lock.tryLock();
if (!success) {
log.error("获取锁失败,2");
return;
}
try {
log.info("获取锁成功,2");
} finally {
log.info("释放锁,2");
lock.unlock();
}
}

同一线程内方法调用时,若method1已持有锁,method2需获取同一把锁,通过判断线程ID实现可重入:state+1获取锁,state-1释放锁,减至0时真正释放

Redis中的锁存储结构

Hash结构存储可重入锁信息

存储格式

1
2
3
4
Key: lock_name (锁名称)
Value: Hash结构
├─ field: UUID + ":" + threadId (线程唯一标识)
└─ value: 重入次数 (整数)

示例

1
2
3
lock:order:12345
├─ "8f3e2a1c:1" : 2 (线程1重入了2次)
└─ "9d4c5b2a:2" : 1 (线程2重入了1次)

可重入锁获取Lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- KEYS[1]: 锁名称
-- ARGV[1]: 锁失效时间(毫秒)
-- ARGV[2]: 线程标识(UUID + ":" + threadId)

-- 锁不存在,创建新锁
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;

-- 锁存在且是当前线程持有,重入次数+1
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;

-- 锁存在但不是当前线程持有,返回锁剩余时间
return redis.call('pttl', KEYS[1]);

脚本逻辑

  1. 锁不存在:创建新锁,重入次数设为1
  2. 当前线程重入:重入次数+1,重置过期时间
  3. 其他线程持有:返回锁剩余时间,抢锁失败

可重入锁释放Lua脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- KEYS[1]: 锁名称
-- ARGV[1]: 线程标识
-- ARGV[2]: 锁失效时间

-- 锁不存在,直接返回
if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
return nil;
end;

-- 重入次数-1
counter = redis.call('hincrby', KEYS[1], ARGV[1], -1);

-- 重入次数为0,删除锁
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
return 1;
end;

释放逻辑

  • 重入次数>0:仅减少重入次数,不删除锁
  • 重入次数=0:删除整个锁
  • 锁不存在:直接返回,防止误删

5.3.2 Redisson锁重试和WatchDog机制

锁获取重试机制

tryLock方法的重试逻辑

重试流程

1
2
// 尝试获取锁,最多等待1秒,锁有效期10秒
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);

内部实现

  1. 首次尝试:立即执行Lua脚本抢锁
  2. 失败重试:如果锁被占用,等待锁释放后重试
  3. 超时控制:在指定等待时间内持续重试
  4. 返回结果:成功返回true,超时返回false

看门狗自动续期机制

解决业务执行时间超过锁有效期的问题

续期原理

1
2
// lock()方法默认开启WatchDog
lock.lock(); // 30秒有效期,WatchDog每10秒续期一次

续期流程

  1. 初始有效期:默认30秒(可配置)
  2. 续期触发:每有效期/3时间触发一次(默认10秒)
  3. 续期条件:业务线程仍在运行且持有锁
  4. 续期失败:线程宕机或锁已释放,停止续期

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void renewExpiration() {
// 从续约映射中获取当前线程的续约条目
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}

// 创建定时任务,internalLockLeaseTime/3后执行
Timeout task = commandExecutor.getConnectionManager().newTimeout(
new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 异步续期锁有效期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (res) {
// 续期成功,递归调用继续下一轮续期
renewExpiration();
}
});
}
},
internalLockLeaseTime / 3, // 默认10秒
TimeUnit.MILLISECONDS
);

ee.setTimeout(task);
}

不同加锁方式的对比

方法 是否重试 是否续期 适用场景
tryLock() 简单获取,立即返回
tryLock(waitTime, leaseTime, unit) 带超时等待,固定有效期
lock() 长期持有,自动续期
lock(leaseTime, unit) 固定有效期,不续期

最佳实践

  • 短时操作tryLock(1, 10, TimeUnit.SECONDS)
  • 长时操作lock() + WatchDog续期
  • 定时任务lock(30, TimeUnit.SECONDS) 手动控制有效期

Redisson MultiLock原理

5.3.3 Redisson MultiLock原理

Redis主从架构中的分布式锁失效风险分析

主从切换导致的锁信息丢失

问题场景

  1. 主节点写入锁:客户端在Master节点成功获取锁
  2. 主节点宕机:数据还未同步到Slave节点
  3. 主从切换:Slave升级为新的Master
  4. 锁信息丢失:新Master没有锁信息,其他客户端可重新获取锁

解决方案:MultiLock机制,在多个独立Redis实例上同时加锁

MultiLock核心原理

多数派原则保证锁可靠性

加锁规则

  • 全部成功:在所有节点都成功加锁才算成功
  • 超时控制:总超时时间 = 节点数量 × 1500ms
  • 失败处理:任意节点失败则整个加锁失败

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
// 配置多个Redis实例
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.137.130:6379")
.setPassword("root");
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient2() {
Config config = new Config();
config.useSingleServer().setAddress("redis://92.168.137.131:6379")
.setPassword("root");
return Redisson.create(config);
}

@Bean
public RedissonClient redissonClient3() {
Config config = new Config();
config.useSingleServer().setAddress("redis://92.168.137.132:6379")
.setPassword("root");
return Redisson.create(config);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;

private RLock lock;

@BeforeEach
void setUp() {
RLock lock1 = redissonClient.getLock("lock");
RLock lock2 = redissonClient2.getLock("lock");
RLock lock3 = redissonClient3.getLock("lock");
lock = redissonClient.getMultiLock(lock1, lock2, lock3);
}

@Test
void method1() {
boolean success = lock.tryLock();
redissonClient.getMultiLock();
if (!success) {
log.error("获取锁失败,1");
return;
}
try {
log.info("获取锁成功");
method2();
} finally {
log.info("释放锁,1");
lock.unlock();
}
}

void method2() {
RLock lock = redissonClient.getLock("lock");
boolean success = lock.tryLock();
if (!success) {
log.error("获取锁失败,2");
return;
}
try {
log.info("获取锁成功,2");
} finally {
log.info("释放锁,2");
lock.unlock();
}
}

MultiLock优缺点分析

特性 优点 缺点
可靠性 避免单点故障,主从切换不丢锁 需要维护多个独立Redis实例
性能 并行加锁,性能可接受 网络开销增加,延迟上升
复杂度 使用简单,API统一 需要额外的Redis实例资源
一致性 基于多数派,一致性强 网络分区时可能出现不可用

适用场景

  • 高可靠性要求:金融交易、订单处理等关键业务
  • 主从架构:Redis主从部署环境
  • 简单场景:单机Redis即可满足需求

6. 秒杀优化

6.1 异步秒杀思路

同步秒杀的性能瓶颈

串行操作导致的性能问题

传统流程

1
查询优惠券 → 判断库存 → 查询订单 → 一人一单校验 → 扣减库存 → 创建订单

1653560986599

性能问题

  • 数据库压力大:每个请求都要多次访问数据库
  • 串行执行:所有操作顺序执行,耗时累积
  • 线程阻塞:数据库IO等待导致线程闲置
  • 并发能力低:QPS受限于数据库性能

优化思路:将耗时短的逻辑判断移到Redis,耗时长的下单操作异步化

异步化优化方案

Redis缓存 + 消息队列异步处理

优化流程

1
2
3
4
5
6
7
8
graph TD
A[用户请求] --> B[Redis校验]
B --> C{校验通过?}
C -->|是| D[返回成功]
C -->|否| E[返回失败]
D --> F[消息队列]
F --> G[异步下单]
G --> H[数据库更新]

1653561657295

核心思想

  • 快速响应:Redis内存操作,毫秒级响应
  • 异步处理:消息队列削峰填谷,平滑处理
  • 最终一致性:保证订单最终创建成功

异步化实现难点

需要解决的关键问题

技术难点

  1. Redis原子操作:如何保证库存扣减和订单记录的原子性?
  2. 一人一单校验:Redis中如何快速判断用户是否已下单?
  3. 订单状态跟踪:如何告知用户订单处理结果?
  4. 消息可靠性:如何保证消息不丢失,正确处理?

解决方案

  • Lua脚本:保证Redis操作的原子性
  • Set集合:使用Redis Set存储已下单用户ID
  • 订单ID预生成:提前生成订单ID,用于状态查询
  • 消息队列:使用Redis Stream或专业消息队列

1653562234886

6.2 Redis完成秒杀资格判断

Redis原子操作实现资格判断

Lua脚本保证原子性

核心思路

  1. 库存预加载:优惠券信息保存到Redis
  2. 原子校验:Lua脚本一次性完成库存、一人一单判断
  3. 异步下单:校验通过后发送消息到队列

实现步骤

1
Redis预加载 → Lua脚本校验 → 消息队列 → 异步下单

优惠券预加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(),
voucher.getStock().toString());
}

Lua脚本实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- 1.参数列表
local voucherId = ARGV[1] -- 优惠券id
local userId = ARGV[2] -- 用户id
local orderId = ARGV[3] -- 订单id

-- 2.数据key
local stockKey = 'seckill:stock:' .. voucherId -- 库存key
local orderKey = 'seckill:order:' .. voucherId -- 订单key

-- 3.脚本业务
-- 3.1.判断库存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
return 1 -- 库存不足
end

-- 3.2.判断用户是否已下单
if(redis.call('sismember', orderKey, userId) == 1) then
return 2 -- 重复下单
end

-- 3.3.扣减库存
redis.call('incrby', stockKey, -1)
-- 3.4.记录用户已下单
redis.call('sadd', orderKey, userId)
-- 3.5.发送消息到队列
redis.call('xadd', 'stream.orders', '*',
'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0 -- 抢购成功

业务层调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");

// 执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
String.valueOf(orderId)
);

// 判断结果
int r = result.intValue();
if (r != 0) {
return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");
}

return Result.ok(orderId);
}

使用PostMan发送请求,添加优惠券
请求路径:http://localhost:8080/api/voucher/seckill
请求方式:POST

1
2
3
4
5
6
7
8
9
10
11
12
{
"shopId":1,
"title":"9999元代金券",
"subTitle":"365*24小时可用",
"rules":"全场通用\\nApex猎杀无需预约",
"payValue":1000,
"actualValue":999900,
"type":1,
"stock":100,
"beginTime":"2022-01-01T00:00:00",
"endTime":"2022-12-31T23:59:59"
}

实现关键点

需要注意的细节问题

原子性保证

  • Lua脚本:所有Redis操作一次性执行
  • 单线程模型:Redis单线程保证脚本执行不被打断
  • 事务:MULTI/EXEC无法保证库存和订单的原子性

数据一致性

  • 库存Keyseckill:stock:{voucherId}
  • 订单Keyseckill:order:{voucherId}
  • 消息队列stream.orders

返回值设计

  • 0:抢购成功
  • 1:库存不足
  • 2:重复下单

6.3 基于阻塞队列实现秒杀优化

阻塞队列异步处理方案

内存队列 + 单线程异步处理

核心思路

  1. 快速响应:Lua脚本校验通过后立即返回订单ID
  2. 异步处理:订单信息放入阻塞队列,后台线程慢慢处理
  3. 流量削峰:阻塞队列缓冲瞬时高并发请求

实现架构

1
用户请求 → Lua脚本校验 → 内存队列 → 单线程处理 → 数据库操作

线程池配置

1
2
3
4
5
6
7
8
9
// 异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR =
Executors.newSingleThreadExecutor();

// 类初始化后立即执行
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

订单处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
// 2.处理订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}

private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
// 创建锁对象,防止重复下单
RLock redisLock = redissonClient.getLock("lock:order:" + userId);

// 尝试获取锁
boolean isLock = redisLock.lock();
if (!isLock) {
log.error("不允许重复下单!");
return;
}

try {
// 注意:Spring事务在ThreadLocal中,多线程环境下会失效
proxy.createVoucherOrder(voucherOrder);
} finally {
// 释放锁
redisLock.unlock();
}
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
log.error("用户已经购买过了");
return ;
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
log.error("库存不足");
return ;
}
save(voucherOrder);

}
}

阻塞队列定义

1
2
3
// 订单阻塞队列,容量1024*1024
private BlockingQueue<VoucherOrder> orderTasks =
new ArrayBlockingQueue<>(1024 * 1024);

业务层修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");

// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);

// 2.判断结果
int r = result.intValue();
if (r != 0) {
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}

// 3.创建订单对象
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);

// 4.放入阻塞队列
orderTasks.add(voucherOrder);

// 5.返回订单id
return Result.ok(orderId);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package com.hmdp.service.impl;

import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* <p>
* 服务实现类
* </p>
*
* @author Kyle
* @since 2022-10-22
*/
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Autowired
private ISeckillVoucherService seckillVoucherService;

@Autowired
private RedisIdWorker redisIdWorker;

@Resource
private StringRedisTemplate stringRedisTemplate;

@Resource
private RedissonClient redissonClient;

private IVoucherOrderService proxy;


private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

static {
SECKILL_SCRIPT = new DefaultRedisScript();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1. 获取用户
Long userId = voucherOrder.getUserId();
//2. 创建锁对象,作为兜底方案
RLock redisLock = redissonClient.getLock("order:" + userId);
//3. 获取锁
boolean isLock = redisLock.tryLock();
//4. 判断是否获取锁成功
if (!isLock) {
log.error("不允许重复下单!");
return;
}
try {
//5. 使用代理对象,由于这里是另外一个线程,
proxy.createVoucherOrder(voucherOrder);
} finally {
redisLock.unlock();
}
}

private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
//1. 获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
//2. 创建订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("订单处理异常", e);
}
}
}
}

@Override
public Result seckillVoucher(Long voucherId) {
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,
Collections.emptyList(), voucherId.toString(),
UserHolder.getUser().getId().toString());
if (result.intValue() != 0) {
return Result.fail(result.intValue() == 1 ? "库存不足" : "不能重复下单");
}
long orderId = redisIdWorker.nextId("order");
//封装到voucherOrder中
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setId(orderId);
//加入到阻塞队列
orderTasks.add(voucherOrder);
//主线程获取代理对象
// proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(orderId);
}


@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 一人一单逻辑
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
synchronized (userId.toString().intern()) {
int count = query().eq("voucher_id", voucherId).eq("user_id", userId).count();
if (count > 0) {
log.error("你已经抢过优惠券了哦");
return;
}
//5. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
if (!success) {
log.error("库存不足");
}
//7. 将订单数据保存到表中
save(voucherOrder);
}
}
}

阻塞队列方案优缺点

内存队列的局限性

优点

  • 实现简单:JDK原生支持,无需额外依赖
  • 性能优秀:内存操作,延迟极低
  • 削峰填谷:缓冲瞬时高并发请求

缺点

  • 内存限制:队列容量有限,数据可能丢失
  • 单点故障:JVM宕机导致队列数据丢失
  • 扩展性差:无法分布式部署,只能单机处理

适用场景

  • 单机部署:应用部署在单台服务器
  • 数据可接受丢失:秒杀活动数据允许少量丢失
  • 分布式部署:需要多机协同处理
  • 数据强一致性:订单数据不能丢失

小总结:

秒杀业务的优化思路是什么?

  • 先利用Redis完成库存余量、一人一单判断,完成抢单业务
  • 再将下单业务放入阻塞队列,利用独立线程异步下单
  • 基于阻塞队列的异步秒杀存在哪些问题?
    • 内存限制问题
    • 数据安全问题

7. Redis消息队列

7.1.1 消息队列基础

什么是消息队列

消息队列的核心概念

定义:消息队列是一种异步通信机制,用于在不同组件或系统之间传递消息。

三个核心角色

  • 消息队列:存储和管理消息的消息代理(Message Broker)
  • 生产者:发送消息到消息队列的应用或服务
  • 消费者:从消息队列获取消息并处理的应用或服务

通信模型

1
生产者 → 消息队列 → 消费者

消息队列的使用场景

解耦和异步处理

生活例子:快递柜系统

  • 快递员(生产者):把快递放入快递柜
  • 快递柜(消息队列):临时存储快递
  • 用户(消费者):从快递柜取快递

技术优势

  • 解耦:生产者和消费者不需要直接通信
  • 异步:生产者不需要等待消费者处理完成
  • 削峰填谷:缓冲瞬时高并发请求
  • 可靠性:消息持久化,确保不丢失

秒杀场景应用

1
下单校验 → Redis队列 → 异步处理订单

Redis消息队列方案

Redis提供的三种消息队列实现

实现方式 数据结构 特点 适用场景
List队列 List 简单可靠,支持阻塞 简单消息传递
PubSub 发布订阅 实时推送,不支持持久化 实时通知
Stream Stream 功能完善,支持持久化 复杂消息系统

选择建议

  • 简单场景:使用List实现
  • 实时通知:使用PubSub
  • 复杂业务:使用Stream
  • 大数据量:建议使用专业MQ(Kafka、RabbitMQ)

7.1.2 基于List实现消息队列

List结构消息队列原理

双向链表实现队列效果

基本原理:Redis的List是双向链表,天然支持队列操作

操作命令

1
2
3
4
5
6
7
8
# 生产者:从左侧入队
LPUSH queue_name message

# 消费者:从右侧出队
RPOP queue_name

# 阻塞式消费(推荐)
BRPOP queue_name timeout

队列模型

1
生产者 → LPUSH → [message3, message2, message1] → RPOP → 消费者

List消息队列实现

生产者代码

1
2
// 发送消息到队列
stringRedisTemplate.opsForList().leftPush("queue:order", message);

消费者代码

1
2
3
4
5
6
7
8
// 阻塞式获取消息(超时5秒)
String message = stringRedisTemplate.opsForList()
.rightPop("queue:order", 5, TimeUnit.SECONDS);

if (message != null) {
// 处理消息
processMessage(message);
}

批量处理

1
2
3
// 一次获取多条消息
List<String> messages = stringRedisTemplate.opsForList()
.range("queue:order", 0, 99);

List消息队列优缺点

简单但功能有限

优点

  • 实现简单:Redis原生List结构,无需额外配置
  • 持久化支持:基于Redis持久化,数据安全
  • 有序性:消息按入队顺序消费
  • 内存大:不受JVM内存限制

缺点

  • 消息丢失:消费者处理失败时消息已删除
  • 单消费者:一条消息只能被一个消费者处理
  • 无ACK机制:无法确认消息是否成功处理
  • 无重试机制:处理失败的消息无法重新消费

改进方案

  • 消息确认:消费后不立即删除,先放入"处理中"列表
  • 失败重试:处理失败的消息重新放回队列
  • 多消费者:使用多个队列实现消费者组

7.1.3 基于PubSub实现消息队列

PubSub发布订阅模型

频道(Channel)广播机制

核心概念

  • 频道(Channel):消息发布的通道
  • 订阅者(Subscriber):订阅频道的客户端
  • 发布者(Publisher):向频道发送消息的客户端

基本命令

1
2
3
4
5
6
7
8
# 订阅频道
SUBSCRIBE channel1 channel2

# 发布消息
PUBLISH channel1 "hello world"

# 模式订阅(通配符)
PSUBSCRIBE news.*

消息流

1
发布者 → PUBLISH → Channel → 广播 → 所有订阅者

PubSub消息队列实现

消息发布者

1
2
// 发布消息
stringRedisTemplate.convertAndSend("order.channel", orderMessage);

消息订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class OrderMessageSubscriber extends KeyspaceEventMessageListener {

@Autowired
public OrderMessageSubscriber(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}

@Override
protected void doHandleMessage(Message message) {
String channel = new String(message.getChannel());
String body = new String(message.getBody());

if ("order.channel".equals(channel)) {
// 处理订单消息
processOrderMessage(body);
}
}
}

配置监听器

1
2
3
4
5
6
7
8
9
10
@Bean
public RedisMessageListenerContainer container(
RedisConnectionFactory connectionFactory,
OrderMessageSubscriber subscriber) {

RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(subscriber, new ChannelTopic("order.channel"));
return container;
}

PubSub消息队列优缺点

实时但不可靠

优点

  • 实时性:消息立即推送给所有订阅者
  • 多订阅者:一个消息可被多个消费者同时接收
  • 简单易用:Redis原生支持,配置简单
  • 模式匹配:支持通配符订阅多个频道

缺点

  • 无持久化:消息不存储,订阅者离线时消息丢失
  • 无确认机制:无法保证消息被成功处理
  • 无重试机制:处理失败的消息无法重新投递
  • 内存压力:大量订阅者可能导致内存问题

适用场景

  • 实时通知:系统状态变更、配置更新等
  • 广播消息:需要多个消费者同时接收的场景
  • 重要业务:订单处理、支付等关键业务
  • 离线处理:消费者可能离线的场景

基于PubSub的消息队列有哪些优缺点?
优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

7.1.4 基于Stream实现消息队列

实现原理

Stream 是 Redis 5.0 引入的新数据类型,专门用于实现消息队列功能:

  • 消息存储:每个消息都有唯一ID,格式为时间戳-序列号
  • 消费者组:支持多消费者组同时消费,互不干扰
  • ACK机制:消费者处理完消息后需要确认,保证消息至少消费一次

核心命令

  • XADD:发送消息
  • XREAD:读取消息
  • XREADGROUP:消费者组读取
  • XACK:消息确认

Stream相比List和PubSub,提供了更完善的消息队列功能,支持消息持久化、消费者组和ACK机制。

代码示例

生产者发送消息

1
2
3
4
5
6
7
8
// 发送消息到Stream
Map<String, Object> message = new HashMap<>();
message.put("voucherId", voucherId);
message.put("userId", userId);
message.put("orderId", orderId);

String recordId = stringRedisTemplate.opsForStream()
.add("stream.orders", message);

消费者组消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 从消费者组读取消息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream()
.read(Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));

if (list == null || list.isEmpty()) {
continue;
}

// 处理消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);

// 创建订单
createVoucherOrder(voucherOrder);

// 确认消息
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());

} catch (Exception e) {
log.error("处理订单异常", e);
handlePendingList(); // 处理异常消息
}
}
}

// 处理pending-list中的异常消息
private void handlePendingList() {
while (true) {
try {
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream()
.read(Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0")));

if (list == null || list.isEmpty()) {
break;
}

MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);

createVoucherOrder(voucherOrder);
stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());

} catch (Exception e) {
log.error("处理pending订单异常", e);
try {
Thread.sleep(20);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}

优缺点分析

特性 Stream 说明
消息持久化 ✅ 支持 消息保存在内存和磁盘中
消息回溯 ✅ 支持 可以读取历史消息
消费者组 ✅ 支持 多组消费者独立消费
ACK机制 ✅ 支持 保证消息至少消费一次
消息堆积 ✅ 支持 内存足够时可堆积大量消息
实现复杂度 ⭐⭐⭐ 中等 需要理解消费者组概念

适用场景

  • 需要消息持久化的业务场景
  • 多消费者组独立消费
  • 消息处理可靠性要求高的场景
  • 复杂的秒杀订单处理

7.1.5 消息队列对比总结

功能特性对比

功能特性 List结构 PubSub Stream
消息持久化 ✅ 支持 ❌ 不支持 ✅ 支持
消息回溯 ❌ 不支持 ❌ 不支持 ✅ 支持
消费者组 ❌ 不支持 ❌ 不支持 ✅ 支持
消息确认 ❌ 不支持 ❌ 不支持 ✅ 支持
阻塞读取 ✅ 支持 ✅ 支持 ✅ 支持
消息堆积 受内存限制 受内存限制 受内存限制

性能对比

性能指标 List PubSub Stream
吞吐量 最高
延迟 最低
CPU消耗 最低 中等
内存使用 中等

选择建议

  • List队列:简单的任务队列,对可靠性要求不高
  • PubSub:实时消息推送,允许消息丢失的场景
  • Stream:业务消息队列,需要高可靠性和完整功能

8. 达人探店

发布探店笔记

探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个:
tb_blog:探店笔记表,包含笔记中的标题、文字、图片等

tb_blog表结构:

Field Type Collation Null Key Default Extra Comment
id bigint unsigned (NULL) NO PRI (NULL) auto_increment 主键
shop_id bigint (NULL) NO (NULL) 商户id
user_id bigint unsigned (NULL) NO (NULL) 用户id
title varchar(255) utf8mb4_unicode_ci NO (NULL) 标题
images varchar(2048) utf8mb4_general_ci NO (NULL) 探店的照片,最多9张,多张以","隔开
content varchar(2048) utf8mb4_unicode_ci NO (NULL) 探店的文字描述
liked int unsigned (NULL) YES 0 点赞数量
comments int unsigned (NULL) YES (NULL) 评论数量
create_time timestamp (NULL) NO CURRENT_TIMESTAMP DEFAULT_GENERATED 创建时间
update_time timestamp (NULL) NO CURRENT_TIMESTAMP DEFAULT_GENERATED on update CURRENT_TIMESTAMP 更新时间

tb_blog_comments:其他用户对探店笔记的评价

tb_blog_comments表结构:

Field Type Collation Null Key Default Extra Comment
id bigint unsigned (NULL) NO PRI (NULL) auto_increment 主键
user_id bigint unsigned (NULL) NO (NULL) 用户id
blog_id bigint unsigned (NULL) NO (NULL) 探店id
parent_id bigint unsigned (NULL) NO (NULL) 关联的1级评论id,如果是一级评论,则值为0
answer_id bigint unsigned (NULL) NO (NULL) 回复的评论id
content varchar(255) utf8mb4_general_ci NO (NULL) 回复的内容
liked int unsigned (NULL) YES (NULL) 点赞数
status tinyint unsigned (NULL) YES (NULL) 状态,0:正常,1:被举报,2:禁止查看
create_time timestamp (NULL) NO CURRENT_TIMESTAMP DEFAULT_GENERATED 创建时间
update_time timestamp (NULL) NO CURRENT_TIMESTAMP DEFAULT_GENERATED on update CURRENT_TIMESTAMP 更新时间

达人探店功能允许用户发布探店笔记,包括图片上传、笔记发布、点赞互动等功能。本章节将介绍如何使用Redis优化这些功能的实现。

对应的实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_blog")
public class Blog implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 商户id
*/
private Long shopId;
/**
* 用户id
*/
private Long userId;
/**
* 用户图标
*/
@TableField(exist = false)
private String icon;
/**
* 用户姓名
*/
@TableField(exist = false)
private String name;
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;

/**
* 标题
*/
private String title;

/**
* 探店的照片,最多9张,多张以","隔开
*/
private String images;

/**
* 探店的文字描述
*/
private String content;

/**
* 点赞数量
*/
private Integer liked;

/**
* 评论数量
*/
private Integer comments;

/**
* 创建时间
*/
private LocalDateTime createTime;

/**
* 更新时间
*/
private LocalDateTime updateTime;
}

8.1 图片上传与笔记发布

探店笔记发布包含两个核心功能:

  • 图片上传:支持单张图片上传,生成唯一文件名
  • 笔记发布:保存探店博文,关联上传的图片

图片上传控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
@RestController
@RequestMapping("upload")
public class UploadController {

@PostMapping("blog")
public Result uploadImage(@RequestParam("file") MultipartFile image) {
try {
// 获取原始文件名称
String originalFilename = image.getOriginalFilename();
// 生成新文件名
String fileName = createNewFileName(originalFilename);
// 保存文件
image.transferTo(new File(SystemConstants.IMAGE_UPLOAD_DIR, fileName));
// 返回结果
log.debug("文件上传成功,{}", fileName);
return Result.ok(fileName);
} catch (IOException e) {
throw new RuntimeException("文件上传失败", e);
}
}
}

笔记发布控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RestController
@RequestMapping("/blog")
public class BlogController {

@Resource
private IBlogService blogService;

@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
//获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUpdateTime(user.getId());
//保存探店博文
blogService.saveBlog(blog);
//返回id
return Result.ok(blog.getId());
}
}

注意:需要修改SystemConstants.IMAGE_UPLOAD_DIR为实际的图片存储路径,生产环境中建议使用云存储服务。

  1. 文件命名策略:使用UUID确保文件名唯一性,避免冲突
  2. 异常处理:捕获IO异常并转换为业务异常
  3. 用户认证:通过UserHolder获取当前登录用户信息
  4. 返回值设计:返回生成的文件名,供前端展示和后续业务使用

8.2 查看探店笔记

查看探店笔记功能需要:

  • 笔记查询:根据ID查询笔记详情
  • 用户信息:查询笔记作者信息
  • 点赞状态:判断当前用户是否点赞

controller

1
2
3
4
5
6
7
8
9
10
@GetMapping("/hot")
public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
return blogService.queryHotBlog(current);
}


@GetMapping("/{id}")
public Result queryById(@PathVariable Integer id){
return blogService.queryById(id);
}

service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public Result queryHotBlog(Integer current) {
// 根据用户查询
Page<Blog> page = query()
.orderByDesc("liked")
.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
// 获取当前页数据
List<Blog> records = page.getRecords();
// 查询用户
records.forEach(this::queryBlogUser);
return Result.ok(records);
}

@Override
public Result queryById(Integer id) {
Blog blog = getById(id);
if (blog == null) {
return Result.fail("评价不存在或已被删除");
}
queryBlogUser(blog);
return Result.ok(blog);
}

private void queryBlogUser(Blog blog) {
Long userId = blog.getUserId();
User user = userService.getById(userId);
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
  1. 数据完整性:先查询笔记,再查询关联的用户信息
  2. 异常处理:笔记不存在时给出友好提示
  3. 数据组装queryBlogUser方法负责填充用户相关信息

8.3 点赞功能优化

初始点赞实现存在严重问题:

1
2
3
4
5
6
@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable("id") Long id) {
//修改点赞数量
blogService.update().setSql("liked = liked +1 ").eq("id",id).update();
return Result.ok();
}

问题:用户可以无限点赞,没有任何限制机制

  • 同一个用户只能点赞一次,再次点击则取消点赞
  • 如果当前用户已经点赞,则点赞按钮高亮显示
  • 使用Redis的Set集合记录点赞用户,保证唯一性

1. 修改Blog实体类

1
2
@TableField(exist = false)
private Boolean isLike; // 是否被当前用户点赞

2. 点赞业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
return blogService.likeBlog(id);
}

@Override
public Result likeBlog(Long id){
// 1.获取登录用户
Long userId = UserHolder.getUser().getId();
// 2.判断当前登录用户是否已经点赞
String key = BLOG_LIKED_KEY + id;
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());

if(BooleanUtil.isFalse(isMember)){
//3.如果未点赞,可以点赞
//3.1 数据库点赞数+1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
//3.2 保存用户到Redis的set集合
if(isSuccess){
stringRedisTemplate.opsForSet().add(key,userId.toString());
}
}else{
//4.如果已点赞,取消点赞
//4.1 数据库点赞数-1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
//4.2 把用户从Redis的set集合移除
if(isSuccess){
stringRedisTemplate.opsForSet().remove(key,userId.toString());
}
}
return Result.ok();
}

关键点分析

  1. Redis Key设计blog:liked:{blogId},每个笔记一个独立的Set
  2. 原子性保证:先判断再操作,通过Redis Set保证用户唯一性
  3. 数据一致性:数据库和Redis同步更新,确保点赞数准确
  4. 性能优化:Redis Set操作O(1)时间复杂度,性能优异

点击点赞按钮,查看发送的请求

1
2
请求网址: http://localhost:8080/api/blog/like/4
请求方法: PUT

8.4 点赞排行榜

在探店笔记详情页面需要展示点赞排行榜,显示最早点赞的TOP5用户:

  • 排序需求:按点赞时间排序,最早点赞的排在前面
  • 唯一性:每个用户只能出现一次
  • 性能要求:快速查询TOP5用户

使用Redis的SortedSet(ZSet)数据结构:

  • 唯一性:ZSet保证成员唯一
  • 排序能力:根据score值排序,score使用时间戳
  • 范围查询:支持按排名范围查询

1. 修改点赞逻辑(使用ZSet)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public Result likeBlog(Long id) {
// 1.获取登录用户
Long userId = UserHolder.getUser().getId();
// 2.判断当前登录用户是否已经点赞
String key = BLOG_LIKED_KEY + id;
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());

if (score == null) {
// 3.如果未点赞,可以点赞
// 3.1.数据库点赞数 + 1
boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
// 3.2.保存用户到Redis的zset集合,score为当前时间戳
if (isSuccess) {
stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
}
} else {
// 4.如果已点赞,取消点赞
// 4.1.数据库点赞数 -1
boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
// 4.2.把用户从Redis的zset集合移除
if (isSuccess) {
stringRedisTemplate.opsForZSet().remove(key, userId.toString());
}
}
return Result.ok();
}

2. 查询点赞排行榜

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public Result queryBlogLikes(Long id) {
String key = BLOG_LIKED_KEY + id;
// 1.查询top5的点赞用户 zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if (top5 == null || top5.isEmpty()) {
return Result.ok(Collections.emptyList());
}
// 2.解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
String idStr = StrUtil.join(",", ids);
// 3.根据用户id查询用户 WHERE id IN ( 5 , 1 ) ORDER BY FIELD(id, 5, 1)
List<UserDTO> userDTOS = userService.query()
.in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list()
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
// 4.返回
return Result.ok(userDTOS);
}

关键点分析

  1. Score设计:使用时间戳作为score,自然按时间排序
  2. 范围查询zrange key 0 4获取前5个用户,时间复杂度O(log(n)+m)
  3. 数据一致性:点赞和取消点赞时同步更新数据库和Redis
  4. 内存优化:只存储用户ID,不存储完整用户信息

9. 好友关注

9.1 关注和取消关注

针对用户的操作:可以对用户进行关注和取消关注功能。

好友关注功能实现用户之间的关注关系管理:

  • 关注用户:用户A关注用户B
  • 取消关注:用户A取消对用户B的关注
  • 关注状态查询:查询用户A是否关注了用户B

1. 数据库表结构

1
2
3
4
5
6
7
8
CREATE TABLE `tb_follow`  (
`id` bigint(0) NOT NULL AUTO_INCREMENT,
`user_id` bigint(20) NOT NULL COMMENT '用户id',
`follow_user_id` bigint(20) NOT NULL COMMENT '关联的用户id',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
UNIQUE INDEX `idx_user_id_follow_user_id`(`user_id`, `follow_user_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '用户关注表';

2. 控制器层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
@RequestMapping("/follow")
public class FollowController {

@Resource
private IFollowService followService;

// 关注和取消关注
@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id") Long followUserId,
@PathVariable("isFollow") Boolean isFollow) {
return followService.follow(followUserId, isFollow);
}

// 判断是否关注
@GetMapping("/or/not/{id}")
public Result isFollow(@PathVariable("id") Long followUserId) {
return followService.isFollow(followUserId);
}
}

3. 业务逻辑层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public Result isFollow(Long followUserId) {
// 1.获取登录用户
Long userId = UserHolder.getUser().getId();
// 2.查询是否关注 select count(*) from tb_follow where user_id = ? and follow_user_id = ?
Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
// 3.判断
return Result.ok(count > 0);
}

@Override
public Result follow(Long followUserId, Boolean isFollow) {
// 1.获取登录用户
Long userId = UserHolder.getUser().getId();

if (isFollow) {
// 2.关注,新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
boolean isSuccess = save(follow);
} else {
// 3.取关,删除数据
remove(new QueryWrapper<Follow>()
.eq("user_id", userId).eq("follow_user_id", followUserId));
}
return Result.ok();
}

关键点分析

  1. 唯一索引idx_user_id_follow_user_id防止重复关注
  2. 事务性:关注操作同时操作数据库,保证数据一致性
  3. 幂等性:重复关注或取消关注不会产生副作用
  4. 性能优化:关注状态查询使用count,比查询全表更高效

9.2 共同关注

在博主个人页面展示当前用户与博主的共同关注:

  • 场景:用户A访问用户B的个人主页
  • 需求:显示A和B都关注的用户列表
  • 技术选型:Redis Set集合的交集操作

使用Redis Set数据结构存储关注关系:

  • Key设计follows:{userId} 存储用户关注的所有人
  • 集合操作SINTER命令求两个集合的交集
  • 性能优势:Set操作时间复杂度O(N),比数据库查询更高效

1. 改造关注逻辑(添加Redis缓存)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public Result follow(Long followUserId, Boolean isFollow) {
// 1.获取登录用户
Long userId = UserHolder.getUser().getId();
String key = "follows:" + userId;

if (isFollow) {
// 2.关注,新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
boolean isSuccess = save(follow);
if (isSuccess) {
// 把关注用户的id,放入redis的set集合
stringRedisTemplate.opsForSet().add(key, followUserId.toString());
}
} else {
// 3.取关,删除数据
boolean isSuccess = remove(new QueryWrapper<Follow>()
.eq("user_id", userId).eq("follow_user_id", followUserId));
if (isSuccess) {
// 把关注用户的id从Redis集合中移除
stringRedisTemplate.opsForSet().remove(key, followUserId.toString());
}
}
return Result.ok();
}

2. 共同关注查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public Result followCommons(Long targetUserId) {
// 1.获取当前用户
Long userId = UserHolder.getUser().getId();
String key = "follows:" + userId;
// 2.求交集
String key2 = "follows:" + targetUserId;
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
if (intersect == null || intersect.isEmpty()) {
// 无交集
return Result.ok(Collections.emptyList());
}
// 3.解析id集合
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
// 4.查询用户详情
List<UserDTO> users = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(users);
}

关键点分析

  1. 数据一致性:数据库和Redis双写一致性,关注成功才写入Redis
  2. 内存优化:只存储用户ID,不存储完整用户信息
  3. 异常处理:交集为空时返回空列表,避免空指针异常
  4. 性能考虑:使用listByIds批量查询,减少数据库访问次数

9.3 Feed流实现方案

当我们关注了用户后,这个用户发了动态,那么我们应该把这些数据推送给用户,这个需求,其实我们又把他叫做Feed流,关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。

对于传统的模式的内容解锁:我们是需要用户去通过搜索引擎或者是其他的方式去解锁想要看的内容

1653808641260

对于新型的Feed流的的效果:不需要我们用户再去推送信息,而是系统分析用户到底想要什么,然后直接把内容推送给用户,从而使用户能够更加的节约时间,不用主动去寻找。

1653808993693

Feed流的实现有两种模式:

Feed流产品有两种常见模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈

  • 优点:信息全面,不会有缺失。并且实现也相对简单
  • 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低

智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户

  • 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
  • 缺点:如果算法不精准,可能起到反作用
    本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种:

我们本次针对好友的操作,采用的就是Timeline的方式,只需要拿到我们关注用户的信息,然后按照时间排序即可

,因此采用Timeline的模式。该模式的实现方案有三种:

  • 拉模式
  • 推模式
  • 推拉结合

拉模式:也叫做读扩散

该模式的核心含义就是:当张三和李四和王五发了消息后,都会保存在自己的邮箱中,假设赵六要读取信息,那么他会从读取他自己的收件箱,此时系统会从他关注的人群中,把他关注人的信息全部都进行拉取,然后在进行排序

优点:比较节约空间,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清楚。

缺点:比较延迟,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。

1653809450816

推模式:也叫做写扩散。

推模式是没有写邮箱的,当张三写了一个内容,此时会主动的把张三写的内容发送到他的粉丝收件箱中去,假设此时李四再来读取,就不用再去临时拉取了

优点:时效快,不用临时拉取

缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去

1653809875208

推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。

发件人端策略:

  • 普通用户:采用"推"模式,直接将消息写入所有粉丝的收件箱(粉丝少,压力小)

  • 大V用户:采用"推拉结合"模式,消息先写入自己的发件箱,然后只推送给活跃粉丝
    收件人端策略:

  • 活跃粉丝:无论是大V还是普通用户的消息,都直接推送到收件箱

  • 普通粉丝:只在上线时从发件箱中拉取未读消息
    核心优势:

  • 平衡了系统性能和用户体验

  • 避免了给所有粉丝推送造成的巨大压力

  • 保证了活跃用户的实时性体验

  • 降低了存储和计算成本

1653812346852

Feed流(关注推送)为用户提供沉浸式内容消费体验:

  • Timeline模式:按时间排序,信息全面但噪音较多
  • 智能排序:算法推荐,用户粘度高但实现复杂
  • 本例选择:基于好友关系的Timeline模式
模式 原理 优点 缺点 适用场景
拉模式 读取时从关注列表拉取内容 节省存储空间 延迟高,压力大 小用户量
推模式 发布时推送到粉丝收件箱 时效性好 内存消耗大 普通用户
推拉结合 普通用户推,大V推拉结合 平衡性能 实现复杂 大用户量

实现选择

本例采用推模式实现:

  • 用户发布笔记时,主动推送到所有粉丝的收件箱
  • 使用Redis SortedSet存储,score为时间戳
  • 支持按时间排序和分页查询

关键点分析

  1. 数据结构ZSet<blogId, timestamp> 天然按时间排序
  2. 写入策略:发布时同步写入所有粉丝收件箱
  3. 读取优化:支持范围查询和滚动分页
  4. 存储优化:只存储blogId,不存储完整内容

推模式适合好友关系场景,时效性好,实现简单。对于大V用户,可后续升级为推拉结合模式。

9.4 推送到粉丝收件箱

实现笔记发布时的粉丝推送功能:

  • 触发时机:用户发布探店笔记时
  • 推送对象:笔记作者的所有粉丝
  • 存储要求:按时间戳排序,支持分页查询

使用Redis SortedSet作为粉丝收件箱:

  • Key格式feed:{userId} 存储用户的收件箱
  • Value格式ZSet<blogId, timestamp>
  • 推送逻辑:遍历所有粉丝,写入对应收件箱
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public Result saveBlog(Blog blog) {
// 1.获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());

// 2.保存探店笔记到数据库
boolean isSuccess = save(blog);
if(!isSuccess){
return Result.fail("新增笔记失败!");
}

// 3.查询笔记作者的所有粉丝
List<Follow> follows = followService.query()
.eq("follow_user_id", user.getId()).list();

// 4.推送笔记id给所有粉丝
for (Follow follow : follows) {
// 4.1.获取粉丝id
Long userId = follow.getUserId();
// 4.2.推送
String key = FEED_KEY + userId;
stringRedisTemplate.opsForZSet()
.add(key, blog.getId().toString(), System.currentTimeMillis());
}

// 5.返回笔记id
return Result.ok(blog.getId());
}

关键点分析

  1. 原子性:数据库保存成功后才进行推送
  2. 批量操作:遍历所有粉丝,逐个写入收件箱
  3. 时间戳:使用当前时间作为score,保证时间排序
  4. 异常处理:任一粉丝推送失败不影响其他粉丝

推模式的关键是数据一致性,确保数据库保存成功后才进行Redis推送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public Result saveBlog(Blog blog) {
// 1.获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 2.保存探店笔记
boolean isSuccess = save(blog);
if(!isSuccess){
return Result.fail("新增笔记失败!");
}
// 3.查询笔记作者的所有粉丝 select * from tb_follow where follow_user_id = ?
List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
// 4.推送笔记id给所有粉丝
for (Follow follow : follows) {
// 4.1.获取粉丝id
Long userId = follow.getUserId();
// 4.2.推送
String key = FEED_KEY + userId;
stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
}
// 5.返回id
return Result.ok(blog.getId());
}

9.5 实现分页查询收件箱

需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:

Feed流分页查询面临特殊挑战:

  • 数据动态性:新数据不断插入,传统分页会重复读取
  • 滚动分页:基于时间戳和偏移量实现无重复分页
  • 性能要求:需要高效的范围查询和排序

1 传统分页

传统了分页在feed流是不适用的,因为我们的数据会随时发生变化

假设在t1 时刻,我们去读取第一页,此时page = 1 ,size = 5 ,那么我们拿到的就是10~6 这几条记录,假设现在t2时候又发布了一条记录,此时t3 时刻,我们来读取第二页,读取第二页传入的参数是page=2 ,size=5 ,那么此时读取到的第二页实际上是从6 开始,然后是6~2 ,那么我们就读取到了重复的数据,所以feed流的分页,不能采用原始方案来做。

1653813047671

2 Feed流的滚动分页

我们需要记录每次操作的最后一条,然后从这个位置开始去读取数据

举个例子:我们从t1时刻开始,拿第一页数据,拿到了10~6,然后记录下当前最后一次拿取的记录,就是6,t2时刻发布了新的记录,此时这个11放到最顶上,但是不会影响我们之前记录的6,此时t3时刻来拿第二页,第二页这个时候拿数据,还是从6后一点的5去拿,就拿到了5-1的记录。我们这个地方可以采用sortedSet来做,可以进行范围查询,并且还可以记录当前获取数据时间戳最小值,就可以实现滚动分页了

1653813462834

使用Redis SortedSet的滚动分页:

  • 查询命令ZREVRANGEBYSCORE key Max Min LIMIT offset count
  • 分页参数:max(最大时间戳)、offset(偏移量)、count(每页数量)
  • 返回数据:blogId列表、最小时间戳、新偏移量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// Controller层
@GetMapping("/of/follow")
public Result queryBlogOfFollow(
@RequestParam("lastId") Long max,
@RequestParam(value = "offset", defaultValue = "0") Integer offset){
return blogService.queryBlogOfFollow(max, offset);
}

// Service层
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
// 1.获取当前用户
Long userId = UserHolder.getUser().getId();

// 2.查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset count
String key = FEED_KEY + userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 2);

// 3.非空判断
if (typedTuples == null || typedTuples.isEmpty()) {
return Result.ok();
}

// 4.解析数据:blogId、minTime(时间戳)、offset
List<Long> ids = new ArrayList<>(typedTuples.size());
long minTime = 0;
int os = 1;

for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
// 4.1.获取id
ids.add(Long.valueOf(tuple.getValue()));
// 4.2.获取分数(时间戳)
long time = tuple.getScore().longValue();
if(time == minTime){
os++; // 相同时间戳,偏移量+1
}else{
minTime = time; // 更新时间戳
os = 1; // 重置偏移量
}
}

// 5.根据id查询blog
String idStr = StrUtil.join(",", ids);
List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();

for (Blog blog : blogs) {
// 5.1.查询blog有关的用户
queryBlogUser(blog);
// 5.2.查询blog是否被点赞
isBlogLiked(blog);
}

// 6.封装并返回
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(os);
r.setMinTime(minTime);

return Result.ok(r);
}

关键点分析

  1. 滚动分页:基于时间戳和偏移量,避免重复数据
  2. ZSet查询reverseRangeByScoreWithScores 支持范围查询
  3. 偏移量计算:处理相同时间戳的多条数据
  4. 数据完整性:查询blog详情和点赞状态

滚动分页的核心是记录上次查询的最小时间戳和偏移量,下次查询从该位置继续。

10. 附近商户

10.1 GEO数据结构基本用法

功能概述:存储和查询地理坐标信息

Redis GEO(地理坐标)支持存储和查询地理坐标信息:

  • 坐标存储:经度(longitude)、纬度(latitude)、成员(member)
  • 距离计算:两点间的球面距离 单位 m-米,km-千米,mi-英⾥,ft-英尺
  • 范围查询:圆形或矩形范围内的成员
  • 排序返回:按距离排序查询结果

常用命令

命令 说明 示例
GEOADD 添加地理坐标 GEOADD cities 116.405 39.905 beijing
GEODIST 计算两点距离 GEODIST cities beijing shanghai km
GEOPOS 获取成员坐标 GEOPOS cities beijing
GEOSEARCH 范围搜索排序 GEOSEARCH cities FROMLONLAT 116 39 BYRADIUS 10 km

应用场景

  • 附近商户搜索:根据用户当前位置,查询距离最近的商户
  • 地理位置服务:提供基于位置的服务,如地图导航、位置提醒
  • 距离计算和排序:计算两点之间的距离,支持按距离排序查询结果
  • 基于位置的内容推荐:根据用户位置推荐相关内容,如本地美食、景点等

关键点分析

  1. 坐标精度:支持小数点后6位,约10cm精度
  2. 距离单位:支持m、km、mi、ft等单位
  3. 范围查询:支持圆形和矩形两种查询方式
  4. 性能优化:使用geohash编码,查询效率高

Redis 6.2+ 推荐使用 GEOSEARCH 命令,功能更强大,支持更复杂的查询条件。

10.2 导入店铺数据到GEO

需求分析

将数据库中的店铺信息导入Redis GEO,支持按类型分类存储:

  • 数据分组:按店铺类型分组,同类店铺存储在同一key下
  • 坐标存储:使用店铺ID作为member,经纬度作为坐标
  • 批量导入:提高导入效率,减少网络开销

技术方案

使用Redis GEOADD命令批量导入:

  • Key格式shop:geo:{typeId} 按店铺类型分组
  • Value格式GeoLocation<shopId, Point(x,y)>
  • 批量操作opsForGeo().add(key, locations) 一次性导入

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Test
void loadShopData() {
// 1.查询店铺信息
List<Shop> list = shopService.list();

// 2.把店铺分组,按照typeId分组
Map<Long, List<Shop>> map = list.stream()
.collect(Collectors.groupingBy(Shop::getTypeId));

// 3.分批完成写入Redis
for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
// 3.1.获取类型id
Long typeId = entry.getKey();
String key = SHOP_GEO_KEY + typeId;

// 3.2.获取同类型的店铺的集合
List<Shop> value = entry.getValue();
List<RedisGeoCommands.GeoLocation<String>> locations =
new ArrayList<>(value.size());

// 3.3.构建GeoLocation对象
for (Shop shop : value) {
locations.add(new RedisGeoCommands.GeoLocation<>(
shop.getId().toString(),
new Point(shop.getX(), shop.getY())
));
}

// 3.4.批量写入redis GEOADD key 经度 纬度 member
stringRedisTemplate.opsForGeo().add(key, locations);
}
}

关键点分析

  1. 数据分组:按typeId分组,支持按类型筛选
  2. 批量导入:减少网络请求,提高导入效率
  3. 内存优化:只存储shopId,不存储完整信息
  4. 坐标格式:使用RedisGeoCommands.GeoLocation封装

10.3 实现附近商户功能

11. 用户签到

11.1 BitMap功能演示

功能概述

BitMap是Redis中基于String类型实现的位图数据结构:

  • 存储原理:每个bit位代表一个状态,0表示未签到,1表示已签到
  • 空间优势:相比MySQL存储,内存占用减少99%以上
  • 性能优势:位操作时间复杂度为O(1),查询效率极高

常用命令

命令 说明 示例
SETBIT 设置指定位置的bit值 SETBIT sign:1:202401 15 1
GETBIT 获取指定位置的bit值 GETBIT sign:1:202401 15
BITCOUNT 统计1的个数 BITCOUNT sign:1:202401
BITFIELD 批量操作bit位 BITFIELD sign:1:202401 GET u31 0

应用场景

  • 用户签到:按月存储用户签到状态
  • 活跃用户统计:统计某时间段活跃用户
  • 特征标记:标记用户是否具有某种特征
  • 布隆过滤器:快速判断元素是否存在

关键点分析

  1. 存储上限:最大支持512MB,约43亿个bit位
  2. 时间效率:位操作都是O(1)时间复杂度
  3. 空间效率:1亿用户10次签到仅需12MB内存
  4. 数据格式:按年+月分组存储,便于统计

BitMap适合存储大量布尔值状态,内存占用极小,查询效率极高。

11.2 实现签到功能

需求分析

实现用户签到功能,将当天签到信息保存到Redis中:

  • 签到记录:记录用户每天的签到状态
  • 状态存储:使用BitMap存储,0表示未签到,1表示已签到
  • 时间维度:按月维度存储,便于统计和查询
  • 幂等性:同一天多次签到只记录一次

技术方案

使用Redis BitMap实现签到存储:

  • Key格式sign:userId:yyyyMM
  • Bit位置:用月份中的第几天作为offset(0-30)
  • 命令选择:使用SETBIT命令设置签到状态
  • 时间获取:通过后台代码获取当前日期

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Controller层
@PostMapping("/sign")
public Result sign(){
return userService.sign();
}

// Service层
@Override
public Result sign() {
// 1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 2.获取当前日期
LocalDateTime now = LocalDateTime.now();
// 3.拼接key:sign:userId:yyyyMM
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
// 4.获取今天是本月的第几天(1-31)
int dayOfMonth = now.getDayOfMonth();
// 5.写入Redis:SETBIT key offset 1
stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
return Result.ok();
}

关键点分析

  1. Key设计:按用户和月份维度存储,便于查询和统计
  2. Offset计算:使用月份中的第几天减1作为bit位偏移
  3. 幂等性保证:SETBIT命令天然幂等,重复签到无影响
  4. 时间处理:通过LocalDateTime获取准确的日期信息

BitMap的SETBIT命令天然幂等,同一天多次签到不会产生重复记录。

11.3 签到统计

需求分析

统计用户本月的连续签到天数:

  • 连续签到定义:从最后一次签到开始向前统计,直到遇到第一次未签到为止
  • 统计范围:本月第一天到当前日期
  • 返回结果:连续签到天数
  • 算法思路:从后向前遍历bit位,遇到0停止

技术方案

使用BITFIELD命令获取本月签到数据:

  • 命令格式BITFIELD key GET u[dayOfMonth] 0
  • 数据处理:获取本月所有签到记录,返回十进制数字
  • 位运算:通过与1进行与运算,逐个检查bit位
  • 遍历逻辑:从后向前遍历,遇到0停止计数

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Controller层
@GetMapping("/sign/count")
public Result signCount(){
return userService.signCount();
}

// Service层
@Override
public Result signCount() {
// 1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 2.获取当前日期
LocalDateTime now = LocalDateTime.now();
// 3.拼接key:sign:userId:yyyyMM
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
// 4.获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
// 5.获取本月截止今天为止的所有签到记录
// BITFIELD sign:1:202401 GET u31 0
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
if (result == null || result.isEmpty()) {
// 没有任何签到结果
return Result.ok(0);
}
Long num = result.get(0);
if (num == null || num == 0) {
return Result.ok(0);
}
// 6.循环遍历统计连续签到天数
int count = 0;
while (true) {
// 6.1.让这个数字与1做与运算,得到数字的最后一个bit位
if ((num & 1) == 0) {
// 如果为0,说明未签到,结束统计
break;
} else {
// 如果不为0,说明已签到,计数器+1
count++;
}
// 把数字右移一位,抛弃最后一个bit位,继续下一个bit位
num >>>= 1;
}
return Result.ok(count);
}

关键点分析

  1. 连续签到定义:从最后一次签到向前统计,遇到0停止
  2. BITFIELD使用:一次性获取本月所有签到数据
  3. 位运算技巧:通过与1进行与运算判断最低位是否为1
  4. 遍历方向:从后向前遍历,符合连续签到定义

使用无符号右移(>>>")确保高位补0,避免负数影响统计结果。

11.4 BitMap解决缓存穿透

需求分析

使用BitMap解决缓存穿透问题:

  • 缓存穿透:查询不存在的数据,绕过缓存直达数据库
  • 传统方案:使用list存储所有有效id,内存占用大
  • 优化方案:使用BitMap作为布隆过滤器,快速判断id是否存在
  • 误差控制:通过哈希算法降低冲突概率

技术方案

基于BitMap实现布隆过滤器:

  • 哈希算法:id % bitmap_size,确定bit位置
  • 存储方式:将数据库中所有有效id对应的bit位置为1
  • 查询逻辑:用户查询时,用相同算法计算bit位,为0则一定不存在
  • 误差处理:哈希冲突可能导致误判,但概率可控

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 初始化布隆过滤器
public void initBloomFilter() {
// 1.获取数据库中所有有效id
List<Long> validIds = getAllValidIdsFromDB();

// 2.计算bitmap大小(根据数据量和期望误差率)
int bitmapSize = calculateOptimalSize(validIds.size(), 0.01);

// 3.将每个id映射到bit位并设置
for (Long id : validIds) {
int bitIndex = (int) (id % bitmapSize);
stringRedisTemplate.opsForValue().setBit("bloom:filter", bitIndex, true);
}
}

// 查询时判断
public boolean mightExist(Long id) {
int bitmapSize = getBitmapSize(); // 获取之前计算的size
int bitIndex = (int) (id % bitmapSize);
return stringRedisTemplate.opsForValue().getBit("bloom:filter", bitIndex);
}

关键点分析

  1. 内存优化:相比存储完整id列表,内存占用减少99%以上
  2. 查询效率:位操作时间复杂度O(1),查询极快
  3. 误差控制:通过合理设置bitmap大小,可将误差率控制在1%以内
  4. 数据更新:数据库新增/删除数据时需要同步更新bitmap

12. UV统计

12.1 HyperLogLog原理

概念定义

  • UV(Unique Visitor):独立访客量,同一用户多次访问只计1次
  • PV(Page View):页面访问量,每次访问都计数
  • 统计挑战:UV需要去重,传统Set存储方式内存消耗巨大

HyperLogLog原理

HyperLogLog是基于概率算法的基数统计方案:

  • 内存占用:单个HLL结构小于16KB,与数据量无关
  • 误差范围:标准误差小于0.81%,对UV统计可接受
  • 底层实现:基于String结构,使用位运算和哈希算法
  • 适用场景:大数据量去重统计,如UV、DAU等

Redis命令

命令 说明 示例
PFADD 添加元素 PFADD uv:202401 user1 user2
PFCOUNT 统计基数 PFCOUNT uv:202401
PFMERGE 合并多个HLL PFMERGE uv:total uv:202401 uv:202402

应用场景

  • 网站UV统计:统计每日/月独立访客
  • APP日活统计:统计每日活跃用户
  • 广告点击统计:统计独立点击用户数
  • 用户行为分析:统计参与特定活动的用户数

关键点分析

  1. 内存优势:相比Set存储,内存节省99%以上
  2. 误差可控:0.81%误差对大多数业务场景可接受
  3. 合并支持:支持多时间段数据合并统计
  4. 性能高效:添加和查询操作都是O(1)复杂度

12.2 百万级数据统计测试

测试目标

验证HyperLogLog在百万级数据下的性能表现:

  • 内存占用:统计100万条数据的内存消耗
  • 统计精度:误差是否在0.81%范围内
  • 性能表现:添加和查询操作的耗时
  • 对比测试:与Set结构进行内存和性能对比

测试方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void testHyperLogLog() {
// 1.初始化HyperLogLog
String key = "test:uv:" + System.currentTimeMillis();

// 2.添加100万条数据
int total = 1_000_000;
for (int i = 0; i < total; i++) {
stringRedisTemplate.opsForHyperLogLog().add(key, "user" + i);
}

// 3.统计基数
Long count = stringRedisTemplate.opsForHyperLogLog().size(key);
System.out.println("实际数据量:" + total);
System.out.println("统计结果:" + count);
System.out.println("误差率:" + Math.abs(count - total) * 100.0 / total + "%");

// 4.内存占用估算
Long memory = getMemoryUsage(key);
System.out.println("内存占用:" + memory + " bytes");
}

测试结果

经过测试发现:

  • 内存占用:约12KB,远低于Set结构的几十MB
  • 统计精度:误差在0.5%左右,优于官方标称的0.81%
  • 性能表现:100万条数据添加耗时约2秒
  • 空间效率:相比Set结构,内存节省99%以上

性能对比

1
2
3
4
| 数据结构 | 内存占用 | 统计精度 | 适用场景 |
|---------|----------|----------|----------|
| Set | 几十MB | 100% | 小数据量精确统计 |
| HyperLogLog | 12KB | 99.2% | 大数据量近似统计 |

关键点分析

  1. 内存效率:百万级数据仅需12KB内存,极其高效
  2. 统计精度:实际误差通常小于理论值,表现优秀
  3. 性能稳定:数据量增加不会显著影响性能
  4. 生产适用:完全满足生产环境的UV统计需求