• python解析mdf或mf4文件利器---asammdf


    安装

    pip install asammdf

    解析

    1.读取文件,获取信号

    1. from asammdf import MDF
    2. f = r"xxx.mdf"
    3. mdf = MDF(f)
    4. if '信号名' in mdf:
    5. #注意,一个mdf里可能有重名的信号,这个时候要加上group_index和channel_index,比如mdf.get('信号名',3,2),至于后面两个索引号怎么获取,这个就需要看后面讲的的channels_db了
    6. signal = mdf.get('信号名')
    7. data = signal.samples
    8. timestamps = signal.timestamps

    如果对mdf的格式不熟悉,可以百度下资料,本文主要介绍asammdf的使用。

    2. 获取所有信号名及其索引

    chn_db = mdf.channels_db

    3. mdf转成dataframe

    1. #代码接上
    2. df = mdf.to_dataframe()

    这个方法有个坑,canape采集数据允许不同信号不同频率,也就是说数据对不齐,这种情况下调这个方法会导致爆内存,所以使用时应该注意一下,如果转换时间过长,或者报错,多数是信号采集频率不同,如果事先就已经知道频率不同,只能用get方法获取单个信号,然后分析,或者用后面讲到的方法先进行数据对齐(重采样),然后在进行转换。

    4.获取channelgroup和chnannel

    mdf文件一般是用channel和channel group组织的,一个文件可能包含多个chnannel group,一个channel group也可以包含多个channel,channel和signal一一对应,channel保存了一些描述信息,数据和时间戳保存在signal里,asammdf里提供了几个方法来获取信息,下面是一段从源码里摘取的一段

    1. for group_index, (virtual_group_index, virtual_group) in enumerate(
    2. mdf_ins.virtual_groups.items()
    3. ):
    4. if virtual_group.cycles_nr == 0 and empty_channels == "skip":
    5. continue
    6. channels = [
    7. (None, gp_index, ch_index)
    8. for gp_index, channel_indexes in mdf_ins.included_channels(
    9. virtual_group_index
    10. )[virtual_group_index].items()
    11. for ch_index in channel_indexes
    12. if ch_index != mdf_ins.masters_db.get(gp_index, None)
    13. ]

    从中可以看出,我们可以通过mdf.virtual_groups获取到全部的channel group,virtual_groups是一个字典,所以可以通过items方法遍历,然后通过mdf.included_channels(virtual_group_index)方法获取到当前channel group下包含的channels,这里我们注意到,他获取的channels是一个元组列表,第一个都是None,第二个和第三个分别是channel group的index和channel的index,再看mdf.get()方法的参数其实有很多,前三个是signal name,group index,channel index,其实就是列表元组的三个,因为channel group、channel是一个二维结构,所以,其实可以不用信号名来获取信号,也可以通过group index,和channel index,然后第一个参数传None,同时,不同通道组中的通道名字可能相同,所以这时,如果仅仅传信号名,代码会报错,因为它不知道你取得哪个组里的信号,所以这时候就需要传group index和channel index,但是如果一开始就用索引就不会,因为这两个索引会唯一确定一个信号。

     5.获取一部分信号数据

    基于上段的通道列表,我们就可以用mdf.select方法筛选出我们需要的信号,信号和通道是一一对应关系,我们可以把这两个东西当成一个东西理解,只不过两种数据结构存的数据不一样,所以这里大家不要混淆或者蒙圈。上面获取的其实是全部的信号,其实也可以是部分。

    1. old_signals = [
    2. signal
    3. for signal in mdf_ins.select(
    4. channels, raw=True, copy_master=False, validate=False
    5. )
    6. ]

    mdf.select返回的是信号列表,而不是包含这些信号的mdf实列,mdf.filter()返回的是mdf实列,这里需要注意下,还有那个raw参数,虽然它这里用的True,但是,一般我们要设置为False,特别是自己处理数据的时候,因为设置成True会导致读出的数据错误,它之所以有这个参数,我怀疑是为了兼容一些数据格式,而不是为了数据准确性。

     6. 数据合并

    MDF类里有个静态方法concatenate,用于多个文件的合并。

    1. from asammdf import MDF
    2. mdf1 = MDF(r"f1.mf4")
    3. mdf2 = MDF(r"f2.mf4")
    4. mdf = MDF.concatenate([mdf1,mdf2])

    concatenate是一个静态方法,可以用类直接调用,这个方法用于两个group和channel都一样的文件进行连接,即纵向连接。不支持包含不同信号的数据进行横向拼接 。

    7. 数据过滤

    mdf.filter(channels),这个上文已经提到,它返回的是包含指定信号的mdf实例,而不是信号列表。

    8. 数据导出

    asammdf支持把mdf转成其它格式,比如csv,hdf5,mat,parquet。

    mdf.export(format,filename)

    9. 数据绘图

    asammdf支持用信号数据绘图,下面是官方示例

    1. # map signals
    2. xs = np.linspace(-1, 1, 50)
    3. ys = np.linspace(-1, 1, 50)
    4. X, Y = np.meshgrid(xs, ys)
    5. vals = np.linspace(0, 180.0 / np.pi, 100)
    6. phi = np.ones((len(vals), 50, 50), dtype=np.float64)
    7. for i, val in enumerate(vals):
    8. phi[i] *= val
    9. R = 1 - np.sqrt(X**2 + Y**2)
    10. samples = np.cos(2 * np.pi * X + phi) * R
    11. timestamps = np.arange(0, 2, 0.02)
    12. s_map = Signal(
    13. samples=samples, timestamps=timestamps, name="Variable Map Signal", unit="dB"
    14. )
    15. s_map.plot()

     我们看到plot方法是挂在Signal上的,所以,我们用mdf.get('signame')获取到一个信号,也可以直接绘图,值得注意的是,信号是支持直接计算的,再看一个官方的例子

    1. import numpy as np
    2. from asammdf import Signal
    3. # create 3 Signal objects with different time stamps
    4. # unit8 with 100ms time raster
    5. timestamps = np.array([0.1 * t for t in range(5)], dtype=np.float32)
    6. s_uint8 = Signal(
    7. samples=np.array([t for t in range(5)], dtype=np.uint8),
    8. timestamps=timestamps,
    9. name="Uint8_Signal",
    10. unit="u1",
    11. )
    12. # int32 with 50ms time raster
    13. timestamps = np.array([0.05 * t for t in range(10)], dtype=np.float32)
    14. s_int32 = Signal(
    15. samples=np.array(list(range(-500, 500, 100)), dtype=np.int32),
    16. timestamps=timestamps,
    17. name="Int32_Signal",
    18. unit="i4",
    19. )
    20. # float64 with 300ms time raster
    21. timestamps = np.array([0.3 * t for t in range(3)], dtype=np.float32)
    22. s_float64 = Signal(
    23. samples=np.array(list(range(2000, -1000, -1000)), dtype=np.int32),
    24. timestamps=timestamps,
    25. name="Float64_Signal",
    26. unit="f8",
    27. )
    28. prod = s_float64 * s_uint8
    29. prod.name = "Uint8_Signal * Float64_Signal"
    30. prod.unit = "*"
    31. prod.plot()
    32. pow2 = s_uint8**2
    33. pow2.name = "Uint8_Signal ^ 2"
    34. pow2.unit = "u1^2"
    35. pow2.plot()
    36. allsum = s_uint8 + s_int32 + s_float64
    37. allsum.name = "Uint8_Signal + Int32_Signal + Float64_Signal"
    38. allsum.unit = "+"
    39. allsum.plot()
    40. # inplace operations
    41. pow2 *= -1
    42. pow2.name = "- Uint8_Signal ^ 2"
    43. pow2.plot()
    44. # cut signal
    45. s_int32.plot()
    46. cut_signal = s_int32.cut(start=0.2, stop=0.35)
    47. cut_signal.plot()

     我们可以看到,信号是可以相乘,相加,平方操作的,操作返回应该还是信号类型,因为后面我们看到它调用了plot方法,这个方法是挂在Signal上面的。

    10.数据重采样

    canape支持不同信号的采集频率不同,如果不同信号数据量不同,也就是说数据不对齐,这样在转成dataframe的时候就会爆内存,再比如频率如果过高的话,数据量必然特别大,这样的话也不利于分析,这时就需要我们对数据进行重新采样。asammdf有个resample方法。下面是个例子

    1. from asammdf import MDF
    2. from utils import get_lines_from_txt
    3. f = r"C:\Users\c-master1\Desktop\下载\AnalysisTools\YIQI_2022-08-06_14-37-50.mf4"
    4. f1 = r"C:\Users\c-master1\Downloads\2022-11-21_16-59-29.mf4"
    5. f2 = r"C:\Users\c-master1\Downloads\Recorder_2022-11-17_14-55-00.MDF"
    6. clist_f = r"C:\Users\c-master1\Downloads\数据分析用信号列表(2)(2).txt"
    7. channel_list = ['ABV_Ki_Out','ACsurge_Flag_b','csABPV_posVlv_perc']
    8. channel_list = get_lines_from_txt(clist_f)
    9. mdf = MDF(f2)
    10. mdf = mdf.filter(channel_list)
    11. signals = mdf.select(channel_list)
    12. max = 0
    13. name = ""
    14. for sig in signals:
    15. if len(sig.samples) > max:
    16. max = len(sig.samples)
    17. name = sig.name
    18. mdf1 = mdf.resample(raster=name)
    19. df1 = mdf1.to_dataframe()
    20. df1

    resample方法支持按某个信号进行重采样,即所有其它信号按这个信号对齐,也支持手动写个频率,比如0.1就是0.1秒一个数据,还支持传一个数组,采集指定时间点的数据,进行重采样后数据就对齐了,这时就可以转成dataframe了,对于升采样,有个需要注意的问题,一般mf4保存的是数字,当然也可以是字符串,对于数值型数据,主要分两种,也就是整数和小数,也叫整型和浮点型,在asammdf中,对于整数和小数的默认重采样策略是不同的,整数的话默认是重复之前的值,而对于浮点型也就是小数的话,默认是线性插值,所以,有的时候你会发现重采样后数据好像被改了,其实就是线性插值导致的,如果想改变默认行为,可以用

    mdf.configure(float_interpolation=0,integer_interpolation=0)

    来改变,其中mdf是MDF的实例,针对浮点型,asammdf支持两种重采样策略,一种是重复之前的值,一种就是线性插值,对于整型,提供了三种策略,前面两种和浮点型一样,还有一种混合策略,就是结合重复前值和线性插值的特点,具体没有试过。还有一个坑需要注意,就是表面上我们看到数据里存的是整数,但是它的类型是Double的,这种asammdf也会把它当浮点型处理,因为它只按数据类型处理,不管你存的具体数据到底是整数还是小数。 

    mdf/mf4文件创建

    1. mdf支持从零创建mdf/mf4文件,下面是官方示例

    1. # -*- coding: utf-8 -*-
    2. """
    3. *asammdf* MDF usage example
    4. """
    5. import numpy as np
    6. from asammdf import MDF, Signal
    7. # create 3 Signal objects
    8. timestamps = np.array([0.1, 0.2, 0.3, 0.4, 0.5], dtype=np.float32)
    9. # unit8
    10. s_uint8 = Signal(
    11. samples=np.array([0, 1, 2, 3, 4], dtype=np.uint8),
    12. timestamps=timestamps,
    13. name="Uint8_Signal",
    14. unit="u1",
    15. )
    16. # int32
    17. s_int32 = Signal(
    18. samples=np.array([-20, -10, 0, 10, 20], dtype=np.int32),
    19. timestamps=timestamps,
    20. name="Int32_Signal",
    21. unit="i4",
    22. )
    23. # float64
    24. s_float64 = Signal(
    25. samples=np.array([-20, -10, 0, 10, 20], dtype=np.float64),
    26. timestamps=timestamps,
    27. name="Float64_Signal",
    28. unit="f8",
    29. )
    30. # create empty MDf version 4.00 file
    31. with MDF(version="4.10") as mdf4:
    32. # append the 3 signals to the new file
    33. signals = [s_uint8, s_int32, s_float64]
    34. mdf4.append(signals, comment="Created by Python")
    35. # save new file
    36. mdf4.save("my_new_file.mf4", overwrite=True)
    37. # convert new file to mdf version 3.10
    38. mdf3 = mdf4.convert(version="3.10")
    39. print(mdf3.version)
    40. # get the float signal
    41. sig = mdf3.get("Float64_Signal")
    42. print(sig)
    43. # cut measurement from 0.3s to end of measurement
    44. mdf4_cut = mdf4.cut(start=0.3)
    45. mdf4_cut.get("Float64_Signal").plot()
    46. # cut measurement from start of measurement to 0.4s
    47. mdf4_cut = mdf4.cut(stop=0.45)
    48. mdf4_cut.get("Float64_Signal").plot()
    49. # filter some signals from the file
    50. mdf4 = mdf4.filter(["Int32_Signal", "Uint8_Signal"])
    51. # save using zipped transpose deflate blocks
    52. mdf4.save("out.mf4", compression=2, overwrite=True)

            当然,从官方的示例中还可以获得一些其它信息,比如,文件的裁剪,mdf.cut(**args)方法.

    2. 也支持从dataframe创建

    1. import pandas as pd
    2. import numpy as np
    3. df = pd.DataFrame({
    4. 'a': np.arrange(10),
    5. 'b': np.arrange(10),
    6. 'c': np.arrange(10),
    7. })
    8. mdf_f = MDF()
    9. mdf_f.append(df)
    10. mdf_f.save("xx.mf4",overwrite=True)

    最后

            asammdf其实还有一个gui工具,用pip install asammdf[gui]命令即可安装,图形界面实现了一些canape的功能,感觉还是很强大的,感兴趣的可以用下试试。

  • 相关阅读:
    代码随想录算法训练营Day1 : 704.二分查找、27.移除元素
    Win11+ VS2022编译 FFmpeg6.0 静态库
    Google Earth Engine(GEE)——Landsat 8 C02影像NDVI平滑窗口计算(NDVI波段为例)
    DBC2000数据库,DBC2000怎么设置?DBC2000架设传奇教程
    IDEA中Maven依赖下载失败的解决方法
    共享店铺系统如何搭建无人共享自习室?
    尚未解决的难题
    解析Vue3源码(二)——ref
    分布式链路追踪系统Skywalking的部署和应用
    [LINUX]linux基本指令大总结
  • 原文地址:https://blog.csdn.net/zy1620454507/article/details/128017125