bot_sender.py 讲解
这个文件包含核心功能类 TelegramBotSender,负责发送消息。
原始代码
import asyncio
from typing import Dict, Any, List, Union
from telethon import TelegramClient
from telethon.sessions import StringSession
from config import API_ID, API_HASH, PROXY
class TelegramBotSender:
_instance = None
def __new__(cls, session_string: str):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.session_string = session_string
cls._instance._client = None
cls._instance._loop = None
return cls._instance
@property
def client(self):
if self._client is None:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._client = TelegramClient(
StringSession(self.session_string),
API_ID,
API_HASH,
proxy=PROXY,
loop=self._loop
)
return self._client
async def connect(self):
if not self.client.is_connected():
await self.client.connect()
async def send_message(
self,
target: str,
text: str
) -> Dict[str, Any]:
"""向单个机器人/用户发送消息"""
await self.connect()
try:
entity = await self.client.get_entity(target)
message = await self.client.send_message(entity, text)
return {
"success": True,
"message_id": message.id,
"target": target,
"text": text,
"timestamp": str(message.date)
}
except Exception as e:
return {
"success": False,
"error": str(e),
"target": target,
"text": text
}
async def send_batch(
self,
messages: List[Dict[str, str]]
) -> List[Dict[str, Any]]:
"""批量发送消息到多个目标"""
Args:
messages: [{"target": "@bot1", "text": "hello"}, {"target": "@bot2", "text": "hi"}]
Returns:
每条消息的发送结果列表
await self.connect()
results = []
for msg in messages:
result = await self.send_message(msg["target"], msg["text"])
results.append(result)
return results
async def get_bot_response(
self,
target: str,
text: str,
timeout: float = 10.0
) -> Dict[str, Any]:
"""发送消息并等待回复"""
await self.connect()
try:
entity = await self.client.get_entity(target)
sent_msg = await self.client.send_message(entity, text)
response = None
async for message in self.client.iter_messages(
entity,
limit=10,
min_id=sent_msg.id
):
sender = message.sender
if sender and sender.bot:
response = message
break
if response:
return {
"success": True,
"sent_message": text,
"response": response.text,
"response_id": response.id,
"target": target
}
else:
return {
"success": True,
"sent_message": text,
"response": None,
"message": "未收到回复",
"target": target
}
except Exception as e:
return {
"success": False,
"error": str(e),
"target": target,
"text": text
}
async def get_dialogs(self, limit: int = 20):
"""获取对话列表"""
await self.connect()
dialogs = []
async for dialog in self.client.iter_dialogs(limit=limit):
entity = dialog.entity
dialogs.append({
"name": dialog.name,
"id": dialog.id,
"username": getattr(entity, 'username', None),
"is_bot": getattr(entity, 'bot', False),
"unread_count": dialog.unread_count
})
return dialogs
async def get_me(self) -> Dict[str, Any]:
"""获取当前用户信息"""
await self.connect()
me = await self.client.get_me()
return {
"id": me.id,
"first_name": me.first_name,
"last_name": me.last_name,
"username": me.username,
"phone": me.phone
}
第一部分:导入
import asyncio
from typing import Dict, Any, List, Union
from telethon import TelegramClient
from telethon.sessions import StringSession
from config import API_ID, API_HASH, PROXY
| 导入 | 说明 |
|---|---|
asyncio |
异步编程模块 |
typing |
类型提示(帮助 IDE 和开发者理解代码) |
TelegramClient |
Telegram 客户端类 |
StringSession |
字符串形式的会话 |
config |
配置文件中的变量 |
类型提示是什么?
from typing import Dict, Any, List
# 表示:参数是字符串列表,返回值是字典
def process(items: List[str]) -> Dict[str, Any]:
...
类型提示让代码更清晰,但 Python 不会强制检查。
第二部分:类定义和单例模式
类的基本结构
class TelegramBotSender:
_instance = None # 类变量
def __new__(cls, session_string: str): # 特殊方法
...
@property # 装饰器
def client(self): # 实例方法
...
async def send_message(self, target: str, text: str): # 普通方法
...
类变量 vs 实例变量
class Dog:
species = "狗" # 类变量,所有实例共享
def __init__(self, name):
self.name = name # 实例变量,每个实例独有
dog1 = Dog("旺财")
dog2 = Dog("来福")
print(dog1.species) # "狗"
print(dog2.species) # "狗"
print(dog1.name) # "旺财"
print(dog2.name) # "来福"
单例模式详解
class TelegramBotSender:
_instance = None # 类变量,存储唯一实例
def __new__(cls, session_string: str):
if cls._instance is None: # 如果还没有实例
cls._instance = super().__new__(cls) # 创建实例
cls._instance.session_string = session_string
cls._instance._client = None
cls._instance._loop = None
return cls._instance # 返回唯一实例
什么是单例模式?
单例模式确保一个类只有一个实例。
第一次创建 → 新建实例 → 保存到 _instance
第二次创建 → 发现已存在 → 返回已有实例
第三次创建 → 发现已存在 → 返回已有实例
__new__ vs __init__
| 方法 | 作用 | 调用时机 |
|---|---|---|
__new__ |
创建对象 | 先调用 |
__init__ |
初始化对象 | 后调用 |
# 普通 __init__
class Person:
def __init__(self, name):
self.name = name
p = Person("张三") # 先 __new__ 创建对象,再 __init__ 初始化
为什么用单例?
Telegram 客户端只需要一个连接,多个实例会浪费资源或导致冲突。
@property 装饰器
@property
def client(self):
if self._client is None:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._client = TelegramClient(
StringSession(self.session_string),
API_ID,
API_HASH,
proxy=PROXY,
loop=self._loop
)
return self._client
@property 是什么?
把方法变成属性访问,更简洁。
# 没有 @property
sender = TelegramBotSender(session)
client = sender.client() # 需要加括号
# 有 @property
client = sender.client # 像访问属性一样
惰性加载(懒加载):
if self._client is None: # 只有第一次访问时才创建
self._client = TelegramClient(...)
return self._client
- 第一次访问
client时才创建 TelegramClient - 之后直接返回已创建的实例
- 节省资源,只在需要时才创建
第三部分:方法详解
connect 方法
async def connect(self):
if not self.client.is_connected():
await self.client.connect()
逻辑:
- 检查是否已连接
- 如果没连接,才进行连接
这是"懒连接"模式:只在需要时才连接。
send_message 方法
async def send_message(
self,
target: str,
text: str
) -> Dict[str, Any]:
"""向单个机器人/用户发送消息"""
await self.connect()
try:
entity = await self.client.get_entity(target)
message = await self.client.send_message(entity, text)
return {
"success": True,
"message_id": message.id,
"target": target,
"text": text,
"timestamp": str(message.date)
}
except Exception as e:
return {
"success": False,
"error": str(e),
"target": target,
"text": text
}
流程:
连接服务器
↓
获取目标实体(用户/群组/频道)
↓
发送消息
↓
返回成功结果
↓
(如果出错)捕获异常,返回失败结果
返回值设计:
无论成功还是失败,都返回字典,包含 success 字段:
# 成功
{"success": True, "message_id": 123, ...}
# 失败
{"success": False, "error": "错误信息", ...}
这种设计让调用者很容易判断结果。
send_batch 方法
async def send_batch(
self,
messages: List[Dict[str, str]]
) -> List[Dict[str, Any]]:
"""批量发送消息到多个目标"""
await self.connect()
results = []
for msg in messages:
result = await self.send_message(msg["target"], msg["text"])
results.append(result)
return results
拆解:
messages是一个列表,每个元素是{"target": "...", "text": "..."}- 用
for循环遍历每个消息 - 调用
send_message发送 - 把结果追加到
results列表 - 返回所有结果
示例:
messages = [
{"target": "@BotFather", "text": "hello"},
{"target": "@userbot", "text": "hi"}
]
results = await sender.send_batch(messages)
# [{"success": True, ...}, {"success": True, ...}]
get_bot_response 方法
async def get_bot_response(
self,
target: str,
text: str,
timeout: float = 10.0
) -> Dict[str, Any]:
"""发送消息并等待回复"""
await self.connect()
try:
entity = await self.client.get_entity(target)
sent_msg = await self.client.send_message(entity, text)
response = None
async for message in self.client.iter_messages(
entity,
limit=10,
min_id=sent_msg.id
):
sender = message.sender
if sender and sender.bot:
response = message
break
...
async for 是什么?
异步迭代器,用于遍历异步数据流。
# 普通 for
for item in items:
...
# 异步 for(用于异步生成器)
async for item in async_items:
...
iter_messages 参数:
limit=10- 最多获取 10 条消息min_id=sent_msg.id- 只获取 ID 比发送消息大的(即之后的)
查找回复的逻辑:
发送消息
↓
获取之后的消息
↓
遍历这些消息
↓
找到第一条来自机器人的消息
↓
返回这个回复
get_dialogs 方法
async def get_dialogs(self, limit: int = 20):
"""获取对话列表"""
await self.connect()
dialogs = []
async for dialog in self.client.iter_dialogs(limit=limit):
entity = dialog.entity
dialogs.append({
"name": dialog.name,
"id": dialog.id,
"username": getattr(entity, 'username', None),
"is_bot": getattr(entity, 'bot', False),
"unread_count": dialog.unread_count
})
return dialogs
getattr() 函数:
安全地获取对象属性,如果不存在则返回默认值。
# 等价于 entity.username,但如果属性不存在不会报错
username = getattr(entity, 'username', None)
# 相当于:
try:
username = entity.username
except AttributeError:
username = None
get_me 方法
async def get_me(self) -> Dict[str, Any]:
"""获取当前用户信息"""
await self.connect()
me = await self.client.get_me()
return {
"id": me.id,
"first_name": me.first_name,
"last_name": me.last_name,
"username": me.username,
"phone": me.phone
}
get_me()是 Telethon 提供的方法- 返回当前登录用户的信息
- 我们把它转成字典返回
关键知识点总结
| 概念 | 语法 | 说明 |
|---|---|---|
| 类定义 | class Name: |
定义一个类 |
__new__ |
def __new__(cls, ...) |
控制实例创建 |
| 单例模式 | _instance 类变量 |
确保只有一个实例 |
@property |
装饰器 | 把方法变成属性 |
| 惰性加载 | if self._x is None |
延迟创建资源 |
| 类型提示 | -> Dict[str, Any] |
说明返回类型 |
| 异步方法 | async def |
可以使用 await |
getattr() |
getattr(obj, 'attr', default) |
安全获取属性 |
练习
- 理解单例模式:为什么 Telegram 客户端需要单例?
- 理解
@property:它有什么好处? - 尝试添加一个新方法
get_chat_history(target, limit=10)