跳至内容
·17 分钟阅读

tRPC:无需繁文缛节的端到端类型安全

tRPC 如何消除 API 契约问题,与 Next.js App Router 配合,处理认证中间件、文件上传和订阅。从零开始搭建一个真实的 tRPC 项目。

分享:X / TwitterLinkedIn

你一定经历过这种场景。你修改了 API 响应中的一个字段名,更新了后端类型,然后部署。接着前端在生产环境崩了,因为有人忘了更新 Dashboard.tsx 第 247 行的 fetch 调用。那个字段现在是 undefined,组件渲染空白,你的错误追踪系统在凌晨两点亮起了红灯。

这就是 API 契约问题。它不是技术问题,而是协作问题。再多的 Swagger 文档或 GraphQL schema 也无法解决前后端类型悄悄漂移的事实。

tRPC 通过拒绝让类型漂移来解决这个问题。没有 schema 文件,没有代码生成步骤,没有需要维护的独立契约。你在服务端写一个 TypeScript 函数,客户端在编译时就知道它的精确输入和输出类型。如果你重命名一个字段,前端就无法编译,直到你修复它。

这就是它的承诺。让我来展示它实际如何工作,在哪里闪光,以及在哪里你绝对不应该使用它。

更精确地描述问题#

来看看大多数团队今天是怎么构建 API 的。

REST + OpenAPI:你编写端点。也许加上 Swagger 注解。也许从 OpenAPI spec 生成客户端 SDK。但 spec 是一个独立的产物,它可能会过时。生成步骤是 CI 流水线中另一个可能出错或被遗忘的环节。而且生成的类型通常很丑——paths["/api/users"]["get"]["responses"]["200"]["content"]["application/json"] 这种深层嵌套的怪物。

GraphQL:类型安全更好,但仪式感太重。你用 SDL 写 schema,写 resolver,从 schema 生成类型,在客户端写查询,再从查询生成类型。这至少是两次代码生成步骤、一个 schema 文件和一个所有人都得记得运行的构建步骤。对于一个同时控制前后端的团队来说,这是为一个有更简单解决方案的问题搭建了太多基础设施。

手动 fetch 调用:最常见的方式,也是最危险的。你写 fetch("/api/users"),把结果断言为 User[],然后祈祷一切顺利。没有任何编译时安全性。类型断言是你对 TypeScript 撒的谎。

typescript
// The lie every frontend developer has told
const res = await fetch("/api/users");
const users = (await res.json()) as User[]; // 🙏 hope this is right

tRPC 采取了完全不同的方式。它不是用另一种格式描述 API 然后生成类型,而是在服务端写纯 TypeScript 函数,直接把类型导入客户端。没有生成步骤,没有 schema 文件,没有漂移。

核心概念#

在开始搭建之前,让我们先理解思维模型。

Router#

tRPC router 是一组 procedure 的集合。可以把它想象成 MVC 中的 controller,只不过它是一个内置了类型推断的普通对象。

typescript
import { initTRPC } from "@trpc/server";
 
const t = initTRPC.create();
 
export const appRouter = t.router({
  user: t.router({
    list: t.procedure.query(/* ... */),
    byId: t.procedure.input(/* ... */).query(/* ... */),
    create: t.procedure.input(/* ... */).mutation(/* ... */),
  }),
  post: t.router({
    list: t.procedure.query(/* ... */),
    publish: t.procedure.input(/* ... */).mutation(/* ... */),
  }),
});
 
export type AppRouter = typeof appRouter;

那个 AppRouter 类型导出就是全部的魔法。客户端导入这个类型——不是运行时代码,只是类型——然后就获得了每个 procedure 的完整自动补全和类型检查。

Procedure#

Procedure 是一个单独的端点。有三种类型:

  • Query:读操作。映射到 HTTP GET 语义。由 TanStack Query 缓存。
  • Mutation:写操作。映射到 HTTP POST。不缓存。
  • Subscription:实时流。使用 WebSocket。

Context#

Context 是每个请求作用域内可用的数据。数据库连接、已认证的用户、请求头——任何你会放在 Express 的 req 对象中的东西都在这里。

Middleware#

Middleware 转换 context 或控制访问。最常见的模式是一个认证 middleware,检查有效的 session 并添加 ctx.user

类型推断链#

这是关键的思维模型。当你这样定义一个 procedure 时:

typescript
t.procedure
  .input(z.object({ id: z.string() }))
  .query(({ input }) => {
    // input is typed as { id: string }
    return db.user.findUnique({ where: { id: input.id } });
  });

返回类型一路流到客户端。如果 db.user.findUnique 返回 User | null,客户端的 useQuery hook 的 data 就会被类型化为 User | null。不需要手动定义类型,不需要类型断言。端到端推断。

在 Next.js App Router 中搭建#

让我们从头开始构建。假设你有一个 Next.js 14+ 项目,使用 App Router。

安装依赖#

bash
npm install @trpc/server @trpc/client @trpc/react-query @trpc/next @tanstack/react-query zod

第一步:在服务端初始化 tRPC#

创建 tRPC 实例并定义 context 类型。

typescript
// src/server/trpc.ts
import { initTRPC, TRPCError } from "@trpc/server";
import { type FetchCreateContextFnOptions } from "@trpc/server/adapters/fetch";
import superjson from "superjson";
import { ZodError } from "zod";
 
export async function createTRPCContext(opts: FetchCreateContextFnOptions) {
  const session = await getSession(opts.req);
 
  return {
    session,
    db: prisma,
    req: opts.req,
  };
}
 
const t = initTRPC.context<typeof createTRPCContext>().create({
  transformer: superjson,
  errorFormatter({ shape, error }) {
    return {
      ...shape,
      data: {
        ...shape.data,
        zodError:
          error.cause instanceof ZodError ? error.cause.flatten() : null,
      },
    };
  },
});
 
export const createCallerFactory = t.createCallerFactory;
export const router = t.router;
export const publicProcedure = t.procedure;

几点值得注意:

  • superjson transformer:tRPC 默认将数据序列化为 JSON,这意味着 Date 对象、MapSet 和其他非 JSON 类型会丢失。superjson 保留它们。
  • Error formatter:我们把 Zod 校验错误附加到响应中,这样客户端就能显示字段级别的错误。
  • createTRPCContext:这个函数在每个请求上运行。你在这里解析 session、设置数据库连接、构建 context 对象。

第二步:定义 Router#

typescript
// src/server/routers/user.ts
import { z } from "zod";
import { router, publicProcedure, protectedProcedure } from "../trpc";
 
export const userRouter = router({
  me: protectedProcedure.query(async ({ ctx }) => {
    const user = await ctx.db.user.findUnique({
      where: { id: ctx.user.id },
      select: {
        id: true,
        name: true,
        email: true,
        image: true,
        createdAt: true,
      },
    });
 
    if (!user) {
      throw new TRPCError({
        code: "NOT_FOUND",
        message: "User not found",
      });
    }
 
    return user;
  }),
 
  updateProfile: protectedProcedure
    .input(
      z.object({
        name: z.string().min(1).max(100),
        bio: z.string().max(500).optional(),
      })
    )
    .mutation(async ({ ctx, input }) => {
      const updated = await ctx.db.user.update({
        where: { id: ctx.user.id },
        data: {
          name: input.name,
          bio: input.bio,
        },
      });
 
      return updated;
    }),
 
  byId: publicProcedure
    .input(z.object({ id: z.string().uuid() }))
    .query(async ({ ctx, input }) => {
      const user = await ctx.db.user.findUnique({
        where: { id: input.id },
        select: {
          id: true,
          name: true,
          image: true,
          bio: true,
        },
      });
 
      if (!user) {
        throw new TRPCError({
          code: "NOT_FOUND",
          message: "User not found",
        });
      }
 
      return user;
    }),
});
typescript
// src/server/routers/_app.ts
import { router } from "../trpc";
import { userRouter } from "./user";
import { postRouter } from "./post";
import { notificationRouter } from "./notification";
 
export const appRouter = router({
  user: userRouter,
  post: postRouter,
  notification: notificationRouter,
});
 
export type AppRouter = typeof appRouter;

第三步:通过 Next.js Route Handler 暴露#

在 App Router 中,tRPC 作为标准的 Route Handler 运行。不需要自定义服务器,不需要特殊的 Next.js 插件。

typescript
// src/app/api/trpc/[trpc]/route.ts
import { fetchRequestHandler } from "@trpc/server/adapters/fetch";
import { appRouter } from "@/server/routers/_app";
import { createTRPCContext } from "@/server/trpc";
 
const handler = (req: Request) =>
  fetchRequestHandler({
    endpoint: "/api/trpc",
    req,
    router: appRouter,
    createContext: createTRPCContext,
    onError:
      process.env.NODE_ENV === "development"
        ? ({ path, error }) => {
            console.error(
              `❌ tRPC failed on ${path ?? "<no-path>"}: ${error.message}`
            );
          }
        : undefined,
  });
 
export { handler as GET, handler as POST };

就这样。GET 和 POST 都处理了。Query 走 GET(带 URL 编码的输入),mutation 走 POST。

第四步:设置客户端#

typescript
// src/lib/trpc.ts
import { createTRPCReact } from "@trpc/react-query";
import type { AppRouter } from "@/server/routers/_app";
 
export const trpc = createTRPCReact<AppRouter>();

注意:我们导入 AppRouter 只作为 类型。没有服务端代码泄漏到客户端 bundle 中。

第五步:Provider 设置#

typescript
// src/components/providers/TRPCProvider.tsx
"use client";
 
import { useState } from "react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { httpBatchLink, loggerLink } from "@trpc/client";
import { trpc } from "@/lib/trpc";
import superjson from "superjson";
 
function getBaseUrl() {
  if (typeof window !== "undefined") return "";
  if (process.env.VERCEL_URL) return `https://${process.env.VERCEL_URL}`;
  return `http://localhost:${process.env.PORT ?? 3000}`;
}
 
export function TRPCProvider({ children }: { children: React.ReactNode }) {
  const [queryClient] = useState(
    () =>
      new QueryClient({
        defaultOptions: {
          queries: {
            staleTime: 5 * 60 * 1000,
            retry: 1,
          },
        },
      })
  );
 
  const [trpcClient] = useState(() =>
    trpc.createClient({
      links: [
        loggerLink({
          enabled: (op) =>
            process.env.NODE_ENV === "development" ||
            (op.direction === "down" && op.result instanceof Error),
        }),
        httpBatchLink({
          url: `${getBaseUrl()}/api/trpc`,
          transformer: superjson,
        }),
      ],
    })
  );
 
  return (
    <trpc.Provider client={trpcClient} queryClient={queryClient}>
      <QueryClientProvider client={queryClient}>
        {children}
      </QueryClientProvider>
    </trpc.Provider>
  );
}
typescript
// src/app/layout.tsx (relevant part)
import { TRPCProvider } from "@/components/providers/TRPCProvider";
 
export default function RootLayout({ children }: { children: React.ReactNode }) {
  return (
    <html>
      <body>
        <TRPCProvider>{children}</TRPCProvider>
      </body>
    </html>
  );
}

第六步:在组件中使用#

typescript
// src/app/dashboard/page.tsx
"use client";
 
import { trpc } from "@/lib/trpc";
 
export default function DashboardPage() {
  const { data: user, isLoading, error } = trpc.user.me.useQuery();
 
  if (isLoading) return <DashboardSkeleton />;
  if (error) return <ErrorDisplay message={error.message} />;
 
  return (
    <div>
      <h1>Welcome back, {user.name}</h1>
      <p>Member since {user.createdAt.toLocaleDateString()}</p>
    </div>
  );
}

那个 user.name 是完全类型化的。如果你拼错成 user.nme,TypeScript 会立刻捕获。如果你把服务端改成返回 displayName 而不是 name,每个客户端使用处都会显示编译错误。不会有运行时意外。

Context 和 Middleware#

Context 和 middleware 是 tRPC 从"巧妙的类型技巧"升级为"生产就绪框架"的关键。

创建 Context#

Context 函数在每个请求上运行。以下是一个真实的版本:

typescript
// src/server/trpc.ts
import { type FetchCreateContextFnOptions } from "@trpc/server/adapters/fetch";
import { getServerSession } from "next-auth";
import { authOptions } from "@/lib/auth";
import { prisma } from "@/lib/prisma";
 
export async function createTRPCContext(opts: FetchCreateContextFnOptions) {
  const session = await getServerSession(authOptions);
 
  return {
    session,
    user: session?.user ?? null,
    db: prisma,
    headers: Object.fromEntries(opts.req.headers),
  };
}
 
type Context = Awaited<ReturnType<typeof createTRPCContext>>;

认证 Middleware#

最常见的 middleware 模式是区分公开和受保护的 procedure:

typescript
// src/server/trpc.ts
const isAuthed = t.middleware(async ({ ctx, next }) => {
  if (!ctx.user) {
    throw new TRPCError({
      code: "UNAUTHORIZED",
      message: "You must be logged in to perform this action",
    });
  }
 
  return next({
    ctx: {
      // Override the context type — user is no longer nullable
      user: ctx.user,
    },
  });
});
 
export const protectedProcedure = t.procedure.use(isAuthed);

这个 middleware 运行后,任何 protectedProcedure 中的 ctx.user 都保证是非 null 的。类型系统强制执行这一点。你无法在公开 procedure 中意外访问 ctx.user.id 而不收到 TypeScript 的警告。

基于角色的 Middleware#

你可以组合 middleware 来实现更细粒度的访问控制:

typescript
const isAdmin = t.middleware(async ({ ctx, next }) => {
  if (!ctx.user) {
    throw new TRPCError({ code: "UNAUTHORIZED" });
  }
 
  if (ctx.user.role !== "ADMIN") {
    throw new TRPCError({
      code: "FORBIDDEN",
      message: "Admin access required",
    });
  }
 
  return next({
    ctx: {
      user: ctx.user,
    },
  });
});
 
export const adminProcedure = t.procedure.use(isAdmin);

日志 Middleware#

Middleware 不仅仅用于认证。这里是一个性能日志 middleware:

typescript
const loggerMiddleware = t.middleware(async ({ path, type, next }) => {
  const start = Date.now();
 
  const result = await next();
 
  const duration = Date.now() - start;
 
  if (duration > 1000) {
    console.warn(`⚠️ Slow ${type} ${path}: ${duration}ms`);
  }
 
  return result;
});
 
// Apply to all procedures
export const publicProcedure = t.procedure.use(loggerMiddleware);

速率限制 Middleware#

typescript
import { Ratelimit } from "@upstash/ratelimit";
import { Redis } from "@upstash/redis";
 
const ratelimit = new Ratelimit({
  redis: Redis.fromEnv(),
  limiter: Ratelimit.slidingWindow(20, "10 s"),
});
 
const rateLimitMiddleware = t.middleware(async ({ ctx, next }) => {
  const ip = ctx.headers["x-forwarded-for"] ?? "unknown";
  const { success, remaining } = await ratelimit.limit(ip);
 
  if (!success) {
    throw new TRPCError({
      code: "TOO_MANY_REQUESTS",
      message: `Rate limit exceeded. Try again later.`,
    });
  }
 
  return next();
});

使用 Zod 进行输入校验#

tRPC 使用 Zod 做输入校验。这不是可选的装饰——它是确保客户端和服务端输入安全的核心机制。

基本校验#

typescript
const postRouter = router({
  create: protectedProcedure
    .input(
      z.object({
        title: z.string().min(1, "Title is required").max(200, "Title too long"),
        content: z.string().min(10, "Content must be at least 10 characters"),
        categoryId: z.string().uuid("Invalid category ID"),
        tags: z.array(z.string()).max(5, "Maximum 5 tags").default([]),
        published: z.boolean().default(false),
      })
    )
    .mutation(async ({ ctx, input }) => {
      // input is fully typed:
      // {
      //   title: string;
      //   content: string;
      //   categoryId: string;
      //   tags: string[];
      //   published: boolean;
      // }
 
      return ctx.db.post.create({
        data: {
          ...input,
          authorId: ctx.user.id,
        },
      });
    }),
});

双重校验技巧#

这里有一个微妙的点:Zod 校验在 两端 都会运行。在客户端,tRPC 在发送请求之前就校验输入。如果输入无效,请求根本不会离开浏览器。在服务端,同一个 schema 再次校验作为安全措施。

这意味着你免费获得了即时的客户端校验:

typescript
"use client";
 
import { trpc } from "@/lib/trpc";
import { useState } from "react";
 
export function CreatePostForm() {
  const [title, setTitle] = useState("");
  const utils = trpc.useUtils();
 
  const createPost = trpc.post.create.useMutation({
    onSuccess: () => {
      utils.post.list.invalidate();
    },
    onError: (error) => {
      // Zod errors arrive structured
      if (error.data?.zodError) {
        const fieldErrors = error.data.zodError.fieldErrors;
        // fieldErrors.title?: string[]
        // fieldErrors.content?: string[]
        console.log(fieldErrors);
      }
    },
  });
 
  return (
    <form
      onSubmit={(e) => {
        e.preventDefault();
        createPost.mutate({
          title,
          content: "...",
          categoryId: "...",
        });
      }}
    >
      <input value={title} onChange={(e) => setTitle(e.target.value)} />
      {createPost.error?.data?.zodError?.fieldErrors.title && (
        <span className="text-red-500">
          {createPost.error.data.zodError.fieldErrors.title[0]}
        </span>
      )}
      <button type="submit" disabled={createPost.isPending}>
        {createPost.isPending ? "Creating..." : "Create Post"}
      </button>
    </form>
  );
}

复杂输入模式#

typescript
// Discriminated unions
const searchInput = z.discriminatedUnion("type", [
  z.object({
    type: z.literal("user"),
    query: z.string(),
    includeInactive: z.boolean().default(false),
  }),
  z.object({
    type: z.literal("post"),
    query: z.string(),
    category: z.string().optional(),
  }),
]);
 
// Pagination input reused across procedures
const paginationInput = z.object({
  cursor: z.string().nullish(),
  limit: z.number().min(1).max(100).default(20),
});
 
const postRouter = router({
  infiniteList: publicProcedure
    .input(
      z.object({
        ...paginationInput.shape,
        category: z.string().optional(),
        sortBy: z.enum(["newest", "popular", "trending"]).default("newest"),
      })
    )
    .query(async ({ ctx, input }) => {
      const { cursor, limit, category, sortBy } = input;
 
      const posts = await ctx.db.post.findMany({
        take: limit + 1,
        cursor: cursor ? { id: cursor } : undefined,
        where: category ? { categoryId: category } : undefined,
        orderBy:
          sortBy === "newest"
            ? { createdAt: "desc" }
            : sortBy === "popular"
              ? { likes: "desc" }
              : { score: "desc" },
      });
 
      let nextCursor: string | undefined;
      if (posts.length > limit) {
        const nextItem = posts.pop();
        nextCursor = nextItem?.id;
      }
 
      return {
        posts,
        nextCursor,
      };
    }),
});

可选输入#

不是每个 procedure 都需要输入。Query 通常不需要:

typescript
const statsRouter = router({
  // No input needed
  overview: publicProcedure.query(async ({ ctx }) => {
    const [userCount, postCount, commentCount] = await Promise.all([
      ctx.db.user.count(),
      ctx.db.post.count(),
      ctx.db.comment.count(),
    ]);
 
    return { userCount, postCount, commentCount };
  }),
 
  // Optional filters
  detailed: publicProcedure
    .input(
      z
        .object({
          from: z.date().optional(),
          to: z.date().optional(),
        })
        .optional()
    )
    .query(async ({ ctx, input }) => {
      const where = {
        ...(input?.from && { createdAt: { gte: input.from } }),
        ...(input?.to && { createdAt: { lte: input.to } }),
      };
 
      return ctx.db.post.groupBy({
        by: ["categoryId"],
        where,
        _count: true,
      });
    }),
});

错误处理#

tRPC 的错误处理是结构化的、类型安全的,并且与 HTTP 语义和客户端 UI 都能干净地集成。

在服务端抛出错误#

typescript
import { TRPCError } from "@trpc/server";
 
const postRouter = router({
  delete: protectedProcedure
    .input(z.object({ id: z.string().uuid() }))
    .mutation(async ({ ctx, input }) => {
      const post = await ctx.db.post.findUnique({
        where: { id: input.id },
      });
 
      if (!post) {
        throw new TRPCError({
          code: "NOT_FOUND",
          message: "Post not found",
        });
      }
 
      if (post.authorId !== ctx.user.id) {
        throw new TRPCError({
          code: "FORBIDDEN",
          message: "You can only delete your own posts",
        });
      }
 
      await ctx.db.post.delete({ where: { id: input.id } });
 
      return { success: true };
    }),
});

tRPC 错误码映射到 HTTP 状态码:

tRPC 错误码HTTP 状态码使用场景
BAD_REQUEST400超出 Zod 校验的无效输入
UNAUTHORIZED401未登录
FORBIDDEN403已登录但权限不足
NOT_FOUND404资源不存在
CONFLICT409重复资源
TOO_MANY_REQUESTS429超出速率限制
INTERNAL_SERVER_ERROR500意外的服务器错误

自定义错误格式化#

还记得我们搭建时的 error formatter 吗?以下是它在实践中的用法:

typescript
const t = initTRPC.context<typeof createTRPCContext>().create({
  transformer: superjson,
  errorFormatter({ shape, error }) {
    return {
      ...shape,
      data: {
        ...shape.data,
        zodError:
          error.cause instanceof ZodError ? error.cause.flatten() : null,
        // Add custom fields
        timestamp: new Date().toISOString(),
        requestId: crypto.randomUUID(),
      },
    };
  },
});

客户端错误处理#

typescript
"use client";
 
import { trpc } from "@/lib/trpc";
import { toast } from "sonner";
 
export function DeletePostButton({ postId }: { postId: string }) {
  const utils = trpc.useUtils();
 
  const deletePost = trpc.post.delete.useMutation({
    onSuccess: () => {
      toast.success("Post deleted");
      utils.post.list.invalidate();
    },
    onError: (error) => {
      switch (error.data?.code) {
        case "FORBIDDEN":
          toast.error("You don't have permission to delete this post");
          break;
        case "NOT_FOUND":
          toast.error("This post no longer exists");
          utils.post.list.invalidate();
          break;
        default:
          toast.error("Something went wrong. Please try again.");
      }
    },
  });
 
  return (
    <button
      onClick={() => deletePost.mutate({ id: postId })}
      disabled={deletePost.isPending}
    >
      {deletePost.isPending ? "Deleting..." : "Delete"}
    </button>
  );
}

全局错误处理#

你可以设置一个全局错误处理器来捕获所有未处理的 tRPC 错误:

typescript
// In your TRPCProvider
const [queryClient] = useState(
  () =>
    new QueryClient({
      defaultOptions: {
        mutations: {
          onError: (error) => {
            // Global fallback for unhandled mutation errors
            if (error instanceof TRPCClientError) {
              if (error.data?.code === "UNAUTHORIZED") {
                // Redirect to login
                window.location.href = "/login";
                return;
              }
 
              toast.error(error.message);
            }
          },
        },
      },
    })
);

Mutation 和乐观更新#

Mutation 是 tRPC 与 TanStack Query 真正良好集成的地方。来看一个真实的模式:带乐观更新的点赞按钮。

基本 Mutation#

typescript
// Server
const postRouter = router({
  toggleLike: protectedProcedure
    .input(z.object({ postId: z.string().uuid() }))
    .mutation(async ({ ctx, input }) => {
      const existing = await ctx.db.like.findUnique({
        where: {
          userId_postId: {
            userId: ctx.user.id,
            postId: input.postId,
          },
        },
      });
 
      if (existing) {
        await ctx.db.like.delete({
          where: { id: existing.id },
        });
        return { liked: false };
      }
 
      await ctx.db.like.create({
        data: {
          userId: ctx.user.id,
          postId: input.postId,
        },
      });
 
      return { liked: true };
    }),
});

乐观更新#

用户点击"喜欢"。你不想等 200ms 的服务器响应才更新 UI。乐观更新解决了这个问题:立即更新 UI,如果服务器拒绝就回滚。

typescript
"use client";
 
import { trpc } from "@/lib/trpc";
 
export function LikeButton({ postId, initialLiked, initialCount }: {
  postId: string;
  initialLiked: boolean;
  initialCount: number;
}) {
  const utils = trpc.useUtils();
 
  const toggleLike = trpc.post.toggleLike.useMutation({
    onMutate: async ({ postId }) => {
      // Cancel outgoing refetches so they don't overwrite our optimistic update
      await utils.post.byId.cancel({ id: postId });
 
      // Snapshot the previous value
      const previousPost = utils.post.byId.getData({ id: postId });
 
      // Optimistically update the cache
      utils.post.byId.setData({ id: postId }, (old) => {
        if (!old) return old;
        return {
          ...old,
          liked: !old.liked,
          likeCount: old.liked ? old.likeCount - 1 : old.likeCount + 1,
        };
      });
 
      // Return the snapshot for rollback
      return { previousPost };
    },
 
    onError: (_error, { postId }, context) => {
      // Roll back to the previous value on error
      if (context?.previousPost) {
        utils.post.byId.setData({ id: postId }, context.previousPost);
      }
    },
 
    onSettled: (_data, _error, { postId }) => {
      // Always refetch after error or success to ensure server state
      utils.post.byId.invalidate({ id: postId });
    },
  });
 
  return (
    <button
      onClick={() => toggleLike.mutate({ postId })}
      className={initialLiked ? "text-red-500" : "text-gray-400"}
    >
      ♥ {initialCount}
    </button>
  );
}

模式永远是一样的:

  1. onMutate:取消查询,快照当前数据,应用乐观更新,返回快照。
  2. onError:使用快照回滚。
  3. onSettled:无论成功还是失败,失效查询使其从服务器重新获取。

这个三步舞确保 UI 始终响应及时,并最终与服务器一致。

失效关联查询#

Mutation 之后,你通常需要刷新相关数据。tRPC 的 useUtils() 让这变得很方便:

typescript
const createComment = trpc.comment.create.useMutation({
  onSuccess: (_data, variables) => {
    // Invalidate the post's comment list
    utils.comment.listByPost.invalidate({ postId: variables.postId });
 
    // Invalidate the post itself (comment count changed)
    utils.post.byId.invalidate({ id: variables.postId });
 
    // Invalidate ALL post lists (comment counts in list views)
    utils.post.list.invalidate();
  },
});

批量请求和订阅#

HTTP 批量请求#

默认情况下,使用 httpBatchLink 的 tRPC 会将多个同时发起的请求合并为一个 HTTP 调用。如果一个组件渲染时发起了三个查询:

typescript
function Dashboard() {
  const user = trpc.user.me.useQuery();
  const posts = trpc.post.list.useQuery({ limit: 10 });
  const stats = trpc.stats.overview.useQuery();
 
  // ...
}

这三个查询会自动合并为一个 HTTP 请求:GET /api/trpc/user.me,post.list,stats.overview?batch=1&input=...

服务端处理这三个请求,在一个响应中返回三个结果,TanStack Query 把结果分发到各个 hook。不需要任何配置。

如果需要,你可以对特定调用禁用批量请求:

typescript
// In your provider, use splitLink to route specific procedures differently
import { splitLink, httpBatchLink, httpLink } from "@trpc/client";
 
const [trpcClient] = useState(() =>
  trpc.createClient({
    links: [
      splitLink({
        condition: (op) => op.path === "post.infiniteList",
        true: httpLink({
          url: `${getBaseUrl()}/api/trpc`,
          transformer: superjson,
        }),
        false: httpBatchLink({
          url: `${getBaseUrl()}/api/trpc`,
          transformer: superjson,
          maxURLLength: 2048,
        }),
      }),
    ],
  })
);

WebSocket 订阅#

对于实时功能,tRPC 支持通过 WebSocket 进行订阅。这需要一个单独的 WebSocket 服务器(Next.js 的 Route Handler 原生不支持 WebSocket)。

typescript
// src/server/routers/notification.ts
import { observable } from "@trpc/server/observable";
 
// In-memory event emitter (use Redis pub/sub in production)
import { EventEmitter } from "events";
 
const eventEmitter = new EventEmitter();
 
export const notificationRouter = router({
  onNew: protectedProcedure.subscription(({ ctx }) => {
    return observable<Notification>((emit) => {
      const handler = (notification: Notification) => {
        if (notification.userId === ctx.user.id) {
          emit.next(notification);
        }
      };
 
      eventEmitter.on("notification", handler);
 
      return () => {
        eventEmitter.off("notification", handler);
      };
    });
  }),
 
  // Mutation that triggers a notification
  markAsRead: protectedProcedure
    .input(z.object({ id: z.string() }))
    .mutation(async ({ ctx, input }) => {
      const notification = await ctx.db.notification.update({
        where: { id: input.id, userId: ctx.user.id },
        data: { readAt: new Date() },
      });
 
      return notification;
    }),
});

客户端:

typescript
"use client";
 
import { trpc } from "@/lib/trpc";
import { useState } from "react";
 
export function NotificationBell() {
  const [notifications, setNotifications] = useState<Notification[]>([]);
 
  trpc.notification.onNew.useSubscription(undefined, {
    onData: (notification) => {
      setNotifications((prev) => [notification, ...prev]);
      toast.info(notification.message);
    },
    onError: (error) => {
      console.error("Subscription error:", error);
    },
  });
 
  return (
    <div>
      <span className="badge">{notifications.length}</span>
      {/* notification list UI */}
    </div>
  );
}

WebSocket 传输需要一个专用的服务器进程。以下是使用 ws 库的最小化设置:

typescript
// ws-server.ts (separate process)
import { applyWSSHandler } from "@trpc/server/adapters/ws";
import { WebSocketServer } from "ws";
import { appRouter } from "./server/routers/_app";
import { createTRPCContext } from "./server/trpc";
 
const wss = new WebSocketServer({ port: 3001 });
 
const handler = applyWSSHandler({
  wss,
  router: appRouter,
  createContext: createTRPCContext,
});
 
wss.on("connection", (ws) => {
  console.log(`Connection opened (${wss.clients.size} total)`);
  ws.once("close", () => {
    console.log(`Connection closed (${wss.clients.size} total)`);
  });
});
 
process.on("SIGTERM", () => {
  handler.broadcastReconnectNotification();
  wss.close();
});
 
console.log("WebSocket server listening on ws://localhost:3001");

客户端需要一个 wsLink 来处理订阅:

typescript
import { wsLink, createWSClient } from "@trpc/client";
 
const wsClient = createWSClient({
  url: "ws://localhost:3001",
});
 
// Use splitLink to route subscriptions through WebSocket
const [trpcClient] = useState(() =>
  trpc.createClient({
    links: [
      splitLink({
        condition: (op) => op.type === "subscription",
        true: wsLink({ client: wsClient, transformer: superjson }),
        false: httpBatchLink({
          url: `${getBaseUrl()}/api/trpc`,
          transformer: superjson,
        }),
      }),
    ],
  })
);

类型安全的文件上传#

tRPC 原生不处理文件上传。它是一个 JSON-RPC 协议——二进制数据不在它的处理范围内。但你可以通过将 tRPC 与预签名 URL 结合来构建类型安全的上传流程。

模式如下:

  1. 客户端向 tRPC 请求预签名上传 URL。
  2. tRPC 校验请求,检查权限,生成 URL。
  3. 客户端使用预签名 URL 直接上传到 S3。
  4. 客户端通知 tRPC 上传完成。
typescript
// src/server/routers/upload.ts
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
 
const s3 = new S3Client({ region: process.env.AWS_REGION });
 
export const uploadRouter = router({
  getPresignedUrl: protectedProcedure
    .input(
      z.object({
        filename: z.string().min(1).max(255),
        contentType: z.string().regex(/^(image|application)\//),
        size: z.number().max(10 * 1024 * 1024, "File must be under 10MB"),
      })
    )
    .mutation(async ({ ctx, input }) => {
      const key = `uploads/${ctx.user.id}/${crypto.randomUUID()}-${input.filename}`;
 
      const command = new PutObjectCommand({
        Bucket: process.env.S3_BUCKET!,
        Key: key,
        ContentType: input.contentType,
        ContentLength: input.size,
      });
 
      const presignedUrl = await getSignedUrl(s3, command, {
        expiresIn: 300, // 5 minutes
      });
 
      // Store pending upload in database
      const upload = await ctx.db.upload.create({
        data: {
          key,
          userId: ctx.user.id,
          filename: input.filename,
          contentType: input.contentType,
          size: input.size,
          status: "PENDING",
        },
      });
 
      return {
        uploadId: upload.id,
        presignedUrl,
        key,
      };
    }),
 
  confirmUpload: protectedProcedure
    .input(z.object({ uploadId: z.string().uuid() }))
    .mutation(async ({ ctx, input }) => {
      const upload = await ctx.db.upload.findUnique({
        where: { id: input.uploadId, userId: ctx.user.id },
      });
 
      if (!upload) {
        throw new TRPCError({ code: "NOT_FOUND" });
      }
 
      // Verify the file actually exists in S3
      // (optional but recommended)
 
      const confirmed = await ctx.db.upload.update({
        where: { id: upload.id },
        data: { status: "CONFIRMED" },
      });
 
      return {
        url: `https://${process.env.S3_BUCKET}.s3.amazonaws.com/${confirmed.key}`,
      };
    }),
});

客户端:

typescript
"use client";
 
import { trpc } from "@/lib/trpc";
import { useState, useCallback } from "react";
 
export function FileUpload() {
  const [uploading, setUploading] = useState(false);
 
  const getPresignedUrl = trpc.upload.getPresignedUrl.useMutation();
  const confirmUpload = trpc.upload.confirmUpload.useMutation();
 
  const handleFileChange = useCallback(
    async (e: React.ChangeEvent<HTMLInputElement>) => {
      const file = e.target.files?.[0];
      if (!file) return;
 
      setUploading(true);
 
      try {
        // Step 1: Get presigned URL from tRPC
        const { presignedUrl, uploadId } = await getPresignedUrl.mutateAsync({
          filename: file.name,
          contentType: file.type,
          size: file.size,
        });
 
        // Step 2: Upload directly to S3
        const uploadResponse = await fetch(presignedUrl, {
          method: "PUT",
          body: file,
          headers: {
            "Content-Type": file.type,
          },
        });
 
        if (!uploadResponse.ok) {
          throw new Error("Upload failed");
        }
 
        // Step 3: Confirm upload via tRPC
        const { url } = await confirmUpload.mutateAsync({ uploadId });
 
        toast.success(`File uploaded: ${url}`);
      } catch (error) {
        toast.error("Upload failed. Please try again.");
      } finally {
        setUploading(false);
      }
    },
    [getPresignedUrl, confirmUpload]
  );
 
  return (
    <div>
      <input
        type="file"
        onChange={handleFileChange}
        disabled={uploading}
        accept="image/*"
      />
      {uploading && <p>Uploading...</p>}
    </div>
  );
}

整个流程都是类型安全的。预签名 URL 的响应类型、上传 ID 的类型、确认响应——全部从服务端定义推断。如果你向预签名 URL 响应添加新字段,客户端立刻就知道。

服务端调用和 React Server Components#

在 Next.js App Router 中,你经常想在 Server Components 里获取数据。tRPC 通过服务端 caller 支持这种场景:

typescript
// src/server/trpc.ts
export const createCaller = createCallerFactory(appRouter);
 
// Usage in a Server Component
// src/app/posts/[id]/page.tsx
import { createTRPCContext } from "@/server/trpc";
import { createCaller } from "@/server/trpc";
 
export default async function PostPage({
  params,
}: {
  params: { id: string };
}) {
  const ctx = await createTRPCContext({
    req: new Request("http://localhost"),
    resHeaders: new Headers(),
  });
  const caller = createCaller(ctx);
 
  const post = await caller.post.byId({ id: params.id });
 
  return (
    <article>
      <h1>{post.title}</h1>
      <div>{post.content}</div>
      {/* Client components for interactive parts */}
      <LikeButton postId={post.id} initialLiked={post.liked} />
      <CommentSection postId={post.id} />
    </article>
  );
}

这让你两全其美:服务端渲染的初始数据具有完整的类型安全,而客户端交互则用于 mutation 和实时功能。

测试 tRPC Procedure#

测试很简单,因为 procedure 就是函数。你不需要启动 HTTP 服务器。

typescript
// src/server/routers/user.test.ts
import { describe, it, expect, vi } from "vitest";
import { appRouter } from "./routers/_app";
import { createCaller } from "./trpc";
 
describe("user router", () => {
  it("returns the current user profile", async () => {
    const caller = createCaller({
      user: { id: "user-1", email: "test@example.com", role: "USER" },
      db: prismaMock,
      session: mockSession,
      headers: {},
    });
 
    prismaMock.user.findUnique.mockResolvedValue({
      id: "user-1",
      name: "Test User",
      email: "test@example.com",
      image: null,
      createdAt: new Date("2026-01-01"),
    });
 
    const result = await caller.user.me();
 
    expect(result).toEqual({
      id: "user-1",
      name: "Test User",
      email: "test@example.com",
      image: null,
      createdAt: new Date("2026-01-01"),
    });
  });
 
  it("throws UNAUTHORIZED for unauthenticated requests", async () => {
    const caller = createCaller({
      user: null,
      db: prismaMock,
      session: null,
      headers: {},
    });
 
    await expect(caller.user.me()).rejects.toThrow("UNAUTHORIZED");
  });
 
  it("validates input with Zod", async () => {
    const caller = createCaller({
      user: { id: "user-1", email: "test@example.com", role: "USER" },
      db: prismaMock,
      session: mockSession,
      headers: {},
    });
 
    await expect(
      caller.user.updateProfile({
        name: "", // min length 1
      })
    ).rejects.toThrow();
  });
});

不需要 mock HTTP 层,不需要 supertest,不需要路由匹配。直接调用函数并断言结果。这是 tRPC 被低估的优势之一:测试简单得不可思议,因为传输层只是一个实现细节。

什么时候不该用 tRPC#

tRPC 不是万能方案。以下是它失效的场景:

公共 API#

如果你在构建一个供外部开发者使用的 API,tRPC 是错误的选择。外部消费者无法访问你的 TypeScript 类型。他们需要一个有文档的、稳定的契约——REST 用 OpenAPI/Swagger,或者 GraphQL schema。tRPC 的类型安全只在客户端和服务端共享同一个 TypeScript 代码库时才有效。

移动应用(除非你用 TypeScript)#

如果你的移动应用是用 Swift、Kotlin 或 Dart 写的,tRPC 没有任何价值。类型无法跨语言边界传递。理论上你可以用 trpc-openapi 从 tRPC 路由生成 OpenAPI spec,但到那时你又把繁琐的仪式加回来了。不如一开始就用 REST。

微服务#

tRPC 假设是单一的 TypeScript 代码库。如果你的后端拆分成多个不同语言的服务,tRPC 无法帮助服务间通信。用 gRPC、REST 或消息队列。

前后端分仓库的大团队#

如果你的前端和后端在不同的仓库,有独立的部署流水线,你就失去了 tRPC 的核心优势。类型共享需要 monorepo 或共享包。你 可以AppRouter 类型作为 npm 包发布,但现在你面临的版本管理问题,REST + OpenAPI 处理得更自然。

需要 REST 语义时#

如果你需要 HTTP 缓存头、内容协商、ETag 或其他 REST 特有的功能,tRPC 对 HTTP 的抽象会跟你作对。tRPC 把 HTTP 当作传输细节,而不是功能。

决策框架#

以下是我的决策方式:

场景建议
同仓库全栈 TypeScript 应用tRPC — 最大收益,最小开销
内部工具 / 管理面板tRPC — 开发速度是第一优先级
面向第三方开发者的公共 APIREST + OpenAPI — 消费者需要文档,不是类型
移动端 + Web 客户端(非 TS 移动端)RESTGraphQL — 需要语言无关的契约
重实时(聊天、游戏)tRPC subscriptions原生 WebSocket,取决于复杂度
前后端团队分离GraphQL — schema 是团队之间的契约

生产环境的实用经验#

一些我从生产环境运行 tRPC 中学到的、文档里没有的东西:

保持 router 精简。 单个 router 文件不应该超过 200 行。按领域拆分:userRouterpostRouterbillingRouter,每个一个文件。

使用 createCallerFactory 做服务端调用。 在 Server Component 中调用自己的 API 时不要用 fetch。Caller factory 给你同样的类型安全,零 HTTP 开销。

不要过度优化批量请求。 默认的 httpBatchLink 几乎总是够用的。我见过团队花好几天配置 splitLink 只换来了微小的收益。先做性能分析。

在 QueryClient 中设置 staleTime 默认 staleTime 为 0 意味着每次焦点事件都触发重新获取。根据你的数据新鲜度需求设置合理的值(30 秒到 5 分钟)。

从第一天就用 superjson 后面再加意味着需要同时迁移所有客户端和服务端。这是一行配置就能避免 Date 序列化 bug 的事。

Error boundary 是你的好朋友。 用 React error boundary 包裹 tRPC 密集的页面区域。一个失败的查询不应该搞崩整个页面。

typescript
"use client";
 
import { ErrorBoundary } from "react-error-boundary";
 
function ErrorFallback({ error, resetErrorBoundary }: {
  error: Error;
  resetErrorBoundary: () => void;
}) {
  return (
    <div role="alert" className="p-4 bg-red-50 rounded-lg">
      <p className="font-medium text-red-800">Something went wrong</p>
      <pre className="text-sm text-red-600 mt-2">{error.message}</pre>
      <button
        onClick={resetErrorBoundary}
        className="mt-3 px-4 py-2 bg-red-600 text-white rounded"
      >
        Try again
      </button>
    </div>
  );
}
 
export default function DashboardPage() {
  return (
    <div>
      <h1>Dashboard</h1>
      <ErrorBoundary FallbackComponent={ErrorFallback}>
        <UserStats />
      </ErrorBoundary>
      <ErrorBoundary FallbackComponent={ErrorFallback}>
        <RecentPosts />
      </ErrorBoundary>
    </div>
  );
}

总结#

tRPC 不是 REST 或 GraphQL 的替代品。它是针对特定场景的不同工具:当你同时控制客户端和服务端,两者都是 TypeScript,并且你想要从"我改了后端"到"前端知道了"的最短路径。

在这种场景下,没有什么能比得上它。没有代码生成,没有 schema 文件,没有漂移。就是 TypeScript 做它最擅长的事:在错误到达生产环境之前就捕获它们。

权衡很明确:你放弃了协议级别的互操作性(没有非 TypeScript 客户端可以使用),换来了开发速度和编译时安全性——这是用其他方式很难达到的。

对于大多数全栈 TypeScript 应用来说,这个权衡是值得的。

相关文章