先说点其他的东西。
关于强非线性、强间断、多物理场强耦合或高度复杂几何形态问题能够得以有效求解的核心难题之一,是如何构建在多尺度情形、非线性作用下具有准确地识别、定位、捕获以及分离各个尺度特征尤其是小尺度局部特征能力的数值工具,这之中包括了大尺度低阶近似解与小尺度高阶微小截断误差的分离解耦。例如,对于湍流等强非线性问题,其核心就在于如何有效地从所关心的大尺度近似解中剥离小尺度特征。对于激波等强间断问题,其核心就在于准确地得到无非物理数值振荡的激波面,且在光滑区域不引入过多的数值粘性。对于多场耦合问题,其难点在于如何以稀疏的数据信息准确表征解和有效地传递局部细节特征。而对于复杂的几何形态,则关键在于有效地刻画出局部几何细节。针对这些问题求解中所体现出的共性需求——即对函数时频局部化特征进行提取与分析的数学能力,小波多分辨分析提供了一种非常有效的数学工具。我们可以通过小波分解系数量级识别局部大梯度特征和局部高频显著信息,且能够通过各种滤波手段得到数值解的稀疏表征,设计出自适应小波数值算法。
其次,开始基于哈尔小波基的一维密度估计,代码较为简单。
- import numpy as np
- import matplotlib.pyplot as plt
- from scipy.stats import norm, uniform
- from scipy.integrate import quad
-
-
- import sys, os, time, fileinput
-
-
- import matplotlib.pyplot as plt
- import matplotlib as mpl
-
-
- plt.style.use('default')
- # sample data from normal distribution
- N_data = 10000
-
-
- # preload sample data
- x_data = np.array([])
-
-
- # point source samples
- Ng = 5
- N_scales = uniform.rvs(size = Ng)
- scales = 0.005 * uniform.rvs(size = Ng)
- scales = 0.005 * np.ones(Ng)
- locs = 0.8 * uniform.rvs(size = Ng) + 0.1
- for n in range(Ng):
- nm_small = norm(scale = scales[n], loc = locs[n])
- x_data_small = nm_small.rvs(size = int(N_scales[n] * N_data / 20))
- x_data = np.concatenate((x_data, x_data_small))
-
- # gaussian samples
- nm_large = norm(scale = 0.1, loc = 0.5)
- x_data_large = nm_large.rvs(size = N_data)
- x_data = np.concatenate((x_data, x_data_large))
-
-
- # uniform samples
- uni = uniform
- x_data_uni = uni.rvs(size = int(N_data / 2))
- x_data = np.concatenate((x_data, x_data_uni))
-
-
- # plot histogram of distribution
- counts, bins, _ = plt.hist(x_data, bins = 100, density = True)
- plt.xlim([0,1])
-
-
- N = len(x_data)

- # load pywt haar wavelets for comparison
-
-
- import pywt
- wavelet_name = 'haar'
-
-
- wavelet = pywt.Wavelet(wavelet_name)
- phi, psi, x = wavelet.wavefun(level=9) # level does not affect timing
- L = int(x[-1] - x[0])
- # rescale to unit length
- x = x/L
- phi = np.sqrt(L) * phi
- psi = np.sqrt(L) * psi
-
-
- # define standard haar wavelet and scaling function
- def haar_mother_(t):
- return (np.heaviside(t,0) - np.heaviside(t-1,0) ) * np.interp(t,x,psi)
-
-
- def haar_scale_(t):
- return (np.heaviside(t,0) - np.heaviside(t-1,0) ) * np.interp(t,x,phi)
-
-
- x_p = np.linspace(-1,1,1000)
- plt.plot(x_p, haar_scale_(x_p - 0), c = 'red', alpha = 0.5)
- plt.plot(x_p, haar_scale_(2 * x_p - 1), c = 'lightblue', alpha = 0.5)
- plt.plot(x_p, haar_scale_(2 * x_p - 0) - haar_scale_(2 * x_p - 1), c = 'purple')
-
-
- plt.xlim([-0.25,1.25])

- # define dyadic version of wavelet and scaling function
- def psi_(x,j,k):
- return 2**(j/2) * haar_mother_(2**j * x - k)
-
-
- def phi_(x,j0,k):
- return 2**(j0/2) * haar_scale_(2**j0 * x - k)
- j1 = 7 # maximum scale (6 -> ~2 min , 7 -> ~9 min)
- klist1 = np.arange(0,2**j1 - 1 + 0.5,1) # translations
- Nk1 = np.size(klist1)
-
-
- scale_info = [j1, klist1]
- # plot the density estimate using scaling coefficients
-
-
- def angles_for_equal_components_(N):
- """ Generate the angles in spherical coordinates corresponding to a
- vector whose components are all equal """
- # N = Number of angles required to specify sphere
- arg_for_sqrt = np.arange(N+1, 3-0.5, -1)
- polar_list = np.arccos( 1 /
- np.sqrt(arg_for_sqrt)
- )
- azi_list = np.array([np.pi/4])
- return np.concatenate((polar_list, azi_list))
-
-
- def initial_scaling_angle_generator_(N, choice):
- """Generates a set of initial scaling angles on the sphere
- N = Dimension of Euclidean Space
- choice = Determine type of scaling angles to generate
- 'random': Generate a random sample of angles on the sphere
- 'unbiased': All scaling coefficients have the same positive value
- 'equiangle': All angles correspond to pi/4"""
- if choice == 'random':
- return np.concatenate((np.pi * np.random.random(size = N - 2), 2*np.pi*np.random.random(size = 1) ))
- elif choice == 'unbiased':
- return angles_for_equal_components_(N-1)
- elif choice == 'equiangle':
- return np.concatenate((np.pi/4 * np.random.random(size = N - 2), np.pi/4*np.ones(1) ))
-
- def ct(r, arr):
- """
- coordinate transformation from spherical to cartesian coordinates
- """
- a = np.concatenate((np.array([2*np.pi]), arr))
- si = np.sin(a)
- si[0] = 1
- si = np.cumprod(si)
- co = np.cos(a)
- co = np.roll(co, -1)
- return si*co*r
-
-
- def scaling_coefficients_(scaling_angles):
- """
- convert scaling angles in hypersphere to scaling coefficients
- """
- return ct(1,scaling_angles)
-
-
- def sqrt_p_(x_data, scaling_angles):
- """
- Calculate the square root of the density estimate as the wavelet expansion
- with the scaling coefficients denoted by the scaling angles
- """
- N = len(x_data)
- j1, klist1 = scale_info
- phi_arr = np.array([phi_(x_data,j1,klist1[nk]) for nk in range(Nk1)])
- scaling_coefficients = scaling_coefficients_(scaling_angles)
- scaling_coefficients_mat = np.outer(scaling_coefficients, np.ones((N,)))
- scale_terms = scaling_coefficients_mat * phi_arr
- return np.sum(scale_terms, axis = 0)
-
-
- def safe_log_(x):
- # guard against x = 0
- return np.log(x + 1e-323)
-
-
- def unbinned_nll_(x):
- return -np.sum(safe_log_(x))
-
-
- def unbinned_nll_sqrt_p_(scaling_angles):
- sqrt_p_arr = sqrt_p_(x_data, scaling_angles)
- p_arr = sqrt_p_arr * sqrt_p_arr
- return unbinned_nll_(p_arr) / N
- from iminuit import cost, Minuit
-
-
- # unbiased initial scaling angles
- initial_scaling_angles = initial_scaling_angle_generator_(Nk1, 'unbiased')
- # initial_scaling_angles = initial_scaling_angle_generator_(Nk1, 'random')
-
-
- # define iminuit object for minimization
- m = Minuit(unbinned_nll_sqrt_p_, initial_scaling_angles)
-
-
- # limited to sphere
- limits_theta = [(0,np.pi) for n in range(Nk1-2)]
- limits_phi = [(0,2*np.pi)]
- limits = np.concatenate((limits_theta, limits_phi))
-
-
- # set limits
- m.limits = limits
-
-
- # run fit
- m.errordef = Minuit.LIKELIHOOD
- m.migrad()
| Migrad | |
| FCN = -0.3265 | Nfcn = 10376 |
| EDM = 8.51e-09 (Goal: 0.0001) | time = 544.2 sec |
| Valid Minimum | Below EDM threshold (goal x 10) |
| SOME parameters at limit | Below call limit |
| Hesse ok | Covariance accurate |

- # plot the fit
- x_plot = np.linspace(0,1,128)
- scaling_angles_fit = m.values[:]
- sqrt_p_plot = sqrt_p_(x_plot, scaling_angles_fit)
- p_plot = sqrt_p_plot * sqrt_p_plot
-
-
- plt.plot(x_plot, p_plot, label = 'fit')
- counts, bins, _ = plt.hist(x_data, bins = 128, density = True, label = 'data')
-
-
- plt.xlim([0,1])
- plt.legend()

- sqrt_p_plot_j1 = sqrt_p_plot
-
-
- j0 = j1 - 4
- level = j1 - j0
- scaling_coefficients_fit = np.abs(scaling_coefficients_(scaling_angles_fit))
- coeffs = pywt.wavedec(scaling_coefficients_fit, wavelet_name, level=level)
- scaling_coefficients_j0 = coeffs[0]
- wavelet_coefficients_j0 = coeffs[1:]
-
-
- klist0 = np.arange(0,2**j0 - 1 + 0.5,1)
- Nk0 = np.size(klist0)
-
-
- scale_info = [j0, klist0]
-
-
- def sqrt_p_coarse_(x_data, scaling_coefficients_j0):
- '''
- Coarse representation of the density estimate
- '''
- N = len(x_data)
- j1, klist1 = scale_info
- phi_arr = np.array([phi_(x_data,j0,klist0[nk]) for nk in range(Nk0)])
- scaling_coefficients_mat = np.outer(scaling_coefficients_j0, np.ones((N,)))
- scale_terms = scaling_coefficients_mat * phi_arr
- return np.sum(scale_terms, axis = 0)
-
-
- x_plot = np.linspace(0,1,128)
- sqrt_p_plot_j0 = sqrt_p_coarse_(x_plot, scaling_coefficients_j0)
- p_plot_j0 = sqrt_p_plot_j0 * sqrt_p_plot_j0
-
-
- plt.plot(x_plot, p_plot_j0)
- plt.plot(x_plot, p_plot)
- counts, bins, _ = plt.hist(x_data, bins = 100, density = True)
- plt.xlim([0,1])
- plt.legend(["Coarse", "Fine", "Data"])

- # Fit summary using DWT
-
-
- counts, bins, _ = plt.hist(x_data, bins = 100, density = True)
- plt.plot(x_plot, p_plot_j0, label = 'Coarse')
- plt.plot(x_plot, p_plot, label = 'Fine')
- plt.plot(x_plot, sqrt_p_plot**2 - sqrt_p_plot_j0**2, label = '$p_f - p_c$')
- plt.xlim([0,1])
- plt.xlabel('$x$')
- plt.ylabel('$p(x)$')
- plt.legend()

- # stationary wavelet transform (test)
- # Note: You can't just take the scaling coefficients from the coarse representation;
- ## need to take the inverse (see next cell)
- # This leads to a shifted signal (idk why)
-
-
- sqrt_p_plot_j1 = sqrt_p_plot
-
-
- j0 = j1 - 4
- level = j1 - j0
- scaling_coefficients_fit = np.abs(scaling_coefficients_(scaling_angles_fit))
- coeffs = pywt.swt(scaling_coefficients_fit, wavelet_name, level=level, norm = True, trim_approx = False)
- scaling_coefficients_j0 = coeffs[0][0]
- wavelet_coefficients_j0 = coeffs[1:]
-
-
- klist0 = np.arange(0,2**j1 - 1 + 0.5,1)
- Nk0 = np.size(klist1)
-
-
- scale_info = [j1, klist1]
-
-
- def sqrt_p_coarse_(x_data, scaling_coefficients_j0):
- N = len(x_data)
- j1, klist1 = scale_info
- phi_arr = np.array([phi_(x_data,j1,klist0[nk]) for nk in range(Nk0)])
- scaling_coefficients_mat = np.outer(scaling_coefficients_j0, np.ones((N,)))
- scale_terms = scaling_coefficients_mat * phi_arr
- return np.sum(scale_terms, axis = 0)
-
-
- x_plot = np.linspace(0,1,128)
- sqrt_p_plot_j0 = sqrt_p_coarse_(x_plot, scaling_coefficients_j0)
- p_plot_j0 = sqrt_p_plot_j0 * sqrt_p_plot_j0
-
-
- plt.plot(x_plot, p_plot_j0)
- plt.plot(x_plot, p_plot)
- counts, bins, _ = plt.hist(x_data, bins = 100, density = True)
- plt.xlim([0,1])
- plt.legend(["Coarse", "Fine", "Data"])

- 知乎学术咨询:
- https://www.zhihu.com/consult/people/792359672131756032?isMe=1
工学博士,担任《Mechanical System and Signal Processing》《中国电机工程学报》《控制与决策》等期刊审稿专家,擅长领域:现代信号处理,机器学习,深度学习,数字孪生,时间序列分析,设备缺陷检测、设备异常检测、设备智能故障诊断与健康管理PHM等。