tRPC:无需繁文缛节的端到端类型安全
tRPC 如何消除 API 契约问题,与 Next.js App Router 配合,处理认证中间件、文件上传和订阅。从零开始搭建一个真实的 tRPC 项目。
你一定经历过这种场景。你修改了 API 响应中的一个字段名,更新了后端类型,然后部署。接着前端在生产环境崩了,因为有人忘了更新 Dashboard.tsx 第 247 行的 fetch 调用。那个字段现在是 undefined,组件渲染空白,你的错误追踪系统在凌晨两点亮起了红灯。
这就是 API 契约问题。它不是技术问题,而是协作问题。再多的 Swagger 文档或 GraphQL schema 也无法解决前后端类型悄悄漂移的事实。
tRPC 通过拒绝让类型漂移来解决这个问题。没有 schema 文件,没有代码生成步骤,没有需要维护的独立契约。你在服务端写一个 TypeScript 函数,客户端在编译时就知道它的精确输入和输出类型。如果你重命名一个字段,前端就无法编译,直到你修复它。
这就是它的承诺。让我来展示它实际如何工作,在哪里闪光,以及在哪里你绝对不应该使用它。
更精确地描述问题#
来看看大多数团队今天是怎么构建 API 的。
REST + OpenAPI:你编写端点。也许加上 Swagger 注解。也许从 OpenAPI spec 生成客户端 SDK。但 spec 是一个独立的产物,它可能会过时。生成步骤是 CI 流水线中另一个可能出错或被遗忘的环节。而且生成的类型通常很丑——paths["/api/users"]["get"]["responses"]["200"]["content"]["application/json"] 这种深层嵌套的怪物。
GraphQL:类型安全更好,但仪式感太重。你用 SDL 写 schema,写 resolver,从 schema 生成类型,在客户端写查询,再从查询生成类型。这至少是两次代码生成步骤、一个 schema 文件和一个所有人都得记得运行的构建步骤。对于一个同时控制前后端的团队来说,这是为一个有更简单解决方案的问题搭建了太多基础设施。
手动 fetch 调用:最常见的方式,也是最危险的。你写 fetch("/api/users"),把结果断言为 User[],然后祈祷一切顺利。没有任何编译时安全性。类型断言是你对 TypeScript 撒的谎。
// The lie every frontend developer has told
const res = await fetch("/api/users");
const users = (await res.json()) as User[]; // 🙏 hope this is righttRPC 采取了完全不同的方式。它不是用另一种格式描述 API 然后生成类型,而是在服务端写纯 TypeScript 函数,直接把类型导入客户端。没有生成步骤,没有 schema 文件,没有漂移。
核心概念#
在开始搭建之前,让我们先理解思维模型。
Router#
tRPC router 是一组 procedure 的集合。可以把它想象成 MVC 中的 controller,只不过它是一个内置了类型推断的普通对象。
import { initTRPC } from "@trpc/server";
const t = initTRPC.create();
export const appRouter = t.router({
user: t.router({
list: t.procedure.query(/* ... */),
byId: t.procedure.input(/* ... */).query(/* ... */),
create: t.procedure.input(/* ... */).mutation(/* ... */),
}),
post: t.router({
list: t.procedure.query(/* ... */),
publish: t.procedure.input(/* ... */).mutation(/* ... */),
}),
});
export type AppRouter = typeof appRouter;那个 AppRouter 类型导出就是全部的魔法。客户端导入这个类型——不是运行时代码,只是类型——然后就获得了每个 procedure 的完整自动补全和类型检查。
Procedure#
Procedure 是一个单独的端点。有三种类型:
- Query:读操作。映射到 HTTP GET 语义。由 TanStack Query 缓存。
- Mutation:写操作。映射到 HTTP POST。不缓存。
- Subscription:实时流。使用 WebSocket。
Context#
Context 是每个请求作用域内可用的数据。数据库连接、已认证的用户、请求头——任何你会放在 Express 的 req 对象中的东西都在这里。
Middleware#
Middleware 转换 context 或控制访问。最常见的模式是一个认证 middleware,检查有效的 session 并添加 ctx.user。
类型推断链#
这是关键的思维模型。当你这样定义一个 procedure 时:
t.procedure
.input(z.object({ id: z.string() }))
.query(({ input }) => {
// input is typed as { id: string }
return db.user.findUnique({ where: { id: input.id } });
});返回类型一路流到客户端。如果 db.user.findUnique 返回 User | null,客户端的 useQuery hook 的 data 就会被类型化为 User | null。不需要手动定义类型,不需要类型断言。端到端推断。
在 Next.js App Router 中搭建#
让我们从头开始构建。假设你有一个 Next.js 14+ 项目,使用 App Router。
安装依赖#
npm install @trpc/server @trpc/client @trpc/react-query @trpc/next @tanstack/react-query zod第一步:在服务端初始化 tRPC#
创建 tRPC 实例并定义 context 类型。
// src/server/trpc.ts
import { initTRPC, TRPCError } from "@trpc/server";
import { type FetchCreateContextFnOptions } from "@trpc/server/adapters/fetch";
import superjson from "superjson";
import { ZodError } from "zod";
export async function createTRPCContext(opts: FetchCreateContextFnOptions) {
const session = await getSession(opts.req);
return {
session,
db: prisma,
req: opts.req,
};
}
const t = initTRPC.context<typeof createTRPCContext>().create({
transformer: superjson,
errorFormatter({ shape, error }) {
return {
...shape,
data: {
...shape.data,
zodError:
error.cause instanceof ZodError ? error.cause.flatten() : null,
},
};
},
});
export const createCallerFactory = t.createCallerFactory;
export const router = t.router;
export const publicProcedure = t.procedure;几点值得注意:
superjsontransformer:tRPC 默认将数据序列化为 JSON,这意味着Date对象、Map、Set和其他非 JSON 类型会丢失。superjson 保留它们。- Error formatter:我们把 Zod 校验错误附加到响应中,这样客户端就能显示字段级别的错误。
createTRPCContext:这个函数在每个请求上运行。你在这里解析 session、设置数据库连接、构建 context 对象。
第二步:定义 Router#
// src/server/routers/user.ts
import { z } from "zod";
import { router, publicProcedure, protectedProcedure } from "../trpc";
export const userRouter = router({
me: protectedProcedure.query(async ({ ctx }) => {
const user = await ctx.db.user.findUnique({
where: { id: ctx.user.id },
select: {
id: true,
name: true,
email: true,
image: true,
createdAt: true,
},
});
if (!user) {
throw new TRPCError({
code: "NOT_FOUND",
message: "User not found",
});
}
return user;
}),
updateProfile: protectedProcedure
.input(
z.object({
name: z.string().min(1).max(100),
bio: z.string().max(500).optional(),
})
)
.mutation(async ({ ctx, input }) => {
const updated = await ctx.db.user.update({
where: { id: ctx.user.id },
data: {
name: input.name,
bio: input.bio,
},
});
return updated;
}),
byId: publicProcedure
.input(z.object({ id: z.string().uuid() }))
.query(async ({ ctx, input }) => {
const user = await ctx.db.user.findUnique({
where: { id: input.id },
select: {
id: true,
name: true,
image: true,
bio: true,
},
});
if (!user) {
throw new TRPCError({
code: "NOT_FOUND",
message: "User not found",
});
}
return user;
}),
});// src/server/routers/_app.ts
import { router } from "../trpc";
import { userRouter } from "./user";
import { postRouter } from "./post";
import { notificationRouter } from "./notification";
export const appRouter = router({
user: userRouter,
post: postRouter,
notification: notificationRouter,
});
export type AppRouter = typeof appRouter;第三步:通过 Next.js Route Handler 暴露#
在 App Router 中,tRPC 作为标准的 Route Handler 运行。不需要自定义服务器,不需要特殊的 Next.js 插件。
// src/app/api/trpc/[trpc]/route.ts
import { fetchRequestHandler } from "@trpc/server/adapters/fetch";
import { appRouter } from "@/server/routers/_app";
import { createTRPCContext } from "@/server/trpc";
const handler = (req: Request) =>
fetchRequestHandler({
endpoint: "/api/trpc",
req,
router: appRouter,
createContext: createTRPCContext,
onError:
process.env.NODE_ENV === "development"
? ({ path, error }) => {
console.error(
`❌ tRPC failed on ${path ?? "<no-path>"}: ${error.message}`
);
}
: undefined,
});
export { handler as GET, handler as POST };就这样。GET 和 POST 都处理了。Query 走 GET(带 URL 编码的输入),mutation 走 POST。
第四步:设置客户端#
// src/lib/trpc.ts
import { createTRPCReact } from "@trpc/react-query";
import type { AppRouter } from "@/server/routers/_app";
export const trpc = createTRPCReact<AppRouter>();注意:我们导入 AppRouter 只作为 类型。没有服务端代码泄漏到客户端 bundle 中。
第五步:Provider 设置#
// src/components/providers/TRPCProvider.tsx
"use client";
import { useState } from "react";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { httpBatchLink, loggerLink } from "@trpc/client";
import { trpc } from "@/lib/trpc";
import superjson from "superjson";
function getBaseUrl() {
if (typeof window !== "undefined") return "";
if (process.env.VERCEL_URL) return `https://${process.env.VERCEL_URL}`;
return `http://localhost:${process.env.PORT ?? 3000}`;
}
export function TRPCProvider({ children }: { children: React.ReactNode }) {
const [queryClient] = useState(
() =>
new QueryClient({
defaultOptions: {
queries: {
staleTime: 5 * 60 * 1000,
retry: 1,
},
},
})
);
const [trpcClient] = useState(() =>
trpc.createClient({
links: [
loggerLink({
enabled: (op) =>
process.env.NODE_ENV === "development" ||
(op.direction === "down" && op.result instanceof Error),
}),
httpBatchLink({
url: `${getBaseUrl()}/api/trpc`,
transformer: superjson,
}),
],
})
);
return (
<trpc.Provider client={trpcClient} queryClient={queryClient}>
<QueryClientProvider client={queryClient}>
{children}
</QueryClientProvider>
</trpc.Provider>
);
}// src/app/layout.tsx (relevant part)
import { TRPCProvider } from "@/components/providers/TRPCProvider";
export default function RootLayout({ children }: { children: React.ReactNode }) {
return (
<html>
<body>
<TRPCProvider>{children}</TRPCProvider>
</body>
</html>
);
}第六步:在组件中使用#
// src/app/dashboard/page.tsx
"use client";
import { trpc } from "@/lib/trpc";
export default function DashboardPage() {
const { data: user, isLoading, error } = trpc.user.me.useQuery();
if (isLoading) return <DashboardSkeleton />;
if (error) return <ErrorDisplay message={error.message} />;
return (
<div>
<h1>Welcome back, {user.name}</h1>
<p>Member since {user.createdAt.toLocaleDateString()}</p>
</div>
);
}那个 user.name 是完全类型化的。如果你拼错成 user.nme,TypeScript 会立刻捕获。如果你把服务端改成返回 displayName 而不是 name,每个客户端使用处都会显示编译错误。不会有运行时意外。
Context 和 Middleware#
Context 和 middleware 是 tRPC 从"巧妙的类型技巧"升级为"生产就绪框架"的关键。
创建 Context#
Context 函数在每个请求上运行。以下是一个真实的版本:
// src/server/trpc.ts
import { type FetchCreateContextFnOptions } from "@trpc/server/adapters/fetch";
import { getServerSession } from "next-auth";
import { authOptions } from "@/lib/auth";
import { prisma } from "@/lib/prisma";
export async function createTRPCContext(opts: FetchCreateContextFnOptions) {
const session = await getServerSession(authOptions);
return {
session,
user: session?.user ?? null,
db: prisma,
headers: Object.fromEntries(opts.req.headers),
};
}
type Context = Awaited<ReturnType<typeof createTRPCContext>>;认证 Middleware#
最常见的 middleware 模式是区分公开和受保护的 procedure:
// src/server/trpc.ts
const isAuthed = t.middleware(async ({ ctx, next }) => {
if (!ctx.user) {
throw new TRPCError({
code: "UNAUTHORIZED",
message: "You must be logged in to perform this action",
});
}
return next({
ctx: {
// Override the context type — user is no longer nullable
user: ctx.user,
},
});
});
export const protectedProcedure = t.procedure.use(isAuthed);这个 middleware 运行后,任何 protectedProcedure 中的 ctx.user 都保证是非 null 的。类型系统强制执行这一点。你无法在公开 procedure 中意外访问 ctx.user.id 而不收到 TypeScript 的警告。
基于角色的 Middleware#
你可以组合 middleware 来实现更细粒度的访问控制:
const isAdmin = t.middleware(async ({ ctx, next }) => {
if (!ctx.user) {
throw new TRPCError({ code: "UNAUTHORIZED" });
}
if (ctx.user.role !== "ADMIN") {
throw new TRPCError({
code: "FORBIDDEN",
message: "Admin access required",
});
}
return next({
ctx: {
user: ctx.user,
},
});
});
export const adminProcedure = t.procedure.use(isAdmin);日志 Middleware#
Middleware 不仅仅用于认证。这里是一个性能日志 middleware:
const loggerMiddleware = t.middleware(async ({ path, type, next }) => {
const start = Date.now();
const result = await next();
const duration = Date.now() - start;
if (duration > 1000) {
console.warn(`⚠️ Slow ${type} ${path}: ${duration}ms`);
}
return result;
});
// Apply to all procedures
export const publicProcedure = t.procedure.use(loggerMiddleware);速率限制 Middleware#
import { Ratelimit } from "@upstash/ratelimit";
import { Redis } from "@upstash/redis";
const ratelimit = new Ratelimit({
redis: Redis.fromEnv(),
limiter: Ratelimit.slidingWindow(20, "10 s"),
});
const rateLimitMiddleware = t.middleware(async ({ ctx, next }) => {
const ip = ctx.headers["x-forwarded-for"] ?? "unknown";
const { success, remaining } = await ratelimit.limit(ip);
if (!success) {
throw new TRPCError({
code: "TOO_MANY_REQUESTS",
message: `Rate limit exceeded. Try again later.`,
});
}
return next();
});使用 Zod 进行输入校验#
tRPC 使用 Zod 做输入校验。这不是可选的装饰——它是确保客户端和服务端输入安全的核心机制。
基本校验#
const postRouter = router({
create: protectedProcedure
.input(
z.object({
title: z.string().min(1, "Title is required").max(200, "Title too long"),
content: z.string().min(10, "Content must be at least 10 characters"),
categoryId: z.string().uuid("Invalid category ID"),
tags: z.array(z.string()).max(5, "Maximum 5 tags").default([]),
published: z.boolean().default(false),
})
)
.mutation(async ({ ctx, input }) => {
// input is fully typed:
// {
// title: string;
// content: string;
// categoryId: string;
// tags: string[];
// published: boolean;
// }
return ctx.db.post.create({
data: {
...input,
authorId: ctx.user.id,
},
});
}),
});双重校验技巧#
这里有一个微妙的点:Zod 校验在 两端 都会运行。在客户端,tRPC 在发送请求之前就校验输入。如果输入无效,请求根本不会离开浏览器。在服务端,同一个 schema 再次校验作为安全措施。
这意味着你免费获得了即时的客户端校验:
"use client";
import { trpc } from "@/lib/trpc";
import { useState } from "react";
export function CreatePostForm() {
const [title, setTitle] = useState("");
const utils = trpc.useUtils();
const createPost = trpc.post.create.useMutation({
onSuccess: () => {
utils.post.list.invalidate();
},
onError: (error) => {
// Zod errors arrive structured
if (error.data?.zodError) {
const fieldErrors = error.data.zodError.fieldErrors;
// fieldErrors.title?: string[]
// fieldErrors.content?: string[]
console.log(fieldErrors);
}
},
});
return (
<form
onSubmit={(e) => {
e.preventDefault();
createPost.mutate({
title,
content: "...",
categoryId: "...",
});
}}
>
<input value={title} onChange={(e) => setTitle(e.target.value)} />
{createPost.error?.data?.zodError?.fieldErrors.title && (
<span className="text-red-500">
{createPost.error.data.zodError.fieldErrors.title[0]}
</span>
)}
<button type="submit" disabled={createPost.isPending}>
{createPost.isPending ? "Creating..." : "Create Post"}
</button>
</form>
);
}复杂输入模式#
// Discriminated unions
const searchInput = z.discriminatedUnion("type", [
z.object({
type: z.literal("user"),
query: z.string(),
includeInactive: z.boolean().default(false),
}),
z.object({
type: z.literal("post"),
query: z.string(),
category: z.string().optional(),
}),
]);
// Pagination input reused across procedures
const paginationInput = z.object({
cursor: z.string().nullish(),
limit: z.number().min(1).max(100).default(20),
});
const postRouter = router({
infiniteList: publicProcedure
.input(
z.object({
...paginationInput.shape,
category: z.string().optional(),
sortBy: z.enum(["newest", "popular", "trending"]).default("newest"),
})
)
.query(async ({ ctx, input }) => {
const { cursor, limit, category, sortBy } = input;
const posts = await ctx.db.post.findMany({
take: limit + 1,
cursor: cursor ? { id: cursor } : undefined,
where: category ? { categoryId: category } : undefined,
orderBy:
sortBy === "newest"
? { createdAt: "desc" }
: sortBy === "popular"
? { likes: "desc" }
: { score: "desc" },
});
let nextCursor: string | undefined;
if (posts.length > limit) {
const nextItem = posts.pop();
nextCursor = nextItem?.id;
}
return {
posts,
nextCursor,
};
}),
});可选输入#
不是每个 procedure 都需要输入。Query 通常不需要:
const statsRouter = router({
// No input needed
overview: publicProcedure.query(async ({ ctx }) => {
const [userCount, postCount, commentCount] = await Promise.all([
ctx.db.user.count(),
ctx.db.post.count(),
ctx.db.comment.count(),
]);
return { userCount, postCount, commentCount };
}),
// Optional filters
detailed: publicProcedure
.input(
z
.object({
from: z.date().optional(),
to: z.date().optional(),
})
.optional()
)
.query(async ({ ctx, input }) => {
const where = {
...(input?.from && { createdAt: { gte: input.from } }),
...(input?.to && { createdAt: { lte: input.to } }),
};
return ctx.db.post.groupBy({
by: ["categoryId"],
where,
_count: true,
});
}),
});错误处理#
tRPC 的错误处理是结构化的、类型安全的,并且与 HTTP 语义和客户端 UI 都能干净地集成。
在服务端抛出错误#
import { TRPCError } from "@trpc/server";
const postRouter = router({
delete: protectedProcedure
.input(z.object({ id: z.string().uuid() }))
.mutation(async ({ ctx, input }) => {
const post = await ctx.db.post.findUnique({
where: { id: input.id },
});
if (!post) {
throw new TRPCError({
code: "NOT_FOUND",
message: "Post not found",
});
}
if (post.authorId !== ctx.user.id) {
throw new TRPCError({
code: "FORBIDDEN",
message: "You can only delete your own posts",
});
}
await ctx.db.post.delete({ where: { id: input.id } });
return { success: true };
}),
});tRPC 错误码映射到 HTTP 状态码:
| tRPC 错误码 | HTTP 状态码 | 使用场景 |
|---|---|---|
BAD_REQUEST | 400 | 超出 Zod 校验的无效输入 |
UNAUTHORIZED | 401 | 未登录 |
FORBIDDEN | 403 | 已登录但权限不足 |
NOT_FOUND | 404 | 资源不存在 |
CONFLICT | 409 | 重复资源 |
TOO_MANY_REQUESTS | 429 | 超出速率限制 |
INTERNAL_SERVER_ERROR | 500 | 意外的服务器错误 |
自定义错误格式化#
还记得我们搭建时的 error formatter 吗?以下是它在实践中的用法:
const t = initTRPC.context<typeof createTRPCContext>().create({
transformer: superjson,
errorFormatter({ shape, error }) {
return {
...shape,
data: {
...shape.data,
zodError:
error.cause instanceof ZodError ? error.cause.flatten() : null,
// Add custom fields
timestamp: new Date().toISOString(),
requestId: crypto.randomUUID(),
},
};
},
});客户端错误处理#
"use client";
import { trpc } from "@/lib/trpc";
import { toast } from "sonner";
export function DeletePostButton({ postId }: { postId: string }) {
const utils = trpc.useUtils();
const deletePost = trpc.post.delete.useMutation({
onSuccess: () => {
toast.success("Post deleted");
utils.post.list.invalidate();
},
onError: (error) => {
switch (error.data?.code) {
case "FORBIDDEN":
toast.error("You don't have permission to delete this post");
break;
case "NOT_FOUND":
toast.error("This post no longer exists");
utils.post.list.invalidate();
break;
default:
toast.error("Something went wrong. Please try again.");
}
},
});
return (
<button
onClick={() => deletePost.mutate({ id: postId })}
disabled={deletePost.isPending}
>
{deletePost.isPending ? "Deleting..." : "Delete"}
</button>
);
}全局错误处理#
你可以设置一个全局错误处理器来捕获所有未处理的 tRPC 错误:
// In your TRPCProvider
const [queryClient] = useState(
() =>
new QueryClient({
defaultOptions: {
mutations: {
onError: (error) => {
// Global fallback for unhandled mutation errors
if (error instanceof TRPCClientError) {
if (error.data?.code === "UNAUTHORIZED") {
// Redirect to login
window.location.href = "/login";
return;
}
toast.error(error.message);
}
},
},
},
})
);Mutation 和乐观更新#
Mutation 是 tRPC 与 TanStack Query 真正良好集成的地方。来看一个真实的模式:带乐观更新的点赞按钮。
基本 Mutation#
// Server
const postRouter = router({
toggleLike: protectedProcedure
.input(z.object({ postId: z.string().uuid() }))
.mutation(async ({ ctx, input }) => {
const existing = await ctx.db.like.findUnique({
where: {
userId_postId: {
userId: ctx.user.id,
postId: input.postId,
},
},
});
if (existing) {
await ctx.db.like.delete({
where: { id: existing.id },
});
return { liked: false };
}
await ctx.db.like.create({
data: {
userId: ctx.user.id,
postId: input.postId,
},
});
return { liked: true };
}),
});乐观更新#
用户点击"喜欢"。你不想等 200ms 的服务器响应才更新 UI。乐观更新解决了这个问题:立即更新 UI,如果服务器拒绝就回滚。
"use client";
import { trpc } from "@/lib/trpc";
export function LikeButton({ postId, initialLiked, initialCount }: {
postId: string;
initialLiked: boolean;
initialCount: number;
}) {
const utils = trpc.useUtils();
const toggleLike = trpc.post.toggleLike.useMutation({
onMutate: async ({ postId }) => {
// Cancel outgoing refetches so they don't overwrite our optimistic update
await utils.post.byId.cancel({ id: postId });
// Snapshot the previous value
const previousPost = utils.post.byId.getData({ id: postId });
// Optimistically update the cache
utils.post.byId.setData({ id: postId }, (old) => {
if (!old) return old;
return {
...old,
liked: !old.liked,
likeCount: old.liked ? old.likeCount - 1 : old.likeCount + 1,
};
});
// Return the snapshot for rollback
return { previousPost };
},
onError: (_error, { postId }, context) => {
// Roll back to the previous value on error
if (context?.previousPost) {
utils.post.byId.setData({ id: postId }, context.previousPost);
}
},
onSettled: (_data, _error, { postId }) => {
// Always refetch after error or success to ensure server state
utils.post.byId.invalidate({ id: postId });
},
});
return (
<button
onClick={() => toggleLike.mutate({ postId })}
className={initialLiked ? "text-red-500" : "text-gray-400"}
>
♥ {initialCount}
</button>
);
}模式永远是一样的:
onMutate:取消查询,快照当前数据,应用乐观更新,返回快照。onError:使用快照回滚。onSettled:无论成功还是失败,失效查询使其从服务器重新获取。
这个三步舞确保 UI 始终响应及时,并最终与服务器一致。
失效关联查询#
Mutation 之后,你通常需要刷新相关数据。tRPC 的 useUtils() 让这变得很方便:
const createComment = trpc.comment.create.useMutation({
onSuccess: (_data, variables) => {
// Invalidate the post's comment list
utils.comment.listByPost.invalidate({ postId: variables.postId });
// Invalidate the post itself (comment count changed)
utils.post.byId.invalidate({ id: variables.postId });
// Invalidate ALL post lists (comment counts in list views)
utils.post.list.invalidate();
},
});批量请求和订阅#
HTTP 批量请求#
默认情况下,使用 httpBatchLink 的 tRPC 会将多个同时发起的请求合并为一个 HTTP 调用。如果一个组件渲染时发起了三个查询:
function Dashboard() {
const user = trpc.user.me.useQuery();
const posts = trpc.post.list.useQuery({ limit: 10 });
const stats = trpc.stats.overview.useQuery();
// ...
}这三个查询会自动合并为一个 HTTP 请求:GET /api/trpc/user.me,post.list,stats.overview?batch=1&input=...
服务端处理这三个请求,在一个响应中返回三个结果,TanStack Query 把结果分发到各个 hook。不需要任何配置。
如果需要,你可以对特定调用禁用批量请求:
// In your provider, use splitLink to route specific procedures differently
import { splitLink, httpBatchLink, httpLink } from "@trpc/client";
const [trpcClient] = useState(() =>
trpc.createClient({
links: [
splitLink({
condition: (op) => op.path === "post.infiniteList",
true: httpLink({
url: `${getBaseUrl()}/api/trpc`,
transformer: superjson,
}),
false: httpBatchLink({
url: `${getBaseUrl()}/api/trpc`,
transformer: superjson,
maxURLLength: 2048,
}),
}),
],
})
);WebSocket 订阅#
对于实时功能,tRPC 支持通过 WebSocket 进行订阅。这需要一个单独的 WebSocket 服务器(Next.js 的 Route Handler 原生不支持 WebSocket)。
// src/server/routers/notification.ts
import { observable } from "@trpc/server/observable";
// In-memory event emitter (use Redis pub/sub in production)
import { EventEmitter } from "events";
const eventEmitter = new EventEmitter();
export const notificationRouter = router({
onNew: protectedProcedure.subscription(({ ctx }) => {
return observable<Notification>((emit) => {
const handler = (notification: Notification) => {
if (notification.userId === ctx.user.id) {
emit.next(notification);
}
};
eventEmitter.on("notification", handler);
return () => {
eventEmitter.off("notification", handler);
};
});
}),
// Mutation that triggers a notification
markAsRead: protectedProcedure
.input(z.object({ id: z.string() }))
.mutation(async ({ ctx, input }) => {
const notification = await ctx.db.notification.update({
where: { id: input.id, userId: ctx.user.id },
data: { readAt: new Date() },
});
return notification;
}),
});客户端:
"use client";
import { trpc } from "@/lib/trpc";
import { useState } from "react";
export function NotificationBell() {
const [notifications, setNotifications] = useState<Notification[]>([]);
trpc.notification.onNew.useSubscription(undefined, {
onData: (notification) => {
setNotifications((prev) => [notification, ...prev]);
toast.info(notification.message);
},
onError: (error) => {
console.error("Subscription error:", error);
},
});
return (
<div>
<span className="badge">{notifications.length}</span>
{/* notification list UI */}
</div>
);
}WebSocket 传输需要一个专用的服务器进程。以下是使用 ws 库的最小化设置:
// ws-server.ts (separate process)
import { applyWSSHandler } from "@trpc/server/adapters/ws";
import { WebSocketServer } from "ws";
import { appRouter } from "./server/routers/_app";
import { createTRPCContext } from "./server/trpc";
const wss = new WebSocketServer({ port: 3001 });
const handler = applyWSSHandler({
wss,
router: appRouter,
createContext: createTRPCContext,
});
wss.on("connection", (ws) => {
console.log(`Connection opened (${wss.clients.size} total)`);
ws.once("close", () => {
console.log(`Connection closed (${wss.clients.size} total)`);
});
});
process.on("SIGTERM", () => {
handler.broadcastReconnectNotification();
wss.close();
});
console.log("WebSocket server listening on ws://localhost:3001");客户端需要一个 wsLink 来处理订阅:
import { wsLink, createWSClient } from "@trpc/client";
const wsClient = createWSClient({
url: "ws://localhost:3001",
});
// Use splitLink to route subscriptions through WebSocket
const [trpcClient] = useState(() =>
trpc.createClient({
links: [
splitLink({
condition: (op) => op.type === "subscription",
true: wsLink({ client: wsClient, transformer: superjson }),
false: httpBatchLink({
url: `${getBaseUrl()}/api/trpc`,
transformer: superjson,
}),
}),
],
})
);类型安全的文件上传#
tRPC 原生不处理文件上传。它是一个 JSON-RPC 协议——二进制数据不在它的处理范围内。但你可以通过将 tRPC 与预签名 URL 结合来构建类型安全的上传流程。
模式如下:
- 客户端向 tRPC 请求预签名上传 URL。
- tRPC 校验请求,检查权限,生成 URL。
- 客户端使用预签名 URL 直接上传到 S3。
- 客户端通知 tRPC 上传完成。
// src/server/routers/upload.ts
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
const s3 = new S3Client({ region: process.env.AWS_REGION });
export const uploadRouter = router({
getPresignedUrl: protectedProcedure
.input(
z.object({
filename: z.string().min(1).max(255),
contentType: z.string().regex(/^(image|application)\//),
size: z.number().max(10 * 1024 * 1024, "File must be under 10MB"),
})
)
.mutation(async ({ ctx, input }) => {
const key = `uploads/${ctx.user.id}/${crypto.randomUUID()}-${input.filename}`;
const command = new PutObjectCommand({
Bucket: process.env.S3_BUCKET!,
Key: key,
ContentType: input.contentType,
ContentLength: input.size,
});
const presignedUrl = await getSignedUrl(s3, command, {
expiresIn: 300, // 5 minutes
});
// Store pending upload in database
const upload = await ctx.db.upload.create({
data: {
key,
userId: ctx.user.id,
filename: input.filename,
contentType: input.contentType,
size: input.size,
status: "PENDING",
},
});
return {
uploadId: upload.id,
presignedUrl,
key,
};
}),
confirmUpload: protectedProcedure
.input(z.object({ uploadId: z.string().uuid() }))
.mutation(async ({ ctx, input }) => {
const upload = await ctx.db.upload.findUnique({
where: { id: input.uploadId, userId: ctx.user.id },
});
if (!upload) {
throw new TRPCError({ code: "NOT_FOUND" });
}
// Verify the file actually exists in S3
// (optional but recommended)
const confirmed = await ctx.db.upload.update({
where: { id: upload.id },
data: { status: "CONFIRMED" },
});
return {
url: `https://${process.env.S3_BUCKET}.s3.amazonaws.com/${confirmed.key}`,
};
}),
});客户端:
"use client";
import { trpc } from "@/lib/trpc";
import { useState, useCallback } from "react";
export function FileUpload() {
const [uploading, setUploading] = useState(false);
const getPresignedUrl = trpc.upload.getPresignedUrl.useMutation();
const confirmUpload = trpc.upload.confirmUpload.useMutation();
const handleFileChange = useCallback(
async (e: React.ChangeEvent<HTMLInputElement>) => {
const file = e.target.files?.[0];
if (!file) return;
setUploading(true);
try {
// Step 1: Get presigned URL from tRPC
const { presignedUrl, uploadId } = await getPresignedUrl.mutateAsync({
filename: file.name,
contentType: file.type,
size: file.size,
});
// Step 2: Upload directly to S3
const uploadResponse = await fetch(presignedUrl, {
method: "PUT",
body: file,
headers: {
"Content-Type": file.type,
},
});
if (!uploadResponse.ok) {
throw new Error("Upload failed");
}
// Step 3: Confirm upload via tRPC
const { url } = await confirmUpload.mutateAsync({ uploadId });
toast.success(`File uploaded: ${url}`);
} catch (error) {
toast.error("Upload failed. Please try again.");
} finally {
setUploading(false);
}
},
[getPresignedUrl, confirmUpload]
);
return (
<div>
<input
type="file"
onChange={handleFileChange}
disabled={uploading}
accept="image/*"
/>
{uploading && <p>Uploading...</p>}
</div>
);
}整个流程都是类型安全的。预签名 URL 的响应类型、上传 ID 的类型、确认响应——全部从服务端定义推断。如果你向预签名 URL 响应添加新字段,客户端立刻就知道。
服务端调用和 React Server Components#
在 Next.js App Router 中,你经常想在 Server Components 里获取数据。tRPC 通过服务端 caller 支持这种场景:
// src/server/trpc.ts
export const createCaller = createCallerFactory(appRouter);
// Usage in a Server Component
// src/app/posts/[id]/page.tsx
import { createTRPCContext } from "@/server/trpc";
import { createCaller } from "@/server/trpc";
export default async function PostPage({
params,
}: {
params: { id: string };
}) {
const ctx = await createTRPCContext({
req: new Request("http://localhost"),
resHeaders: new Headers(),
});
const caller = createCaller(ctx);
const post = await caller.post.byId({ id: params.id });
return (
<article>
<h1>{post.title}</h1>
<div>{post.content}</div>
{/* Client components for interactive parts */}
<LikeButton postId={post.id} initialLiked={post.liked} />
<CommentSection postId={post.id} />
</article>
);
}这让你两全其美:服务端渲染的初始数据具有完整的类型安全,而客户端交互则用于 mutation 和实时功能。
测试 tRPC Procedure#
测试很简单,因为 procedure 就是函数。你不需要启动 HTTP 服务器。
// src/server/routers/user.test.ts
import { describe, it, expect, vi } from "vitest";
import { appRouter } from "./routers/_app";
import { createCaller } from "./trpc";
describe("user router", () => {
it("returns the current user profile", async () => {
const caller = createCaller({
user: { id: "user-1", email: "test@example.com", role: "USER" },
db: prismaMock,
session: mockSession,
headers: {},
});
prismaMock.user.findUnique.mockResolvedValue({
id: "user-1",
name: "Test User",
email: "test@example.com",
image: null,
createdAt: new Date("2026-01-01"),
});
const result = await caller.user.me();
expect(result).toEqual({
id: "user-1",
name: "Test User",
email: "test@example.com",
image: null,
createdAt: new Date("2026-01-01"),
});
});
it("throws UNAUTHORIZED for unauthenticated requests", async () => {
const caller = createCaller({
user: null,
db: prismaMock,
session: null,
headers: {},
});
await expect(caller.user.me()).rejects.toThrow("UNAUTHORIZED");
});
it("validates input with Zod", async () => {
const caller = createCaller({
user: { id: "user-1", email: "test@example.com", role: "USER" },
db: prismaMock,
session: mockSession,
headers: {},
});
await expect(
caller.user.updateProfile({
name: "", // min length 1
})
).rejects.toThrow();
});
});不需要 mock HTTP 层,不需要 supertest,不需要路由匹配。直接调用函数并断言结果。这是 tRPC 被低估的优势之一:测试简单得不可思议,因为传输层只是一个实现细节。
什么时候不该用 tRPC#
tRPC 不是万能方案。以下是它失效的场景:
公共 API#
如果你在构建一个供外部开发者使用的 API,tRPC 是错误的选择。外部消费者无法访问你的 TypeScript 类型。他们需要一个有文档的、稳定的契约——REST 用 OpenAPI/Swagger,或者 GraphQL schema。tRPC 的类型安全只在客户端和服务端共享同一个 TypeScript 代码库时才有效。
移动应用(除非你用 TypeScript)#
如果你的移动应用是用 Swift、Kotlin 或 Dart 写的,tRPC 没有任何价值。类型无法跨语言边界传递。理论上你可以用 trpc-openapi 从 tRPC 路由生成 OpenAPI spec,但到那时你又把繁琐的仪式加回来了。不如一开始就用 REST。
微服务#
tRPC 假设是单一的 TypeScript 代码库。如果你的后端拆分成多个不同语言的服务,tRPC 无法帮助服务间通信。用 gRPC、REST 或消息队列。
前后端分仓库的大团队#
如果你的前端和后端在不同的仓库,有独立的部署流水线,你就失去了 tRPC 的核心优势。类型共享需要 monorepo 或共享包。你 可以 把 AppRouter 类型作为 npm 包发布,但现在你面临的版本管理问题,REST + OpenAPI 处理得更自然。
需要 REST 语义时#
如果你需要 HTTP 缓存头、内容协商、ETag 或其他 REST 特有的功能,tRPC 对 HTTP 的抽象会跟你作对。tRPC 把 HTTP 当作传输细节,而不是功能。
决策框架#
以下是我的决策方式:
| 场景 | 建议 |
|---|---|
| 同仓库全栈 TypeScript 应用 | tRPC — 最大收益,最小开销 |
| 内部工具 / 管理面板 | tRPC — 开发速度是第一优先级 |
| 面向第三方开发者的公共 API | REST + OpenAPI — 消费者需要文档,不是类型 |
| 移动端 + Web 客户端(非 TS 移动端) | REST 或 GraphQL — 需要语言无关的契约 |
| 重实时(聊天、游戏) | tRPC subscriptions 或 原生 WebSocket,取决于复杂度 |
| 前后端团队分离 | GraphQL — schema 是团队之间的契约 |
生产环境的实用经验#
一些我从生产环境运行 tRPC 中学到的、文档里没有的东西:
保持 router 精简。 单个 router 文件不应该超过 200 行。按领域拆分:userRouter、postRouter、billingRouter,每个一个文件。
使用 createCallerFactory 做服务端调用。 在 Server Component 中调用自己的 API 时不要用 fetch。Caller factory 给你同样的类型安全,零 HTTP 开销。
不要过度优化批量请求。 默认的 httpBatchLink 几乎总是够用的。我见过团队花好几天配置 splitLink 只换来了微小的收益。先做性能分析。
在 QueryClient 中设置 staleTime。 默认 staleTime 为 0 意味着每次焦点事件都触发重新获取。根据你的数据新鲜度需求设置合理的值(30 秒到 5 分钟)。
从第一天就用 superjson。 后面再加意味着需要同时迁移所有客户端和服务端。这是一行配置就能避免 Date 序列化 bug 的事。
Error boundary 是你的好朋友。 用 React error boundary 包裹 tRPC 密集的页面区域。一个失败的查询不应该搞崩整个页面。
"use client";
import { ErrorBoundary } from "react-error-boundary";
function ErrorFallback({ error, resetErrorBoundary }: {
error: Error;
resetErrorBoundary: () => void;
}) {
return (
<div role="alert" className="p-4 bg-red-50 rounded-lg">
<p className="font-medium text-red-800">Something went wrong</p>
<pre className="text-sm text-red-600 mt-2">{error.message}</pre>
<button
onClick={resetErrorBoundary}
className="mt-3 px-4 py-2 bg-red-600 text-white rounded"
>
Try again
</button>
</div>
);
}
export default function DashboardPage() {
return (
<div>
<h1>Dashboard</h1>
<ErrorBoundary FallbackComponent={ErrorFallback}>
<UserStats />
</ErrorBoundary>
<ErrorBoundary FallbackComponent={ErrorFallback}>
<RecentPosts />
</ErrorBoundary>
</div>
);
}总结#
tRPC 不是 REST 或 GraphQL 的替代品。它是针对特定场景的不同工具:当你同时控制客户端和服务端,两者都是 TypeScript,并且你想要从"我改了后端"到"前端知道了"的最短路径。
在这种场景下,没有什么能比得上它。没有代码生成,没有 schema 文件,没有漂移。就是 TypeScript 做它最擅长的事:在错误到达生产环境之前就捕获它们。
权衡很明确:你放弃了协议级别的互操作性(没有非 TypeScript 客户端可以使用),换来了开发速度和编译时安全性——这是用其他方式很难达到的。
对于大多数全栈 TypeScript 应用来说,这个权衡是值得的。