管理员通知接入

管理员通知接入

当前项目已经具备一套独立的管理员通知机制,但默认没有接到任何业务流程里

这意味着:

  • 基础机制已经准备好
  • 支付、认证、AI 等流程不会自动发通知
  • 你需要自己决定在什么地方调用它

目前已经具备的能力

当前实现包含:

  • 基于数据库的 outbox 事件与投递队列
  • 带重试能力的 dispatcher
  • Telegram 渠道实现
  • 后台通知设置项
  • 一个受保护的 dispatch 接口,方便 cron 或手动补偿

核心文件:

  • src/admin-notification/outbox.ts
  • src/admin-notification/dispatcher.ts
  • src/admin-notification/notifier.ts
  • src/admin-notification/channels/telegram.ts
  • src/app/api/admin-notifications/dispatch/route.ts

整体工作方式

这套机制的设计目标是:不拖慢主业务流程,也不因为通知失败影响主流程

执行链路是:

  1. 你的业务代码创建一个管理员通知事件
  2. 事件写入数据库 outbox
  3. 按已启用渠道创建 delivery,例如 Telegram
  4. dispatcher 异步发送待处理 delivery
  5. 失败的 delivery 按退避策略重试

也就是说:

  • 主流程只负责 enqueue
  • 发送由异步派发负责
  • 通知失败不应该影响业务成功与否

数据库准备

这个功能新增了两张表:

  • admin_notification_event
  • admin_notification_delivery

对应 schema 文件:

  • src/config/db/schema.sqlite.ts
  • src/config/db/schema.postgres.ts
  • src/config/db/schema.mysql.ts

在使用之前先把数据库结构推上去:

pnpm run db:push

如果你走 migration 流程:

pnpm run db:generate
pnpm run db:migrate

后台配置

后台设置里已经新增了 Notifications tab。

关键配置项:

  • admin_notification_enabled
  • admin_notification_order_enabled
  • admin_notification_alert_enabled
  • admin_notification_telegram_enabled
  • admin_notification_telegram_bot_token
  • admin_notification_telegram_chat_ids
  • admin_notification_telegram_thread_id

Telegram 最小可用配置:

  • 开启管理员通知
  • 开启 Telegram 渠道
  • 填 bot token
  • 填一个或多个 chat id

admin_notification_telegram_chat_ids 支持:

  • 每行一个 chat id
  • 或者逗号分隔

如何手动接入

方案一:使用现成的 notifier helper

从这里引入:

import {
  notifyAdminOrderPaid,
  notifyAdminPaymentWebhookFailed,
  notifyAdminSubscriptionCanceled,
  notifyAdminSubscriptionRenewed,
} from '@/admin-notification';

示例:

import { notifyAdminOrderPaid } from '@/admin-notification';

async function notifyOrder(order: {
  orderNo: string;
  amount?: number | null;
  currency?: string | null;
  paymentAmount?: number | null;
  paymentCurrency?: string | null;
  paymentProvider?: string | null;
  paymentEmail?: string | null;
  userEmail?: string | null;
  productName?: string | null;
}) {
  try {
    await notifyAdminOrderPaid({
      orderNo: order.orderNo,
      amount: order.amount,
      currency: order.currency,
      paymentAmount: order.paymentAmount,
      paymentCurrency: order.paymentCurrency,
      paymentProvider: order.paymentProvider,
      userEmail: order.paymentEmail || order.userEmail,
      productName: order.productName,
    });
  } catch (error) {
    console.log('enqueue admin notification failed', error);
  }
}

同样的模式也适用于这些 helper:

  • notifyAdminSubscriptionRenewed
  • notifyAdminSubscriptionCanceled
  • notifyAdminPaymentWebhookFailed

方案二:自己 enqueue 自定义事件

如果你的业务事件不适合现成 helper,可以直接用底层 API:

import {
  AdminNotificationEventType,
  AdminNotificationLevel,
  AdminNotificationSource,
  enqueueAdminNotification,
} from '@/admin-notification';

示例:

await enqueueAdminNotification({
  eventType: AdminNotificationEventType.PAYMENT_WEBHOOK_FAILED,
  level: AdminNotificationLevel.CRITICAL,
  source: AdminNotificationSource.PAYMENT,
  title: 'Custom webhook alert',
  entityType: 'webhook',
  entityId: 'stripe',
  dedupeKey: `custom_webhook_alert:${Date.now()}`,
  payload: {
    provider: 'stripe',
    error: 'custom error',
    route: '/api/example',
    occurredAt: new Date().toISOString(),
  },
});

如果你需要新增事件类型,改这里:

src / admin - notification / types.ts;

如果你需要自定义 Telegram 消息格式,改这里:

src / admin - notification / templates / telegram.ts;

如何让通知尽量实时

现成的 helper 内部已经会调用 scheduleAdminNotificationDispatch()

如果你直接使用 enqueueAdminNotification(),并希望在当前请求结束后立即做一次 best-effort 派发,可以这样写:

import {
  enqueueAdminNotification,
  scheduleAdminNotificationDispatch,
} from '@/admin-notification';

const result = await enqueueAdminNotification(...);

if (result.queued) {
  scheduleAdminNotificationDispatch();
}

这样做的效果是:

  • 主流程只负责入库
  • 请求结束后尽快尝试发送
  • 不在业务代码里直接等待 Telegram API

兜底派发接口

项目里已经有一个受保护的内部接口:

/api/admin-notifications/dispatch

请求头要求:

  • Authorization: Bearer <AUTH_SECRET>

示例:

curl -X POST \
  -H "Authorization: Bearer $AUTH_SECRET" \
  "http://localhost:3000/api/admin-notifications/dispatch?limit=20"

这个接口适合用于:

  • cron 兜底派发
  • 异步派发失败后的补偿
  • 手动触发测试

Cron 建议

建议的兜底频率:

  • 关键告警:每 15s30s
  • 普通管理员通知:每 30s

推荐组合是:

  • 业务代码里做即时 best-effort dispatch
  • 再配一个 cron 做兜底和重试

不建议单纯依赖高频扫库来追求实时性。

重试规则

当前重试逻辑在 src/admin-notification/dispatcher.ts 中。

退避时间:

  • 第 1 次重试:10s
  • 第 2 次重试:30s
  • 第 3 次重试:120s
  • 后续重试:600s

这类情况建议视为不可重试并直接 dead:

  • token 配错
  • chat id 错误
  • 渠道不支持

Telegram 配置说明

你需要:

  1. 从 BotFather 获取 bot token
  2. 获取目标 chat id
  3. 如果使用 forum topic,可选填 message_thread_id

补充说明:

  • 群组或频道 chat id 往往是负数
  • Telegram forum topic 需要 message_thread_id

即便 Telegram 配置错误,也不应该影响你的主业务流程,最多只是 delivery 失败。

推荐接入姿势

建议统一按这种模式接入:

try {
  await notifyAdminOrderPaid(...);
} catch (error) {
  console.log('enqueue admin notification failed', error);
}

原则:

  • 不让通知异常中断主业务
  • 在主业务成功写入后再 enqueue
  • 不在业务代码里直接调用 Telegram API
  • 只通过 notifier 或 outbox 层接入

建议接入的位置

常见接入点:

  • 支付成功
  • 订阅续费
  • 订阅取消
  • webhook 异常
  • AI 回调失败
  • 存储服务失败
  • 新用户注册提醒

接入点尽量放在业务成功/失败边界附近,不要放在 UI 层。

如何验证

  1. 在后台配置 Telegram 设置
  2. 确认数据库 schema 已经应用
  3. 临时在某个服务端流程里加一条 notifier 调用
  4. 触发对应业务
  5. 看 Telegram 是否收到消息
  6. 查数据库:
    • admin_notification_event
    • admin_notification_delivery
  7. 如有需要,手动调用 dispatch 接口补发

当前限制

当前 V1 范围是:

  • 只支持管理员通知
  • 只内置 Telegram 渠道
  • 默认不自动接任何业务
  • 暂时没有后台投递列表页

这不是缺陷,是当前刻意控制的范围:先把机制做稳,再由你按需接业务。

后续建议

如果你继续往下做,优先级比较高的是:

  • 补一个后台 delivery 列表页,方便看 dead letter
  • 把重要事件的 enqueue 融到业务事务里
  • 增加 Slack、Email、Webhook 等渠道
  • 增加 AI 和系统告警的现成 helper