graphomotor.core.orchestrator

Runner for the Graphomotor pipeline.

  1"""Runner for the Graphomotor pipeline."""
  2
  3import datetime
  4import pathlib
  5import time
  6import typing
  7
  8import numpy as np
  9import pandas as pd
 10import tqdm
 11
 12from graphomotor.core import config, models
 13from graphomotor.io import reader
 14from graphomotor.utils import center_spiral, generate_reference_spiral
 15
 16logger = config.get_logger()
 17
 18FeatureCategories = typing.Literal["duration", "velocity", "hausdorff", "AUC"]
 19
 20
 21def _validate_feature_categories(
 22    feature_categories: list[FeatureCategories],
 23) -> set[str]:
 24    """Validate requested feature categories and return valid ones.
 25
 26    Args:
 27        feature_categories: List of feature categories to validate.
 28
 29    Returns:
 30        Set of valid feature categories.
 31
 32    Raises:
 33        ValueError: If no valid feature categories are provided.
 34    """
 35    feature_categories_set: set[str] = set(feature_categories)
 36    supported_categories_set = models.FeatureCategories.all()
 37    unknown_categories = feature_categories_set - supported_categories_set
 38    valid_requested_categories = feature_categories_set & supported_categories_set
 39
 40    if unknown_categories:
 41        logger.warning(
 42            "Unknown feature categories requested, these categories will be ignored: "
 43            f"{unknown_categories}"
 44        )
 45
 46    if not valid_requested_categories:
 47        error_msg = (
 48            "No valid feature categories provided. "
 49            f"Supported categories: {supported_categories_set}"
 50        )
 51        logger.error(error_msg)
 52        raise ValueError(error_msg)
 53
 54    return valid_requested_categories
 55
 56
 57def extract_features(
 58    spiral: models.Spiral,
 59    feature_categories: list[FeatureCategories],
 60    reference_spiral: np.ndarray,
 61) -> dict[str, str]:
 62    """Extract feature categories from spiral drawing data.
 63
 64    This function chooses which feature categories to extract based on the provided
 65    sequence of valid category names and returns a dictionary containing the extracted
 66    features with metadata.
 67
 68    Args:
 69        spiral: Spiral object containing drawing data and metadata.
 70        feature_categories: List of feature categories to extract. Valid options are:
 71            - "duration": Extract task duration.
 72            - "velocity": Extract velocity-based metrics.
 73            - "hausdorff": Extract Hausdorff distance metrics.
 74            - "AUC": Extract area under the curve metric.
 75        reference_spiral: Reference spiral for comparison.
 76
 77    Returns:
 78        Dictionary containing the extracted features with metadata.
 79    """
 80    valid_categories = sorted(_validate_feature_categories(feature_categories))
 81
 82    feature_extractors = models.FeatureCategories.get_extractors(
 83        spiral, reference_spiral
 84    )
 85
 86    features: dict[str, float] = {}
 87    for category in valid_categories:
 88        logger.debug(f"Extracting {category} features")
 89        category_features = feature_extractors[category]()
 90        features.update(category_features)
 91        logger.debug(f"{category.capitalize()} features extracted")
 92
 93    formatted_features = {k: f"{v:.15f}" for k, v in features.items()}
 94
 95    formatted_features_with_metadata = {
 96        "source_file": str(spiral.metadata.get("source_path", "")),
 97        "participant_id": str(spiral.metadata.get("id", "")),
 98        "task": str(spiral.metadata.get("task", "")),
 99        "hand": str(spiral.metadata.get("hand", "")),
100        "start_time": str(spiral.metadata.get("start_time", "")),
101        **formatted_features,
102    }
103
104    return formatted_features_with_metadata
105
106
107def export_features_to_csv(
108    features_df: pd.DataFrame,
109    output_path: pathlib.Path,
110) -> None:
111    """Export extracted features to a CSV file.
112
113    Args:
114        features_df: DataFrame containing all metadata and features.
115        output_path: Path to the output CSV file.
116    """
117    if not output_path.suffix:
118        if not output_path.exists():
119            logger.debug(f"Creating directory that doesn't exist: {output_path}")
120            output_path.mkdir(parents=True)
121        if features_df.shape[0] == 1:
122            filename = (
123                f"{features_df['participant_id'].iloc[0]}_"
124                f"{features_df['task'].iloc[0]}_"
125                f"{features_df['hand'].iloc[0]}_features_"
126            )
127        else:
128            filename = "batch_features_"
129        output_file = (
130            output_path
131            / f"{filename}{datetime.datetime.now().strftime('%Y%m%d_%H%M')}.csv"
132        )
133    else:
134        parent_dir = output_path.parent
135        if not parent_dir.exists():
136            logger.debug(f"Creating parent directory that doesn't exist: {parent_dir}")
137            parent_dir.mkdir(parents=True)
138        output_file = output_path
139
140    logger.debug(f"Saving extracted features to {output_file}")
141
142    if output_file.exists():
143        logger.debug(f"Overwriting existing file: {output_file}")
144
145    try:
146        features_df.to_csv(output_file)
147        logger.debug(f"Features saved successfully to {output_file}")
148    except Exception as e:
149        logger.warning(f"Failed to save features to {output_file}: {str(e)}")
150
151
152def _run_file(
153    input_path: pathlib.Path,
154    feature_categories: list[FeatureCategories],
155    spiral_config: config.SpiralConfig,
156) -> dict[str, str]:
157    """Process a single file for feature extraction.
158
159    Args:
160        input_path: Path to the input CSV file containing spiral drawing data.
161        feature_categories: List of feature categories to extract.
162        spiral_config: Configuration for spiral parameters.
163
164    Returns:
165        Dictionary containing the extracted features with metadata.
166    """
167    logger.debug(f"Processing file: {input_path}")
168    spiral = reader.load_spiral(input_path)
169    centered_spiral = center_spiral.center_spiral(spiral)
170    reference_spiral = generate_reference_spiral.generate_reference_spiral(
171        spiral_config
172    )
173    centered_reference_spiral = center_spiral.center_spiral(reference_spiral)
174
175    return extract_features(
176        centered_spiral, feature_categories, centered_reference_spiral
177    )
178
179
180def _run_directory(
181    input_path: pathlib.Path,
182    feature_categories: list[FeatureCategories],
183    spiral_config: config.SpiralConfig,
184) -> list[dict[str, str]]:
185    """Process all CSV files in a directory and its subdirectories.
186
187    Args:
188        input_path: Path to the input directory containing CSV files.
189        feature_categories: List of feature categories to extract.
190        spiral_config: Configuration for spiral parameters.
191
192    Returns:
193        List of dictionaries, each containing extracted features with metadata
194        for one processed file.
195
196    Raises:
197        ValueError: If no CSV files are found in the directory.
198    """
199    logger.debug(f"Processing directory: {input_path}")
200
201    csv_files = list(input_path.rglob("*.csv"))
202
203    if not csv_files:
204        error_msg = f"No CSV files found in directory: {input_path}"
205        logger.error(error_msg)
206        raise ValueError(error_msg)
207
208    logger.debug(f"Found {len(csv_files)} CSV files to process")
209
210    results: list[dict[str, str]] = []
211    failed_files: list[str] = []
212
213    progress_bar = tqdm.tqdm(
214        csv_files,
215        desc="Processing files",
216        unit="file",
217        bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} "
218        "[{elapsed}<{remaining}, {rate_fmt}]",
219    )
220
221    for file_index, csv_file in enumerate(progress_bar, 1):
222        try:
223            progress_bar.set_postfix({"Current": csv_file.name})
224            logger.debug(
225                f"Processing file {csv_file.name} ({file_index}/{len(csv_files)})"
226            )
227            features = _run_file(csv_file, feature_categories, spiral_config)
228            results.append(features)
229            logger.debug(f"Successfully processed {csv_file.name}")
230        except Exception as e:
231            logger.warning(f"Failed to process {csv_file.name}: {str(e)}")
232            failed_files.append(csv_file.name)
233            continue
234
235    if not results:
236        error_msg = "Could not extract features from any file in the directory."
237        logger.error(error_msg)
238        raise ValueError(error_msg)
239
240    if failed_files:
241        logger.warning(f"Failed to process {len(failed_files)} files")
242
243    return results
244
245
246def run_pipeline(
247    input_path: pathlib.Path | str,
248    output_path: pathlib.Path | str | None = None,
249    feature_categories: list[FeatureCategories] = [
250        "duration",
251        "velocity",
252        "hausdorff",
253        "AUC",
254    ],
255    config_params: dict[
256        typing.Literal[
257            "center_x",
258            "center_y",
259            "start_radius",
260            "growth_rate",
261            "start_angle",
262            "end_angle",
263            "num_points",
264        ],
265        float | int,
266    ]
267    | None = None,
268) -> pd.DataFrame:
269    """Run the Graphomotor pipeline to extract features from spiral drawing data.
270
271    Supports both single-file and batch (directory) processing.
272
273    Args:
274        input_path: Path to a CSV file (single-file mode) or a directory containing CSV
275            files (batch mode).
276        output_path: Path to save extracted features. If specifying a file, the path
277            must have a `.csv` extension.
278            - If None, features are not saved.
279            - If path has a CSV file extension, features are saved to that file.
280            - If path is a directory, features are saved to a CSV file with a custom
281              name and timestamp.
282        feature_categories: List of feature categories to extract. Defaults to all
283            available:
284            - "duration": Task duration.
285            - "velocity": Velocity-based metrics.
286            - "hausdorff": Hausdorff distance metrics.
287            - "AUC": Area under the curve metric.
288        config_params: Dictionary of custom spiral configuration parameters for
289            reference spiral generation and centering. If None, default configuration is
290            used. Supported parameters are:
291            - "center_x" (float): X-coordinate of the spiral center. Default is 50.
292            - "center_y" (float): Y-coordinate of the spiral center. Default is 50.
293            - "start_radius" (float): Starting radius of the spiral. Default is 0.
294            - "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
295            - "start_angle" (float): Starting angle of the spiral. Default is 0.
296            - "end_angle" (float): Ending angle of the spiral. Default is 8π.
297            - "num_points" (int): Number of points in the spiral. Default is 10000.
298
299    Returns:
300        DataFrame containing the metadata and extracted features.
301
302    Raises:
303        ValueError: If the input path does not exist or is not a file/directory, if the
304            output path does not have a .csv extension, or if no valid feature
305            categories are provided.
306    """
307    start_time = time.time()
308
309    logger.info("Starting Graphomotor pipeline")
310    logger.info(f"Input path: {input_path}")
311    logger.info(f"Output path: {output_path}")
312    logger.info(f"Feature categories: {feature_categories}")
313
314    input_path = pathlib.Path(input_path)
315
316    if not input_path.exists() or (
317        not input_path.is_file() and not input_path.is_dir()
318    ):
319        error_msg = (
320            f"Input path does not exist or is not a file/directory: {input_path}"
321        )
322        logger.error(error_msg)
323        raise ValueError(error_msg)
324
325    if output_path:
326        output_path = pathlib.Path(output_path)
327        if output_path.suffix and output_path.suffix.lower() != ".csv":
328            error_msg = (
329                f"Output file must have a .csv extension, got: {output_path.suffix}"
330            )
331            logger.error(error_msg)
332            raise ValueError(error_msg)
333
334    if config_params:
335        logger.info(f"Custom spiral configuration: {config_params}")
336        spiral_config = config.SpiralConfig.add_custom_params(
337            typing.cast(dict, config_params)
338        )
339    else:
340        spiral_config = config.SpiralConfig()
341
342    if input_path.is_file():
343        logger.info("Processing single file")
344        features = [_run_file(input_path, feature_categories, spiral_config)]
345        logger.info(
346            "Single file processing complete, "
347            f"successfully extracted {len(features[0]) - 5} features"
348        )
349    else:
350        logger.info("Processing directory")
351        features = _run_directory(input_path, feature_categories, spiral_config)
352        logger.info(
353            f"Batch processing complete, successfully processed {len(features)} files"
354        )
355
356    features_df = pd.DataFrame(features)
357    features_df = features_df.set_index("source_file")
358
359    if output_path:
360        export_features_to_csv(features_df, output_path)
361
362    logger.info(
363        "Graphomotor pipeline completed successfully in "
364        f"{time.time() - start_time:.2f} seconds"
365    )
366
367    return features_df
logger = <Logger graphomotor (INFO)>
FeatureCategories = typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']
def extract_features( spiral: graphomotor.core.models.Spiral, feature_categories: list[typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']], reference_spiral: numpy.ndarray) -> dict[str, str]:
 58def extract_features(
 59    spiral: models.Spiral,
 60    feature_categories: list[FeatureCategories],
 61    reference_spiral: np.ndarray,
 62) -> dict[str, str]:
 63    """Extract feature categories from spiral drawing data.
 64
 65    This function chooses which feature categories to extract based on the provided
 66    sequence of valid category names and returns a dictionary containing the extracted
 67    features with metadata.
 68
 69    Args:
 70        spiral: Spiral object containing drawing data and metadata.
 71        feature_categories: List of feature categories to extract. Valid options are:
 72            - "duration": Extract task duration.
 73            - "velocity": Extract velocity-based metrics.
 74            - "hausdorff": Extract Hausdorff distance metrics.
 75            - "AUC": Extract area under the curve metric.
 76        reference_spiral: Reference spiral for comparison.
 77
 78    Returns:
 79        Dictionary containing the extracted features with metadata.
 80    """
 81    valid_categories = sorted(_validate_feature_categories(feature_categories))
 82
 83    feature_extractors = models.FeatureCategories.get_extractors(
 84        spiral, reference_spiral
 85    )
 86
 87    features: dict[str, float] = {}
 88    for category in valid_categories:
 89        logger.debug(f"Extracting {category} features")
 90        category_features = feature_extractors[category]()
 91        features.update(category_features)
 92        logger.debug(f"{category.capitalize()} features extracted")
 93
 94    formatted_features = {k: f"{v:.15f}" for k, v in features.items()}
 95
 96    formatted_features_with_metadata = {
 97        "source_file": str(spiral.metadata.get("source_path", "")),
 98        "participant_id": str(spiral.metadata.get("id", "")),
 99        "task": str(spiral.metadata.get("task", "")),
100        "hand": str(spiral.metadata.get("hand", "")),
101        "start_time": str(spiral.metadata.get("start_time", "")),
102        **formatted_features,
103    }
104
105    return formatted_features_with_metadata

Extract feature categories from spiral drawing data.

This function chooses which feature categories to extract based on the provided sequence of valid category names and returns a dictionary containing the extracted features with metadata.

Arguments:
  • spiral: Spiral object containing drawing data and metadata.
  • feature_categories: List of feature categories to extract. Valid options are:
    • "duration": Extract task duration.
    • "velocity": Extract velocity-based metrics.
    • "hausdorff": Extract Hausdorff distance metrics.
    • "AUC": Extract area under the curve metric.
  • reference_spiral: Reference spiral for comparison.
Returns:

Dictionary containing the extracted features with metadata.

def export_features_to_csv( features_df: pandas.core.frame.DataFrame, output_path: pathlib._local.Path) -> None:
108def export_features_to_csv(
109    features_df: pd.DataFrame,
110    output_path: pathlib.Path,
111) -> None:
112    """Export extracted features to a CSV file.
113
114    Args:
115        features_df: DataFrame containing all metadata and features.
116        output_path: Path to the output CSV file.
117    """
118    if not output_path.suffix:
119        if not output_path.exists():
120            logger.debug(f"Creating directory that doesn't exist: {output_path}")
121            output_path.mkdir(parents=True)
122        if features_df.shape[0] == 1:
123            filename = (
124                f"{features_df['participant_id'].iloc[0]}_"
125                f"{features_df['task'].iloc[0]}_"
126                f"{features_df['hand'].iloc[0]}_features_"
127            )
128        else:
129            filename = "batch_features_"
130        output_file = (
131            output_path
132            / f"{filename}{datetime.datetime.now().strftime('%Y%m%d_%H%M')}.csv"
133        )
134    else:
135        parent_dir = output_path.parent
136        if not parent_dir.exists():
137            logger.debug(f"Creating parent directory that doesn't exist: {parent_dir}")
138            parent_dir.mkdir(parents=True)
139        output_file = output_path
140
141    logger.debug(f"Saving extracted features to {output_file}")
142
143    if output_file.exists():
144        logger.debug(f"Overwriting existing file: {output_file}")
145
146    try:
147        features_df.to_csv(output_file)
148        logger.debug(f"Features saved successfully to {output_file}")
149    except Exception as e:
150        logger.warning(f"Failed to save features to {output_file}: {str(e)}")

Export extracted features to a CSV file.

Arguments:
  • features_df: DataFrame containing all metadata and features.
  • output_path: Path to the output CSV file.
def run_pipeline( input_path: pathlib._local.Path | str, output_path: pathlib._local.Path | str | None = None, feature_categories: list[typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']] = ['duration', 'velocity', 'hausdorff', 'AUC'], config_params: dict[typing.Literal['center_x', 'center_y', 'start_radius', 'growth_rate', 'start_angle', 'end_angle', 'num_points'], float | int] | None = None) -> pandas.core.frame.DataFrame:
247def run_pipeline(
248    input_path: pathlib.Path | str,
249    output_path: pathlib.Path | str | None = None,
250    feature_categories: list[FeatureCategories] = [
251        "duration",
252        "velocity",
253        "hausdorff",
254        "AUC",
255    ],
256    config_params: dict[
257        typing.Literal[
258            "center_x",
259            "center_y",
260            "start_radius",
261            "growth_rate",
262            "start_angle",
263            "end_angle",
264            "num_points",
265        ],
266        float | int,
267    ]
268    | None = None,
269) -> pd.DataFrame:
270    """Run the Graphomotor pipeline to extract features from spiral drawing data.
271
272    Supports both single-file and batch (directory) processing.
273
274    Args:
275        input_path: Path to a CSV file (single-file mode) or a directory containing CSV
276            files (batch mode).
277        output_path: Path to save extracted features. If specifying a file, the path
278            must have a `.csv` extension.
279            - If None, features are not saved.
280            - If path has a CSV file extension, features are saved to that file.
281            - If path is a directory, features are saved to a CSV file with a custom
282              name and timestamp.
283        feature_categories: List of feature categories to extract. Defaults to all
284            available:
285            - "duration": Task duration.
286            - "velocity": Velocity-based metrics.
287            - "hausdorff": Hausdorff distance metrics.
288            - "AUC": Area under the curve metric.
289        config_params: Dictionary of custom spiral configuration parameters for
290            reference spiral generation and centering. If None, default configuration is
291            used. Supported parameters are:
292            - "center_x" (float): X-coordinate of the spiral center. Default is 50.
293            - "center_y" (float): Y-coordinate of the spiral center. Default is 50.
294            - "start_radius" (float): Starting radius of the spiral. Default is 0.
295            - "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
296            - "start_angle" (float): Starting angle of the spiral. Default is 0.
297            - "end_angle" (float): Ending angle of the spiral. Default is 8π.
298            - "num_points" (int): Number of points in the spiral. Default is 10000.
299
300    Returns:
301        DataFrame containing the metadata and extracted features.
302
303    Raises:
304        ValueError: If the input path does not exist or is not a file/directory, if the
305            output path does not have a .csv extension, or if no valid feature
306            categories are provided.
307    """
308    start_time = time.time()
309
310    logger.info("Starting Graphomotor pipeline")
311    logger.info(f"Input path: {input_path}")
312    logger.info(f"Output path: {output_path}")
313    logger.info(f"Feature categories: {feature_categories}")
314
315    input_path = pathlib.Path(input_path)
316
317    if not input_path.exists() or (
318        not input_path.is_file() and not input_path.is_dir()
319    ):
320        error_msg = (
321            f"Input path does not exist or is not a file/directory: {input_path}"
322        )
323        logger.error(error_msg)
324        raise ValueError(error_msg)
325
326    if output_path:
327        output_path = pathlib.Path(output_path)
328        if output_path.suffix and output_path.suffix.lower() != ".csv":
329            error_msg = (
330                f"Output file must have a .csv extension, got: {output_path.suffix}"
331            )
332            logger.error(error_msg)
333            raise ValueError(error_msg)
334
335    if config_params:
336        logger.info(f"Custom spiral configuration: {config_params}")
337        spiral_config = config.SpiralConfig.add_custom_params(
338            typing.cast(dict, config_params)
339        )
340    else:
341        spiral_config = config.SpiralConfig()
342
343    if input_path.is_file():
344        logger.info("Processing single file")
345        features = [_run_file(input_path, feature_categories, spiral_config)]
346        logger.info(
347            "Single file processing complete, "
348            f"successfully extracted {len(features[0]) - 5} features"
349        )
350    else:
351        logger.info("Processing directory")
352        features = _run_directory(input_path, feature_categories, spiral_config)
353        logger.info(
354            f"Batch processing complete, successfully processed {len(features)} files"
355        )
356
357    features_df = pd.DataFrame(features)
358    features_df = features_df.set_index("source_file")
359
360    if output_path:
361        export_features_to_csv(features_df, output_path)
362
363    logger.info(
364        "Graphomotor pipeline completed successfully in "
365        f"{time.time() - start_time:.2f} seconds"
366    )
367
368    return features_df

Run the Graphomotor pipeline to extract features from spiral drawing data.

Supports both single-file and batch (directory) processing.

Arguments:
  • input_path: Path to a CSV file (single-file mode) or a directory containing CSV files (batch mode).
  • output_path: Path to save extracted features. If specifying a file, the path must have a .csv extension.
    • If None, features are not saved.
    • If path has a CSV file extension, features are saved to that file.
    • If path is a directory, features are saved to a CSV file with a custom name and timestamp.
  • feature_categories: List of feature categories to extract. Defaults to all available:
    • "duration": Task duration.
    • "velocity": Velocity-based metrics.
    • "hausdorff": Hausdorff distance metrics.
    • "AUC": Area under the curve metric.
  • config_params: Dictionary of custom spiral configuration parameters for reference spiral generation and centering. If None, default configuration is used. Supported parameters are:
    • "center_x" (float): X-coordinate of the spiral center. Default is 50.
    • "center_y" (float): Y-coordinate of the spiral center. Default is 50.
    • "start_radius" (float): Starting radius of the spiral. Default is 0.
    • "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
    • "start_angle" (float): Starting angle of the spiral. Default is 0.
    • "end_angle" (float): Ending angle of the spiral. Default is 8π.
    • "num_points" (int): Number of points in the spiral. Default is 10000.
Returns:

DataFrame containing the metadata and extracted features.

Raises:
  • ValueError: If the input path does not exist or is not a file/directory, if the output path does not have a .csv extension, or if no valid feature categories are provided.