Source code for tsfel.feature_extraction.calc_features
import glob
import importlib
import multiprocessing as mp
import numbers
import os
import pathlib
import sys
import warnings
from functools import partial
from pathlib import Path
import numpy as np
import pandas as pd
from IPython import get_ipython
from IPython.display import display
from tsfel.utils.progress_bar import display_progress_bar, progress_bar_notebook
from tsfel.utils.signal_processing import merge_time_series, signal_window_splitter
[docs]
def dataset_features_extractor(main_directory, feat_dict, verbose=1, **kwargs):
    r"""Extract features from every folder of a dataset and save them as CSV.

    Every folder found recursively under ``main_directory`` (the directory
    itself included) is processed independently: its sensor files are read,
    optionally pre-processed, merged and resampled, split into windows and
    passed to :func:`time_series_features_extractor`. The result is written to
    ``<output_directory><folder path>/Features.csv``.

    Parameters
    ----------
    main_directory : str or os.PathLike
        Input directory.
    feat_dict : dict
        Dictionary with features.
    verbose : int
        Verbosity mode. 0 = silent, 1 = print the output location when done.
        (0 or 1 (Default))
    \**kwargs:
        See below:

        * *search_criteria* (``list``) --
            List of file names to compute features. (Example: 'Accelerometer.txt')
            (default: ``None``, meaning every numeric ``*.txt`` / ``*.csv`` file)
        * *time_unit* (``float``) --
            Time unit
            (default: ``1e9``)
        * *resample_rate* (``int``) --
            Resampling rate. The spelling ``resampling_rate`` is accepted as
            an alias; ``resample_rate`` wins when both are given.
            (default: ``30``)
        * *window_size* (``int``) --
            Window size in number of samples
            (default: ``100``)
        * *overlap* (``float``) --
            Overlap between 0 and 1
            (default: ``0``)
        * *pre_process* (``function``) --
            Function with pre processing code. It receives the dictionary
            ``{file stem: DataFrame}`` and its return value is merged.
            (default: ``None``)
        * *output_directory* (``String``) --
            Output directory
            (default: ``str(Path.home()) + '/tsfel_output'``)
        * *features_path* (``string``) --
            Directory of script with personal features
        * *header_names* (``list or array``) --
            Names of each column window
        * *n_jobs* (``int``) --
            The number of jobs to run in parallel. ``None`` disables
            multiprocessing, ``-1`` means using all processors.
            (default: ``None`` on Windows outside a notebook, ``-1`` otherwise)

    Returns
    -------
    None
        The features are written to ``Features.csv`` files on disk.

    Raises
    ------
    FileNotFoundError
        If ``main_directory`` is empty or contains no folder.
    """
    search_criteria = kwargs.get("search_criteria", None)
    time_unit = kwargs.get("time_unit", 1e9)
    # The docstring historically advertised "resampling_rate" while the code
    # only read "resample_rate"; honour both so the documented name is not
    # silently ignored.
    resample_rate = kwargs.get("resample_rate", kwargs.get("resampling_rate", 30))
    window_size = kwargs.get("window_size", 100)
    overlap = kwargs.get("overlap", 0)
    pre_process = kwargs.get("pre_process", None)
    output_directory = kwargs.get(
        "output_directory",
        str(Path.home()) + "/tsfel_output",
    )
    features_path = kwargs.get("features_path", None)
    names = kwargs.get("header_names", None)

    main_directory = os.fspath(main_directory)
    if not main_directory:
        # An empty string would otherwise become the filesystem root below.
        raise FileNotFoundError("There is no folder(s) in directory: " + main_directory)
    if not main_directory.endswith(os.sep):
        main_directory = main_directory + os.sep

    folders = glob.glob(main_directory + "**/", recursive=True)
    if not folders:
        raise FileNotFoundError("There is no folder(s) in directory: " + main_directory)

    # Default n_jobs: no multiprocessing on Windows, unless running inside a
    # notebook kernel, where all processors are used.
    shell_name = get_ipython().__class__.__name__
    in_notebook = shell_name in ("ZMQInteractiveShell", "Shell")
    n_jobs_default = None if (sys.platform.startswith("win") and not in_notebook) else -1
    n_jobs = kwargs.get("n_jobs", n_jobs_default)

    # Characters that are not allowed in the mirrored output path.
    strip_invalid = str.maketrans("", "", r'<>:"\|?* ')

    for fl in folders:
        sensor_data = {}
        if search_criteria:
            for c in search_criteria:
                if os.path.isfile(fl + c):
                    sensor_data[c.split(".")[0]] = pd.read_csv(fl + c, header=None)
        else:
            for c in glob.glob(fl + "/*.txt") + glob.glob(fl + "/*.csv"):
                try:
                    data_file = pd.read_csv(c, header=None)
                except pd.errors.ParserError:
                    # Not a parseable table: skip it.
                    continue
                # Skip files holding non-numeric (object) columns.
                if np.dtype("O") in np.array(data_file.dtypes):
                    continue
                sensor_data[c.split(os.sep)[-1].split(".")[0]] = data_file

        if not sensor_data:
            continue

        pp_sensor_data = sensor_data if pre_process is None else pre_process(sensor_data)
        data_new = merge_time_series(pp_sensor_data, resample_rate, time_unit)
        windows = signal_window_splitter(data_new, window_size, overlap)

        # features_path=None is equivalent to not passing it at all.
        features = time_series_features_extractor(
            feat_dict,
            windows,
            fs=resample_rate,
            verbose=0,
            features_path=features_path,
            header_names=names,
            n_jobs=n_jobs,
        )

        # Mirror the input folder structure below the output directory.
        target = output_directory + "/".join(fl.split(os.sep)).translate(strip_invalid)
        pathlib.Path(target).mkdir(parents=True, exist_ok=True)
        features.to_csv(
            target + "/Features.csv",
            sep=",",
            encoding="utf-8",
        )

    if verbose == 1:
        print("Features files saved in: ", output_directory)
[docs]
def calc_features(wind_sig, dict_features, fs, **kwargs):
    r"""Extract the features of a single window.

    Thin wrapper around :func:`calc_window_features` that takes the window as
    first positional argument, so it can be mapped over a list of windows
    (e.g. with ``multiprocessing.Pool.imap``).

    Parameters
    ----------
    wind_sig: list
        Input from which features are computed, window
    dict_features : dict
        Dictionary with features
    fs : float or None
        Sampling frequency
    \**kwargs:
        * *features_path* (``string``) --
            Directory of script with personal features
        * *header_names* (``list or array``) --
            Names of each column window

    Returns
    -------
    DataFrame
        Extracted features, with the index reset.
    """
    feat_val = calc_window_features(
        dict_features,
        wind_sig,
        fs,
        features_path=kwargs.get("features_path", None),
        header_names=kwargs.get("header_names", None),
    )
    # reset_index returns a new frame; the original code discarded it.
    return feat_val.reset_index(drop=True)
[docs]
def time_series_features_extractor(
    dict_features,
    signal_windows,
    fs=None,
    verbose=1,
    **kwargs,
):
    r"""Extraction of time series features.

    Parameters
    ----------
    dict_features : dict
        Dictionary with features
    signal_windows: list
        Input from which features are computed, window
    fs : int or None
        Sampling frequency
    verbose : int
        Verbosity mode. 0 = silent, 1 = progress bar.
        (0 or 1 (Default))
    \**kwargs:
        See below:

        * *window_size* (``int``) --
            Window size in number of samples. When given, the input is first
            split with ``signal_window_splitter``.
            (default: ``None``)
        * *overlap* (``float``) --
            Overlap between 0 and 1
            (default: ``0``)
        * *features_path* (``string``) --
            Directory of script with personal features
        * *header_names* (``list or array``) --
            Names of each column window
        * *n_jobs* (``int``) --
            The number of jobs to run in parallel. ``None`` disables
            multiprocessing, ``-1`` means using all processors.
            (default: ``None`` on Windows outside a notebook, ``-1`` otherwise)

    Returns
    -------
    DataFrame
        Extracted features, one row per window, columns sorted by name.

    Raises
    ------
    SystemExit
        If there are no windows to process or ``n_jobs`` is neither an
        integer nor ``None``.
    """
    if verbose == 1:
        print("*** Feature extraction started ***")

    window_size = kwargs.get("window_size", None)
    overlap = kwargs.get("overlap", 0)
    features_path = kwargs.get("features_path", None)
    names = kwargs.get("header_names", None)

    if fs is None:
        warnings.warn(
            "Using default sampling frequency set in configuration file.",
            stacklevel=2,
        )

    # Name of each column to be concatenated with feature name
    if names is not None:
        names = list(names)
    elif isinstance(signal_windows, pd.DataFrame):
        names = signal_windows.columns.values
    elif len(signal_windows) > 0 and isinstance(signal_windows[0], pd.DataFrame):
        # The length guard lets an empty input reach the explicit error below
        # instead of failing with an IndexError here.
        names = signal_windows[0].columns.values

    if window_size is not None:
        signal_windows = signal_window_splitter(signal_windows, window_size, overlap)

    if len(signal_windows) == 0:
        raise SystemExit(
            "Empty signal windows. Please check window size input parameter.",
        )

    # Default n_jobs: no multiprocessing on Windows, unless running inside a
    # notebook kernel, where all processors are used.
    shell_name = get_ipython().__class__.__name__
    in_notebook = shell_name in ("ZMQInteractiveShell", "Shell")
    n_jobs_default = None if (sys.platform.startswith("win") and not in_notebook) else -1
    n_jobs = kwargs.get("n_jobs", n_jobs_default)

    # A flat list of numbers is a single 1-D window, not a list of windows.
    if isinstance(signal_windows, list) and isinstance(signal_windows[0], numbers.Real):
        signal_windows = np.array(signal_windows)

    # A 3-D input is a stack of windows: handle it like a list of windows.
    if len(np.shape(signal_windows)) > 2:
        signal_windows = list(signal_windows)

    if isinstance(signal_windows, list):
        # more than one window
        n_windows = len(signal_windows)

        # Starting the display of progress bar for notebooks interfaces
        if in_notebook:
            out = display(
                progress_bar_notebook(0, n_windows),
                display_id=True,
            )
        else:
            out = None

        # Collect the per-window frames and concatenate once at the end
        # instead of growing a DataFrame inside the loop.
        frames = []

        if isinstance(n_jobs, int):
            # Multiprocessing use
            cpu_count = mp.cpu_count() if n_jobs == -1 else n_jobs
            worker = partial(
                calc_features,
                dict_features=dict_features,
                fs=fs,
                features_path=features_path,
                header_names=names,
            )
            # The context manager tears the pool down even if a worker raises.
            with mp.Pool(cpu_count) as pool:
                for i, feat in enumerate(pool.imap(worker, signal_windows)):
                    if verbose == 1:
                        display_progress_bar(i, n_windows, out)
                    frames.append(feat)
                pool.close()
                pool.join()
        elif n_jobs is None:
            for i, window in enumerate(signal_windows):
                frames.append(
                    calc_window_features(
                        dict_features,
                        window,
                        fs,
                        features_path=features_path,
                        header_names=names,
                    ),
                )
                if verbose == 1:
                    display_progress_bar(i, n_windows, out)
        else:
            raise SystemExit(
                "n_jobs value is not valid. " "Choose an integer value or None for no multiprocessing.",
            )

        features_final = pd.concat(frames, axis=0)
    else:
        # single window
        features_final = calc_window_features(
            dict_features,
            signal_windows,
            fs,
            verbose=verbose,
            features_path=features_path,
            header_names=names,
            single_window=True,
        )

    if verbose == 1:
        print("\n" + "*** Feature extraction finished ***")

    # Assuring the same feature extraction order
    features_final = features_final.reindex(sorted(features_final.columns), axis=1)
    return features_final.reset_index(drop=True)
[docs]
def _star_exports(module):
    """Return the names ``from <module> import *`` would bind, as a dict."""
    exported = getattr(module, "__all__", None)
    if exported is None:
        exported = [name for name in vars(module) if not name.startswith("_")]
    return {name: getattr(module, name) for name in exported}


def _load_user_features(features_path):
    """Import (and reload) the script at ``features_path``; return its exports."""
    folder, filename = os.path.split(features_path)
    module_name = os.path.splitext(filename)[0]
    # Called once per window: do not let sys.path grow on every call.
    if folder not in sys.path:
        sys.path.append(folder)
    module = importlib.import_module(module_name)
    # Reload so edits made to the script since the first import are picked up.
    return _star_exports(importlib.reload(module))


def calc_window_features(
    dict_features,
    signal_window,
    fs,
    verbose=1,
    single_window=False,
    **kwargs,
):
    r"""This function computes features matrix for one window.

    Parameters
    ----------
    dict_features : dict
        Dictionary with features. It is read as
        ``dict_features[domain][feature]`` with the keys ``"use"``,
        ``"function"`` and ``"parameters"``; it is not modified.
    signal_window: pandas DataFrame
        Input from which features are computed, window. It is converted to a
        float array; a 1-D input is a single axis, otherwise each column of
        the last dimension is one axis.
    fs : float
        Sampling frequency. When not ``None`` it overrides the ``"fs"``
        parameter of every feature that declares one.
    verbose : int
        Level of function communication
        (0 or 1 (Default))
    single_window: Bool
        Boolean value for printing the progress bar for only one window feature extraction
    \**kwargs:
        See below:

        * *features_path* (``string``) --
            Directory of script with personal features. Names exported by
            this script take precedence over ``tsfel`` names.
        * *header_names* (``list or array``) --
            Names of each column window

    Returns
    -------
    pandas DataFrame
        (columns) names of the features, ``<header>_<feature>`` or
        ``<header>_<feature>_<k>`` for features returning a tuple
        (data) values of each features for signal

    Raises
    ------
    ValueError
        If ``header_names`` does not match the number of axes, or if a feature
        needs ``fs`` and neither ``fs`` nor a numeric default is available.
    KeyError
        If a feature function name cannot be resolved.
    """
    features_path = kwargs.get("features_path", None)
    header_names = kwargs.get("header_names", None)

    # To handle object type signals
    signal_window = np.array(signal_window).astype(float)
    single_axis = signal_window.ndim == 1

    if header_names is None:
        header_names = np.array([0]) if single_axis else np.arange(signal_window.shape[-1])
    else:
        n_axes = 1 if single_axis else signal_window.shape[-1]
        if len(header_names) != n_axes:
            raise ValueError("header_names dimension does not match input columns.")

    # Feature functions are looked up in explicit dictionaries rather than via
    # exec(...)/locals(), which stopped working with the locals() snapshot
    # semantics of Python 3.13.
    user_features = _load_user_features(features_path) if features_path else {}
    tsfel_features = None  # filled on the first name the user script lacks

    # Starting the display of progress bar for notebooks interfaces
    show_progress = verbose == 1 and single_window
    if show_progress:
        n_features = sum(len(feats) for feats in dict_features.values())
        if get_ipython().__class__.__name__ in ("ZMQInteractiveShell", "Shell"):
            out = display(progress_bar_notebook(0, n_features), display_id=True)
        else:
            out = None
    i_feat = -1

    feature_results = []
    feature_names = []

    # Iterating over features of a single window
    for domain_feats in dict_features.values():
        for feat, settings in domain_feats.items():
            if show_progress:
                i_feat += 1
                display_progress_bar(i_feat, n_features, out)

            # Only returns used functions
            if settings["use"] != "yes":
                continue

            # Read Function (real name of function)
            func_name = settings["function"]
            if func_name.startswith("tsfel."):
                func_name = func_name.replace("tsfel.", "")

            # Copy the parameters so overriding fs never leaks into the
            # caller's configuration dictionary.
            raw_parameters = settings["parameters"]
            parameters = {} if raw_parameters == "" else dict(raw_parameters)

            if "fs" in parameters:
                if fs is not None:
                    parameters["fs"] = fs
                else:
                    # Fall back on the default sampling frequency, if numeric.
                    default_fs = parameters["fs"]
                    if isinstance(default_fs, bool) or not isinstance(default_fs, numbers.Real):
                        raise ValueError("No sampling frequency assigned.")

            func = user_features.get(func_name)
            if func is None:
                if tsfel_features is None:
                    tsfel_features = _star_exports(importlib.import_module("tsfel"))
                func = tsfel_features[func_name]

            # Eval feature results, once per axis
            for ax, column in enumerate(header_names):
                sig_ax = signal_window if single_axis else signal_window[:, ax]
                result = func(sig_ax, **parameters)
                label = str(column) + "_" + feat

                # Function returns more than one element
                if isinstance(result, tuple):
                    if np.isnan(result[0]):
                        result = np.zeros(len(result))
                    for rr, value in enumerate(result):
                        feature_results.append(value)
                        feature_names.append(label + "_" + str(rr))
                else:
                    feature_results.append(result)
                    feature_names.append(label)

    features = pd.DataFrame(
        data=np.array(feature_results).reshape(1, len(feature_results)),
        columns=np.array(feature_names),
    )
    return features