本文最后更新于 7 个月前,文中所描述的信息可能已发生改变。
WARNING注意
本文内容已过时, 敬请期待后续更新
前言
为了激发群友们水群的积极性, 为了学习和应用 python 知识,我最近在开发一个 telegram 的 bot,所以写这篇文章以记录和分享
在这篇文章中,将使用 Python-Telegram-Bot 最新版库,基于 Python 的新版异步特性与 Telegram 友好开放的 API,开发一个兼顾实用性和趣味的 bot ,并使用 Docker 在任何地方部署 bot
Demo: kmua-bot
本文不是从零开始的教程,阅读本文前,你需要具有一点点(真的很少一点)的 python 或其他语言编程的基础。
准备
环境
使用你喜欢的工具创建虚拟环境,并安装以下 Python 库
PyYAML
python-telegram-bot
pypinyin
parsel
playwright
PyYAML
python-telegram-bot
pypinyin
parsel
playwright
- PyYAML: yaml 配置文件相关包
- python-telegram-bot: 一个对 Telegram bot api 封装的 python 库,注意版本,默认安装的是 13.14 ,要手动指定版本安装最新开发版本(截止写稿是 20.0a6 ),以支持异步特性。
更新: 2023-02-21: python-telegram-bot 已经默认安装最新版本了,不需要手动指定版本了
- pypinyin: 汉语拼音相关
- parsel: 数据解析
- pyppeteer: 浏览器爬虫包,puppeteer 的 python 移植(问我为啥不用更好的 playwright-python ?被简中互联网上的文章坑了,以后会迁移过去。) - playwright: 浏览器爬虫
更新: 2023-03-06: 用 playwright 重构
bot 申请
私聊 @BotFather。发送 /newbot
,根据提示一步步创建,记得妥善保存最后的 API Token。
获取你的 id
每个 tg 用户都有一串标识数字,即为 user_id,可以私聊 @userinfobot 来获取它
基础
开始:响应/start
在项目文件夹内,新建 bot.py
,开始编写 bot 最基础的功能,让其响应 /start 命令
首先,导入包(先不要管它们都是什么鬼东西)
# Update:从Telegram获取更新
from telegram import Update
#ApplicationBuilder:简单立即为构建 bot 对象
#ContextTypes:上下文类型
#CommandHandler:命令处理器
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler
# Update:从Telegram获取更新
from telegram import Update
#ApplicationBuilder:简单立即为构建 bot 对象
#ContextTypes:上下文类型
#CommandHandler:命令处理器
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler
然后,写一个异步函数 start(),当收到 /start 命令时,要调用它。
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
'''响应start命令'''
text = '你好~我是一个bot'
await context.bot.send_message(chat_id=update.effective_chat.id,text=text)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
'''响应start命令'''
text = '你好~我是一个bot'
await context.bot.send_message(chat_id=update.effective_chat.id,text=text)
这个函数接受两个参数 update
和 context
,形参冒号后是类型注解。这两个参数 CommandHandler
会传递给它。
context.bot.send_message()
方法即为让 bot 发送消息,它能接受的参数其实很多,但是往往只需要 chat_id
,和 text
就够了,它们分别表示 要发送消息给的用户或群组 id ,要发送的文本。
而 update.effective_chat.id
即为当前有效对话的 id ,在哪里发消息给 bot ,它就指向哪里的 id。
你注意到 这个函数使用了 async/await 语法,这是 python 新版的语法糖,详见 官方文档 asyncio
现在,添加一个处理器(handler)
start_handler = CommandHandler('start', start)
start_handler = CommandHandler('start', start)
CommandHandler
类可以实现当收到某个命令时,调用某个函数(命令和函数名可以不一样),我们将其实例化为了 start_handler
,并且将命令名字 'start' 和对应要回调的函数名 start 传递给它
然后,启动bot
# 构建 bot
TOEKN='你 bot 的 api token'
application = ApplicationBuilder().token(TOKEN).build()
# 注册 handler
application.add_handler(start_handler)
# run!
application.run_polling()
# 构建 bot
TOEKN='你 bot 的 api token'
application = ApplicationBuilder().token(TOKEN).build()
# 注册 handler
application.add_handler(start_handler)
# run!
application.run_polling()
最后,就可以使用 python bot.py
启动你的 bot ,对 bot 发送 /start ,它应该就会回复你 ”你好~我是一个bot”。 使用 Ctrl + C 结束程序运行。
到这里,其实你已经了解到了最基本的 telegram bot 编写规则,即:
- 编写回调函数。上面的例子中即为
start
这个函数。 - 决定调用回调函数的规则。上述例子中,即为收到
/start
命令时,调用start
函数 - 实例化
handler
,注册给application
示例。上述例子中,即为application.add_handler(start_handler)
如果你的系统无法访问 Telegram 的服务器,可以设置代理
全局代理,整个程序都走代理:
os.environ['http_proxy'] = '代理地址'
os.environ['https_proxy'] = '代理地址'
os.environ['http_proxy'] = '代理地址'
os.environ['https_proxy'] = '代理地址'
若想仅为 telegram bot api 设置代理,则需要改动一下构建 bot 部分的代码:
http(s)代理:
proxy_url = 'http://USERNAME:PASSWORD@PROXY_HOST:PROXY_PORT' # can also be a https proxy
application = ApplicationBuilder().token("TOKEN").proxy_url(proxy_url).get_updates_proxy_url(proxy_url).build()
proxy_url = 'http://USERNAME:PASSWORD@PROXY_HOST:PROXY_PORT' # can also be a https proxy
application = ApplicationBuilder().token("TOKEN").proxy_url(proxy_url).get_updates_proxy_url(proxy_url).build()
socks5代理,需要安装 python-telegram-bot[socks]
,然后:
proxy_url = "socks5://user:pass@host:port"
application = ApplicationBuilder().token("TOKEN").proxy_url(proxy_url).get_updates_proxy_url(proxy_url).build()
proxy_url = "socks5://user:pass@host:port"
application = ApplicationBuilder().token("TOKEN").proxy_url(proxy_url).get_updates_proxy_url(proxy_url).build()
接下来,写一些更有趣的功能,使用更复杂一些的规则来调用这些功能。
授予群成员头衔
让群成员可以通过 bot 自助获得一个头衔吧,比如,群友可以在群里使用 /p@botname 好人
来给自己加上 “好人” 的头衔。
当然,前提是 bot 自己要是管理员,并且权限全开。
要实现这个功能,可以这样写:
async def set_right(update: Update, context: ContextTypes.DEFAULT_TYPE):
'''设置成员权限和头衔'''
chat_id = update.effective_chat.id
user_id = update.effective_user.id
bot_username_len = len(update._bot.name)
custom_title = update.effective_message.text[3+bot_username_len:]
if not custom_title:
custom_title = update.effective_user.username
try:
await context.bot.promote_chat_member(chat_id=chat_id, user_id=user_id, can_manage_chat=True)
await context.bot.set_chat_administrator_custom_title(chat_id=chat_id, user_id=user_id, custom_title=custom_title)
text = f'好,你现在是{custom_title}啦'
await context.bot.send_message(chat_id=chat_id, reply_to_message_id=update.effective_message.id, text=text)
except:
await context.bot.send_message(chat_id=chat_id, text='不行!!')
async def set_right(update: Update, context: ContextTypes.DEFAULT_TYPE):
'''设置成员权限和头衔'''
chat_id = update.effective_chat.id
user_id = update.effective_user.id
bot_username_len = len(update._bot.name)
custom_title = update.effective_message.text[3+bot_username_len:]
if not custom_title:
custom_title = update.effective_user.username
try:
await context.bot.promote_chat_member(chat_id=chat_id, user_id=user_id, can_manage_chat=True)
await context.bot.set_chat_administrator_custom_title(chat_id=chat_id, user_id=user_id, custom_title=custom_title)
text = f'好,你现在是{custom_title}啦'
await context.bot.send_message(chat_id=chat_id, reply_to_message_id=update.effective_message.id, text=text)
except:
await context.bot.send_message(chat_id=chat_id, text='不行!!')
然后,添加这个功能的handler:
set_right_handler = CommandHandler('p', set_right)
application.add_handler(set_right_handler)
set_right_handler = CommandHandler('p', set_right)
application.add_handler(set_right_handler)
注意handler的添加应该在 application.run_polling()
之前,也就是说 application.run_polling()
才是真正开始运行 bot
简单解释一下 set_right
这个函数:
首先,设置了调用这个函数时的 chat_id
和 user_id
为当前聊天和当前用户
然后用 len(update._bot.name)
获取 bot 自己用户名的长度,用于下面的切片选取用户想要的头衔
而 custom_title = update.effective_message.text[3+bot_username_len:]
即获取用户想要的头衔,存储在 custom_title
中,下面的 if not 的作用是,如果用户没有发送他想要的头衔,那么默认头衔为他的用户名
由于只有管理员才有头衔,所以我们使用 promote_chat_member() 方法设置成员权限,将其的 can_manage_chat 权限设置为 True 。注意,即使这样设置了,用户实则是有名无权的状态,只有一个管理员的名头,实际上没有任何权限。
要想赋予权限,你可以向 promote_chat_member() 方法中继续添加参数,如 can_pin_messages=True 可置顶消息
然后就可以使用 set_chat_administrator_custom_title() 方法设置成员头衔了,如果成功了,会继续执行接下来的语句,将结果反馈给用户。
响应未知命令
如果 bot 只能回应设置好的命令就太无趣了,所以再编写一个当收到未知命令时的执行功能 比如,想要让 bot 回应未知命令说 “我不会这个哦~”,可以这样写
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
'''响应未知命令'''
logger.debug('调用:unknown')
text = "我不会这个哦~"
await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
'''响应未知命令'''
logger.debug('调用:unknown')
text = "我不会这个哦~"
await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
然后,添加它的 handler
。这时候可能就出现问题: 前面的两个功能都是回应一个特定的命令,在这里,我们希望它回应除已经设定的命令外的其他所有命令,该怎么做?
这就需要用到 filters
,并且要设置功能的优先级 你需要先导入它
from telegram.ext import filters
然后可以这样写:
unknown_handler = MessageHandler(filters.COMMAND, unknown)
unknown_handler = MessageHandler(filters.COMMAND, unknown)
filters
即过滤器,而 filters.COMMAND
即为所有命令。而设置优先级其实非常简单,添加 handler
的顺序越靠后,对应功能的优先级越低
也就是说,我们需要在其他命令类 handler
之后,添加 unknown_handler
application.add_handler(start_handler)
application.add_handler(set_right_handler)
application.add_handler(unkonw_handler)
application.run_polling()
application.add_handler(start_handler)
application.add_handler(set_right_handler)
application.add_handler(unkonw_handler)
application.run_polling()
简单的关键词回复
bot 最常见的功能之一就是根据关键词回复特定内容,比如,当对 bot 说早安之类的话时,让 bot 对此做出回应。下面来实现这这一功能 由于命令和普通消息是两种不同的消息类,处理它们的方法是不一样的,针对非命令消息,要使用 MessageHandler
: from telegram.ext import MessageHandler
首先还是要编写这个功能执行的函数。
import random
async def ohayo(update: Update, context: ContextTypes.DEFAULT_TYPE):
texts = ['早上好呀','我的小鱼你醒了,还记得清晨吗','哦哈哟~']
await context.bot.send_message(chat_id=update.effective_chat.id, text=random.choice(text))
import random
async def ohayo(update: Update, context: ContextTypes.DEFAULT_TYPE):
texts = ['早上好呀','我的小鱼你醒了,还记得清晨吗','哦哈哟~']
await context.bot.send_message(chat_id=update.effective_chat.id, text=random.choice(text))
由于只回复一个固定的消息过于单调,所以我们使用 random
库来从一个列表中随机选择一个回复
实际上,这个函数仍然只是简单的发送消息,而何时调用这个函数,并不是这个函数本身要做的事,而是由接下来我们要编写的 MessageHandler
中的 filters
决定的:
filter_ohayo = filters.Regex('早安|早上好|哦哈哟|ohayo')
ohayo_handler = MessageHandler(filter_ohayo, ohayo)
filter_ohayo = filters.Regex('早安|早上好|哦哈哟|ohayo')
ohayo_handler = MessageHandler(filter_ohayo, ohayo)
filters.Regex()
即为使用正则表达式过滤,我们使用 '早安|早上好|哦哈哟|ohayo'
这个表达式,很容易理解何时会调用 ohayo
这个函数
记得添加它的 handler
application.add_handler(ohayo_handler)
application.add_handler(ohayo_handler)
进阶
定制日志系统
在上面写的一些小功能中,我们没有让程序在控制台或者文件里输出任何信息,只有当报错的时候python会输出一些traceback,这在以后排查故障的时候是极其不便的(毕竟谁都不能保证自己的程序不出bug,或是遇到自己没有考虑到的情况).所以有必要加上一个简单的日志系统.
在项目下新建一个 logger.py
文件,用来定制日志系统.
比如,我们想要根据日志等级让日志输出到两份文件,一份是 normal , 包括 INFO 以上的日志,另一份是 debug , 包括 DEBUG 以上的日志,而 INFO 以上的日志还同时输出到控制台,那么可以这样:
import logging
import os
from logging import handlers
class Logger:
def __init__(self, name: str, show: bool, save: bool = True, debug: bool = False) -> None:
"""
日志系统
:param name: 日志系统实例名
:param show: 是否显示在控制台
:param save: 是否保存到文件, defaults to True
:param debug: debug模式, defaults to False
"""
# 日志文件路径
normal_log_path = f'logs/normal.log'
debug_log_path = f'logs/debug.log'
if not os.path.exists('./logs'):
os.mkdir('./logs')
self.logger = logging.getLogger(name)
# 设置日志等级和格式
self.logger.setLevel(logging.DEBUG)
self.formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s: - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 设置相关 handler
if not self.logger.handlers:
if show:
# 控制台 handler
sh = logging.StreamHandler()
if debug:
sh.setLevel(logging.DEBUG)
else:
sh.setLevel(logging.INFO)
sh.setFormatter(self.formatter)
self.logger.addHandler(sh)
if save:
# 保存到文件的 handler
fh_debug = handlers.TimedRotatingFileHandler(
filename=debug_log_path,
when="D",
interval=1,
backupCount=3,
encoding='utf-8'
) # 自动日志切割
fh_debug.setLevel(logging.DEBUG)
fh_debug.setFormatter(self.formatter)
fh = handlers.TimedRotatingFileHandler(
filename=normal_log_path,
when="D",
interval=1,
backupCount=3,
encoding='utf-8'
)
fh.setLevel(logging.INFO)
fh.setFormatter(self.formatter)
self.logger.addHandler(fh)
self.logger.addHandler(fh_debug)
def debug(self, message):
self.logger.debug(message)
def info(self, message):
self.logger.info(message)
def warn(self, message):
self.logger.warning(message)
def error(self, message):
self.logger.error(message)
def critical(self, message):
self.logger.critical(message)
import logging
import os
from logging import handlers
class Logger:
def __init__(self, name: str, show: bool, save: bool = True, debug: bool = False) -> None:
"""
日志系统
:param name: 日志系统实例名
:param show: 是否显示在控制台
:param save: 是否保存到文件, defaults to True
:param debug: debug模式, defaults to False
"""
# 日志文件路径
normal_log_path = f'logs/normal.log'
debug_log_path = f'logs/debug.log'
if not os.path.exists('./logs'):
os.mkdir('./logs')
self.logger = logging.getLogger(name)
# 设置日志等级和格式
self.logger.setLevel(logging.DEBUG)
self.formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s: - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 设置相关 handler
if not self.logger.handlers:
if show:
# 控制台 handler
sh = logging.StreamHandler()
if debug:
sh.setLevel(logging.DEBUG)
else:
sh.setLevel(logging.INFO)
sh.setFormatter(self.formatter)
self.logger.addHandler(sh)
if save:
# 保存到文件的 handler
fh_debug = handlers.TimedRotatingFileHandler(
filename=debug_log_path,
when="D",
interval=1,
backupCount=3,
encoding='utf-8'
) # 自动日志切割
fh_debug.setLevel(logging.DEBUG)
fh_debug.setFormatter(self.formatter)
fh = handlers.TimedRotatingFileHandler(
filename=normal_log_path,
when="D",
interval=1,
backupCount=3,
encoding='utf-8'
)
fh.setLevel(logging.INFO)
fh.setFormatter(self.formatter)
self.logger.addHandler(fh)
self.logger.addHandler(fh_debug)
def debug(self, message):
self.logger.debug(message)
def info(self, message):
self.logger.info(message)
def warn(self, message):
self.logger.warning(message)
def error(self, message):
self.logger.error(message)
def critical(self, message):
self.logger.critical(message)
然后在 bot.py
里导入并实例化它
from logger import Logger
logger = Logger(name='bot', show=True)
from logger import Logger
logger = Logger(name='bot', show=True)
之后就可以在任意地方使用日志记录并输出一些信息,比如在 start()
函数里:
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
'''响应start命令'''
logger.info(f'收到来自{update.effective_chat.username}的/start指令')
text = '你好~我是一个bot'
await context.bot.send_message(chat_id=update.effective_chat.id,text=text)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
'''响应start命令'''
logger.info(f'收到来自{update.effective_chat.username}的/start指令')
text = '你好~我是一个bot'
await context.bot.send_message(chat_id=update.effective_chat.id,text=text)
在其他任意你觉得需要日志的地方都可以使用.
使用 yml 文件配置
把配置直接写进代码里也太不优雅了些,而且对于其他普通用户来说,还可能会因为写错等原因直接破坏程序,所以需要把配置都搬到文件里. yml 是常见的配置文件格式,也比较简单.
后面我们还可能会需要很多小工具,所以新建一个 utils.py
,把各种辅助的方法都放里面.
导入yaml库,os库和我们的日志系统
import yaml
import os
from logger import Logger
import yaml
import os
from logger import Logger
然后
class Utils:
'''该类用于设置一些辅助的方法'''
def __init__(self) -> None:
logger.debug('实例化Utils')
pass
def read_config(self,config_name: str) -> dict:
'''读取配置'''
config_path = os.path.abspath(os.path.join(os.getcwd(), config_name))
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
logger.debug(f'已载入配置{config_name}')
return config
class Utils:
'''该类用于设置一些辅助的方法'''
def __init__(self) -> None:
logger.debug('实例化Utils')
pass
def read_config(self,config_name: str) -> dict:
'''读取配置'''
config_path = os.path.abspath(os.path.join(os.getcwd(), config_name))
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
logger.debug(f'已载入配置{config_name}')
return config
然后就可以在yml中写配置,比如 config.yml
:
# bot的token,必填
token: ''
#全局代理设置,如不需设置请留空
proxy: ''
# bot的token,必填
token: ''
#全局代理设置,如不需设置请留空
proxy: ''
需要读取这些配置时,先导入上面写的 utils
模块并实例化 Utils 类,然后调用 read_config
方法即可:
from utils import Utils
utils = Utils
config = utils.read_config('config.yml')
from utils import Utils
utils = Utils
config = utils.read_config('config.yml')
这里得到的 config 变量储存了我们的配置,它是一个字典.比如要在代理设置里使用它,只需要这样:
if config['proxy']:
os.environ['http_proxy'] = config['proxy']
os.environ['https_proxy'] = config['proxy']
if config['proxy']:
os.environ['http_proxy'] = config['proxy']
os.environ['https_proxy'] = config['proxy']
还有构建 bot 时:
TOKEN=config['token']
application = ApplicationBuilder().token(TOKEN).build()
TOKEN=config['token']
application = ApplicationBuilder().token(TOKEN).build()
其实这种方法也是不太优雅的,因为可能会多次读取配置文件,更好的方法是把配置写成一个模块,然后导入它们(单例模式).
使用 json 存取词库
关键词回复是 bot 很常见的功能,这时候就会用到词库.
新建 data/words
文件夹,用来存储我们的数据.下面是一个简单的词库 ohayo.json
示例(早上问候场景)
[
"我的小鱼你醒了,还认识早晨吗",
"昨夜你曾经说,愿夜幕永不开启",
"我的小鱼你睡醒了吗,还记得夜晚吗",
"早晨你曾说过,愿黎明曙光永不落下",
"早上好哦~",
"哦哈哟喵~",
"众生皆小饼干,你是最甜的那一块~",
"今天也是新的一天~",
"你醒啦?(歪头)",
"啊呜~你已经起床了嘛(打哈欠)"
]
[
"我的小鱼你醒了,还认识早晨吗",
"昨夜你曾经说,愿夜幕永不开启",
"我的小鱼你睡醒了吗,还记得夜晚吗",
"早晨你曾说过,愿黎明曙光永不落下",
"早上好哦~",
"哦哈哟喵~",
"众生皆小饼干,你是最甜的那一块~",
"今天也是新的一天~",
"你醒啦?(歪头)",
"啊呜~你已经起床了嘛(打哈欠)"
]
写一个比较通用的方法,用来反序列化json词库文件.把这个方法写进上面我们创建过的 Utils
类里即可.
def load_words(self, words: str):
'''读取并加载词库'''
words_path = os.path.join(
os.getcwd(), 'data', 'words', f'{words}.json')
try:
with open(words_path, 'r',encoding='utf-8') as f:
the_words_json = json.load(f)
logger.debug(f'已载入词库:{words_path}')
return the_words_json
except Exception as e:
logger.error(f'载入词库出错:{words_path}, {e}')
return {'Exception': 'except'}
def load_words(self, words: str):
'''读取并加载词库'''
words_path = os.path.join(
os.getcwd(), 'data', 'words', f'{words}.json')
try:
with open(words_path, 'r',encoding='utf-8') as f:
the_words_json = json.load(f)
logger.debug(f'已载入词库:{words_path}')
return the_words_json
except Exception as e:
logger.error(f'载入词库出错:{words_path}, {e}')
return {'Exception': 'except'}
这个方法接受一个词库文件名(不带后缀),然后从 data/words
读取对应的词库,并返回为 python 数据结构
这时候代码量可能已经比较多了,为了方便组织和管理,我把除了 bot.py
的其他 py 文件统一放到了 src
文件夹下.
然后创建一个用于从词库中取句子的类 Getwords
(因为词库也可能不止一种,为了之后方便扩展所以单独写一个类)
在 src/words.py 中:
import random
from .utils import Utils
from .logger import Logger
# 实例化类
logger = Logger(name='words',show=True)
utils = Utils()
# 读取 ohayo.json 这个词库
ohayo_words = utils.load_words('ohayo')
class GetWords():
def __init__(self):
"""
该类用于提取消息特定文本,或是返回特定文本
"""
logger.debug('实例化GetWords')
self.ohayo = ohayo_words
def get_ohayo(self) -> str:
'''返回早安问候句子'''
word = random.choice(self.ohayo)
logger.debug(f'获取到早安:{word}')
return word
import random
from .utils import Utils
from .logger import Logger
# 实例化类
logger = Logger(name='words',show=True)
utils = Utils()
# 读取 ohayo.json 这个词库
ohayo_words = utils.load_words('ohayo')
class GetWords():
def __init__(self):
"""
该类用于提取消息特定文本,或是返回特定文本
"""
logger.debug('实例化GetWords')
self.ohayo = ohayo_words
def get_ohayo(self) -> str:
'''返回早安问候句子'''
word = random.choice(self.ohayo)
logger.debug(f'获取到早安:{word}')
return word
然后修改 bot.py
中的早安问候功能:
# 导入 Getwords 类
from src.words import GetWords
# 实例化
getWords = GetWords()
async def ohayo(update: Update, context: ContextTypes.DEFAULT_TYPE):
text=getWords.get_ohayo()
await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
# 导入 Getwords 类
from src.words import GetWords
# 实例化
getWords = GetWords()
async def ohayo(update: Update, context: ContextTypes.DEFAULT_TYPE):
text=getWords.get_ohayo()
await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
再次强调一下,何时触发响应的功能应该交给过滤器去做,我们这里的 ohayo()
函数只需要关心功能触发后应该做什么就可以了.
使用 docker 部署 bot
python 程序的环境管理很烦人,用docker来跑再合适不过.
本地构建
如果要在本地构建 docker 镜像,参考下面的 Dockerfile
文件
FROM python:3.9
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
ENTRYPOINT [ "python","/app/bot.py" ]
FROM python:3.9
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
ENTRYPOINT [ "python","/app/bot.py" ]
将 Dockerfile 文件放到合适的目录(项目根目录即可),然后执行
docker build -t bot .
注意后面有一个 .
关于 docker 构建镜像的更多帮助,可以参阅 docker 从入门到实践
然后使用 docker run -d bot
启动容器,运行 bot
使用 GitHub action 自动构建
github action 可以构建 docker 镜像并发布到 ghcr,方便在其他地方部署 docker 容器.你依然需要在项目中写好 Dockerfile
在项目中新建 .github/workflows/build-docker.yml
,参考以下配置
name: Build and publish docker container
on:
workflow_dispatch:
jobs:
publish:
name: Publish container image
runs-on: ubuntu-20.04
env:
TZ: Asia/Shanghai
steps:
- name: Checkout
uses: actions/checkout@v3
- name: OCI meta
id: meta
uses: docker/[email protected]
with:
images: ghcr.io/${{ github.repository }}
tags: |
type=edge,branch=main
type=ref,event=branch
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
type=sha
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Login to GHCR
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v3
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
platforms: linux/amd64,linux/arm64
cache-from: type=gha
cache-to: type=gha,mode=max
name: Build and publish docker container
on:
workflow_dispatch:
jobs:
publish:
name: Publish container image
runs-on: ubuntu-20.04
env:
TZ: Asia/Shanghai
steps:
- name: Checkout
uses: actions/checkout@v3
- name: OCI meta
id: meta
uses: docker/[email protected]
with:
images: ghcr.io/${{ github.repository }}
tags: |
type=edge,branch=main
type=ref,event=branch
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
type=sha
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Login to GHCR
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v3
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
platforms: linux/amd64,linux/arm64
cache-from: type=gha
cache-to: type=gha,mode=max
然后可以使用 docker pull
的方法运行,或者使用 docker compose:
新建 docker-compose.yml
,参考以下内容
version: "3"
services:
kmua:
image: ghcr.io/krau/kmua-bot:main
container_name: kmua-main
init: true
volumes:
- ./data:/kmua/data
- ./logs:/kmua/logs
- ./config.yml:/kmua/config.yml
environment:
- TZ=Asia/Shanghai
version: "3"
services:
kmua:
image: ghcr.io/krau/kmua-bot:main
container_name: kmua-main
init: true
volumes:
- ./data:/kmua/data
- ./logs:/kmua/logs
- ./config.yml:/kmua/config.yml
environment:
- TZ=Asia/Shanghai
注意根据自己需要更改
然后就可以使用 docker compose up -d
启动容器
附加
使用playwright查询mc模组信息并记录
playwright 是一个浏览器自动化工具包,如果你了解过 selenium 那么你一定知道它是什么.但是 playwright 是异步的
我们想要实现这样的功能: 当 bot 收到包含 mcmod 页面链接的消息时,去主动打开链接,收集模组信息并截图,然后发送模组信息给用户.并且持久化保存这些信息,当再次收到同样的链接时,直接从已保存的数据中获取并返回给用户.
新建 src/mcmod.py
. 以下是一个例子,它获取了模组的名称并且截图网页,保存这些数据
import re
import os
import json
from playwright.async_api import async_playwright
from src.logger import Logger
logger = Logger(name='McMod', show=True)
class McMod:
def __init__(self, width: int = 1280, height: int = 1080) -> None:
"""
参数设定
:param width: 页面宽度, defaults to 1280
:param height: 页面高度, defaults to 1080
"""
logger.debug('实例化McMod')
self.args = ['--headless', '--no-sandbox',
'--disable-gpu', '--hide-scrollbars']
self.options = {'args': self.args, 'defaultViewport': {
'width': width, 'height': height},'dumpio':True}
async def screenshot(self, mod_url: str, width: int = 1280, height: int = 720, close: bool = True) -> dict:
"""
获取网页截屏
:param url: 网址
:param width: 页面宽度, defaults to 1280
:param height: 页面高度, defaults to 720
:param close: 完成后是否关闭网页, defaults to True
:return: 字典{'file_name':截图文件名,'cn_name':中文名,
'en_name':英文名,'full_name':格式化后的全名,'mod_url':模组链接}
"""
try:
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
await page.set_viewport_size({'width': width, 'height': height})
await page.goto(mod_url)
await page.wait_for_selector('.class-title')
class_title = await page.query_selector('.class-title')
try:
h4 = await class_title.query_selector('h4')
en_name = await h4.get_property('innerText')
en_name = await en_name.json_value()
try:
h3 = await class_title.query_selector('h3')
cn_name = await h3.get_property('innerText')
cn_name = await cn_name.json_value()
except:
cn_name = ''
except:
en_name = ''
cn_name = ''
file_name = re.sub(r'[^a-zA-Z]', '', en_name) + '.png' # 以英文模组名(去除非法字符)保存截屏文件
full_name = f'{cn_name} {en_name}' # 为了少开一次浏览器,干脆把模组名也顺便获取并返回
if not os.path.exists(f'./data/pics/{file_name}'):
await page.screenshot(path=f'./data/pics/{file_name}')
if close:
await page.close()
await browser.close()
record_flag = self.mod_data_record(mod_cn_name=cn_name,mod_en_name=en_name,mod_file_name=file_name,mod_full_name=full_name,mod_url=mod_url)
if record_flag:
data_dict = self.mod_data_read(mod_url=mod_url)
return data_dict
else:
logger.error(f'未能记录模组 {mod_url}')
return {}
except Exception as e:
logger.error(f'获取截屏 {mod_url} 失败!')
logger.error(f'错误:{e}')
if close:
await page.close()
await browser.close()
return {}
async def get_mod_name(self, url: str, lang: str = 'full', close: bool = True) -> str:
"""
获取模组名
:param url: mcmod的模组页面
:param lang: 要获取的模组名语言, defaults to 'full'
:param close: 完成后是否关闭网页, defaults to True
:return:
"""
try:
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
await page.goto(url)
await page.wait_for_selector('.class-title')
class_title = await page.query_selector('.class-title')
try:
h4 = await class_title.query_selector('h4')
en_name = await h4.get_property('innerText')
en_name = await en_name.json_value()
try:
h3 = await class_title.query_selector('h3')
cn_name = await h3.get_property('innerText')
cn_name = await cn_name.json_value()
except:
cn_name = ''
except:
en_name = ''
cn_name = ''
full_nm = f'{cn_name} {en_name}'
if close:
await page.close()
await browser.close()
if lang == 'full':
return full_nm
elif lang == 'cn':
return cn_name
elif lang == 'en':
return en_name
elif lang == 'dict':
name_dict = {'cn_name':cn_name,'en_name':en_name,'full_name':full_nm}
return name_dict
else:
logger.debug(f'lang不能为{lang}')
return ''
except Exception as e:
logger.error(f'获取模组名称 {url} 失败!')
logger.error(f'错误:{e}')
return ''
def mod_data_record(self, mod_file_name: str,mod_full_name: str, mod_url: str, mod_cn_name: str='', mod_en_name: str='', mod_pic_path: str = '') ->bool:
"""
以json文件存储模组的信息,变量名定义与其他函数中一致
:param mod_file_name: 模组去除非法字符(包括空格)的英文全名+后缀名
:param mod_full_name: 格式化后的全名, ex: 植物魔法|Botania
:param mod_url: 模组链接
:param mod_cn_name: 模组中文名, defaults to ''
:param mod_en_name: 模组英文名, defaults to ''
:param mod_pic_path: 模组页面截图路径, defaults to ''
:return: 记录成功返回True,否则False
"""
logger.debug('调用:McMod.mod_data_record')
try:
logger.debug(f'记录模组数据:{mod_cn_name}')
mods_data_path = './data/mods_data.json'
if not os.path.exists(mods_data_path):
with open(mods_data_path, 'w',encoding='utf-8') as f:
json.dump({}, f,ensure_ascii=False)
logger.debug(f'未找到数据文件路径,已新建')
if not mod_pic_path:
mod_pic_path = f'./data/pics/{mod_file_name}'
logger.debug(f'未传入pic_path,使用默认:{mod_pic_path}')
mod_data = {mod_url: {'file_name': mod_file_name, 'cn_name': mod_cn_name,
'en_name': mod_en_name, 'full_name': mod_full_name, 'mod_url': mod_url, 'pic_path': mod_pic_path}}
logger.debug(f'模组数据:{mod_data}')
with open(mods_data_path,'r',encoding='utf-8') as f:
data_content = json.load(f)
data_content.update(mod_data)
with open(mods_data_path,'w',encoding='utf-8') as f:
json.dump(data_content,f,indent=4,ensure_ascii=False)
logger.info(f'已记录模组数据:{mod_data}')
return True
except Exception as e:
logger.error(f'记录模组数据 {mod_url} 错误!')
logger.error(f'错误:{e}')
return False
def mod_data_read(self,mod_url:str) -> dict:
try:
logger.debug(f'读取模组数据:{mod_url}')
mods_data_path = './data/mods_data.json'
with open(mods_data_path,'r',encoding='utf-8') as f:
data = json.load(f)
if mod_url in data:
full_name = data[mod_url].get('full_name')
logger.info(f'该模组数据已记录:{full_name}')
return data[mod_url]
else:
logger.debug('该模组数据未记录')
return False
except Exception as e:
logger.error(f'读取模组数据 {mod_url} 错误!')
logger.error(f'错误:{e}')
return False
import re
import os
import json
from playwright.async_api import async_playwright
from src.logger import Logger
logger = Logger(name='McMod', show=True)
class McMod:
def __init__(self, width: int = 1280, height: int = 1080) -> None:
"""
参数设定
:param width: 页面宽度, defaults to 1280
:param height: 页面高度, defaults to 1080
"""
logger.debug('实例化McMod')
self.args = ['--headless', '--no-sandbox',
'--disable-gpu', '--hide-scrollbars']
self.options = {'args': self.args, 'defaultViewport': {
'width': width, 'height': height},'dumpio':True}
async def screenshot(self, mod_url: str, width: int = 1280, height: int = 720, close: bool = True) -> dict:
"""
获取网页截屏
:param url: 网址
:param width: 页面宽度, defaults to 1280
:param height: 页面高度, defaults to 720
:param close: 完成后是否关闭网页, defaults to True
:return: 字典{'file_name':截图文件名,'cn_name':中文名,
'en_name':英文名,'full_name':格式化后的全名,'mod_url':模组链接}
"""
try:
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
await page.set_viewport_size({'width': width, 'height': height})
await page.goto(mod_url)
await page.wait_for_selector('.class-title')
class_title = await page.query_selector('.class-title')
try:
h4 = await class_title.query_selector('h4')
en_name = await h4.get_property('innerText')
en_name = await en_name.json_value()
try:
h3 = await class_title.query_selector('h3')
cn_name = await h3.get_property('innerText')
cn_name = await cn_name.json_value()
except:
cn_name = ''
except:
en_name = ''
cn_name = ''
file_name = re.sub(r'[^a-zA-Z]', '', en_name) + '.png' # 以英文模组名(去除非法字符)保存截屏文件
full_name = f'{cn_name} {en_name}' # 为了少开一次浏览器,干脆把模组名也顺便获取并返回
if not os.path.exists(f'./data/pics/{file_name}'):
await page.screenshot(path=f'./data/pics/{file_name}')
if close:
await page.close()
await browser.close()
record_flag = self.mod_data_record(mod_cn_name=cn_name,mod_en_name=en_name,mod_file_name=file_name,mod_full_name=full_name,mod_url=mod_url)
if record_flag:
data_dict = self.mod_data_read(mod_url=mod_url)
return data_dict
else:
logger.error(f'未能记录模组 {mod_url}')
return {}
except Exception as e:
logger.error(f'获取截屏 {mod_url} 失败!')
logger.error(f'错误:{e}')
if close:
await page.close()
await browser.close()
return {}
async def get_mod_name(self, url: str, lang: str = 'full', close: bool = True) -> str:
"""
获取模组名
:param url: mcmod的模组页面
:param lang: 要获取的模组名语言, defaults to 'full'
:param close: 完成后是否关闭网页, defaults to True
:return:
"""
try:
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
await page.goto(url)
await page.wait_for_selector('.class-title')
class_title = await page.query_selector('.class-title')
try:
h4 = await class_title.query_selector('h4')
en_name = await h4.get_property('innerText')
en_name = await en_name.json_value()
try:
h3 = await class_title.query_selector('h3')
cn_name = await h3.get_property('innerText')
cn_name = await cn_name.json_value()
except:
cn_name = ''
except:
en_name = ''
cn_name = ''
full_nm = f'{cn_name} {en_name}'
if close:
await page.close()
await browser.close()
if lang == 'full':
return full_nm
elif lang == 'cn':
return cn_name
elif lang == 'en':
return en_name
elif lang == 'dict':
name_dict = {'cn_name':cn_name,'en_name':en_name,'full_name':full_nm}
return name_dict
else:
logger.debug(f'lang不能为{lang}')
return ''
except Exception as e:
logger.error(f'获取模组名称 {url} 失败!')
logger.error(f'错误:{e}')
return ''
def mod_data_record(self, mod_file_name: str,mod_full_name: str, mod_url: str, mod_cn_name: str='', mod_en_name: str='', mod_pic_path: str = '') ->bool:
"""
以json文件存储模组的信息,变量名定义与其他函数中一致
:param mod_file_name: 模组去除非法字符(包括空格)的英文全名+后缀名
:param mod_full_name: 格式化后的全名, ex: 植物魔法|Botania
:param mod_url: 模组链接
:param mod_cn_name: 模组中文名, defaults to ''
:param mod_en_name: 模组英文名, defaults to ''
:param mod_pic_path: 模组页面截图路径, defaults to ''
:return: 记录成功返回True,否则False
"""
logger.debug('调用:McMod.mod_data_record')
try:
logger.debug(f'记录模组数据:{mod_cn_name}')
mods_data_path = './data/mods_data.json'
if not os.path.exists(mods_data_path):
with open(mods_data_path, 'w',encoding='utf-8') as f:
json.dump({}, f,ensure_ascii=False)
logger.debug(f'未找到数据文件路径,已新建')
if not mod_pic_path:
mod_pic_path = f'./data/pics/{mod_file_name}'
logger.debug(f'未传入pic_path,使用默认:{mod_pic_path}')
mod_data = {mod_url: {'file_name': mod_file_name, 'cn_name': mod_cn_name,
'en_name': mod_en_name, 'full_name': mod_full_name, 'mod_url': mod_url, 'pic_path': mod_pic_path}}
logger.debug(f'模组数据:{mod_data}')
with open(mods_data_path,'r',encoding='utf-8') as f:
data_content = json.load(f)
data_content.update(mod_data)
with open(mods_data_path,'w',encoding='utf-8') as f:
json.dump(data_content,f,indent=4,ensure_ascii=False)
logger.info(f'已记录模组数据:{mod_data}')
return True
except Exception as e:
logger.error(f'记录模组数据 {mod_url} 错误!')
logger.error(f'错误:{e}')
return False
def mod_data_read(self,mod_url:str) -> dict:
try:
logger.debug(f'读取模组数据:{mod_url}')
mods_data_path = './data/mods_data.json'
with open(mods_data_path,'r',encoding='utf-8') as f:
data = json.load(f)
if mod_url in data:
full_name = data[mod_url].get('full_name')
logger.info(f'该模组数据已记录:{full_name}')
return data[mod_url]
else:
logger.debug('该模组数据未记录')
return False
except Exception as e:
logger.error(f'读取模组数据 {mod_url} 错误!')
logger.error(f'错误:{e}')
return False
然后在 bot.py
中添加这一功能:
async def get_mcmod(update: Update, context: ContextTypes.DEFAULT_TYPE):
'''自动获取mcmod上的模组信息'''
mod_urls = getWords.get_mcmod_url(text=update.effective_message.text)
for mod_url in mod_urls:
data_dict = mcmod.mod_data_read(mod_url=mod_url)
if data_dict:
await send_mod_data(update=update, context=context, data_dict=data_dict)
continue
try:
data_dict = await mcmod.screenshot(mod_url=mod_url)
except Exception as e:
text = f'无法获取模组信息:{e}'
await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
continue
if not data_dict:
text = f'无法找到模组信息:{mod_url}'
await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
continue
await send_mod_data(update=update, context=context, data_dict=data_dict)
async def send_mod_data(update:Update, context:ContextTypes.DEFAULT_TYPE, data_dict:dict):
'''发送模组数据'''
try:
file = data_dict['file_name']
mod_url = data_dict['mod_url']
full_name = data_dict['full_name']
async with aiofiles.open(f'./data/pics/{file}', 'rb') as f:
photo = await f.read()
except FileNotFoundError:
text = f'无法找到截图文件:{file}'
await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
return
text = f'找到了这个模组~\n<b><a href="{mod_url}">{full_name}</a></b>'
await context.bot.send_photo(chat_id=update.effective_chat.id, photo=photo, caption=text, parse_mode='HTML')
async def get_mcmod(update: Update, context: ContextTypes.DEFAULT_TYPE):
'''自动获取mcmod上的模组信息'''
mod_urls = getWords.get_mcmod_url(text=update.effective_message.text)
for mod_url in mod_urls:
data_dict = mcmod.mod_data_read(mod_url=mod_url)
if data_dict:
await send_mod_data(update=update, context=context, data_dict=data_dict)
continue
try:
data_dict = await mcmod.screenshot(mod_url=mod_url)
except Exception as e:
text = f'无法获取模组信息:{e}'
await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
continue
if not data_dict:
text = f'无法找到模组信息:{mod_url}'
await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
continue
await send_mod_data(update=update, context=context, data_dict=data_dict)
async def send_mod_data(update:Update, context:ContextTypes.DEFAULT_TYPE, data_dict:dict):
'''发送模组数据'''
try:
file = data_dict['file_name']
mod_url = data_dict['mod_url']
full_name = data_dict['full_name']
async with aiofiles.open(f'./data/pics/{file}', 'rb') as f:
photo = await f.read()
except FileNotFoundError:
text = f'无法找到截图文件:{file}'
await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
return
text = f'找到了这个模组~\n<b><a href="{mod_url}">{full_name}</a></b>'
await context.bot.send_photo(chat_id=update.effective_chat.id, photo=photo, caption=text, parse_mode='HTML')
我们还需要一个从消息中提取链接的函数,在 words.py
中:
def get_mcmod_url(self, text: str) -> list[str]:
'''返回句中的mcmod页面链接列表'''
mod_nums = re.findall(r"www\.mcmod\.cn/class/([0-9]+)", text)
urls = [f"https://www.mcmod.cn/class/{x}" for x in mod_nums]
if urls:
logger.info(f'获取到mcmod链接{urls}')
return urls
else:
logger.info('无mcmod链接')
return []
def get_mcmod_url(self, text: str) -> list[str]:
'''返回句中的mcmod页面链接列表'''
mod_nums = re.findall(r"www\.mcmod\.cn/class/([0-9]+)", text)
urls = [f"https://www.mcmod.cn/class/{x}" for x in mod_nums]
if urls:
logger.info(f'获取到mcmod链接{urls}')
return urls
else:
logger.info('无mcmod链接')
return []
这个函数返回一个列表,包含了消息中包含的所有 mcmod 模组页面的链接
然后,还需要一个用于触发该功能的过滤器:
filter_mcmod = filters.Regex(r"www.mcmod.cn/class")
filter_mcmod = filters.Regex(r"www.mcmod.cn/class")
最后添加它的 handler 即可:
get_mcmod_handler = MessageHandler(filter_mcmod, get_mcmod)
application.add_handler(get_mcmod_handler)
get_mcmod_handler = MessageHandler(filter_mcmod, get_mcmod)
application.add_handler(get_mcmod_handler)