graphomotor.core.orchestrator

Runner for graphomotor.
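
For orientation before the full source below, here is a minimal single-file sketch. It assumes the import path shown in this page's title; the input and output locations are hypothetical placeholders.

from graphomotor.core import orchestrator

# Hypothetical paths: one spiral recording and an output directory.
features_df = orchestrator.run_pipeline(
    input_path="data/spiral.csv",
    output_path="results",  # directory target: a timestamped CSV is written here
    feature_categories=["duration", "velocity"],
    verbosity=1,  # INFO-level logging
)
print(features_df.head())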

  1"""Runner for graphomotor."""
  2
  3import dataclasses
  4import datetime
  5import pathlib
  6import time
  7import typing
  8
  9import numpy as np
 10import pandas as pd
 11import tqdm
 12
 13from graphomotor.core import config, models
 14from graphomotor.io import reader
 15from graphomotor.utils import center_spiral, generate_reference_spiral
 16
 17logger = config.get_logger()
 18
 19FeatureCategories = typing.Literal["duration", "velocity", "hausdorff", "AUC"]
 20ConfigParams = typing.Literal[
 21    "center_x",
 22    "center_y",
 23    "start_radius",
 24    "growth_rate",
 25    "start_angle",
 26    "end_angle",
 27    "num_points",
 28]
 29
 30
 31def _validate_feature_categories(
 32    feature_categories: list[FeatureCategories],
 33) -> set[str]:
 34    """Validate requested feature categories and return valid ones.
 35
 36    Args:
 37        feature_categories: List of feature categories to validate.
 38
 39    Returns:
 40        Set of valid feature categories.
 41
 42    Raises:
 43        ValueError: If no valid feature categories are provided.
 44    """
 45    feature_categories_set: set[str] = set(feature_categories)
 46    supported_categories_set = models.FeatureCategories.all()
 47    unknown_categories = feature_categories_set - supported_categories_set
 48    valid_requested_categories = feature_categories_set & supported_categories_set
 49
 50    if unknown_categories:
 51        logger.warning(
 52            "Unknown feature categories requested, these categories will be ignored: "
 53            f"{unknown_categories}"
 54        )
 55
 56    if not valid_requested_categories:
 57        error_msg = (
 58            "No valid feature categories provided. "
 59            f"Supported categories: {supported_categories_set}"
 60        )
 61        logger.error(error_msg)
 62        raise ValueError(error_msg)
 63
 64    return valid_requested_categories
 65
 66
 67def extract_features(
 68    spiral: models.Spiral,
 69    feature_categories: list[str],
 70    reference_spiral: np.ndarray,
 71) -> dict[str, str]:
 72    """Extract feature categories from spiral drawing data.
 73
 74    This function chooses which feature categories to extract based on the provided
 75    sequence of valid category names and returns a dictionary containing the extracted
 76    features with metadata.
 77
 78    Args:
 79        spiral: Spiral object containing drawing data and metadata.
 80        feature_categories: List of feature categories to extract.
 81        reference_spiral: Reference spiral for comparison.
 82
 83    Returns:
 84        Dictionary containing the extracted features with metadata.
 85    """
 86    feature_extractors = models.FeatureCategories.get_extractors(
 87        spiral, reference_spiral
 88    )
 89
 90    features: dict[str, float] = {}
 91    for category in feature_categories:
 92        logger.debug(f"Extracting {category} features")
 93        category_features = feature_extractors[category]()
 94        features.update(category_features)
 95        logger.debug(f"{category} features extracted")
 96
 97    formatted_features = {k: f"{v:.15f}" for k, v in features.items()}
 98
 99    formatted_features_with_metadata = {
100        "source_file": str(spiral.metadata.get("source_path")),
101        "participant_id": str(spiral.metadata.get("id")),
102        "task": str(spiral.metadata.get("task")),
103        "hand": str(spiral.metadata.get("hand")),
104        "start_time": str(spiral.metadata.get("start_time")),
105        **formatted_features,
106    }
107
108    return formatted_features_with_metadata
109
110
111def export_features_to_csv(
112    features_df: pd.DataFrame,
113    output_path: pathlib.Path,
114) -> None:
115    """Export extracted features to a CSV file.
116
117    Args:
118        features_df: DataFrame containing all metadata and features.
119        output_path: Path to the output CSV file.
120    """
121    if not output_path.suffix:
122        if not output_path.exists():
123            logger.debug(f"Creating directory that doesn't exist: {output_path}")
124            output_path.mkdir(parents=True)
125        if features_df.shape[0] == 1:
126            filename = (
127                f"{features_df['participant_id'].iloc[0]}_"
128                f"{features_df['task'].iloc[0]}_"
129                f"{features_df['hand'].iloc[0]}_features_"
130            )
131        else:
132            filename = "batch_features_"
133        output_file = (
134            output_path
135            / f"{filename}{datetime.datetime.now().strftime('%Y%m%d_%H%M')}.csv"
136        )
137    else:
138        parent_dir = output_path.parent
139        if not parent_dir.exists():
140            logger.debug(f"Creating parent directory that doesn't exist: {parent_dir}")
141            parent_dir.mkdir(parents=True)
142        output_file = output_path
143
144    logger.debug(f"Saving extracted features to {output_file}")
145
146    if output_file.exists():
147        logger.debug(f"Overwriting existing file: {output_file}")
148
149    try:
150        features_df.to_csv(output_file)
151        logger.info(f"Features saved successfully to {output_file}")
152    except Exception as e:
153        logger.warning(f"Failed to save features to {output_file}: {str(e)}")
154
155
156def _run_file(
157    input_path: pathlib.Path,
158    feature_categories: list[str],
159    spiral_config: config.SpiralConfig,
160) -> dict[str, str]:
161    """Process a single file for feature extraction.
162
163    Args:
164        input_path: Path to the input CSV file containing spiral drawing data.
165        feature_categories: List of feature categories to extract.
166        spiral_config: Configuration for spiral parameters.
167
168    Returns:
169        Dictionary containing the extracted features with metadata.
170    """
171    spiral = reader.load_spiral(input_path)
172    centered_spiral = center_spiral.center_spiral(spiral)
173    reference_spiral = generate_reference_spiral.generate_reference_spiral(
174        spiral_config
175    )
176    centered_reference_spiral = center_spiral.center_spiral(reference_spiral)
177
178    return extract_features(
179        centered_spiral, feature_categories, centered_reference_spiral
180    )
181
182
183def _run_directory(
184    input_path: pathlib.Path,
185    feature_categories: list[str],
186    spiral_config: config.SpiralConfig,
187) -> list[dict[str, str]]:
188    """Process all CSV files in a directory and its subdirectories.
189
190    Args:
191        input_path: Path to the input directory containing CSV files.
192        feature_categories: List of feature categories to extract.
193        spiral_config: Configuration for spiral parameters.
194
195    Returns:
196        List of dictionaries, each containing extracted features with metadata
197        for one processed file.
198
199    Raises:
200        ValueError: If no CSV files are found in the directory.
201    """
202    csv_files = list(input_path.rglob("*.csv"))
203
204    if not csv_files:
205        error_msg = f"No CSV files found in directory: {input_path}"
206        logger.error(error_msg)
207        raise ValueError(error_msg)
208
209    logger.debug(f"Found {len(csv_files)} CSV files to process")
210
211    results: list[dict[str, str]] = []
212    failed_files: list[str] = []
213
214    progress_bar = tqdm.tqdm(
215        csv_files,
216        desc="Processing files",
217        unit="file",
218        bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} "
219        "[{elapsed}<{remaining}, {rate_fmt}]",
220    )
221
222    for file_index, csv_file in enumerate(progress_bar, 1):
223        try:
224            progress_bar.set_postfix({"Current": csv_file.name})
225            logger.debug(
226                f"Processing file {csv_file.name} ({file_index}/{len(csv_files)})"
227            )
228            features = _run_file(csv_file, feature_categories, spiral_config)
229            results.append(features)
230            logger.debug(f"Successfully processed {csv_file.name}")
231        except Exception as e:
232            logger.warning(f"Failed to process {csv_file.name}: {str(e)}")
233            failed_files.append(csv_file.name)
234            continue
235
236    if not results:
237        error_msg = "Could not extract features from any file in the directory."
238        logger.error(error_msg)
239        raise ValueError(error_msg)
240
241    if failed_files:
242        logger.warning(f"Failed to process {len(failed_files)} files")
243
244    return results
245
246
247def run_pipeline(
248    input_path: pathlib.Path | str,
249    output_path: pathlib.Path | str | None = None,
250    feature_categories: list[FeatureCategories] | None = None,
251    config_params: dict[ConfigParams, float | int] | None = None,
252    verbosity: int | None = None,
253) -> pd.DataFrame:
254    """Run the Graphomotor pipeline to extract features from spiral drawing data.
255
256    Supports both single-file and batch (directory) processing.
257
258    Args:
259        input_path: Path to a CSV file (single-file mode) or a directory containing CSV
260            files (batch mode).
261        output_path: Path to save extracted features. If specifying a file, the path
262            must have a `.csv` extension.
263            - If None, features are not saved.
264            - If path has a CSV file extension, features are saved to that file.
265            - If path is a directory, features are saved to a CSV file with a custom
266              name and timestamp.
267        feature_categories: List of feature categories to extract. If None, defaults to
268            all available categories. Supported categories are:
269            - "duration": Task duration.
270            - "velocity": Velocity-based metrics.
271            - "hausdorff": Hausdorff distance metrics.
272            - "AUC": Area under the curve metric.
273        config_params: Dictionary of custom spiral configuration parameters for
274            reference spiral generation and centering. If None, default configuration is
275            used. Supported parameters are:
276            - "center_x" (float): X-coordinate of the spiral center. Default is 50.
277            - "center_y" (float): Y-coordinate of the spiral center. Default is 50.
278            - "start_radius" (float): Starting radius of the spiral. Default is 0.
279            - "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
280            - "start_angle" (float): Starting angle of the spiral. Default is 0.
281            - "end_angle" (float): Ending angle of the spiral. Default is 8π.
282            - "num_points" (int): Number of points in the spiral. Default is 10000.
283        verbosity: Logging verbosity level. If None, uses current logger level.
284            - 0: WARNING level (quiet - only warnings and errors)
285            - 1: INFO level (normal - includes info messages)
286            - 2: DEBUG level (verbose - includes debug messages)
287
288    Returns:
289        DataFrame containing the metadata and extracted features.
290
291    Raises:
292        ValueError: If the input path does not exist or is not a file/directory, if the
293            output path does not have a .csv extension, or if no valid feature
294            categories are provided.
295    """
296    start_time = time.time()
297
    if verbosity is not None:
        config.set_verbosity_level(verbosity)

    logger.debug("Starting Graphomotor pipeline")

    input_path = pathlib.Path(input_path)

    if not input_path.exists() or (
        not input_path.is_file() and not input_path.is_dir()
    ):
        error_msg = (
            f"Input path does not exist or is not a file/directory: {input_path}"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)
    logger.debug(f"Input path: {input_path}")

    if output_path:
        output_path = pathlib.Path(output_path)
        if output_path.suffix and output_path.suffix.lower() != ".csv":
            error_msg = (
                f"Output file must have a .csv extension, got: {output_path.suffix}"
            )
            logger.error(error_msg)
            raise ValueError(error_msg)
        logger.debug(f"Output path: {output_path}")

    if feature_categories:
        valid_categories = sorted(_validate_feature_categories(feature_categories))
        logger.debug(f"Requested feature categories: {valid_categories}")
    else:
        valid_categories = [*models.FeatureCategories.all()]
        logger.debug(f"Using default feature categories: {valid_categories}")

    if config_params and config_params != dataclasses.asdict(config.SpiralConfig()):
        logger.debug(f"Custom spiral configuration: {config_params}")
        spiral_config = config.SpiralConfig.add_custom_params(
            typing.cast(dict, config_params)
        )
    else:
        spiral_config = config.SpiralConfig()
        logger.debug(
            f"Using default spiral configuration: {dataclasses.asdict(spiral_config)}"
        )

    if input_path.is_file():
        logger.debug("Processing single file")
        features = [_run_file(input_path, valid_categories, spiral_config)]
        # Subtract the 5 metadata fields (source_file, participant_id, task,
        # hand, start_time) so only the extracted features are counted.
        logger.debug(
            "Single file processing complete, "
            f"successfully extracted {len(features[0]) - 5} features"
        )
    else:
        logger.debug("Processing directory")
        features = _run_directory(input_path, valid_categories, spiral_config)
        logger.debug(
            f"Batch processing complete, successfully processed {len(features)} files"
        )

    features_df = pd.DataFrame(features)
    features_df = features_df.set_index("source_file")

    if output_path:
        export_features_to_csv(features_df, output_path)

    logger.info(
        "Graphomotor pipeline completed successfully in "
        f"{time.time() - start_time:.2f} seconds"
    )

    return features_df
FeatureCategories = typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']
ConfigParams = typing.Literal['center_x', 'center_y', 'start_radius', 'growth_rate', 'start_angle', 'end_angle', 'num_points']
def extract_features(
    spiral: graphomotor.core.models.Spiral,
    feature_categories: list[str],
    reference_spiral: numpy.ndarray,
) -> dict[str, str]:

Extract feature categories from spiral drawing data.

This function chooses which feature categories to extract based on the provided sequence of valid category names and returns a dictionary containing the extracted features with metadata.

Arguments:
  • spiral: Spiral object containing drawing data and metadata.
  • feature_categories: List of feature categories to extract.
  • reference_spiral: Reference spiral for comparison.
Returns:
  Dictionary containing the extracted features with metadata.
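
To call extract_features directly, a spiral and a reference spiral must be prepared first. The sketch below mirrors what the private _run_file helper in the source above does; the input path is hypothetical, and centering both spirals matches the pipeline's behavior.

import pathlib

from graphomotor.core import config, orchestrator
from graphomotor.io import reader
from graphomotor.utils import center_spiral, generate_reference_spiral

# Hypothetical recording; load_spiral and the default SpiralConfig come
# from the source shown above.
spiral = reader.load_spiral(pathlib.Path("data/spiral.csv"))
reference = generate_reference_spiral.generate_reference_spiral(config.SpiralConfig())

features = orchestrator.extract_features(
    center_spiral.center_spiral(spiral),
    ["duration", "hausdorff"],
    center_spiral.center_spiral(reference),
)
print(features["participant_id"], features["task"])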

def export_features_to_csv(
    features_df: pandas.DataFrame,
    output_path: pathlib.Path,
) -> None:

Export extracted features to a CSV file.

Arguments:
  • features_df: DataFrame containing all metadata and features.
  • output_path: Path to the output CSV file.
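
A sketch of the two output modes, using a DataFrame as produced by run_pipeline (paths are hypothetical). A path without a suffix is treated as a directory; a .csv path is written directly, creating parent directories as needed.

import pathlib

from graphomotor.core import orchestrator

features_df = orchestrator.run_pipeline("data/spiral.csv")  # hypothetical input

# File target: written as-is, overwriting any existing file.
orchestrator.export_features_to_csv(features_df, pathlib.Path("results/spiral.csv"))

# Directory target: writes <participant_id>_<task>_<hand>_features_<timestamp>.csv
# for a single-row frame, or batch_features_<timestamp>.csv for multiple rows.
orchestrator.export_features_to_csv(features_df, pathlib.Path("results"))
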
def run_pipeline(
    input_path: pathlib.Path | str,
    output_path: pathlib.Path | str | None = None,
    feature_categories: list[typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']] | None = None,
    config_params: dict[typing.Literal['center_x', 'center_y', 'start_radius', 'growth_rate', 'start_angle', 'end_angle', 'num_points'], float | int] | None = None,
    verbosity: int | None = None,
) -> pandas.DataFrame:

Run the Graphomotor pipeline to extract features from spiral drawing data.

Supports both single-file and batch (directory) processing.

Arguments:
  • input_path: Path to a CSV file (single-file mode) or a directory containing CSV files (batch mode).
  • output_path: Path to save extracted features. If specifying a file, the path must have a .csv extension.
    • If None, features are not saved.
    • If path has a CSV file extension, features are saved to that file.
    • If path is a directory, features are saved to a CSV file with a custom name and timestamp.
  • feature_categories: List of feature categories to extract. If None, defaults to all available categories. Supported categories are:
    • "duration": Task duration.
    • "velocity": Velocity-based metrics.
    • "hausdorff": Hausdorff distance metrics.
    • "AUC": Area under the curve metric.
  • config_params: Dictionary of custom spiral configuration parameters for reference spiral generation and centering. If None, default configuration is used. Supported parameters are:
    • "center_x" (float): X-coordinate of the spiral center. Default is 50.
    • "center_y" (float): Y-coordinate of the spiral center. Default is 50.
    • "start_radius" (float): Starting radius of the spiral. Default is 0.
    • "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
    • "start_angle" (float): Starting angle of the spiral. Default is 0.
    • "end_angle" (float): Ending angle of the spiral. Default is 8π.
    • "num_points" (int): Number of points in the spiral. Default is 10000.
  • verbosity: Logging verbosity level. If None, uses current logger level.
    • 0: WARNING level (quiet - only warnings and errors)
    • 1: INFO level (normal - includes info messages)
    • 2: DEBUG level (verbose - includes debug messages)
Returns:
  DataFrame containing the metadata and extracted features.

Raises:
  • ValueError: If the input path does not exist or is not a file/directory, if the output path does not have a .csv extension, or if no valid feature categories are provided.
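
A batch-mode sketch with a custom reference spiral (directory names are hypothetical). Note that end_angle defaults to 8π, i.e. four full turns; the value below is three.

import numpy as np

from graphomotor.core import orchestrator

features_df = orchestrator.run_pipeline(
    input_path="data",      # every CSV under this directory is processed recursively
    output_path="results",  # batch_features_<timestamp>.csv is written here
    config_params={
        "growth_rate": 1.2,
        "end_angle": 6 * np.pi,
    },
    verbosity=2,  # DEBUG-level logging
)
print(features_df.shape)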