优秀的编程知识分享平台

网站首页 > 技术文章 正文

模态测试和核密度估计(模态测试方法)

nanyue 2024-09-11 05:28:23 技术文章 8 ℃

处理大量可能具有不同数据分布的机器学习数据集时,我们面临以下注意事项:

  • 数据分布是单峰的,如果是这种情况,哪个模型最接近它(均匀分布,T分布,卡方分布,柯西分布等)?
  • 如果机器学习中的数据分布是多模态的,我们能自动识别模态的数量并提供更细粒度的描述性统计吗?
  • 我们如何估计新机器学习数据集的概率密度函数?

本文涉及以下主题:

  • 直方图Vs概率密度函数
  • 核密度估计
  • 最佳带宽的选择:Silverman / Scott / Grid Search交叉验证
  • 单峰分布的统计检验
  • DIP测试单峰性
  • 基于核密度估计识别数据分布的模式数

生成数据

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
from sklearn.neighbors import KernelDensity
# Generate data distribution combining 3 normal distributions
data = np.concatenate([
 norm(-10, 3).rvs(200), 
 norm(0, 2).rvs(200), 
 norm(7, 2).rvs(100)
])
# The x values corresponding to the generated distribution
x = np.linspace(data.min(),data.max(), data.shape[0])
true_pdf = (0.4 * norm(-10, 3).pdf(x) + 
 0.4 * norm(0, 2).pdf(x) +
 0.2* norm(7, 2).pdf(x))

直方图和PDF

直方图的缺点是:将一些实际数据分布的细节,隐藏在大小不合适的容器中。看下面的Python代码示例

def plotHistogramAndPdf(data, x, pdf):
 ax = plt.gca()
 plt.hist(data, bins = 4, alpha = 0.4, label = 'histogram of input values');
 plt.ylabel('Frequency')
 plt.xlabel('x values')
 ax2 = ax.twinx()
 plt.plot(x, pdf, c = 'red', label = 'probability density function');
 plt.ylabel('PDF')
 [tl.set_color('r') for tl in ax2.get_yticklabels()]
 ax.legend(bbox_to_anchor=(0.4, 1.15))
 ax2.legend(bbox_to_anchor=(1.15,1.15))
 plt.savefig('figures/hist.jpg', bbox_inches='tight')
 
plotHistogramAndPdf(data, x, true_pdf)

核密度估计

核密度估计取决于bandwidth,该bandwidth控制返回的近似的平滑程度。以下示例说明了各种bandwidth值的影响:

def getKernelDensityEstimation(values, x, bandwidth = 0.2, kernel = 'gaussian'):
 model = KernelDensity(kernel = kernel, bandwidth=bandwidth)
 model.fit(values[:, np.newaxis])
 log_density = model.score_samples(x[:, np.newaxis])
 return np.exp(log_density)
for bandwidth in np.linspace(0.2, 3, 3):
 kde = getKernelDensityEstimation(data, x, bandwidth=bandwidth)
 plt.plot(x, kde, alpha = 0.8, label = f'bandwidth = {round(bandwidth, 2)}')
plt.plot(x, true_pdf, label = 'True PDF')
plt.legend()
plt.title('Effect of various bandwidth values \nThe larger the bandwidth, the smoother the approximation becomes');
plt.savefig('figures/bw.jpg', bbox_inches='tight')

为核密度估计选择最佳bandwidth的方法

为了确定最佳bandwidth,有几种方法:

  • Silverman法则:假设未知密度为高斯分布。它不是最佳的bandwidth选择器,但可以用作非常快速的合理良好的估计器,也可以用作多级bandwidth选择器中的第一个估算器。更精确的求解方程式plug-in规则使用积分平方密度导数函数的估计来估计最佳bandwidth。用迭代法求解非线性方程需要很高的计算量。使用ROT作为初步估计
  • Scott法则:是对于正态分布数据的随机样本是最优的,因为它最小化了密度估计的积分均方误差。

这两种方法的好处是可以快速计算,但他们通常只提供太少的数据,而且很可能不符合底层的数据分布。这两种方法都已在statsmodels包中实现,如下所示。

  • 基于交叉验证的方法:statsmodels带有cv bandwidth参数。或者,我们可以实现网格搜索交叉验证。与前两种方法不同,执行网格搜索的计算成本可能更高,尤其是对于较大的机器学习数据集
from statsmodels.nonparametric.bandwidths import bw_silverman, bw_scott, select_bandwidth
silverman_bandwidth = bw_silverman(data)
# select bandwidth allows to set a different kernel
silverman_bandwidth_gauss = select_bandwidth(data, bw = 'silverman', kernel = 'gauss')
scott_bandwidth = bw_scott(data)
def bestBandwidth(data, minBandwidth = 0.1, maxBandwidth = 2, nb_bandwidths = 30, cv = 30):
 """
 Run a cross validation grid search to identify the optimal bandwidth for the kernel density
 estimation.
 """
 from sklearn.model_selection import GridSearchCV
 model = GridSearchCV(KernelDensity(),
 {'bandwidth': np.linspace(minBandwidth, maxBandwidth, nb_bandwidths)}, cv=cv) 
 model.fit(data[:, None])
 return model.best_params_['bandwidth']
cv_bandwidth = bestBandwidth(data)
print(f"Silverman bandwidth = {silverman_bandwidth}")
print(f"Scott bandwidth = {scott_bandwidth}")
print(f"CV bandwidth = {cv_bandwidth}")

Silverman bandwidth = 1.8293406725911592

Scott bandwidth = 2.152524191415597

CV bandwidth = 0.886206896551724

正如预期的那样,第一个Silverman和Scott返回更大的bandwidth值,这将导致更大的容器,从而丢失关于数据分布的信息。

Statsmodels允许基于交叉验证和最大似然算子自动搜索最佳带宽:

from statsmodels.nonparametric.kernel_density import KDEMultivariate
stats_models_cv = KDEMultivariate(data, 'c', bw = 'cv_ml').pdf(x)

绘制不同的近似值

plt.figure(figsize= (14, 6))
plt.plot(x, true_pdf, label = 'True PDF')
kde = getKernelDensityEstimation(data, x, bandwidth=silverman_bandwidth)
plt.plot(x, kde, alpha = 0.8, label = f'Silverman bandwidth')
kde = getKernelDensityEstimation(data, x, bandwidth=scott_bandwidth)
plt.plot(x, kde, alpha = 0.8, label = f'Scott bandwidth')
kde = getKernelDensityEstimation(data, x, bandwidth=cv_bandwidth)
plt.plot(x, kde, alpha = 0.8, label = f'CV bandwidth')
plt.plot(x, stats_models_cv, alpha = 0.8, label = f'Statsmodels CV maximum likelihood')
plt.legend()
plt.title('Comparative of various bandwidth estimations for KDE');
plt.savefig('figures/comp_bw.jpg', bbox_inches='tight')

单峰分布的统计检验

有许多统计测试可以解决数据模态问题:

  • DIP test
  • excess mass test
  • MAP test
  • mode existence test
  • runt test
  • span test
  • saddle test

不幸的是,在python开源库中实现的并不多。

DIP test

以下python包https://github.com/BenjaminDoran/unidip提供了dip测试的实现,并且还提供了使用单峰性的Hartigan Dip-test以递归方式提取数据中的密度峰值的功能。

安装:pip install unidip

from unidip import UniDip
import unidip.dip as dip
data = np.msort(data)
print(dip.diptst(data))
intervals = UniDip(data).run()
print(intervals)

(0.03959813034127299, 0.000999000999000999, (210, 392))

[(12, 161), (210, 391)]

当在高斯分布上尝试这个函数时,我们得到一个interval和一个大的p值。

gaussianData=norm(-10, 3).rvs(200)
gaussianData = np.msort(gaussianData)
print(dip.diptst(gaussianData))
intervals = UniDip(gaussianData).run()
print(intervals)

(0.018270619849269046, 0.955044955044955, (62, 92))

[(0, 199)]

识别并绘制KDE的局部最大值

辅助函数(utils.py)

import simdkalman
import numpy as np
import pandas as pd
def getInflexionPoints(data, typeOfInflexion = None, maxPoints = None):
 """
 This method returns the indeces where there is a change in the trend of the input series.
 typeOfInflexion = None returns all inflexion points, max only maximum values and min
 only min,
 """
 a = np.diff(data)
 asign = np.sign(a)
 signchange = ((np.roll(asign, 1) - asign) != 0).astype(int)
 idx = np.where(signchange ==1)[0]
 if typeOfInflexion == 'max' and data[idx[0]] < data[idx[1]]:
 idx = idx[1:][::2]
 
 elif typeOfInflexion == 'min' and data[idx[0]] > data[idx[1]]:
 idx = idx[1:][::2]
 elif typeOfInflexion is not None:
 idx = idx[::2]
 
 # sort ids by min value
 if 0 in idx:
 idx = np.delete(idx, 0)
 if (len(data)-1) in idx:
 idx = np.delete(idx, len(data)-1)
 idx = idx[np.argsort(data[idx])]
 # If we have maxpoints we want to make sure the timeseries has a cutpoint
 # in each segment, not all on a small interval
 if maxPoints is not None:
 idx= idx[:maxPoints]
 if len(idx) < maxPoints:
 return (np.arange(maxPoints) + 1) * (len(data)//(maxPoints + 1))
 
 return idx
def kalman(train):
 # define a Kalman filter model with a weekly cycle, parametrized by a simple "smoothing factor", s
 smoothing_factor = 2
 n_seasons = 7
 # --- define state transition matrix A
 state_transition = np.zeros((n_seasons+1, n_seasons+1))
 # hidden level
 state_transition[0,0] = 1
 # season cycle
 state_transition[1,1:-1] = [-1.0] * (n_seasons-1)
 state_transition[2:,1:-1] = np.eye(n_seasons-1)
 # --- observation model H
 # observation is hidden level + weekly seasonal compoenent
 observation_model = [[1,1] + [0]*(n_seasons-1)]
 # --- noise models, parametrized by the smoothing factor
 level_noise = 0.2 / smoothing_factor
 observation_noise = 0.2
 season_noise = 1e-3
 process_noise_cov = np.diag([level_noise, season_noise] + [0]*(n_seasons-1))**2
 observation_noise_cov = observation_noise**2
 kf = simdkalman.KalmanFilter(
 state_transition,
 process_noise_cov,
 observation_model,
 observation_noise_cov)
 result = kf.compute(train, 10)
 return result.smoothed.states.mean[:,:,0]
def smooth(x,window_len=7,window='hanning'):
 if window_len<3:
 return x
# if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
# raise ValueError, "Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'"
 s=np.r_[2*x[0]-x[window_len-1::-1],x,2*x[-1]-x[-1:-window_len:-1]]
 if window == 'flat': #moving average
 w=np.ones(window_len,'d')
 else: 
 w=eval('np.'+window+'(window_len)')
 y=np.convolve(w/w.sum(),s,mode='same')
 return y[window_len:-window_len+1]

一旦我们估计了核密度函数,我们就可以确定分布是否是多模态的,并确定对应于模式的最大值或峰值。

这可以通过识别一阶导数改变符号的点来完成。方法getInflexion点可以默认返回所有拐点(最小值+最大值)或仅选择(typeOfInflexion ='max'/'min')。

下图描绘了这些最大值,其可以对应于数据分布的multiple modes。可以通过基于峰的高度设置阈值来继续分析,以滤除一些不太重要的值。

plt.plot(kde)
kde = getKernelDensityEstimation(data, x, bandwidth=cv_bandwidth)
idx = utils.getInflexionPoints(kde, typeOfInflexion='max')
plt.plot(x,kde)
ax = plt.gca()
for i in idx:
 plt.scatter(x[i], kde[i], s= 40, c = 'red')
for i in idx:
 ax.annotate(f' Max = {round(x[i], 2)}', (x[i], kde[i]))
plt.title('Maximum density values identified on the CV KDE')
modeValues =np.sort(x[idx])
print(f'Mode values {modeValues}')
plt.savefig('figures/max.jpg', bbox_inches='tight')

Mode values [-9.26859422 -0.2028573 6.5520055 ]

我们注意到,获得的值与生成的分布的初始anchors相对应。

Tags:

最近发表
标签列表