数据库操作本质上也是一种异步行为,因此,我们可以像前端封装接口请求那样,把数据库操作单独封装一下,然后在 service 业务代码中使用,从而简化代码结构。
这里我们以登录相关的功能为例,来展开这个封装方式的学习。
先看这行导入,它决定了后面查询条件和 SQL 表达式的写法:
1import { and, eq, isNull, sql } from 'drizzle-orm'
其实这里 4 个东西都不复杂。
eq(a, b) 表示相等条件。你可以把它理解成 SQL 里的 a = b。
1eq(applications.code, 'admin')
and(...) 表示把多个条件拼成 AND。
1and(2eq(applications.code, 'admin'),3eq(applications.status, 'active'),4eq(applicationAuthMethods.provider, 'password'),5)
isNull(x) 表示 SQL 里的 x IS NULL。
1isNull(refreshTokens.usedAtMs)
sql 则是写原生 SQL 片段时用的。
1sql`COALESCE(${authSessions.revokedAtMs}, ${params.revokedAtMs})`
它适合那种 Drizzle 链式 API 不够直接,或者你本来就想写一小段 SQL 表达式的场景。
除了这 4 个条件工具,后面还反复出现这些查询 API:
.select(...):查什么列.from(...):从哪张表开始查.innerJoin(...):和哪张表做内连接.where(...):过滤条件.limit(1):只取一条.get():拿一条结果.insert(...).values(...):插入数据.update(...).set(...):更新数据.returning(...):把更新后的结果带回来.batch([...]):把多条写操作一起执行你可以先把整套链式写法看成“在 TypeScript 里写 SQL”。只不过不是直接拼字符串,而是把 select、join、where 这些步骤拆成链式方法。
isPasswordLoginEnabledForAdmin01export async function isPasswordLoginEnabledForAdmin(db: ApiDb): Promise<boolean> {02const row = await db03.select({ enabled: applicationAuthMethods.enabled })04.from(applicationAuthMethods)05.innerJoin(applications, eq(applications.id, applicationAuthMethods.applicationId))06.where(07and(08eq(applications.code, 'admin'),09eq(applications.status, 'active'),10eq(applicationAuthMethods.provider, 'password'),11),12)13.limit(1)14.get()1516return row?.enabled === 117}
这个方法的目标很单纯:查 admin 这条线有没有启用密码登录。
顺着链往下读就行。
.select({ enabled: applicationAuthMethods.enabled }) 表示只查 enabled 这一列,并把它取名成 enabled。
.from(applicationAuthMethods) 表示从 application_auth_methods 这张表开始。
.innerJoin(applications, eq(applications.id, applicationAuthMethods.applicationId)) 表示把 applications 表连进来,连接条件是 application id 对上。
.where(...) 里又拼了 3 个条件:
adminactivepassword.limit(1).get() 连起来看,意思就是:只取一条,并把结果当成单条记录拿出来。
最后:
1return row?.enabled === 1
这里把数据库里的 1 / 0 转成真正的布尔值。
findLoginUserByNormalizedEmail01export async function findLoginUserByNormalizedEmail(02db: ApiDb,03normalizedEmail: string,04): Promise<LoginUserRecord | null> {05const row = await db06.select({07userId: users.id,08emailId: userEmails.id,09email: userEmails.email,10userStatus: users.status,11passwordHash: passwordCredentials.passwordHash,12passwordAlgo: passwordCredentials.passwordAlgo,13})14.from(userEmails)15.innerJoin(users, eq(users.id, userEmails.userId))16.innerJoin(17passwordCredentials,18and(19eq(passwordCredentials.userId, users.id),20eq(passwordCredentials.emailId, userEmails.id),21),22)23.where(eq(userEmails.normalizedEmail, normalizedEmail))24.limit(1)25.get()2627return row28? {29...row,30userStatus: row.userStatus as LoginUserRecord['userStatus'],31passwordAlgo: row.passwordAlgo as LoginUserRecord['passwordAlgo'],32}33: null34}
这个方法是登录入口里最典型的读取工具方法。
它不是只查一张表,而是把:
user_emailsuserspassword_credentials三张表串起来,一次把登录要用到的核心信息拿全。
这里的关键在 .select({...})。
它不是查整行,而是只挑当前登录流程真正要用的字段:
这样返回值就会很聚焦,不会把没用的列也一股脑带出来。
再看 join。
第一段:
1.innerJoin(users, eq(users.id, userEmails.userId))
这是通过 userEmails.userId 找到对应的 users。
第二段:
1.innerJoin(2passwordCredentials,3and(4eq(passwordCredentials.userId, users.id),5eq(passwordCredentials.emailId, userEmails.id),6),7)
这里的连接条件比上一段更严。它同时要求:
最后 .where(eq(userEmails.normalizedEmail, normalizedEmail)) 表示按标准化邮箱去查。
整个方法返回 LoginUserRecord | null,所以查不到时会明确返回 null。
getAdminRolesForUser01export async function getAdminRolesForUser(02db: ApiDb,03userId: string,04): Promise<string[]> {05const rows = await db06.select({ code: roles.code })07.from(userRoleBindings)08.innerJoin(roles, eq(roles.id, userRoleBindings.roleId))09.innerJoin(applications, eq(applications.id, roles.applicationId))10.where(11and(12eq(userRoleBindings.userId, userId),13eq(userRoleBindings.status, 'active'),14eq(applications.code, 'admin'),15),16)1718return rows.map((row) => row.code)19}
这个方法查的是 admin 角色。
它从 user_role_bindings 起步,再一路连到 roles 和 applications,最后把条件收窄成:
activeadmin注意它的返回值不是整行对象,而是:
1return rows.map((row) => row.code)
也就是直接返回角色 code 数组。
这说明这个工具方法已经帮上层做了一层结果整理,调用方不用再自己去 map。
getAdminApplicationId01export async function getAdminApplicationId(db: ApiDb): Promise<string> {02const row = await db03.select({ id: applications.id })04.from(applications)05.where(eq(applications.code, 'admin'))06.limit(1)07.get()0809if (!row) {10throw new Error('Admin application is missing')11}1213return row.id14}
这个方法很适合拿来理解 .limit(1).get()。
它的意图就是:按 code 查 admin 这条 application,只拿一条。
如果查不到,直接抛错。
所以这类方法很像“配置读取器”:它预期这条数据就该存在,不存在就是异常状态。
createAdminSession01export async function createAdminSession(params: {02db: ApiDb03userId: string04applicationId: string05userAgent: string | null06ip: string | null07nowMs: number08expiresAtMs: number09roles: string[]10}): Promise<SessionContext> {11const sessionId = uuidv7()1213await params.db.batch([14params.db.insert(authSessions).values({15id: sessionId,16userId: params.userId,17applicationId: params.applicationId,18sessionType: 'admin',19deviceName: null,20userAgent: params.userAgent,21ip: params.ip,22lastSeenAtMs: params.nowMs,23createdAtMs: params.nowMs,24expiresAtMs: params.expiresAtMs,25revokedAtMs: null,26revokeReason: null,27}),28params.db29.update(users)30.set({31lastLoginAtMs: params.nowMs,32updatedAtMs: params.nowMs,33})34.where(eq(users.id, params.userId)),35])3637return {38sessionId,39userId: params.userId,40app: 'admin',41roles: params.roles,42expiresAtMs: params.expiresAtMs,43}44}
这个方法开始进入写操作。
第一眼先看两件事:
const sessionId = uuidv7()await params.db.batch([...])先生成 session id,再把两条写操作一起执行。
batch([...]) 里的第一条是插入 auth_sessions:
1params.db.insert(authSessions).values({ ... })
这就是最标准的 Drizzle 插入写法:
.insert(table) 指定插入哪张表.values({...}) 指定写入哪些字段第二条是更新 users:
1params.db2.update(users)3.set({4lastLoginAtMs: params.nowMs,5updatedAtMs: params.nowMs,6})7.where(eq(users.id, params.userId))
这就是最标准的更新写法:
.update(table) 选表.set({...}) 设新值.where(...) 限定更新哪一行最后方法返回的不是数据库原始结果,而是业务层更想要的 SessionContext。
所以它已经不只是“写库”,还顺手把业务层后面要继续传递的 session 信息整理好了。
insertRefreshToken01export async function insertRefreshToken(params: {02db: ApiDb03tokenId: string04sessionId: string05jtiHash: string06parentTokenId: string | null07issuedAtMs: number08expiresAtMs: number09}): Promise<void> {10await params.db.insert(refreshTokens).values({11id: params.tokenId,12sessionId: params.sessionId,13jtiHash: params.jtiHash,14parentTokenId: params.parentTokenId,15issuedAtMs: params.issuedAtMs,16expiresAtMs: params.expiresAtMs,17usedAtMs: null,18revokedAtMs: null,19replacedByTokenId: null,20})21}
这个方法几乎就是“纯插入”。
它没有 join,没有查询,没有额外整理,主要就是把 refresh token 这条记录写进去。
这种方法很适合单独抽出来,因为调用方只需要说一句“插入 refresh token”,不用反复关心表字段长什么样。
findRefreshTokenForSession01export async function findRefreshTokenForSession(params: {02db: ApiDb03jtiHash: string04sessionId: string05}): Promise<RefreshTokenRecord | null> {06const row = await params.db07.select({08tokenId: refreshTokens.id,09sessionId: refreshTokens.sessionId,10userId: authSessions.userId,11applicationCode: applications.code,12expiresAtMs: refreshTokens.expiresAtMs,13usedAtMs: refreshTokens.usedAtMs,14revokedAtMs: refreshTokens.revokedAtMs,15sessionRevokedAtMs: authSessions.revokedAtMs,16})17.from(refreshTokens)18.innerJoin(authSessions, eq(authSessions.id, refreshTokens.sessionId))19.innerJoin(applications, eq(applications.id, authSessions.applicationId))20.where(21and(22eq(refreshTokens.jtiHash, params.jtiHash),23eq(refreshTokens.sessionId, params.sessionId),24),25)26.limit(1)27.get()2829return row ?? null30}
这个方法和前面的登录查询很像,也是“多表拼起来,一次拿全刷新流程要用的信息”。
它一口气查出了:
这就是抽数据库工具方法的一个现实好处:service 层不需要自己分两三次查,工具方法内部可以一次把后面要判断的字段凑齐。
markRefreshTokenUsed01export async function markRefreshTokenUsed(params: {02db: ApiDb03tokenId: string04usedAtMs: number05}): Promise<boolean> {06const updated = await params.db07.update(refreshTokens)08.set({ usedAtMs: params.usedAtMs })09.where(10and(11eq(refreshTokens.id, params.tokenId),12isNull(refreshTokens.usedAtMs),13isNull(refreshTokens.revokedAtMs),14),15)16.returning({ id: refreshTokens.id })1718return updated.length === 119}
这个方法最值得看的地方是 .where(...) 和 .returning(...)。
先看条件:
usedAtMs 还必须是 NULLrevokedAtMs 也必须是 NULL也就是说,它不是无脑更新,而是“只有当前 token 还没被用过、也没被撤销时,才允许把它标记成已使用”。
再看:
1.returning({ id: refreshTokens.id })
这里表示把更新成功的结果带回来。
最后:
1return updated.length === 1
这就把数据库层的更新结果,转换成了一个很适合上层判断的布尔值。
updateRefreshRotation01export async function updateRefreshRotation(params: {02db: ApiDb03oldTokenId: string04newTokenId: string05sessionId: string06lastSeenAtMs: number07}): Promise<void> {08await params.db.batch([09params.db10.update(refreshTokens)11.set({ replacedByTokenId: params.newTokenId })12.where(eq(refreshTokens.id, params.oldTokenId)),13params.db14.update(authSessions)15.set({ lastSeenAtMs: params.lastSeenAtMs })16.where(eq(authSessions.id, params.sessionId)),17])18}
这个方法没有查询,只有两条更新。
第一条把旧 token 指向新 token:
replacedByTokenId = newTokenId第二条更新 session 的 lastSeenAtMs。
因为两条更新是一起发生的,所以继续用 batch([...]) 收在一个方法里很自然。
revokeSession01export async function revokeSession(params: {02db: ApiDb03sessionId: string04revokedAtMs: number05reason: string06}): Promise<void> {07await params.db.batch([08params.db09.update(authSessions)10.set({11revokedAtMs: sql`COALESCE(${authSessions.revokedAtMs}, ${params.revokedAtMs})`,12revokeReason: sql`COALESCE(${authSessions.revokeReason}, ${params.reason})`,13})14.where(eq(authSessions.id, params.sessionId)),15params.db16.update(refreshTokens)17.set({18revokedAtMs: sql`COALESCE(${refreshTokens.revokedAtMs}, ${params.revokedAtMs})`,19})20.where(21and(22eq(refreshTokens.sessionId, params.sessionId),23isNull(refreshTokens.revokedAtMs),24),25),26])27}
这个方法最适合拿来理解 sql。
看这句:
1sql`COALESCE(${authSessions.revokedAtMs}, ${params.revokedAtMs})`
COALESCE(a, b) 的意思可以先记成:如果 a 不是 NULL,就用 a;否则用 b。
所以这里的作用就是:
revokedAtMs 本来已经有值,就保留原值NULL,才写入这次的撤销时间revokeReason 也是同样的思路。
这时链式 API 不够直接,sql 就很合适。它允许你在 Drizzle 语句里嵌一段原生 SQL 表达式。
后半段更新 refreshTokens 时,又结合了:
eq(...)and(...)isNull(...)也就是只撤销当前 session 下、且还没被撤销的 refresh token。
读完这些方法之后,再回头看这篇的核心思想,其实已经很清楚了。
这些方法如果散在 route 或 service 里,业务主线会被大量数据库细节打断。你一边想看登录流程,一边却总要切去读 join、where、batch、returning。
单独抽出来之后,层次就顺了:
而且抽出来还有两个很实在的好处。
第一,重复查询模式只写一遍。以后谁要按标准化邮箱查登录用户,就直接调 findLoginUserByNormalizedEmail(...)。
第二,service 层会更像业务代码。你读 service 时看到的是“检查密码登录是否启用”“查登录用户”“创建 session”“插入 refresh token”,而不是每一步都重新展开 SQL 细节。
所以这组方法的真正价值,不只是“把代码拆文件”,而是把数据库读写从业务主线里剥出来,让上层代码更容易顺着读。
001import { and, eq, isNull, sql } from 'drizzle-orm'002import { uuidv7 } from 'uuidv7'003import type { ApiDb } from '@/db/client'004import {005applicationAuthMethods,006applications,007authSessions,008passwordCredentials,009refreshTokens,010roles,011userEmails,012userRoleBindings,013users,014} from '@/db/schema'015import type {016LoginUserRecord,017RefreshTokenRecord,018SessionContext,019} from './types'020021export async function isPasswordLoginEnabledForAdmin(db: ApiDb): Promise<boolean> {022const row = await db023.select({ enabled: applicationAuthMethods.enabled })024.from(applicationAuthMethods)025.innerJoin(applications, eq(applications.id, applicationAuthMethods.applicationId))026.where(027and(028eq(applications.code, 'admin'),029eq(applications.status, 'active'),030eq(applicationAuthMethods.provider, 'password'),031),032)033.limit(1)034.get()035036return row?.enabled === 1037}038039export async function findLoginUserByNormalizedEmail(040db: ApiDb,041normalizedEmail: string,042): Promise<LoginUserRecord | null> {043const row = await db044.select({045userId: users.id,046emailId: userEmails.id,047email: userEmails.email,048userStatus: users.status,049passwordHash: passwordCredentials.passwordHash,050passwordAlgo: passwordCredentials.passwordAlgo,051})052.from(userEmails)053.innerJoin(users, eq(users.id, userEmails.userId))054.innerJoin(055passwordCredentials,056and(057eq(passwordCredentials.userId, users.id),058eq(passwordCredentials.emailId, userEmails.id),059),060)061.where(eq(userEmails.normalizedEmail, normalizedEmail))062.limit(1)063.get()064065return row066? {067...row,068userStatus: row.userStatus as LoginUserRecord['userStatus'],069passwordAlgo: row.passwordAlgo as LoginUserRecord['passwordAlgo'],070}071: null072}073074export async function getAdminRolesForUser(075db: ApiDb,076userId: string,077): Promise<string[]> {078const rows = await db079.select({ code: roles.code })080.from(userRoleBindings)081.innerJoin(roles, eq(roles.id, userRoleBindings.roleId))082.innerJoin(applications, eq(applications.id, roles.applicationId))083.where(084and(085eq(userRoleBindings.userId, userId),086eq(userRoleBindings.status, 'active'),087eq(applications.code, 'admin'),088),089)090091return rows.map((row) => row.code)092}093094export async function getAdminApplicationId(db: ApiDb): Promise<string> {095const row = await db096.select({ id: applications.id })097.from(applications)098.where(eq(applications.code, 'admin'))099.limit(1)100.get()101102if (!row) {103throw new Error('Admin application is missing')104}105106return row.id107}108109export async function createAdminSession(params: {110db: ApiDb111userId: string112applicationId: string113userAgent: string | null114ip: string | null115nowMs: number116expiresAtMs: number117roles: string[]118}): Promise<SessionContext> {119const sessionId = uuidv7()120121await params.db.batch([122params.db.insert(authSessions).values({123id: sessionId,124userId: params.userId,125applicationId: params.applicationId,126sessionType: 'admin',127deviceName: null,128userAgent: params.userAgent,129ip: params.ip,130lastSeenAtMs: params.nowMs,131createdAtMs: params.nowMs,132expiresAtMs: params.expiresAtMs,133revokedAtMs: null,134revokeReason: null,135}),136params.db137.update(users)138.set({139lastLoginAtMs: params.nowMs,140updatedAtMs: params.nowMs,141})142.where(eq(users.id, params.userId)),143])144145return {146sessionId,147userId: params.userId,148app: 'admin',149roles: params.roles,150expiresAtMs: params.expiresAtMs,151}152}153154export async function insertRefreshToken(params: {155db: ApiDb156tokenId: string157sessionId: string158jtiHash: string159parentTokenId: string | null160issuedAtMs: number161expiresAtMs: number162}): Promise<void> {163await params.db.insert(refreshTokens).values({164id: params.tokenId,165sessionId: params.sessionId,166jtiHash: params.jtiHash,167parentTokenId: params.parentTokenId,168issuedAtMs: params.issuedAtMs,169expiresAtMs: params.expiresAtMs,170usedAtMs: null,171revokedAtMs: null,172replacedByTokenId: null,173})174}175176export async function findRefreshTokenForSession(params: {177db: ApiDb178jtiHash: string179sessionId: string180}): Promise<RefreshTokenRecord | null> {181const row = await params.db182.select({183tokenId: refreshTokens.id,184sessionId: refreshTokens.sessionId,185userId: authSessions.userId,186applicationCode: applications.code,187expiresAtMs: refreshTokens.expiresAtMs,188usedAtMs: refreshTokens.usedAtMs,189revokedAtMs: refreshTokens.revokedAtMs,190sessionRevokedAtMs: authSessions.revokedAtMs,191})192.from(refreshTokens)193.innerJoin(authSessions, eq(authSessions.id, refreshTokens.sessionId))194.innerJoin(applications, eq(applications.id, authSessions.applicationId))195.where(196and(197eq(refreshTokens.jtiHash, params.jtiHash),198eq(refreshTokens.sessionId, params.sessionId),199),200)201.limit(1)202.get()203204return row ?? null205}206207export async function markRefreshTokenUsed(params: {208db: ApiDb209tokenId: string210usedAtMs: number211}): Promise<boolean> {212const updated = await params.db213.update(refreshTokens)214.set({ usedAtMs: params.usedAtMs })215.where(216and(217eq(refreshTokens.id, params.tokenId),218isNull(refreshTokens.usedAtMs),219isNull(refreshTokens.revokedAtMs),220),221)222.returning({ id: refreshTokens.id })223224return updated.length === 1225}226227export async function updateRefreshRotation(params: {228db: ApiDb229oldTokenId: string230newTokenId: string231sessionId: string232lastSeenAtMs: number233}): Promise<void> {234await params.db.batch([235params.db236.update(refreshTokens)237.set({ replacedByTokenId: params.newTokenId })238.where(eq(refreshTokens.id, params.oldTokenId)),239params.db240.update(authSessions)241.set({ lastSeenAtMs: params.lastSeenAtMs })242.where(eq(authSessions.id, params.sessionId)),243])244}245246export async function revokeSession(params: {247db: ApiDb248sessionId: string249revokedAtMs: number250reason: string251}): Promise<void> {252await params.db.batch([253params.db254.update(authSessions)255.set({256revokedAtMs: sql`COALESCE(${authSessions.revokedAtMs}, ${params.revokedAtMs})`,257revokeReason: sql`COALESCE(${authSessions.revokeReason}, ${params.reason})`,258})259.where(eq(authSessions.id, params.sessionId)),260params.db261.update(refreshTokens)262.set({263revokedAtMs: sql`COALESCE(${refreshTokens.revokedAtMs}, ${params.revokedAtMs})`,264})265.where(266and(267eq(refreshTokens.sessionId, params.sessionId),268isNull(refreshTokens.revokedAtMs),269),270),271])272}