# Source code for cosinorage.features.bulk_features

###########################################################################
# Copyright (C) 2025 ETH Zurich
# CosinorAge: Prediction of biological age based on accelerometer data
# using the CosinorAge method proposed by Shim, Fleisch and Barata
# (https://www.nature.com/articles/s41746-024-01111-x)
#
# Authors: Jacob Leo Oskar Hunecke
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##########################################################################

"""
bulk_features.py
----------------

Provides the BulkWearableFeatures class for batch computation and statistical analysis
of wearable-derived features across multiple datasets. This module is essential for
cohort studies and large-scale data analysis, enabling comprehensive feature extraction,
statistical summarization, and correlation analysis across multiple participants.

The BulkWearableFeatures class processes multiple DataHandler instances simultaneously,
computes features for each using the WearableFeatures class, and provides statistical
distributions and correlation matrices across all datasets. It includes robust error
handling for failed computations and supports both individual feature access and
summary statistics.

Typical usage example::

    # Create multiple data handlers
    handlers = [DataHandler1, DataHandler2, DataHandler3]

    # Initialize bulk feature computation
    bulk = BulkWearableFeatures(handlers, compute_distributions=True)

    # Access individual features
    individual_features = bulk.get_individual_features()

    # Get statistical distributions
    stats = bulk.get_distribution_stats()

    # Get summary DataFrame
    summary_df = bulk.get_summary_dataframe()

    # Get correlation matrix
    corr_matrix = bulk.get_feature_correlation_matrix()

    # Check for failed handlers
    failed = bulk.get_failed_handlers()

Features computed include:
    - Cosinor analysis (MESOR, amplitude, acrophase)
    - Non-parametric measures (IV, IS, RA, M10, L5)
    - Physical activity metrics (sedentary, light, moderate, vigorous)
    - Sleep metrics (TST, WASO, PTA, NWB, SOL, SRI)

Statistical measures provided:
    - count, mean, std, min, max, median
    - q25, q75, iqr (interquartile range)
    - mode, skewness
"""

from typing import Dict, List, Optional

import numpy as np
import pandas as pd

from ..datahandlers import DataHandler
from .features import WearableFeatures


class BulkWearableFeatures:
    """A class for computing and managing features from multiple wearable
    accelerometer datasets.

    This class processes multiple DataHandler instances to compute features for
    each and then calculates statistical distributions (mean, std, quartiles,
    etc.) across all datasets. It provides comprehensive analysis capabilities
    for cohort studies and large-scale wearable data analysis.

    The class handles feature computation failures gracefully, allowing
    analysis to continue even when some datasets fail to process. It provides
    both individual feature access and aggregated statistical summaries.

    Parameters
    ----------
    handlers : List[DataHandler]
        List of DataHandler instances containing ENMO data. Each handler should
        have been properly initialized and loaded with data.
    features_args : dict, optional
        Arguments for feature computation passed to WearableFeatures. Common
        arguments include:

        - 'pa_params': Physical activity parameters
        - 'sleep_params': Sleep detection parameters

        Defaults to an empty dict (a fresh one per instance).
    compute_distributions : bool, optional
        Whether to compute statistical distributions across all features.
        If False, only individual features are computed. Defaults to True.
    cosinor_age_inputs : List[dict], optional
        List of dictionaries containing age and gender information for
        CosinorAge computation. Each dictionary should contain:

        - 'age': Chronological age (float)
        - 'gender': Gender ('male', 'female', or 'unknown', optional,
          defaults to 'unknown')
        - 'gt_cosinor_age': Ground truth cosinor age (float, optional)

        Must be the same length as handlers if provided. If all dictionaries
        contain 'gt_cosinor_age', a 'cosinor_age_prediction_error' feature
        will be computed. Defaults to None.

    Attributes
    ----------
    handlers : List[DataHandler]
        List of DataHandler instances provided during initialization
    features_args : dict
        Arguments for feature computation
    cosinor_age_inputs : List[dict]
        List of age/gender dictionaries for CosinorAge computation
    compute_prediction_error : bool
        True when every entry of cosinor_age_inputs carries a non-None
        'gt_cosinor_age'.
    individual_features : List[dict]
        List of feature dictionaries for each handler. Failed computations
        are represented as None.
    distribution_stats : dict
        Statistical distributions across all features. Only populated if
        compute_distributions=True.
    failed_handlers : List[tuple]
        List of (handler_index, error_message) tuples for handlers that
        failed during feature computation.

    Examples
    --------
    >>> from cosinorage.datahandlers import GalaxyDataHandler
    >>> from cosinorage.features import BulkWearableFeatures
    >>>
    >>> # Create multiple handlers
    >>> handlers = []
    >>> for i in range(3):
    ...     handler = GalaxyDataHandler(f"data/participant_{i}.csv")
    ...     handler.load_data()
    ...     handlers.append(handler)
    >>>
    >>> # Define age and gender information for CosinorAge computation
    >>> cosinor_age_inputs = [
    ...     {"age": 25.5, "gender": "female", "gt_cosinor_age": 26.2},
    ...     {"age": 30.2, "gender": "male", "gt_cosinor_age": 31.1},
    ...     {"age": 28.0, "gender": "unknown", "gt_cosinor_age": 27.8}
    ... ]
    >>>
    >>> # Compute bulk features with CosinorAge
    >>> bulk = BulkWearableFeatures(
    ...     handlers,
    ...     compute_distributions=True,
    ...     cosinor_age_inputs=cosinor_age_inputs
    ... )
    >>>
    >>> # Get statistical summary (includes CosinorAge features)
    >>> stats = bulk.get_distribution_stats()
    >>> print(f"Computed features for {len(stats)} feature types")
    >>>
    >>> # Check for failures
    >>> failed = bulk.get_failed_handlers()
    >>> if failed:
    ...     print(f"Failed handlers: {len(failed)}")
    """

    # Scalar types accepted as numeric feature values when flattening.
    # np.number already covers np.integer and np.floating.
    _NUMERIC_TYPES = (int, float, np.number)

    def __init__(
        self,
        handlers: List[DataHandler],
        features_args: Optional[dict] = None,
        cosinor_age_inputs: Optional[List[dict]] = None,
        compute_distributions: bool = True,
    ):
        """Initialize BulkWearableFeatures with multiple DataHandler instances.

        Parameters
        ----------
        handlers : List[DataHandler]
            List of DataHandler instances containing ENMO data. Each handler
            should have been properly initialized and loaded with data.
        features_args : dict, optional
            Arguments for feature computation passed to WearableFeatures.
            Common arguments include:

            - 'pa_params': Physical activity parameters
            - 'sleep_params': Sleep detection parameters

            Defaults to an empty dict.
        cosinor_age_inputs : List[dict], optional
            One dictionary per handler with an 'age' key and optional
            'gender' / 'gt_cosinor_age' keys. Defaults to None (CosinorAge
            features are not computed).
        compute_distributions : bool, optional
            Whether to compute statistical distributions across all features.
            If False, only individual features are computed. Defaults to True.

        Raises
        ------
        ValueError
            If a non-empty cosinor_age_inputs does not have the same length as
            handlers, or if one of its entries is not a dict containing 'age'.

        Notes
        -----
        Empty handlers list is allowed and will result in empty
        individual_features and distribution_stats.
        """
        self.handlers = handlers
        # A fresh dict per instance: a literal `{}` default would be shared by
        # every instance constructed without explicit features_args.
        self.features_args = {} if features_args is None else features_args
        self.cosinor_age_inputs = cosinor_age_inputs
        self.individual_features = []
        self.distribution_stats = {}
        self.failed_handlers = []
        self.compute_prediction_error = False

        # Validate cosinor_age_inputs if provided (None and [] skip validation)
        if self.cosinor_age_inputs:
            if len(self.cosinor_age_inputs) != len(self.handlers):
                raise ValueError(
                    f"cosinor_age_inputs length ({len(self.cosinor_age_inputs)}) "
                    f"must match handlers length ({len(self.handlers)})"
                )

            for i, input_dict in enumerate(self.cosinor_age_inputs):
                if not isinstance(input_dict, dict) or 'age' not in input_dict:
                    raise ValueError(
                        f"cosinor_age_inputs[{i}] must be a dictionary with 'age' key"
                    )

            # The prediction error is only computed when every record has a
            # ground truth value.
            self.compute_prediction_error = all(
                input_dict.get('gt_cosinor_age') is not None
                for input_dict in self.cosinor_age_inputs
            )

        self.__run(compute_distributions)

    def __run(self, compute_distributions: bool = True):
        """Compute features for all handlers and optionally compute distributions.

        This method processes each handler sequentially, computing features
        using the WearableFeatures class. Failed computations are printed and
        stored for later inspection. If cosinor_age_inputs is provided,
        CosinorAge features are also computed and added to the individual
        features.

        Parameters
        ----------
        compute_distributions : bool
            Whether to compute statistical distributions after individual
            feature computation.
        """
        for i, handler in enumerate(self.handlers):
            try:
                wearable_features = WearableFeatures(handler, self.features_args)
                features = wearable_features.get_features()
            except Exception as e:
                # Per-handler boundary: one bad dataset must not abort the
                # whole cohort, so record the failure and keep a None
                # placeholder to preserve index alignment with `handlers`.
                print(f"Failed to compute features for handler {i}: {str(e)}")
                self.failed_handlers.append((i, str(e)))
                self.individual_features.append(None)
            else:
                self.individual_features.append(features)

        # Compute CosinorAge features if inputs are provided
        if self.cosinor_age_inputs is not None:
            self.__compute_cosinorage_features()

        # Compute distributions if requested and there is anything to summarize
        if compute_distributions and self.individual_features:
            self.__compute_distributions()

    def __compute_cosinorage_features(self):
        """Compute CosinorAge features for all handlers with valid age inputs.

        This method creates records for CosinorAge computation and adds the
        resulting features to the individual_features list under the
        'cosinorage' key. Only handlers with successful feature computations
        will have CosinorAge features added. When the CosinorAge computation
        itself fails for a record, its 'cosinorage' entry holds None values.
        """
        # Import here to avoid circular import
        from ..bioages import CosinorAge

        # Explicit guard instead of `assert`, which is stripped under -O.
        if self.cosinor_age_inputs is None:
            return

        # Create records for CosinorAge computation (successful handlers only)
        records = []
        for i, (handler, age_input) in enumerate(
            zip(self.handlers, self.cosinor_age_inputs)
        ):
            if self.individual_features[i] is None:
                continue
            record = {
                "handler": handler,
                "age": age_input["age"],
                "gender": age_input.get("gender", "unknown"),
                "gt_cosinor_age": age_input.get("gt_cosinor_age", None),
            }
            records.append((i, record))

        if not records:
            print("No valid records found for CosinorAge computation")
            return

        # Process each record individually so one failure does not affect
        # the other records.
        for original_index, record in records:
            try:
                cosinorage_computer = CosinorAge([record])
                prediction = cosinorage_computer.get_predictions()[0]

                cosinorage_features = {
                    "cosinorage": prediction["cosinorage"],
                    "cosinorage_advance": prediction["cosinorage_advance"],
                }

                # Add prediction error if ground truth is available
                if self.compute_prediction_error:
                    gt_cosinor_age = self.cosinor_age_inputs[original_index][
                        "gt_cosinor_age"
                    ]
                    cosinorage_features["cosinor_age_prediction_error"] = (
                        prediction["cosinorage"] - gt_cosinor_age
                    )
            except Exception as e:
                print(
                    f"Failed to compute CosinorAge features for record {original_index}: {str(e)}"
                )
                # Null placeholders keep the feature schema uniform across
                # records.
                cosinorage_features = {
                    "cosinorage": None,
                    "cosinorage_advance": None,
                }
                if self.compute_prediction_error:
                    cosinorage_features["cosinor_age_prediction_error"] = None

            self.individual_features[original_index]["cosinorage"] = (
                cosinorage_features
            )

    def _valid_features(self):
        """Return (handler_indices, feature_dicts) for successful computations."""
        pairs = [
            (i, f)
            for i, f in enumerate(self.individual_features)
            if f is not None
        ]
        return [i for i, _ in pairs], [f for _, f in pairs]

    @classmethod
    def _is_numeric_scalar(cls, value) -> bool:
        """Return True if value is a Python or NumPy numeric scalar."""
        return isinstance(value, cls._NUMERIC_TYPES)

    def __compute_distributions(self):
        """Compute statistical distributions across all features.

        This method flattens all individual features into a single DataFrame
        and computes comprehensive statistical measures for each feature
        across all successful computations.
        """
        indices, valid_features = self._valid_features()

        if not valid_features:
            print("No valid features found for distribution computation")
            return

        flattened_features = self.__flatten_features(valid_features, indices)
        self.distribution_stats = self.__compute_feature_statistics(
            flattened_features
        )

    def __flatten_features(
        self,
        features_list: List[dict],
        handler_indices: Optional[List[int]] = None,
    ) -> pd.DataFrame:
        """Flatten nested feature dictionaries into a DataFrame.

        This method converts the nested structure of individual feature
        dictionaries into a flat DataFrame where each row represents one
        handler and each column represents one feature. Nested features are
        flattened using the pattern 'category_feature_name', except for the
        'cosinorage' category whose features keep their own name.

        Parameters
        ----------
        features_list : List[dict]
            List of feature dictionaries from successful computations. Each
            dictionary contains nested feature categories.
        handler_indices : List[int], optional
            Original handler index of each entry in features_list. When
            omitted, positions within features_list are used.

        Returns
        -------
        pd.DataFrame
            Flattened features DataFrame with handler_index column and one
            column per feature. Non-numeric features and features whose name
            ends in '_flag' are excluded; numeric lists/arrays are reduced to
            their mean.
        """
        if handler_indices is None:
            handler_indices = range(len(features_list))

        is_numeric = self._is_numeric_scalar
        flattened_data = []

        for index, features in zip(handler_indices, features_list):
            row = {"handler_index": index}

            for category, category_features in features.items():
                if not isinstance(category_features, dict):
                    # Direct (non-nested) feature value
                    if is_numeric(category_features):
                        row[category] = category_features
                    continue

                # The cosinorage category is not prefixed, to avoid names
                # such as 'cosinorage_cosinorage'.
                prefix = "" if category == "cosinorage" else f"{category}_"

                for feature_name, feature_value in category_features.items():
                    # Skip flag features
                    if feature_name.endswith("_flag"):
                        continue

                    if isinstance(feature_value, (list, np.ndarray)):
                        # Only aggregate non-empty, all-numeric sequences;
                        # others (e.g. Timestamps) are skipped.
                        if len(feature_value) > 0 and all(
                            is_numeric(x) for x in feature_value
                        ):
                            row[f"{prefix}{feature_name}"] = np.mean(
                                feature_value
                            )
                    elif is_numeric(feature_value):
                        row[f"{prefix}{feature_name}"] = feature_value
                    # Anything else is non-numeric and skipped.

            flattened_data.append(row)

        return pd.DataFrame(flattened_data)

    def __compute_feature_statistics(
        self, df: pd.DataFrame
    ) -> Dict[str, Dict[str, float]]:
        """Compute statistical measures for each feature.

        This method computes statistical measures for each numeric feature
        across all handlers. Columns with no non-null values are omitted.

        Parameters
        ----------
        df : pd.DataFrame
            Flattened features DataFrame with handler_index column and numeric
            feature columns.

        Returns
        -------
        Dict[str, Dict[str, float]]
            Dictionary where keys are feature names and values are
            dictionaries containing statistical measures:

            - count: Number of non-null values
            - mean: Arithmetic mean
            - std: Standard deviation (NumPy default, ddof=0)
            - min: Minimum value
            - max: Maximum value
            - median: Median value
            - q25: 25th percentile
            - q75: 75th percentile
            - iqr: Interquartile range (q75 - q25)
            - mode: Most frequent value (NaN if unavailable)
            - skewness: Distribution skewness (NaN if unavailable)
        """
        stats = {}

        # Exclude non-numeric columns and the bookkeeping index column
        numeric_columns = [
            col
            for col in df.select_dtypes(include=[np.number]).columns
            if col != "handler_index"
        ]

        for column in numeric_columns:
            values = df[column].dropna()

            if len(values) == 0:
                continue

            q25, q75 = (float(q) for q in np.percentile(values, [25, 75]))

            column_stats = {
                "count": len(values),
                "mean": float(np.mean(values)),
                "std": float(np.std(values)),
                "min": float(np.min(values)),
                "max": float(np.max(values)),
                "median": float(np.median(values)),
                "q25": q25,
                "q75": q75,
                "iqr": q75 - q25,
            }

            # Mode and skewness are best-effort: fall back to NaN rather than
            # failing the whole summary. `except Exception` (not a bare
            # except) lets KeyboardInterrupt/SystemExit propagate.
            try:
                mode_values = values.mode()
                if len(mode_values) > 0:
                    column_stats["mode"] = float(mode_values.iloc[0])
                else:
                    column_stats["mode"] = float("nan")
            except Exception:
                column_stats["mode"] = float("nan")

            try:
                column_stats["skewness"] = float(pd.Series(values).skew())
            except Exception:
                column_stats["skewness"] = float("nan")

            stats[column] = column_stats

        return stats

    def get_individual_features(self) -> List[dict]:
        """Returns the individual feature dictionaries for each handler.

        This method provides access to the raw feature dictionaries computed
        for each handler. Failed computations are represented as None entries
        in the list.

        Returns
        -------
        List[dict]
            List of feature dictionaries, one per handler. Each dictionary
            contains nested feature categories (cosinor, nonparam,
            physical_activity, sleep). If a handler failed during computation,
            its entry is None.

        Examples
        --------
        >>> features = bulk.get_individual_features()
        >>> for i, feat in enumerate(features):
        ...     if feat is not None:
        ...         print(f"Handler {i}: MESOR = {feat['cosinor']['mesor']:.3f}")
        ...     else:
        ...         print(f"Handler {i}: Failed")
        """
        return self.individual_features

    def get_distribution_stats(self) -> Dict[str, Dict[str, float]]:
        """Returns the statistical distributions across all features.

        This method provides statistical measures for each feature across all
        successful computations. The statistics include descriptive measures,
        distribution characteristics, and quartile information.

        Returns
        -------
        Dict[str, Dict[str, float]]
            Statistical distributions for each feature. Keys are feature names
            (e.g., 'cosinor_mesor', 'nonparam_IS'). Values are dictionaries
            containing statistical measures:

            - count, mean, std, min, max, median
            - q25, q75, iqr (interquartile range)
            - mode, skewness

        Examples
        --------
        >>> stats = bulk.get_distribution_stats()
        >>> mesor_stats = stats['cosinor_mesor']
        >>> print(f"MESOR: mean={mesor_stats['mean']:.3f}, std={mesor_stats['std']:.3f}")
        """
        return self.distribution_stats

    def get_failed_handlers(self) -> List[tuple]:
        """Returns information about handlers that failed during feature computation.

        This method provides details about which handlers failed and why,
        allowing for debugging and quality control in large-scale analyses.

        Returns
        -------
        List[tuple]
            List of (handler_index, error_message) tuples for handlers that
            failed during feature computation. Empty list if all handlers
            succeeded.

        Examples
        --------
        >>> failed = bulk.get_failed_handlers()
        >>> for idx, error in failed:
        ...     print(f"Handler {idx} failed: {error}")
        """
        return self.failed_handlers

    def get_summary_dataframe(self) -> pd.DataFrame:
        """Returns a summary DataFrame with all statistical measures for each feature.

        This method converts the statistical distributions into a pandas
        DataFrame format, making it easy to export, analyze, or visualize the
        results.

        Returns
        -------
        pd.DataFrame
            Summary DataFrame with features as rows and statistics as columns.
            Columns include: feature, count, mean, std, min, max, median, q25,
            q75, iqr, mode, skewness. Empty DataFrame if no distributions were
            computed.

        Examples
        --------
        >>> summary_df = bulk.get_summary_dataframe()
        >>> print(summary_df.head())
        >>> # Export to CSV
        >>> summary_df.to_csv('feature_summary.csv', index=False)
        """
        if not self.distribution_stats:
            return pd.DataFrame()

        summary_df = pd.DataFrame.from_dict(
            self.distribution_stats, orient="index"
        )
        summary_df.index.name = "feature"
        summary_df.reset_index(inplace=True)

        return summary_df

    def get_feature_correlation_matrix(self) -> pd.DataFrame:
        """Returns correlation matrix between features across all handlers.

        This method computes pairwise correlations between all numeric
        features across all successful computations. Rows (handlers) with a
        missing value in any numeric feature are dropped before the
        correlation is computed.

        Returns
        -------
        pd.DataFrame
            Correlation matrix of features. Values range from -1 to 1, where 1
            indicates perfect positive correlation, -1 indicates perfect
            negative correlation, and 0 indicates no correlation. Empty
            DataFrame if insufficient data (less than 2 features or no
            successful computations).

        Examples
        --------
        >>> corr_matrix = bulk.get_feature_correlation_matrix()
        >>> print(corr_matrix['cosinor_mesor']['nonparam_IS'])  # Correlation between MESOR and IS
        >>> # Visualize with heatmap
        >>> import seaborn as sns
        >>> sns.heatmap(corr_matrix, annot=True, cmap='coolwarm')
        """
        indices, valid_features = self._valid_features()

        if not valid_features:
            return pd.DataFrame()

        flattened_df = self.__flatten_features(valid_features, indices)

        # Select only numeric feature columns
        numeric_columns = [
            col
            for col in flattened_df.select_dtypes(include=[np.number]).columns
            if col != "handler_index"
        ]

        if len(numeric_columns) < 2:
            return pd.DataFrame()

        # Only keep rows where all features are not NaN
        flattened_df = flattened_df.dropna(subset=numeric_columns)

        return flattened_df[numeric_columns].corr()