Coverage for /var/devmt/py/utils4_1.5.0rc1/utils4/stats.py: 100%
70 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-12 15:38 +0100
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-12 15:38 +0100
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3"""
4:Purpose: Provide access to various statistical calculations, namely:
6 - **CUSUM:** :meth:`~Stats.cusum`
7 - **Gaussian KDE:** :meth:`~Stats.kde`
8 - **Linear Regression:** :class:`~LinearRegression`
10:Platform: Linux/Windows | Python 3.6+
11:Developer: J Berendt
12:Email: development@s3dev.uk
14:Comments: n/a
16:Example:
18 Create a sample dataset for the stats methods::
20 >>> import matplotlib.pyplot as plt
21 >>> import numpy as np
22 >>> import pandas as pd
24 >>> np.random.seed(73)
25 >>> data = np.random.normal(size=100)*100
26 >>> x = np.arange(data.size)
27 >>> y = pd.Series(data).rolling(window=25, min_periods=25).mean().cumsum()
29 >>> # Preview the trend.
30 >>> plt.plot(x, y)
32"""
33# pylint: disable=line-too-long
34# pylint: disable=wrong-import-order
36import numpy as np
37import pandas as pd
38from scipy.stats import gaussian_kde
39from typing import Union
40# local
41from utils4.reporterror import reporterror
44class LinearRegression:
45 """Calculate the linear regression of a dataset.
47 Args:
48 x (np.array): Array of X-values.
49 y (np.array): Array of Y-values.
51 :Slope Calculation:
52 The calculation for the slope itself is borrowed from the
53 :func:`scipy.stats.linregress` function. Whose `source code`_ was
54 obtained on GitHub.
56 .. _source code: https://github.com/scipy/scipy/blob/v1.8.0/scipy/stats/_stats_mstats_common.py#L16-L203
58 :Example Use:
60 .. tip::
62 For a sample dataset and imports to go along with this
63 example, refer to the docstring for
64 :mod:`this module <stats>`.
66 Calculate a linear regression line on an X/Y dataset::
68 >>> from lib.stats import LinearRegression
70 >>> linreg = LinearRegression(x, y)
71 >>> linreg.calculate()
73 >>> # Obtain the regression line array.
74 >>> y_ = linreg.regression_line
76 >>> # View the intercept value.
77 >>> linreg.intercept
78 -31.26630
80 >>> # View the slope value.
81 >>> linreg.slope
82 1.95332
84 >>> # Plot the trend and regression line.
85 >>> plt.plot(x, y, 'grey')
86 >>> plt.plot(x, y_, 'red')
87 >>> plt.show()
89 """
91 def __init__(self, x: np.array, y: np.array):
92 """LinearRegression class initialiser."""
93 self._x = x
94 self._y = y
95 self._xbar = 0.0
96 self._ybar = 0.0
97 self._c = 0.0
98 self._m = 0.0
99 self._line = np.array(())
101 @property
102 def slope(self):
103 """Accessor to the slope value."""
104 return self._m
106 @property
107 def intercept(self):
108 """Accessor to the slope's y-intercept."""
109 return self._c
111 @property
112 def regression_line(self):
113 """Accessor to the calculated regression line, as y-values."""
114 return self._line
116 def calculate(self):
117 """Calculate the linear regression for the X/Y data arrays.
119 The result of the calculation is accessible via the
120 :attr:`regression_line` property.
122 """
123 self._calc_means()
124 self._calc_slope()
125 self._calc_intercept()
126 self._calc_regression_line()
128 def _calc_intercept(self):
129 """Calculate the intercept as: ybar - m * xbar."""
130 self._c = self._ybar - self._m * self._xbar
132 def _calc_means(self) -> float:
133 """Calculate the mean of the X and Y arrays."""
134 self._xbar = self._x.mean()
135 self._ybar = self._y.mean()
137 def _calc_regression_line(self):
138 """Calculate the regression line as: y = mx + c."""
139 self._line = self._m * self._x + self._c
141 def _calc_slope(self):
142 """Calculate the slope value as: R * ( std(y) / std(x) ).
144 Per the ``scipy`` source code comments::
146 # Average sums of square differences from the mean
147 # ssxm = mean( (x-mean(x))^2 )
148 # ssxym = mean( (x-mean(x)) * (y-mean(y)) )
150 ...
152 slope = ssxym / ssxm
154 """
155 ssxm = np.mean( (self._x - self._xbar)**2 )
156 ssxym = np.mean( (self._x - self._xbar) * (self._y - self._ybar) )
157 self._m = ssxym / ssxm
159 @staticmethod
160 def _calc_std(data: np.array, ddof: int=1) -> float: # pragma nocover
161 """Calculate the standard deviation.
163 Args:
164 data (np.array): Array of values.
165 ddof (int): Degrees of freedom. Defaults to 1.
167 Returns:
168 float: Standard deviation of the given values.
170 """
171 return np.std(data, ddof=ddof)
174class Stats:
175 """Wrapper class for various statistical calculations."""
177 @staticmethod
178 def cusum(df: pd.DataFrame,
179 cols: Union[list, str],
180 window: int=None,
181 min_periods: int=1,
182 inplace=False,
183 show_plot: bool=False) -> Union[pd.DataFrame, None]:
184 r"""Calculate a CUSUM on a set of data.
186 A CUSUM is a generalised method for smoothing a noisy trend, or
187 for detecting a change in the trend.
189 Note:
190 A CUSUM is *not* a cumulative sum (cumsum), although a
191 cumulative sum is used. A CUSUM is a cumulative sum of
192 derived values, where each derived value is calculated as the
193 delta of a single value relative to the rolling mean of all
194 previous values.
196 Args:
197 df (pd.DataFrame): The DataFrame containing the column(s) on
198 which a CUSUM is to be calculated.
199 cols (Union[list, str]): The column (or list of columns) on
200 which the CUSUM is to be calculated.
201 window (int, optional): Size of the window on which the
202 rolling mean is to be calculated. This corresponds to the
203 ``pandas.df.rolling(window)`` parameter.
204 Defaults to None.
206 - If None is received, a %5 window is calculated based on
207 the length of the DataFrame. This method helps smooth
208 the trend, while keeping a representation of the
209 original trend.
210 - For a *true* CUSUM, a running average should be
211 calculated on the length of the DataFrame, except for
212 the current value. For this method, pass
213 ``window=len(df)``.
215 min_periods (int, optional): Number of periods to wait before
216 calculating the rolling average. Defaults to 1.
217 inplace (bool, optional): Update the passed DataFrame
218 (in-place), rather returning a *copy* of the passed
219 DataFrame. Defaults to False.
220 show_plot (bool, optional): Display a graph of the raw value,
221 and the calculated CUSUM results. Defaults to False.
223 :Calculation:
224 The CUSUM is calculated by taking a rolling mean :math:`RA`
225 (optionally locked at the first value), and calculate the
226 delta of the current value, relative to the rolling mean all
227 previous values. A cumulative sum is applied to the deltas.
228 The cumulative sum for each data point is returned as the
229 CUSUM value.
231 :Equation:
233 :math:`c_i = \sum_{i=1}^{n}(x_i - RA_i)`
235 where :math:`RA` (Rolling Mean) is defined as:
237 :math:`RA_{i+1} = \frac{1}{n}\sum_{j=1}^{n}x_j`
239 :Example Use:
241 Generate a *sample* trend dataset::
243 >>> import numpy as np
244 >>> import pandas as pd
246 >>> np.random.seed(13)
247 >>> s1 = pd.Series(np.random.randn(1000)).rolling(window=100).mean()
248 >>> np.random.seed(73)
249 >>> s2 = pd.Series(np.random.randn(1000)).rolling(window=100).mean()
250 >>> df = pd.DataFrame({'sample1': s1, 'sample2': s2})
253 Example for calculating a CUSUM on two columns::
255 >>> from EHM.stats import stats
257 >>> df_c = stats.cusum(df=df,
258 cols=['sample1', 'sample2'],
259 window=len(df),
260 inplace=False,
261 show_plot=True)
262 >>> df_c.tail()
263 sample1 sample2 sample1_cusum sample2_cusum
264 995 0.057574 0.065887 23.465337 29.279936
265 996 0.062781 0.072213 23.556592 29.369397
266 997 0.028513 0.072658 23.613478 29.459204
267 998 0.024518 0.070769 23.666305 29.547022
268 999 0.000346 0.074849 23.694901 29.638822
270 Returns:
271 Union[pd.DataFrame, None]: If the ``inplace`` argument is
272 ``False``, a *copy* of the original DataFrame with the new
273 CUSUM columns appended is returned. Otherwise, the passed
274 DataFrame is *updated*, and ``None`` is returned.
276 """
277 # Convert a single column name to a list.
278 cols = [cols] if not isinstance(cols, list) else cols
279 if not inplace:
280 df = df.copy(deep=True)
281 window = int(len(df) * 0.05) if window is None else window # Set default window as 5%
282 for col in cols:
283 new_col = f'{col}_cusum'
284 # CUSUM calculation (rolling_sum on rolling_mean, with a shift of 1).
285 df[new_col] = ((df[col] - df[col].rolling(window=window, min_periods=min_periods)
286 .mean()
287 .shift(1))
288 .rolling(window=len(df), min_periods=min_periods).sum())
289 # Show simple plot if requested.
290 if show_plot: # pragma: nocover
291 df[[col, new_col]].plot(title=f'TEMP PLOT\n{col} vs {new_col}',
292 color=['lightgrey', 'red'],
293 secondary_y=new_col,
294 legend=False,
295 grid=False)
296 return None if inplace else df
298 def kde(self,
299 data: Union[list, np.array, pd.Series],
300 n: int=500) -> tuple:
301 """Calculate the kernel density estimate (KDE) for an array X.
303 This function returns the *probability density* (PDF) using
304 Gaussian KDE.
306 Args:
307 data (Union[list, np.array, pd.Series]): An array-like object
308 containing the data against which the Gaussian KDE is
309 calculated. This can be a list, numpy array, or pandas
310 Series.
311 n (int, optional): Number of values returned in the X, Y
312 arrays. Defaults to 500.
314 :Example Use:
316 .. tip::
318 For a sample dataset and imports to go along with this
319 example, refer to the docstring for
320 :mod:`this module <stats>`.
322 Calculate a Gaussian KDE on Y::
324 >>> from utils4.stats import stats
326 >>> # Preview the histogram.
327 >>> _ = plt.hist(data)
329 >>> X, Y, max_x = stats.kde(data=data, n=500)
330 >>> plt.plot(X, Y)
332 >>> # Show X value at peak of curve.
333 >>> max_x
334 -9.718684033029376
336 :Max X:
337 This function also returns the X value of the curve's peak;
338 where ``max_x`` is the ``X`` value corresponding to the max
339 ``Y`` value on the curve. The result (``max_x``) is
340 returned as the third tuple element.
342 :Further Detail:
343 This method uses the :func:`scipy.stats.gaussian_kde` method
344 for the KDE calculation. For further detail on the
345 calculation itself, refer to that function's docstring.
347 :Background:
348 Originally, :func:`plotly.figure_factory.dist_plot` was used
349 to calculate the KDE. However, to remove the ``plotly``
350 dependency from this library, their code was copied and
351 refactored (simplified) into this function. Both the
352 :func:`dist_plot` and :func:`pandas.DataFrame.plot.kde`
353 method call :func:`scipy.stats.gaussian_kde` for the
354 calculation, which this function also calls.
356 Returns:
357 tuple: A tuple containing the X-array, Y-array
358 (both of ``n`` size), as well a the X value at max Y, as::
360 (curve_x, curve_y, max_x)
362 """
363 try:
364 data_ = self._obj_to_array(data=data)
365 curve_x = np.linspace(data_.min(), data_.max(), n)
366 curve_y = gaussian_kde(data_).evaluate(curve_x)
367 max_x = curve_x[curve_y.argmax()]
368 return (curve_x, curve_y, max_x)
369 except Exception as err:
370 reporterror(err)
371 return (np.array(()), np.array(()), 0.0)
373 @staticmethod
374 def _obj_to_array(data: Union[list, np.array, pd.Series]) -> np.ndarray:
375 """Convert an iterable object to a numpy array.
377 Args:
378 data (Union[list, np.array, pd.Series]): Array-like object
379 to be converted into a ``numpy.ndarray``.
381 :NaN Values:
382 In addition to converting the following types to a
383 ``numpy.ndarray``, any ``nan`` values are dropped from
384 the ``numpy.array`` and ``pd.Series`` objects.
386 Returns:
387 np.array: A ``numpy.ndarray``, with ``nan`` values removed.
389 """
390 data_ = None
391 if isinstance(data, np.ndarray):
392 data_ = data
393 elif isinstance(data, pd.Series):
394 data_ = data.astype(float).to_numpy()
395 elif isinstance(data, list):
396 data_ = np.array(data)
397 data_ = data_[~np.isnan(data_)]
398 return data_
401stats = Stats()