Source code for cosinorage.datahandlers.utils.nhanes

###########################################################################
# Copyright (C) 2025 ETH Zurich
# CosinorAge: Prediction of biological age based on accelerometer data
# using the CosinorAge method proposed by Shim, Fleisch and Barata
# (https://www.nature.com/articles/s41746-024-01111-x)
#
# Authors: Jacob Leo Oskar Hunecke
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##########################################################################

import os
from datetime import datetime, timedelta
from typing import Optional

import numpy as np
import pandas as pd
from tqdm import tqdm

from .calc_enmo import calculate_enmo
from .filtering import filter_consecutive_days, filter_incomplete_days


[docs] def read_nhanes_data( file_dir: str, seqn: Optional[str] = None, meta_dict: dict = {}, verbose: bool = False, ) -> pd.DataFrame: """ Read and process NHANES accelerometer data files for a specific person. This function loads and processes National Health and Nutrition Examination Survey (NHANES) accelerometer data for a specific participant. It handles the complex NHANES data structure including day-level, minute-level, and header files. Parameters ---------- file_dir : str Directory containing NHANES data files. Must contain: - PAXDAY_*.xpt: Day-level data files - PAXHD_*.xpt: Header data files - PAXMIN_*.xpt: Minute-level data files seqn : str, optional Unique identifier for the participant. Required for data extraction. meta_dict : dict, default={} Dictionary to store metadata about the loaded data. Will be populated with: - raw_n_datapoints: Number of data points - raw_start_datetime: Start timestamp - raw_end_datetime: End timestamp - raw_data_frequency: Sampling frequency ('minute-level') - raw_data_type: Type of data ('accelerometer') - raw_data_unit: Unit of data ('MIMS') verbose : bool, default=False Whether to print processing status and progress information. Returns ------- pd.DataFrame Processed accelerometer data with columns: - 'x', 'y', 'z': Accelerometer values in MIMS units - 'wear': Binary wear detection (1=worn, 0=not worn) - 'sleep': Binary sleep detection (1=sleep, 0=wake) - 'paxpredm': Original NHANES prediction values The DataFrame is indexed by timestamp. Raises ------ ValueError If seqn is None or if no valid NHANES data is found for the participant. Notes ----- - Automatically detects and processes multiple NHANES data versions - Applies data quality filters (paxqfd < 1, valid_hours > 16) - Requires at least 4 days of valid data per participant - Filters for complete days (288 epochs per day) - Converts column names to lowercase for consistency - Removes byte-encoded data using remove_bytes function Examples -------- >>> import os >>> >>> # Load NHANES data for a specific participant >>> file_dir = '/path/to/nhanes/data' >>> seqn = '12345' # Participant ID >>> meta_dict = {} >>> data = read_nhanes_data( ... file_dir=file_dir, ... seqn=seqn, ... meta_dict=meta_dict, ... verbose=True ... ) >>> print(f"Loaded {len(data)} records for participant {seqn}") >>> print(f"Data columns: {data.columns.tolist()}") """ if seqn is None: raise ValueError("The seqn is required for nhanes data") # list all files in directory starting with PAX pax_files = [f for f in os.listdir(file_dir) if f.startswith("PAX")] # for each file starting with PAXDAY check if PAXHD and PAXMIN are present versions = [] for file in pax_files: if file.startswith("PAXDAY"): version = file.split("_")[1].strip(".xpt") if ( f"PAXHD_{version}.xpt" in pax_files and f"PAXMIN_{version}.xpt" in pax_files ): if ( seqn in pd.read_sas(f"{file_dir}/PAXDAY_{version}.xpt")[ "SEQN" ].unique() ): versions.append(version) if verbose: print(f"Found {len(versions)} versions of NHANES data: {versions}") if len(versions) == 0: raise ValueError( f"No valid versions of NHANES data found - this might be due to missing files. For each version we expect to find PAXDAY, PAXHD and PAXMIN files." ) # read all day-level files day_x = pd.DataFrame() for version in tqdm(versions, desc="Reading day-level files"): curr = pd.read_sas(f"{file_dir}/PAXDAY_{version}.xpt") curr = curr[curr["SEQN"] == seqn] day_x = pd.concat([day_x, curr], ignore_index=True) if day_x.empty: raise ValueError(f"No day-level data found for person {seqn}") # rename columns day_x = day_x.rename(columns=str.lower) day_x = remove_bytes(day_x) if verbose: print(f"Read {day_x.shape[0]} day-level records for person {seqn}") # check data quality flags day_x = day_x[day_x["paxqfd"] < 1] # check if valid hours are greater than 16 day_x = day_x.assign( valid_hours=(day_x["paxwwmd"] + day_x["paxswmd"]) / 60 ) day_x = day_x[day_x["valid_hours"] > 16] # check if there are at least 4 days of data day_x = day_x.groupby("seqn").filter(lambda x: len(x) >= 4) # read all minute-level files min_x = pd.DataFrame() for version in tqdm(versions, desc="Reading minute-level files"): itr_x = pd.read_sas( f"{file_dir}/PAXMIN_{version}.xpt", chunksize=100000 ) for chunk in tqdm( itr_x, desc=f"Processing chunks for version {version}" ): curr = clean_data(chunk, day_x) curr = curr[curr["SEQN"] == seqn] min_x = pd.concat([min_x, curr], ignore_index=True) min_x = min_x.rename(columns=str.lower) min_x = remove_bytes(min_x) if verbose: print(f"Read {min_x.shape[0]} minute-level records for person {seqn}") # add header data head_x = pd.DataFrame() for version in tqdm(versions, desc="Reading header files"): curr = pd.read_sas(f"{file_dir}/PAXHD_{version}.xpt") curr = curr[curr["SEQN"] == seqn] head_x = pd.concat([head_x, curr], ignore_index=True) head_x = head_x.rename(columns=str.lower) head_x = head_x[["seqn", "paxftime", "paxfday"]].rename( columns={"paxftime": "day1_start_time", "paxfday": "day1_which_day"} ) min_x = min_x.merge(head_x, on="seqn") min_x = remove_bytes(min_x) if verbose: print(f"Merged header and minute-level data for person {seqn}") # calculate measure time min_x["measure_time"] = min_x.apply(calculate_measure_time, axis=1) min_x["measure_hour"] = min_x["measure_time"].dt.hour valid_startend = ( min_x.groupby(["seqn", "paxdaym"]) .agg(start=("measure_hour", "min"), end=("measure_hour", "max")) .reset_index() ) min_x = min_x.merge(valid_startend, on=["seqn", "paxdaym"]) min_x = min_x[(min_x["start"] == 0) & (min_x["end"] == 23)] min_x["measure_min"] = min_x["measure_time"].dt.minute min_x["myepoch"] = ( 12 * min_x["measure_hour"] + np.floor(min_x["measure_min"] / 5 + 1) ).astype(int) # Count epochs per day and filter for complete days (288 epochs) epoch_counts = ( min_x.groupby(["seqn", "paxdaym"])["myepoch"].nunique().reset_index() ) epoch_counts = epoch_counts[epoch_counts["myepoch"] == 288] min_x = min_x.merge( epoch_counts[["seqn", "paxdaym"]], on=["seqn", "paxdaym"] ) # Count valid days per participant and filter for at least 4 valid days valid_days = min_x.groupby("seqn")["paxdaym"].unique().reset_index() valid_days = valid_days[valid_days["paxdaym"].apply(len) >= 4] min_x = min_x[min_x["seqn"].isin(valid_days["seqn"])] min_x = min_x.rename( columns={ "paxmxm": "x", "paxmym": "y", "paxmzm": "z", "measure_time": "timestamp", } ) if verbose: print(f"Renamed columns and set timestamp index for person {seqn}") # set wear and sleep columns min_x["wear"] = min_x["paxpredm"].astype(int).isin([1, 2]).astype(int) min_x["sleep"] = min_x["paxpredm"].astype(int).isin([2]).astype(int) min_x.set_index("timestamp", inplace=True) min_x = min_x[["x", "y", "z", "wear", "sleep", "paxpredm"]] meta_dict["raw_n_datapoints"] = min_x.shape[0] meta_dict["raw_start_datetime"] = min_x.index.min() meta_dict["raw_end_datetime"] = min_x.index.max() meta_dict["raw_data_frequency"] = "minute-level" meta_dict["raw_data_type"] = "accelerometer" meta_dict["raw_data_unit"] = "MIMS" if verbose: print( f"Loaded {min_x.shape[0]} minute-level Accelerometer records from {file_dir}" ) return min_x
[docs] def filter_and_preprocess_nhanes_data( data: pd.DataFrame, meta_dict: dict = {}, verbose: bool = False ) -> pd.DataFrame: """ Filter NHANES accelerometer data for incomplete days and non-consecutive sequences. This function applies data quality filters to NHANES accelerometer data and converts the data to the standard format used by the CosinorAge pipeline. Parameters ---------- data : pd.DataFrame Raw NHANES accelerometer data with columns ['x', 'y', 'z', 'wear', 'sleep', 'paxpredm'] and datetime index. meta_dict : dict, default={} Dictionary to store metadata about the filtering process. Will be populated with: - n_days: Number of valid days after filtering verbose : bool, default=False Whether to print processing status and progress information. Returns ------- pd.DataFrame Filtered and preprocessed accelerometer data with columns: - 'x', 'y', 'z': Accelerometer values converted from MIMS to mg units - 'x_raw', 'y_raw', 'z_raw': Original accelerometer values - 'wear': Binary wear detection - 'sleep': Binary sleep detection - 'paxpredm': Original NHANES prediction values - 'ENMO': Calculated ENMO values (scaled by factor of 257) Notes ----- - Removes incomplete days using filter_incomplete_days - Selects longest consecutive sequence using filter_consecutive_days - Converts accelerometer values from MIMS to mg units (division by 9.81) - Calculates ENMO values with a scaling factor of 257 for parameter tuning - Stores original values in *_raw columns for reference Examples -------- >>> import pandas as pd >>> >>> # Create sample NHANES data >>> dates = pd.date_range('2023-01-01', periods=10000, freq='min') >>> data = pd.DataFrame({ ... 'x': np.random.randn(10000), ... 'y': np.random.randn(10000), ... 'z': np.random.randn(10000), ... 'wear': np.random.choice([0, 1], 10000), ... 'sleep': np.random.choice([0, 1], 10000), ... 'paxpredm': np.random.choice([0, 1, 2], 10000) ... }, index=dates) >>> >>> # Filter and preprocess the data >>> meta_dict = {} >>> processed_data = filter_and_preprocess_nhanes_data( ... data, meta_dict=meta_dict, verbose=True ... ) >>> print(f"Processed data shape: {processed_data.shape}") >>> print(f"Number of days: {meta_dict.get('n_days', 'N/A')}") """ _data = data.copy() old_n = _data.shape[0] _data = filter_incomplete_days(_data, data_freq=1 / 60) if verbose: print( f"Filtered out {old_n - data.shape[0]} minute-level ENMO records due to incomplete daily coverage" ) _data.index = pd.to_datetime(_data.index) old_n = _data.shape[0] _data = filter_consecutive_days(_data) if verbose: print( f"Filtered out {old_n - _data.shape[0]} minute-level ENMO records due to filtering for longest consecutive sequence of days" ) meta_dict["n_days"] = len(np.unique(_data.index.date)) _data[["x_raw", "y_raw", "z_raw"]] = _data[["x", "y", "z"]] _data[["x", "y", "z"]] = ( _data[["x", "y", "z"]] / 9.81 ) # convert from MIMS to aprrox. mg _data["enmo"] = ( calculate_enmo(_data) * 257 ) # factor of 257 as a result of parameter tuning for making cosinorage predictions match if verbose: print(f"Calculated ENMO data") return _data
[docs] def resample_nhanes_data( data: pd.DataFrame, meta_dict: dict = {}, verbose: bool = False ) -> pd.DataFrame: """ Resample NHANES accelerometer data to 1-minute intervals using linear interpolation. This function ensures consistent minute-level resolution for NHANES accelerometer data by resampling to 1-minute intervals and handling categorical variables appropriately. Parameters ---------- data : pd.DataFrame NHANES accelerometer data with datetime index and columns including 'x', 'y', 'z', 'sleep', 'wear'. meta_dict : dict, default={} Dictionary to store metadata about the resampling process. verbose : bool, default=False Whether to print processing status and progress information. Returns ------- pd.DataFrame Resampled accelerometer data with consistent 1-minute intervals. Categorical variables ('sleep', 'wear') are rounded to nearest integer. Notes ----- - Uses pandas resample('1min') with linear interpolation for continuous variables - Applies forward fill (bfill) to handle any remaining gaps - Rounds categorical variables ('sleep', 'wear') to nearest integer - Maintains data integrity for binary classification variables Examples -------- >>> import pandas as pd >>> >>> # Create sample NHANES data with irregular intervals >>> dates = pd.to_datetime(['2023-01-01 00:00:00', '2023-01-01 00:01:30', ... '2023-01-01 00:03:00', '2023-01-01 00:04:30']) >>> data = pd.DataFrame({ ... 'x': [0.1, 0.2, 0.3, 0.4], ... 'y': [0.1, 0.2, 0.3, 0.4], ... 'z': [0.1, 0.2, 0.3, 0.4], ... 'sleep': [0, 1, 0, 1], ... 'wear': [1, 1, 0, 1] ... }, index=dates) >>> >>> # Resample to minute level >>> meta_dict = {} >>> resampled_data = resample_nhanes_data(data, meta_dict=meta_dict, verbose=True) >>> print(f"Original data points: {len(data)}") >>> print(f"Resampled data points: {len(resampled_data)}") """ _data = data.copy() _data = _data.resample("1min").mean(numeric_only=True).interpolate(method="linear").bfill() _data["sleep"] = _data["sleep"].round(0) _data["wear"] = _data["wear"].round(0) if verbose: print(f"Resampled {data.shape[0]} to {_data.shape[0]} timestamps") return _data
[docs] def remove_bytes(df: pd.DataFrame) -> pd.DataFrame: """ Convert byte string columns to regular strings in a DataFrame. This function handles byte-encoded string columns that are common in NHANES data files, converting them to UTF-8 encoded strings for proper processing. Parameters ---------- df : pd.DataFrame Input DataFrame containing potential byte string columns. Returns ------- pd.DataFrame DataFrame with byte strings converted to UTF-8 strings. Non-byte string columns remain unchanged. Notes ----- - Only processes columns with object dtype (likely to contain byte strings) - Uses UTF-8 encoding for conversion - Leaves non-byte string values unchanged - Common in NHANES data due to SAS file format Examples -------- >>> import pandas as pd >>> >>> # Create sample DataFrame with byte strings >>> data = { ... 'col1': [b'hello', b'world', 'normal_string'], ... 'col2': [1, 2, 3], ... 'col3': [b'byte1', b'byte2', b'byte3'] ... } >>> df = pd.DataFrame(data) >>> >>> # Convert byte strings >>> cleaned_df = remove_bytes(df) >>> print(cleaned_df['col1'].iloc[0]) # 'hello' instead of b'hello' """ for col in df.select_dtypes( [object] ): # Select columns with object type (likely byte strings) df[col] = df[col].apply( lambda x: x.decode("utf-8") if isinstance(x, bytes) else x ) return df
[docs] def clean_data(df: pd.DataFrame, days: pd.DataFrame) -> pd.DataFrame: """ Clean NHANES minute-level data by applying quality filters. This function applies multiple quality filters to NHANES minute-level data to ensure only valid measurements are included in the analysis. Parameters ---------- df : pd.DataFrame Raw minute-level NHANES data containing columns: - 'SEQN': Participant identifier - 'PAXMTSM': Minute-level timestamp - 'PAXPREDM': Prediction values - 'PAXQFM': Quality flag days : pd.DataFrame Day-level NHANES data containing valid participant identifiers in 'seqn' column. Returns ------- pd.DataFrame Cleaned minute-level data with invalid measurements and participants removed. Notes ----- - Filters for participants present in day-level data - Removes measurements with PAXMTSM = -0.01 (invalid timestamp) - Excludes PAXPREDM values of 3 or 4 (invalid predictions) - Removes measurements with PAXQFM >= 1 (poor quality) Examples -------- >>> import pandas as pd >>> >>> # Create sample NHANES data >>> minute_data = pd.DataFrame({ ... 'SEQN': ['12345', '12345', '12346', '12345'], ... 'PAXMTSM': [0, -0.01, 60, 120], ... 'PAXPREDM': [1, 2, 3, 1], ... 'PAXQFM': [0, 0, 1, 0] ... }) >>> >>> day_data = pd.DataFrame({'seqn': ['12345']}) >>> >>> # Clean the data >>> cleaned_data = clean_data(minute_data, day_data) >>> print(f"Original records: {len(minute_data)}") >>> print(f"Cleaned records: {len(cleaned_data)}") """ df = df[df["SEQN"].isin(days["seqn"])] df = df[df["PAXMTSM"] != -0.01] df = df[~df["PAXPREDM"].isin([3, 4])] df = df[df["PAXQFM"] < 1] return df
[docs] def calculate_measure_time(row): """ Calculate the measurement timestamp for a row of NHANES data. This function converts NHANES timing information into actual datetime timestamps by combining the day start time with the seconds since midnight. Parameters ---------- row : pd.Series Row containing timing information: - 'day1_start_time': Start time of the first day in format "HH:MM:SS" - 'paxssnmp': Seconds since midnight (scaled by 80) Returns ------- datetime Calculated measurement timestamp combining base time and offset. Notes ----- - Converts day1_start_time string to datetime object - Divides paxssnmp by 80 to get actual seconds (NHANES scaling factor) - Adds the offset to the base time to get measurement timestamp - Used for creating proper datetime index for NHANES data Examples -------- >>> import pandas as pd >>> >>> # Create sample row with timing information >>> row = pd.Series({ ... 'day1_start_time': '08:30:00', ... 'paxssnmp': 8000 # 100 seconds * 80 ... }) >>> >>> # Calculate measurement time >>> measure_time = calculate_measure_time(row) >>> print(f"Measurement time: {measure_time}") >>> # Output: 1900-01-01 08:31:40 (base time + 100 seconds) """ base_time = datetime.strptime(row["day1_start_time"], "%H:%M:%S") base_time = base_time.replace(year=2012, month=1, day=1) measure_time = base_time + timedelta(seconds=row["paxssnmp"] / 80) return measure_time