graphomotor.core.orchestrator
Runner for the Graphomotor pipeline.
"""Runner for the Graphomotor pipeline."""

import datetime
import os
import pathlib
import typing

import numpy as np
import pandas as pd

from graphomotor.core import config, models
from graphomotor.io import reader
from graphomotor.utils import center_spiral, generate_reference_spiral

logger = config.get_logger()

FeatureCategories = typing.Literal["duration", "velocity", "hausdorff", "AUC"]


def _ensure_path(path: pathlib.Path | str) -> pathlib.Path:
    """Ensure that the input is a Path object.

    Args:
        path: Input path, can be string or Path.

    Returns:
        A Path built from the string, or the input itself when it is not a string.
    """
    return pathlib.Path(path) if isinstance(path, str) else path


def _validate_feature_categories(
    feature_categories: list[FeatureCategories],
) -> set[str]:
    """Validate requested feature categories and return valid ones.

    Requested names are compared against the supported names reported by
    ``models.FeatureCategories.all()``. Unknown names are logged as a warning and
    dropped rather than treated as an error, as long as at least one requested
    name is supported.

    Args:
        feature_categories: List of feature categories to validate.

    Returns:
        Set of valid feature categories.

    Raises:
        ValueError: If no valid feature categories are provided.
    """
    feature_categories_set: set[str] = set(feature_categories)
    supported_categories_set = models.FeatureCategories.all()
    unknown_categories = feature_categories_set - supported_categories_set
    valid_requested_categories = feature_categories_set & supported_categories_set

    if unknown_categories:
        logger.warning(
            "Unknown feature categories requested, these categories will be ignored: "
            f"{unknown_categories}"
        )

    if not valid_requested_categories:
        error_msg = (
            "No valid feature categories provided. "
            f"Supported categories: {supported_categories_set}"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)

    return valid_requested_categories


def _get_feature_categories(
    spiral: models.Spiral,
    reference_spiral: np.ndarray,
    feature_categories: list[FeatureCategories],
) -> dict[str, float]:
    """Feature categories dispatcher.

    This function chooses which feature categories to extract based on the provided
    sequence of category names and returns a dictionary containing the extracted
    features. Categories are processed in the order in which they were requested
    (duplicates are processed once), so the key order of the result is the same on
    every run.

    Args:
        spiral: The spiral data to extract features from.
        reference_spiral: The reference spiral used for calculating features.
        feature_categories: List of feature categories to extract.

    Returns:
        Dictionary containing the extracted features.

    Raises:
        ValueError: If none of the requested categories is supported.
    """
    # Materialise the request once, dropping duplicates but keeping caller order.
    # Iterating the validated *set* directly would make the feature order depend on
    # string hash randomization and therefore differ between interpreter runs.
    requested_categories = list(dict.fromkeys(feature_categories))
    valid_categories = _validate_feature_categories(requested_categories)

    feature_extractors = models.FeatureCategories.get_extractors(
        spiral, reference_spiral
    )

    features: dict[str, float] = {}
    for category in requested_categories:
        if category not in valid_categories:
            # Already reported as unknown by the validation step.
            continue
        logger.debug(f"Extracting {category} features")
        category_features = feature_extractors[category]()
        features.update(category_features)
        # Upper-case only the first character; str.capitalize() would also
        # lower-case the rest and turn "AUC" into "Auc".
        category_label = category[:1].upper() + category[1:]
        logger.debug(f"{category_label} features extracted: {category_features}")

    return features


def _export_features_to_csv(
    spiral: models.Spiral,
    features: dict[str, str],
    input_path: pathlib.Path,
    output_path: pathlib.Path,
) -> None:
    """Export extracted features to a CSV file.

    The file has two columns (variable name, value) and is written without a header
    row or index. The participant ID, task, hand, and source file come first,
    followed by the features.

    A path without a suffix is treated as a directory and a file name is generated
    from the spiral metadata and today's date; a path with a suffix is used as the
    output file itself. Missing directories are created. Write failures are logged
    and not re-raised.

    Args:
        spiral: The spiral data used for feature extraction.
        features: Dictionary containing the extracted features.
        input_path: Path to the input CSV file.
        output_path: Path to the output CSV file or output directory.
    """
    logger.info(f"Saving extracted features to {output_path}")

    participant_id = spiral.metadata.get("id")
    task = spiral.metadata.get("task")
    hand = spiral.metadata.get("hand")

    filename = (
        f"{participant_id}_{task}_{hand}_features_"
        f"{datetime.datetime.today().strftime('%Y%m%d')}.csv"
    )

    if not output_path.suffix:
        # No extension: interpret the path as a directory.
        if not os.path.exists(output_path):
            logger.info(f"Creating directory that doesn't exist: {output_path}")
            os.makedirs(output_path, exist_ok=True)
        output_file = output_path / filename
    else:
        # Extension present: interpret the path as the target file.
        parent_dir = output_path.parent
        if not os.path.exists(parent_dir):
            logger.info(f"Creating parent directory that doesn't exist: {parent_dir}")
            os.makedirs(parent_dir, exist_ok=True)
        output_file = output_path

    if os.path.exists(output_file):
        logger.info(f"Overwriting existing file: {output_file}")

    metadata = {
        "participant_id": participant_id,
        "task": task,
        "hand": hand,
        "source_file": str(input_path),
    }

    features_df = pd.DataFrame(
        {
            "variable": list(metadata.keys()) + list(features.keys()),
            "value": list(metadata.values()) + list(features.values()),
        }
    )

    try:
        features_df.to_csv(output_file, index=False, header=False)
        logger.info(f"Features saved successfully to {output_file}")
    except Exception as e:
        # Allowed to pass in Jupyter Notebook scenarios: the caller still receives
        # the extracted features even when they could not be written to disk.
        logger.error(f"Failed to save features to {output_file}: {str(e)}")


def extract_features(
    input_path: pathlib.Path | str,
    output_path: pathlib.Path | str | None,
    feature_categories: list[FeatureCategories],
    spiral_config: config.SpiralConfig | None,
) -> dict[str, str]:
    """Extract features from spiral drawing data.

    Args:
        input_path: Path to the input CSV file containing spiral drawing data.
        output_path: Path for saving extracted features. A path with a file
            extension is used as the output file; a path without one is treated as
            a directory in which a file named after the participant ID, task, hand,
            and date is created. If None, features are not saved.
        feature_categories: List of feature categories to extract. Valid options are:
            - "duration": Extract task duration.
            - "velocity": Extract velocity-based metrics.
            - "hausdorff": Extract Hausdorff distance metrics.
            - "AUC": Extract area under the curve metric.
        spiral_config: Optional configuration for spiral parameters. If None, default
            parameters are used.

    Returns:
        Dictionary containing the extracted features, each value formatted as a
        string with 15 decimal places.

    Raises:
        ValueError: If none of the requested feature categories is supported.
    """
    logger.debug(f"Loading spiral data from {input_path}")
    input_path = _ensure_path(input_path)
    spiral = reader.load_spiral(input_path)
    centered_spiral = center_spiral.center_spiral(spiral)

    logger.debug("Generating reference spiral to calculate features")
    config_to_use = spiral_config or config.SpiralConfig()
    reference_spiral = generate_reference_spiral.generate_reference_spiral(
        config=config_to_use
    )
    centered_reference_spiral = center_spiral.center_spiral(reference_spiral)

    features = _get_feature_categories(
        centered_spiral, centered_reference_spiral, feature_categories
    )
    logger.info(f"Feature extraction complete. Extracted {len(features)} features")

    formatted_features = {k: f"{v:.15f}" for k, v in features.items()}

    if output_path:
        output_path = _ensure_path(output_path)
        # The un-centered spiral is passed on; only its metadata is read for export.
        _export_features_to_csv(spiral, formatted_features, input_path, output_path)

    return formatted_features


def run_pipeline(
    input_path: pathlib.Path | str,
    output_path: pathlib.Path | str | None = None,
    feature_categories: list[FeatureCategories] = [
        "duration",
        "velocity",
        "hausdorff",
        "AUC",
    ],
    config_params: dict[
        typing.Literal[
            "center_x",
            "center_y",
            "start_radius",
            "growth_rate",
            "start_angle",
            "end_angle",
            "num_points",
        ],
        float | int,
    ]
    | None = None,
) -> dict[str, str]:
    """Run the Graphomotor pipeline to extract features from spiral drawings.

    Args:
        input_path: Path to the input CSV file with spiral drawing data.
        output_path: Path to save extracted features. If None, features aren't saved. If
            path has an extension, features are saved to that file. If path points to a
            directory, a file is created with participant ID, task, hand, and date in
            the filename.
        feature_categories: Feature categories to extract. Defaults to all available
            categories:
            - "duration": Task duration
            - "velocity": Velocity-based metrics
            - "hausdorff": Hausdorff distance metrics
            - "AUC": Area under the curve metric
        config_params: Optional dictionary with custom spiral configuration parameters.
            These parameters control reference spiral generation and spiral centering.
            If None, default configuration is used. Supported parameters are:
            - "center_x" (float): X-coordinate of the spiral center. Default is 50.
            - "center_y" (float): Y-coordinate of the spiral center. Default is 50.
            - "start_radius" (float): Starting radius of the spiral. Default is 0.
            - "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
            - "start_angle" (float): Starting angle of the spiral. Default is 0.
            - "end_angle" (float): Ending angle of the spiral. Default is 8π.
            - "num_points" (int): Number of points in the spiral. Default is 10000.

    Returns:
        Dictionary of extracted features.

    Raises:
        ValueError: If none of the requested feature categories is supported.
    """
    # NOTE: the default list for ``feature_categories`` is shared between calls;
    # this module only reads it and never mutates it.
    logger.info("Starting Graphomotor pipeline")
    logger.info(f"Input path: {input_path}")
    logger.info(f"Output path: {output_path}")
    logger.info(f"Feature categories: {feature_categories}")

    spiral_config = None
    if config_params:
        # An empty dict is falsy and therefore also falls back to the defaults.
        logger.info(f"Custom spiral configuration: {config_params}")
        spiral_config = config.SpiralConfig.add_custom_params(
            typing.cast(dict, config_params)
        )

    features = extract_features(
        input_path, output_path, feature_categories, spiral_config
    )

    logger.info("Graphomotor pipeline completed successfully")
    return features
logger =
<Logger graphomotor (INFO)>
FeatureCategories =
typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']
def
extract_features( input_path: pathlib._local.Path | str, output_path: pathlib._local.Path | str | None, feature_categories: list[typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']], spiral_config: graphomotor.core.config.SpiralConfig | None) -> dict[str, str]:
def extract_features(
    input_path: pathlib.Path | str,
    output_path: pathlib.Path | str | None,
    feature_categories: list[FeatureCategories],
    spiral_config: config.SpiralConfig | None,
) -> dict[str, str]:
    """Load a spiral drawing, compute the requested features, and optionally save them.

    Both the drawn spiral and a generated reference spiral are centered before the
    feature extractors run.

    Args:
        input_path: Location of the CSV file holding the spiral drawing data.
        output_path: Where the extracted features should be written. When None (or
            otherwise falsy), nothing is written.
        feature_categories: Names of the feature categories to compute. Valid
            options are:
            - "duration": Extract task duration.
            - "velocity": Extract velocity-based metrics.
            - "hausdorff": Extract Hausdorff distance metrics.
            - "AUC": Extract area under the curve metric.
        spiral_config: Spiral parameters used to generate the reference spiral. When
            None, a default ``config.SpiralConfig()`` is used.

    Returns:
        Mapping of feature name to its value rendered as a string with 15 decimal
        places.
    """
    logger.debug(f"Loading spiral data from {input_path}")
    source_file = _ensure_path(input_path)
    drawn_spiral = reader.load_spiral(source_file)
    drawn_centered = center_spiral.center_spiral(drawn_spiral)

    logger.debug("Generating reference spiral to calculate features")
    if spiral_config:
        active_config = spiral_config
    else:
        active_config = config.SpiralConfig()
    reference_centered = center_spiral.center_spiral(
        generate_reference_spiral.generate_reference_spiral(config=active_config)
    )

    raw_features = _get_feature_categories(
        drawn_centered, reference_centered, feature_categories
    )
    logger.info(f"Feature extraction complete. Extracted {len(raw_features)} features")

    # Render every value as a fixed-point string with 15 decimal places.
    formatted: dict[str, str] = {}
    for name, value in raw_features.items():
        formatted[name] = format(value, ".15f")

    # Nothing to persist: hand the features straight back.
    if not output_path:
        return formatted

    # Export receives the spiral as loaded, not the centered one.
    _export_features_to_csv(
        drawn_spiral, formatted, source_file, _ensure_path(output_path)
    )
    return formatted
Extract features from spiral drawing data.
Arguments:
- input_path: Path to the input CSV file containing spiral drawing data.
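- output_path: Path for saving extracted features. A path with a file extension is used as the output file; a path without one is treated as a directory in which a file named after the participant ID, task, hand, and date is created. If None, features are not saved.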
- feature_categories: List of feature categories to extract. Valid options are:
- "duration": Extract task duration.
- "velocity": Extract velocity-based metrics.
- "hausdorff": Extract Hausdorff distance metrics.
- "AUC": Extract area under the curve metric.
- spiral_config: Optional configuration for spiral parameters. If None, default parameters are used.
Returns:
Dictionary containing the extracted features.
def
run_pipeline( input_path: pathlib._local.Path | str, output_path: pathlib._local.Path | str | None = None, feature_categories: list[typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']] = ['duration', 'velocity', 'hausdorff', 'AUC'], config_params: dict[typing.Literal['center_x', 'center_y', 'start_radius', 'growth_rate', 'start_angle', 'end_angle', 'num_points'], float | int] | None = None) -> dict[str, str]:
def run_pipeline(
    input_path: pathlib.Path | str,
    output_path: pathlib.Path | str | None = None,
    feature_categories: list[FeatureCategories] = [
        "duration",
        "velocity",
        "hausdorff",
        "AUC",
    ],
    config_params: dict[
        typing.Literal[
            "center_x",
            "center_y",
            "start_radius",
            "growth_rate",
            "start_angle",
            "end_angle",
            "num_points",
        ],
        float | int,
    ]
    | None = None,
) -> dict[str, str]:
    """Entry point of the Graphomotor pipeline: extract features from a spiral drawing.

    Logs the run parameters, builds a spiral configuration from ``config_params``
    when one is supplied, and delegates the work to ``extract_features``.

    Args:
        input_path: Path to the input CSV file with spiral drawing data.
        output_path: Destination for the extracted features. With None, nothing is
            saved. A path with an extension is used as the output file. A path that
            points to a directory gets a file whose name contains the participant
            ID, task, hand, and date.
        feature_categories: Feature categories to extract. All available categories
            are extracted by default:
            - "duration": Task duration
            - "velocity": Velocity-based metrics
            - "hausdorff": Hausdorff distance metrics
            - "AUC": Area under the curve metric
        config_params: Optional dictionary with custom spiral configuration
            parameters. These parameters control reference spiral generation and
            spiral centering. The default configuration is used when this is None
            or empty. Supported parameters are:
            - "center_x" (float): X-coordinate of the spiral center. Default is 50.
            - "center_y" (float): Y-coordinate of the spiral center. Default is 50.
            - "start_radius" (float): Starting radius of the spiral. Default is 0.
            - "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
            - "start_angle" (float): Starting angle of the spiral. Default is 0.
            - "end_angle" (float): Ending angle of the spiral. Default is 8π.
            - "num_points" (int): Number of points in the spiral. Default is 10000.

    Returns:
        Dictionary of extracted features.
    """
    logger.info("Starting Graphomotor pipeline")
    run_parameters = (
        ("Input path", input_path),
        ("Output path", output_path),
        ("Feature categories", feature_categories),
    )
    for label, value in run_parameters:
        logger.info(f"{label}: {value}")

    if config_params:
        logger.info(f"Custom spiral configuration: {config_params}")
        custom_config = config.SpiralConfig.add_custom_params(
            typing.cast(dict, config_params)
        )
    else:
        # No overrides given: extract_features falls back to the default config.
        custom_config = None

    extracted = extract_features(
        input_path, output_path, feature_categories, custom_config
    )

    logger.info("Graphomotor pipeline completed successfully")
    return extracted
Run the Graphomotor pipeline to extract features from spiral drawings.
Arguments:
- input_path: Path to the input CSV file with spiral drawing data.
- output_path: Path to save extracted features. If None, features aren't saved. If path has an extension, features are saved to that file. If path points to a directory, a file is created with participant ID, task, hand, and date in the filename.
- feature_categories: Feature categories to extract. Defaults to all available
categories:
- "duration": Task duration
- "velocity": Velocity-based metrics
- "hausdorff": Hausdorff distance metrics
- "AUC": Area under the curve metric
- config_params: Optional dictionary with custom spiral configuration parameters.
These parameters control reference spiral generation and spiral centering.
If None, default configuration is used. Supported parameters are:
- "center_x" (float): X-coordinate of the spiral center. Default is 50.
- "center_y" (float): Y-coordinate of the spiral center. Default is 50.
- "start_radius" (float): Starting radius of the spiral. Default is 0.
- "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
- "start_angle" (float): Starting angle of the spiral. Default is 0.
- "end_angle" (float): Ending angle of the spiral. Default is 8π.
- "num_points" (int): Number of points in the spiral. Default is 10000.
Returns:
Dictionary of extracted features.