• 万字长文保姆级教你制作自己的多功能QQ机器人

    若发现存在部分图片缺失,可以访问原文: 万字长文保姆级教你制作自己的多功能QQ机器人 - 小锋学长生活大爆炸















































    • 类似QMsg酱的消息通知
    • QQ群消息转发
    • 翻译查询
    • 照片上传
    • 实况天气
    • 实时热搜
    • 控制树莓派舵机
    • 控制树莓派屏显
    • ... ...


    • 接入控制ESP32(实现智能家居控制)



    首先为了 能运行mirai,且随时随地能连接,我们需要有一个 具备公网IP的服务器。这里使用腾讯云的 免费服务器


    1. 进入官网领取:云产品免费试用;需要选购的进:轻量应用服务器专场 (限时3年388,点我进);不清楚怎么操作的可以看教程:腾讯云产品免费试用教程
    2. 领取完成后,由于后面需要用到端口,因此这里我们提前开放2个端口:88889966


    1. sudo apt install firewalld -y
    2. sudo firewall-cmd --list-all
    3. sudo firewall-cmd --permanent --zone=public --add-port=8888/tcp && sudo firewall-cmd --reload
    4. sudo firewall-cmd --permanent --zone=public --add-port=9966/tcp && sudo firewall-cmd --reload
    5. sudo systemctl start firewalld.service



    1. 搜索打开powershell:

    1. 输入以下命令连接SSH:
    ssh 用户名@<公网ip>
    1. 或者使用MobaXterm软件:

    1. 先更新一下软件库:
    1. sudo apt upgrade -y
    2. sudo apt autoremove -y
    1. 一般不建议使用管理员账户,因此我们要自己新建一个账户:
    sudo adduser sxf


    1. sudo apt install vim
    2. sudo vim /etc/sudoers



    Ubuntu20.04 + VirtualBox相关_小锋学长生活大爆炸的博客-CSDN博客


            官方mirai的github仓库:GitHub - mamoe/mirai: 高效率 QQ 机器人支持库
            其他的一些文档:Mirai | mirai
            官方论坛:主页 | MiraiForum

    1. 先SSH连接上服务器,建议不要用root用户登录。
    2. 下载安装包mcl-installer-a02f711-linux-amd64:
    1. mkdir qqbot
    2. cd qqbot
    3. wget http://xfxuezhang.cn/web/share/QQBot/mcl-installer-a02f711-linux-amd64
    4. sudo chmod +x mcl-installer-a02f711-linux-amd64




    1. 安装完成后,还需要安装mirai-api-http。在当前页面下,继续输入:
    ./mcl --update-package net.mamoe:mirai-api-http --channel stable-v2 --type plugin
    1. 编辑_config/net.mamoe.mirai-api-http/setting.yml_配置文件 (没有则自行创建)
    1. ## 配置文件中的值,全为默认值
    2. ## 启用的 adapter, 内置有 http, ws, reverse-ws, webhook
    3. adapters:
    4. - http
    5. - ws
    6. ## 是否开启认证流程, 若为 true 则建立连接时需要验证 verifyKey
    7. ## 建议公网连接时开启
    8. enableVerify: true
    9. verifyKey: 1234567890
    10. ## 开启一些调式信息
    11. debug: false
    12. ## 是否开启单 session 模式, 若为 true,则自动创建 session 绑定 console 中登录的 bot
    13. ## 开启后,接口中任何 sessionKey 不需要传递参数
    14. ## 若 console 中有多个 bot 登录,则行为未定义
    15. ## 确保 console 中只有一个 bot 登陆时启用
    16. singleMode: false
    17. ## 历史消息的缓存大小
    18. ## 同时,也是 http adapter 的消息队列容量
    19. cacheSize: 4096
    20. ## adapter 的单独配置,键名与 adapters 项配置相同
    21. adapterSettings:
    22. ## 详情看 http adapter 使用说明 配置
    23. http:
    24. #是允许远程访问,localhost只能同机器访问
    25. host:
    26. port: 8888
    27. cors: ["*"]
    28. unreadQueueMaxSize: 100
    29. ## 详情看 websocket adapter 使用说明 配置
    30. ws:
    31. host: localhost
    32. port: 8080
    33. reservedSyncId: -1
    1. 启动mirai即可:


    1. 使用以下命令即可登录QQ号:
    /login  [password] 


    /autoLogin add   


    /autoLogin setConfig  protocol ANDROID_PAD


    1. 现在QQ风控很严了,第一次登录很有可能遇到“需要滑动验证码”的。建议申请小号使用,以免发生不测。并且首次使用时在QQ“账号安全设置”中关闭“安全登录检查”、“陌生设备登录保护”。如果遇到验证码,可以尝试:

      1. 将Captcha link通过另一个QQ,发给待登录的mirai-QQ,手机登录mirai-QQ并点击链接,手动完成滑块验证,然后回到mobaxterm输入回车;

    1. 如果不行,就参考这个链接的方法:GitHub - project-mirai/mirai-login-solver-selenium: SliderCaptcha solver
    2. 还不行,再参考这个链接的方法:mirai-login-solver-selenium | mirai
    3. 还有一个小技巧可以尝试。在手机端先通过手机号登录QQ,如果没问题,再通过手机号在mirai上登录。手机建议先登录上mirai-QQ,有时可能会弹窗提示“是否允许陌生设备登录”等等,要手动点确认的。
    4. 另外,最新申请的QQ号,一般可以成功登录mirai。
    5. 如果以上都不行,目前的终极方案是使用miraiAndroidMiraiAndroid
    • 在手机上的MiraiAndroid登录QQ后导出device.json
    • 将cache目录下的3个文件account.secrets、servers.json、session.bin也复制出来

    • 接下来点击左上角, 再点击“工具”。选择你机器人的账号, 选择 导出 DEVICE.JSON 将其导出。

    • 再次回到服务器端,进入 “bots/<你的QQ号>” 下面, 将导出的 device.json 复制放入。对应的cache文件夹也复制放入。

    • 再次执行 ./mcl 启动 mirai-console 看看效果。
    • 若仍有问题,欢迎加入文末Q群交流。
    1. 至此,mcl就已经能正常接收QQ消息了。而我们的实现代码对mcl的控制,就是通过mirai-api-http插件来实现的。根据上面第4步配置的_setting.yml_文件,再参考官方API文档HttpAdapter文档,即可实现互联互通。(讲起来比较麻烦,no bibi,后面直接show me the code)。



    1. verifyKey: 1234567890
    2. http: port: 8888



    1. class Logger:
    2. def __init__(self, level='debug'):
    3. self.level = level
    4. def DebugLog(self, *args):
    5. if self.level == 'debug':
    6. print(*args)
    7. def TraceLog(self, *args):
    8. if self.level == 'trace':
    9. print(*args)
    10. def setDebugLevel(self, level):
    11. self.level = level.lower()



    1. auth_key = '1234567890'
    2. def verifySession(self, auth_key):
    3. """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
    4. session Key在未进行校验的情况下,一定时间后将会被自动释放"""
    5. data = {"verifyKey": auth_key}
    6. url = self.addr+'verify'
    7. res = requests.post(url, data=json.dumps(data)).json()
    8. logger.DebugLog(res)
    9. if res['code'] == 0:
    10. return res['session']
    11. return None



    1. qq = '121215' # mirai登录的那个QQ
    2. session = 'grge8484' # 上面verifySession函数的返回值
    3. def bindSession(self, session, qq):
    4. """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
    5. data = {"sessionKey": session, "qq": qq}
    6. url = self.addr + 'bind'
    7. res = requests.post(url, data=json.dumps(data)).json()
    8. logger.DebugLog(res)
    9. if res['code'] == 0:
    10. self.session = session
    11. return True
    12. return False



    1. def releaseSession(self, session, qq):
    2. """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
    3. 否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
    4. data = {"sessionKey": session, "qq": qq}
    5. url = self.addr + 'release'
    6. res = requests.post(url, data=json.dumps(data)).json()
    7. logger.DebugLog(res)
    8. if res['code'] == 0:
    9. return True
    10. return False



    1. def getMessageCount(self, session):
    2. url = self.addr + 'countMessage?sessionKey='+session
    3. res = requests.get(url).json()
    4. if res['code'] == 0:
    5. return res['data']
    6. return 0



    1. def fetchLatestMessage(self, session):
    2. url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
    3. res = requests.get(url).json()
    4. if res['code'] == 0:
    5. return res['data']
    6. return None



    1. data = 'xxx' # 可以是上面getMsgFromGroup函数的返回值
    2. def parseGroupMsg(self, data):
    3. res = []
    4. if data is None:
    5. return res
    6. for item in data:
    7. if item['type'] == 'GroupMessage':
    8. type = item['messageChain'][-1]['type']
    9. if type == 'Image':
    10. text = item['messageChain'][-1]['url']
    11. elif type == 'Plain':
    12. text = item['messageChain'][-1]['text']
    13. elif type == 'Face':
    14. text = item['messageChain'][-1]['faceId']
    15. else:
    16. logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
    17. continue
    18. name = item['sender']['memberName']
    19. group_id = str(item['sender']['group']['id'])
    20. group_name = item['sender']['group']['name']
    21. res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
    22. return res



    1. def sendFriendMessage(self, session, qq, msg):
    2. msg_list = msg.split(r'\n')
    3. msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
    4. data = {
    5. "sessionKey": session,
    6. "target": qq,
    7. "messageChain": msg_chain
    8. }
    9. url = self.addr + 'sendFriendMessage'
    10. try:
    11. res = requests.post(url, data=json.dumps(data)).json()
    12. except:
    13. logger.DebugLog(">> 发送失败")
    14. return 0
    15. if res['code'] == 0:
    16. return res['messageId']
    17. return 0



    1. def sendMsgToGroup(self, session, group, msg):
    2. text = msg['text']
    3. type = msg['type']
    4. name = msg['name']
    5. group_id = msg['groupId']
    6. group_name = msg['groupName']
    7. content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
    8. name, group_id, group_name, text)
    9. content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
    10. name, group_id, group_name)
    11. logger.DebugLog(">> 消息类型:" + type)
    12. if type == 'Plain':
    13. message = [{"type": type, "text": content1}]
    14. elif type == 'Image':
    15. message = [
    16. {"type": 'Plain', "text": content2},
    17. {"type": type, "url": text}]
    18. elif type == 'Face':
    19. message = [{"type": 'Plain', "text": content2},
    20. {"type": type, "faceId": text}]
    21. else:
    22. logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
    23. return 0
    24. data = {
    25. "sessionKey": session,
    26. "group": group,
    27. "messageChain": message
    28. }
    29. logger.DebugLog(">> 消息内容:" + str(data))
    30. url = self.addr + 'sendGroupMessage'
    31. try:
    32. res = requests.post(url, data=json.dumps(data)).json()
    33. except:
    34. logger.DebugLog(">> 转发失败")
    35. return 0
    36. logger.DebugLog(">> 请求返回:" + str(res))
    37. if res['code'] == 0:
    38. return res['messageId']
    39. return 0



    1. def sendPlainTextToGroup(self, session, group, msg):
    2. msg_list = msg.split(r'\n')
    3. msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
    4. data = {
    5. "sessionKey": session,
    6. "group": group,
    7. "messageChain": msg_chain
    8. }
    9. url = self.addr + 'sendGroupMessage'
    10. try:
    11. res = requests.post(url, data=json.dumps(data)).json()
    12. except:
    13. logger.DebugLog(">> 转发失败")
    14. return 0
    15. logger.DebugLog(">> 请求返回:" + str(res))
    16. if res['code'] == 0:
    17. return res['messageId']
    18. return 0




    1. import requests
    2. from time import sleep
    3. class Logger:
    4. def __init__(self, level='debug'):
    5. self.level = level
    6. def DebugLog(self, *args):
    7. if self.level == 'debug':
    8. print(*args)
    9. def TraceLog(self, *args):
    10. if self.level == 'trace':
    11. print(*args)
    12. def setDebugLevel(self, level):
    13. self.level = level.lower()
    14. logger = Logger()
    15. class QQBot:
    16. def __init__(self):
    17. self.addr = ''
    18. self.session = None
    19. def verifySession(self, auth_key):
    20. """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
    21. session Key在未进行校验的情况下,一定时间后将会被自动释放"""
    22. data = {"verifyKey": auth_key}
    23. url = self.addr+'verify'
    24. res = requests.post(url, data=json.dumps(data)).json()
    25. logger.DebugLog(res)
    26. if res['code'] == 0:
    27. return res['session']
    28. return None
    29. def bindSession(self, session, qq):
    30. """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
    31. data = {"sessionKey": session, "qq": qq}
    32. url = self.addr + 'bind'
    33. res = requests.post(url, data=json.dumps(data)).json()
    34. logger.DebugLog(res)
    35. if res['code'] == 0:
    36. self.session = session
    37. return True
    38. return False
    39. def releaseSession(self, session, qq):
    40. """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
    41. 否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
    42. data = {"sessionKey": session, "qq": qq}
    43. url = self.addr + 'release'
    44. res = requests.post(url, data=json.dumps(data)).json()
    45. logger.DebugLog(res)
    46. if res['code'] == 0:
    47. return True
    48. return False
    49. def fetchLatestMessage(self, session):
    50. url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
    51. res = requests.get(url).json()
    52. if res['code'] == 0:
    53. return res['data']
    54. return None
    55. def parseGroupMsg(self, data):
    56. res = []
    57. if data is None:
    58. return res
    59. for item in data:
    60. if item['type'] == 'GroupMessage':
    61. type = item['messageChain'][-1]['type']
    62. if type == 'Image':
    63. text = item['messageChain'][-1]['url']
    64. elif type == 'Plain':
    65. text = item['messageChain'][-1]['text']
    66. elif type == 'Face':
    67. text = item['messageChain'][-1]['faceId']
    68. else:
    69. logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
    70. continue
    71. name = item['sender']['memberName']
    72. group_id = str(item['sender']['group']['id'])
    73. group_name = item['sender']['group']['name']
    74. res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
    75. return res
    76. def getMessageCount(self, session):
    77. url = self.addr + 'countMessage?sessionKey='+session
    78. res = requests.get(url).json()
    79. if res['code'] == 0:
    80. return res['data']
    81. return 0
    82. def sendPlainTextToGroup(self, session, group, msg):
    83. msg_list = msg.split(r'\n')
    84. msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
    85. data = {
    86. "sessionKey": session,
    87. "group": group,
    88. "messageChain": msg_chain
    89. }
    90. url = self.addr + 'sendGroupMessage'
    91. try:
    92. res = requests.post(url, data=json.dumps(data)).json()
    93. except:
    94. logger.DebugLog(">> 转发失败")
    95. return 0
    96. logger.DebugLog(">> 请求返回:" + str(res))
    97. if res['code'] == 0:
    98. return res['messageId']
    99. return 0
    100. def sendMsgToGroup(self, session, group, msg):
    101. text = msg['text']
    102. type = msg['type']
    103. name = msg['name']
    104. group_id = msg['groupId']
    105. group_name = msg['groupName']
    106. content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
    107. name, group_id, group_name, text)
    108. content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
    109. name, group_id, group_name)
    110. logger.DebugLog(">> 消息类型:" + type)
    111. if type == 'Plain':
    112. message = [{"type": type, "text": content1}]
    113. elif type == 'Image':
    114. message = [
    115. {"type": 'Plain', "text": content2},
    116. {"type": type, "url": text}]
    117. elif type == 'Face':
    118. message = [{"type": 'Plain', "text": content2},
    119. {"type": type, "faceId": text}]
    120. else:
    121. logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
    122. return 0
    123. data = {
    124. "sessionKey": session,
    125. "group": group,
    126. "messageChain": message
    127. }
    128. logger.DebugLog(">> 消息内容:" + str(data))
    129. url = self.addr + 'sendGroupMessage'
    130. try:
    131. res = requests.post(url, data=json.dumps(data)).json()
    132. except:
    133. logger.DebugLog(">> 转发失败")
    134. return 0
    135. logger.DebugLog(">> 请求返回:" + str(res))
    136. if res['code'] == 0:
    137. return res['messageId']
    138. return 0
    139. def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
    140. # 对每条消息进行检查
    141. for msg in msg_data:
    142. group_id = msg['groupId']
    143. # 接收的消息群正确(目前只支持 消息类型)
    144. if group_id in receive_groups:
    145. # 依次将消息转发到目标群
    146. for g in send_groups:
    147. logger.DebugLog(">> 当前群:"+g)
    148. if g == group_id:
    149. logger.DebugLog(">> 跳过此群")
    150. continue
    151. res = self.sendMsgToGroup(session, g, msg)
    152. if res != 0:
    153. logger.TraceLog(">> 转发成功!{}".format(g))
    154. def sendFriendMessage(self, session, qq, msg):
    155. msg_list = msg.split(r'\n')
    156. msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
    157. data = {
    158. "sessionKey": session,
    159. "target": qq,
    160. "messageChain": msg_chain
    161. }
    162. url = self.addr + 'sendFriendMessage'
    163. try:
    164. res = requests.post(url, data=json.dumps(data)).json()
    165. except:
    166. logger.DebugLog(">> 发送失败")
    167. return 0
    168. if res['code'] == 0:
    169. return res['messageId']
    170. return 0
    171. def qqTransfer():
    172. with open('conf.json', 'r+', encoding="utf-8") as f:
    173. content = f.read()
    174. conf = json.loads(content)
    175. auth_key = conf['auth_key']
    176. bind_qq = conf['bind_qq']
    177. sleep_time = conf['sleep_time']
    178. debug_level = conf['debug_level']
    179. receive_groups = conf['receive_groups']
    180. send_groups = conf['send_groups']
    181. logger.setDebugLevel(debug_level)
    182. session = bot.verifySession(auth_key)
    183. logger.DebugLog(">> session: "+session)
    184. bot.bindSession(session, bind_qq)
    185. while True:
    186. cnt = bot.getMessageCount(session)
    187. if cnt:
    188. logger.DebugLog('>> 有消息了 => {}'.format(cnt))
    189. logger.DebugLog('获取消息内容')
    190. data = bot.fetchLatestMessage(session)
    191. if len(data) == 0:
    192. logger.DebugLog('消息为空')
    193. continue
    194. logger.DebugLog(data)
    195. logger.DebugLog('解析消息内容')
    196. data = bot.parseGroupMsg(data)
    197. logger.DebugLog(data)
    198. logger.DebugLog('转发消息内容')
    199. bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
    200. # else:
    201. # logger.DebugLog('空闲')
    202. sleep(sleep_time)
    203. bot.releaseSession(session, bind_qq)


    1. {
    2. "auth_key": "1234567890",
    3. "bind_qq": "123456", # mirai登录的QQ (复制时记得删我)
    4. "sleep_time": 1,
    5. "receive_groups": ["913182235", "977307922"], # 要接受消息的群 (复制时记得删我)
    6. "send_groups": ["913182235", "977307922"], # 要发送消息的群 (复制时记得删我)
    7. "debug_level": "debug"
    8. }






    1. from flask import Flask, request
    2. app = Flask(__name__)
    3. @app.route('/QQ/send/friend', methods=['GET'])
    4. def qqListenMsgToFriend():
    5. # 类似于Qmsg的功能
    6. # flask做得接收HTTP请求转为QQ消息
    7. qq = request.args.get('target', None)
    8. msg = request.args.get('msg', None)
    9. bot.sendFriendMessage(bot.session, qq, msg)
    10. return 'Hello World! Friend!'
    11. @app.route('/QQ/send/group', methods=['GET'])
    12. def qqListenMsgToGroup():
    13. # 类似于Qmsg的功能
    14. # flask做得接收HTTP请求转为QQ消息
    15. qq = request.args.get('target', None)
    16. msg = request.args.get('msg', None)
    17. bot.sendPlainTextToGroup(bot.session, qq, msg)
    18. return 'Hello World! Group!'
    19. if __name__ == '__main__':
    20. app.run(port='9966', host='')


    1. if __name__ == '__main__':
    2. t = threading.Thread(target=qqTransfer)
    3. t.setDaemon(True)
    4. t.start()
    5. app.run(port='9966', host='')







    • 我们发送的内容可以分为:功能选择 与 消息详情
    • 为了区分他俩,可以在选择功能时添加指定前缀,如“CMD+翻译”;
    • 小锋仔接收到后,进入翻译模式准备;
    • 发送指令详情时,就不加前缀。而小锋仔则将收到的消息进行翻译,再把结果返回。


    1. class StatusStore:
    2. def __init__(self, from_qq:int=None, is_cmd:bool=False, func_name:str=None, need_second:bool=False, msg:str=None) -> None:
    3. self.from_qq = from_qq # 发送者的QQ号
    4. self.is_cmd = is_cmd # 是否是指令(选择功能)
    5. self.func_name = func_name # 选择的功能的名称
    6. self.need_second = need_second # 是否需要经过两步:先发cmd指令,再发详细内容
    7. self.msg = msg # 本次发送的消息内容
    8. def detail(self):
    9. return self.__dict__


    1. {
    2. "type": "FriendMessage",
    3. "sender": {
    4. "id": 123,
    5. "nickname": "",
    6. "remark": ""
    7. },
    8. "messageChain": [] // 数组,内容为下文消息类型
    9. }

    因此,我们可以从"type"和 "sender:id"入手判断。

    1. class MultiFunction:
    2. """多功能函数集合"""
    3. def __init__(self) -> None:
    4. pass
    5. @staticmethod
    6. def translate(original:str, convert:str='zh2en') -> str:
    7. return '假装是翻译结果'
    8. @staticmethod
    9. def uploadImage(image_path:str) -> str:
    10. return '假装是上传结果'
    11. @staticmethod
    12. def weather(city:str) -> str:
    13. return '假装是天气结果'
    14. @staticmethod
    15. def hotNews(status_store:StatusStore) -> str:
    16. return '假装是热搜结果'
    17. # 多功能函数的映射
    18. # function: 功能对应函数名
    19. # need_second: 是否需要经过两步:先发cmd指令,再发详细内容
    20. # desc: 需要经过两步时,第一次返回的提示语
    21. function_map = {
    22. '翻译': {'function': MultiFunction.translate, 'need_second': True, 'desc': '请输入您要翻译的内容~'},
    23. '天气': {'function': MultiFunction.weather, 'need_second': True, 'desc': '请问是哪座城市的天气呢?'},
    24. '热搜': {'function': MultiFunction.hotNews, 'need_second': False}
    25. }
    26. def choiceFunction(store_obj:StatusStore):
    27. res = ''
    28. if function_map.get(store_obj.func_name):
    29. res = function_map.get(store_obj.func_name)['function'](store_obj.msg)
    30. return res



    1. def analyzeFriendMsg(self, data):
    2. if data is None or data['type'] != 'FriendMessage':
    3. return None, None, None
    4. sender_id = data['sender']['id']
    5. msg_type = data['messageChain'][-1]['type']
    6. if msg_type == 'Plain':
    7. msg_text = data['messageChain'][-1]['text']
    8. elif msg_type == 'Image':
    9. msg_text = data['messageChain'][-1]['url']
    10. else:
    11. msg_text = ''
    12. return sender_id, msg_type, msg_text


    1. def xiaofengzai():
    2. auth_key = '1234567890' # settings.yml中的verifyKey
    3. bind_qq = '3126229950' # mirai登录的QQ
    4. target_qq = '1061700625' # 我们自己用的主QQ
    5. target_qq = int(target_qq) # 接收到的消息里,QQ是int类型的
    6. sleep_time = 1 # 轮询间隔
    7. status_store = {}
    8. session = bot.verifySession(auth_key)
    9. logger.DebugLog(">> session: "+session)
    10. bot.bindSession(session, bind_qq)
    11. while True:
    12. cnt = bot.getMessageCount(session)
    13. if not cnt:
    14. sleep(sleep_time)
    15. continue
    16. logger.DebugLog('>> 有消息了 => {}'.format(cnt))
    17. logger.DebugLog('获取消息内容')
    18. data = bot.fetchLatestMessage(session)
    19. if len(data) == 0:
    20. logger.DebugLog('消息为空')
    21. sleep(sleep_time)
    22. continue
    23. logger.DebugLog(data)
    24. logger.DebugLog('解析消息内容')
    25. sender_id, msg_type, msg_text = bot.analyzeFriendMsg(data[0])
    26. if not sender_id or sender_id != target_qq:
    27. sleep(sleep_time)
    28. continue
    29. if msg_text.strip().lower().startswith('cmd'):
    30. _, func_name = msg_text.strip().split('\n')[0].split()
    31. func_name = func_name.strip()
    32. store_obj = StatusStore(from_qq=sender_id, is_cmd=True, func_name=func_name)
    33. # 不需要发两次,直接调用函数返回结果即可
    34. func_info = function_map.get(func_name)
    35. if not func_info:
    36. res = '指令[{}]暂不支持'.format(func_name)
    37. elif func_info.get('need_second'):
    38. res = '收到你的指令:{}\n{}'.format(func_name, func_info.get('desc') or '已进入对应状态, 请继续发送详细内容')
    39. # 添加或更新记录
    40. status_store[sender_id] = store_obj
    41. else:
    42. res = '请求结果为:\n' + str(choiceFunction(store_obj))
    43. status_store.pop(sender_id, '')
    44. else:
    45. res = '请先发送指令哦...'
    46. store_obj = status_store.get(sender_id)
    47. if store_obj and store_obj.is_cmd:
    48. store_obj.msg = msg_text
    49. res = '请求结果为:\n' + str(choiceFunction(store_obj))
    50. status_store.pop(sender_id, '')
    51. bot.sendFriendMessage(session, qq=sender_id, msg=res)





    1. res = choiceFunction(StatusStore(func_name='翻译', msg='你好'))
    2. print(res)




            支持很多类型的翻译,这次我们先选文本翻译机器翻译 文本翻译-API 文档-文档中心-腾讯云


    pip install --upgrade tencentcloud-sdk-python




    1. import json
    2. from tencentcloud.common import credential
    3. from tencentcloud.common.profile.client_profile import ClientProfile
    4. from tencentcloud.common.profile.http_profile import HttpProfile
    5. from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
    6. from tencentcloud.tmt.v20180321 import tmt_client, models
    7. def translate(original:str, convert:str='en'):
    8. secretId = 'xxx' # 从API控制台获取
    9. secretKey = 'xxx' # 从API控制台获取
    10. AppId = 12123 # 从API控制台获取
    11. try:
    12. cred = credential.Credential(secretId, secretKey)
    13. client = tmt_client.TmtClient(cred, "ap-guangzhou")
    14. req = models.TextTranslateRequest()
    15. params = {
    16. "SourceText": original,
    17. "Source": "auto",
    18. "Target": convert,
    19. "ProjectId": AppId
    20. }
    21. req.from_json_string(json.dumps(params))
    22. resp = client.TextTranslate(req)
    23. # print(resp.to_json_string())
    24. return resp.TargetText
    25. except TencentCloudSDKException as err:
    26. print(err)
    27. return ''


    1. print(choiceFunction(StatusStore(func_name='翻译', msg='你好')))
    2. # 输出:
    3. {"TargetText": "Hello", "Source": "zh", "Target": "en", "RequestId": "a1b17f47-751e-44cd-89a5-6a22e9f2c444"}
    4. Hello



            天气部分,我们是用免费的和风天气API:实时天气 - API
            首先也要进行登录并获取KEY,这个步骤官网讲的很详细,图文并茂的,这边就不多写了,大家可以跳转过去(注意我们选的是Web API):创建应用和KEY - RESOURCE



    1. def weather(city:str) -> str:
    2. url_api_weather = 'https://devapi.qweather.com/v7/weather/now?'
    3. url_api_geo = 'https://geoapi.qweather.com/v2/city/lookup?'
    4. weather_key = 'xxxxx' # 和风天气控制台的key
    5. # 实况天气
    6. def getCityId(city_kw):
    7. url_v2 = url_api_geo + 'location=' + city_kw + '&key=' + weather_key
    8. city = requests.get(url_v2).json()['location'][0]
    9. return city['id']
    10. city_name = '广州'
    11. city_id = getCityId(city_name)
    12. url = url_api_weather + 'location=' + city_id + '&key=' + weather_key
    13. res = requests.get(url).json()
    14. text = "<天气信息获取失败>"
    15. if res['code'] == '200' or res['code'] == 200:
    16. text = '实时天气:\n 亲爱的 小主, 您所在的地区为 {},\n 现在的天气是 {},\n 气温 {}°, 湿度 {}%,\n 体感气温为 {}°,\n 风向 {}, 风速 {}km/h'.format(
    17. city_name, res['now']['text'], res['now']['temp'], res['now']['humidity'], res['now']['feelsLike'], res['now']['windDir'], res['now']['windSpeed'])
    18. return text




            这部分用的是天行数据,免费会员每天赠送100次调用额度:今日头条新闻API接口 - 天行数据TianAPI。先注册账号,然后点击“申请接口”即可。




    1. def hotNews(status_store:StatusStore) -> str:
    2. tianxing_key = 'e05966abe0b054686c9f6b7d60e59a8d'
    3. def common(data):
    4. url = data + '?key={}'.format(tianxing_key)
    5. res = requests.get(url).json()
    6. return res['newslist']
    7. res = common('http://api.tianapi.com/topnews/index')
    8. tops = []
    9. index = 1
    10. for item in res:
    11. tops.append(str(index) + '. ' + item['title'])
    12. index += 1
    13. return '\n'.join(tops[0:10])





            还是这个链接:云产品体验 - 腾讯云,在“云产品体验-基础-对象存储COS”下面。对象存储不止可以用来存文件,这里我们只用来存图片。

    1. 领取完成后,进入控制台,创建存储桶:

    1. 配置存储桶信息,访问权限设置为“公有读私有写”,这样别人就能看到了,便于分享图片:

    1. 创建完成后,就可以通过Python APi去控制上传了。不过需要先安装下SDK:
    pip install -U cos-python-sdk-v5
    1. 上传图片部分,我们现在随便拿一张图测试:
    1. from qcloud_cos import CosConfig
    2. from qcloud_cos import CosS3Client
    3. import os
    4. def uploadImage(image_path:str) -> str:
    5. bucket_id = 'image-1253093297' # 存储桶的名称
    6. secret_id = 'xxx'
    7. secret_key = 'xxx'
    8. region = 'ap-guangzhou' # 存储桶的地区
    9. token = None
    10. scheme = 'https'
    11. config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
    12. client = CosS3Client(config)
    13. # 本地文件形式上传
    14. # response = client.upload_file(
    15. # Bucket=bucket_id,
    16. # LocalFilePath=image_path,
    17. # Key=image_path.split(os.sep)[-1],
    18. # PartSize=1,
    19. # MAXThread=10,
    20. # EnableMD5=False
    21. # )
    22. # 网络文件形式上传
    23. file_keyname = image_path.split('/')[-2] + '.jpg'
    24. stream = requests.get(image_path)
    25. response = client.put_object(
    26. Bucket=bucket_id,
    27. Body=stream,
    28. Key=file_keyname
    29. )
    30. print(response['ETag'])
    31. img_url = 'https://{}.cos.{}.myqcloud.com/{}'.format(bucket_id, region, file_keyname)
    32. return '上传成功, ETag: {}\nURL: {}'.format(response['ETag'], img_url)
    1. 上传完成后,在控制台就可以看到了。

    1. 更多cos操作可看官方文档:对象存储 快速入门-SDK 文档-文档中心-腾讯云



    [{'type': 'FriendMessage', 'messageChain': [{'type': 'Source', 'id': 55312, 'time': 1662048857}, {'type': 'Image', 'imageId': '{DCAD8B29-D606-B354-117D-F39479C14FE3}.jpg', 'url': 'http://c2cpicdw.qpic.cn/offpic_new/1061700625//1061700625-141936558-DCAD8B29D606B354117DF39479C14FE3/0?term=2', 'path': None, 'base64': None}], 'sender': {'id': 1061700625, 'nickname': '热心市民', 'remark': '热心市民'}}]




    1. MultiFunction类中添加功能函数的实现,入参尽量为字符串型,返回也为字符串型
    2. function_map中添加函数信息;


    1. # 疫情信息
    2. def getYiQing():
    3. url = 'https://c.m.163.com/ug/api/wuhan/app/data/list-total?t={}'.format(329091037164)
    4. headers = {
    5. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
    6. 'Chrome/96.0.4664.110 Safari/537.36 '
    7. }
    8. res = requests.get(url, headers=headers).json()
    9. total = res['data']['chinaTotal']['total']
    10. today = res['data']['chinaTotal']['today']
    11. a = 99
    12. symbol_today = '+' if today['confirm'] >= 0 else ''
    13. symbol_total = '+' if today['storeConfirm'] >= 0 else ''
    14. symbol_input = '+' if today['input'] >= 0 else ''
    15. confirmTotal = '累计确诊:{},较昨日:{}{}'.format(total['confirm'], symbol_today, today['confirm'])
    16. confirmToday = '现有确诊:{},较昨日:{}{}'.format(total['confirm'] - total['dead'] - total['heal'], symbol_total, today['storeConfirm'])
    17. inputs = '境外输入:{},较昨日:{}{}'.format(total['input'], symbol_input, today['input'])
    18. return inputs + '\n' + confirmToday + '\n' + confirmTotal
    19. # 历史上的今天
    20. def getHistoryToday():
    21. url = 'https://api.oick.cn/lishi/api.php'
    22. res = requests.get(url).json()
    23. historyToday = []
    24. for item in res['result']:
    25. historyToday.append(item['date'] + ', ' + item['title'])
    26. return '\n'.join(random.choices(historyToday, k=3))
    27. # 一言
    28. def dailysentence():
    29. url = 'https://res.abeim.cn/api-text_yiyan'
    30. res = requests.get(url).json()
    31. return res['content']
    32. # 天行api
    33. def common(data):
    34. tianxing_key = '' # 天行key
    35. url = data + '?key={}'.format(tianxing_key)
    36. res = requests.get(url).json()
    37. return res['newslist']
    38. # 天行api - 小窍门
    39. def dailyTips():
    40. res = common('http://api.tianapi.com/qiaomen/index')
    41. tipsArray = res[0]
    42. return tipsArray['content']
    43. # 天行api - 健康小知识
    44. def healthTips():
    45. res = common('http://api.tianapi.com/healthtip/index')
    46. tipsArray = res[0]
    47. return tipsArray['content']


    1. if msg_text.strip() == '功能清单':
    2. res = '目前支持的关键词有:\n' + '\n'.join(function_map.keys())




    • 树莓派上也运行mirai。通过设置不同的protocol,是可以实现同时在线的。
    • 通过MQTT通信。这个比较好用,是个物联网协议,广泛适用于IoT场景,推荐。



            树莓派由于不在身边,因此这部分暂时先略过,大家可以通过上面几篇博客自学一下,他们也都是使用到了mirai的。这里讲一下MQTT的安装,也可以参考安装EMQX MQTT

    1. SSH进入我们的服务器后,输入指令:
    1. sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
    2. sudo apt-get update
    3. sudo apt-get install mosquitto
    4. sudo apt-get install mosquitto-clients
    5. sudo apt clean
    1. 然后去腾讯云控制台开放下1883端口。
    2. 再在防火墙软件上也放行下1883端口
    1. sudo firewall-cmd --permanent --zone=public --add-port=1883/tcp && sudo firewall-cmd --reload
    2. sudo systemctl start firewalld.service
    1. 通过Python调用MQTT的示例可以参考:Python MQTT
    2. 想了解学习MQTT概念的可以参考:MQTT V3.1协议规范
    3. 更多MQTT使用示例可以参考:

      1. Ubuntu18和Raspbian搭建LAMP环境+部署图片上传网页+安装Mosquitto
      2. 纯JavaScript实现的MQTT智能门锁
      3. Qt搭建MQTT环境






    1. import json
    2. import os
    3. import requests
    4. from flask import Flask, request
    5. from time import sleep
    6. import threading
    7. import json
    8. from tencentcloud.common import credential
    9. from tencentcloud.common.profile.client_profile import ClientProfile
    10. from tencentcloud.common.profile.http_profile import HttpProfile
    11. from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
    12. from tencentcloud.tmt.v20180321 import tmt_client, models
    13. import requests
    14. from qcloud_cos import CosConfig
    15. from qcloud_cos import CosS3Client
    16. # ---------------------------- 变量定义区 ---------------------------- #
    17. # 自己运行mirai的服务器IP和mirai-api-http的监听端口
    18. mirai_server_url = ''
    19. # settings.yml中的verifyKey
    20. auth_key = '1234567890'
    21. # mirai登录的QQ
    22. bind_qq = 'xxx'
    23. # 我们自己用的主QQ。接收到的消息里,QQ是int类型的
    24. target_qq = 123123
    25. # 腾讯COS存储桶的名称
    26. tencent_cos_bucket_id = 'xxx'
    27. # 腾讯COS存储桶的地区
    28. tencent_cos_region = 'ap-guangzhou'
    29. # 腾讯控制台的SecretId
    30. tencent_secret_id = 'xxx'
    31. # 腾讯控制台的SecretKey
    32. tencent_secret_key = 'xxx'
    33. # 腾讯机器翻译的appid
    34. tencent_translate_AppId = xxx
    35. # 和风天气API的key
    36. weather_key = 'xxx'
    37. # 天行API的key
    38. tianxing_key = 'xxx'
    39. # ------------------------------------------------------------------ #
    40. class Logger:
    41. def __init__(self, level='debug'):
    42. self.level = level
    43. def DebugLog(self, *args):
    44. if self.level == 'debug':
    45. print(*args)
    46. def TraceLog(self, *args):
    47. if self.level == 'trace':
    48. print(*args)
    49. def setDebugLevel(self, level):
    50. self.level = level.lower()
    51. class QQBot:
    52. def __init__(self):
    53. self.addr = mirai_server_url
    54. self.session = None
    55. def verifySession(self, auth_key):
    56. """每个Session只能绑定一个Bot,但一个Bot可有多个Session。
    57. session Key在未进行校验的情况下,一定时间后将会被自动释放"""
    58. data = {"verifyKey": auth_key}
    59. url = self.addr+'verify'
    60. res = requests.post(url, data=json.dumps(data)).json()
    61. logger.DebugLog(res)
    62. if res['code'] == 0:
    63. return res['session']
    64. return None
    65. def bindSession(self, session, qq):
    66. """校验并激活Session,同时将Session与一个已登录的Bot绑定"""
    67. data = {"sessionKey": session, "qq": qq}
    68. url = self.addr + 'bind'
    69. res = requests.post(url, data=json.dumps(data)).json()
    70. logger.DebugLog(res)
    71. if res['code'] == 0:
    72. self.session = session
    73. return True
    74. return False
    75. def releaseSession(self, session, qq):
    76. """不使用的Session应当被释放,长时间(30分钟)未使用的Session将自动释放,
    77. 否则Session持续保存Bot收到的消息,将会导致内存泄露(开启websocket后将不会自动释放)"""
    78. data = {"sessionKey": session, "qq": qq}
    79. url = self.addr + 'release'
    80. res = requests.post(url, data=json.dumps(data)).json()
    81. logger.DebugLog(res)
    82. if res['code'] == 0:
    83. return True
    84. return False
    85. def fetchLatestMessage(self, session):
    86. url = self.addr + 'fetchLatestMessage?count=10&sessionKey='+session
    87. res = requests.get(url).json()
    88. if res['code'] == 0:
    89. return res['data']
    90. return None
    91. def parseGroupMsg(self, data):
    92. res = []
    93. if data is None:
    94. return res
    95. for item in data:
    96. if item['type'] == 'GroupMessage':
    97. type = item['messageChain'][-1]['type']
    98. if type == 'Image':
    99. text = item['messageChain'][-1]['url']
    100. elif type == 'Plain':
    101. text = item['messageChain'][-1]['text']
    102. elif type == 'Face':
    103. text = item['messageChain'][-1]['faceId']
    104. else:
    105. logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
    106. continue
    107. name = item['sender']['memberName']
    108. group_id = str(item['sender']['group']['id'])
    109. group_name = item['sender']['group']['name']
    110. res.append({'text': text, 'type': type, 'name': name, 'groupId': group_id, 'groupName': group_name})
    111. return res
    112. def getMessageCount(self, session):
    113. url = self.addr + 'countMessage?sessionKey='+session
    114. res = requests.get(url).json()
    115. if res['code'] == 0:
    116. return res['data']
    117. return 0
    118. def peekMessage(self, session):
    119. url = self.addr + 'peekMessage?sessionKey='+session
    120. res = requests.get(url).json()
    121. if res['code'] == 0:
    122. return res['data']
    123. return 0
    124. def sendPlainTextToGroup(self, session, group, msg):
    125. msg_list = msg.split(r'\n')
    126. msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
    127. data = {
    128. "sessionKey": session,
    129. "group": group,
    130. "messageChain": msg_chain
    131. }
    132. url = self.addr + 'sendGroupMessage'
    133. try:
    134. res = requests.post(url, data=json.dumps(data)).json()
    135. except:
    136. logger.DebugLog(">> 转发失败")
    137. return 0
    138. logger.DebugLog(">> 请求返回:" + str(res))
    139. if res['code'] == 0:
    140. return res['messageId']
    141. return 0
    142. def sendMsgToGroup(self, session, group, msg):
    143. text = msg['text']
    144. type = msg['type']
    145. name = msg['name']
    146. group_id = msg['groupId']
    147. group_name = msg['groupName']
    148. content1 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n{}".format(
    149. name, group_id, group_name, text)
    150. content2 = "【消息中转助手】\n用户:{}\n群号:{}\n群名:{}\n消息:\n".format(
    151. name, group_id, group_name)
    152. logger.DebugLog(">> 消息类型:" + type)
    153. if type == 'Plain':
    154. message = [{"type": type, "text": content1}]
    155. elif type == 'Image':
    156. message = [
    157. {"type": 'Plain', "text": content2},
    158. {"type": type, "url": text}]
    159. elif type == 'Face':
    160. message = [{"type": 'Plain', "text": content2},
    161. {"type": type, "faceId": text}]
    162. else:
    163. logger.TraceLog(">> 当前消息类型暂不支持转发:=> "+type)
    164. return 0
    165. data = {
    166. "sessionKey": session,
    167. "group": group,
    168. "messageChain": message
    169. }
    170. logger.DebugLog(">> 消息内容:" + str(data))
    171. url = self.addr + 'sendGroupMessage'
    172. try:
    173. res = requests.post(url, data=json.dumps(data)).json()
    174. except:
    175. logger.DebugLog(">> 转发失败")
    176. return 0
    177. logger.DebugLog(">> 请求返回:" + str(res))
    178. if res['code'] == 0:
    179. return res['messageId']
    180. return 0
    181. def sendMsgToAllGroups(self, session, receive_groups, send_groups, msg_data):
    182. # 对每条消息进行检查
    183. for msg in msg_data:
    184. group_id = msg['groupId']
    185. # 接收的消息群正确(目前只支持 消息类型)
    186. if group_id in receive_groups:
    187. # 依次将消息转发到目标群
    188. for g in send_groups:
    189. logger.DebugLog(">> 当前群:"+g)
    190. if g == group_id:
    191. logger.DebugLog(">> 跳过此群")
    192. continue
    193. res = self.sendMsgToGroup(session, g, msg)
    194. if res != 0:
    195. logger.TraceLog(">> 转发成功!{}".format(g))
    196. def sendFriendMessage(self, session, qq, msg):
    197. msg_list = msg.split(r'\n')
    198. msg_chain = [{ "type": "Plain", "text": m+'\n' } for m in msg_list]
    199. data = {
    200. "sessionKey": session,
    201. "target": qq,
    202. "messageChain": msg_chain
    203. }
    204. url = self.addr + 'sendFriendMessage'
    205. try:
    206. res = requests.post(url, data=json.dumps(data)).json()
    207. except:
    208. logger.DebugLog(">> 发送失败")
    209. return 0
    210. if res['code'] == 0:
    211. return res['messageId']
    212. return 0
    213. def analyzeFriendMsg(self, data):
    214. if data is None or data['type'] != 'FriendMessage':
    215. return None, None, None
    216. sender_id = data['sender']['id']
    217. msg_type = data['messageChain'][-1]['type']
    218. if msg_type == 'Plain':
    219. msg_text = data['messageChain'][-1]['text']
    220. elif msg_type == 'Image':
    221. msg_text = data['messageChain'][-1]['url']
    222. else:
    223. msg_text = ''
    224. return sender_id, msg_type, msg_text
    225. logger = Logger()
    226. bot = QQBot()
    227. app = Flask(__name__)
    228. def qqTransfer():
    229. with open('conf.json', 'r+', encoding="utf-8") as f:
    230. content = f.read()
    231. conf = json.loads(content)
    232. auth_key = conf['auth_key']
    233. bind_qq = conf['bind_qq']
    234. sleep_time = conf['sleep_time']
    235. debug_level = conf['debug_level']
    236. receive_groups = conf['receive_groups']
    237. send_groups = conf['send_groups']
    238. logger.setDebugLevel(debug_level)
    239. session = bot.verifySession(auth_key)
    240. logger.DebugLog(">> session: "+session)
    241. bot.bindSession(session, bind_qq)
    242. while True:
    243. cnt = bot.getMessageCount(session)
    244. if cnt:
    245. logger.DebugLog('>> 有消息了 => {}'.format(cnt))
    246. logger.DebugLog('获取消息内容')
    247. data = bot.fetchLatestMessage(session)
    248. if len(data) == 0:
    249. logger.DebugLog('消息为空')
    250. continue
    251. logger.DebugLog(data)
    252. logger.DebugLog('解析消息内容')
    253. data = bot.parseGroupMsg(data)
    254. logger.DebugLog(data)
    255. logger.DebugLog('转发消息内容')
    256. bot.sendMsgToAllGroups(session, receive_groups, send_groups, data)
    257. # else:
    258. # logger.DebugLog('空闲')
    259. sleep(sleep_time)
    260. bot.releaseSession(session, bind_qq)
    261. class StatusStore:
    262. def __init__(self, from_qq:int=None, is_cmd:bool=False, func_name:str=None, need_second:bool=False, msg:str=None) -> None:
    263. self.from_qq = from_qq # 发送者的QQ号
    264. self.is_cmd = is_cmd # 是否是指令(选择功能)
    265. self.func_name = func_name # 选择的功能的名称
    266. self.need_second = need_second # 是否需要经过两步:先发cmd指令,再发详细内容
    267. self.msg = msg # 本次发送的消息内容
    268. def detail(self):
    269. return self.__dict__
    270. class MultiFunction:
    271. """多功能函数集合"""
    272. def __init__(self) -> None:
    273. pass
    274. @staticmethod
    275. def translate(original:str, convert:str='en'):
    276. try:
    277. cred = credential.Credential(tencent_secret_id, tencent_secret_key)
    278. client = tmt_client.TmtClient(cred, "ap-guangzhou")
    279. req = models.TextTranslateRequest()
    280. params = {
    281. "SourceText": original,
    282. "Source": "auto",
    283. "Target": convert,
    284. "ProjectId": tencent_translate_AppId
    285. }
    286. req.from_json_string(json.dumps(params))
    287. resp = client.TextTranslate(req)
    288. # print(resp.to_json_string())
    289. return resp.TargetText
    290. except TencentCloudSDKException as err:
    291. print(err)
    292. return ''
    293. @staticmethod
    294. def uploadImage(image_path:str) -> str:
    295. token = None
    296. scheme = 'https'
    297. config = CosConfig(Region=tencent_cos_region, SecretId=tencent_secret_id, SecretKey=tencent_secret_key, Token=token, Scheme=scheme)
    298. client = CosS3Client(config)
    299. # 本地文件形式上传
    300. # response = client.upload_file(
    301. # Bucket=bucket_id,
    302. # LocalFilePath=image_path,
    303. # Key=image_path.split(os.sep)[-1],
    304. # PartSize=1,
    305. # MAXThread=10,
    306. # EnableMD5=False
    307. # )
    308. # 网络文件形式上传
    309. file_keyname = image_path.split('/')[-2] + '.jpg'
    310. stream = requests.get(image_path)
    311. response = client.put_object(
    312. Bucket=tencent_cos_bucket_id,
    313. Body=stream,
    314. Key=file_keyname
    315. )
    316. print(response['ETag'])
    317. img_url = 'https://{}.cos.{}.myqcloud.com/{}'.format(tencent_cos_bucket_id, tencent_cos_region, file_keyname)
    318. return '上传成功, ETag: {}\nURL: {}'.format(response['ETag'], img_url)
    319. @staticmethod
    320. def weather(city_name:str='广州') -> str:
    321. url_api_weather = 'https://devapi.qweather.com/v7/weather/now?'
    322. url_api_geo = 'https://geoapi.qweather.com/v2/city/lookup?'
    323. # 实况天气
    324. def getCityId(city_kw):
    325. url_v2 = url_api_geo + 'location=' + city_kw + '&key=' + weather_key
    326. city = requests.get(url_v2).json()['location'][0]
    327. return city['id']
    328. city_id = getCityId(city_name)
    329. url = url_api_weather + 'location=' + city_id + '&key=' + weather_key
    330. res = requests.get(url).json()
    331. text = "<天气信息获取失败>"
    332. print(res)
    333. if res['code'] == '200' or res['code'] == 200:
    334. text = '实时天气:\n 亲爱的 小主, 您所在的地区为 {},\n 现在的天气是 {},\n 气温 {}°, 湿度 {}%,\n 体感气温为 {}°,\n 风向 {}, 风速 {}km/h'.format(
    335. city_name, res['now']['text'], res['now']['temp'], res['now']['humidity'], res['now']['feelsLike'], res['now']['windDir'], res['now']['windSpeed'])
    336. return text
    337. @staticmethod
    338. def hotNews(status_store:StatusStore) -> str:
    339. def common(data):
    340. url = data + '?key={}'.format(tianxing_key)
    341. res = requests.get(url).json()
    342. return res['newslist']
    343. res = common('http://api.tianapi.com/topnews/index')
    344. tops = []
    345. index = 1
    346. for item in res:
    347. tops.append(str(index) + '. ' + item['title'])
    348. index += 1
    349. return '\n'.join(tops[0:10])
    350. # 多功能函数的映射
    351. # function: 功能对应函数名
    352. # need_second: 是否需要经过两步:先发cmd指令,再发详细内容
    353. # desc: 需要经过两步时,第一次返回的提示语
    354. function_map = {
    355. '翻译': {'function': MultiFunction.translate, 'need_second': True, 'desc': '请输入您要翻译的内容~'},
    356. '天气': {'function': MultiFunction.weather, 'need_second': True, 'desc': '请问是哪座城市的天气呢?'},
    357. '热搜': {'function': MultiFunction.hotNews, 'need_second': False},
    358. '上传图片': {'function': MultiFunction.uploadImage, 'need_second': True, 'desc': '请发送图片过来吧~'},
    359. }
    360. def choiceFunction(store_obj:StatusStore):
    361. res = ''
    362. if function_map.get(store_obj.func_name):
    363. res = function_map.get(store_obj.func_name)['function'](store_obj.msg)
    364. return res
    365. def xiaofengzai():
    366. sleep_time = 1 # 轮询间隔
    367. status_store = {}
    368. session = bot.verifySession(auth_key)
    369. logger.DebugLog(">> session: "+session)
    370. bot.bindSession(session, bind_qq)
    371. while True:
    372. cnt = bot.getMessageCount(session)
    373. if not cnt:
    374. sleep(sleep_time)
    375. continue
    376. logger.DebugLog('>> 有消息了 => {}'.format(cnt))
    377. logger.DebugLog('获取消息内容')
    378. data = bot.fetchLatestMessage(session)
    379. if len(data) == 0:
    380. logger.DebugLog('消息为空')
    381. sleep(sleep_time)
    382. continue
    383. logger.DebugLog(data)
    384. logger.DebugLog('解析消息内容')
    385. sender_id, msg_type, msg_text = bot.analyzeFriendMsg(data[0])
    386. if not sender_id or sender_id != target_qq:
    387. sleep(sleep_time)
    388. continue
    389. if msg_text.strip() == '功能清单':
    390. res = '目前支持的关键词有:\n' + '\n'.join(function_map.keys())
    391. elif msg_text.strip().lower().startswith('cmd'):
    392. _, func_name = msg_text.strip().split('\n')[0].split()
    393. func_name = func_name.strip()
    394. store_obj = StatusStore(from_qq=sender_id, is_cmd=True, func_name=func_name)
    395. # 不需要发两次,直接调用函数返回结果即可
    396. func_info = function_map.get(func_name)
    397. if not func_info:
    398. res = '指令[{}]暂不支持'.format(func_name)
    399. elif func_info.get('need_second'):
    400. res = '收到你的指令:{}\n{}'.format(func_name, func_info.get('desc') or '已进入对应状态, 请继续发送详细内容')
    401. # 添加或更新记录
    402. status_store[sender_id] = store_obj
    403. else:
    404. res = '请求结果为:\n' + str(choiceFunction(store_obj))
    405. status_store.pop(sender_id, '')
    406. else:
    407. res = '请先发送指令哦...'
    408. store_obj = status_store.get(sender_id)
    409. if store_obj and store_obj.is_cmd:
    410. store_obj.msg = msg_text
    411. res = '请求结果为:\n' + str(choiceFunction(store_obj))
    412. status_store.pop(sender_id, '')
    413. bot.sendFriendMessage(session, qq=sender_id, msg=res)
    414. @app.route('/QQ/send', methods=['GET'])
    415. def qqListenMsg():
    416. # 类似于Qmsg的功能
    417. # flask做得接收HTTP请求转为QQ消息
    418. qq = request.args.get('target', None)
    419. msg = request.args.get('msg', None)
    420. bot.sendFriendMessage(bot.session, qq, msg)
    421. return 'Hello World!'
    422. @app.route('/QQ/send/friend', methods=['GET'])
    423. def qqListenMsgToFriend():
    424. # 类似于Qmsg的功能
    425. # flask做得接收HTTP请求转为QQ消息
    426. qq = request.args.get('target', None)
    427. msg = request.args.get('msg', None)
    428. bot.sendFriendMessage(bot.session, qq, msg)
    429. return 'Hello World! Friend!'
    430. @app.route('/QQ/send/group', methods=['GET'])
    431. def qqListenMsgToGroup():
    432. # 类似于Qmsg的功能
    433. # flask做得接收HTTP请求转为QQ消息
    434. qq = request.args.get('target', None)
    435. msg = request.args.get('msg', None)
    436. bot.sendPlainTextToGroup(bot.session, qq, msg)
    437. return 'Hello World! Group!'
    438. if __name__ == '__main__':
    439. t = threading.Thread(target=xiaofengzai)
    440. t.setDaemon(True)
    441. t.start()
    442. # t = threading.Thread(target=qqTransfer)
    443. # t.setDaemon(True)
    444. # t.start()
    445. app.run(port='9966', host='')

