graphomotor.core.orchestrator
Runner for the Graphomotor pipeline.
1"""Runner for the Graphomotor pipeline.""" 2 3import datetime 4import pathlib 5import time 6import typing 7 8import numpy as np 9import pandas as pd 10import tqdm 11 12from graphomotor.core import config, models 13from graphomotor.io import reader 14from graphomotor.utils import center_spiral, generate_reference_spiral 15 16logger = config.get_logger() 17 18FeatureCategories = typing.Literal["duration", "velocity", "hausdorff", "AUC"] 19 20 21def _validate_feature_categories( 22 feature_categories: list[FeatureCategories], 23) -> set[str]: 24 """Validate requested feature categories and return valid ones. 25 26 Args: 27 feature_categories: List of feature categories to validate. 28 29 Returns: 30 Set of valid feature categories. 31 32 Raises: 33 ValueError: If no valid feature categories are provided. 34 """ 35 feature_categories_set: set[str] = set(feature_categories) 36 supported_categories_set = models.FeatureCategories.all() 37 unknown_categories = feature_categories_set - supported_categories_set 38 valid_requested_categories = feature_categories_set & supported_categories_set 39 40 if unknown_categories: 41 logger.warning( 42 "Unknown feature categories requested, these categories will be ignored: " 43 f"{unknown_categories}" 44 ) 45 46 if not valid_requested_categories: 47 error_msg = ( 48 "No valid feature categories provided. " 49 f"Supported categories: {supported_categories_set}" 50 ) 51 logger.error(error_msg) 52 raise ValueError(error_msg) 53 54 return valid_requested_categories 55 56 57def extract_features( 58 spiral: models.Spiral, 59 feature_categories: list[FeatureCategories], 60 reference_spiral: np.ndarray, 61) -> dict[str, str]: 62 """Extract feature categories from spiral drawing data. 63 64 This function chooses which feature categories to extract based on the provided 65 sequence of valid category names and returns a dictionary containing the extracted 66 features with metadata. 67 68 Args: 69 spiral: Spiral object containing drawing data and metadata. 70 feature_categories: List of feature categories to extract. Valid options are: 71 - "duration": Extract task duration. 72 - "velocity": Extract velocity-based metrics. 73 - "hausdorff": Extract Hausdorff distance metrics. 74 - "AUC": Extract area under the curve metric. 75 reference_spiral: Reference spiral for comparison. 76 77 Returns: 78 Dictionary containing the extracted features with metadata. 79 """ 80 valid_categories = sorted(_validate_feature_categories(feature_categories)) 81 82 feature_extractors = models.FeatureCategories.get_extractors( 83 spiral, reference_spiral 84 ) 85 86 features: dict[str, float] = {} 87 for category in valid_categories: 88 logger.debug(f"Extracting {category} features") 89 category_features = feature_extractors[category]() 90 features.update(category_features) 91 logger.debug(f"{category.capitalize()} features extracted") 92 93 formatted_features = {k: f"{v:.15f}" for k, v in features.items()} 94 95 formatted_features_with_metadata = { 96 "source_file": str(spiral.metadata.get("source_path", "")), 97 "participant_id": str(spiral.metadata.get("id", "")), 98 "task": str(spiral.metadata.get("task", "")), 99 "hand": str(spiral.metadata.get("hand", "")), 100 "start_time": str(spiral.metadata.get("start_time", "")), 101 **formatted_features, 102 } 103 104 return formatted_features_with_metadata 105 106 107def export_features_to_csv( 108 features_df: pd.DataFrame, 109 output_path: pathlib.Path, 110) -> None: 111 """Export extracted features to a CSV file. 
112 113 Args: 114 features_df: DataFrame containing all metadata and features. 115 output_path: Path to the output CSV file. 116 """ 117 if not output_path.suffix: 118 if not output_path.exists(): 119 logger.debug(f"Creating directory that doesn't exist: {output_path}") 120 output_path.mkdir(parents=True) 121 if features_df.shape[0] == 1: 122 filename = ( 123 f"{features_df['participant_id'].iloc[0]}_" 124 f"{features_df['task'].iloc[0]}_" 125 f"{features_df['hand'].iloc[0]}_features_" 126 ) 127 else: 128 filename = "batch_features_" 129 output_file = ( 130 output_path 131 / f"{filename}{datetime.datetime.now().strftime('%Y%m%d_%H%M')}.csv" 132 ) 133 else: 134 parent_dir = output_path.parent 135 if not parent_dir.exists(): 136 logger.debug(f"Creating parent directory that doesn't exist: {parent_dir}") 137 parent_dir.mkdir(parents=True) 138 output_file = output_path 139 140 logger.debug(f"Saving extracted features to {output_file}") 141 142 if output_file.exists(): 143 logger.debug(f"Overwriting existing file: {output_file}") 144 145 try: 146 features_df.to_csv(output_file) 147 logger.debug(f"Features saved successfully to {output_file}") 148 except Exception as e: 149 logger.warning(f"Failed to save features to {output_file}: {str(e)}") 150 151 152def _run_file( 153 input_path: pathlib.Path, 154 feature_categories: list[FeatureCategories], 155 spiral_config: config.SpiralConfig, 156) -> dict[str, str]: 157 """Process a single file for feature extraction. 158 159 Args: 160 input_path: Path to the input CSV file containing spiral drawing data. 161 feature_categories: List of feature categories to extract. 162 spiral_config: Configuration for spiral parameters. 163 164 Returns: 165 Dictionary containing the extracted features with metadata. 166 """ 167 logger.debug(f"Processing file: {input_path}") 168 spiral = reader.load_spiral(input_path) 169 centered_spiral = center_spiral.center_spiral(spiral) 170 reference_spiral = generate_reference_spiral.generate_reference_spiral( 171 spiral_config 172 ) 173 centered_reference_spiral = center_spiral.center_spiral(reference_spiral) 174 175 return extract_features( 176 centered_spiral, feature_categories, centered_reference_spiral 177 ) 178 179 180def _run_directory( 181 input_path: pathlib.Path, 182 feature_categories: list[FeatureCategories], 183 spiral_config: config.SpiralConfig, 184) -> list[dict[str, str]]: 185 """Process all CSV files in a directory and its subdirectories. 186 187 Args: 188 input_path: Path to the input directory containing CSV files. 189 feature_categories: List of feature categories to extract. 190 spiral_config: Configuration for spiral parameters. 191 192 Returns: 193 List of dictionaries, each containing extracted features with metadata 194 for one processed file. 195 196 Raises: 197 ValueError: If no CSV files are found in the directory. 
198 """ 199 logger.debug(f"Processing directory: {input_path}") 200 201 csv_files = list(input_path.rglob("*.csv")) 202 203 if not csv_files: 204 error_msg = f"No CSV files found in directory: {input_path}" 205 logger.error(error_msg) 206 raise ValueError(error_msg) 207 208 logger.debug(f"Found {len(csv_files)} CSV files to process") 209 210 results: list[dict[str, str]] = [] 211 failed_files: list[str] = [] 212 213 progress_bar = tqdm.tqdm( 214 csv_files, 215 desc="Processing files", 216 unit="file", 217 bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} " 218 "[{elapsed}<{remaining}, {rate_fmt}]", 219 ) 220 221 for file_index, csv_file in enumerate(progress_bar, 1): 222 try: 223 progress_bar.set_postfix({"Current": csv_file.name}) 224 logger.debug( 225 f"Processing file {csv_file.name} ({file_index}/{len(csv_files)})" 226 ) 227 features = _run_file(csv_file, feature_categories, spiral_config) 228 results.append(features) 229 logger.debug(f"Successfully processed {csv_file.name}") 230 except Exception as e: 231 logger.warning(f"Failed to process {csv_file.name}: {str(e)}") 232 failed_files.append(csv_file.name) 233 continue 234 235 if not results: 236 error_msg = "Could not extract features from any file in the directory." 237 logger.error(error_msg) 238 raise ValueError(error_msg) 239 240 if failed_files: 241 logger.warning(f"Failed to process {len(failed_files)} files") 242 243 return results 244 245 246def run_pipeline( 247 input_path: pathlib.Path | str, 248 output_path: pathlib.Path | str | None = None, 249 feature_categories: list[FeatureCategories] = [ 250 "duration", 251 "velocity", 252 "hausdorff", 253 "AUC", 254 ], 255 config_params: dict[ 256 typing.Literal[ 257 "center_x", 258 "center_y", 259 "start_radius", 260 "growth_rate", 261 "start_angle", 262 "end_angle", 263 "num_points", 264 ], 265 float | int, 266 ] 267 | None = None, 268) -> pd.DataFrame: 269 """Run the Graphomotor pipeline to extract features from spiral drawing data. 270 271 Supports both single-file and batch (directory) processing. 272 273 Args: 274 input_path: Path to a CSV file (single-file mode) or a directory containing CSV 275 files (batch mode). 276 output_path: Path to save extracted features. If specifying a file, the path 277 must have a `.csv` extension. 278 - If None, features are not saved. 279 - If path has a CSV file extension, features are saved to that file. 280 - If path is a directory, features are saved to a CSV file with a custom 281 name and timestamp. 282 feature_categories: List of feature categories to extract. Defaults to all 283 available: 284 - "duration": Task duration. 285 - "velocity": Velocity-based metrics. 286 - "hausdorff": Hausdorff distance metrics. 287 - "AUC": Area under the curve metric. 288 config_params: Dictionary of custom spiral configuration parameters for 289 reference spiral generation and centering. If None, default configuration is 290 used. Supported parameters are: 291 - "center_x" (float): X-coordinate of the spiral center. Default is 50. 292 - "center_y" (float): Y-coordinate of the spiral center. Default is 50. 293 - "start_radius" (float): Starting radius of the spiral. Default is 0. 294 - "growth_rate" (float): Growth rate of the spiral. Default is 1.075. 295 - "start_angle" (float): Starting angle of the spiral. Default is 0. 296 - "end_angle" (float): Ending angle of the spiral. Default is 8π. 297 - "num_points" (int): Number of points in the spiral. Default is 10000. 298 299 Returns: 300 DataFrame containing the metadata and extracted features. 
301 302 Raises: 303 ValueError: If the input path does not exist or is not a file/directory, if the 304 output path does not have a .csv extension, or if no valid feature 305 categories are provided. 306 """ 307 start_time = time.time() 308 309 logger.info("Starting Graphomotor pipeline") 310 logger.info(f"Input path: {input_path}") 311 logger.info(f"Output path: {output_path}") 312 logger.info(f"Feature categories: {feature_categories}") 313 314 input_path = pathlib.Path(input_path) 315 316 if not input_path.exists() or ( 317 not input_path.is_file() and not input_path.is_dir() 318 ): 319 error_msg = ( 320 f"Input path does not exist or is not a file/directory: {input_path}" 321 ) 322 logger.error(error_msg) 323 raise ValueError(error_msg) 324 325 if output_path: 326 output_path = pathlib.Path(output_path) 327 if output_path.suffix and output_path.suffix.lower() != ".csv": 328 error_msg = ( 329 f"Output file must have a .csv extension, got: {output_path.suffix}" 330 ) 331 logger.error(error_msg) 332 raise ValueError(error_msg) 333 334 if config_params: 335 logger.info(f"Custom spiral configuration: {config_params}") 336 spiral_config = config.SpiralConfig.add_custom_params( 337 typing.cast(dict, config_params) 338 ) 339 else: 340 spiral_config = config.SpiralConfig() 341 342 if input_path.is_file(): 343 logger.info("Processing single file") 344 features = [_run_file(input_path, feature_categories, spiral_config)] 345 logger.info( 346 "Single file processing complete, " 347 f"successfully extracted {len(features[0]) - 5} features" 348 ) 349 else: 350 logger.info("Processing directory") 351 features = _run_directory(input_path, feature_categories, spiral_config) 352 logger.info( 353 f"Batch processing complete, successfully processed {len(features)} files" 354 ) 355 356 features_df = pd.DataFrame(features) 357 features_df = features_df.set_index("source_file") 358 359 if output_path: 360 export_features_to_csv(features_df, output_path) 361 362 logger.info( 363 "Graphomotor pipeline completed successfully in " 364 f"{time.time() - start_time:.2f} seconds" 365 ) 366 367 return features_df
logger = <Logger graphomotor (INFO)>
FeatureCategories = typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']
def extract_features(spiral: graphomotor.core.models.Spiral, feature_categories: list[typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']], reference_spiral: numpy.ndarray) -> dict[str, str]:
Extract feature categories from spiral drawing data.
This function chooses which feature categories to extract based on the provided sequence of valid category names and returns a dictionary containing the extracted features with metadata.
Arguments:
- spiral: Spiral object containing drawing data and metadata.
- feature_categories: List of feature categories to extract. Valid options are:
- "duration": Extract task duration.
- "velocity": Extract velocity-based metrics.
- "hausdorff": Extract Hausdorff distance metrics.
- "AUC": Extract area under the curve metric.
- reference_spiral: Reference spiral for comparison.
Returns:
Dictionary containing the extracted features with metadata.
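A minimal usage sketch (not part of the module) that mirrors the steps the internal _run_file helper performs before calling extract_features; the CSV path below is a placeholder:

```python
# Sketch: extract duration and velocity features from one recording.
# The CSV path is a placeholder; loading, centering, and reference-spiral
# generation follow the same steps as the module's _run_file helper.
import pathlib

from graphomotor.core import config, orchestrator
from graphomotor.io import reader
from graphomotor.utils import center_spiral, generate_reference_spiral

spiral = reader.load_spiral(pathlib.Path("data/spiral_drawing.csv"))
centered_spiral = center_spiral.center_spiral(spiral)

reference = generate_reference_spiral.generate_reference_spiral(config.SpiralConfig())
centered_reference = center_spiral.center_spiral(reference)

features = orchestrator.extract_features(
    centered_spiral, ["duration", "velocity"], centered_reference
)
print(features["participant_id"], features["task"], features["hand"])
```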
def export_features_to_csv(features_df: pandas.core.frame.DataFrame, output_path: pathlib.Path) -> None:
Export extracted features to a CSV file.
Arguments:
- features_df: DataFrame containing all metadata and features.
- output_path: Path to the output CSV file, or a directory in which a timestamped CSV file is created.
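A short sketch (placeholder values and paths) of the two accepted forms of output_path, based on the behavior in the module source above: a directory yields a timestamped file name, while an explicit .csv path is written to directly.

```python
# Sketch: both accepted forms of output_path. Column values are placeholders.
import pathlib

import pandas as pd

from graphomotor.core import orchestrator

features_df = pd.DataFrame(
    [
        {
            "source_file": "data/spiral_drawing.csv",
            "participant_id": "12345",
            "task": "spiral_trace",
            "hand": "Dom",
            "start_time": "2024-01-01 12:00:00",
            "duration": "12.500000000000000",
        }
    ]
).set_index("source_file")

# Directory: writes e.g. 12345_spiral_trace_Dom_features_<YYYYMMDD_HHMM>.csv inside it
# (single-row frame); multi-row frames use the batch_features_ prefix instead.
orchestrator.export_features_to_csv(features_df, pathlib.Path("results"))

# Explicit file: parent directories are created if missing; existing files are overwritten.
orchestrator.export_features_to_csv(features_df, pathlib.Path("results/features.csv"))
```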
def run_pipeline(input_path: pathlib.Path | str, output_path: pathlib.Path | str | None = None, feature_categories: list[typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']] = ['duration', 'velocity', 'hausdorff', 'AUC'], config_params: dict[typing.Literal['center_x', 'center_y', 'start_radius', 'growth_rate', 'start_angle', 'end_angle', 'num_points'], float | int] | None = None) -> pandas.core.frame.DataFrame:
Run the Graphomotor pipeline to extract features from spiral drawing data.
Supports both single-file and batch (directory) processing.
Arguments:
- input_path: Path to a CSV file (single-file mode) or a directory containing CSV files (batch mode).
- output_path: Path to save extracted features. If specifying a file, the path must have a .csv extension.
- If None, features are not saved.
- If path has a CSV file extension, features are saved to that file.
- If path is a directory, features are saved to a CSV file with a custom name and timestamp.
- feature_categories: List of feature categories to extract. Defaults to all available:
- "duration": Task duration.
- "velocity": Velocity-based metrics.
- "hausdorff": Hausdorff distance metrics.
- "AUC": Area under the curve metric.
- config_params: Dictionary of custom spiral configuration parameters for reference spiral generation and centering. If None, the default configuration is used. Supported parameters are:
- "center_x" (float): X-coordinate of the spiral center. Default is 50.
- "center_y" (float): Y-coordinate of the spiral center. Default is 50.
- "start_radius" (float): Starting radius of the spiral. Default is 0.
- "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
- "start_angle" (float): Starting angle of the spiral. Default is 0.
- "end_angle" (float): Ending angle of the spiral. Default is 8π.
- "num_points" (int): Number of points in the spiral. Default is 10000.
Returns:
DataFrame containing the metadata and extracted features.
Raises:
- ValueError: If the input path does not exist or is not a file/directory, if the output path does not have a .csv extension, or if no valid feature categories are provided.
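A usage sketch with hypothetical paths, covering both single-file and batch modes; the config_params values shown are arbitrary overrides of the defaults listed above.

```python
# Sketch: single-file and batch runs. Paths and override values are placeholders.
from graphomotor.core import orchestrator

# Single file, all feature categories, nothing written to disk (output_path=None).
single_df = orchestrator.run_pipeline("data/spiral_drawing.csv")

# Batch mode: every CSV under "data/" is processed; files that fail are logged
# and skipped. Results are written to a timestamped CSV inside "results/".
batch_df = orchestrator.run_pipeline(
    "data",
    output_path="results",
    feature_categories=["duration", "hausdorff"],
    config_params={"growth_rate": 1.1, "num_points": 5000},
)

print(batch_df.shape)  # one row of features per successfully processed file
```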