跳至内容
·18 分钟阅读

真正在生产环境中管用的 Redis 缓存策略

Cache-aside、write-through、缓存雪崩防护、TTL 策略和失效模式。我在生产环境 Node.js 应用中使用过的 Redis 模式,附带真实代码示例。

分享:X / TwitterLinkedIn

每个人都告诉你 API 慢的时候"加个 Redis 就行了"。没人告诉你六个月后会怎样——你的缓存在提供过期数据,你的失效逻辑散落在 40 个文件里,一次部署引发了缓存雪崩,反而比不用缓存更狠地拖垮了你的数据库。

我已经在生产环境中运行 Redis 好几年了。不是玩玩而已,不是在教程里——而是在处理真实流量的系统中,缓存用错了就意味着凌晨三点的告警。以下是我学到的关于如何正确使用 Redis 的一切。

为什么要缓存?#

先说显而易见的:相对于内存来说,数据库很慢。一个 PostgreSQL 查询耗时 15 毫秒,以数据库标准来看已经很快了。但如果这个查询在每个 API 请求上都要跑一次,而你每秒要处理 1000 个请求,那就是每秒 15000 毫秒的累积数据库时间。你的连接池耗尽了。你的 p99 延迟飙升了。用户在盯着加载动画发呆。

Redis 大多数读操作在 1 毫秒以内完成。同样的数据缓存后,15 毫秒的操作变成了 0.3 毫秒。这不是微优化。这是需要 4 个数据库副本和不需要副本之间的差距。

但缓存不是免费的。它增加了复杂性,引入了一致性问题,并创造了一整类新的故障模式。在缓存任何东西之前,问问自己:

缓存有帮助的时候:

  • 数据的读取频率远高于写入(10:1 或更高的比率)
  • 底层查询很昂贵(联表查询、聚合、外部 API 调用)
  • 轻微的过期是可以接受的(商品目录、用户资料、配置)
  • 你有可预测的访问模式(相同的键被反复命中)

缓存有害的时候:

  • 数据不断变化且必须是最新的(实时股票价格、比赛实况)
  • 每个请求都是唯一的(带有很多参数的搜索查询)
  • 你的数据集很小(如果整个数据集能放进应用的内存,跳过 Redis)
  • 你还不具备监控和调试缓存问题的运维成熟度

Phil Karlton 有一句名言:计算机科学中只有两件难事——缓存失效和命名。两样他都说对了,但缓存失效才是那个会在半夜把你叫醒的。

配置 ioredis#

在深入模式之前,先建立连接。我到处都用 ioredis——它是 Node.js 中最成熟的 Redis 客户端,具有完善的 TypeScript 支持、集群模式、Sentinel 支持和 Lua 脚本。

typescript
import Redis from "ioredis";
 
const redis = new Redis({
  host: process.env.REDIS_HOST || "127.0.0.1",
  port: Number(process.env.REDIS_PORT) || 6379,
  password: process.env.REDIS_PASSWORD || undefined,
  db: Number(process.env.REDIS_DB) || 0,
  maxRetriesPerRequest: 3,
  retryStrategy(times) {
    const delay = Math.min(times * 200, 5000);
    return delay;
  },
  lazyConnect: true,
  enableReadyCheck: true,
  connectTimeout: 10000,
});
 
redis.on("error", (err) => {
  console.error("[Redis] Connection error:", err.message);
});
 
redis.on("connect", () => {
  console.log("[Redis] Connected");
});
 
export default redis;

有几点值得注意。lazyConnect: true 表示在你实际执行命令之前不会建立连接,这在测试和初始化阶段很有用。retryStrategy 实现了上限为 5 秒的指数退避——没有它的话,Redis 宕机时你的应用会疯狂发起重连尝试。maxRetriesPerRequest: 3 确保单个命令快速失败而不是永远挂起。

Cache-Aside 模式#

这是你 80% 时间都会用到的模式。也叫"懒加载"或"旁路缓存"。流程很简单:

  1. 应用收到请求
  2. 检查 Redis 中是否有缓存值
  3. 如果找到(缓存命中),返回它
  4. 如果没找到(缓存未命中),查询数据库
  5. 将结果存入 Redis
  6. 返回结果

这是一个带类型的实现:

typescript
import redis from "./redis";
 
interface CacheOptions {
  ttl?: number;       // 秒
  prefix?: string;
}
 
async function cacheAside<T>(
  key: string,
  fetcher: () => Promise<T>,
  options: CacheOptions = {}
): Promise<T> {
  const { ttl = 3600, prefix = "cache" } = options;
  const cacheKey = `${prefix}:${key}`;
 
  // 第 1 步:尝试从缓存读取
  const cached = await redis.get(cacheKey);
 
  if (cached !== null) {
    try {
      return JSON.parse(cached) as T;
    } catch {
      // 损坏的缓存条目,删除并继续
      await redis.del(cacheKey);
    }
  }
 
  // 第 2 步:缓存未命中——从源获取
  const result = await fetcher();
 
  // 第 3 步:存入缓存(不等待——发射即忘)
  redis
    .set(cacheKey, JSON.stringify(result), "EX", ttl)
    .catch((err) => {
      console.error(`[Cache] Failed to set ${cacheKey}:`, err.message);
    });
 
  return result;
}

用法如下:

typescript
interface User {
  id: string;
  name: string;
  email: string;
  plan: "free" | "pro" | "enterprise";
}
 
async function getUser(userId: string): Promise<User | null> {
  return cacheAside<User | null>(
    `user:${userId}`,
    async () => {
      const row = await db.query("SELECT * FROM users WHERE id = $1", [userId]);
      return row[0] ?? null;
    },
    { ttl: 1800 } // 30 分钟
  );
}

注意我对 redis.set 调用采用了发射即忘。这是有意为之的。如果 Redis 宕机或变慢,请求仍然能完成。缓存是优化,不是必需品。如果写入缓存失败,下一个请求只需再次查询数据库。没什么大不了的。

很多 cache-aside 实现中有一个人们容易忽视的微妙 bug:缓存空值。如果用户不存在而你没有缓存这个事实,每个对该用户的请求都会打到数据库。攻击者可以通过请求随机用户 ID 来利用这一点,让你的缓存形同虚设。始终也要缓存否定结果——只是用更短的 TTL。

typescript
async function getUserSafe(userId: string): Promise<User | null> {
  return cacheAside<User | null>(
    `user:${userId}`,
    async () => {
      const row = await db.query("SELECT * FROM users WHERE id = $1", [userId]);
      return row[0] ?? null;
    },
    {
      // 空结果用更短的 TTL 来限制内存使用
      // 但要足够长以吸收重复的未命中
      ttl: row ? 1800 : 300,
    }
  );
}

实际上,让我重构一下让动态 TTL 正常工作:

typescript
async function getUserWithDynamicTTL(userId: string): Promise<User | null> {
  const cacheKey = `cache:user:${userId}`;
 
  const cached = await redis.get(cacheKey);
  if (cached !== null) {
    return JSON.parse(cached) as User | null;
  }
 
  const row = await db.query("SELECT * FROM users WHERE id = $1", [userId]);
  const user: User | null = row[0] ?? null;
 
  // 存在的结果缓存 30 分钟,空结果缓存 5 分钟
  const ttl = user ? 1800 : 300;
  await redis.set(cacheKey, JSON.stringify(user), "EX", ttl);
 
  return user;
}

Write-Through 和 Write-Behind#

Cache-aside 对读密集型工作负载效果很好,但它有一个一致性问题:如果另一个服务或进程直接更新了数据库,你的缓存在 TTL 过期之前都是过期的。这就是 write-through 和 write-behind 模式登场的时候。

Write-Through#

在 write-through 中,每次写入都经过缓存层。先更新缓存,再更新数据库。这保证了缓存始终与数据库一致(假设写入总是通过你的应用程序进行)。

typescript
async function updateUser(
  userId: string,
  updates: Partial<User>
): Promise<User> {
  // 第 1 步:更新数据库
  const updated = await db.query(
    "UPDATE users SET name = COALESCE($2, name), email = COALESCE($3, email) WHERE id = $1 RETURNING *",
    [userId, updates.name, updates.email]
  );
  const user: User = updated[0];
 
  // 第 2 步:立即更新缓存
  const cacheKey = `cache:user:${userId}`;
  await redis.set(cacheKey, JSON.stringify(user), "EX", 1800);
 
  return user;
}

与 cache-aside 的关键区别:我们在每次写入时都更新缓存,而不仅仅是读取时。这意味着最近更新的数据的缓存始终是热的。

权衡:写入延迟增加,因为每次写入现在都要同时访问数据库和 Redis。如果 Redis 变慢,你的写入就慢。在大多数应用中,读远多于写,所以这个权衡是值得的。

Write-Behind(回写)#

Write-behind 反转了流程:写入先到 Redis,数据库异步更新。这给你极快的写入速度,代价是如果 Redis 在数据持久化之前宕机,可能会丢失数据。

typescript
async function updateUserWriteBehind(
  userId: string,
  updates: Partial<User>
): Promise<User> {
  const cacheKey = `cache:user:${userId}`;
 
  // 读取当前状态
  const current = await redis.get(cacheKey);
  const user = current ? JSON.parse(current) as User : null;
  if (!user) throw new Error("User not in cache");
 
  // 立即更新缓存
  const updated = { ...user, ...updates };
  await redis.set(cacheKey, JSON.stringify(updated), "EX", 1800);
 
  // 将数据库写入排队以进行异步处理
  await redis.rpush(
    "write_behind:users",
    JSON.stringify({ userId, updates, timestamp: Date.now() })
  );
 
  return updated;
}

然后你需要一个单独的 worker 来消费这个队列:

typescript
async function processWriteBehindQueue(): Promise<void> {
  while (true) {
    const item = await redis.blpop("write_behind:users", 5);
 
    if (item) {
      const { userId, updates } = JSON.parse(item[1]);
      try {
        await db.query(
          "UPDATE users SET name = COALESCE($2, name), email = COALESCE($3, email) WHERE id = $1",
          [userId, updates.name, updates.email]
        );
      } catch (err) {
        // 失败时重新排队并带上重试计数
        console.error("[WriteBehind] Failed:", err);
        await redis.rpush("write_behind:users:dlq", item[1]);
      }
    }
  }
}

我在实际中很少使用 write-behind。数据丢失的风险是真实的——如果 Redis 在 worker 处理队列之前崩溃,那些写入就丢了。仅在最终一致性真正可以接受的数据上使用,比如浏览量、分析事件或非关键的用户偏好设置。

TTL 策略#

正确设置 TTL 比看起来更有讲究。对所有东西都设一个固定的 1 小时 TTL 容易实现,但几乎总是错的。

数据变动分级#

我将数据分为三个级别并相应分配 TTL:

typescript
const TTL = {
  // 第 1 级:很少变化,计算成本高
  // 示例:商品目录、站点配置、功能开关
  STATIC: 86400,       // 24 小时
 
  // 第 2 级:偶尔变化,中等成本
  // 示例:用户资料、团队设置、权限
  MODERATE: 1800,      // 30 分钟
 
  // 第 3 级:频繁变化,计算便宜但调用频繁
  // 示例:信息流数据、通知计数、会话信息
  VOLATILE: 300,       // 5 分钟
 
  // 第 4 级:临时性的,用于限流和锁
  EPHEMERAL: 60,       // 1 分钟
 
  // 空结果:始终短生命周期
  NOT_FOUND: 120,      // 2 分钟
} as const;

TTL 抖动:防止雷群效应#

有一个场景曾经坑过我:你部署了应用,缓存是空的,10000 个请求都缓存了相同的数据并设了 1 小时的 TTL。一小时后,10000 个键同时过期。10000 个请求同时打到数据库。数据库窒息了。我亲眼见过这种情况打垮一个生产环境的 Postgres 实例。

解决方法是抖动——给 TTL 值添加随机性:

typescript
function ttlWithJitter(baseTtl: number, jitterPercent = 0.1): number {
  const jitter = baseTtl * jitterPercent;
  const offset = Math.random() * jitter * 2 - jitter;
  return Math.max(1, Math.round(baseTtl + offset));
}
 
// 不要:redis.set(key, value, "EX", 3600)
// 要:  redis.set(key, value, "EX", ttlWithJitter(3600))
 
// 3600 ± 10% = 3240 到 3960 之间的随机值

这将过期分散到一个时间窗口内,所以不是 10000 个键在同一秒过期,而是在 12 分钟的窗口内逐渐过期。数据库看到的是流量的渐进增长,而不是悬崖式暴增。

对于关键路径,我进一步使用 20% 的抖动:

typescript
const ttl = ttlWithJitter(3600, 0.2); // 2880–4320 秒

滑动过期#

对于每次访问都应重置 TTL 的会话类数据,使用 GETEX(Redis 6.2+):

typescript
async function getWithSlidingExpiry<T>(
  key: string,
  ttl: number
): Promise<T | null> {
  // GETEX 原子性地获取值并重置 TTL
  const value = await redis.getex(key, "EX", ttl);
  if (value === null) return null;
  return JSON.parse(value) as T;
}

如果你用的是旧版 Redis,使用 pipeline:

typescript
async function getWithSlidingExpiryCompat<T>(
  key: string,
  ttl: number
): Promise<T | null> {
  const pipeline = redis.pipeline();
  pipeline.get(key);
  pipeline.expire(key, ttl);
  const results = await pipeline.exec();
 
  if (!results || !results[0] || results[0][1] === null) return null;
  return JSON.parse(results[0][1] as string) as T;
}

缓存雪崩(雷群效应)#

TTL 抖动有助于应对大规模过期,但它解决不了单键雪崩:当一个热门键过期时,数百个并发请求同时尝试重新生成它。

想象你把首页信息流缓存了 5 分钟的 TTL。它过期了。50 个并发请求看到了缓存未命中。50 个请求全部用同一条昂贵的查询打数据库。你实际上 DDoS 了自己。

方案 1:互斥锁#

只有一个请求重新生成缓存。其他所有人等待。

typescript
async function cacheAsideWithMutex<T>(
  key: string,
  fetcher: () => Promise<T>,
  ttl: number = 3600
): Promise<T | null> {
  const cacheKey = `cache:${key}`;
  const lockKey = `lock:${key}`;
 
  // 先查缓存
  const cached = await redis.get(cacheKey);
  if (cached !== null) {
    return JSON.parse(cached) as T;
  }
 
  // 尝试获取锁(NX = 仅当不存在时,EX = 自动过期)
  const acquired = await redis.set(lockKey, "1", "EX", 10, "NX");
 
  if (acquired) {
    try {
      // 我们拿到锁了——获取并缓存
      const result = await fetcher();
      await redis.set(
        cacheKey,
        JSON.stringify(result),
        "EX",
        ttlWithJitter(ttl)
      );
      return result;
    } finally {
      // 释放锁
      await redis.del(lockKey);
    }
  }
 
  // 另一个请求持有锁——等待后重试
  await sleep(100);
 
  const retried = await redis.get(cacheKey);
  if (retried !== null) {
    return JSON.parse(retried) as T;
  }
 
  // 仍然没有缓存——直接查数据库
  // (处理锁持有者失败的情况)
  return fetcher();
}
 
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

上面的锁释放中有一个微妙的竞争条件。如果锁持有者花的时间超过 10 秒(锁的 TTL),另一个请求获取了锁,然后第一个请求删除了第二个请求的锁。正确的修复方法是使用唯一令牌:

typescript
import { randomUUID } from "crypto";
 
async function acquireLock(
  lockKey: string,
  ttl: number
): Promise<string | null> {
  const token = randomUUID();
  const acquired = await redis.set(lockKey, token, "EX", ttl, "NX");
  return acquired ? token : null;
}
 
async function releaseLock(lockKey: string, token: string): Promise<boolean> {
  // Lua 脚本确保原子性的检查并删除
  const script = `
    if redis.call("get", KEYS[1]) == ARGV[1] then
      return redis.call("del", KEYS[1])
    else
      return 0
    end
  `;
  const result = await redis.eval(script, 1, lockKey, token);
  return result === 1;
}

这本质上是一个简化版的 Redlock。对于单实例 Redis,这足够了。对于 Redis Cluster 或 Sentinel 配置,可以了解完整的 Redlock 算法——但老实说,对于缓存雪崩防护,这个简单版本就够用了。

方案 2:概率性提前过期#

这是我最喜欢的方法。不是等键过期,而是在过期前稍早一点随机重新生成它。这个想法来自 Vattani、Chierichetti 和 Lowenstein 的论文。

typescript
interface CachedValue<T> {
  data: T;
  cachedAt: number;
  ttl: number;
}
 
async function cacheWithEarlyExpiration<T>(
  key: string,
  fetcher: () => Promise<T>,
  ttl: number = 3600
): Promise<T> {
  const cacheKey = `cache:${key}`;
  const cached = await redis.get(cacheKey);
 
  if (cached !== null) {
    const entry = JSON.parse(cached) as CachedValue<T>;
    const age = (Date.now() - entry.cachedAt) / 1000;
    const remaining = entry.ttl - age;
 
    // XFetch 算法:随着过期时间临近,概率性地重新生成
    // beta * Math.log(Math.random()) 产生一个负数
    // 随着过期临近变得更大(更负)
    const beta = 1; // 调优参数,1 效果不错
    const shouldRegenerate =
      remaining - beta * Math.log(Math.random()) * -1 <= 0;
 
    if (!shouldRegenerate) {
      return entry.data;
    }
 
    // 继续执行重新生成
    console.log(`[Cache] Early regeneration triggered for ${key}`);
  }
 
  const data = await fetcher();
  const entry: CachedValue<T> = {
    data,
    cachedAt: Date.now(),
    ttl,
  };
 
  // 设置额外的缓冲时间,这样 Redis 不会在我们重新生成之前就过期
  await redis.set(
    cacheKey,
    JSON.stringify(entry),
    "EX",
    Math.round(ttl * 1.1)
  );
 
  return data;
}

这种方法的精妙之处:随着键的剩余 TTL 减少,重新生成的概率增加。在 1000 个并发请求中,可能只有一两个会触发重新生成,而其余的继续提供缓存数据。无需锁,无需协调,无需等待。

方案 3:Stale-While-Revalidate#

在后台重新生成的同时提供过期数据。这提供了最佳延迟,因为没有请求需要等待数据获取。

typescript
async function staleWhileRevalidate<T>(
  key: string,
  fetcher: () => Promise<T>,
  options: {
    freshTtl: number;   // 数据"新鲜"的时长
    staleTtl: number;   // 过期数据可以被提供多久
  }
): Promise<T | null> {
  const cacheKey = `cache:${key}`;
  const metaKey = `meta:${key}`;
 
  const [cached, meta] = await redis.mget(cacheKey, metaKey);
 
  if (cached !== null) {
    const parsedMeta = meta ? JSON.parse(meta) : null;
    const isFresh =
      parsedMeta && Date.now() - parsedMeta.cachedAt < options.freshTtl * 1000;
 
    if (!isFresh) {
      // 数据过期了——提供它但触发后台刷新
      revalidateInBackground(key, cacheKey, metaKey, fetcher, options);
    }
 
    return JSON.parse(cached) as T;
  }
 
  // 完全的缓存未命中——必须同步获取
  return fetchAndCache(key, cacheKey, metaKey, fetcher, options);
}
 
async function fetchAndCache<T>(
  key: string,
  cacheKey: string,
  metaKey: string,
  fetcher: () => Promise<T>,
  options: { freshTtl: number; staleTtl: number }
): Promise<T> {
  const data = await fetcher();
  const totalTtl = options.freshTtl + options.staleTtl;
 
  const pipeline = redis.pipeline();
  pipeline.set(cacheKey, JSON.stringify(data), "EX", totalTtl);
  pipeline.set(
    metaKey,
    JSON.stringify({ cachedAt: Date.now() }),
    "EX",
    totalTtl
  );
  await pipeline.exec();
 
  return data;
}
 
function revalidateInBackground<T>(
  key: string,
  cacheKey: string,
  metaKey: string,
  fetcher: () => Promise<T>,
  options: { freshTtl: number; staleTtl: number }
): void {
  // 使用锁来防止多个后台刷新
  const lockKey = `revalidate_lock:${key}`;
 
  redis
    .set(lockKey, "1", "EX", 30, "NX")
    .then((acquired) => {
      if (!acquired) return;
 
      return fetchAndCache(key, cacheKey, metaKey, fetcher, options)
        .finally(() => redis.del(lockKey));
    })
    .catch((err) => {
      console.error(`[SWR] Background revalidation failed for ${key}:`, err);
    });
}

用法:

typescript
const user = await staleWhileRevalidate<User>("user:123", fetchUserFromDB, {
  freshTtl: 300,     // 5 分钟内新鲜
  staleTtl: 3600,    // 在重新验证的同时最多提供 1 小时的过期数据
});

我对任何面向用户的、延迟比绝对新鲜度更重要的场景使用这种模式。仪表盘数据、个人资料页面、商品列表——都是完美的候选者。

缓存失效#

Phil Karlton 不是在开玩笑。失效是缓存从"简单优化"变成"分布式系统问题"的地方。

简单的基于键的失效#

最简单的情况:当你更新一个用户时,删除他们的缓存键。

typescript
async function updateUserAndInvalidate(
  userId: string,
  updates: Partial<User>
): Promise<User> {
  const user = await db.query(
    "UPDATE users SET name = $2 WHERE id = $1 RETURNING *",
    [userId, updates.name]
  );
 
  // 使缓存失效
  await redis.del(`cache:user:${userId}`);
 
  return user[0];
}

这在用户数据出现在其他缓存结果中之前都管用。也许它嵌在团队成员列表里。也许它在搜索结果里。也许它在 14 个不同的缓存 API 响应里。现在你需要跟踪哪些缓存键包含哪些实体。

基于标签的失效#

用它们包含的实体标记你的缓存条目,然后按标签失效。

typescript
async function setWithTags<T>(
  key: string,
  value: T,
  ttl: number,
  tags: string[]
): Promise<void> {
  const pipeline = redis.pipeline();
 
  // 存储值
  pipeline.set(`cache:${key}`, JSON.stringify(value), "EX", ttl);
 
  // 将键添加到每个标签的集合中
  for (const tag of tags) {
    pipeline.sadd(`tag:${tag}`, `cache:${key}`);
    pipeline.expire(`tag:${tag}`, ttl + 3600); // 标签集合比值存活更久
  }
 
  await pipeline.exec();
}
 
async function invalidateByTag(tag: string): Promise<number> {
  const keys = await redis.smembers(`tag:${tag}`);
 
  if (keys.length === 0) return 0;
 
  const pipeline = redis.pipeline();
  for (const key of keys) {
    pipeline.del(key);
  }
  pipeline.del(`tag:${tag}`);
 
  await pipeline.exec();
  return keys.length;
}

用法:

typescript
// 缓存团队数据时,用所有成员 ID 标记
const team = await fetchTeam(teamId);
await setWithTags(
  `team:${teamId}`,
  team,
  1800,
  [
    `entity:team:${teamId}`,
    ...team.members.map((m) => `entity:user:${m.id}`),
  ]
);
 
// 当用户 42 更新了资料,使包含他的所有缓存失效
await invalidateByTag("entity:user:42");

事件驱动的失效#

对于更大的系统,使用 Redis Pub/Sub 广播失效事件:

typescript
// 发布者(在你的 API 服务中)
async function publishInvalidation(
  entityType: string,
  entityId: string
): Promise<void> {
  await redis.publish(
    "cache:invalidate",
    JSON.stringify({ entityType, entityId, timestamp: Date.now() })
  );
}
 
// 订阅者(在每个应用实例中)
const subscriber = new Redis(/* 相同配置 */);
 
subscriber.subscribe("cache:invalidate", (err) => {
  if (err) console.error("[PubSub] Subscribe error:", err);
});
 
subscriber.on("message", async (_channel, message) => {
  const { entityType, entityId } = JSON.parse(message);
  await invalidateByTag(`entity:${entityType}:${entityId}`);
  console.log(`[Cache] Invalidated ${entityType}:${entityId}`);
});

这在多实例部署中至关重要。如果你有 4 个应用服务器在负载均衡器后面,服务器 1 上的失效需要传播到所有服务器。Pub/Sub 自动处理这一点。

基于模式的失效(谨慎使用)#

有时你需要使匹配某个模式的所有键失效。永远不要在生产环境中使用 KEYS 它在扫描整个键空间时会阻塞 Redis 服务器。几百万个键的话,可能阻塞好几秒——在 Redis 术语中这是永恒。

使用 SCAN 代替:

typescript
async function invalidateByPattern(pattern: string): Promise<number> {
  let cursor = "0";
  let deletedCount = 0;
 
  do {
    const [nextCursor, keys] = await redis.scan(
      cursor,
      "MATCH",
      pattern,
      "COUNT",
      100
    );
    cursor = nextCursor;
 
    if (keys.length > 0) {
      await redis.del(...keys);
      deletedCount += keys.length;
    }
  } while (cursor !== "0");
 
  return deletedCount;
}
 
// 使某个特定团队的所有缓存数据失效
await invalidateByPattern("cache:team:42:*");

SCAN 增量迭代——它永远不会阻塞服务器。COUNT 提示建议每次迭代返回多少键(是提示,不是保证)。对于大的键空间,这是唯一安全的方式。

话说回来,基于模式的失效是一种代码异味。如果你发现自己频繁扫描,请重新设计你的键结构或使用标签。SCAN 对键空间来说是 O(N) 的,它设计用于维护操作,不是热路径。

超越字符串的数据结构#

大多数开发者把 Redis 当作存 JSON 字符串的键值存储。这就像买了一把瑞士军刀却只用开瓶器。Redis 有丰富的数据结构,选择正确的结构可以消除整类复杂性。

Hash 用于对象#

与其把整个对象序列化为 JSON,不如存为 Redis Hash。这让你可以读取和更新单个字段而不需要反序列化整个对象。

typescript
// 把用户存为 hash
async function setUserHash(user: User): Promise<void> {
  const key = `user:${user.id}`;
  await redis.hset(key, {
    name: user.name,
    email: user.email,
    plan: user.plan,
    updatedAt: Date.now().toString(),
  });
  await redis.expire(key, 1800);
}
 
// 读取特定字段
async function getUserPlan(userId: string): Promise<string | null> {
  return redis.hget(`user:${userId}`, "plan");
}
 
// 更新单个字段
async function upgradeUserPlan(
  userId: string,
  plan: string
): Promise<void> {
  await redis.hset(`user:${userId}`, "plan", plan);
}
 
// 读取整个 hash 为对象
async function getUserHash(userId: string): Promise<User | null> {
  const data = await redis.hgetall(`user:${userId}`);
  if (!data || Object.keys(data).length === 0) return null;
 
  return {
    id: userId,
    name: data.name,
    email: data.email,
    plan: data.plan as User["plan"],
  };
}

Hash 对小对象来说内存效率很高(Redis 底层使用紧凑的 ziplist 编码),并且避免了序列化/反序列化开销。权衡:你失去了存储嵌套对象的能力,除非先将它们展平。

Sorted Set 用于排行榜和限流#

Sorted Set 是 Redis 中最被低估的数据结构。每个成员都有一个分数,集合始终按分数排序。这使它们成为排行榜、排名和滑动窗口限流的完美选择。

typescript
// 排行榜
async function addScore(
  leaderboard: string,
  userId: string,
  score: number
): Promise<void> {
  await redis.zadd(leaderboard, score, userId);
}
 
async function getTopPlayers(
  leaderboard: string,
  count: number = 10
): Promise<Array<{ userId: string; score: number }>> {
  const results = await redis.zrevrange(
    leaderboard,
    0,
    count - 1,
    "WITHSCORES"
  );
 
  const players: Array<{ userId: string; score: number }> = [];
  for (let i = 0; i < results.length; i += 2) {
    players.push({
      userId: results[i],
      score: parseFloat(results[i + 1]),
    });
  }
  return players;
}
 
async function getUserRank(
  leaderboard: string,
  userId: string
): Promise<number | null> {
  const rank = await redis.zrevrank(leaderboard, userId);
  return rank !== null ? rank + 1 : null; // 从 0 索引变为 1 索引
}

用于滑动窗口限流:

typescript
async function slidingWindowRateLimit(
  identifier: string,
  windowMs: number,
  maxRequests: number
): Promise<{ allowed: boolean; remaining: number }> {
  const key = `ratelimit:${identifier}`;
  const now = Date.now();
  const windowStart = now - windowMs;
 
  const pipeline = redis.pipeline();
 
  // 移除窗口外的条目
  pipeline.zremrangebyscore(key, 0, windowStart);
 
  // 添加当前请求
  pipeline.zadd(key, now, `${now}:${Math.random()}`);
 
  // 统计窗口内的请求数
  pipeline.zcard(key);
 
  // 设置整个键的过期
  pipeline.expire(key, Math.ceil(windowMs / 1000));
 
  const results = await pipeline.exec();
  const count = results?.[2]?.[1] as number;
 
  return {
    allowed: count <= maxRequests,
    remaining: Math.max(0, maxRequests - count),
  };
}

这比固定窗口计数器方法更精确,不存在边界问题——即一个窗口末尾和下一个窗口开头的突发流量实际上使你的速率限制翻倍。

List 用于队列#

Redis List 配合 LPUSH/BRPOP 可以构建出色的轻量级任务队列:

typescript
interface Job {
  id: string;
  type: string;
  payload: Record<string, unknown>;
  createdAt: number;
}
 
// 生产者
async function enqueueJob(
  queue: string,
  type: string,
  payload: Record<string, unknown>
): Promise<string> {
  const job: Job = {
    id: randomUUID(),
    type,
    payload,
    createdAt: Date.now(),
  };
 
  await redis.lpush(`queue:${queue}`, JSON.stringify(job));
  return job.id;
}
 
// 消费者(阻塞直到有任务可用)
async function dequeueJob(
  queue: string,
  timeout: number = 5
): Promise<Job | null> {
  const result = await redis.brpop(`queue:${queue}`, timeout);
  if (!result) return null;
 
  return JSON.parse(result[1]) as Job;
}

对于比基础队列更复杂的需求(重试、死信队列、优先级、延迟任务),使用 BullMQ,它基于 Redis 构建但处理了所有边界情况。

Set 用于唯一跟踪#

需要跟踪独立访客、事件去重或检查成员资格?Set 的添加、删除和成员检查都是 O(1) 的。

typescript
// 跟踪每日独立访客
async function trackVisitor(
  page: string,
  visitorId: string
): Promise<boolean> {
  const key = `visitors:${page}:${new Date().toISOString().split("T")[0]}`;
  const isNew = await redis.sadd(key, visitorId);
 
  // 48 小时后自动过期
  await redis.expire(key, 172800);
 
  return isNew === 1; // 1 = 新成员,0 = 已存在
}
 
// 获取独立访客数
async function getUniqueVisitors(page: string, date: string): Promise<number> {
  return redis.scard(`visitors:${page}:${date}`);
}
 
// 检查用户是否已经执行过某操作
async function hasUserVoted(pollId: string, userId: string): Promise<boolean> {
  return (await redis.sismember(`votes:${pollId}`, userId)) === 1;
}

对于非常大的集合(数百万成员),考虑使用 HyperLogLog。无论基数多大,它只使用 12KB 内存,代价是约 0.81% 的标准误差:

typescript
// HyperLogLog 用于近似唯一计数
async function trackVisitorApprox(
  page: string,
  visitorId: string
): Promise<void> {
  const key = `hll:visitors:${page}:${new Date().toISOString().split("T")[0]}`;
  await redis.pfadd(key, visitorId);
  await redis.expire(key, 172800);
}
 
async function getApproxUniqueVisitors(
  page: string,
  date: string
): Promise<number> {
  return redis.pfcount(`hll:visitors:${page}:${date}`);
}

序列化:JSON vs MessagePack#

JSON 是 Redis 序列化的默认选择。它可读、通用,大多数情况下够用。但对于高吞吐量系统,序列化/反序列化的开销会累积。

JSON 的问题#

typescript
const user = {
  id: "usr_abc123",
  name: "Ahmet Kousa",
  email: "ahmet@example.com",
  plan: "pro",
  preferences: {
    theme: "dark",
    language: "tr",
    notifications: true,
  },
};
 
// JSON:189 字节
const jsonStr = JSON.stringify(user);
console.log(Buffer.byteLength(jsonStr)); // 189
 
// 热路径上的 JSON.parse:每次约 0.02 毫秒
// 每秒 10000 个请求:每秒 200 毫秒的总 CPU 时间

MessagePack 替代方案#

MessagePack 是一种比 JSON 更小更快的二进制序列化格式:

bash
npm install msgpackr
typescript
import { pack, unpack } from "msgpackr";
 
// MessagePack:约 140 字节(小 25%)
const packed = pack(user);
console.log(packed.length); // ~140
 
// 存为 Buffer
await redis.set("user:123", packed);
 
// 读为 Buffer
const raw = await redis.getBuffer("user:123");
if (raw) {
  const data = unpack(raw);
}

注意使用 getBuffer 而不是 get——这很关键。get 返回字符串,会损坏二进制数据。

大值的压缩#

对于大的缓存值(包含数百条数据的 API 响应、渲染的 HTML),添加压缩:

typescript
import { promisify } from "util";
import { gzip, gunzip } from "zlib";
 
const gzipAsync = promisify(gzip);
const gunzipAsync = promisify(gunzip);
 
async function setCompressed<T>(
  key: string,
  value: T,
  ttl: number
): Promise<void> {
  const json = JSON.stringify(value);
 
  // 仅在大于 1KB 时压缩(小值的压缩开销不值得)
  if (Buffer.byteLength(json) > 1024) {
    const compressed = await gzipAsync(json);
    await redis.set(key, compressed, "EX", ttl);
  } else {
    await redis.set(key, json, "EX", ttl);
  }
}
 
async function getCompressed<T>(key: string): Promise<T | null> {
  const raw = await redis.getBuffer(key);
  if (!raw) return null;
 
  try {
    // 先尝试解压
    const decompressed = await gunzipAsync(raw);
    return JSON.parse(decompressed.toString()) as T;
  } catch {
    // 没有压缩,作为普通 JSON 解析
    return JSON.parse(raw.toString()) as T;
  }
}

在我的测试中,gzip 压缩通常将 JSON 载荷大小减少 70-85%。一个 50KB 的 API 响应变成 8KB。当你在为 Redis 内存付费时这很重要——每个键的内存越少,同一个实例能存的键就越多。

权衡:压缩每次操作增加 1-3 毫秒的 CPU 时间。对于大多数应用来说,这可以忽略不计。对于超低延迟路径,跳过它。

我的建议#

除非性能分析显示 JSON 是瓶颈,否则使用 JSON。JSON 在 Redis 中的可读性和可调试性(你可以 redis-cli GET key 然后真正读懂值)对 95% 的应用来说超过了 MessagePack 的性能增益。仅对大于 1KB 的值添加压缩。

Next.js 中的 Redis#

Next.js 有自己的缓存机制(Data Cache、Full Route Cache 等),但 Redis 填补了内置缓存无法处理的空白——特别是当你需要在多个实例之间共享缓存或在部署之间持久化缓存时。

缓存 API 路由响应#

typescript
// app/api/products/route.ts
import { NextResponse } from "next/server";
import redis from "@/lib/redis";
 
export async function GET(request: Request) {
  const url = new URL(request.url);
  const category = url.searchParams.get("category") || "all";
  const cacheKey = `api:products:${category}`;
 
  // 检查缓存
  const cached = await redis.get(cacheKey);
  if (cached) {
    return NextResponse.json(JSON.parse(cached), {
      headers: {
        "X-Cache": "HIT",
        "Cache-Control": "public, s-maxage=60",
      },
    });
  }
 
  // 从数据库获取
  const products = await db.products.findMany({
    where: category !== "all" ? { category } : undefined,
    orderBy: { createdAt: "desc" },
    take: 50,
  });
 
  // 缓存 5 分钟并带抖动
  await redis.set(
    cacheKey,
    JSON.stringify(products),
    "EX",
    ttlWithJitter(300)
  );
 
  return NextResponse.json(products, {
    headers: {
      "X-Cache": "MISS",
      "Cache-Control": "public, s-maxage=60",
    },
  });
}

X-Cache 头对调试非常有价值。当延迟飙升时,一个快速的 curl -I 就能告诉你缓存是否在工作。

会话存储#

Next.js 配合 Redis 做会话存储,对有状态应用来说比 JWT 更好:

typescript
// lib/session.ts
import { randomUUID } from "crypto";
import redis from "./redis";
 
interface Session {
  userId: string;
  role: string;
  createdAt: number;
  data: Record<string, unknown>;
}
 
const SESSION_TTL = 86400; // 24 小时
const SESSION_PREFIX = "session:";
 
export async function createSession(
  userId: string,
  role: string
): Promise<string> {
  const sessionId = randomUUID();
  const session: Session = {
    userId,
    role,
    createdAt: Date.now(),
    data: {},
  };
 
  await redis.set(
    `${SESSION_PREFIX}${sessionId}`,
    JSON.stringify(session),
    "EX",
    SESSION_TTL
  );
 
  return sessionId;
}
 
export async function getSession(
  sessionId: string
): Promise<Session | null> {
  const key = `${SESSION_PREFIX}${sessionId}`;
 
  // 使用 GETEX 在每次访问时刷新 TTL(滑动过期)
  const raw = await redis.getex(key, "EX", SESSION_TTL);
  if (!raw) return null;
 
  return JSON.parse(raw) as Session;
}
 
export async function destroySession(sessionId: string): Promise<void> {
  await redis.del(`${SESSION_PREFIX}${sessionId}`);
}
 
// 销毁用户的所有会话(适用于"全部退出")
export async function destroyAllUserSessions(
  userId: string
): Promise<void> {
  // 这需要维护一个 user->sessions 索引
  const sessionIds = await redis.smembers(`user_sessions:${userId}`);
 
  if (sessionIds.length > 0) {
    const pipeline = redis.pipeline();
    for (const sid of sessionIds) {
      pipeline.del(`${SESSION_PREFIX}${sid}`);
    }
    pipeline.del(`user_sessions:${userId}`);
    await pipeline.exec();
  }
}

限流中间件#

typescript
// middleware.ts(或中间件使用的辅助函数)
import redis from "@/lib/redis";
 
interface RateLimitResult {
  allowed: boolean;
  remaining: number;
  resetAt: number;
}
 
export async function rateLimit(
  identifier: string,
  limit: number = 60,
  windowSeconds: number = 60
): Promise<RateLimitResult> {
  const key = `rate:${identifier}`;
  const now = Math.floor(Date.now() / 1000);
  const windowStart = now - windowSeconds;
 
  // 原子性限流的 Lua 脚本
  const script = `
    redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])
    redis.call('ZADD', KEYS[1], ARGV[2], ARGV[3])
    local count = redis.call('ZCARD', KEYS[1])
    redis.call('EXPIRE', KEYS[1], ARGV[4])
    return count
  `;
 
  const count = (await redis.eval(
    script,
    1,
    key,
    windowStart,
    now,
    `${now}:${Math.random()}`,
    windowSeconds
  )) as number;
 
  return {
    allowed: count <= limit,
    remaining: Math.max(0, limit - count),
    resetAt: now + windowSeconds,
  };
}

Lua 脚本在这里很重要。没有它的话,ZREMRANGEBYSCORE + ZADD + ZCARD 序列不是原子的,在高并发下计数可能不准确。Lua 脚本在 Redis 中原子执行——它们不会与其他命令交错。

Next.js 的分布式锁#

当你有多个 Next.js 实例且需要确保只有一个处理任务时(如发送定时邮件或运行清理任务):

typescript
// lib/distributed-lock.ts
import { randomUUID } from "crypto";
import redis from "./redis";
 
export async function withLock<T>(
  lockName: string,
  fn: () => Promise<T>,
  options: { ttl?: number; retryDelay?: number; maxRetries?: number } = {}
): Promise<T | null> {
  const { ttl = 30, retryDelay = 200, maxRetries = 10 } = options;
  const token = randomUUID();
  const lockKey = `dlock:${lockName}`;
 
  // 尝试获取锁
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const acquired = await redis.set(lockKey, token, "EX", ttl, "NX");
 
    if (acquired) {
      try {
        // 为长时间运行的任务自动延长锁
        const extender = setInterval(async () => {
          const script = `
            if redis.call("get", KEYS[1]) == ARGV[1] then
              return redis.call("expire", KEYS[1], ARGV[2])
            else
              return 0
            end
          `;
          await redis.eval(script, 1, lockKey, token, ttl);
        }, (ttl * 1000) / 3);
 
        const result = await fn();
        clearInterval(extender);
        return result;
      } finally {
        // 仅在我们仍持有锁时释放
        const releaseScript = `
          if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
          else
            return 0
          end
        `;
        await redis.eval(releaseScript, 1, lockKey, token);
      }
    }
 
    // 等待后重试
    await new Promise((r) => setTimeout(r, retryDelay));
  }
 
  // 所有重试后仍无法获取锁
  return null;
}

用法:

typescript
// 在 cron 触发的 API 路由中
export async function POST() {
  const result = await withLock("daily-report", async () => {
    // 只有一个实例运行这个
    const report = await generateDailyReport();
    await sendReportEmail(report);
    return report;
  });
 
  if (result === null) {
    return NextResponse.json(
      { message: "Another instance is already processing" },
      { status: 409 }
    );
  }
 
  return NextResponse.json({ success: true });
}

ttl/3 的锁延长间隔很重要。没有它的话,如果你的任务花的时间超过锁的 TTL,锁过期后另一个实例会抢到它。延长器在任务运行期间保持锁的活性。

监控和调试#

Redis 很快,直到它不快的时候。当问题出现时,你需要可见性。

缓存命中率#

最重要的单一指标。在你的应用中跟踪它:

typescript
// lib/cache-metrics.ts
import redis from "./redis";
 
const METRICS_KEY = "metrics:cache";
 
export async function recordCacheHit(): Promise<void> {
  await redis.hincrby(METRICS_KEY, "hits", 1);
}
 
export async function recordCacheMiss(): Promise<void> {
  await redis.hincrby(METRICS_KEY, "misses", 1);
}
 
export async function getCacheStats(): Promise<{
  hits: number;
  misses: number;
  hitRate: number;
}> {
  const stats = await redis.hgetall(METRICS_KEY);
  const hits = parseInt(stats.hits || "0", 10);
  const misses = parseInt(stats.misses || "0", 10);
  const total = hits + misses;
 
  return {
    hits,
    misses,
    hitRate: total > 0 ? hits / total : 0,
  };
}
 
// 每天重置指标
export async function resetCacheStats(): Promise<void> {
  await redis.del(METRICS_KEY);
}

健康的缓存命中率在 90% 以上。如果你低于 80%,要么是 TTL 太短,要么是缓存键太具体,要么是你的访问模式比你想的更随机。

INFO 命令#

INFO 命令是 Redis 的内置健康仪表盘:

bash
redis-cli INFO memory
# Memory
used_memory:1234567
used_memory_human:1.18M
used_memory_peak:2345678
used_memory_peak_human:2.24M
maxmemory:0
maxmemory_policy:noeviction
mem_fragmentation_ratio:1.23

需要监控的关键指标:

  • used_memory vs maxmemory:你是否在接近限制?
  • mem_fragmentation_ratio:超过 1.5 意味着 Redis 使用的 RSS 远大于逻辑内存。考虑重启。
  • evicted_keys:如果非零且你没打算淘汰,说明你内存不够了。
bash
redis-cli INFO stats

关注:

  • keyspace_hits / keyspace_misses:服务器级别的命中率
  • total_commands_processed:吞吐量
  • instantaneous_ops_per_sec:当前吞吐量

MONITOR(极其谨慎地使用)#

MONITOR 实时流式输出 Redis 服务器上执行的每个命令。用于调试非常有用,但在生产环境中非常危险。

bash
# 永远不要在生产环境中让它持续运行
# 它会增加显著的开销并可能记录敏感数据
redis-cli MONITOR
1614556800.123456 [0 127.0.0.1:52340] "SET" "cache:user:123" "{\"name\":\"Ahmet\"}" "EX" "1800"
1614556800.234567 [0 127.0.0.1:52340] "GET" "cache:user:456"

我用 MONITOR 只做两件事:开发阶段调试键命名问题,以及验证特定代码路径是否按预期访问了 Redis。绝不超过 30 秒。绝不在生产环境中使用,除非你已经穷尽了其他调试手段。

键空间通知#

想知道键何时过期或被删除?Redis 可以发布事件:

bash
# 为过期和淘汰事件启用键空间通知
redis-cli CONFIG SET notify-keyspace-events Ex
typescript
const subscriber = new Redis(/* 配置 */);
 
// 监听键过期事件
subscriber.subscribe("__keyevent@0__:expired", (err) => {
  if (err) console.error("Subscribe error:", err);
});
 
subscriber.on("message", (_channel, expiredKey) => {
  console.log(`Key expired: ${expiredKey}`);
 
  // 主动重新生成重要的键
  if (expiredKey.startsWith("cache:homepage")) {
    regenerateHomepageCache().catch(console.error);
  }
});

这对主动缓存预热很有用——不是等用户触发缓存未命中,而是在关键条目过期的那一刻就重新生成它们。

内存分析#

当 Redis 内存意外增长时,你需要找出哪些键消耗最多:

bash
# 采样 10 个最大的键
redis-cli --bigkeys
# Scanning the entire keyspace to find biggest keys
[00.00%] Biggest string found so far '"cache:search:electronics"' with 524288 bytes
[25.00%] Biggest zset found so far '"leaderboard:global"' with 150000 members
[50.00%] Biggest hash found so far '"session:abc123"' with 45 fields

更详细的分析:

bash
# 特定键的内存使用(字节)
redis-cli MEMORY USAGE "cache:search:electronics"
typescript
// 编程式内存分析
async function analyzeMemory(pattern: string): Promise<void> {
  let cursor = "0";
  const stats: Array<{ key: string; bytes: number }> = [];
 
  do {
    const [nextCursor, keys] = await redis.scan(
      cursor,
      "MATCH",
      pattern,
      "COUNT",
      100
    );
    cursor = nextCursor;
 
    for (const key of keys) {
      const bytes = await redis.memory("USAGE", key);
      if (bytes) {
        stats.push({ key, bytes: bytes as number });
      }
    }
  } while (cursor !== "0");
 
  // 按大小降序排列
  stats.sort((a, b) => b.bytes - a.bytes);
 
  console.log("Top 20 keys by memory usage:");
  for (const { key, bytes } of stats.slice(0, 20)) {
    const mb = (bytes / 1024 / 1024).toFixed(2);
    console.log(`  ${key}: ${mb} MB`);
  }
}

淘汰策略#

如果你的 Redis 实例有 maxmemory 限制(应该有),配置一个淘汰策略:

bash
# 在 redis.conf 中或通过 CONFIG SET
maxmemory 512mb
maxmemory-policy allkeys-lru

可用的策略:

  • noeviction:内存满时返回错误(默认,对缓存来说最差)
  • allkeys-lru:淘汰最近最少使用的键(缓存的最佳通用选择)
  • allkeys-lfu:淘汰最不常用的键(如果某些键是突发访问的话更好)
  • volatile-lru:只淘汰设了 TTL 的键(如果你混合缓存和持久化数据很有用)
  • allkeys-random:随机淘汰(出人意料地不错,没有开销)

对于纯缓存工作负载,allkeys-lfu 通常是最好的选择。它把频繁访问的键保留在内存中,即使它们最近没有被访问过。

整合:一个生产级缓存模块#

这是我在生产中使用的完整缓存模块,结合了我们讨论的所有内容:

typescript
// lib/cache.ts
import Redis from "ioredis";
 
const redis = new Redis({
  host: process.env.REDIS_HOST || "127.0.0.1",
  port: Number(process.env.REDIS_PORT) || 6379,
  password: process.env.REDIS_PASSWORD || undefined,
  maxRetriesPerRequest: 3,
  retryStrategy(times) {
    return Math.min(times * 200, 5000);
  },
});
 
// TTL 分级
const TTL = {
  STATIC: 86400,
  MODERATE: 1800,
  VOLATILE: 300,
  EPHEMERAL: 60,
  NOT_FOUND: 120,
} as const;
 
type TTLTier = keyof typeof TTL;
 
function ttlWithJitter(base: number, jitter = 0.1): number {
  const offset = base * jitter * (Math.random() * 2 - 1);
  return Math.max(1, Math.round(base + offset));
}
 
// 核心的 cache-aside 配合雪崩保护
async function get<T>(
  key: string,
  fetcher: () => Promise<T>,
  options: {
    tier?: TTLTier;
    ttl?: number;
    tags?: string[];
    swr?: { freshTtl: number; staleTtl: number };
  } = {}
): Promise<T> {
  const { tier = "MODERATE", tags } = options;
  const baseTtl = options.ttl ?? TTL[tier];
  const cacheKey = `c:${key}`;
 
  // 检查缓存
  const cached = await redis.get(cacheKey);
 
  if (cached !== null) {
    try {
      const parsed = JSON.parse(cached);
      recordHit();
      return parsed as T;
    } catch {
      await redis.del(cacheKey);
    }
  }
 
  recordMiss();
 
  // 获取锁防止雪崩
  const lockKey = `lock:${key}`;
  const acquired = await redis.set(lockKey, "1", "EX", 10, "NX");
 
  if (!acquired) {
    // 另一个进程正在获取——短暂等待后重试缓存
    await new Promise((r) => setTimeout(r, 150));
    const retried = await redis.get(cacheKey);
    if (retried) return JSON.parse(retried) as T;
  }
 
  try {
    const result = await fetcher();
    const ttl = ttlWithJitter(baseTtl);
 
    const pipeline = redis.pipeline();
    pipeline.set(cacheKey, JSON.stringify(result), "EX", ttl);
 
    // 存储标签关联
    if (tags) {
      for (const tag of tags) {
        pipeline.sadd(`tag:${tag}`, cacheKey);
        pipeline.expire(`tag:${tag}`, ttl + 3600);
      }
    }
 
    await pipeline.exec();
    return result;
  } finally {
    await redis.del(lockKey);
  }
}
 
// 失效
async function invalidate(...keys: string[]): Promise<void> {
  if (keys.length === 0) return;
  await redis.del(...keys.map((k) => `c:${k}`));
}
 
async function invalidateByTag(tag: string): Promise<number> {
  const keys = await redis.smembers(`tag:${tag}`);
  if (keys.length === 0) return 0;
 
  const pipeline = redis.pipeline();
  for (const key of keys) {
    pipeline.del(key);
  }
  pipeline.del(`tag:${tag}`);
  await pipeline.exec();
  return keys.length;
}
 
// 指标
function recordHit(): void {
  redis.hincrby("metrics:cache", "hits", 1).catch(() => {});
}
 
function recordMiss(): void {
  redis.hincrby("metrics:cache", "misses", 1).catch(() => {});
}
 
async function stats(): Promise<{
  hits: number;
  misses: number;
  hitRate: string;
}> {
  const raw = await redis.hgetall("metrics:cache");
  const hits = parseInt(raw.hits || "0", 10);
  const misses = parseInt(raw.misses || "0", 10);
  const total = hits + misses;
 
  return {
    hits,
    misses,
    hitRate: total > 0 ? ((hits / total) * 100).toFixed(1) + "%" : "N/A",
  };
}
 
export const cache = {
  get,
  invalidate,
  invalidateByTag,
  stats,
  redis,
  TTL,
};

在整个应用中使用:

typescript
import { cache } from "@/lib/cache";
 
// 简单的 cache-aside
const products = await cache.get("products:featured", fetchFeaturedProducts, {
  tier: "VOLATILE",
  tags: ["entity:products"],
});
 
// 自定义 TTL
const config = await cache.get("app:config", fetchAppConfig, {
  ttl: 43200, // 12 小时
});
 
// 更新商品后
await cache.invalidateByTag("entity:products");
 
// 检查健康
const metrics = await cache.stats();
console.log(`Cache hit rate: ${metrics.hitRate}`);

我犯过的常见错误(这样你就不必再犯了)#

1. 没设 maxmemory。 Redis 会开心地使用所有可用内存直到操作系统杀掉它。永远设置一个限制。

2. 在生产环境使用 KEYS。 它会阻塞服务器。用 SCAN。我是在一个监控脚本的 KEYS * 调用导致了 3 秒的停机后学到这一课的。

3. 缓存过于激进。 不是所有东西都需要缓存。如果你的数据库查询耗时 2 毫秒且每分钟只调用 10 次,缓存增加了复杂性却几乎没有收益。

4. 忽视序列化成本。 我曾经缓存了一个 2MB 的 JSON blob,然后困惑为什么缓存读取很慢。序列化开销比它应该省下的数据库查询还大。

5. 没有优雅降级。 当 Redis 宕机时,你的应用应该仍然能工作——只是慢一点。把每个缓存调用包在 try/catch 中,回退到数据库。永远不要让缓存失败变成面向用户的错误。

typescript
async function resilientGet<T>(
  key: string,
  fetcher: () => Promise<T>
): Promise<T> {
  try {
    return await cache.get(key, fetcher);
  } catch (err) {
    console.error(`[Cache] Degraded mode for ${key}:`, err);
    return fetcher(); // 完全绕过缓存
  }
}

6. 不监控淘汰。 如果 Redis 在淘汰键,要么是资源不足,要么是缓存太多。无论哪种情况,你都需要知道。

7. 让缓存和持久化数据共享一个 Redis 实例。 使用独立的实例(或至少独立的数据库)。一个删除了你的任务队列条目的缓存淘汰策略,对所有人来说都是糟糕的一天。

总结#

Redis 缓存不难,但很容易用错。从 cache-aside 开始,从第一天就加上 TTL 抖动,监控命中率,抵制缓存所有东西的冲动。

最好的缓存策略是你凌晨三点出了问题时能想明白的那种。保持简单,保持可观测,并记住每个缓存的值都是你告诉用户的关于数据状态的一个谎言——你的工作是让这个谎言尽可能小、尽可能短暂。

相关文章