One-Dimensional Density Estimation with a Haar Wavelet Basis (Python)


First, a brief digression.

One of the core difficulties in effectively solving problems with strong nonlinearity, strong discontinuities, strongly coupled multiphysics, or highly complex geometry is constructing numerical tools that, under multiscale conditions and nonlinear interactions, can accurately identify, localize, capture, and separate the features of each scale, especially small-scale local features. This includes decoupling the large-scale, low-order approximate solution from the small-scale, high-order truncation error. For strongly nonlinear problems such as turbulence, the crux is stripping the small-scale features away from the large-scale approximate solution of interest. For strongly discontinuous problems such as shocks, it is resolving the shock front without spurious numerical oscillations while introducing little numerical viscosity in smooth regions. For coupled multifield problems, the difficulty is representing the solution accurately from sparse data and transferring local detail between fields. For complex geometries, the key is capturing local geometric detail. Wavelet multiresolution analysis provides an effective mathematical tool for the common need underlying all of these: extracting and analyzing the time-frequency localization of a function. The magnitude of the wavelet decomposition coefficients flags local large gradients and locally significant high-frequency content, filtering the coefficients yields a sparse representation of the numerical solution, and on this basis adaptive wavelet numerical algorithms can be designed.
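As a small, self-contained illustration of that idea (my own sketch, not part of the original post; the signal and threshold are arbitrary choices), the snippet below decomposes a signal containing a jump with pywt: the large-magnitude Haar detail coefficients pinpoint the discontinuity, and hard-thresholding the rest leaves a sparse representation.

    import numpy as np
    import pywt

    # signal: smooth sine plus a step discontinuity at x = 0.6
    x = np.linspace(0, 1, 256)
    f = np.sin(2 * np.pi * x) + (x > 0.6)

    # multilevel Haar decomposition: [cA_5, cD_5, ..., cD_1]
    coeffs = pywt.wavedec(f, 'haar', level=5)

    # large finest-scale detail coefficients localize the jump
    cD1 = coeffs[-1]
    print("largest |cD1| near index", np.argmax(np.abs(cD1)))

    # hard-threshold small coefficients -> sparse representation
    thr = 0.1  # arbitrary threshold for illustration
    coeffs_sparse = [coeffs[0]] + [pywt.threshold(c, thr, mode='hard')
                                   for c in coeffs[1:]]
    f_sparse = pywt.waverec(coeffs_sparse, 'haar')
    print("nonzero detail coefficients kept:",
          sum(np.count_nonzero(c) for c in coeffs_sparse[1:]))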

With that aside, we turn to one-dimensional density estimation with a Haar wavelet basis. The code is fairly simple.

    import numpy as np
    import matplotlib.pyplot as plt
    from scipy.stats import norm, uniform

    plt.style.use('default')

    # sample data: narrow "point source" normals on top of a broad
    # normal and a uniform background
    N_data = 10000
    x_data = np.array([])

    # point source samples: Ng narrow Gaussians at random locations
    Ng = 5
    N_scales = uniform.rvs(size=Ng)          # random relative weights
    scales = 0.005 * np.ones(Ng)             # fixed narrow widths
    locs = 0.8 * uniform.rvs(size=Ng) + 0.1  # locations in [0.1, 0.9]
    for n in range(Ng):
        nm_small = norm(scale=scales[n], loc=locs[n])
        x_data_small = nm_small.rvs(size=int(N_scales[n] * N_data / 20))
        x_data = np.concatenate((x_data, x_data_small))

    # broad gaussian samples
    nm_large = norm(scale=0.1, loc=0.5)
    x_data_large = nm_large.rvs(size=N_data)
    x_data = np.concatenate((x_data, x_data_large))

    # uniform samples
    x_data_uni = uniform.rvs(size=int(N_data / 2))
    x_data = np.concatenate((x_data, x_data_uni))

    # plot histogram of the combined sample
    counts, bins, _ = plt.hist(x_data, bins=100, density=True)
    plt.xlim([0, 1])
    N = len(x_data)
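Up to the integer rounding of the component sample sizes, the sample just generated follows the (unnormalized) mixture density below; this is my summary of the code above, not a formula from the original post:

$$p(x) \;\propto\; \sum_{n=1}^{5} \frac{u_n}{20}\,\mathcal{N}\!\left(x;\,\mu_n,\,0.005^2\right) \;+\; \mathcal{N}\!\left(x;\,0.5,\,0.1^2\right) \;+\; \frac{1}{2}\,\mathbf{1}_{[0,1]}(x), \qquad u_n \sim U(0,1),\; \mu_n \sim U(0.1,\,0.9).$$

Narrow spikes on top of a broad Gaussian and a uniform background: exactly the kind of multiscale structure the Haar expansion is meant to resolve.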

    # load pywt haar wavelets for comparison
    import pywt

    wavelet_name = 'haar'
    wavelet = pywt.Wavelet(wavelet_name)
    phi, psi, x = wavelet.wavefun(level=9)  # tabulation level only; does not affect fit timing
    L = int(x[-1] - x[0])

    # rescale to unit support [0, 1]
    x = x / L
    phi = np.sqrt(L) * phi
    psi = np.sqrt(L) * psi

    # standard haar wavelet and scaling function, zero outside [0, 1)
    def haar_mother_(t):
        return (np.heaviside(t, 0) - np.heaviside(t - 1, 0)) * np.interp(t, x, psi)

    def haar_scale_(t):
        return (np.heaviside(t, 0) - np.heaviside(t - 1, 0)) * np.interp(t, x, phi)

    x_p = np.linspace(-1, 1, 1000)
    plt.plot(x_p, haar_scale_(x_p - 0), c='red', alpha=0.5)
    plt.plot(x_p, haar_scale_(2 * x_p - 1), c='lightblue', alpha=0.5)
    plt.plot(x_p, haar_scale_(2 * x_p - 0) - haar_scale_(2 * x_p - 1), c='purple')
    plt.xlim([-0.25, 1.25])
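Before the dyadic machinery, it helps to state the model that the next cell implements (implied by the code, summarized here): the square root of the density is expanded in the orthonormal dyadic Haar scaling functions at level $j_1$,

$$\sqrt{p(x)} \;=\; \sum_{k=0}^{2^{j_1}-1} c_k\,\varphi_{j_1,k}(x), \qquad \varphi_{j,k}(x) = 2^{j/2}\,\varphi\!\left(2^{j}x - k\right),$$

and orthonormality on $[0,1]$ turns the normalization $\int_0^1 p\,dx = 1$ into $\sum_k c_k^2 = 1$. The coefficient vector therefore lives on the unit hypersphere, which is why the code parameterizes it by hyperspherical angles and fits the angles.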

    # dyadic versions of the wavelet and scaling function
    def psi_(x, j, k):
        return 2**(j / 2) * haar_mother_(2**j * x - k)

    def phi_(x, j0, k):
        return 2**(j0 / 2) * haar_scale_(2**j0 * x - k)

    j1 = 7  # maximum scale (6 -> ~2 min fit, 7 -> ~9 min fit)
    klist1 = np.arange(0, 2**j1 - 1 + 0.5, 1)  # translations k = 0 .. 2**j1 - 1
    Nk1 = np.size(klist1)
    scale_info = [j1, klist1]

    def angles_for_equal_components_(N):
        """Generate the angles in spherical coordinates corresponding to a
        vector whose components are all equal.
        N = number of angles required to specify the sphere"""
        arg_for_sqrt = np.arange(N + 1, 3 - 0.5, -1)
        polar_list = np.arccos(1 / np.sqrt(arg_for_sqrt))
        azi_list = np.array([np.pi / 4])
        return np.concatenate((polar_list, azi_list))

    def initial_scaling_angle_generator_(N, choice):
        """Generate a set of initial scaling angles on the sphere.
        N = dimension of the Euclidean space
        choice = type of scaling angles to generate:
        'random': a random sample of angles on the sphere
        'unbiased': all scaling coefficients have the same positive value
        'equiangle': all angles equal to pi/4"""
        if choice == 'random':
            return np.concatenate((np.pi * np.random.random(size=N - 2),
                                   2 * np.pi * np.random.random(size=1)))
        elif choice == 'unbiased':
            return angles_for_equal_components_(N - 1)
        elif choice == 'equiangle':
            # all N - 1 angles set to pi/4, as the docstring states
            return np.pi / 4 * np.ones(N - 1)

    def ct(r, arr):
        """Coordinate transformation from spherical to cartesian coordinates."""
        a = np.concatenate((np.array([2 * np.pi]), arr))
        si = np.sin(a)
        si[0] = 1
        si = np.cumprod(si)
        co = np.cos(a)
        co = np.roll(co, -1)
        return si * co * r

    def scaling_coefficients_(scaling_angles):
        """Convert scaling angles on the hypersphere to scaling coefficients."""
        return ct(1, scaling_angles)

    def sqrt_p_(x_data, scaling_angles):
        """Square root of the density estimate: the wavelet expansion with the
        scaling coefficients given by the scaling angles."""
        N = len(x_data)
        j1, klist1 = scale_info
        phi_arr = np.array([phi_(x_data, j1, klist1[nk]) for nk in range(Nk1)])
        scaling_coefficients = scaling_coefficients_(scaling_angles)
        scaling_coefficients_mat = np.outer(scaling_coefficients, np.ones((N,)))
        scale_terms = scaling_coefficients_mat * phi_arr
        return np.sum(scale_terms, axis=0)

    def safe_log_(x):
        # guard against x = 0
        return np.log(x + 1e-323)

    def unbinned_nll_(x):
        return -np.sum(safe_log_(x))

    def unbinned_nll_sqrt_p_(scaling_angles):
        sqrt_p_arr = sqrt_p_(x_data, scaling_angles)
        p_arr = sqrt_p_arr * sqrt_p_arr
        return unbinned_nll_(p_arr) / N

    from iminuit import Minuit

    # unbiased initial scaling angles
    initial_scaling_angles = initial_scaling_angle_generator_(Nk1, 'unbiased')
    # initial_scaling_angles = initial_scaling_angle_generator_(Nk1, 'random')

    # iminuit object for the minimization
    m = Minuit(unbinned_nll_sqrt_p_, initial_scaling_angles)

    # limit the angles to the sphere
    limits_theta = [(0, np.pi) for n in range(Nk1 - 2)]
    limits_phi = [(0, 2 * np.pi)]
    limits = np.concatenate((limits_theta, limits_phi))
    m.limits = limits

    # run the fit
    m.errordef = Minuit.LIKELIHOOD
    m.migrad()

    Migrad
    FCN = -0.3265                      Nfcn = 10376
    EDM = 8.51e-09 (Goal: 0.0001)      time = 544.2 sec
    Valid Minimum                      Below EDM threshold (goal x 10)
    SOME parameters at limit           Below call limit
    Hesse ok                           Covariance accurate
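A quick sanity check on the result (my addition, assuming the iminuit 2.x API): the fitted coefficients must lie on the unit sphere by construction of the angle parameterization, so their squared sum should be 1.

    # map fitted angles back to coefficients and check the normalization
    c_fit = scaling_coefficients_(np.array(m.values))
    print("sum of squared coefficients:", np.sum(c_fit**2))  # ~1.0 by construction
    print("valid minimum:", m.valid, "| EDM:", m.fmin.edm)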

    # plot the fit
    x_plot = np.linspace(0, 1, 128)
    scaling_angles_fit = m.values[:]
    sqrt_p_plot = sqrt_p_(x_plot, scaling_angles_fit)
    p_plot = sqrt_p_plot * sqrt_p_plot
    plt.plot(x_plot, p_plot, label='fit')
    counts, bins, _ = plt.hist(x_data, bins=128, density=True, label='data')
    plt.xlim([0, 1])
    plt.legend()
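The next cell coarsens the fitted expansion with the discrete wavelet transform. Why this works (a reasoning step the original leaves implicit): the Haar two-scale relation gives

$$\varphi_{j-1,k} \;=\; \frac{\varphi_{j,2k} + \varphi_{j,2k+1}}{\sqrt{2}} \quad\Longrightarrow\quad c_{j-1,k} \;=\; \frac{c_{j,2k} + c_{j,2k+1}}{\sqrt{2}},$$

so each DWT approximation step applied to the coefficient vector produces exactly the scaling coefficients one level coarser, and pywt.wavedec with level = j1 - j0 lands directly at level j0.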

    # coarsen the fitted coefficients with the DWT
    sqrt_p_plot_j1 = sqrt_p_plot
    j0 = j1 - 4
    level = j1 - j0
    scaling_coefficients_fit = np.abs(scaling_coefficients_(scaling_angles_fit))
    coeffs = pywt.wavedec(scaling_coefficients_fit, wavelet_name, level=level)
    scaling_coefficients_j0 = coeffs[0]   # approximation at level j0
    wavelet_coefficients_j0 = coeffs[1:]  # details at levels j0 .. j1-1
    klist0 = np.arange(0, 2**j0 - 1 + 0.5, 1)
    Nk0 = np.size(klist0)
    scale_info = [j0, klist0]

    def sqrt_p_coarse_(x_data, scaling_coefficients_j0):
        """Coarse (level-j0) representation of the density estimate."""
        N = len(x_data)
        j0, klist0 = scale_info
        phi_arr = np.array([phi_(x_data, j0, klist0[nk]) for nk in range(Nk0)])
        scaling_coefficients_mat = np.outer(scaling_coefficients_j0, np.ones((N,)))
        scale_terms = scaling_coefficients_mat * phi_arr
        return np.sum(scale_terms, axis=0)

    x_plot = np.linspace(0, 1, 128)
    sqrt_p_plot_j0 = sqrt_p_coarse_(x_plot, scaling_coefficients_j0)
    p_plot_j0 = sqrt_p_plot_j0 * sqrt_p_plot_j0
    plt.plot(x_plot, p_plot_j0)
    plt.plot(x_plot, p_plot)
    counts, bins, _ = plt.hist(x_data, bins=100, density=True)
    plt.xlim([0, 1])
    plt.legend(["Coarse", "Fine", "Data"])
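A round-trip consistency check (my addition, not in the original): inverting the decomposition with pywt.waverec should recover the fine-level coefficient vector to floating-point accuracy, confirming that the coarse/detail split loses no information.

    # waverec inverts wavedec exactly for this power-of-two length
    rec = pywt.waverec(coeffs, wavelet_name)
    print("max round-trip error:", np.max(np.abs(rec - scaling_coefficients_fit)))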

    # fit summary using the DWT
    counts, bins, _ = plt.hist(x_data, bins=100, density=True)
    plt.plot(x_plot, p_plot_j0, label='Coarse')
    plt.plot(x_plot, p_plot, label='Fine')
    plt.plot(x_plot, sqrt_p_plot**2 - sqrt_p_plot_j0**2, label='$p_f - p_c$')
    plt.xlim([0, 1])
    plt.xlabel('$x$')
    plt.ylabel('$p(x)$')
    plt.legend()

    # stationary wavelet transform (SWT, test)
    # Note: you can't just take the scaling coefficients from the coarse
    # representation; you need to take the inverse (see next cell).
    # This produces a shifted signal (cause unclear; possibly the
    # uncompensated filter delay of the undecimated transform).
    sqrt_p_plot_j1 = sqrt_p_plot
    j0 = j1 - 4
    level = j1 - j0
    scaling_coefficients_fit = np.abs(scaling_coefficients_(scaling_angles_fit))
    coeffs = pywt.swt(scaling_coefficients_fit, wavelet_name, level=level,
                      norm=True, trim_approx=False)
    scaling_coefficients_j0 = coeffs[0][0]  # coarsest approximation
    wavelet_coefficients_j0 = coeffs[1:]
    # the SWT is undecimated, so the translation grid keeps the fine-level size
    klist0 = np.arange(0, 2**j1 - 1 + 0.5, 1)
    Nk0 = np.size(klist0)
    scale_info = [j1, klist1]

    def sqrt_p_coarse_(x_data, scaling_coefficients_j0):
        N = len(x_data)
        j1, klist1 = scale_info
        phi_arr = np.array([phi_(x_data, j1, klist0[nk]) for nk in range(Nk0)])
        scaling_coefficients_mat = np.outer(scaling_coefficients_j0, np.ones((N,)))
        scale_terms = scaling_coefficients_mat * phi_arr
        return np.sum(scale_terms, axis=0)

    x_plot = np.linspace(0, 1, 128)
    sqrt_p_plot_j0 = sqrt_p_coarse_(x_plot, scaling_coefficients_j0)
    p_plot_j0 = sqrt_p_plot_j0 * sqrt_p_plot_j0
    plt.plot(x_plot, p_plot_j0)
    plt.plot(x_plot, p_plot)
    counts, bins, _ = plt.hist(x_data, bins=100, density=True)
    plt.xlim([0, 1])
    plt.legend(["Coarse", "Fine", "Data"])
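The "next cell" with the inverse is not included in this excerpt. A minimal sketch of what it might look like, assuming pywt.iswt supports the same norm=True convention used above:

    # sketch: reconstruct from the full SWT coefficient set instead of
    # reading off the (shifted) approximation coefficients directly
    rec_swt = pywt.iswt(coeffs, wavelet_name, norm=True)
    print("max SWT round-trip error:",
          np.max(np.abs(rec_swt - scaling_coefficients_fit)))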


Original post: https://blog.csdn.net/weixin_39402231/article/details/140021919