使用Python写一个Telegram bot吧|Telegram bot教程

本文最后更新于 7 个月前,文中所描述的信息可能已发生改变。

WARNING注意

本文内容已过时, 敬请期待后续更新

前言

为了激发群友们水群的积极性, 为了学习和应用 python 知识,我最近在开发一个 telegram 的 bot,所以写这篇文章以记录和分享

在这篇文章中,将使用 Python-Telegram-Bot 最新版库,基于 Python 的新版异步特性与 Telegram 友好开放的 API,开发一个兼顾实用性和趣味的 bot ,并使用 Docker 在任何地方部署 bot

Demo: kmua-bot

本文不是从零开始的教程,阅读本文前,你需要具有一点点(真的很少一点)的 python 或其他语言编程的基础。

准备

环境

使用你喜欢的工具创建虚拟环境,并安装以下 Python 库

yaml
PyYAML
python-telegram-bot
pypinyin
parsel
playwright
PyYAML
python-telegram-bot
pypinyin
parsel
playwright
  • PyYAML: yaml 配置文件相关包
  • python-telegram-bot: 一个对 Telegram bot api 封装的 python 库,注意版本,默认安装的是 13.14 ,要手动指定版本安装最新开发版本(截止写稿是 20.0a6 ),以支持异步特性。

更新: 2023-02-21: python-telegram-bot 已经默认安装最新版本了,不需要手动指定版本了

  • pypinyin: 汉语拼音相关
  • parsel: 数据解析 - pyppeteer: 浏览器爬虫包,puppeteer 的 python 移植(问我为啥不用更好的 playwright-python ?被简中互联网上的文章坑了,以后会迁移过去。)
  • playwright: 浏览器爬虫

更新: 2023-03-06: 用 playwright 重构

bot 申请

私聊 @BotFather。发送 /newbot,根据提示一步步创建,记得妥善保存最后的 API Token

获取你的 id

每个 tg 用户都有一串标识数字,即为 user_id,可以私聊 @userinfobot 来获取它


基础

开始:响应/start

在项目文件夹内,新建 bot.py,开始编写 bot 最基础的功能,让其响应 /start 命令

首先,导入包(先不要管它们都是什么鬼东西)

python
# Update:从Telegram获取更新
from telegram import Update
#ApplicationBuilder:简单立即为构建 bot 对象
#ContextTypes:上下文类型
#CommandHandler:命令处理器
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler
# Update:从Telegram获取更新
from telegram import Update
#ApplicationBuilder:简单立即为构建 bot 对象
#ContextTypes:上下文类型
#CommandHandler:命令处理器
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler

然后,写一个异步函数 start(),当收到 /start 命令时,要调用它。

python
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    '''响应start命令'''
    text = '你好~我是一个bot'
    await context.bot.send_message(chat_id=update.effective_chat.id,text=text)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    '''响应start命令'''
    text = '你好~我是一个bot'
    await context.bot.send_message(chat_id=update.effective_chat.id,text=text)

这个函数接受两个参数 updatecontext,形参冒号后是类型注解。这两个参数 CommandHandler 会传递给它。

context.bot.send_message() 方法即为让 bot 发送消息,它能接受的参数其实很多,但是往往只需要 chat_id ,和 text 就够了,它们分别表示 要发送消息给的用户或群组 id ,要发送的文本。

update.effective_chat.id 即为当前有效对话的 id ,在哪里发消息给 bot ,它就指向哪里的 id。

你注意到 这个函数使用了 async/await 语法,这是 python 新版的语法糖,详见 官方文档 asyncio

现在,添加一个处理器(handler)

python
start_handler = CommandHandler('start', start)
start_handler = CommandHandler('start', start)

CommandHandler 类可以实现当收到某个命令时,调用某个函数(命令和函数名可以不一样),我们将其实例化为了 start_handler ,并且将命令名字 'start' 和对应要回调的函数名 start 传递给它

然后,启动bot

python
# 构建 bot
TOEKN='你 bot 的 api token'
application = ApplicationBuilder().token(TOKEN).build()
# 注册 handler
application.add_handler(start_handler)
# run!
application.run_polling()
# 构建 bot
TOEKN='你 bot 的 api token'
application = ApplicationBuilder().token(TOKEN).build()
# 注册 handler
application.add_handler(start_handler)
# run!
application.run_polling()

最后,就可以使用 python bot.py 启动你的 bot ,对 bot 发送 /start ,它应该就会回复你 ”你好~我是一个bot”。 使用 Ctrl + C 结束程序运行。

到这里,其实你已经了解到了最基本的 telegram bot 编写规则,即:

  1. 编写回调函数。上面的例子中即为 start 这个函数。
  2. 决定调用回调函数的规则。上述例子中,即为收到 /start 命令时,调用 start 函数
  3. 实例化 handler ,注册给 application 示例。上述例子中,即为 application.add_handler(start_handler)

如果你的系统无法访问 Telegram 的服务器,可以设置代理

全局代理,整个程序都走代理:

python
os.environ['http_proxy'] = '代理地址'
os.environ['https_proxy'] = '代理地址'
os.environ['http_proxy'] = '代理地址'
os.environ['https_proxy'] = '代理地址'

若想仅为 telegram bot api 设置代理,则需要改动一下构建 bot 部分的代码:

http(s)代理:

python
proxy_url = 'http://USERNAME:PASSWORD@PROXY_HOST:PROXY_PORT'  # can also be a https proxy
application = ApplicationBuilder().token("TOKEN").proxy_url(proxy_url).get_updates_proxy_url(proxy_url).build()
proxy_url = 'http://USERNAME:PASSWORD@PROXY_HOST:PROXY_PORT'  # can also be a https proxy
application = ApplicationBuilder().token("TOKEN").proxy_url(proxy_url).get_updates_proxy_url(proxy_url).build()

socks5代理,需要安装 python-telegram-bot[socks],然后:

python
proxy_url = "socks5://user:pass@host:port"
application = ApplicationBuilder().token("TOKEN").proxy_url(proxy_url).get_updates_proxy_url(proxy_url).build()
proxy_url = "socks5://user:pass@host:port"
application = ApplicationBuilder().token("TOKEN").proxy_url(proxy_url).get_updates_proxy_url(proxy_url).build()

接下来,写一些更有趣的功能,使用更复杂一些的规则来调用这些功能。

授予群成员头衔

让群成员可以通过 bot 自助获得一个头衔吧,比如,群友可以在群里使用 /p@botname 好人 来给自己加上 “好人” 的头衔。

当然,前提是 bot 自己要是管理员,并且权限全开。

要实现这个功能,可以这样写:

python
async def set_right(update: Update, context: ContextTypes.DEFAULT_TYPE):
    '''设置成员权限和头衔'''
    chat_id = update.effective_chat.id
    user_id = update.effective_user.id
    bot_username_len = len(update._bot.name)
    custom_title = update.effective_message.text[3+bot_username_len:]
    if not custom_title:
        custom_title = update.effective_user.username
    try:
        await context.bot.promote_chat_member(chat_id=chat_id, user_id=user_id, can_manage_chat=True)
        await context.bot.set_chat_administrator_custom_title(chat_id=chat_id, user_id=user_id, custom_title=custom_title)
        text = f'好,你现在是{custom_title}啦'
        await context.bot.send_message(chat_id=chat_id, reply_to_message_id=update.effective_message.id, text=text)
    except:
        await context.bot.send_message(chat_id=chat_id, text='不行!!')
async def set_right(update: Update, context: ContextTypes.DEFAULT_TYPE):
    '''设置成员权限和头衔'''
    chat_id = update.effective_chat.id
    user_id = update.effective_user.id
    bot_username_len = len(update._bot.name)
    custom_title = update.effective_message.text[3+bot_username_len:]
    if not custom_title:
        custom_title = update.effective_user.username
    try:
        await context.bot.promote_chat_member(chat_id=chat_id, user_id=user_id, can_manage_chat=True)
        await context.bot.set_chat_administrator_custom_title(chat_id=chat_id, user_id=user_id, custom_title=custom_title)
        text = f'好,你现在是{custom_title}啦'
        await context.bot.send_message(chat_id=chat_id, reply_to_message_id=update.effective_message.id, text=text)
    except:
        await context.bot.send_message(chat_id=chat_id, text='不行!!')

然后,添加这个功能的handler:

python
set_right_handler = CommandHandler('p', set_right)
application.add_handler(set_right_handler)
set_right_handler = CommandHandler('p', set_right)
application.add_handler(set_right_handler)

注意handler的添加应该在 application.run_polling() 之前,也就是说 application.run_polling() 才是真正开始运行 bot

简单解释一下 set_right 这个函数:

首先,设置了调用这个函数时的 chat_iduser_id 为当前聊天和当前用户

然后用 len(update._bot.name) 获取 bot 自己用户名的长度,用于下面的切片选取用户想要的头衔

custom_title = update.effective_message.text[3+bot_username_len:] 即获取用户想要的头衔,存储在 custom_title 中,下面的 if not 的作用是,如果用户没有发送他想要的头衔,那么默认头衔为他的用户名

由于只有管理员才有头衔,所以我们使用 promote_chat_member() 方法设置成员权限,将其的 can_manage_chat 权限设置为 True 。注意,即使这样设置了,用户实则是有名无权的状态,只有一个管理员的名头,实际上没有任何权限。

要想赋予权限,你可以向 promote_chat_member() 方法中继续添加参数,如 can_pin_messages=True 可置顶消息

然后就可以使用 set_chat_administrator_custom_title() 方法设置成员头衔了,如果成功了,会继续执行接下来的语句,将结果反馈给用户。

响应未知命令

如果 bot 只能回应设置好的命令就太无趣了,所以再编写一个当收到未知命令时的执行功能 比如,想要让 bot 回应未知命令说 “我不会这个哦~”,可以这样写

python
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
    '''响应未知命令'''
    logger.debug('调用:unknown')
    text = "我不会这个哦~"
    await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
    '''响应未知命令'''
    logger.debug('调用:unknown')
    text = "我不会这个哦~"
    await context.bot.send_message(chat_id=update.effective_chat.id, text=text)

然后,添加它的 handler。这时候可能就出现问题: 前面的两个功能都是回应一个特定的命令,在这里,我们希望它回应除已经设定的命令外的其他所有命令,该怎么做?

这就需要用到 filters ,并且要设置功能的优先级 你需要先导入它

from telegram.ext import filters

然后可以这样写:

python
unknown_handler = MessageHandler(filters.COMMAND, unknown)
unknown_handler = MessageHandler(filters.COMMAND, unknown)

filters 即过滤器,而 filters.COMMAND 即为所有命令。而设置优先级其实非常简单,添加 handler 的顺序越靠后,对应功能的优先级越低

也就是说,我们需要在其他命令类 handler 之后,添加 unknown_handler

python
application.add_handler(start_handler)
application.add_handler(set_right_handler)
application.add_handler(unkonw_handler)

application.run_polling()
application.add_handler(start_handler)
application.add_handler(set_right_handler)
application.add_handler(unkonw_handler)

application.run_polling()

简单的关键词回复

bot 最常见的功能之一就是根据关键词回复特定内容,比如,当对 bot 说早安之类的话时,让 bot 对此做出回应。下面来实现这这一功能 由于命令和普通消息是两种不同的消息类,处理它们的方法是不一样的,针对非命令消息,要使用 MessageHandler: from telegram.ext import MessageHandler

首先还是要编写这个功能执行的函数。

python
import random
async def ohayo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    texts = ['早上好呀','我的小鱼你醒了,还记得清晨吗','哦哈哟~']
    await context.bot.send_message(chat_id=update.effective_chat.id, text=random.choice(text))
import random
async def ohayo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    texts = ['早上好呀','我的小鱼你醒了,还记得清晨吗','哦哈哟~']
    await context.bot.send_message(chat_id=update.effective_chat.id, text=random.choice(text))

由于只回复一个固定的消息过于单调,所以我们使用 random 库来从一个列表中随机选择一个回复

实际上,这个函数仍然只是简单的发送消息,而何时调用这个函数,并不是这个函数本身要做的事,而是由接下来我们要编写的 MessageHandler 中的 filters 决定的:

python
filter_ohayo = filters.Regex('早安|早上好|哦哈哟|ohayo')
ohayo_handler = MessageHandler(filter_ohayo, ohayo)
filter_ohayo = filters.Regex('早安|早上好|哦哈哟|ohayo')
ohayo_handler = MessageHandler(filter_ohayo, ohayo)

filters.Regex() 即为使用正则表达式过滤,我们使用 '早安|早上好|哦哈哟|ohayo' 这个表达式,很容易理解何时会调用 ohayo 这个函数

记得添加它的 handler

python
application.add_handler(ohayo_handler)
application.add_handler(ohayo_handler)

进阶

定制日志系统

在上面写的一些小功能中,我们没有让程序在控制台或者文件里输出任何信息,只有当报错的时候python会输出一些traceback,这在以后排查故障的时候是极其不便的(毕竟谁都不能保证自己的程序不出bug,或是遇到自己没有考虑到的情况).所以有必要加上一个简单的日志系统.

在项目下新建一个 logger.py 文件,用来定制日志系统.

比如,我们想要根据日志等级让日志输出到两份文件,一份是 normal , 包括 INFO 以上的日志,另一份是 debug , 包括 DEBUG 以上的日志,而 INFO 以上的日志还同时输出到控制台,那么可以这样:

python
import logging
import os
from logging import handlers


class Logger:
    def __init__(self, name: str, show: bool, save: bool = True, debug: bool = False) -> None:
        """
        日志系统

        :param name: 日志系统实例名
        :param show: 是否显示在控制台
        :param save: 是否保存到文件, defaults to True
        :param debug: debug模式, defaults to False
        """
        # 日志文件路径
        normal_log_path = f'logs/normal.log'
        debug_log_path = f'logs/debug.log'
        if not os.path.exists('./logs'):
            os.mkdir('./logs')
        self.logger = logging.getLogger(name)
        # 设置日志等级和格式
        self.logger.setLevel(logging.DEBUG)
        self.formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s: - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        # 设置相关 handler
        if not self.logger.handlers:
            if show:
                # 控制台 handler
                sh = logging.StreamHandler()
                if debug:
                    sh.setLevel(logging.DEBUG)
                else:
                    sh.setLevel(logging.INFO)
                sh.setFormatter(self.formatter)
                self.logger.addHandler(sh)
            if save:
                # 保存到文件的 handler
                fh_debug = handlers.TimedRotatingFileHandler(
                    filename=debug_log_path,
                    when="D",
                    interval=1,
                    backupCount=3,
                    encoding='utf-8'
                ) # 自动日志切割
                fh_debug.setLevel(logging.DEBUG)
                fh_debug.setFormatter(self.formatter)
                fh = handlers.TimedRotatingFileHandler(
                    filename=normal_log_path,
                    when="D",
                    interval=1,
                    backupCount=3,
                    encoding='utf-8'
                )
                fh.setLevel(logging.INFO)
                fh.setFormatter(self.formatter)
                self.logger.addHandler(fh)
                self.logger.addHandler(fh_debug)

    def debug(self, message):
        self.logger.debug(message)

    def info(self, message):
        self.logger.info(message)

    def warn(self, message):
        self.logger.warning(message)

    def error(self, message):
        self.logger.error(message)

    def critical(self, message):
        self.logger.critical(message)
import logging
import os
from logging import handlers


class Logger:
    def __init__(self, name: str, show: bool, save: bool = True, debug: bool = False) -> None:
        """
        日志系统

        :param name: 日志系统实例名
        :param show: 是否显示在控制台
        :param save: 是否保存到文件, defaults to True
        :param debug: debug模式, defaults to False
        """
        # 日志文件路径
        normal_log_path = f'logs/normal.log'
        debug_log_path = f'logs/debug.log'
        if not os.path.exists('./logs'):
            os.mkdir('./logs')
        self.logger = logging.getLogger(name)
        # 设置日志等级和格式
        self.logger.setLevel(logging.DEBUG)
        self.formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s: - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        # 设置相关 handler
        if not self.logger.handlers:
            if show:
                # 控制台 handler
                sh = logging.StreamHandler()
                if debug:
                    sh.setLevel(logging.DEBUG)
                else:
                    sh.setLevel(logging.INFO)
                sh.setFormatter(self.formatter)
                self.logger.addHandler(sh)
            if save:
                # 保存到文件的 handler
                fh_debug = handlers.TimedRotatingFileHandler(
                    filename=debug_log_path,
                    when="D",
                    interval=1,
                    backupCount=3,
                    encoding='utf-8'
                ) # 自动日志切割
                fh_debug.setLevel(logging.DEBUG)
                fh_debug.setFormatter(self.formatter)
                fh = handlers.TimedRotatingFileHandler(
                    filename=normal_log_path,
                    when="D",
                    interval=1,
                    backupCount=3,
                    encoding='utf-8'
                )
                fh.setLevel(logging.INFO)
                fh.setFormatter(self.formatter)
                self.logger.addHandler(fh)
                self.logger.addHandler(fh_debug)

    def debug(self, message):
        self.logger.debug(message)

    def info(self, message):
        self.logger.info(message)

    def warn(self, message):
        self.logger.warning(message)

    def error(self, message):
        self.logger.error(message)

    def critical(self, message):
        self.logger.critical(message)

然后在 bot.py 里导入并实例化它

python
from logger import Logger
logger = Logger(name='bot', show=True)
from logger import Logger
logger = Logger(name='bot', show=True)

之后就可以在任意地方使用日志记录并输出一些信息,比如在 start() 函数里:

python
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    '''响应start命令'''
    logger.info(f'收到来自{update.effective_chat.username}的/start指令')
    text = '你好~我是一个bot'
    await context.bot.send_message(chat_id=update.effective_chat.id,text=text)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    '''响应start命令'''
    logger.info(f'收到来自{update.effective_chat.username}的/start指令')
    text = '你好~我是一个bot'
    await context.bot.send_message(chat_id=update.effective_chat.id,text=text)

在其他任意你觉得需要日志的地方都可以使用.

使用 yml 文件配置

把配置直接写进代码里也太不优雅了些,而且对于其他普通用户来说,还可能会因为写错等原因直接破坏程序,所以需要把配置都搬到文件里. yml 是常见的配置文件格式,也比较简单.

后面我们还可能会需要很多小工具,所以新建一个 utils.py,把各种辅助的方法都放里面.

导入yaml库,os库和我们的日志系统

python
import yaml
import os
from logger import Logger
import yaml
import os
from logger import Logger

然后

python
class Utils:
    '''该类用于设置一些辅助的方法'''

    def __init__(self) -> None:
        logger.debug('实例化Utils')
        pass

    def read_config(self,config_name: str) -> dict:
        '''读取配置'''
        config_path = os.path.abspath(os.path.join(os.getcwd(), config_name))
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.load(f, Loader=yaml.FullLoader)
        logger.debug(f'已载入配置{config_name}')
        return config
class Utils:
    '''该类用于设置一些辅助的方法'''

    def __init__(self) -> None:
        logger.debug('实例化Utils')
        pass

    def read_config(self,config_name: str) -> dict:
        '''读取配置'''
        config_path = os.path.abspath(os.path.join(os.getcwd(), config_name))
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.load(f, Loader=yaml.FullLoader)
        logger.debug(f'已载入配置{config_name}')
        return config

然后就可以在yml中写配置,比如 config.yml:

yml
# bot的token,必填
token: ''

#全局代理设置,如不需设置请留空
proxy: ''
# bot的token,必填
token: ''

#全局代理设置,如不需设置请留空
proxy: ''

需要读取这些配置时,先导入上面写的 utils 模块并实例化 Utils 类,然后调用 read_config 方法即可:

python
from utils import Utils
utils = Utils
config = utils.read_config('config.yml')
from utils import Utils
utils = Utils
config = utils.read_config('config.yml')

这里得到的 config 变量储存了我们的配置,它是一个字典.比如要在代理设置里使用它,只需要这样:

python
if config['proxy']:
    os.environ['http_proxy'] = config['proxy']
    os.environ['https_proxy'] = config['proxy']
if config['proxy']:
    os.environ['http_proxy'] = config['proxy']
    os.environ['https_proxy'] = config['proxy']

还有构建 bot 时:

python
TOKEN=config['token']
application = ApplicationBuilder().token(TOKEN).build()
TOKEN=config['token']
application = ApplicationBuilder().token(TOKEN).build()

其实这种方法也是不太优雅的,因为可能会多次读取配置文件,更好的方法是把配置写成一个模块,然后导入它们(单例模式).

使用 json 存取词库

关键词回复是 bot 很常见的功能,这时候就会用到词库.

新建 data/words 文件夹,用来存储我们的数据.下面是一个简单的词库 ohayo.json 示例(早上问候场景)

json
[
    "我的小鱼你醒了,还认识早晨吗",
    "昨夜你曾经说,愿夜幕永不开启",
    "我的小鱼你睡醒了吗,还记得夜晚吗",
    "早晨你曾说过,愿黎明曙光永不落下",
    "早上好哦~",
    "哦哈哟喵~",
    "众生皆小饼干,你是最甜的那一块~",
    "今天也是新的一天~",
    "你醒啦?(歪头)",
    "啊呜~你已经起床了嘛(打哈欠)"
]
[
    "我的小鱼你醒了,还认识早晨吗",
    "昨夜你曾经说,愿夜幕永不开启",
    "我的小鱼你睡醒了吗,还记得夜晚吗",
    "早晨你曾说过,愿黎明曙光永不落下",
    "早上好哦~",
    "哦哈哟喵~",
    "众生皆小饼干,你是最甜的那一块~",
    "今天也是新的一天~",
    "你醒啦?(歪头)",
    "啊呜~你已经起床了嘛(打哈欠)"
]

写一个比较通用的方法,用来反序列化json词库文件.把这个方法写进上面我们创建过的 Utils 类里即可.

python
def load_words(self, words: str):
        '''读取并加载词库'''
        words_path = os.path.join(
            os.getcwd(), 'data', 'words', f'{words}.json')
        try:
            with open(words_path, 'r',encoding='utf-8') as f:
                the_words_json = json.load(f)
            logger.debug(f'已载入词库:{words_path}')
            return the_words_json
        except Exception as e:
            logger.error(f'载入词库出错:{words_path}, {e}')
            return {'Exception': 'except'}
def load_words(self, words: str):
        '''读取并加载词库'''
        words_path = os.path.join(
            os.getcwd(), 'data', 'words', f'{words}.json')
        try:
            with open(words_path, 'r',encoding='utf-8') as f:
                the_words_json = json.load(f)
            logger.debug(f'已载入词库:{words_path}')
            return the_words_json
        except Exception as e:
            logger.error(f'载入词库出错:{words_path}, {e}')
            return {'Exception': 'except'}

这个方法接受一个词库文件名(不带后缀),然后从 data/words 读取对应的词库,并返回为 python 数据结构

这时候代码量可能已经比较多了,为了方便组织和管理,我把除了 bot.py 的其他 py 文件统一放到了 src 文件夹下.

然后创建一个用于从词库中取句子的类 Getwords(因为词库也可能不止一种,为了之后方便扩展所以单独写一个类)

在 src/words.py 中:

python
import random
from .utils import Utils
from .logger import Logger

# 实例化类
logger = Logger(name='words',show=True)
utils = Utils()

# 读取 ohayo.json 这个词库
ohayo_words = utils.load_words('ohayo')

class GetWords():
    def __init__(self):
        """
        该类用于提取消息特定文本,或是返回特定文本
        """
        logger.debug('实例化GetWords')
        self.ohayo = ohayo_words

    def get_ohayo(self) -> str:
        '''返回早安问候句子'''
        word =  random.choice(self.ohayo)
        logger.debug(f'获取到早安:{word}')
        return word
import random
from .utils import Utils
from .logger import Logger

# 实例化类
logger = Logger(name='words',show=True)
utils = Utils()

# 读取 ohayo.json 这个词库
ohayo_words = utils.load_words('ohayo')

class GetWords():
    def __init__(self):
        """
        该类用于提取消息特定文本,或是返回特定文本
        """
        logger.debug('实例化GetWords')
        self.ohayo = ohayo_words

    def get_ohayo(self) -> str:
        '''返回早安问候句子'''
        word =  random.choice(self.ohayo)
        logger.debug(f'获取到早安:{word}')
        return word

然后修改 bot.py 中的早安问候功能:

python
# 导入 Getwords 类
from src.words import GetWords

# 实例化
getWords = GetWords()

async def ohayo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    text=getWords.get_ohayo()
    await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
# 导入 Getwords 类
from src.words import GetWords

# 实例化
getWords = GetWords()

async def ohayo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    text=getWords.get_ohayo()
    await context.bot.send_message(chat_id=update.effective_chat.id, text=text)

再次强调一下,何时触发响应的功能应该交给过滤器去做,我们这里的 ohayo() 函数只需要关心功能触发后应该做什么就可以了.

使用 docker 部署 bot

python 程序的环境管理很烦人,用docker来跑再合适不过.

本地构建

如果要在本地构建 docker 镜像,参考下面的 Dockerfile 文件

Dockerfile
FROM python:3.9
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
ENTRYPOINT [ "python","/app/bot.py" ]
FROM python:3.9
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
ENTRYPOINT [ "python","/app/bot.py" ]

将 Dockerfile 文件放到合适的目录(项目根目录即可),然后执行

docker build -t bot .

注意后面有一个 .

关于 docker 构建镜像的更多帮助,可以参阅 docker 从入门到实践

然后使用 docker run -d bot 启动容器,运行 bot

使用 GitHub action 自动构建

github action 可以构建 docker 镜像并发布到 ghcr,方便在其他地方部署 docker 容器.你依然需要在项目中写好 Dockerfile

在项目中新建 .github/workflows/build-docker.yml,参考以下配置

yml
name: Build and publish docker container

on:
  workflow_dispatch:
    

jobs:
  publish:
    name: Publish container image
    runs-on: ubuntu-20.04
    env:
      TZ: Asia/Shanghai
    steps:
      - name: Checkout
        uses: actions/checkout@v3

      - name: OCI meta
        id: meta
        uses: docker/[email protected]
        with:
          images: ghcr.io/${{ github.repository }}
          tags: |
            type=edge,branch=main
            type=ref,event=branch
            type=semver,pattern={{version}}
            type=semver,pattern={{major}}.{{minor}}
            type=semver,pattern={{major}}
            type=sha
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2
        
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v2

      - name: Login to GHCR
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.repository_owner }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Build and push
        uses: docker/build-push-action@v3
        with:
          context: .
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          platforms: linux/amd64,linux/arm64
          cache-from: type=gha
          cache-to: type=gha,mode=max
name: Build and publish docker container

on:
  workflow_dispatch:
    

jobs:
  publish:
    name: Publish container image
    runs-on: ubuntu-20.04
    env:
      TZ: Asia/Shanghai
    steps:
      - name: Checkout
        uses: actions/checkout@v3

      - name: OCI meta
        id: meta
        uses: docker/[email protected]
        with:
          images: ghcr.io/${{ github.repository }}
          tags: |
            type=edge,branch=main
            type=ref,event=branch
            type=semver,pattern={{version}}
            type=semver,pattern={{major}}.{{minor}}
            type=semver,pattern={{major}}
            type=sha
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2
        
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v2

      - name: Login to GHCR
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.repository_owner }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Build and push
        uses: docker/build-push-action@v3
        with:
          context: .
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          platforms: linux/amd64,linux/arm64
          cache-from: type=gha
          cache-to: type=gha,mode=max

然后可以使用 docker pull 的方法运行,或者使用 docker compose:

新建 docker-compose.yml,参考以下内容

yml
version: "3"
services:
  kmua:
    image: ghcr.io/krau/kmua-bot:main
    container_name: kmua-main
    init: true
    volumes:
      - ./data:/kmua/data
      - ./logs:/kmua/logs
      - ./config.yml:/kmua/config.yml
    environment:
      - TZ=Asia/Shanghai
version: "3"
services:
  kmua:
    image: ghcr.io/krau/kmua-bot:main
    container_name: kmua-main
    init: true
    volumes:
      - ./data:/kmua/data
      - ./logs:/kmua/logs
      - ./config.yml:/kmua/config.yml
    environment:
      - TZ=Asia/Shanghai

注意根据自己需要更改

然后就可以使用 docker compose up -d 启动容器


附加

使用playwright查询mc模组信息并记录

playwright 是一个浏览器自动化工具包,如果你了解过 selenium 那么你一定知道它是什么.但是 playwright 是异步的

我们想要实现这样的功能: 当 bot 收到包含 mcmod 页面链接的消息时,去主动打开链接,收集模组信息并截图,然后发送模组信息给用户.并且持久化保存这些信息,当再次收到同样的链接时,直接从已保存的数据中获取并返回给用户.

新建 src/mcmod.py. 以下是一个例子,它获取了模组的名称并且截图网页,保存这些数据

python
import re
import os
import json
from playwright.async_api import async_playwright
from src.logger import Logger

logger = Logger(name='McMod', show=True)


class McMod:
    def __init__(self, width: int = 1280, height: int = 1080) -> None:
        """
        参数设定

        :param width: 页面宽度, defaults to 1280
        :param height: 页面高度, defaults to 1080
        """
        logger.debug('实例化McMod')
        self.args = ['--headless', '--no-sandbox',
                     '--disable-gpu', '--hide-scrollbars']
        self.options = {'args': self.args, 'defaultViewport': {
            'width': width, 'height': height},'dumpio':True}

    async def screenshot(self, mod_url: str, width: int = 1280, height: int = 720, close: bool = True) -> dict:
        """
        获取网页截屏

        :param url: 网址
        :param width: 页面宽度, defaults to 1280
        :param height: 页面高度, defaults to 720
        :param close: 完成后是否关闭网页, defaults to True
        :return: 字典{'file_name':截图文件名,'cn_name':中文名,
                        'en_name':英文名,'full_name':格式化后的全名,'mod_url':模组链接}
        """
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch()
                page = await browser.new_page()
                await page.set_viewport_size({'width': width, 'height': height})
                await page.goto(mod_url)
                await page.wait_for_selector('.class-title')
                class_title = await page.query_selector('.class-title')
                try:
                    h4 = await class_title.query_selector('h4')
                    en_name = await h4.get_property('innerText')
                    en_name = await en_name.json_value()
                    try:
                        h3 = await class_title.query_selector('h3')
                        cn_name = await h3.get_property('innerText')
                        cn_name = await cn_name.json_value()
                    except:
                        cn_name = ''
                except:
                    en_name = ''
                    cn_name = ''
                file_name = re.sub(r'[^a-zA-Z]', '', en_name) + '.png'  # 以英文模组名(去除非法字符)保存截屏文件
                full_name = f'{cn_name} {en_name}'  # 为了少开一次浏览器,干脆把模组名也顺便获取并返回
                if not os.path.exists(f'./data/pics/{file_name}'):
                    await page.screenshot(path=f'./data/pics/{file_name}')
                if close:
                    await page.close()
                    await browser.close()
                record_flag = self.mod_data_record(mod_cn_name=cn_name,mod_en_name=en_name,mod_file_name=file_name,mod_full_name=full_name,mod_url=mod_url)
                if record_flag:
                    data_dict = self.mod_data_read(mod_url=mod_url)
                    return data_dict
                else:
                    logger.error(f'未能记录模组 {mod_url}')
                    return {}
        except Exception as e:
            logger.error(f'获取截屏 {mod_url} 失败!')
            logger.error(f'错误:{e}')
            if close:
                await page.close()
                await browser.close()
            return {}

    async def get_mod_name(self, url: str, lang: str = 'full', close: bool = True) -> str:
        """
        获取模组名

        :param url: mcmod的模组页面
        :param lang: 要获取的模组名语言, defaults to 'full'
        :param close: 完成后是否关闭网页, defaults to True
        :return:
        """
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch()
                page = await browser.new_page()
                await page.goto(url)
                await page.wait_for_selector('.class-title')
                class_title = await page.query_selector('.class-title')
                try:
                    h4 = await class_title.query_selector('h4')
                    en_name = await h4.get_property('innerText')
                    en_name = await en_name.json_value()
                    try:
                        h3 = await class_title.query_selector('h3')
                        cn_name = await h3.get_property('innerText')
                        cn_name = await cn_name.json_value()
                    except:
                        cn_name = ''
                except:
                    en_name = ''
                    cn_name = ''
                full_nm = f'{cn_name} {en_name}'
                if close:
                    await page.close()
                    await browser.close()
                if lang == 'full':
                    return full_nm
                elif lang == 'cn':
                    return cn_name
                elif lang == 'en':
                    return en_name
                elif lang == 'dict':
                    name_dict = {'cn_name':cn_name,'en_name':en_name,'full_name':full_nm}
                    return name_dict
                else:
                    logger.debug(f'lang不能为{lang}')
                    return ''
        except Exception as e:
            logger.error(f'获取模组名称 {url} 失败!')
            logger.error(f'错误:{e}')
            return ''

    def mod_data_record(self, mod_file_name: str,mod_full_name: str, mod_url: str, mod_cn_name: str='', mod_en_name: str='',  mod_pic_path: str = '') ->bool:
        """
        以json文件存储模组的信息,变量名定义与其他函数中一致

        :param mod_file_name: 模组去除非法字符(包括空格)的英文全名+后缀名
        :param mod_full_name: 格式化后的全名, ex: 植物魔法|Botania
        :param mod_url: 模组链接
        :param mod_cn_name: 模组中文名, defaults to ''
        :param mod_en_name: 模组英文名, defaults to ''
        :param mod_pic_path: 模组页面截图路径, defaults to ''
        :return: 记录成功返回True,否则False
        """
        logger.debug('调用:McMod.mod_data_record')
        try:
            logger.debug(f'记录模组数据:{mod_cn_name}')
            mods_data_path = './data/mods_data.json'
            if not os.path.exists(mods_data_path):
                with open(mods_data_path, 'w',encoding='utf-8') as f:
                    json.dump({}, f,ensure_ascii=False)
                logger.debug(f'未找到数据文件路径,已新建')
            if not mod_pic_path:
                mod_pic_path = f'./data/pics/{mod_file_name}'
                logger.debug(f'未传入pic_path,使用默认:{mod_pic_path}')
            mod_data = {mod_url: {'file_name': mod_file_name, 'cn_name': mod_cn_name,
                                        'en_name': mod_en_name, 'full_name': mod_full_name, 'mod_url': mod_url, 'pic_path': mod_pic_path}}
            logger.debug(f'模组数据:{mod_data}')
            with open(mods_data_path,'r',encoding='utf-8') as f:
                data_content = json.load(f)
            data_content.update(mod_data)
            with open(mods_data_path,'w',encoding='utf-8') as f:
                json.dump(data_content,f,indent=4,ensure_ascii=False)
            logger.info(f'已记录模组数据:{mod_data}')
            return True
        except Exception as e:
            logger.error(f'记录模组数据 {mod_url} 错误!')
            logger.error(f'错误:{e}')
            return False

    def mod_data_read(self,mod_url:str) -> dict:
        try:
            logger.debug(f'读取模组数据:{mod_url}')
            mods_data_path = './data/mods_data.json'
            with open(mods_data_path,'r',encoding='utf-8') as f:
                data = json.load(f)
            if mod_url in data:
                full_name = data[mod_url].get('full_name')
                logger.info(f'该模组数据已记录:{full_name}')
                return data[mod_url]
            else:
                logger.debug('该模组数据未记录')
                return False
        except Exception as e:
            logger.error(f'读取模组数据 {mod_url} 错误!')
            logger.error(f'错误:{e}')
            return False
import re
import os
import json
from playwright.async_api import async_playwright
from src.logger import Logger

logger = Logger(name='McMod', show=True)


class McMod:
    def __init__(self, width: int = 1280, height: int = 1080) -> None:
        """
        参数设定

        :param width: 页面宽度, defaults to 1280
        :param height: 页面高度, defaults to 1080
        """
        logger.debug('实例化McMod')
        self.args = ['--headless', '--no-sandbox',
                     '--disable-gpu', '--hide-scrollbars']
        self.options = {'args': self.args, 'defaultViewport': {
            'width': width, 'height': height},'dumpio':True}

    async def screenshot(self, mod_url: str, width: int = 1280, height: int = 720, close: bool = True) -> dict:
        """
        获取网页截屏

        :param url: 网址
        :param width: 页面宽度, defaults to 1280
        :param height: 页面高度, defaults to 720
        :param close: 完成后是否关闭网页, defaults to True
        :return: 字典{'file_name':截图文件名,'cn_name':中文名,
                        'en_name':英文名,'full_name':格式化后的全名,'mod_url':模组链接}
        """
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch()
                page = await browser.new_page()
                await page.set_viewport_size({'width': width, 'height': height})
                await page.goto(mod_url)
                await page.wait_for_selector('.class-title')
                class_title = await page.query_selector('.class-title')
                try:
                    h4 = await class_title.query_selector('h4')
                    en_name = await h4.get_property('innerText')
                    en_name = await en_name.json_value()
                    try:
                        h3 = await class_title.query_selector('h3')
                        cn_name = await h3.get_property('innerText')
                        cn_name = await cn_name.json_value()
                    except:
                        cn_name = ''
                except:
                    en_name = ''
                    cn_name = ''
                file_name = re.sub(r'[^a-zA-Z]', '', en_name) + '.png'  # 以英文模组名(去除非法字符)保存截屏文件
                full_name = f'{cn_name} {en_name}'  # 为了少开一次浏览器,干脆把模组名也顺便获取并返回
                if not os.path.exists(f'./data/pics/{file_name}'):
                    await page.screenshot(path=f'./data/pics/{file_name}')
                if close:
                    await page.close()
                    await browser.close()
                record_flag = self.mod_data_record(mod_cn_name=cn_name,mod_en_name=en_name,mod_file_name=file_name,mod_full_name=full_name,mod_url=mod_url)
                if record_flag:
                    data_dict = self.mod_data_read(mod_url=mod_url)
                    return data_dict
                else:
                    logger.error(f'未能记录模组 {mod_url}')
                    return {}
        except Exception as e:
            logger.error(f'获取截屏 {mod_url} 失败!')
            logger.error(f'错误:{e}')
            if close:
                await page.close()
                await browser.close()
            return {}

    async def get_mod_name(self, url: str, lang: str = 'full', close: bool = True) -> str:
        """
        获取模组名

        :param url: mcmod的模组页面
        :param lang: 要获取的模组名语言, defaults to 'full'
        :param close: 完成后是否关闭网页, defaults to True
        :return:
        """
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch()
                page = await browser.new_page()
                await page.goto(url)
                await page.wait_for_selector('.class-title')
                class_title = await page.query_selector('.class-title')
                try:
                    h4 = await class_title.query_selector('h4')
                    en_name = await h4.get_property('innerText')
                    en_name = await en_name.json_value()
                    try:
                        h3 = await class_title.query_selector('h3')
                        cn_name = await h3.get_property('innerText')
                        cn_name = await cn_name.json_value()
                    except:
                        cn_name = ''
                except:
                    en_name = ''
                    cn_name = ''
                full_nm = f'{cn_name} {en_name}'
                if close:
                    await page.close()
                    await browser.close()
                if lang == 'full':
                    return full_nm
                elif lang == 'cn':
                    return cn_name
                elif lang == 'en':
                    return en_name
                elif lang == 'dict':
                    name_dict = {'cn_name':cn_name,'en_name':en_name,'full_name':full_nm}
                    return name_dict
                else:
                    logger.debug(f'lang不能为{lang}')
                    return ''
        except Exception as e:
            logger.error(f'获取模组名称 {url} 失败!')
            logger.error(f'错误:{e}')
            return ''

    def mod_data_record(self, mod_file_name: str,mod_full_name: str, mod_url: str, mod_cn_name: str='', mod_en_name: str='',  mod_pic_path: str = '') ->bool:
        """
        以json文件存储模组的信息,变量名定义与其他函数中一致

        :param mod_file_name: 模组去除非法字符(包括空格)的英文全名+后缀名
        :param mod_full_name: 格式化后的全名, ex: 植物魔法|Botania
        :param mod_url: 模组链接
        :param mod_cn_name: 模组中文名, defaults to ''
        :param mod_en_name: 模组英文名, defaults to ''
        :param mod_pic_path: 模组页面截图路径, defaults to ''
        :return: 记录成功返回True,否则False
        """
        logger.debug('调用:McMod.mod_data_record')
        try:
            logger.debug(f'记录模组数据:{mod_cn_name}')
            mods_data_path = './data/mods_data.json'
            if not os.path.exists(mods_data_path):
                with open(mods_data_path, 'w',encoding='utf-8') as f:
                    json.dump({}, f,ensure_ascii=False)
                logger.debug(f'未找到数据文件路径,已新建')
            if not mod_pic_path:
                mod_pic_path = f'./data/pics/{mod_file_name}'
                logger.debug(f'未传入pic_path,使用默认:{mod_pic_path}')
            mod_data = {mod_url: {'file_name': mod_file_name, 'cn_name': mod_cn_name,
                                        'en_name': mod_en_name, 'full_name': mod_full_name, 'mod_url': mod_url, 'pic_path': mod_pic_path}}
            logger.debug(f'模组数据:{mod_data}')
            with open(mods_data_path,'r',encoding='utf-8') as f:
                data_content = json.load(f)
            data_content.update(mod_data)
            with open(mods_data_path,'w',encoding='utf-8') as f:
                json.dump(data_content,f,indent=4,ensure_ascii=False)
            logger.info(f'已记录模组数据:{mod_data}')
            return True
        except Exception as e:
            logger.error(f'记录模组数据 {mod_url} 错误!')
            logger.error(f'错误:{e}')
            return False

    def mod_data_read(self,mod_url:str) -> dict:
        try:
            logger.debug(f'读取模组数据:{mod_url}')
            mods_data_path = './data/mods_data.json'
            with open(mods_data_path,'r',encoding='utf-8') as f:
                data = json.load(f)
            if mod_url in data:
                full_name = data[mod_url].get('full_name')
                logger.info(f'该模组数据已记录:{full_name}')
                return data[mod_url]
            else:
                logger.debug('该模组数据未记录')
                return False
        except Exception as e:
            logger.error(f'读取模组数据 {mod_url} 错误!')
            logger.error(f'错误:{e}')
            return False

然后在 bot.py 中添加这一功能:

python
async def get_mcmod(update: Update, context: ContextTypes.DEFAULT_TYPE):
    '''自动获取mcmod上的模组信息'''
    mod_urls = getWords.get_mcmod_url(text=update.effective_message.text)
    for mod_url in mod_urls:
        data_dict = mcmod.mod_data_read(mod_url=mod_url)
        if data_dict:
            await send_mod_data(update=update, context=context, data_dict=data_dict)
            continue
        try:
            data_dict = await mcmod.screenshot(mod_url=mod_url)
        except Exception as e:
            text = f'无法获取模组信息:{e}'
            await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
            continue
        if not data_dict:
            text = f'无法找到模组信息:{mod_url}'
            await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
            continue
        await send_mod_data(update=update, context=context, data_dict=data_dict)


async def send_mod_data(update:Update, context:ContextTypes.DEFAULT_TYPE, data_dict:dict):
    '''发送模组数据'''
    try:
        file = data_dict['file_name']
        mod_url = data_dict['mod_url']
        full_name = data_dict['full_name']
        async with aiofiles.open(f'./data/pics/{file}', 'rb') as f:
            photo = await f.read()
    except FileNotFoundError:
        text = f'无法找到截图文件:{file}'
        await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
        return

    text = f'找到了这个模组~\n<b><a href="{mod_url}">{full_name}</a></b>'
    await context.bot.send_photo(chat_id=update.effective_chat.id, photo=photo, caption=text, parse_mode='HTML')
async def get_mcmod(update: Update, context: ContextTypes.DEFAULT_TYPE):
    '''自动获取mcmod上的模组信息'''
    mod_urls = getWords.get_mcmod_url(text=update.effective_message.text)
    for mod_url in mod_urls:
        data_dict = mcmod.mod_data_read(mod_url=mod_url)
        if data_dict:
            await send_mod_data(update=update, context=context, data_dict=data_dict)
            continue
        try:
            data_dict = await mcmod.screenshot(mod_url=mod_url)
        except Exception as e:
            text = f'无法获取模组信息:{e}'
            await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
            continue
        if not data_dict:
            text = f'无法找到模组信息:{mod_url}'
            await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
            continue
        await send_mod_data(update=update, context=context, data_dict=data_dict)


async def send_mod_data(update:Update, context:ContextTypes.DEFAULT_TYPE, data_dict:dict):
    '''发送模组数据'''
    try:
        file = data_dict['file_name']
        mod_url = data_dict['mod_url']
        full_name = data_dict['full_name']
        async with aiofiles.open(f'./data/pics/{file}', 'rb') as f:
            photo = await f.read()
    except FileNotFoundError:
        text = f'无法找到截图文件:{file}'
        await context.bot.send_message(chat_id=update.effective_chat.id, text=text)
        return

    text = f'找到了这个模组~\n<b><a href="{mod_url}">{full_name}</a></b>'
    await context.bot.send_photo(chat_id=update.effective_chat.id, photo=photo, caption=text, parse_mode='HTML')

我们还需要一个从消息中提取链接的函数,在 words.py 中:

python
def get_mcmod_url(self, text: str) -> list[str]:
        '''返回句中的mcmod页面链接列表'''
        mod_nums = re.findall(r"www\.mcmod\.cn/class/([0-9]+)", text)
        urls = [f"https://www.mcmod.cn/class/{x}" for x in mod_nums]
        if urls:
            logger.info(f'获取到mcmod链接{urls}')
            return urls
        else:
            logger.info('无mcmod链接')
            return []
def get_mcmod_url(self, text: str) -> list[str]:
        '''返回句中的mcmod页面链接列表'''
        mod_nums = re.findall(r"www\.mcmod\.cn/class/([0-9]+)", text)
        urls = [f"https://www.mcmod.cn/class/{x}" for x in mod_nums]
        if urls:
            logger.info(f'获取到mcmod链接{urls}')
            return urls
        else:
            logger.info('无mcmod链接')
            return []

这个函数返回一个列表,包含了消息中包含的所有 mcmod 模组页面的链接

然后,还需要一个用于触发该功能的过滤器:

python
filter_mcmod = filters.Regex(r"www.mcmod.cn/class")
filter_mcmod = filters.Regex(r"www.mcmod.cn/class")

最后添加它的 handler 即可:

python
get_mcmod_handler = MessageHandler(filter_mcmod, get_mcmod)
application.add_handler(get_mcmod_handler)
get_mcmod_handler = MessageHandler(filter_mcmod, get_mcmod)
application.add_handler(get_mcmod_handler)
将 hexo 博客迁移至 valaxy
ChatGPT简单使用体验|你的下一个Google,何必是搜索引擎
Valaxy v0.14.61 驱动 | 主题 - Yun v0.14.61
本站已勉强运行0 天0 小时0 分0 秒(ノ`Д)ノ