SOME/IP 与 SOME/IP-SD 协议信息解析
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
注意下面
如果有需要交流车载技术的小伙伴,可以私信加我微信,我拉你进群,和同行业大佬交流
注意上面
提示:这里可以添加本文要记录的大概内容:
最近有机会接触车载以太网通讯相关的技术方案,在解读过程中顺便对相关协议信息做了解析,记录在这里作为参考。
代码如下(示例):
from typing import Optional, List
class SomeIpSDAttribute:
    """Parse the SD (service discovery) information of a SOME/IP message.

    The message is supplied as a string of hex digits. The first 32
    characters are decoded as the SOME/IP header and wrapped in a
    ``SomeIpProtocol``; the rest is decoded as the SD part and wrapped in a
    ``SomeIpSD``.
    """

    def __init__(self, raw):
        """Decode ``raw`` into a header object and an SD object.

        :param raw: the whole message as a hex string (offsets below are
            positions in that string, i.e. two per byte).
        :raises ValueError: raised by ``int(..., 16)`` when a sliced field is
            empty or is not valid hex.
        """
        # (start, stop) positions of the header fields, in constructor order:
        # service id, method id, length, client id, session id, SOME/IP
        # version, interface version, message type, return code.
        header_layout = (
            (0, 4), (4, 8), (8, 16), (16, 20), (20, 24),
            (24, 26), (26, 28), (28, 30), (30, 32),
        )
        header_fields = [int(raw[start:stop], 16) for start, stop in header_layout]
        self.__some_ip_protocol = SomeIpProtocol(*header_fields)

        # SD part: flags, reserved, then the length-prefixed entries array.
        flags = int(raw[32:34], 16)
        reserved = int(raw[34:40], 16)
        entries_len = int(raw[40:48], 16)

        # Lengths are byte counts, so they are doubled to index hex digits.
        entries_end = 48 + entries_len * 2
        entries_array = raw[48:entries_end]

        # The options length field directly follows the entries array.
        options_start = entries_end + 8
        options_len = int(raw[entries_end:options_start], 16)

        # With a zero options length no options string is handed over at all.
        options_array = None
        if options_len != 0:
            options_array = raw[options_start:options_start + options_len * 2]
        self.__sd_protocol = SomeIpSD(flags, reserved, entries_array, options_array)

    @property
    def some_ip_protocol(self):
        """The decoded SOME/IP header object."""
        return self.__some_ip_protocol

    @property
    def sd_protocol(self):
        """The decoded SD object (flags, reserved, entries, options)."""
        return self.__sd_protocol
class SomeIpSD:
    """Holder for the SD part of a message: flags, reserved, entries, options.

    The entries and options are passed in as hex strings and are cut into
    fixed-size pieces, each of which is decoded by ``SomeIpSDEntries`` or
    ``SomeIpSDOptions``.
    """

    def __init__(self, flags, reserved, entries_array, options_array=None):
        """Store the scalar fields and decode both arrays.

        :param flags: stored unchanged.
        :param reserved: stored unchanged.
        :param entries_array: hex string holding the entries array.
        :param options_array: hex string holding the options array, or
            ``None`` when the message carries no options.
        """
        self.__flags = flags
        self.__sd_reserved = reserved
        self.__entries: Optional[List[SomeIpSDEntries]] = []
        self.__options: Optional[List[SomeIpSDOptions]] = []
        self.__analysis_data(entries_array, options_array)

    @property
    def flags(self):
        """The flags value given at construction."""
        return self.__flags

    @property
    def sd_reserved(self):
        """The reserved value given at construction."""
        return self.__sd_reserved

    @property
    def entries(self):
        """List of decoded entry objects, in message order."""
        return self.__entries

    @property
    def options(self):
        """List of decoded option objects, in message order."""
        return self.__options

    def __analysis_data(self, entries_array, options_array):
        """Decode the entries, then the options when an options string exists."""
        self.__analysis_entries(entries_array)
        if options_array is not None:
            self.__analysis_options(options_array)

    def __analysis_entries(self, entries_array):
        """Cut the entries string into 32-character pieces and decode each."""
        self.__entries.extend(
            SomeIpSDEntries(entries_array[offset:offset + 32])
            for offset in range(0, len(entries_array), 32)
        )

    def __analysis_options(self, options_array):
        """Cut the options string into 24-character pieces and decode each.

        NOTE(review): the stride is fixed at 24 hex characters; the length
        field inside each option is not consulted when splitting.
        """
        self.__options.extend(
            SomeIpSDOptions(options_array[offset:offset + 24])
            for offset in range(0, len(options_array), 24)
        )
class SomeIpSDEntries:
    """One SD entry decoded from a 32-character hex string."""

    def __init__(self, entries_array):
        """Slice the entry string into its fields and store them as ints.

        :param entries_array: hex string for a single entry; positions used
            are 0-32.
        :raises ValueError: raised by ``int(..., 16)`` when a slice is empty
            or is not valid hex.
        """
        def as_int(begin, end):
            # Every field is an unsigned hex number at a fixed position.
            return int(entries_array[begin:end], 16)

        self.__service_type = as_int(0, 2)
        self.__index1 = as_int(2, 4)
        self.__index2 = as_int(4, 6)
        # The two option counters are one hex digit (4 bits) each.
        self.__num_opt1 = as_int(6, 7)
        self.__num_opt2 = as_int(7, 8)
        self.__service_id = as_int(8, 12)
        self.__instance_id = as_int(12, 16)
        self.__major_ver = as_int(16, 18)
        self.__ttl = as_int(18, 24)
        self.__minor_ver = as_int(24, 32)

    @property
    def service_type(self):
        """Entry type (hex positions 0-2)."""
        return self.__service_type

    @property
    def index1(self):
        """First options index (hex positions 2-4)."""
        return self.__index1

    @property
    def index2(self):
        """Second options index (hex positions 4-6)."""
        return self.__index2

    @property
    def num_opt1(self):
        """First options counter (hex position 6)."""
        return self.__num_opt1

    @property
    def num_opt2(self):
        """Second options counter (hex position 7)."""
        return self.__num_opt2

    @property
    def service_id(self):
        """Service id (hex positions 8-12)."""
        return self.__service_id

    @property
    def instance_id(self):
        """Instance id (hex positions 12-16)."""
        return self.__instance_id

    @property
    def major_ver(self):
        """Major version (hex positions 16-18)."""
        return self.__major_ver

    @property
    def ttl(self):
        """TTL (hex positions 18-24)."""
        return self.__ttl

    @property
    def minor_ver(self):
        """Minor version (hex positions 24-32)."""
        return self.__minor_ver
class SomeIpSDOptions:
    """One SD option decoded from a 24-character hex string."""

    def __init__(self, options_array):
        """Slice the option string into its fields and store them as ints.

        :param options_array: hex string for a single option; positions used
            are 0-24.
        :raises ValueError: raised by ``int(..., 16)`` when a slice is empty
            or is not valid hex.
        """
        def as_int(begin, end):
            # Every field is an unsigned hex number at a fixed position.
            return int(options_array[begin:end], 16)

        self.__length = as_int(0, 4)
        self.__options_type = as_int(4, 6)
        self.__options_reserved = as_int(6, 8)
        # The address is kept as a single integer, not as dotted text.
        self.__ipv4_addr = as_int(8, 16)
        self.__options_reserved2 = as_int(16, 18)
        self.__protocol = as_int(18, 20)
        self.__port = as_int(20, 24)

    @property
    def length(self):
        """Option length field (hex positions 0-4)."""
        return self.__length

    @property
    def options_type(self):
        """Option type (hex positions 4-6)."""
        return self.__options_type

    @property
    def options_reserved(self):
        """First reserved field (hex positions 6-8)."""
        return self.__options_reserved

    @property
    def ipv4_addr(self):
        """IPv4 address as an integer (hex positions 8-16)."""
        return self.__ipv4_addr

    @property
    def options_reserved2(self):
        """Second reserved field (hex positions 16-18)."""
        return self.__options_reserved2

    @property
    def protocol(self):
        """Protocol field (hex positions 18-20)."""
        return self.__protocol

    @property
    def port(self):
        """Port number (hex positions 20-24)."""
        return self.__port
class SomeIpAttribute:
    """Parse standard SOME/IP data given as a hex string.

    The string may hold several messages back to back; each one is decoded
    into a ``SomeIpProtocol``. Iterating over an instance yields those
    objects in the order they were found.
    """

    def __init__(self, raw):
        """Decode every message contained in ``raw``.

        :param raw: hex string holding zero or more consecutive messages.
        :raises ValueError: raised by ``int(..., 16)`` when a header slice is
            empty or is not valid hex.
        """
        self.__some_ip_protocol_list: Optional[List[SomeIpProtocol]] = []
        self.__get_some_ip_protocol(raw)

    def __iter__(self):
        """Iterate over the decoded messages."""
        return iter(self.__some_ip_protocol_list)

    def __get_some_ip_protocol(self, raw):
        """Walk through ``raw`` message by message and collect the results."""
        # (start, stop) positions inside one 32-character header, in
        # constructor order.
        header_layout = (
            (0, 4), (4, 8), (8, 16), (16, 20), (20, 24),
            (24, 26), (26, 28), (28, 30), (30, 32),
        )
        cursor = 0
        total = len(raw)
        while cursor < total:
            header = raw[cursor:cursor + 32]
            fields = [int(header[start:stop], 16) for start, stop in header_layout]
            length = fields[2]

            # The length field is compared against 8: the value 8 means no
            # payload, and anything above it is payload bytes (two hex
            # characters each).
            body_start = cursor + 32
            body_end = body_start + (length - 8) * 2
            payload = raw[body_start:body_end] if length != 8 else None

            self.__some_ip_protocol_list.append(SomeIpProtocol(*fields, payload))
            cursor = body_end
class SomeIpProtocol:
    """Read-only record of the fields of a standard SOME/IP message.

    All values are stored exactly as passed in; no decoding or validation
    happens here.
    """

    def __init__(self, service_id, method_id, length, client_id, session_id, some_ip_ver, interface_var, message_type,
                 return_code, payload=None):
        """Store the header fields and the optional payload.

        :param service_id: service id field.
        :param method_id: method id field.
        :param length: length field.
        :param client_id: client id field.
        :param session_id: session id field.
        :param some_ip_ver: SOME/IP version field.
        :param interface_var: interface version field (presumably; the name
            is kept as spelled by the callers).
        :param message_type: message type field.
        :param return_code: return code field.
        :param payload: message payload, or ``None`` when there is none.
        """
        # Message identification.
        self.__service_id, self.__method_id = service_id, method_id
        self.__length = length
        # Request identification.
        self.__client_id, self.__session_id = client_id, session_id
        # Version information.
        self.__some_ip_ver, self.__interface_var = some_ip_ver, interface_var
        # Message kind and result.
        self.__message_type, self.__return_code = message_type, return_code
        self.__payload = payload

    @property
    def service_id(self):
        """Service id as given at construction."""
        return self.__service_id

    @property
    def method_id(self):
        """Method id as given at construction."""
        return self.__method_id

    @property
    def length(self):
        """Length field as given at construction."""
        return self.__length

    @property
    def client_id(self):
        """Client id as given at construction."""
        return self.__client_id

    @property
    def session_id(self):
        """Session id as given at construction."""
        return self.__session_id

    @property
    def some_ip_ver(self):
        """SOME/IP version as given at construction."""
        return self.__some_ip_ver

    @property
    def interface_var(self):
        """Interface version value as given at construction."""
        return self.__interface_var

    @property
    def message_type(self):
        """Message type as given at construction."""
        return self.__message_type

    @property
    def return_code(self):
        """Return code as given at construction."""
        return self.__return_code

    @property
    def payload(self):
        """Payload as given at construction, ``None`` when absent."""
        return self.__payload
# if __name__ == '__main__':
# data = 'ffff81000000004c0000000d01010200c00000000000002001000010203000010100000300000000010100102031000101000003000000000000001800090400ac1d0b050006d11000090400ac1d0b050006d111'
# me =SomeIpSDAttribute(data)
# print(me.sd_protocol.flags)
# for en in me.sd_protocol.entries:
# print(en.service_type)
# print(en.index1)
# print(en.index2)
# print(en.num_opt1)
# print(en.num_opt2)
# print(en.service_id)
# print(en.instance_id)
# print(en.major_ver)
# print(en.ttl)
# print(en.minor_ver)
# print("en stop")
# print('--------------------------')
# for op in me.sd_protocol.options:
# print(op.length)
# print(op.options_type)
# print(op.options_reserved)
# print(op.ipv4_addr)
# print(op.options_reserved2)
# print(op.protocol)
# print(op.port)
# print("op stop")
# # print(me.sd_protocol.entries)
# # print(me.sd_protocol.options)
我是一名车载集成测试开发工程师,希望能和志同道合的朋友一起相互学习进步