转载请注明出处:小锋学长生活大爆炸( https://xfxuezhang.blog.csdn.net/)
若发现存在部分图片缺失,可以访问原文: 万字长文保姆级教你制作自己的多功能QQ机器人 - 小锋学长生活大爆炸
目录
QQ、微信是我们平常使用最多的通讯工具,网上也有很多通过软件去控制QQ/微信的开源工具,通过这些工具,我们可以实现许多有意思的效果,而不仅仅局限于消息聊天。
自从微信网页版被官方禁用后,微信的软件工具几乎已经失效了,现有的一些是通过hook微信本身来实现,这种很容易被官方检测并封号。另一些是通过注册企业号来控制,但不直观且功能受限。
这里我们借助相对更开放的QQ来制作我们的机器人,对比几款工具后,最终选择了mirai。
先放一张整体结构图:
网上现有开源的机器人大多只是实现了类似“自动推送天气、接入图灵机器人自动聊天”等等,大多属于自娱自乐,没有发挥最大用途。
因此,我们的QQ机器人(暂且取名为“小锋仔”)是根据日常所需而制,包含常用功能且设计得易于扩展。
目前包含的功能有:
将来可能包含的功能有:
接下来详细介绍如何自己搭建一个这样的QQ机器人。篇幅较长且保姆级详细,建议收藏后慢慢看。
首先为了 能运行mirai,且随时随地能连接,我们需要有一个 具备公网IP的服务器。这里使用腾讯云的 免费服务器。
对于还不想买的童靴,可以免费领取腾讯云提供的1个月服务器试用套餐。步骤:
这里腾讯云可能有个小特点。如果发现在控制台防火墙放行后,还是无法访问。需要再在服务器里放行一下端口。这里先写着,大家可以在后面一节中连接上了服务器,再回过来这里输入指令。
- sudo apt install firewalld -y
- sudo firewall-cmd --list-all
- sudo firewall-cmd --permanent --zone=public --add-port=8888/tcp && sudo firewall-cmd --reload
- sudo firewall-cmd --permanent --zone=public --add-port=9966/tcp && sudo firewall-cmd --reload
- sudo systemctl start firewalld.service
服务器初始化完成后,就可以通过SSH去连接了。这里我们可以直接使用powershell来连接,其他SSH软件我强推mobaxterm!!安装包也已经准备好了:MobaXterm.exe
ssh 用户名@<公网ip>
- sudo apt upgrade -y
- sudo apt autoremove -y
sudo adduser sxf
然后将账户加入sudoers组:
- sudo apt install vim
- sudo vim /etc/sudoers
然后退出软件,重新用新建的账号登录即可。
至此,服务器环境就搭建完成了。
这篇博客里记录了很多我在使用过程中,常用软件的安装,非常详细且经过亲测,时不时也会更新内容,大家可以收藏以备下次使用。
Ubuntu20.04 + VirtualBox相关_小锋学长生活大爆炸的博客-CSDN博客
接下来就要在服务器上搭建QQ机器人(mirai)基础环境。搭建完成后,我们就可以远程跟机器人进行交互。
官方mirai的github仓库:GitHub - mamoe/mirai: 高效率 QQ 机器人支持库
由于github是国外的,而官方已经不再支持gitee的维护,因此如果大家无法访问上面的连接,可以用我帮大家下载下来的安装包:
其他的一些文档:Mirai | mirai
官方论坛:主页 | MiraiForum
下面开始正式安装:
- mkdir qqbot
- cd qqbot
- wget http://xfxuezhang.cn/web/share/QQBot/mcl-installer-a02f711-linux-amd64
- sudo chmod +x mcl-installer-a02f711-linux-amd64
此时需要输入密码(在上面选购并装完服务器后会显示,当时要求记下的)。
./mcl-installer-a02f711-linux-amd64
此时进入安装流程,弹出的几个选项都直接回车选默认即可。
./mcl --update-package net.mamoe:mirai-api-http --channel stable-v2 --type plugin
- ## 配置文件中的值,全为默认值
-
- ## 启用的 adapter, 内置有 http, ws, reverse-ws, webhook
- adapters:
- - http
- - ws
-
- ## 是否开启认证流程, 若为 true 则建立连接时需要验证 verifyKey
- ## 建议公网连接时开启
- enableVerify: true
- verifyKey: 1234567890
-
- ## 开启一些调式信息
- debug: false
-
- ## 是否开启单 session 模式, 若为 true,则自动创建 session 绑定 console 中登录的 bot
- ## 开启后,接口中任何 sessionKey 不需要传递参数
- ## 若 console 中有多个 bot 登录,则行为未定义
- ## 确保 console 中只有一个 bot 登陆时启用
- singleMode: false
-
- ## 历史消息的缓存大小
- ## 同时,也是 http adapter 的消息队列容量
- cacheSize: 4096
-
- ## adapter 的单独配置,键名与 adapters 项配置相同
- adapterSettings:
- ## 详情看 http adapter 使用说明 配置
- http:
- # 0.0.0.0是允许远程访问,localhost只能同机器访问
- host: 0.0.0.0
- port: 8888
- cors: ["*"]
- unreadQueueMaxSize: 100
-
- ## 详情看 websocket adapter 使用说明 配置
- ws:
- host: localhost
- port: 8080
- reservedSyncId: -1
./mcl
首次启动会自动下载jar包。等待启动完成后,输入"?",可以查看所有支持的mcl命令。
/login [password]
如果想要启动mcl后自动登录QQ号,可以用:
/autoLogin add
也可以设置不同的设备登录。
/autoLogin setConfig protocol ANDROID_PAD
它对应的配置文件其实就在:config/Console/AutoLogin.yml
现在QQ风控很严了,第一次登录很有可能遇到“需要滑动验证码”的。建议申请小号使用,以免发生不测。并且首次使用时在QQ“账号安全设置”中关闭“安全登录检查”、“陌生设备登录保护”。如果遇到验证码,可以尝试:
当服务器成功运行了mirai后,我们就可以在本地进行Python脚本的编写了。由于最新的mirai-api-http变更过接口规范,因此网上某些一两年前的代码已经失效了。本教程对应的mirai-api-http使用的是最新的2.x版本。
接下来的操作,都默认已经完成“启动mcl并login了QQ号”。
在上面setting.yml中,有两个配置项值得注意,他是我们脚本可以控制的密钥:
- verifyKey: 1234567890
- http: port: 8888
简单封装下。直接用print也是可以的。
- class Logger:
- def __init__(self, level='debug'):
- self.level = level
-
- def DebugLog(self, *args):
- if self.level == 'debug':
- print(*args)
-
- def TraceLog(self, *args):
- if self.level == 'trace':
- print(*args)
-
- def setDebugLevel(self, level):
- self.level = level.lower()
在交互前,脚本需要先向mirai获取一个verifyKey,之后在每个请求时候,都需要带上这个key,也叫session。其中,参数auth_key对应了上面setting.yml里的verifyKey。
- auth_key = '1234567890'
-
- def verifySession(self, auth_key):
- """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
- session Key在未进行校验的情况下,一定时间后将会被自动释放"""
- data = {"verifyKey": auth_key}
- url = self.addr+'verify'
- res = requests.post(url, data=json.dumps(data)).json()
- logger.DebugLog(res)
- if res['code'] == 0:
- return res['session']
- return None
使用此方法校验并激活你的Session,同时将Session与一个已登录的Bot绑定。
- qq = '121215' # mirai登录的那个QQ
- session = 'grge8484' # 上面verifySession函数的返回值
-
- def bindSession(self, session, qq):
- """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
- data = {"sessionKey": session, "qq": qq}
- url = self.addr + 'bind'
- res = requests.post(url, data=json.dumps(data)).json()
- logger.DebugLog(res)
- if res['code'] == 0:
- self.session = session
- return True
- return False
使用此方式释放session及其相关资源(Bot不会被释放)
- def releaseSession(self, session, qq):
- """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
- 否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
- data = {"sessionKey": session, "qq": qq}
- url = self.addr + 'release'
- res = requests.post(url, data=json.dumps(data)).json()
- logger.DebugLog(res)
- if res['code'] == 0:
- return True
- return False
获取当前有多少条未读消息。
- def getMessageCount(self, session):
- url = self.addr + 'countMessage?sessionKey='+session
- res = requests.get(url).json()
- if res['code'] == 0:
- return res['data']
- return 0
获取消息后会从队列中移除。
- def fetchLatestMessage(self, session):
- url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
- res = requests.get(url).json()
- if res['code'] == 0:
- return res['data']
- return None
简单实现了部分消息类型的解析,会有消息丢失,请根据使用需求自行调整。
- data = 'xxx' # 可以是上面getMsgFromGroup函数的返回值
-
- def parseGroupMsg(self, data):
- res = []
- if data is None:
- return res
- for item in data:
- if item['type'] == 'GroupMessage':
- type = item['messageChain'][-1]['type']
- if type == 'Image':
- text = item['messageChain'][-1]['url']
- elif type == 'Plain':
- text = item['messageChain'][-1]['text']
- elif type == 'Face':
- text = item['messageChain'][-1]['faceId']
- else:
- logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
- continue
- name = item['sender']['memberName']
- group_id = str(item['sender']['group']['id'])
- group_name = item['sender']['group']['name']
- res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
- return res
向指定好友发送消息。
- def sendFriendMessage(self, session, qq, msg):
- msg_list = msg.split(r'\n')
- msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
-
- data = {
- "sessionKey": session,
- "target": qq,
- "messageChain": msg_chain
- }
- url = self.addr + 'sendFriendMessage'
- try:
- res = requests.post(url, data=json.dumps(data)).json()
- except:
- logger.DebugLog(">> 发送失败")
- return 0
- if res['code'] == 0:
- return res['messageId']
- return 0
也只是简单实现。
- def sendMsgToGroup(self, session, group, msg):
- text = msg['text']
- type = msg['type']
- name = msg['name']
- group_id = msg['groupId']
- group_name = msg['groupName']
- content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
- name, group_id, group_name, text)
- content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
- name, group_id, group_name)
- logger.DebugLog(">> 消息类型:" + type)
- if type == 'Plain':
- message = [{"type": type, "text": content1}]
- elif type == 'Image':
- message = [
- {"type": 'Plain', "text": content2},
- {"type": type, "url": text}]
- elif type == 'Face':
- message = [{"type": 'Plain', "text": content2},
- {"type": type, "faceId": text}]
- else:
- logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
- return 0
- data = {
- "sessionKey": session,
- "group": group,
- "messageChain": message
- }
- logger.DebugLog(">> 消息内容:" + str(data))
- url = self.addr + 'sendGroupMessage'
- try:
- res = requests.post(url, data=json.dumps(data)).json()
- except:
- logger.DebugLog(">> 转发失败")
- return 0
- logger.DebugLog(">> 请求返回:" + str(res))
- if res['code'] == 0:
- return res['messageId']
- return 0
跟上面的差不多,消息类型变了一下,从而支持类似HTML形式的消息发送。
- def sendPlainTextToGroup(self, session, group, msg):
- msg_list = msg.split(r'\n')
- msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
- data = {
- "sessionKey": session,
- "group": group,
- "messageChain": msg_chain
- }
- url = self.addr + 'sendGroupMessage'
- try:
- res = requests.post(url, data=json.dumps(data)).json()
- except:
- logger.DebugLog(">> 转发失败")
- return 0
- logger.DebugLog(">> 请求返回:" + str(res))
- if res['code'] == 0:
- return res['messageId']
- return 0
以上就是几个简单、常用的函数。基于这些函数,就已经可以实现蛮多有趣的功能了。
这部分可以直接参考之前的博客:Q群消息转发例程。其实也就是把上面的函数整合一下,放一个完整版:
- import requests
- from time import sleep
-
- class Logger:
- def __init__(self, level='debug'):
- self.level = level
-
- def DebugLog(self, *args):
- if self.level == 'debug':
- print(*args)
-
- def TraceLog(self, *args):
- if self.level == 'trace':
- print(*args)
-
- def setDebugLevel(self, level):
- self.level = level.lower()
-
- logger = Logger()
- class QQBot:
- def __init__(self):
- self.addr = 'http://43.143.12.250:8888/'
- self.session = None
-
- def verifySession(self, auth_key):
- """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
- session Key在未进行校验的情况下,一定时间后将会被自动释放"""
- data = {"verifyKey": auth_key}
- url = self.addr+'verify'
- res = requests.post(url, data=json.dumps(data)).json()
- logger.DebugLog(res)
- if res['code'] == 0:
- return res['session']
- return None
-
- def bindSession(self, session, qq):
- """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
- data = {"sessionKey": session, "qq": qq}
- url = self.addr + 'bind'
- res = requests.post(url, data=json.dumps(data)).json()
- logger.DebugLog(res)
- if res['code'] == 0:
- self.session = session
- return True
- return False
-
- def releaseSession(self, session, qq):
- """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
- 否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
- data = {"sessionKey": session, "qq": qq}
- url = self.addr + 'release'
- res = requests.post(url, data=json.dumps(data)).json()
- logger.DebugLog(res)
- if res['code'] == 0:
- return True
- return False
-
- def fetchLatestMessage(self, session):
- url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
- res = requests.get(url).json()
- if res['code'] == 0:
- return res['data']
- return None
-
- def parseGroupMsg(self, data):
- res = []
- if data is None:
- return res
- for item in data:
- if item['type'] == 'GroupMessage':
- type = item['messageChain'][-1]['type']
- if type == 'Image':
- text = item['messageChain'][-1]['url']
- elif type == 'Plain':
- text = item['messageChain'][-1]['text']
- elif type == 'Face':
- text = item['messageChain'][-1]['faceId']
- else:
- logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
- continue
- name = item['sender']['memberName']
- group_id = str(item['sender']['group']['id'])
- group_name = item['sender']['group']['name']
- res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
- return res
-
- def getMessageCount(self, session):
- url = self.addr + 'countMessage?sessionKey='+session
- res = requests.get(url).json()
- if res['code'] == 0:
- return res['data']
- return 0
-
- def sendPlainTextToGroup(self, session, group, msg):
- msg_list = msg.split(r'\n')
- msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
- data = {
- "sessionKey": session,
- "group": group,
- "messageChain": msg_chain
- }
- url = self.addr + 'sendGroupMessage'
- try:
- res = requests.post(url, data=json.dumps(data)).json()
- except:
- logger.DebugLog(">> 转发失败")
- return 0
- logger.DebugLog(">> 请求返回:" + str(res))
- if res['code'] == 0:
- return res['messageId']
- return 0
-
- def sendMsgToGroup(self, session, group, msg):
- text = msg['text']
- type = msg['type']
- name = msg['name']
- group_id = msg['groupId']
- group_name = msg['groupName']
- content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
- name, group_id, group_name, text)
- content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
- name, group_id, group_name)
- logger.DebugLog(">> 消息类型:" + type)
- if type == 'Plain':
- message = [{"type": type, "text": content1}]
- elif type == 'Image':
- message = [
- {"type": 'Plain', "text": content2},
- {"type": type, "url": text}]
- elif type == 'Face':
- message = [{"type": 'Plain', "text": content2},
- {"type": type, "faceId": text}]
- else:
- logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
- return 0
- data = {
- "sessionKey": session,
- "group": group,
- "messageChain": message
- }
- logger.DebugLog(">> 消息内容:" + str(data))
- url = self.addr + 'sendGroupMessage'
- try:
- res = requests.post(url, data=json.dumps(data)).json()
- except:
- logger.DebugLog(">> 转发失败")
- return 0
- logger.DebugLog(">> 请求返回:" + str(res))
- if res['code'] == 0:
- return res['messageId']
- return 0
-
- def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
- # 对每条消息进行检查
- for msg in msg_data:
- group_id = msg['groupId']
- # 接收的消息群正确(目前只支持 消息类型)
- if group_id in receive_groups:
- # 依次将消息转发到目标群
- for g in send_groups:
- logger.DebugLog(">> 当前群:"+g)
- if g == group_id:
- logger.DebugLog(">> 跳过此群")
- continue
- res = self.sendMsgToGroup(session, g, msg)
- if res != 0:
- logger.TraceLog(">> 转发成功!{}".format(g))
-
- def sendFriendMessage(self, session, qq, msg):
- msg_list = msg.split(r'\n')
- msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
-
- data = {
- "sessionKey": session,
- "target": qq,
- "messageChain": msg_chain
- }
- url = self.addr + 'sendFriendMessage'
- try:
- res = requests.post(url, data=json.dumps(data)).json()
- except:
- logger.DebugLog(">> 发送失败")
- return 0
- if res['code'] == 0:
- return res['messageId']
- return 0
-
- def qqTransfer():
- with open('conf.json', 'r+', encoding="utf-8") as f:
- content = f.read()
- conf = json.loads(content)
-
- auth_key = conf['auth_key']
- bind_qq = conf['bind_qq']
- sleep_time = conf['sleep_time']
- debug_level = conf['debug_level']
-
- receive_groups = conf['receive_groups']
- send_groups = conf['send_groups']
-
- logger.setDebugLevel(debug_level)
-
- session = bot.verifySession(auth_key)
- logger.DebugLog(">> session: "+session)
- bot.bindSession(session, bind_qq)
- while True:
- cnt = bot.getMessageCount(session)
- if cnt:
- logger.DebugLog('>> 有消息了 => {}'.format(cnt))
- logger.DebugLog('获取消息内容')
- data = bot.fetchLatestMessage(session)
- if len(data) == 0:
- logger.DebugLog('消息为空')
- continue
- logger.DebugLog(data)
- logger.DebugLog('解析消息内容')
- data = bot.parseGroupMsg(data)
- logger.DebugLog(data)
- logger.DebugLog('转发消息内容')
- bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
- # else:
- # logger.DebugLog('空闲')
- sleep(sleep_time)
- bot.releaseSession(session, bind_qq)
其中,conf.json内容为:
- {
- "auth_key": "1234567890",
- "bind_qq": "123456", # mirai登录的QQ (复制时记得删我)
- "sleep_time": 1,
- "receive_groups": ["913182235", "977307922"], # 要接受消息的群 (复制时记得删我)
- "send_groups": ["913182235", "977307922"], # 要发送消息的群 (复制时记得删我)
- "debug_level": "debug"
- }
下面,我们就先从类似QMsg酱的消息通知开始。
设计目标:通过调用指定的URL,小锋仔机器人就会给指定的好友发送指定的消息。
关于QMsg酱的使用教程可以看:免费的QQ微信消息推送机器人
前面我们特地开放了9966端口,因此可以使用Flask来监听这个端口。
本着越简单越好的原则,我们把“发给好友还是群”、“目标好友或群的号”、“发送的内容”三部分都拼接到URL上,因此有:
- http://43.143.12.250:9966/QQ/send/friend?target=123&msg=hello
- http://43.143.12.250:9966/QQ/send/group?target=123&msg=hello
因此,代码可以写成:
- from flask import Flask, request
-
- app = Flask(__name__)
-
- @app.route('/QQ/send/friend', methods=['GET'])
- def qqListenMsgToFriend():
- # 类似于Qmsg的功能
- # flask做得接收HTTP请求转为QQ消息
- qq = request.args.get('target', None)
- msg = request.args.get('msg', None)
- bot.sendFriendMessage(bot.session, qq, msg)
- return 'Hello World! Friend!'
-
- @app.route('/QQ/send/group', methods=['GET'])
- def qqListenMsgToGroup():
- # 类似于Qmsg的功能
- # flask做得接收HTTP请求转为QQ消息
- qq = request.args.get('target', None)
- msg = request.args.get('msg', None)
- bot.sendPlainTextToGroup(bot.session, qq, msg)
- return 'Hello World! Group!'
-
- if __name__ == '__main__':
- app.run(port='9966', host='0.0.0.0')
由于Flask和小锋仔QQBot都要阻塞运行,因此稍微变动一下,让小锋仔以子线程的形式运行即可。
- if __name__ == '__main__':
- t = threading.Thread(target=qqTransfer)
- t.setDaemon(True)
- t.start()
-
- app.run(port='9966', host='0.0.0.0')
测试一下:
http://localhost:9966/QQ/send/friend?target=1061700625&msg=hello
如果我们把这个脚本放到服务器上去运行,那么链接就变成了:
http://43.143.12.250:9966/QQ/send/friend?target=1061700625&msg=hello
当然,能发消息的前提是“先加好友”或“加群”啦。
上面我们进行了简单地尝鲜。
1、从这部分开始,我们涉及的功能比较杂,为了能更好的区分功能,需要设计一个简单的交互协议。
根据以上内容,小锋仔需要记录的状态信息至少有:
- class StatusStore:
- def __init__(self, from_qq:int=None, is_cmd:bool=False, func_name:str=None, need_second:bool=False, msg:str=None) -> None:
- self.from_qq = from_qq # 发送者的QQ号
- self.is_cmd = is_cmd # 是否是指令(选择功能)
- self.func_name = func_name # 选择的功能的名称
- self.need_second = need_second # 是否需要经过两步:先发cmd指令,再发详细内容
- self.msg = msg # 本次发送的消息内容
-
- def detail(self):
- return self.__dict__
2、并且我们设置,只有从指定QQ发过来消息,才能响应。因此在接收到消息时,需要判断对方的信息。对于好友类型的消息,mirai返回格式如消息类型说明:
- {
- "type": "FriendMessage",
- "sender": {
- "id": 123,
- "nickname": "",
- "remark": ""
- },
- "messageChain": [] // 数组,内容为下文消息类型
- }
因此,我们可以从"type"和 "sender:id"入手判断。
3、我们暂时考虑只有一个主QQ能发送指令的情况。
4、定义一个类来专门管理不同功能的函数,例如:
- class MultiFunction:
- """多功能函数集合"""
- def __init__(self) -> None:
- pass
-
- @staticmethod
- def translate(original:str, convert:str='zh2en') -> str:
- return '假装是翻译结果'
-
- @staticmethod
- def uploadImage(image_path:str) -> str:
- return '假装是上传结果'
-
- @staticmethod
- def weather(city:str) -> str:
- return '假装是天气结果'
-
- @staticmethod
- def hotNews(status_store:StatusStore) -> str:
- return '假装是热搜结果'
-
-
- # 多功能函数的映射
- # function: 功能对应函数名
- # need_second: 是否需要经过两步:先发cmd指令,再发详细内容
- # desc: 需要经过两步时,第一次返回的提示语
- function_map = {
- '翻译': {'function': MultiFunction.translate, 'need_second': True, 'desc': '请输入您要翻译的内容~'},
- '天气': {'function': MultiFunction.weather, 'need_second': True, 'desc': '请问是哪座城市的天气呢?'},
- '热搜': {'function': MultiFunction.hotNews, 'need_second': False}
- }
-
- def choiceFunction(store_obj:StatusStore):
- res = ''
- if function_map.get(store_obj.func_name):
- res = function_map.get(store_obj.func_name)['function'](store_obj.msg)
- return res
5、大致实现流程的想法是:
对应代码实现:
- def analyzeFriendMsg(self, data):
- if data is None or data['type'] != 'FriendMessage':
- return None, None, None
- sender_id = data['sender']['id']
- msg_type = data['messageChain'][-1]['type']
- if msg_type == 'Plain':
- msg_text = data['messageChain'][-1]['text']
- elif msg_type == 'Image':
- msg_text = data['messageChain'][-1]['url']
- else:
- msg_text = ''
- return sender_id, msg_type, msg_text
最终的框架就是:
- def xiaofengzai():
- auth_key = '1234567890' # settings.yml中的verifyKey
- bind_qq = '3126229950' # mirai登录的QQ
- target_qq = '1061700625' # 我们自己用的主QQ
- target_qq = int(target_qq) # 接收到的消息里,QQ是int类型的
- sleep_time = 1 # 轮询间隔
- status_store = {}
-
- session = bot.verifySession(auth_key)
- logger.DebugLog(">> session: "+session)
- bot.bindSession(session, bind_qq)
- while True:
- cnt = bot.getMessageCount(session)
- if not cnt:
- sleep(sleep_time)
- continue
- logger.DebugLog('>> 有消息了 => {}'.format(cnt))
- logger.DebugLog('获取消息内容')
- data = bot.fetchLatestMessage(session)
- if len(data) == 0:
- logger.DebugLog('消息为空')
- sleep(sleep_time)
- continue
- logger.DebugLog(data)
- logger.DebugLog('解析消息内容')
-
- sender_id, msg_type, msg_text = bot.analyzeFriendMsg(data[0])
- if not sender_id or sender_id != target_qq:
- sleep(sleep_time)
- continue
-
- if msg_text.strip().lower().startswith('cmd'):
- _, func_name = msg_text.strip().split('\n')[0].split()
- func_name = func_name.strip()
- store_obj = StatusStore(from_qq=sender_id, is_cmd=True, func_name=func_name)
- # 不需要发两次,直接调用函数返回结果即可
- func_info = function_map.get(func_name)
- if not func_info:
- res = '指令[{}]暂不支持'.format(func_name)
- elif func_info.get('need_second'):
- res = '收到你的指令:{}\n{}'.format(func_name, func_info.get('desc') or '已进入对应状态, 请继续发送详细内容')
- # 添加或更新记录
- status_store[sender_id] = store_obj
- else:
- res = '请求结果为:\n' + str(choiceFunction(store_obj))
- status_store.pop(sender_id, '')
- else:
- res = '请先发送指令哦...'
- store_obj = status_store.get(sender_id)
- if store_obj and store_obj.is_cmd:
- store_obj.msg = msg_text
- res = '请求结果为:\n' + str(choiceFunction(store_obj))
- status_store.pop(sender_id, '')
-
- bot.sendFriendMessage(session, qq=sender_id, msg=res)
看一下效果:
至此,骨架有了,接下来开始填充功能了。
根据上面的骨架可知,我们只需要实现MultiFunction类下的translate函数即可。如果想快速测试函数效果,可以使用以下代码,而不用先启动mirai:
- res = choiceFunction(StatusStore(func_name='翻译', msg='你好'))
- print(res)
要做翻译,最方便的就是调用API了(没错,调包侠!)。
这里使用腾讯的翻译API,可以免费领取:领取腾讯翻译API。点进链接后,往下拖到“云产品体验”专区,选择“人工智能”,下面有“机器翻译”。他的调用量是每月更新,非常的良心了。
点击“立即体验”,进入控制台界面,虽然上面显示的是“开通付费版”,但不用担心,他是有免费额度的,更何况你账户里又没充余额,哈哈哈。
支持很多类型的翻译,这次我们先选文本翻译,机器翻译 文本翻译-API 文档-文档中心-腾讯云:
我们用SDK的方式,免去了自己封装复杂的加密步骤:
pip install --upgrade tencentcloud-sdk-python
然后去获取密钥API密钥管理,记下APPID、SecretId、SecretKey:
小锋仔bot结合翻译功能,直接上代码:
- import json
- from tencentcloud.common import credential
- from tencentcloud.common.profile.client_profile import ClientProfile
- from tencentcloud.common.profile.http_profile import HttpProfile
- from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
- from tencentcloud.tmt.v20180321 import tmt_client, models
-
- def translate(original:str, convert:str='en'):
- secretId = 'xxx' # 从API控制台获取
- secretKey = 'xxx' # 从API控制台获取
- AppId = 12123 # 从API控制台获取
- try:
- cred = credential.Credential(secretId, secretKey)
- client = tmt_client.TmtClient(cred, "ap-guangzhou")
- req = models.TextTranslateRequest()
- params = {
- "SourceText": original,
- "Source": "auto",
- "Target": convert,
- "ProjectId": AppId
- }
- req.from_json_string(json.dumps(params))
- resp = client.TextTranslate(req)
- # print(resp.to_json_string())
- return resp.TargetText
- except TencentCloudSDKException as err:
- print(err)
- return ''
使用测试效果:
- print(choiceFunction(StatusStore(func_name='翻译', msg='你好')))
-
- # 输出:
- {"TargetText": "Hello", "Source": "zh", "Target": "en", "RequestId": "a1b17f47-751e-44cd-89a5-6a22e9f2c444"}
- Hello
天气部分,我们是用免费的和风天气API:实时天气 - API。
首先也要进行登录并获取KEY,这个步骤官网讲的很详细,图文并茂的,这边就不多写了,大家可以跳转过去(注意我们选的是Web API):创建应用和KEY - RESOURCE。
同样的,直接上代码:
- def weather(city:str) -> str:
- url_api_weather = 'https://devapi.qweather.com/v7/weather/now?'
- url_api_geo = 'https://geoapi.qweather.com/v2/city/lookup?'
- weather_key = 'xxxxx' # 和风天气控制台的key
-
- # 实况天气
- def getCityId(city_kw):
- url_v2 = url_api_geo + 'location=' + city_kw + '&key=' + weather_key
- city = requests.get(url_v2).json()['location'][0]
- return city['id']
-
- city_name = '广州'
- city_id = getCityId(city_name)
- url = url_api_weather + 'location=' + city_id + '&key=' + weather_key
- res = requests.get(url).json()
- text = "<天气信息获取失败>"
- if res['code'] == '200' or res['code'] == 200:
- text = '实时天气:\n 亲爱的 小主, 您所在的地区为 {},\n 现在的天气是 {},\n 气温 {}°, 湿度 {}%,\n 体感气温为 {}°,\n 风向 {}, 风速 {}km/h'.format(
- city_name, res['now']['text'], res['now']['temp'], res['now']['humidity'], res['now']['feelsLike'], res['now']['windDir'], res['now']['windSpeed'])
- return text
测试效果:
这部分用的是天行数据,免费会员每天赠送100次调用额度:今日头条新闻API接口 - 天行数据TianAPI。先注册账号,然后点击“申请接口”即可。
注意,首次注册需要在控制台完成“实名认证”和“邮箱验证”(马上通过,不需要等待审核)。
对于密钥Key,是在“控制台-数据管理-我的密钥KEY”中。
同样的,直接上代码:
- def hotNews(status_store:StatusStore) -> str:
- tianxing_key = 'e05966abe0b054686c9f6b7d60e59a8d'
- def common(data):
- url = data + '?key={}'.format(tianxing_key)
- res = requests.get(url).json()
- return res['newslist']
- res = common('http://api.tianapi.com/topnews/index')
- tops = []
- index = 1
- for item in res:
- tops.append(str(index) + '. ' + item['title'])
- index += 1
- return '\n'.join(tops[0:10])
测试效果:
照片上传
有时候我们想保存一些照片,但又不想放手机里,那我们可以做个“通过把照片发给小锋仔机器人,让小锋仔再上传到服务器或者COS上”的功能。
还是这个链接:云产品体验 - 腾讯云,在“云产品体验-基础-对象存储COS”下面。对象存储不止可以用来存文件,这里我们只用来存图片。
pip install -U cos-python-sdk-v5
- from qcloud_cos import CosConfig
- from qcloud_cos import CosS3Client
- import os
-
- def uploadImage(image_path:str) -> str:
- bucket_id = 'image-1253093297' # 存储桶的名称
- secret_id = 'xxx'
- secret_key = 'xxx'
- region = 'ap-guangzhou' # 存储桶的地区
- token = None
- scheme = 'https'
- config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
- client = CosS3Client(config)
- # 本地文件形式上传
- # response = client.upload_file(
- # Bucket=bucket_id,
- # LocalFilePath=image_path,
- # Key=image_path.split(os.sep)[-1],
- # PartSize=1,
- # MAXThread=10,
- # EnableMD5=False
- # )
-
- # 网络文件形式上传
- file_keyname = image_path.split('/')[-2] + '.jpg'
- stream = requests.get(image_path)
- response = client.put_object(
- Bucket=bucket_id,
- Body=stream,
- Key=file_keyname
- )
- print(response['ETag'])
- img_url = 'https://{}.cos.{}.myqcloud.com/{}'.format(bucket_id, region, file_keyname)
- return '上传成功, ETag: {}\nURL: {}'.format(response['ETag'], img_url)
通过mirai文档可知,图片消息格式为:
[{'type': 'FriendMessage', 'messageChain': [{'type': 'Source', 'id': 55312, 'time': 1662048857}, {'type': 'Image', 'imageId': '{DCAD8B29-D606-B354-117D-F39479C14FE3}.jpg', 'url': 'http://c2cpicdw.qpic.cn/offpic_new/1061700625//1061700625-141936558-DCAD8B29D606B354117DF39479C14FE3/0?term=2', 'path': None, 'base64': None}], 'sender': {'id': 1061700625, 'nickname': '热心市民', 'remark': '热心市民'}}]
因此只需要拿到里面的URL就行,而我们的analyzeFriendMsg函数就已经提取了URl了,因此啥都不用多改!!(结构好,就是方便呀~)
直接测试:
通过上面几个小功能,不难发现我们的程序在功能上很方便扩展,总结一下,就2步:
下面提供几个好玩的接口,给大家留个作业,自己集成到机器人中去:
- # 疫情信息
- def getYiQing():
- url = 'https://c.m.163.com/ug/api/wuhan/app/data/list-total?t={}'.format(329091037164)
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
- 'Chrome/96.0.4664.110 Safari/537.36 '
- }
- res = requests.get(url, headers=headers).json()
- total = res['data']['chinaTotal']['total']
- today = res['data']['chinaTotal']['today']
- a = 99
- symbol_today = '+' if today['confirm'] >= 0 else ''
- symbol_total = '+' if today['storeConfirm'] >= 0 else ''
- symbol_input = '+' if today['input'] >= 0 else ''
- confirmTotal = '累计确诊:{},较昨日:{}{}'.format(total['confirm'], symbol_today, today['confirm'])
- confirmToday = '现有确诊:{},较昨日:{}{}'.format(total['confirm'] - total['dead'] - total['heal'], symbol_total, today['storeConfirm'])
- inputs = '境外输入:{},较昨日:{}{}'.format(total['input'], symbol_input, today['input'])
- return inputs + '\n' + confirmToday + '\n' + confirmTotal
-
- # 历史上的今天
- def getHistoryToday():
- url = 'https://api.oick.cn/lishi/api.php'
- res = requests.get(url).json()
- historyToday = []
- for item in res['result']:
- historyToday.append(item['date'] + ', ' + item['title'])
- return '\n'.join(random.choices(historyToday, k=3))
-
- # 一言
- def dailysentence():
- url = 'https://res.abeim.cn/api-text_yiyan'
- res = requests.get(url).json()
- return res['content']
-
- # 天行api
- def common(data):
- tianxing_key = '' # 天行key
- url = data + '?key={}'.format(tianxing_key)
- res = requests.get(url).json()
- return res['newslist']
-
- # 天行api - 小窍门
- def dailyTips():
- res = common('http://api.tianapi.com/qiaomen/index')
- tipsArray = res[0]
- return tipsArray['content']
-
- # 天行api - 健康小知识
- def healthTips():
- res = common('http://api.tianapi.com/healthtip/index')
- tipsArray = res[0]
- return tipsArray['content']
如果以后功能越来越多,我们很容易记不住关键词是啥,因此,稍稍变动一下,让我们可以知道功能清单。在xiaofengzai函数这个位置添加一段代码:
- if msg_text.strip() == '功能清单':
- res = '目前支持的关键词有:\n' + '\n'.join(function_map.keys())
这部分摘自我前面的博客:
与树莓派的主要交互,这里主要有两种方式:
我的另一个大型项目“基于树莓派的智能魔镜”,它里面树莓派与手机的通信,就是通过MQTT实现的。很贴心的,B站还有配套的视频教程,欢迎来踩,哈哈哈~小锋学长生活大爆炸的个人空间。
树莓派由于不在身边,因此这部分暂时先略过,大家可以通过上面几篇博客自学一下,他们也都是使用到了mirai的。这里讲一下MQTT的安装,也可以参考安装EMQX MQTT。
- sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
- sudo apt-get update
- sudo apt-get install mosquitto
- sudo apt-get install mosquitto-clients
- sudo apt clean
- sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp && sudo firewall-cmd --reload
- sudo systemctl start firewalld.service
更多MQTT使用示例可以参考:
ESP32是一块可以链接WIFI的嵌入式开发板,支持MQTT协议。这样一来,只要通过跟我们的机器人互相订阅Topic,在通过设计一套通信协议,就可以实现远程交互了。进一步地,给ESP32接入外设,就可以很容易的实现一个智能家居,而我们则可以通过QQ机器人来实现对智能家居的控制。
为了方便,我们把所有需要修改的变量,都统一提取到了最前面。大家在复制过程中,务必记得都填上自己的!!
![Q073C@O[}BS4ON(A]H$C_YX.png](https://cdn.nlark.com/yuque/0/2022/png/21876370/1662051010396-4111cadd-c0f7-444e-88bf-f5ab77abd9c1.png#clientId=u1cf12aa3-c8f3-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=547&id=u1448e26e&margin=%5Bobject%20Object%5D&name=Q073C%40O%5B%7DBS4ON%28A%5DH%24C_YX.png&originHeight=903&originWidth=1013&originalType=binary&ratio=1&rotation=0&showTitle=false&size=46750&status=done&style=none&taskId=ud48045de-328e-4049-870b-8561bc1feb4&title=&width=614)
最后,贴上完整代码。由于水平有限,写的可能不是很好。也欢迎大家DIY魔改成自己的。如果有问题,欢迎加入文末Q群一起交流~~~
- import json
- import os
-
- import requests
- from flask import Flask, request
- from time import sleep
- import threading
- import json
- from tencentcloud.common import credential
- from tencentcloud.common.profile.client_profile import ClientProfile
- from tencentcloud.common.profile.http_profile import HttpProfile
- from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
- from tencentcloud.tmt.v20180321 import tmt_client, models
- import requests
- from qcloud_cos import CosConfig
- from qcloud_cos import CosS3Client
-
-
- # ---------------------------- 变量定义区 ---------------------------- #
- # 自己运行mirai的服务器IP和mirai-api-http的监听端口
- mirai_server_url = 'http://43.143.12.250:8888/'
- # settings.yml中的verifyKey
- auth_key = '1234567890'
- # mirai登录的QQ
- bind_qq = 'xxx'
- # 我们自己用的主QQ。接收到的消息里,QQ是int类型的
- target_qq = 123123
-
- # 腾讯COS存储桶的名称
- tencent_cos_bucket_id = 'xxx'
- # 腾讯COS存储桶的地区
- tencent_cos_region = 'ap-guangzhou'
- # 腾讯控制台的SecretId
- tencent_secret_id = 'xxx'
- # 腾讯控制台的SecretKey
- tencent_secret_key = 'xxx'
- # 腾讯机器翻译的appid
- tencent_translate_AppId = xxx
- # 和风天气API的key
- weather_key = 'xxx'
- # 天行API的key
- tianxing_key = 'xxx'
- # ------------------------------------------------------------------ #
-
-
- class Logger:
- def __init__(self, level='debug'):
- self.level = level
-
- def DebugLog(self, *args):
- if self.level == 'debug':
- print(*args)
-
- def TraceLog(self, *args):
- if self.level == 'trace':
- print(*args)
-
- def setDebugLevel(self, level):
- self.level = level.lower()
-
-
- class QQBot:
- def __init__(self):
- self.addr = mirai_server_url
- self.session = None
-
- def verifySession(self, auth_key):
- """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
- session Key在未进行校验的情况下,一定时间后将会被自动释放"""
- data = {"verifyKey": auth_key}
- url = self.addr+'verify'
- res = requests.post(url, data=json.dumps(data)).json()
- logger.DebugLog(res)
- if res['code'] == 0:
- return res['session']
- return None
-
- def bindSession(self, session, qq):
- """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
- data = {"sessionKey": session, "qq": qq}
- url = self.addr + 'bind'
- res = requests.post(url, data=json.dumps(data)).json()
- logger.DebugLog(res)
- if res['code'] == 0:
- self.session = session
- return True
- return False
-
- def releaseSession(self, session, qq):
- """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
- 否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
- data = {"sessionKey": session, "qq": qq}
- url = self.addr + 'release'
- res = requests.post(url, data=json.dumps(data)).json()
- logger.DebugLog(res)
- if res['code'] == 0:
- return True
- return False
-
- def fetchLatestMessage(self, session):
- url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
- res = requests.get(url).json()
- if res['code'] == 0:
- return res['data']
- return None
-
-
- def parseGroupMsg(self, data):
- res = []
- if data is None:
- return res
- for item in data:
- if item['type'] == 'GroupMessage':
- type = item['messageChain'][-1]['type']
- if type == 'Image':
- text = item['messageChain'][-1]['url']
- elif type == 'Plain':
- text = item['messageChain'][-1]['text']
- elif type == 'Face':
- text = item['messageChain'][-1]['faceId']
- else:
- logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
- continue
- name = item['sender']['memberName']
- group_id = str(item['sender']['group']['id'])
- group_name = item['sender']['group']['name']
- res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
- return res
-
- def getMessageCount(self, session):
- url = self.addr + 'countMessage?sessionKey='+session
- res = requests.get(url).json()
- if res['code'] == 0:
- return res['data']
- return 0
-
- def peekMessage(self, session):
- url = self.addr + 'peekMessage?sessionKey='+session
- res = requests.get(url).json()
- if res['code'] == 0:
- return res['data']
- return 0
-
- def sendPlainTextToGroup(self, session, group, msg):
- msg_list = msg.split(r'\n')
- msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
- data = {
- "sessionKey": session,
- "group": group,
- "messageChain": msg_chain
- }
- url = self.addr + 'sendGroupMessage'
- try:
- res = requests.post(url, data=json.dumps(data)).json()
- except:
- logger.DebugLog(">> 转发失败")
- return 0
- logger.DebugLog(">> 请求返回:" + str(res))
- if res['code'] == 0:
- return res['messageId']
- return 0
-
- def sendMsgToGroup(self, session, group, msg):
- text = msg['text']
- type = msg['type']
- name = msg['name']
- group_id = msg['groupId']
- group_name = msg['groupName']
- content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
- name, group_id, group_name, text)
- content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
- name, group_id, group_name)
- logger.DebugLog(">> 消息类型:" + type)
- if type == 'Plain':
- message = [{"type": type, "text": content1}]
- elif type == 'Image':
- message = [
- {"type": 'Plain', "text": content2},
- {"type": type, "url": text}]
- elif type == 'Face':
- message = [{"type": 'Plain', "text": content2},
- {"type": type, "faceId": text}]
- else:
- logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
- return 0
- data = {
- "sessionKey": session,
- "group": group,
- "messageChain": message
- }
- logger.DebugLog(">> 消息内容:" + str(data))
- url = self.addr + 'sendGroupMessage'
- try:
- res = requests.post(url, data=json.dumps(data)).json()
- except:
- logger.DebugLog(">> 转发失败")
- return 0
- logger.DebugLog(">> 请求返回:" + str(res))
- if res['code'] == 0:
- return res['messageId']
- return 0
-
- def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
- # 对每条消息进行检查
- for msg in msg_data:
- group_id = msg['groupId']
- # 接收的消息群正确(目前只支持 消息类型)
- if group_id in receive_groups:
- # 依次将消息转发到目标群
- for g in send_groups:
- logger.DebugLog(">> 当前群:"+g)
- if g == group_id:
- logger.DebugLog(">> 跳过此群")
- continue
- res = self.sendMsgToGroup(session, g, msg)
- if res != 0:
- logger.TraceLog(">> 转发成功!{}".format(g))
-
- def sendFriendMessage(self, session, qq, msg):
- msg_list = msg.split(r'\n')
- msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
-
- data = {
- "sessionKey": session,
- "target": qq,
- "messageChain": msg_chain
- }
- url = self.addr + 'sendFriendMessage'
- try:
- res = requests.post(url, data=json.dumps(data)).json()
- except:
- logger.DebugLog(">> 发送失败")
- return 0
- if res['code'] == 0:
- return res['messageId']
- return 0
-
-
- def analyzeFriendMsg(self, data):
- if data is None or data['type'] != 'FriendMessage':
- return None, None, None
- sender_id = data['sender']['id']
- msg_type = data['messageChain'][-1]['type']
- if msg_type == 'Plain':
- msg_text = data['messageChain'][-1]['text']
- elif msg_type == 'Image':
- msg_text = data['messageChain'][-1]['url']
- else:
- msg_text = ''
- return sender_id, msg_type, msg_text
-
-
-
-
- logger = Logger()
- bot = QQBot()
- app = Flask(__name__)
-
- def qqTransfer():
- with open('conf.json', 'r+', encoding="utf-8") as f:
- content = f.read()
- conf = json.loads(content)
-
- auth_key = conf['auth_key']
- bind_qq = conf['bind_qq']
- sleep_time = conf['sleep_time']
- debug_level = conf['debug_level']
-
- receive_groups = conf['receive_groups']
- send_groups = conf['send_groups']
-
- logger.setDebugLevel(debug_level)
-
- session = bot.verifySession(auth_key)
- logger.DebugLog(">> session: "+session)
- bot.bindSession(session, bind_qq)
- while True:
- cnt = bot.getMessageCount(session)
- if cnt:
- logger.DebugLog('>> 有消息了 => {}'.format(cnt))
- logger.DebugLog('获取消息内容')
- data = bot.fetchLatestMessage(session)
- if len(data) == 0:
- logger.DebugLog('消息为空')
- continue
- logger.DebugLog(data)
- logger.DebugLog('解析消息内容')
- data = bot.parseGroupMsg(data)
- logger.DebugLog(data)
- logger.DebugLog('转发消息内容')
- bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
- # else:
- # logger.DebugLog('空闲')
- sleep(sleep_time)
- bot.releaseSession(session, bind_qq)
-
-
-
-
-
- class StatusStore:
- def __init__(self, from_qq:int=None, is_cmd:bool=False, func_name:str=None, need_second:bool=False, msg:str=None) -> None:
- self.from_qq = from_qq # 发送者的QQ号
- self.is_cmd = is_cmd # 是否是指令(选择功能)
- self.func_name = func_name # 选择的功能的名称
- self.need_second = need_second # 是否需要经过两步:先发cmd指令,再发详细内容
- self.msg = msg # 本次发送的消息内容
-
- def detail(self):
- return self.__dict__
-
- class MultiFunction:
- """多功能函数集合"""
- def __init__(self) -> None:
- pass
-
- @staticmethod
- def translate(original:str, convert:str='en'):
- try:
- cred = credential.Credential(tencent_secret_id, tencent_secret_key)
- client = tmt_client.TmtClient(cred, "ap-guangzhou")
- req = models.TextTranslateRequest()
- params = {
- "SourceText": original,
- "Source": "auto",
- "Target": convert,
- "ProjectId": tencent_translate_AppId
- }
- req.from_json_string(json.dumps(params))
- resp = client.TextTranslate(req)
- # print(resp.to_json_string())
- return resp.TargetText
- except TencentCloudSDKException as err:
- print(err)
- return ''
-
- @staticmethod
- def uploadImage(image_path:str) -> str:
- token = None
- scheme = 'https'
- config = CosConfig(Region=tencent_cos_region, SecretId=tencent_secret_id, SecretKey=tencent_secret_key, Token=token, Scheme=scheme)
- client = CosS3Client(config)
- # 本地文件形式上传
- # response = client.upload_file(
- # Bucket=bucket_id,
- # LocalFilePath=image_path,
- # Key=image_path.split(os.sep)[-1],
- # PartSize=1,
- # MAXThread=10,
- # EnableMD5=False
- # )
-
- # 网络文件形式上传
- file_keyname = image_path.split('/')[-2] + '.jpg'
- stream = requests.get(image_path)
- response = client.put_object(
- Bucket=tencent_cos_bucket_id,
- Body=stream,
- Key=file_keyname
- )
- print(response['ETag'])
- img_url = 'https://{}.cos.{}.myqcloud.com/{}'.format(tencent_cos_bucket_id, tencent_cos_region, file_keyname)
- return '上传成功, ETag: {}\nURL: {}'.format(response['ETag'], img_url)
-
- @staticmethod
- def weather(city_name:str='广州') -> str:
- url_api_weather = 'https://devapi.qweather.com/v7/weather/now?'
- url_api_geo = 'https://geoapi.qweather.com/v2/city/lookup?'
-
- # 实况天气
- def getCityId(city_kw):
- url_v2 = url_api_geo + 'location=' + city_kw + '&key=' + weather_key
- city = requests.get(url_v2).json()['location'][0]
- return city['id']
-
- city_id = getCityId(city_name)
- url = url_api_weather + 'location=' + city_id + '&key=' + weather_key
- res = requests.get(url).json()
- text = "<天气信息获取失败>"
- print(res)
- if res['code'] == '200' or res['code'] == 200:
- text = '实时天气:\n 亲爱的 小主, 您所在的地区为 {},\n 现在的天气是 {},\n 气温 {}°, 湿度 {}%,\n 体感气温为 {}°,\n 风向 {}, 风速 {}km/h'.format(
- city_name, res['now']['text'], res['now']['temp'], res['now']['humidity'], res['now']['feelsLike'], res['now']['windDir'], res['now']['windSpeed'])
- return text
-
- @staticmethod
- def hotNews(status_store:StatusStore) -> str:
- def common(data):
- url = data + '?key={}'.format(tianxing_key)
- res = requests.get(url).json()
- return res['newslist']
- res = common('http://api.tianapi.com/topnews/index')
- tops = []
- index = 1
- for item in res:
- tops.append(str(index) + '. ' + item['title'])
- index += 1
- return '\n'.join(tops[0:10])
-
-
-
-
-
-
- # 多功能函数的映射
- # function: 功能对应函数名
- # need_second: 是否需要经过两步:先发cmd指令,再发详细内容
- # desc: 需要经过两步时,第一次返回的提示语
- function_map = {
- '翻译': {'function': MultiFunction.translate, 'need_second': True, 'desc': '请输入您要翻译的内容~'},
- '天气': {'function': MultiFunction.weather, 'need_second': True, 'desc': '请问是哪座城市的天气呢?'},
- '热搜': {'function': MultiFunction.hotNews, 'need_second': False},
- '上传图片': {'function': MultiFunction.uploadImage, 'need_second': True, 'desc': '请发送图片过来吧~'},
- }
- def choiceFunction(store_obj:StatusStore):
- res = ''
- if function_map.get(store_obj.func_name):
- res = function_map.get(store_obj.func_name)['function'](store_obj.msg)
- return res
-
-
-
-
-
-
- def xiaofengzai():
- sleep_time = 1 # 轮询间隔
- status_store = {}
-
- session = bot.verifySession(auth_key)
- logger.DebugLog(">> session: "+session)
- bot.bindSession(session, bind_qq)
- while True:
- cnt = bot.getMessageCount(session)
- if not cnt:
- sleep(sleep_time)
- continue
- logger.DebugLog('>> 有消息了 => {}'.format(cnt))
- logger.DebugLog('获取消息内容')
- data = bot.fetchLatestMessage(session)
- if len(data) == 0:
- logger.DebugLog('消息为空')
- sleep(sleep_time)
- continue
- logger.DebugLog(data)
- logger.DebugLog('解析消息内容')
-
- sender_id, msg_type, msg_text = bot.analyzeFriendMsg(data[0])
- if not sender_id or sender_id != target_qq:
- sleep(sleep_time)
- continue
-
- if msg_text.strip() == '功能清单':
- res = '目前支持的关键词有:\n' + '\n'.join(function_map.keys())
- elif msg_text.strip().lower().startswith('cmd'):
- _, func_name = msg_text.strip().split('\n')[0].split()
- func_name = func_name.strip()
- store_obj = StatusStore(from_qq=sender_id, is_cmd=True, func_name=func_name)
- # 不需要发两次,直接调用函数返回结果即可
- func_info = function_map.get(func_name)
- if not func_info:
- res = '指令[{}]暂不支持'.format(func_name)
- elif func_info.get('need_second'):
- res = '收到你的指令:{}\n{}'.format(func_name, func_info.get('desc') or '已进入对应状态, 请继续发送详细内容')
- # 添加或更新记录
- status_store[sender_id] = store_obj
- else:
- res = '请求结果为:\n' + str(choiceFunction(store_obj))
- status_store.pop(sender_id, '')
- else:
- res = '请先发送指令哦...'
- store_obj = status_store.get(sender_id)
- if store_obj and store_obj.is_cmd:
- store_obj.msg = msg_text
- res = '请求结果为:\n' + str(choiceFunction(store_obj))
- status_store.pop(sender_id, '')
-
- bot.sendFriendMessage(session, qq=sender_id, msg=res)
-
-
-
-
- @app.route('/QQ/send', methods=['GET'])
- def qqListenMsg():
- # 类似于Qmsg的功能
- # flask做得接收HTTP请求转为QQ消息
- qq = request.args.get('target', None)
- msg = request.args.get('msg', None)
- bot.sendFriendMessage(bot.session, qq, msg)
- return 'Hello World!'
-
- @app.route('/QQ/send/friend', methods=['GET'])
- def qqListenMsgToFriend():
- # 类似于Qmsg的功能
- # flask做得接收HTTP请求转为QQ消息
- qq = request.args.get('target', None)
- msg = request.args.get('msg', None)
- bot.sendFriendMessage(bot.session, qq, msg)
- return 'Hello World! Friend!'
-
- @app.route('/QQ/send/group', methods=['GET'])
- def qqListenMsgToGroup():
- # 类似于Qmsg的功能
- # flask做得接收HTTP请求转为QQ消息
- qq = request.args.get('target', None)
- msg = request.args.get('msg', None)
- bot.sendPlainTextToGroup(bot.session, qq, msg)
- return 'Hello World! Group!'
-
-
- if __name__ == '__main__':
- t = threading.Thread(target=xiaofengzai)
- t.setDaemon(True)
- t.start()
-
- # t = threading.Thread(target=qqTransfer)
- # t.setDaemon(True)
- # t.start()
-
- app.run(port='9966', host='0.0.0.0')