曾几何时,wpe 是一个网络游戏抓包神器,我们在做协议测试的时候,也是通过 wpe 来抓包、改包和发包,来测试服务器对于非法协议的处理逻辑是否正确。wpe 操作简单,只需要加载进程,启动抓包,停止抓包就能看到所有与服务器之间的封包记录,但美中不足的是,wpe 抓取的封包内容全部是 16 进制显示,有时候我们需要改动某个字段,还需要先破译并找到字段位置来做修改,可读性比较差。
随着时代的发展,现在的游戏做了很多的安全措施,wpe 已无法找到目标游戏的进程,其他的抓包工具也很少有能满足测试需求的。而且,我们公司的测试组有自己编写的机器人,在之前,客户端和服务端之间的详细的交互逻辑,我们知道的并不是很清楚,比如我执行一个操作之后,客户端会发送哪些协议,会收到哪些协议返回?协议的先后顺序是怎样的?在不知道这些信息的时候,机器人的脚本行为编写很慢,效率比较低。
为了更好的配合机器人脚本的编写,以及针对我们的游戏项目进行协议测试,我们写了一个简单的小工具来达到显示封包收发过程以及插包改包来验证服务器逻辑的目的(PS:仅适合针对自己公司内部项目,因为需要知道协议文档嘿嘿),通过这个工具,客户端与服务端的每一条协议交互,都会按照先后顺序进行明文显示,这样在我们编写机器人行为的时候,也能够帮我们更好的梳理机器人协议发送的流程,更快的完成行为方法的编写,另外因为进行了明文显示,在协助我们做协议测试方面也比 wpe 更好。接下来我们来实现这个抓包发包工具。
提前声明:本人文笔有限,python 纯自学,代码能力也有限,如对文中内容有不同看法,请合理讨论,或者私聊我,勿喷,谢谢。
图有点乱,不过它大概是下面这样的:
1.wpe 加载客户端进程,然后将修改后的 dll 注入到客户端进程,hook 客户端的 send 和 recv 方法调用。
2.客户端进行发包操作,协议在通过客户端的 socket 时,会通过注入的 dll 进行处理,然后再发给服务端(处理可能包括拦截、修改等)。
3.服务端在返回封包之后,也会通过客户端的 socket,调用 dll 进行处理,然后再发给客户端。
4.wpe 可以直接插入伪装包,通过 dll 调用直接将伪装的封包通过客户端的 socket 发给服务端。
这里面最关键的是 dll 注入和 hook 钩子,不过这两个我研究了一阵子,没研究懂,所以我不会,尴尬…不过没关系,接下来我们换一种思路来做。
首先来分析一下我们想要的功能,请看下图(图片仅供参考,但需要自行使用 QT 设计一个类似的界面出来):
这个是目前已经做出来的一个小工具,大概分了几个区块:
1.目标服务器: 就是游戏需要登录的那个服务器地址
2.进制转换功能: 做这个东西主要是为了方便解读十六进制,当时用 wpe 测试的时候,解析字节流太繁琐了,产生了心理阴影。
3.协议生成和发送: 可以填入协议号和参数组自动生成十六进制的协议,也可以模拟发送,实现 wpe 的插包功能。
4.其他功能: 其他想要实现的辅助测试的功能,需要自己写。
5.协议记录: 发送协议和接收协议的全部记录。
6.日志: 记录一些过程和错误内容。
7.代理: 工具的核心,实现所有的协议转发和修改功能。
刚刚讲到了,dll 注入有点难,我们需要换一种思路,上面这个小工具我是用代理 agent 的方式来实现的,原理图如下:
从图中可以看到,原本客户端与服务器是通过 socket 直连的,但是我们无法注入 dll 的话,就无法对收发过程进行干预,所以我们就需要在他们之间加一个代理,客户端发给服务器的包,先发到代理这里,然后代理再转发给服务器,同样的,服务器发给客户端的包,也是先发到代理这里,然后再转发给客户端。由于代理的功能是我们自己来实现的,所以在接收到客户端发给服务器的或者服务器返回的封包时,我们可以对其进行任意的修改。
接下来我们需要设计工具界面,并实现这个工具的各个功能:
由于各个模块都与代理的功能挂钩,所以我们先实现代理的功能,以便后续关联 UI 的时候方便调试。
先来看下代理原理分析图:
1.代理 server 启动监听,当客户端尝试连接服务器的时候,创建一个代理与客户端之间的链接 C_A_socket。
2.然后创建一个代理与服务器之间的链接 S_A_socket。
3.调用代理的 start 方法,启动发包、收包、解析等线程。发包线程持续从 C_A_socket 处 recv,收到协议之后根据需求进行一定的处理 (修改、拦截等),然后从 S_A_socket 处 send 给服务器,收包线程刚好相反,从 S_A_socket 处 recv,然后从 C_A_socket 处 send 给客户端,完成协议的转发。解析线程是将收发协议进行明文化解析,显示到工具上,方便查看。
接下来我们来实现代理的功能:
首先我们写一个 AgentServer 类,这个类启动后,绑定一个本地端口并进行监听,当监听到客户端连接后,启动一个代理实例。
class AgentServer:
def __init__(self, agent_port, server_host, server_port, ui):
"""
agent_server初始化
:param agent_port: 本地代理的绑定端口
:param server_host: 服务器的ip
:param server_port: 服务器的端口
"""
self.ui = ui # 工具ui,后面跟工具做关联的时候会用到 # 初始化一个socket self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 将socket绑定本地ip和端口进行监听 self.socket.bind(('127.0.0.1', agent_port))
self.socket.listen(5)
self.server_host = server_host
self.server_port = server_port
# 生成一个空的代理对象 self.agent_obj = None
def start(self):
"""
启动agent_server,等待客户端的链接请求
:return:
"""
while True:
# 等待客户端连接后获取到与客户端的socket client_socket, addr = self.socket.accept()
# 创建一个与服务器连接的socket server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.connect((self.server_host, self.server_port))
# 生成一个代理实例,将client_socket和server_socket传给代理 self.agent_obj = socket_agent(client_socket, server_socket, self.ui)
# 启动代理 self.agent_obj.start()
def stop(self):
"""
停止agent_server,关闭socket监听,并停止agent
:return:
"""
# 关闭agent_server的socket self.socket.close()
# 停止agent_obj self.agent_obj.stop()
接下来我们写一个代理类,Agent,并实现它的 start 和 stop 方法。
class Agent:
def __init__(self, client_socket, server_socket, ui):
"""
初始化一个agent代理
:param client_socket: 与客户端之间的socket连接
:param server_socket: 与服务器之间的socket连接
"""
self.ui = ui # 工具ui,后面跟工具做关联的时候会用到 self.client_socket = client_socket
self.server_socket = server_socket
self.sock_Num = 0 # 验证码,游戏发包验证 self.send_data = b'' # 发送协议包池 self.recv_data = b'' # 接收协议包池 self.alive = True # 是否存活 self.hide_proto = False # 是否屏蔽协议 self.loop_send = False # 循环发送状态
start 方法后面再写,先写一个 stop 方法
def stop(self):
"""
停止转发器,关闭所有线程
:return:
"""
self.alive = False
try:
self.client_socket.shutdown(2)
self.client_socket.close()
except Exception as e:
print('socket 关闭时出现了异常', e)
try:
self.server_socket.shutdown(2)
self.server_socket.close()
except Exception as e:
print('socket 关闭时出现了异常', e)
写一个客户端协议处理方法,从客户端处持续 recv 字节流,并放入包池中。
这里不直接把字节流转发给服务器,因为我们可能需要对它做一些处理,如修
改发包顺序验证码等。
def agent_clinet_to_server(self):
"""
处理客户端发送给服务器的包
1.获取客户端发起的字节流
2.将字节流全部粘贴在一起进行后续处理
3.此处不调用server socket进行转发,因为发送协议需要进行一些特殊的处理
4.循环此步骤
:return:
"""
# 首先判断agent是否存活状态 if self.alive:
try:
# 从客户端处接收字节流 data = self.client_socket.recv(65535)
if data:
# 将收到的字节流放入包池中等待后续处理 self.send_data += data
# 重新调用这个方法接收客户端的封包 Timer(0, self.agent_clinet_to_server).start()
else:
print('收到了空字节流,socket连接可能已经断开', data)
except OSError as e:
# 如果触发了OSError异常,说明socket可能已经断开了 self.alive = False
print('连接已断开!', e)
except:
Timer(0, self.agent_clinet_to_server).start()
写一个服务器协议转发方法,从服务器持续 recv 字节流,并放入包池中待解析,然后收到的字节流直接转发给客户端,因为服务器返回的协议我们无需特殊处理。
def agent_server_to_client(self):
"""
处理服务器发送给客户端的包
1.获取服务器返回的字节流
2.将字节流全部粘贴在一起进行后续解析显示
3.调用client socket,将服务器返回的字节流直接转发给客户端
4.循环此步骤
:return:
"""
if self.alive:
try:
# 从服务器处接收字节流 data = self.server_socket.recv(65535)
if data:
# 通过client_socket直接转发给客户端 self.client_socket.send(data)
# 同时将受到的字节流放入包池中等待解析 self.recv_data += data
# 重新调用这个方法接收服务器的封包 Timer(0, self.agent_server_to_client).start()
except OSError as e:
# 如果触发了OSError异常,说明socket可能已经断开了 self.alive = False
print('连接已断开!', e)
except:
Timer(0, self.agent_server_to_client).start()
接下来写一个修改发送包,转发给服务器并显示到 UI 上的方法,在这个方法里面调用了两个外部方法,分别是 edit_bytes 和 make_send_proto_msg。
edit_bytes 方法 是将原本的客户端协议替换发包顺序验证码(比如验证码的顺序在客户端那里是自加 1,如果我们插个包发给服务器,要修改这个验证码 +1,但是客户端自身的验证码还是原来那个,所以会导致后面的协议验证码冲突,简单粗暴的做法就是所有的验证码都从代理工具生成,客户端发过来的协议验证码被无条件替换)。
make_send_proto_msg 方法 是将协议 obj 转化成一个解析后的字符串 (具体可以看上面那个工具图的协议接收和发送记录那里,我是直接调用的协议文档转成的 python 文件,由于涉及到文档内容,这里没有列出,请见谅,这里只要实现一个方法,将协议封包对象明文显示即可),解析错误的话就将错误信息发送到日志文本展示框那里进行展示,这两个方法的具体实现逻辑这里就不写了。
def get_and_analyze_one_send_proto(self):
"""
拆解一个发送包并进行明文化显示到工具UI上
然后将这个包做一定修改后发给服务器
:return:
"""
if self.alive:
# 做一下封包完整性判断(7是根据协议结构来的) while len(self.send_data) >= 7 and self.alive:
# 获取协议的协议长度 length = struct.unpack("!H", self.send_data[0:2])[0]
# 判断是否能拆解出一个完整的封包 if length <= len(self.send_data) - 7:
# 通过edit_bytes编辑验证码并重新序列化协议 # PS:由于插包会改变原本的协议顺序验证码 # 所以我们让所有的封包都按照我们的验证码规则来 send_data, proto, self.sock_Num = edit_bytes(
self.send_data[0:7 + length], self.sock_Num)
# 将修改后的协议通过server_socket发送给服务器 self.server_socket.send(send_data)
# 判断是否屏蔽协议 if not self.hide_proto:
# 将通过make_send_proto_msg方法解析的协议内容显示到ui上 self.ui.textBrowser_send.append(
make_send_proto_msg(proto, send_data))
# 截取包池,抛弃已经处理的字节流 self.send_data = self.send_data[7 + length:]
# 重新调用此方法 Timer(0, self.get_and_analyze_one_send_proto).start()
写一个解析接收包,并显示到 UI 上的方法,make_recv_proto_msg 方法同样是将协议 obj 转化成一个解析后的字符串,解析错误的话就将错误信息发送到日志文本展示框那里进行展示,同样的,这个方法的具体实现逻辑这里不再说明。
PS:这两个方法中的 6 和 7 是根据项目的协议格式来的,我们项目的格式是 16 位的协议长度,32 位的协议 id,8 位的验证码,因此是 2+4+1=7,由于服务器返回的协议不需要验证码,所以是 6
def get_and_analyze_one_recv_proto(self):
"""
定义一个拆包并解析这个包的方法
服务器返回的包我们在recv的时候已经直接转发了,所以这里没再转发
:return:
"""
if self.alive:
# 做一下封包完整性判断(6是根据协议结构来的,比上面少了1位的验证码) while len(self.recv_data) >= 6 and self.alive:
# 解析出协议的长度和协议号 length, proto = struct.unpack("!HI", self.recv_data[0:6])
# 判断是否能拆解出一个完整的封包 if length <= len(self.recv_data) - 6:
# 判断是否屏蔽协议 if not self.hide_proto:
# 将通过make_recv_proto_msg方法解析的协议内容显示到ui上 self.ui.textBrowser_recv.append(make_recv_proto_msg(proto, self.recv_data[0:6 + length]))
# 截取包池,抛弃已经处理的字节流 self.recv_data = self.recv_data[6+length:]
# 重新调用此方法 Timer(0, self.get_and_analyze_one_recv_proto).start()
写一个发送伪造包的方法。
def agent_insert_send(self, data, times=1, sleep_time=100):
"""
插入一个伪造的数据包,并发送
:param data:伪造的数据包
:param times:发送次数
:param sleep_time:间隔时间
:return:
"""
try:
if self.alive:
if times:
for i in range(times):
data, proto, self.sock_Num = edit_bytes(data, self.sock_Num)
self.server_socket.send(data)
time.sleep(sleep_time/1000.0)
else:
while self.loop_send:
data, proto, self.sock_Num = edit_bytes(data, self.sock_Num)
self.server_socket.send(data)
time.sleep(sleep_time / 1000.0)
except Exception as e:
print("请先开启客户端并连接服务器!", e)
最后我们再回头写 start 方法。
def start(self):
"""
启动转发线程
启动拆包解包线程
:return:
"""
Timer(0, self.agent_clinet_to_server).start()
Timer(0, self.agent_server_to_client).start()
Timer(0, self.get_and_analyze_one_send_proto).start()
Timer(0, self.get_and_analyze_one_recv_proto).start()
至此,agent_server 和 agent 就写完了,接下来我们让他俩跟 UI 进行绑定。
1.服务器相关区块:
PS:以下方法是写在界面类下面的。
# 绑定启动服务按钮事件
self.ui.start_btn.clicked.connect(self.wpe_start)
# 绑定停止抓包按钮事件
self.ui.stop_btn.clicked.connect(self.wpe_stop)
def wpe_start(self):
"""
启动中转代理agent_server,等待客户端连接
:return:
"""
try:
from pyqt_thread import begin_agent_service
self.agent_thread = begin_agent_service(From=self)
self.agent_thread.start()
except Exception as e:
print('启动失败!出现了异常', e)def wpe_stop(self):
"""
停止中转socket,关闭全部socket连接
:return:
"""
self.ui.textBrowser_send.append("*********服务已停止********")
self.ui.textBrowser_recv.append("*********服务已停止********")
self.agent_server.stop()class begin_agent_service(QThread):
"""
启动转发代理的thread类
"""
def __init__(self, parent=None, From=None):
super(begin_agent_service, self).__init__(parent)
self.ui = From
def run(self):
# 获取server_ip, server_port, my_port server_ip = self.ui.ui.ip_edit_line.text()
server_port = self.ui.ui.port_edit_line.text()
my_port = self.ui.ui.host_port_edit_line.text()
if server_ip and self.ui.ui.host_port_edit_line.text() and self.ui.ui.port_edit_line.text():
if self.ui.get_server_ip():
# 提示开始 self.ui.ui.textBrowser_send.append(
"*********代理已启动,请开启游戏**********")
self.ui.ui.textBrowser_recv.append(
"*********代理已启动,请开启游戏**********")
# AgentServer实例 self.ui.agent_server = AgentServer(
int(my_port), server_ip, int(server_port), self.ui)
Timer(0, self.ui.agent_server.start).start()
print('服务已启动')
else:
# 加个ip格式错误提示 for i in range(10):
self.ui.ui.ip_edit_line.setText('ip格式错误!')
time.sleep(0.05)
self.ui.ui.ip_edit_line.clear()
time.sleep(0.05)
self.ui.ui.ip_edit_line.setText(server_ip)
else:
# 加个参数错误提示 for i in range(10):
self.ui.ui.ip_edit_line.setText('三参不能为空!')
time.sleep(0.05)
self.ui.ui.ip_edit_line.clear()
time.sleep(0.05)
self.ui.ui.ip_edit_line.setText(server_ip)
2.进制转化区块
输入格式参考的是 wpe 那种显示格式,因为当时做的时候主要是针对 wpe 数据进行解析用的。
# 绑定转换int按钮事件
self.ui.h2i_btn.clicked.connect(self.on_h2i_btn_click)
# 绑定转换str按钮事件
self.ui.h2s_btn.clicked.connect(self.on_h2s_btn_click)
def on_h2i_btn_click(self):"""
点击转换为int按钮之后的处理方法
读取文本框中的16进制内容,转换为int类型
:return:
"""# 获取文本框中的内容try:
msg = self.ui.hex_input.toPlainText()
result = str(int(msg.replace(" ", "").lower(), 16))
self.ui.hex_tran_result.setText(result)except:
self.ui.hex_tran_result.setText('输入内容有误,请检查!!!')def on_h2s_btn_click(self):"""
点击转换为str按钮之后的处理方法
读取文本框中的16进制内容,转换为str类型
:return:
"""# 获取文本框中的内容try:
msg = self.ui.hex_input.toPlainText()
result = binascii.a2b_hex(msg.replace(" ", "").lower()).decode()
self.ui.hex_tran_result.setText(result)except:
self.ui.hex_tran_result.setText('输入内容有误,请检查!!!')
3.协议生成和伪造包发送区块
# 绑定发送一次按钮事件
self.ui.once_send_btn.clicked.connect(self.on_once_send_btn_click)
# 绑定循环开始按钮事件
self.ui.loop_send_begin_btn.clicked.connect(self.on_loop_send_begin_btn_click)
# 绑定循环结束按钮事件
self.ui.loop_send_stop_btn.clicked.connect(self.on_loop_send_stop_btn_click)
# 绑定生成协议按钮事件
self.ui.creat_proto_btn.clicked.connect(self.on_creat_proto_btn_click)
def on_creat_proto_btn_click(self):
"""
点击生成协议按钮之后的处理方法
1.首先判断编号格式是否正确
2.判断参数格式是否正确
3.生成一个实例,然后调用对应的协议编号的encode方法,看是否正确返回
4.将生成的字节流转换成WPE字符串显示格式
:return:
"""
# 判断协议号和协议参数是否有内容 if self.ui.proto_id_line.text() and self.ui.proto_msg_line.toPlainText():
try:
# 获取协议号 proto_id = int(self.ui.proto_id_line.text())
try:
# 获取协议参数 args = eval(self.ui.proto_msg_line.toPlainText())
try:
# 声明对应的协议类,并生成协议实例 exec("from proto_cfg.proto_%d import C%d" % (int(proto_id / 1000), proto_id))
obj = eval('C%d' % proto_id)(*args)
# 调用obj序列化方法,获得字节流 proto, length, buf = obj.encode()
# 将字节流拼接后按照wpe的格式显示到文本显示框 # 字节流转wpe的具体实现方法buf_to_wpe这里就不详细说明了 self.ui.textEdit.setText(buf_to_wpe(proto, length, buf))
except:
out_put_to_log("协议生成失败:如确认填写正确,则可能是参数不符合协议格式,请检查对应的协议格式!")
except:
out_put_to_log("协议生成失败:协议参数不正确,请输入正确的参数")
except:
out_put_to_log("协议生成失败:协议编号格式不正确,请输入正确的数字")
else:
self.ui.textEdit.setText("协议号和协议内容不能为空!!!")def on_once_send_btn_click(self):
"""
点击发送一次按钮之后的处理方法
读取文本框中的内容,转换为bytes,组合成一个完整协议,并发送给服务器
:return:
"""
# 首先从发送框得获取封包内容 try:
data = str_to_buf(self.ui.textEdit.toPlainText())
# 判断agent状态是否正常 if self.agent_server.agent_obj.alive:
# 判断封包内容是否符合正确格式,如果是,则发送,否则,提醒格式不正确 if check_data(data):
self.agent_server.agent_obj.agent_insert_send(data)
else:
out_put_to_log("协议检查失败,填入的协议无法正常解析")
else:
self.ui.textEdit.setText("代理服务未启动,请先启动代理服务!")
except:
out_put_to_log("协议发送失败,填入的协议无法正常解析")def on_loop_send_begin_btn_click(self):
"""
点击循环开始按钮之后的处理方法
读取文本框中的内容,转换为bytes,组合成完整的协议,循环发送给服务器
:return:
"""
# 首先从发送框得获取封包内容 data = str_to_byte(self.ui.textEdit.toPlainText())
# 判断agent状态是否正常 if self.agent_server.agent_obj.alive:
# 检查一下封包内容是否符合正确格式,如果是,则发送,否则提醒格式不正确 if check_data(data):
try:
# 获取发送次数 times = int(self.ui.lineEdit.text())
if times == 0:
# 设置一个打断开关 self.agent_server.agent_obj.loop_send = True
try:
# 获取发包间隔时间 sleep_time = int(self.ui.lineEdit_2.text())
# 设置一个最小间隔时间,防止输入错误 if sleep_time < 10:
sleep_time = 10
# 调用agent_insert_send插包方法 Timer(0, self.agent_server.agent_obj.agent_insert_send, [data, times, sleep_time]).start()
except:
out_put_to_log("请检查间隔填写是否正确!")
except:
out_put_to_log("请检查次数填写是否正确!")
else:
out_put_to_log("协议检查失败,填入的协议无法正常解析")
else:
self.ui.textEdit.setText("代理服务未启动,请先启动代理服务!")def on_loop_send_stop_btn_click(self):
"""
点击循环结束按钮之后的处理方法
停止之前进行的循环发协议
:return:
"""
self.agent_server.agent_obj.loop_send = False
4.常用方法区块
这里主要是自己写了几个比较常用的方法,然后跟按钮点击事件进行绑定,这里就不再详细讲了。
5.协议过程和日志的显示
这个主要是将文本信息 append 到 TextBrowser 控件中,这个很简单,这里也就不再讲了,可以自行百度。
工具到这里就基本写完了,在使用的时候,我们需要先在客户端的配置文件里配置一个代理服务器,这个代理服务器的 ip 就是本地 ip127.0.0.1,端口设置为我们预留的一个端口,然后在工具那里配置一个服务器选项,ip 和端口为需要连接的服务器,在使用的时候,先选好工具的服务器,启动服务,然后打开游戏客户端,选择代理服务器(这样客户端在选择对应的服务器时才会与我们的代理创建 socket,而不是直连服务器),这样就可以通过工具中转,连接目标服务器了。
可以在公众号:程序员一凡 自行提取一份216页软件测试工程师面试宝典文档资料【免费的】。以及相对应的视频学习教程免费分享!,其中包括了有基础知识、Linux必备、Shell、互联网程序原理、Mysql数据库、抓包工具专题、接口测试工具、测试进阶-Python编程、Web自动化测试、APP自动化测试、接口自动化测试、测试高级持续集成、测试架构开发测试框架、性能测试、安全测试等。