graphomotor.core.orchestrator
Runner for graphomotor.
"""Runner for graphomotor."""

import dataclasses
import datetime
import pathlib
import time
import typing

import numpy as np
import pandas as pd
import tqdm

from graphomotor.core import config, models
from graphomotor.io import reader
from graphomotor.utils import center_spiral, generate_reference_spiral

logger = config.get_logger()

FeatureCategories = typing.Literal["duration", "velocity", "hausdorff", "AUC"]
ConfigParams = typing.Literal[
    "center_x",
    "center_y",
    "start_radius",
    "growth_rate",
    "start_angle",
    "end_angle",
    "num_points",
]


def _validate_feature_categories(
    feature_categories: list[FeatureCategories],
) -> set[str]:
    """Validate requested feature categories and return valid ones.

    Unknown categories are reported with a warning and dropped; they only become
    an error when nothing valid remains.

    Args:
        feature_categories: List of feature categories to validate.

    Returns:
        Set of valid feature categories.

    Raises:
        ValueError: If no valid feature categories are provided.
    """
    feature_categories_set: set[str] = set(feature_categories)
    supported_categories_set = models.FeatureCategories.all()
    unknown_categories = feature_categories_set - supported_categories_set
    valid_requested_categories = feature_categories_set & supported_categories_set

    if unknown_categories:
        logger.warning(
            "Unknown feature categories requested, these categories will be ignored: "
            f"{unknown_categories}"
        )

    if not valid_requested_categories:
        error_msg = (
            "No valid feature categories provided. "
            f"Supported categories: {supported_categories_set}"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)

    return valid_requested_categories


def extract_features(
    spiral: models.Spiral,
    feature_categories: list[str],
    reference_spiral: np.ndarray,
) -> dict[str, str]:
    """Extract feature categories from spiral drawing data.

    This function chooses which feature categories to extract based on the provided
    sequence of valid category names and returns a dictionary containing the extracted
    features with metadata.

    Args:
        spiral: Spiral object containing drawing data and metadata.
        feature_categories: List of feature categories to extract.
        reference_spiral: Reference spiral for comparison.

    Returns:
        Dictionary containing the extracted features with metadata. Every value is
        a string: metadata values are passed through ``str`` and feature values are
        formatted with 15 decimal places.
    """
    feature_extractors = models.FeatureCategories.get_extractors(
        spiral, reference_spiral
    )

    features: dict[str, float] = {}
    for category in feature_categories:
        logger.debug(f"Extracting {category} features")
        # Each extractor is a zero-argument callable returning a feature mapping.
        category_features = feature_extractors[category]()
        features.update(category_features)
        logger.debug(f"{category} features extracted")

    formatted_features = {k: f"{v:.15f}" for k, v in features.items()}

    # Metadata columns come first so they lead the resulting table.
    formatted_features_with_metadata = {
        "source_file": str(spiral.metadata.get("source_path")),
        "participant_id": str(spiral.metadata.get("id")),
        "task": str(spiral.metadata.get("task")),
        "hand": str(spiral.metadata.get("hand")),
        "start_time": str(spiral.metadata.get("start_time")),
        **formatted_features,
    }

    return formatted_features_with_metadata


def export_features_to_csv(
    features_df: pd.DataFrame,
    output_path: pathlib.Path,
) -> None:
    """Export extracted features to a CSV file.

    A path without a suffix is treated as a directory: it is created if needed and
    a timestamped file name is generated inside it. A path with a suffix is used
    as the output file itself. A failure while writing the CSV is logged as a
    warning and not raised.

    Args:
        features_df: DataFrame containing all metadata and features.
        output_path: Path to the output CSV file.
    """
    if not output_path.suffix:
        if not output_path.exists():
            logger.debug(f"Creating directory that doesn't exist: {output_path}")
            # exist_ok: tolerate the directory appearing between check and mkdir.
            output_path.mkdir(parents=True, exist_ok=True)
        if features_df.shape[0] == 1:
            # Single-row result: name the file after the participant/task/hand.
            filename = (
                f"{features_df['participant_id'].iloc[0]}_"
                f"{features_df['task'].iloc[0]}_"
                f"{features_df['hand'].iloc[0]}_features_"
            )
        else:
            filename = "batch_features_"
        output_file = (
            output_path
            / f"{filename}{datetime.datetime.now().strftime('%Y%m%d_%H%M')}.csv"
        )
    else:
        parent_dir = output_path.parent
        if not parent_dir.exists():
            logger.debug(f"Creating parent directory that doesn't exist: {parent_dir}")
            # exist_ok: tolerate the directory appearing between check and mkdir.
            parent_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_path

    logger.debug(f"Saving extracted features to {output_file}")

    if output_file.exists():
        logger.debug(f"Overwriting existing file: {output_file}")

    try:
        features_df.to_csv(output_file)
        logger.info(f"Features saved successfully to {output_file}")
    except Exception as e:
        # Best effort: the caller still gets the DataFrame even if saving fails.
        logger.warning(f"Failed to save features to {output_file}: {str(e)}")


def _run_file(
    input_path: pathlib.Path,
    feature_categories: list[str],
    spiral_config: config.SpiralConfig,
) -> dict[str, str]:
    """Process a single file for feature extraction.

    Both the loaded spiral and the generated reference spiral are passed through
    ``center_spiral`` before features are extracted.

    Args:
        input_path: Path to the input CSV file containing spiral drawing data.
        feature_categories: List of feature categories to extract.
        spiral_config: Configuration for spiral parameters.

    Returns:
        Dictionary containing the extracted features with metadata.
    """
    spiral = reader.load_spiral(input_path)
    centered_spiral = center_spiral.center_spiral(spiral)
    reference_spiral = generate_reference_spiral.generate_reference_spiral(
        spiral_config
    )
    centered_reference_spiral = center_spiral.center_spiral(reference_spiral)

    return extract_features(
        centered_spiral, feature_categories, centered_reference_spiral
    )


def _run_directory(
    input_path: pathlib.Path,
    feature_categories: list[str],
    spiral_config: config.SpiralConfig,
) -> list[dict[str, str]]:
    """Process all CSV files in a directory and its subdirectories.

    Files are processed in sorted path order so the result order is reproducible.
    A file that fails is logged and skipped; processing continues with the rest.

    Args:
        input_path: Path to the input directory containing CSV files.
        feature_categories: List of feature categories to extract.
        spiral_config: Configuration for spiral parameters.

    Returns:
        List of dictionaries, each containing extracted features with metadata
        for one processed file.

    Raises:
        ValueError: If no CSV files are found in the directory, or if features
            could not be extracted from any of them.
    """
    # rglob yields entries in filesystem order; sort for deterministic output.
    csv_files = sorted(input_path.rglob("*.csv"))

    if not csv_files:
        error_msg = f"No CSV files found in directory: {input_path}"
        logger.error(error_msg)
        raise ValueError(error_msg)

    logger.debug(f"Found {len(csv_files)} CSV files to process")

    results: list[dict[str, str]] = []
    failed_files: list[str] = []

    progress_bar = tqdm.tqdm(
        csv_files,
        desc="Processing files",
        unit="file",
        bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} "
        "[{elapsed}<{remaining}, {rate_fmt}]",
    )

    for file_index, csv_file in enumerate(progress_bar, 1):
        try:
            progress_bar.set_postfix({"Current": csv_file.name})
            logger.debug(
                f"Processing file {csv_file.name} ({file_index}/{len(csv_files)})"
            )
            features = _run_file(csv_file, feature_categories, spiral_config)
            results.append(features)
            logger.debug(f"Successfully processed {csv_file.name}")
        except Exception as e:
            # One bad file must not abort the whole batch.
            logger.warning(f"Failed to process {csv_file.name}: {str(e)}")
            failed_files.append(csv_file.name)
            continue

    if not results:
        error_msg = "Could not extract features from any file in the directory."
        logger.error(error_msg)
        raise ValueError(error_msg)

    if failed_files:
        logger.warning(f"Failed to process {len(failed_files)} files")

    return results


def run_pipeline(
    input_path: pathlib.Path | str,
    output_path: pathlib.Path | str | None = None,
    feature_categories: list[FeatureCategories] | None = None,
    config_params: dict[ConfigParams, float | int] | None = None,
    verbosity: int | None = None,
) -> pd.DataFrame:
    """Run the Graphomotor pipeline to extract features from spiral drawing data.

    Supports both single-file and batch (directory) processing.

    Args:
        input_path: Path to a CSV file (single-file mode) or a directory containing CSV
            files (batch mode).
        output_path: Path to save extracted features. If specifying a file, the path
            must have a `.csv` extension.
            - If None, features are not saved.
            - If path has a CSV file extension, features are saved to that file.
            - If path is a directory, features are saved to a CSV file with a custom
              name and timestamp.
        feature_categories: List of feature categories to extract. If None, defaults to
            all available categories. Supported categories are:
            - "duration": Task duration.
            - "velocity": Velocity-based metrics.
            - "hausdorff": Hausdorff distance metrics.
            - "AUC": Area under the curve metric.
        config_params: Dictionary of custom spiral configuration parameters for
            reference spiral generation and centering. If None, default configuration is
            used. Supported parameters are:
            - "center_x" (float): X-coordinate of the spiral center. Default is 50.
            - "center_y" (float): Y-coordinate of the spiral center. Default is 50.
            - "start_radius" (float): Starting radius of the spiral. Default is 0.
            - "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
            - "start_angle" (float): Starting angle of the spiral. Default is 0.
            - "end_angle" (float): Ending angle of the spiral. Default is 8π.
            - "num_points" (int): Number of points in the spiral. Default is 10000.
        verbosity: Logging verbosity level. If None, uses current logger level.
            - 0: WARNING level (quiet - only warnings and errors)
            - 1: INFO level (normal - includes info messages)
            - 2: DEBUG level (verbose - includes debug messages)

    Returns:
        DataFrame containing the metadata and extracted features.

    Raises:
        ValueError: If the input path does not exist or is not a file/directory, if the
            output path has a file extension other than .csv, if no valid feature
            categories are provided, or (batch mode) if no CSV file could be
            processed.
    """
    start_time = time.time()

    # 0 is a documented level (WARNING), so compare against None, not truthiness.
    if verbosity is not None:
        config.set_verbosity_level(verbosity)

    logger.debug("Starting Graphomotor pipeline")

    input_path = pathlib.Path(input_path)

    if not input_path.exists() or (
        not input_path.is_file() and not input_path.is_dir()
    ):
        error_msg = (
            f"Input path does not exist or is not a file/directory: {input_path}"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)
    logger.debug(f"Input path: {input_path}")

    if output_path:
        output_path = pathlib.Path(output_path)
        # A suffix-less path is treated as a directory and is therefore allowed.
        if output_path.suffix and output_path.suffix.lower() != ".csv":
            error_msg = (
                f"Output file must have a .csv extension, got: {output_path.suffix}"
            )
            logger.error(error_msg)
            raise ValueError(error_msg)
        logger.debug(f"Output path: {output_path}")

    if feature_categories:
        valid_categories = sorted(_validate_feature_categories(feature_categories))
        logger.debug(f"Requested feature categories: {valid_categories}")
    else:
        valid_categories = [*models.FeatureCategories.all()]
        logger.debug(f"Using default feature categories: {valid_categories}")

    if config_params and config_params != dataclasses.asdict(config.SpiralConfig()):
        logger.debug(f"Custom spiral configuration: {config_params}")
        spiral_config = config.SpiralConfig.add_custom_params(
            typing.cast(dict, config_params)
        )
    else:
        spiral_config = config.SpiralConfig()
        logger.debug(
            f"Using default spiral configuration: {dataclasses.asdict(spiral_config)}"
        )

    if input_path.is_file():
        logger.debug("Processing single file")
        features = [_run_file(input_path, valid_categories, spiral_config)]
        # Subtract the five metadata keys that extract_features adds to each row.
        logger.debug(
            "Single file processing complete, "
            f"successfully extracted {len(features[0]) - 5} features"
        )
    else:
        logger.debug("Processing directory")
        features = _run_directory(input_path, valid_categories, spiral_config)
        logger.debug(
            f"Batch processing complete, successfully processed {len(features)} files"
        )

    features_df = pd.DataFrame(features)
    features_df = features_df.set_index("source_file")

    if output_path:
        export_features_to_csv(features_df, output_path)

    logger.info(
        "Graphomotor pipeline completed successfully in "
        f"{time.time() - start_time:.2f} seconds"
    )

    return features_df
logger =
<Logger graphomotor (WARNING)>
FeatureCategories =
typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']
ConfigParams =
typing.Literal['center_x', 'center_y', 'start_radius', 'growth_rate', 'start_angle', 'end_angle', 'num_points']
def
extract_features( spiral: graphomotor.core.models.Spiral, feature_categories: list[str], reference_spiral: numpy.ndarray) -> dict[str, str]:
def extract_features(
    spiral: models.Spiral,
    feature_categories: list[str],
    reference_spiral: np.ndarray,
) -> dict[str, str]:
    """Compute the requested feature categories for one spiral drawing.

    Each requested category is looked up among the extractors built for this
    spiral and reference spiral, called, and its results merged into a single
    mapping. The result is a flat, all-string record: five metadata entries
    followed by the feature values.

    Args:
        spiral: Spiral object containing drawing data and metadata.
        feature_categories: Names of the feature categories to extract.
        reference_spiral: Reference spiral for comparison.

    Returns:
        Dictionary with the keys "source_file", "participant_id", "task", "hand"
        and "start_time" (metadata values passed through ``str``), followed by
        every extracted feature formatted with 15 decimal places.
    """
    extractors = models.FeatureCategories.get_extractors(spiral, reference_spiral)

    raw_values: dict[str, float] = {}
    for name in feature_categories:
        logger.debug(f"Extracting {name} features")
        # Each extractor is a zero-argument callable returning a feature mapping.
        raw_values.update(extractors[name]())
        logger.debug(f"{name} features extracted")

    # Format the numbers before touching the metadata, as a fixed-width string.
    formatted = {key: format(value, ".15f") for key, value in raw_values.items()}

    # (output column, metadata key) pairs, in output order.
    metadata_columns = (
        ("source_file", "source_path"),
        ("participant_id", "id"),
        ("task", "task"),
        ("hand", "hand"),
        ("start_time", "start_time"),
    )
    record: dict[str, str] = {
        column: str(spiral.metadata.get(key)) for column, key in metadata_columns
    }
    record.update(formatted)

    return record
Extract feature categories from spiral drawing data.
This function chooses which feature categories to extract based on the provided sequence of valid category names and returns a dictionary containing the extracted features with metadata.
Arguments:
- spiral: Spiral object containing drawing data and metadata.
- feature_categories: List of feature categories to extract.
- reference_spiral: Reference spiral for comparison.
Returns:
Dictionary containing the extracted features with metadata.
def
export_features_to_csv( features_df: pandas.core.frame.DataFrame, output_path: pathlib._local.Path) -> None:
def export_features_to_csv(
    features_df: pd.DataFrame,
    output_path: pathlib.Path,
) -> None:
    """Export extracted features to a CSV file.

    A path without a suffix is treated as a directory: it is created if needed and
    a timestamped file name is generated inside it. A path with a suffix is used
    as the output file itself, and its parent directory is created if needed.
    A failure while writing the CSV is logged as a warning and not raised.

    Args:
        features_df: DataFrame containing all metadata and features.
        output_path: Path to the output CSV file, or to a directory in which a
            file name is generated.
    """
    if not output_path.suffix:
        if not output_path.exists():
            logger.debug(f"Creating directory that doesn't exist: {output_path}")
            # exist_ok: tolerate the directory appearing between check and mkdir.
            output_path.mkdir(parents=True, exist_ok=True)
        if features_df.shape[0] == 1:
            # Single-row result: name the file after the participant/task/hand.
            filename = (
                f"{features_df['participant_id'].iloc[0]}_"
                f"{features_df['task'].iloc[0]}_"
                f"{features_df['hand'].iloc[0]}_features_"
            )
        else:
            filename = "batch_features_"
        output_file = (
            output_path
            / f"{filename}{datetime.datetime.now().strftime('%Y%m%d_%H%M')}.csv"
        )
    else:
        parent_dir = output_path.parent
        if not parent_dir.exists():
            logger.debug(f"Creating parent directory that doesn't exist: {parent_dir}")
            # exist_ok: tolerate the directory appearing between check and mkdir.
            parent_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_path

    logger.debug(f"Saving extracted features to {output_file}")

    if output_file.exists():
        logger.debug(f"Overwriting existing file: {output_file}")

    try:
        features_df.to_csv(output_file)
        logger.info(f"Features saved successfully to {output_file}")
    except Exception as e:
        # Best effort: a failed save is reported but does not raise.
        logger.warning(f"Failed to save features to {output_file}: {str(e)}")
Export extracted features to a CSV file.
Arguments:
- features_df: DataFrame containing all metadata and features.
- output_path: Path to the output CSV file.
def
run_pipeline( input_path: pathlib._local.Path | str, output_path: pathlib._local.Path | str | None = None, feature_categories: list[typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']] | None = None, config_params: dict[typing.Literal['center_x', 'center_y', 'start_radius', 'growth_rate', 'start_angle', 'end_angle', 'num_points'], float | int] | None = None, verbosity: int | None = None) -> pandas.core.frame.DataFrame:
def run_pipeline(
    input_path: pathlib.Path | str,
    output_path: pathlib.Path | str | None = None,
    feature_categories: list[FeatureCategories] | None = None,
    config_params: dict[ConfigParams, float | int] | None = None,
    verbosity: int | None = None,
) -> pd.DataFrame:
    """Run the Graphomotor pipeline to extract features from spiral drawing data.

    Supports both single-file and batch (directory) processing.

    Args:
        input_path: Path to a CSV file (single-file mode) or a directory containing CSV
            files (batch mode).
        output_path: Path to save extracted features. If specifying a file, the path
            must have a `.csv` extension.
            - If None, features are not saved.
            - If path has a CSV file extension, features are saved to that file.
            - If path is a directory, features are saved to a CSV file with a custom
              name and timestamp.
        feature_categories: List of feature categories to extract. If None, defaults to
            all available categories. Supported categories are:
            - "duration": Task duration.
            - "velocity": Velocity-based metrics.
            - "hausdorff": Hausdorff distance metrics.
            - "AUC": Area under the curve metric.
        config_params: Dictionary of custom spiral configuration parameters for
            reference spiral generation and centering. If None, default configuration is
            used. Supported parameters are:
            - "center_x" (float): X-coordinate of the spiral center. Default is 50.
            - "center_y" (float): Y-coordinate of the spiral center. Default is 50.
            - "start_radius" (float): Starting radius of the spiral. Default is 0.
            - "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
            - "start_angle" (float): Starting angle of the spiral. Default is 0.
            - "end_angle" (float): Ending angle of the spiral. Default is 8π.
            - "num_points" (int): Number of points in the spiral. Default is 10000.
        verbosity: Logging verbosity level. If None, uses current logger level.
            - 0: WARNING level (quiet - only warnings and errors)
            - 1: INFO level (normal - includes info messages)
            - 2: DEBUG level (verbose - includes debug messages)

    Returns:
        DataFrame containing the metadata and extracted features.

    Raises:
        ValueError: If the input path does not exist or is not a file/directory, if the
            output path has a file extension other than .csv, or if no valid feature
            categories are provided.
    """
    start_time = time.time()

    # 0 is a documented level (WARNING), so compare against None, not truthiness.
    if verbosity is not None:
        config.set_verbosity_level(verbosity)

    logger.debug("Starting Graphomotor pipeline")

    input_path = pathlib.Path(input_path)

    if not input_path.exists() or (
        not input_path.is_file() and not input_path.is_dir()
    ):
        error_msg = (
            f"Input path does not exist or is not a file/directory: {input_path}"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)
    logger.debug(f"Input path: {input_path}")

    if output_path:
        output_path = pathlib.Path(output_path)
        # A suffix-less path is treated as a directory and is therefore allowed.
        if output_path.suffix and output_path.suffix.lower() != ".csv":
            error_msg = (
                f"Output file must have a .csv extension, got: {output_path.suffix}"
            )
            logger.error(error_msg)
            raise ValueError(error_msg)
        logger.debug(f"Output path: {output_path}")

    if feature_categories:
        valid_categories = sorted(_validate_feature_categories(feature_categories))
        logger.debug(f"Requested feature categories: {valid_categories}")
    else:
        valid_categories = [*models.FeatureCategories.all()]
        logger.debug(f"Using default feature categories: {valid_categories}")

    if config_params and config_params != dataclasses.asdict(config.SpiralConfig()):
        logger.debug(f"Custom spiral configuration: {config_params}")
        spiral_config = config.SpiralConfig.add_custom_params(
            typing.cast(dict, config_params)
        )
    else:
        spiral_config = config.SpiralConfig()
        logger.debug(
            f"Using default spiral configuration: {dataclasses.asdict(spiral_config)}"
        )

    if input_path.is_file():
        logger.debug("Processing single file")
        features = [_run_file(input_path, valid_categories, spiral_config)]
        # Subtract the five metadata keys that extract_features adds to each row.
        logger.debug(
            "Single file processing complete, "
            f"successfully extracted {len(features[0]) - 5} features"
        )
    else:
        logger.debug("Processing directory")
        features = _run_directory(input_path, valid_categories, spiral_config)
        logger.debug(
            f"Batch processing complete, successfully processed {len(features)} files"
        )

    features_df = pd.DataFrame(features)
    features_df = features_df.set_index("source_file")

    if output_path:
        export_features_to_csv(features_df, output_path)

    logger.info(
        "Graphomotor pipeline completed successfully in "
        f"{time.time() - start_time:.2f} seconds"
    )

    return features_df
Run the Graphomotor pipeline to extract features from spiral drawing data.
Supports both single-file and batch (directory) processing.
Arguments:
- input_path: Path to a CSV file (single-file mode) or a directory containing CSV files (batch mode).
- output_path: Path to save extracted features. If specifying a file, the path must have a `.csv` extension.
- If None, features are not saved.
- If path has a CSV file extension, features are saved to that file.
- If path is a directory, features are saved to a CSV file with a custom name and timestamp.
- feature_categories: List of feature categories to extract. If None, defaults to
all available categories. Supported categories are:
- "duration": Task duration.
- "velocity": Velocity-based metrics.
- "hausdorff": Hausdorff distance metrics.
- "AUC": Area under the curve metric.
- config_params: Dictionary of custom spiral configuration parameters for
reference spiral generation and centering. If None, default configuration is
used. Supported parameters are:
- "center_x" (float): X-coordinate of the spiral center. Default is 50.
- "center_y" (float): Y-coordinate of the spiral center. Default is 50.
- "start_radius" (float): Starting radius of the spiral. Default is 0.
- "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
- "start_angle" (float): Starting angle of the spiral. Default is 0.
- "end_angle" (float): Ending angle of the spiral. Default is 8π.
- "num_points" (int): Number of points in the spiral. Default is 10000.
- verbosity: Logging verbosity level. If None, uses current logger level.
- 0: WARNING level (quiet - only warnings and errors)
- 1: INFO level (normal - includes info messages)
- 2: DEBUG level (verbose - includes debug messages)
Returns:
DataFrame containing the metadata and extracted features.
Raises:
- ValueError: If the input path does not exist or is not a file/directory, if the output path has a file extension other than .csv, or if no valid feature categories are provided.