api_server.py 讲解
这是项目的核心文件,提供 REST API 服务。
原始代码(分段讲解)
第一部分:导入和初始化
from quart import Quart, request, jsonify
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from datetime import datetime
import uuid
from bot_sender import TelegramBotSender
from config import SESSION_STRING
app = Quart(__name__)
# 初始化 sender 和调度器
sender = TelegramBotSender(SESSION_STRING)
scheduler = AsyncIOScheduler()
# 存储定时任务
scheduled_jobs = {}
导入说明:
| 模块 | 说明 |
|---|---|
quart |
异步 Web 框架(类似 Flask,但支持异步) |
apscheduler |
定时任务调度器 |
datetime |
日期时间处理 |
uuid |
生成唯一 ID |
Quart vs Flask:
| 特性 | Flask | Quart |
|---|---|---|
| 异步支持 | 否 | 是 |
| 语法 | 相同 | 相同 |
| 性能 | 普通 | 高并发更好 |
初始化流程:
创建 Quart 应用实例
↓
创建 TelegramBotSender 实例
↓
创建调度器实例
↓
创建空字典存储定时任务
第二部分:生命周期钩子
@app.before_serving
async def startup():
"""服务启动前初始化"""
scheduler.start()
print("调度器已启动")
@app.after_serving
async def shutdown():
"""服务关闭时清理"""
scheduler.shutdown()
print("调度器已关闭")
什么是钩子(Hook)?
钩子是在特定时机自动执行的函数。
服务器启动 ────→ 执行 before_serving
↓
服务运行中...
↓
服务器关闭 ────→ 执行 after_serving
装饰器语法:
@app.before_serving
async def startup():
...
等价于:
async def startup():
...
startup = app.before_serving(startup)
装饰器把函数"注册"到特定事件上。
第三部分:路由基础
什么是路由?
路由就是把 URL 和函数对应起来。
GET /health ────→ health() 函数
POST /send ────→ send_message() 函数
健康检查路由
@app.route("/health", methods=["GET"])
async def health():
"""健康检查"""
try:
me = await sender.get_me()
return jsonify({"status": "ok", "user": me})
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
拆解:
@app.route("/health", methods=["GET"])- 注册路由async def health():- 异步处理函数jsonify(...)- 把字典转成 JSON 响应500- HTTP 状态码(服务器错误)
HTTP 状态码:
| 状态码 | 含义 |
|---|---|
| 200 | 成功 |
| 400 | 客户端错误 |
| 404 | 未找到 |
| 500 | 服务器错误 |
第四部分:发送消息路由
@app.route("/send", methods=["POST"])
async def send_message():
"""发送消息给单个目标
Body:
{
"target": "@BotUsername",
"text": "/start"
}
"""
data = await request.get_json()
if not data:
return jsonify({"error": "缺少请求体"}), 400
target = data.get("target") or data.get("bot") # 兼容旧参数
text = data.get("text")
if not target or not text:
return jsonify({"error": "缺少 target 或 text 参数"}), 400
result = await sender.send_message(target, text)
if result["success"]:
return jsonify(result)
else:
return jsonify(result), 500
获取请求体:
data = await request.get_json()
# data = {"target": "@BotFather", "text": "/start"}
参数获取:
target = data.get("target") or data.get("bot")
# 如果 target 不存在,尝试用 bot(兼容旧版本)
data.get() vs data["key"]:
# 用 get,键不存在返回 None,不报错
data.get("target") # 返回 None 或值
# 用方括号,键不存在会报错
data["target"] # 可能抛出 KeyError
返回响应:
# 成功,返回 200(默认)
return jsonify(result)
# 失败,返回 500
return jsonify(result), 500
第五部分:批量发送路由
@app.route("/send-batch", methods=["POST"])
async def send_batch():
"""批量发送消息"""
data = await request.get_json()
if not data:
return jsonify({"error": "缺少请求体"}), 400
messages = data.get("messages")
if not messages or not isinstance(messages, list):
return jsonify({"error": "缺少 messages 数组参数"}), 400
results = await sender.send_batch(messages)
success_count = sum(1 for r in results if r["success"])
return jsonify({
"total": len(messages),
"success": success_count,
"failed": len(messages) - success_count,
"results": results
})
isinstance() 函数:
检查变量类型。
isinstance([1, 2, 3], list) # True
isinstance("hello", list) # False
isinstance(123, int) # True
列表推导式统计:
success_count = sum(1 for r in results if r["success"])
等价于:
count = 0
for r in results:
if r["success"]:
count += 1
success_count = count
第六部分:发送并等待回复
@app.route("/send-and-wait", methods=["POST"])
async def send_and_wait():
"""发送消息并等待回复"""
data = await request.get_json()
if not data:
return jsonify({"error": "缺少请求体"}), 400
target = data.get("target") or data.get("bot")
text = data.get("text")
timeout = data.get("timeout", 10.0) # 默认 10 秒
if not target or not text:
return jsonify({"error": "缺少 target 或 text 参数"}), 400
result = await sender.get_bot_response(target, text, timeout)
return jsonify(result)
默认参数:
timeout = data.get("timeout", 10.0)
# 如果 timeout 不存在,使用默认值 10.0
第七部分:定时任务 - 创建
@app.route("/schedule", methods=["POST"])
async def schedule_message():
"""创建定时发送任务"""
data = await request.get_json()
# ... 参数验证 ...
job_id = str(uuid.uuid4())[:8] # 生成 8 位 ID
schedule_type = schedule.get("type", "once")
# 创建触发器
if schedule_type == "once":
dt_str = schedule.get("datetime")
run_date = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
trigger = DateTrigger(run_date=run_date)
elif schedule_type == "interval":
interval = schedule.get("interval")
trigger = IntervalTrigger(seconds=interval)
elif schedule_type == "cron":
cron_expr = schedule.get("cron")
trigger = CronTrigger.from_crontab(cron_expr)
# 添加任务
job = scheduler.add_job(
sender.send_message, # 要执行的函数
trigger=trigger, # 触发器
args=[target, text], # 传递给函数的参数
id=job_id,
name=f"Send to {target}"
)
# 保存任务信息
scheduled_jobs[job_id] = {
"job_id": job_id,
"target": target,
"text": text,
"schedule_type": schedule_type,
"next_run": str(job.next_run_time),
"status": "active"
}
return jsonify({
"success": True,
"job_id": job_id,
"next_run": str(job.next_run_time)
})
UUID 生成:
import uuid
uuid.uuid4() # 生成完整 UUID
# 'd3ec5b21-a957-4b8f-9c2d-123456789abc'
str(uuid.uuid4())[:8] # 取前 8 位
# 'd3ec5b21'
日期解析:
from datetime import datetime
dt_str = "2026-03-29 15:30:00"
run_date = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
# 格式说明:
# %Y - 年(4位)
# %m - 月(01-12)
# %d - 日(01-31)
# %H - 小时(00-23)
# %M - 分钟(00-59)
# %S - 秒(00-59)
调度器添加任务:
job = scheduler.add_job(
func, # 要执行的函数
trigger, # 触发器(决定何时执行)
args=[...], # 位置参数
kwargs={...}, # 关键字参数
id="job_id" # 任务 ID
)
第八部分:定时任务 - 查询和删除
查询任务列表
@app.route("/schedule/list", methods=["GET"])
async def list_scheduled():
"""获取所有定时任务"""
return jsonify({
"jobs": list(scheduled_jobs.values()),
"total": len(scheduled_jobs)
})
字典转列表:
scheduled_jobs = {
"abc1": {"job_id": "abc1", "target": "@BotFather", ...},
"def2": {"job_id": "def2", "target": "@user", ...}
}
list(scheduled_jobs.values())
# [{"job_id": "abc1", ...}, {"job_id": "def2", ...}]
删除任务
@app.route("/schedule/<job_id>", methods=["DELETE"])
async def cancel_scheduled(job_id):
"""取消定时任务"""
if job_id not in scheduled_jobs:
return jsonify({"error": "任务不存在"}), 404
scheduler.remove_job(job_id)
scheduled_jobs[job_id]["status"] = "cancelled"
del scheduled_jobs[job_id]
return jsonify({"success": True, "message": f"任务 {job_id} 已取消"})
URL 参数:
@app.route("/schedule/<job_id>", methods=["DELETE"])
# ↑
# URL 中的动态参数
# 请求:DELETE /schedule/abc123
# job_id = "abc123"
字典操作:
# 检查键是否存在
if job_id in scheduled_jobs:
# 删除键值对
del scheduled_jobs[job_id]
第九部分:其他路由
@app.route("/dialogs", methods=["GET"])
async def get_dialogs():
"""获取对话列表"""
limit = request.args.get("limit", 20, type=int)
dialogs = await sender.get_dialogs(limit)
return jsonify({"dialogs": dialogs})
@app.route("/me", methods=["GET"])
async def get_me():
"""获取当前用户信息"""
me = await sender.get_me()
return jsonify(me)
查询参数:
# URL: /dialogs?limit=10
limit = request.args.get("limit", 20, type=int)
# ↑ ↑
# 默认值 类型转换
第十部分:启动服务
if __name__ == "__main__":
print("启动 Telegram Bot Sender API...")
print()
print("接口文档:")
print(" 消息发送:")
print(" POST /send - 发送单条消息")
print(" POST /send-batch - 批量发送消息")
print(" POST /send-and-wait - 发送并等待回复")
print()
# ... 更多打印 ...
app.run(host="0.0.0.0", port=5000, debug=False)
app.run() 参数:
| 参数 | 说明 |
|---|---|
host |
监听地址,0.0.0.0 表示所有网卡 |
port |
端口号 |
debug |
调试模式,True 会自动重载代码 |
完整架构图
┌─────────────────────────────────────────────────────┐
│ API Server │
│ (api_server.py) │
├─────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Routes │ │ Scheduler │ │
│ │ (路由层) │ │ (调度器) │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ └────────┬────────┘ │
│ ↓ │
│ ┌───────────────────────────┐ │
│ │ TelegramBotSender │ │
│ │ (bot_sender.py) │ │
│ └─────────────┬─────────────┘ │
│ ↓ │
│ ┌───────────────────────────┐ │
│ │ TelegramClient │ │
│ │ (telethon) │ │
│ └───────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────┘
关键知识点总结
| 概念 | 语法 | 说明 |
|---|---|---|
| 路由装饰器 | @app.route("/path") |
注册 URL 路由 |
| 异步视图 | async def handler(): |
异步处理请求 |
| 获取 JSON | await request.get_json() |
解析请求体 |
| 返回 JSON | jsonify(data) |
转换为 JSON 响应 |
| URL 参数 | @app.route("/<id>") |
获取动态 URL 部分 |
| 查询参数 | request.args.get("key") |
获取 URL ?key=value |
| 生命周期钩子 | @app.before_serving |
服务启动/关闭时执行 |
| 调度器 | scheduler.add_job() |
添加定时任务 |
REST API 设计原则
这个 API 遵循 RESTful 设计:
| 操作 | HTTP 方法 | URL |
|---|---|---|
| 发送消息 | POST | /send |
| 查询任务 | GET | /schedule/list |
| 删除任务 | DELETE | /schedule/{id} |
状态码使用:
200- 成功400- 客户端参数错误404- 资源不存在500- 服务器内部错误
练习
- 添加一个新的路由
GET /status,返回服务状态 - 思考:为什么消息发送用 POST 而不是 GET?
- 尝试添加一个批量取消任务的接口