Source code for tsfel.feature_extraction.calc_features

import glob
import importlib
import multiprocessing as mp
import numbers
import os
import pathlib
import sys
import warnings
from functools import partial
from pathlib import Path

import numpy as np
import pandas as pd
from IPython import get_ipython
from IPython.display import display

from tsfel.utils.progress_bar import display_progress_bar, progress_bar_notebook
from tsfel.utils.signal_processing import merge_time_series, signal_window_splitter


def dataset_features_extractor(main_directory, feat_dict, verbose=1, **kwargs):
    r"""Extract features from every folder of a dataset and save them as CSV.

    ``main_directory`` is walked recursively. For each folder, the matching
    files are read with :func:`pandas.read_csv`, optionally pre-processed,
    merged and resampled with ``merge_time_series``, split into windows with
    ``signal_window_splitter`` and passed to
    :func:`time_series_features_extractor`. The result is written to
    ``<output_directory><folder>/Features.csv``.

    Parameters
    ----------
    main_directory : str
        Input directory.
    feat_dict : dict
        Dictionary with features.
    verbose : int
        Verbosity mode. 0 = silent, 1 = print the output location when done.
        (0 or 1 (Default))
    \**kwargs:
        See below:
            * *search_criteria* (``list``) --
                List of file names to compute features.
                (Example: 'Accelerometer.txt') (default: ``None``, meaning
                every ``*.txt`` and ``*.csv`` file of each folder)
            * *time_unit* (``float``) --
                Time unit (default: ``1e9``)
            * *resample_rate* (``int``) --
                Resampling rate (default: ``30``)
            * *window_size* (``int``) --
                Window size in number of samples (default: ``100``)
            * *overlap* (``float``) --
                Overlap between 0 and 1 (default: ``0``)
            * *pre_process* (``function``) --
                Function applied to the ``{name: DataFrame}`` dictionary of
                a folder before merging (default: ``None``)
            * *output_directory* (``String``) --
                Output directory
                (default: ``str(Path.home()) + '/tsfel_output'``)
            * *features_path* (``string``) --
                Directory of script with personal features
            * *header_names* (``list or array``) --
                Names of each column window
            * *n_jobs* (``int``) --
                The number of jobs to run in parallel. ``None`` disables
                multiprocessing, ``-1`` means using all processors.
                (default: ``None`` in Windows and ``-1`` for other systems
                and for notebook shells)

    Returns
    -------
    None
        The features are written to disk, one ``Features.csv`` per folder.

    Raises
    ------
    FileNotFoundError
        If no folder is found under ``main_directory``.
    """
    search_criteria = kwargs.get("search_criteria", None)
    time_unit = kwargs.get("time_unit", 1e9)
    resample_rate = kwargs.get("resample_rate", 30)
    window_size = kwargs.get("window_size", 100)
    overlap = kwargs.get("overlap", 0)
    pre_process = kwargs.get("pre_process", None)
    output_directory = kwargs.get(
        "output_directory",
        str(Path.home()) + "/tsfel_output",
    )
    features_path = kwargs.get("features_path", None)
    names = kwargs.get("header_names", None)

    # Default n_jobs: no multiprocessing on Windows, all cores elsewhere and
    # whenever the code runs inside a notebook shell.
    in_notebook = get_ipython().__class__.__name__ in ("ZMQInteractiveShell", "Shell")
    if sys.platform.startswith("win") and not in_notebook:
        n_jobs_default = None
    else:
        n_jobs_default = -1
    n_jobs = kwargs.get("n_jobs", n_jobs_default)

    if main_directory[-1] != os.sep:
        main_directory = main_directory + os.sep

    folders = list(glob.glob(main_directory + "**/", recursive=True))
    if not folders:
        raise FileNotFoundError("There is no folder(s) in directory: " + main_directory)

    # Characters stripped from the folder name before it is used as part of
    # the output path.
    strip_invalid = str.maketrans("", "", r'<>:"\|?* ')

    for fl in folders:
        sensor_data = {}
        if search_criteria:
            for c in search_criteria:
                if os.path.isfile(fl + c):
                    key = c.split(".")[0]
                    sensor_data[key] = pd.read_csv(fl + c, header=None)
        else:
            all_files = glob.glob(fl + "/*.txt") + glob.glob(fl + "/*.csv")
            for c in all_files:
                key = c.split(os.sep)[-1].split(".")[0]
                try:
                    data_file = pd.read_csv(c, header=None)
                except pd.errors.ParserError:
                    # Not a parsable table: skip the file.
                    continue

                # Skip files holding non-numeric (object) columns.
                if np.dtype("O") in np.array(data_file.dtypes):
                    continue

                # Reuse the frame parsed above instead of reading it twice.
                sensor_data[key] = data_file

        if not sensor_data:
            continue

        pp_sensor_data = sensor_data if pre_process is None else pre_process(sensor_data)
        data_new = merge_time_series(pp_sensor_data, resample_rate, time_unit)
        windows = signal_window_splitter(data_new, window_size, overlap)

        # A falsy features_path is forwarded as None, which is also the
        # extractor's own default.
        features = time_series_features_extractor(
            feat_dict,
            windows,
            fs=resample_rate,
            verbose=0,
            features_path=features_path if features_path else None,
            header_names=names,
            n_jobs=n_jobs,
        )

        fl = "/".join(fl.split(os.sep)).translate(strip_invalid)

        pathlib.Path(output_directory + fl).mkdir(parents=True, exist_ok=True)
        features.to_csv(
            output_directory + fl + "/Features.csv",
            sep=",",
            encoding="utf-8",
        )

    if verbose == 1:
        print("Features files saved in: ", output_directory)
def _calc_features(timeseries, config, fs, **kwargs):
    r"""Extract the features of one signal window.

    Thin wrapper around :func:`calc_window_features` whose first positional
    argument is the window, so that it can be mapped over a list of windows
    (for instance through ``functools.partial`` and a process pool).

    Parameters
    ----------
    timeseries: list
        The input signal window from which features will be extracted.
    config : dict
        A dictionary containing the settings for feature extraction.
    fs : float, default=None
        Sampling frequency of the input signal.
    \**kwargs:
        Additional keyword arguments, see below:
            * *features_path* (str) --
                Path to a script with custom features.
            * *header_names* (list or array-like) --
                Names of each column window.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the extracted features, with its index reset.
    """
    features_path = kwargs.get("features_path", None)
    names = kwargs.get("header_names", None)

    feat_val = calc_window_features(
        config,
        timeseries,
        fs,
        features_path=features_path,
        header_names=names,
    )

    # reset_index is not in-place: its result has to be the returned value.
    return feat_val.reset_index(drop=True)
def time_series_features_extractor(
    config,
    timeseries,
    fs=None,
    window_size=None,
    overlap=0,
    verbose=1,
    **kwargs,
):
    """Extract features from univariate or multivariate time series.

    Parameters
    ----------
    config : dict
        A dictionary containing the settings for feature extraction.
    timeseries : list, np.ndarray, pd.DataFrame, pd.Series
        The input signal from which features will be extracted. A list (or
        an array with more than two dimensions) is treated as a sequence of
        windows; anything else is treated as a single window.
    fs : float, default=None
        Sampling frequency of the input signal. When ``None`` a warning is
        issued and the value set in ``config`` is used.
    window_size : int or None, optional, default=None
        The size of the windows used to split the input signal, measured in
        the number of samples. ``None`` means the signal is not split.
    overlap : float, optional, default=0
        A value between 0 and 1 that defines the percentage of overlap
        between consecutive windows.
    verbose : int, default=1
        The verbosity mode. 0 means silent, and 1 means showing a progress
        bar.
    **kwargs :
        Additional keyword arguments, see below:
            * *features_path* (str) --
                Path to a script with custom features.
            * *header_names* (list or array-like) --
                Names of each column window.
            * *n_jobs* (int or None) --
                The number of jobs to run in parallel. ``None`` disables
                multiprocessing and ``-1`` means using all available
                processors. Default: ``None`` on Windows, ``-1`` for other
                systems and for notebook shells.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the extracted features, where:
            - Columns represent the names of the features, sorted.
            - Rows contain the feature values for each signal window.

    Raises
    ------
    SystemExit
        If there is no signal window to process, or if ``n_jobs`` is neither
        an integer nor ``None``.
    """
    features_path = kwargs.get("features_path", None)
    names = kwargs.get("header_names", None)

    # Default n_jobs: no multiprocessing on Windows, all cores elsewhere and
    # whenever the code runs inside a notebook shell.
    in_notebook = get_ipython().__class__.__name__ in ("ZMQInteractiveShell", "Shell")
    if sys.platform.startswith("win") and not in_notebook:
        n_jobs_default = None
    else:
        n_jobs_default = -1
    n_jobs = kwargs.get("n_jobs", n_jobs_default)

    if fs is None:
        warnings.warn(
            "Using default sampling frequency set in configuration file.",
            stacklevel=2,
        )

    # Name of each column to be concatenated with feature name
    if names is not None:
        names = list(names)
    elif isinstance(timeseries, pd.DataFrame):
        names = timeseries.columns.values
    elif isinstance(timeseries[0], pd.DataFrame):
        names = timeseries[0].columns.values

    if window_size is not None:
        timeseries = signal_window_splitter(timeseries, window_size, overlap)

    if len(timeseries) == 0:
        raise SystemExit(
            "Empty signal windows. Please check window size input parameter.",
        )

    # A flat list of numbers is one univariate window, not a list of windows.
    if isinstance(timeseries, list) and isinstance(timeseries[0], numbers.Real):
        timeseries = np.array(timeseries)

    # Arrays with more than two dimensions are stacks of windows.
    if not isinstance(timeseries, list) and len(np.shape(timeseries)) > 2:
        timeseries = list(timeseries)

    if isinstance(timeseries, list):
        # more than one window
        n_windows = len(timeseries)

        # The notebook progress bar is only created when it will be updated.
        out = None
        if verbose == 1 and in_notebook:
            out = display(
                progress_bar_notebook(0, n_windows),
                display_id=True,
            )

        window_features = []
        if isinstance(n_jobs, int):
            # Multiprocessing use
            cpu_count = mp.cpu_count() if n_jobs == -1 else n_jobs
            worker = partial(
                _calc_features,
                config=config,
                fs=fs,
                features_path=features_path,
                header_names=names,
            )

            pool = mp.Pool(cpu_count)
            try:
                for i, feat in enumerate(pool.imap(worker, timeseries)):
                    if verbose == 1:
                        display_progress_bar(i, n_windows, out)
                    window_features.append(feat)
            except BaseException:
                # Do not leave worker processes behind when a window fails.
                pool.terminate()
                raise
            else:
                pool.close()
            finally:
                pool.join()
        elif n_jobs is None:
            for i, window in enumerate(timeseries):
                window_features.append(
                    calc_window_features(
                        config,
                        window,
                        fs,
                        features_path=features_path,
                        header_names=names,
                    ),
                )
                if verbose == 1:
                    display_progress_bar(i, n_windows, out)
        else:
            raise SystemExit(
                "n_jobs value is not valid. " "Choose an integer value or None for no multiprocessing.",
            )

        # One concatenation instead of growing a DataFrame window by window.
        features_final = pd.concat(window_features, axis=0)
    else:
        # single window
        features_final = calc_window_features(
            config,
            timeseries,
            fs,
            verbose=verbose,
            features_path=features_path,
            header_names=names,
            single_window=True,
        )

    # Assuring the same feature extraction order
    features_final = features_final.reindex(sorted(features_final.columns), axis=1)
    return features_final.reset_index(drop=True)
def _star_import_names(module):
    """Return the names ``from module import *`` would bind, as a dict."""
    exported = getattr(module, "__all__", None)
    if exported is None:
        exported = [name for name in vars(module) if not name.startswith("_")]
    return {name: getattr(module, name) for name in exported}


def calc_window_features(
    config,
    window,
    fs,
    verbose=1,
    single_window=False,
    **kwargs,
):
    """Extract features from a univariate or multivariate window.

    Every feature of ``config`` whose ``"use"`` entry equals ``"yes"`` is
    evaluated on each column of ``window``. Feature functions are looked up
    by name among the public names of the ``tsfel`` package and, when
    ``features_path`` is given, of the custom module (which takes
    precedence).

    Parameters
    ----------
    config : dict
        A dictionary containing the settings for feature extraction. It is
        not modified.
    window : np.ndarray, pd.DataFrame, pd.Series
        The input signal from which features will be extracted. It is
        converted to a float array.
    fs : float, default=None
        Sampling frequency of the input signal. When not ``None`` it
        replaces the ``"fs"`` parameter of every feature that declares one.
    verbose : int, default=1
        The verbosity mode. 0 means silent, and 1 means showing a progress
        bar.
    single_window : bool
        If `True`, the progress bar will be shown only for the extraction
        of features from a single window.
    **kwargs :
        Additional keyword arguments, see below:
            * *features_path* (str) --
                Path to a script with custom features.
            * *header_names* (list or array-like) --
                Names of each column window.

    Returns
    -------
    pd.DataFrame
        A single-row DataFrame containing the extracted features. Columns
        are named ``<header>_<feature>``, with a ``_<index>`` or ``_<name>``
        suffix for features returning a tuple or a names/values dictionary.

    Raises
    ------
    ValueError
        If ``header_names`` does not match the number of columns, or if a
        feature needs ``fs`` and neither the argument nor a numeric default
        in ``config`` provides it.
    KeyError
        If a feature function name cannot be resolved.
    """
    features_path = kwargs.get("features_path", None)
    header_names = kwargs.get("header_names", None)

    # To handle object type signals
    window = np.array(window).astype(float)
    single_axis = len(window.shape) == 1

    if header_names is None:
        header_names = np.array([0]) if single_axis else np.arange(window.shape[-1])
    else:
        expected_columns = 1 if single_axis else window.shape[-1]
        if len(header_names) != expected_columns:
            raise ValueError("header_names dimension does not match input columns.")

    # Explicit lookup table of feature functions. Relying on exec() writing
    # into locals() is not portable (PEP 667, Python 3.13).
    namespace = _star_import_names(importlib.import_module("tsfel"))

    if features_path:
        # NOTE: this imports and runs the user-supplied script; only pass
        # trusted paths.
        module_dir, module_file = os.path.split(features_path)
        module_name = os.path.splitext(module_file)[0]
        # This function runs once per window: do not grow sys.path each time.
        if module_dir not in sys.path:
            sys.path.append(module_dir)
        # Reload so that edits made to the script since a previous import
        # are taken into account.
        custom_module = importlib.reload(importlib.import_module(module_name))
        namespace.update(_star_import_names(custom_module))

    feature_results = []
    feature_names = []

    # Progress is reported per feature only when a single window is processed
    show_progress = verbose == 1 and single_window
    if show_progress:
        n_feats = sum(len(config[domain]) for domain in config)
        if get_ipython().__class__.__name__ in ("ZMQInteractiveShell", "Shell"):
            out = display(progress_bar_notebook(0, n_feats), display_id=True)
        else:
            out = None

    i_feat = -1
    for domain in config:
        for feat, settings in config[domain].items():
            if show_progress:
                i_feat += 1
                display_progress_bar(i_feat, n_feats, out)

            # Only returns used functions
            if settings["use"] != "yes":
                continue

            # Read Function (real name of function)
            func_name = settings["function"]
            if func_name.startswith("tsfel."):
                func_name = func_name.replace("tsfel.", "")
            feature_func = namespace[func_name]

            # Copy the parameters so that overriding "fs" below does not
            # leak into the caller's configuration.
            parameters_total = dict(settings["parameters"]) if settings["parameters"] else {}

            if "fs" in parameters_total:
                if fs is None:
                    # The configuration must then carry a numeric default.
                    default_fs = parameters_total["fs"]
                    if isinstance(default_fs, bool) or not isinstance(default_fs, numbers.Real):
                        raise ValueError("No sampling frequency assigned.")
                else:
                    parameters_total["fs"] = fs

            for ax, column_name in enumerate(header_names):
                sig_ax = window if single_axis else window[:, ax]
                result = feature_func(sig_ax, **parameters_total)

                if isinstance(result, tuple):
                    # Function returns more than one element; a leading NaN
                    # turns the whole result into zeros.
                    if len(result) > 0 and np.isnan(result[0]):
                        result = np.zeros(len(result))
                    for rr, value in enumerate(result):
                        feature_results.append(value)
                        feature_names.append(f"{column_name}_{feat}_{rr}")
                elif isinstance(result, dict):
                    names = result["names"]
                    values = result["values"]
                    # Same NaN rule as the tuple case, applied to the values
                    # that are actually stored.
                    if len(values) > 0 and np.isnan(values[0]):
                        values = np.zeros(len(values))
                    for name, value in zip(names, values):
                        feature_results.append(value)
                        feature_names.append(f"{column_name}_{feat}_{name}")
                else:
                    feature_results.append(result)
                    feature_names.append(f"{column_name}_{feat}")

    features = pd.DataFrame(
        data=np.array(feature_results).reshape(1, len(feature_results)),
        columns=np.array(feature_names),
    )
    return features