Coverage for /var/devmt/py/utils4_1.5.0rc1/utils4/stats.py: 100%

70 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-12 15:38 +0100

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3""" 

4:Purpose: Provide access to various statistical calculations, namely: 

5 

6 - **CUSUM:** :meth:`~Stats.cusum` 

7 - **Gaussian KDE:** :meth:`~Stats.kde` 

8 - **Linear Regression:** :class:`~LinearRegression` 

9 

10:Platform: Linux/Windows | Python 3.6+ 

11:Developer: J Berendt 

12:Email: development@s3dev.uk 

13 

14:Comments: n/a 

15 

16:Example: 

17 

18 Create a sample dataset for the stats methods:: 

19 

20 >>> import matplotlib.pyplot as plt 

21 >>> import numpy as np 

22 >>> import pandas as pd 

23 

24 >>> np.random.seed(73) 

25 >>> data = np.random.normal(size=100)*100 

26 >>> x = np.arange(data.size) 

27 >>> y = pd.Series(data).rolling(window=25, min_periods=25).mean().cumsum() 

28 

29 >>> # Preview the trend. 

30 >>> plt.plot(x, y) 

31 

32""" 

33# pylint: disable=line-too-long 

34# pylint: disable=wrong-import-order 

35 

36import numpy as np 

37import pandas as pd 

38from scipy.stats import gaussian_kde 

39from typing import Union 

40# local 

41from utils4.reporterror import reporterror 

42 

43 

44class LinearRegression: 

45 """Calculate the linear regression of a dataset. 

46 

47 Args: 

48 x (np.array): Array of X-values. 

49 y (np.array): Array of Y-values. 

50 

51 :Slope Calculation: 

52 The calculation for the slope itself is borrowed from the 

53 :func:`scipy.stats.linregress` function. Whose `source code`_ was 

54 obtained on GitHub. 

55 

56 .. _source code: https://github.com/scipy/scipy/blob/v1.8.0/scipy/stats/_stats_mstats_common.py#L16-L203 

57 

58 :Example Use: 

59 

60 .. tip:: 

61 

62 For a sample dataset and imports to go along with this 

63 example, refer to the docstring for 

64 :mod:`this module <stats>`. 

65 

66 Calculate a linear regression line on an X/Y dataset:: 

67 

68 >>> from lib.stats import LinearRegression 

69 

70 >>> linreg = LinearRegression(x, y) 

71 >>> linreg.calculate() 

72 

73 >>> # Obtain the regression line array. 

74 >>> y_ = linreg.regression_line 

75 

76 >>> # View the intercept value. 

77 >>> linreg.intercept 

78 -31.26630 

79 

80 >>> # View the slope value. 

81 >>> linreg.slope 

82 1.95332 

83 

84 >>> # Plot the trend and regression line. 

85 >>> plt.plot(x, y, 'grey') 

86 >>> plt.plot(x, y_, 'red') 

87 >>> plt.show() 

88 

89 """ 

90 

91 def __init__(self, x: np.array, y: np.array): 

92 """LinearRegression class initialiser.""" 

93 self._x = x 

94 self._y = y 

95 self._xbar = 0.0 

96 self._ybar = 0.0 

97 self._c = 0.0 

98 self._m = 0.0 

99 self._line = np.array(()) 

100 

101 @property 

102 def slope(self): 

103 """Accessor to the slope value.""" 

104 return self._m 

105 

106 @property 

107 def intercept(self): 

108 """Accessor to the slope's y-intercept.""" 

109 return self._c 

110 

111 @property 

112 def regression_line(self): 

113 """Accessor to the calculated regression line, as y-values.""" 

114 return self._line 

115 

116 def calculate(self): 

117 """Calculate the linear regression for the X/Y data arrays. 

118 

119 The result of the calculation is accessible via the 

120 :attr:`regression_line` property. 

121 

122 """ 

123 self._calc_means() 

124 self._calc_slope() 

125 self._calc_intercept() 

126 self._calc_regression_line() 

127 

128 def _calc_intercept(self): 

129 """Calculate the intercept as: ybar - m * xbar.""" 

130 self._c = self._ybar - self._m * self._xbar 

131 

132 def _calc_means(self) -> float: 

133 """Calculate the mean of the X and Y arrays.""" 

134 self._xbar = self._x.mean() 

135 self._ybar = self._y.mean() 

136 

137 def _calc_regression_line(self): 

138 """Calculate the regression line as: y = mx + c.""" 

139 self._line = self._m * self._x + self._c 

140 

141 def _calc_slope(self): 

142 """Calculate the slope value as: R * ( std(y) / std(x) ). 

143 

144 Per the ``scipy`` source code comments:: 

145 

146 # Average sums of square differences from the mean 

147 # ssxm = mean( (x-mean(x))^2 ) 

148 # ssxym = mean( (x-mean(x)) * (y-mean(y)) ) 

149 

150 ... 

151 

152 slope = ssxym / ssxm 

153 

154 """ 

155 ssxm = np.mean( (self._x - self._xbar)**2 ) 

156 ssxym = np.mean( (self._x - self._xbar) * (self._y - self._ybar) ) 

157 self._m = ssxym / ssxm 

158 

159 @staticmethod 

160 def _calc_std(data: np.array, ddof: int=1) -> float: # pragma nocover 

161 """Calculate the standard deviation. 

162 

163 Args: 

164 data (np.array): Array of values. 

165 ddof (int): Degrees of freedom. Defaults to 1. 

166 

167 Returns: 

168 float: Standard deviation of the given values. 

169 

170 """ 

171 return np.std(data, ddof=ddof) 

172 

173 

174class Stats: 

175 """Wrapper class for various statistical calculations.""" 

176 

177 @staticmethod 

178 def cusum(df: pd.DataFrame, 

179 cols: Union[list, str], 

180 window: int=None, 

181 min_periods: int=1, 

182 inplace=False, 

183 show_plot: bool=False) -> Union[pd.DataFrame, None]: 

184 r"""Calculate a CUSUM on a set of data. 

185 

186 A CUSUM is a generalised method for smoothing a noisy trend, or 

187 for detecting a change in the trend. 

188 

189 Note: 

190 A CUSUM is *not* a cumulative sum (cumsum), although a 

191 cumulative sum is used. A CUSUM is a cumulative sum of 

192 derived values, where each derived value is calculated as the 

193 delta of a single value relative to the rolling mean of all 

194 previous values. 

195 

196 Args: 

197 df (pd.DataFrame): The DataFrame containing the column(s) on 

198 which a CUSUM is to be calculated. 

199 cols (Union[list, str]): The column (or list of columns) on 

200 which the CUSUM is to be calculated. 

201 window (int, optional): Size of the window on which the 

202 rolling mean is to be calculated. This corresponds to the 

203 ``pandas.df.rolling(window)`` parameter. 

204 Defaults to None. 

205 

206 - If None is received, a %5 window is calculated based on 

207 the length of the DataFrame. This method helps smooth 

208 the trend, while keeping a representation of the 

209 original trend. 

210 - For a *true* CUSUM, a running average should be 

211 calculated on the length of the DataFrame, except for 

212 the current value. For this method, pass 

213 ``window=len(df)``. 

214 

215 min_periods (int, optional): Number of periods to wait before 

216 calculating the rolling average. Defaults to 1. 

217 inplace (bool, optional): Update the passed DataFrame 

218 (in-place), rather returning a *copy* of the passed 

219 DataFrame. Defaults to False. 

220 show_plot (bool, optional): Display a graph of the raw value, 

221 and the calculated CUSUM results. Defaults to False. 

222 

223 :Calculation: 

224 The CUSUM is calculated by taking a rolling mean :math:`RA` 

225 (optionally locked at the first value), and calculate the 

226 delta of the current value, relative to the rolling mean all 

227 previous values. A cumulative sum is applied to the deltas. 

228 The cumulative sum for each data point is returned as the 

229 CUSUM value. 

230 

231 :Equation: 

232 

233 :math:`c_i = \sum_{i=1}^{n}(x_i - RA_i)` 

234 

235 where :math:`RA` (Rolling Mean) is defined as: 

236 

237 :math:`RA_{i+1} = \frac{1}{n}\sum_{j=1}^{n}x_j` 

238 

239 :Example Use: 

240 

241 Generate a *sample* trend dataset:: 

242 

243 >>> import numpy as np 

244 >>> import pandas as pd 

245 

246 >>> np.random.seed(13) 

247 >>> s1 = pd.Series(np.random.randn(1000)).rolling(window=100).mean() 

248 >>> np.random.seed(73) 

249 >>> s2 = pd.Series(np.random.randn(1000)).rolling(window=100).mean() 

250 >>> df = pd.DataFrame({'sample1': s1, 'sample2': s2}) 

251 

252 

253 Example for calculating a CUSUM on two columns:: 

254 

255 >>> from EHM.stats import stats 

256 

257 >>> df_c = stats.cusum(df=df, 

258 cols=['sample1', 'sample2'], 

259 window=len(df), 

260 inplace=False, 

261 show_plot=True) 

262 >>> df_c.tail() 

263 sample1 sample2 sample1_cusum sample2_cusum 

264 995 0.057574 0.065887 23.465337 29.279936 

265 996 0.062781 0.072213 23.556592 29.369397 

266 997 0.028513 0.072658 23.613478 29.459204 

267 998 0.024518 0.070769 23.666305 29.547022 

268 999 0.000346 0.074849 23.694901 29.638822 

269 

270 Returns: 

271 Union[pd.DataFrame, None]: If the ``inplace`` argument is 

272 ``False``, a *copy* of the original DataFrame with the new 

273 CUSUM columns appended is returned. Otherwise, the passed 

274 DataFrame is *updated*, and ``None`` is returned. 

275 

276 """ 

277 # Convert a single column name to a list. 

278 cols = [cols] if not isinstance(cols, list) else cols 

279 if not inplace: 

280 df = df.copy(deep=True) 

281 window = int(len(df) * 0.05) if window is None else window # Set default window as 5% 

282 for col in cols: 

283 new_col = f'{col}_cusum' 

284 # CUSUM calculation (rolling_sum on rolling_mean, with a shift of 1). 

285 df[new_col] = ((df[col] - df[col].rolling(window=window, min_periods=min_periods) 

286 .mean() 

287 .shift(1)) 

288 .rolling(window=len(df), min_periods=min_periods).sum()) 

289 # Show simple plot if requested. 

290 if show_plot: # pragma: nocover 

291 df[[col, new_col]].plot(title=f'TEMP PLOT\n{col} vs {new_col}', 

292 color=['lightgrey', 'red'], 

293 secondary_y=new_col, 

294 legend=False, 

295 grid=False) 

296 return None if inplace else df 

297 

298 def kde(self, 

299 data: Union[list, np.array, pd.Series], 

300 n: int=500) -> tuple: 

301 """Calculate the kernel density estimate (KDE) for an array X. 

302 

303 This function returns the *probability density* (PDF) using 

304 Gaussian KDE. 

305 

306 Args: 

307 data (Union[list, np.array, pd.Series]): An array-like object 

308 containing the data against which the Gaussian KDE is 

309 calculated. This can be a list, numpy array, or pandas 

310 Series. 

311 n (int, optional): Number of values returned in the X, Y 

312 arrays. Defaults to 500. 

313 

314 :Example Use: 

315 

316 .. tip:: 

317 

318 For a sample dataset and imports to go along with this 

319 example, refer to the docstring for 

320 :mod:`this module <stats>`. 

321 

322 Calculate a Gaussian KDE on Y:: 

323 

324 >>> from utils4.stats import stats 

325 

326 >>> # Preview the histogram. 

327 >>> _ = plt.hist(data) 

328 

329 >>> X, Y, max_x = stats.kde(data=data, n=500) 

330 >>> plt.plot(X, Y) 

331 

332 >>> # Show X value at peak of curve. 

333 >>> max_x 

334 -9.718684033029376 

335 

336 :Max X: 

337 This function also returns the X value of the curve's peak; 

338 where ``max_x`` is the ``X`` value corresponding to the max 

339 ``Y`` value on the curve. The result (``max_x``) is 

340 returned as the third tuple element. 

341 

342 :Further Detail: 

343 This method uses the :func:`scipy.stats.gaussian_kde` method 

344 for the KDE calculation. For further detail on the 

345 calculation itself, refer to that function's docstring. 

346 

347 :Background: 

348 Originally, :func:`plotly.figure_factory.dist_plot` was used 

349 to calculate the KDE. However, to remove the ``plotly`` 

350 dependency from this library, their code was copied and 

351 refactored (simplified) into this function. Both the 

352 :func:`dist_plot` and :func:`pandas.DataFrame.plot.kde` 

353 method call :func:`scipy.stats.gaussian_kde` for the 

354 calculation, which this function also calls. 

355 

356 Returns: 

357 tuple: A tuple containing the X-array, Y-array 

358 (both of ``n`` size), as well a the X value at max Y, as:: 

359 

360 (curve_x, curve_y, max_x) 

361 

362 """ 

363 try: 

364 data_ = self._obj_to_array(data=data) 

365 curve_x = np.linspace(data_.min(), data_.max(), n) 

366 curve_y = gaussian_kde(data_).evaluate(curve_x) 

367 max_x = curve_x[curve_y.argmax()] 

368 return (curve_x, curve_y, max_x) 

369 except Exception as err: 

370 reporterror(err) 

371 return (np.array(()), np.array(()), 0.0) 

372 

373 @staticmethod 

374 def _obj_to_array(data: Union[list, np.array, pd.Series]) -> np.ndarray: 

375 """Convert an iterable object to a numpy array. 

376 

377 Args: 

378 data (Union[list, np.array, pd.Series]): Array-like object 

379 to be converted into a ``numpy.ndarray``. 

380 

381 :NaN Values: 

382 In addition to converting the following types to a 

383 ``numpy.ndarray``, any ``nan`` values are dropped from 

384 the ``numpy.array`` and ``pd.Series`` objects. 

385 

386 Returns: 

387 np.array: A ``numpy.ndarray``, with ``nan`` values removed. 

388 

389 """ 

390 data_ = None 

391 if isinstance(data, np.ndarray): 

392 data_ = data 

393 elif isinstance(data, pd.Series): 

394 data_ = data.astype(float).to_numpy() 

395 elif isinstance(data, list): 

396 data_ = np.array(data) 

397 data_ = data_[~np.isnan(data_)] 

398 return data_ 

399 

400 

401stats = Stats()