graphomotor.core.orchestrator

Runner for the Graphomotor pipeline.

  1"""Runner for the Graphomotor pipeline."""
  2
  3import datetime
  4import os
  5import pathlib
  6import typing
  7
  8import numpy as np
  9import pandas as pd
 10
 11from graphomotor.core import config, models
 12from graphomotor.io import reader
 13from graphomotor.utils import center_spiral, generate_reference_spiral
 14
 15logger = config.get_logger()
 16
 17FeatureCategories = typing.Literal["duration", "velocity", "hausdorff", "AUC"]
 18
 19
 20def _ensure_path(path: pathlib.Path | str) -> pathlib.Path:
 21    """Ensure that the input is a Path object.
 22
 23    Args:
 24        path: Input path, can be string or Path
 25
 26    Returns:
 27        Path object
 28    """
 29    return pathlib.Path(path) if isinstance(path, str) else path
 30
 31
 32def _validate_feature_categories(
 33    feature_categories: list[FeatureCategories],
 34) -> set[str]:
 35    """Validate requested feature categories and return valid ones.
 36
 37    Args:
 38        feature_categories: List of feature categories to validate.
 39
 40    Returns:
 41        Set of valid feature categories.
 42
 43    Raises:
 44        ValueError: If no valid feature categories are provided.
 45    """
 46    feature_categories_set: set[str] = set(feature_categories)
 47    supported_categories_set = models.FeatureCategories.all()
 48    unknown_categories = feature_categories_set - supported_categories_set
 49    valid_requested_categories = feature_categories_set & supported_categories_set
 50
 51    if unknown_categories:
 52        logger.warning(
 53            "Unknown feature categories requested, these categories will be ignored: "
 54            f"{unknown_categories}"
 55        )
 56
 57    if not valid_requested_categories:
 58        error_msg = (
 59            "No valid feature categories provided. "
 60            f"Supported categories: {supported_categories_set}"
 61        )
 62        logger.error(error_msg)
 63        raise ValueError(error_msg)
 64
 65    return valid_requested_categories
 66
 67
 68def _get_feature_categories(
 69    spiral: models.Spiral,
 70    reference_spiral: np.ndarray,
 71    feature_categories: list[FeatureCategories],
 72) -> dict[str, float]:
 73    """Feature categories dispatcher.
 74
 75    This function chooses which feature categories to extract based on the provided
 76    sequence of valid category names and returns a dictionary containing the extracted
 77    features.
 78
 79    Args:
 80        spiral: The spiral data to extract features from.
 81        reference_spiral: The reference spiral used for calculating features.
 82        feature_categories: List of feature categories to extract.
 83
 84    Returns:
 85        Dictionary containing the extracted features.
 86    """
 87    valid_categories = _validate_feature_categories(feature_categories)
 88
 89    feature_extractors = models.FeatureCategories.get_extractors(
 90        spiral, reference_spiral
 91    )
 92
 93    features = {}
 94    for category in valid_categories:
 95        logger.debug(f"Extracting {category} features")
 96        category_features = feature_extractors[category]()
 97        features.update(category_features)
 98        logger.debug(f"{category.capitalize()} features extracted: {category_features}")
 99
100    return features
101
102
def _export_features_to_csv(
    spiral: models.Spiral,
    features: dict[str, str],
    input_path: pathlib.Path,
    output_path: pathlib.Path,
) -> None:
    """Write identifying metadata followed by the features to a CSV file.

    The file has two columns (name, value) and is written without a header row
    or index. An ``output_path`` without a suffix is treated as a directory and
    receives an auto-generated file name; otherwise it is used as the target
    file. Missing directories are created. A failure while writing is logged
    and not re-raised.

    Args:
        spiral: The spiral whose metadata ("id", "task", "hand") labels the
            output.
        features: Mapping of feature name to its formatted value.
        input_path: Path of the source CSV file, recorded in the output.
        output_path: Target file, or target directory when it has no suffix.
    """
    logger.info(f"Saving extracted features to {output_path}")

    participant_id, task, hand = (
        spiral.metadata.get(key) for key in ("id", "task", "hand")
    )

    # Default file name, only used when the caller supplied a directory.
    date_stamp = datetime.datetime.today().strftime("%Y%m%d")
    filename = f"{participant_id}_{task}_{hand}_features_{date_stamp}.csv"

    if output_path.suffix:
        # A suffix means the caller named the output file explicitly.
        parent_dir = output_path.parent
        if not os.path.exists(parent_dir):
            logger.info(f"Creating parent directory that doesn't exist: {parent_dir}")
        os.makedirs(parent_dir, exist_ok=True)
        output_file = output_path
    else:
        if not os.path.exists(output_path):
            logger.info(f"Creating directory that doesn't exist: {output_path}")
        os.makedirs(output_path, exist_ok=True)
        output_file = output_path / filename

    if os.path.exists(output_file):
        logger.info(f"Overwriting existing file: {output_file}")

    metadata = {
        "participant_id": participant_id,
        "task": task,
        "hand": hand,
        "source_file": str(input_path),
    }

    # Metadata rows come first, feature rows after them.
    names = [*metadata, *features]
    values = [*metadata.values(), *features.values()]
    features_df = pd.DataFrame({"variable": names, "value": values})

    try:
        features_df.to_csv(output_file, index=False, header=False)
        logger.info(f"Features saved successfully to {output_file}")
    except Exception as e:
        # Allowed to pass in Jupyter Notebook scenarios.
        logger.error(f"Failed to save features to {output_file}: {e!s}")
163
164
def extract_features(
    input_path: pathlib.Path | str,
    output_path: pathlib.Path | str | None,
    feature_categories: list[FeatureCategories],
    spiral_config: config.SpiralConfig | None,
) -> dict[str, str]:
    """Load a spiral drawing, compute the requested features, optionally save them.

    Args:
        input_path: Path to the input CSV file containing spiral drawing data.
        output_path: Where to save the extracted features: a file path (with an
            extension) or a directory. If None, features are not saved.
        feature_categories: List of feature categories to extract. Valid options are:
            - "duration": Extract task duration.
            - "velocity": Extract velocity-based metrics.
            - "hausdorff": Extract Hausdorff distance metrics.
            - "AUC": Extract area under the curve metric.
        spiral_config: Optional configuration for spiral parameters. If None, default
            parameters are used.

    Returns:
        Dictionary mapping each feature name to its value formatted as a string
        with 15 decimal places.
    """
    logger.debug(f"Loading spiral data from {input_path}")
    input_path = _ensure_path(input_path)
    spiral = reader.load_spiral(input_path)
    # NOTE(review): centering is called without the spiral configuration here;
    # confirm that custom center_x/center_y values are honored by center_spiral.
    centered_spiral = center_spiral.center_spiral(spiral)

    logger.debug("Generating reference spiral to calculate features")
    if spiral_config:
        config_to_use = spiral_config
    else:
        config_to_use = config.SpiralConfig()
    centered_reference_spiral = center_spiral.center_spiral(
        generate_reference_spiral.generate_reference_spiral(config=config_to_use)
    )

    raw_features = _get_feature_categories(
        centered_spiral, centered_reference_spiral, feature_categories
    )
    feature_count = len(raw_features)
    logger.info(f"Feature extraction complete. Extracted {feature_count} features")

    formatted_features = {}
    for name, value in raw_features.items():
        formatted_features[name] = f"{value:.15f}"

    if not output_path:
        return formatted_features

    # The export uses the metadata of the original (uncentered) spiral object.
    _export_features_to_csv(
        spiral, formatted_features, input_path, _ensure_path(output_path)
    )
    return formatted_features
212
213
def run_pipeline(
    input_path: pathlib.Path | str,
    output_path: pathlib.Path | str | None = None,
    feature_categories: list[FeatureCategories] | None = None,
    config_params: dict[
        typing.Literal[
            "center_x",
            "center_y",
            "start_radius",
            "growth_rate",
            "start_angle",
            "end_angle",
            "num_points",
        ],
        float | int,
    ]
    | None = None,
) -> dict[str, str]:
    """Run the Graphomotor pipeline to extract features from spiral drawings.

    Args:
        input_path: Path to the input CSV file with spiral drawing data.
        output_path: Path to save extracted features. If None, features aren't saved. If
            path has an extension, features are saved to that file. If path points to a
            directory, a file is created with participant ID, task, hand, and date in
            the filename.
        feature_categories: Feature categories to extract. If None (the default), all
            available categories are extracted:
            - "duration": Task duration
            - "velocity": Velocity-based metrics
            - "hausdorff": Hausdorff distance metrics
            - "AUC": Area under the curve metric
        config_params: Optional dictionary with custom spiral configuration parameters.
            These parameters control reference spiral generation and spiral centering.
            If None, default configuration is used. Supported parameters are:
            - "center_x" (float): X-coordinate of the spiral center. Default is 50.
            - "center_y" (float): Y-coordinate of the spiral center. Default is 50.
            - "start_radius" (float): Starting radius of the spiral. Default is 0.
            - "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
            - "start_angle" (float): Starting angle of the spiral. Default is 0.
            - "end_angle" (float): Ending angle of the spiral. Default is 8π.
            - "num_points" (int): Number of points in the spiral. Default is 10000.

    Returns:
        Dictionary of extracted features.
    """
    # A list literal as the default would be created once and shared by every
    # call; build a fresh list per call from the FeatureCategories literal so
    # the default can never be mutated and never drifts from the type.
    if feature_categories is None:
        feature_categories = list(typing.get_args(FeatureCategories))

    logger.info("Starting Graphomotor pipeline")
    logger.info(f"Input path: {input_path}")
    logger.info(f"Output path: {output_path}")
    logger.info(f"Feature categories: {feature_categories}")

    # An empty or missing config_params leaves spiral_config as None, which makes
    # extract_features fall back to the default SpiralConfig.
    spiral_config = None
    if config_params:
        logger.info(f"Custom spiral configuration: {config_params}")
        spiral_config = config.SpiralConfig.add_custom_params(
            typing.cast(dict, config_params)
        )

    features = extract_features(
        input_path, output_path, feature_categories, spiral_config
    )

    logger.info("Graphomotor pipeline completed successfully")
    return features
logger = <Logger graphomotor (INFO)>
FeatureCategories = typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']
def extract_features( input_path: pathlib._local.Path | str, output_path: pathlib._local.Path | str | None, feature_categories: list[typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']], spiral_config: graphomotor.core.config.SpiralConfig | None) -> dict[str, str]:
def extract_features(
    input_path: pathlib.Path | str,
    output_path: pathlib.Path | str | None,
    feature_categories: list[FeatureCategories],
    spiral_config: config.SpiralConfig | None,
) -> dict[str, str]:
    """Load a spiral drawing, compute the requested features, optionally save them.

    Args:
        input_path: Path to the input CSV file containing spiral drawing data.
        output_path: Where to save the extracted features: a file path (with an
            extension) or a directory. If None, features are not saved.
        feature_categories: List of feature categories to extract. Valid options are:
            - "duration": Extract task duration.
            - "velocity": Extract velocity-based metrics.
            - "hausdorff": Extract Hausdorff distance metrics.
            - "AUC": Extract area under the curve metric.
        spiral_config: Optional configuration for spiral parameters. If None, default
            parameters are used.

    Returns:
        Dictionary mapping each feature name to its value formatted as a string
        with 15 decimal places.
    """
    logger.debug(f"Loading spiral data from {input_path}")
    input_path = _ensure_path(input_path)
    spiral = reader.load_spiral(input_path)
    # NOTE(review): centering is called without the spiral configuration here;
    # confirm that custom center_x/center_y values are honored by center_spiral.
    centered_spiral = center_spiral.center_spiral(spiral)

    logger.debug("Generating reference spiral to calculate features")
    if spiral_config:
        config_to_use = spiral_config
    else:
        config_to_use = config.SpiralConfig()
    centered_reference_spiral = center_spiral.center_spiral(
        generate_reference_spiral.generate_reference_spiral(config=config_to_use)
    )

    raw_features = _get_feature_categories(
        centered_spiral, centered_reference_spiral, feature_categories
    )
    feature_count = len(raw_features)
    logger.info(f"Feature extraction complete. Extracted {feature_count} features")

    formatted_features = {}
    for name, value in raw_features.items():
        formatted_features[name] = f"{value:.15f}"

    if not output_path:
        return formatted_features

    # The export uses the metadata of the original (uncentered) spiral object.
    _export_features_to_csv(
        spiral, formatted_features, input_path, _ensure_path(output_path)
    )
    return formatted_features

Extract features from spiral drawing data.

Arguments:
  • input_path: Path to the input CSV file containing spiral drawing data.
  • output_path: Path to the output file or directory for saving extracted features. If None, features are not saved.
  • feature_categories: List of feature categories to extract. Valid options are:
    • "duration": Extract task duration.
    • "velocity": Extract velocity-based metrics.
    • "hausdorff": Extract Hausdorff distance metrics.
    • "AUC": Extract area under the curve metric.
  • spiral_config: Optional configuration for spiral parameters. If None, default parameters are used.
Returns:

Dictionary containing the extracted features.

def run_pipeline( input_path: pathlib._local.Path | str, output_path: pathlib._local.Path | str | None = None, feature_categories: list[typing.Literal['duration', 'velocity', 'hausdorff', 'AUC']] = ['duration', 'velocity', 'hausdorff', 'AUC'], config_params: dict[typing.Literal['center_x', 'center_y', 'start_radius', 'growth_rate', 'start_angle', 'end_angle', 'num_points'], float | int] | None = None) -> dict[str, str]:
def run_pipeline(
    input_path: pathlib.Path | str,
    output_path: pathlib.Path | str | None = None,
    feature_categories: list[FeatureCategories] | None = None,
    config_params: dict[
        typing.Literal[
            "center_x",
            "center_y",
            "start_radius",
            "growth_rate",
            "start_angle",
            "end_angle",
            "num_points",
        ],
        float | int,
    ]
    | None = None,
) -> dict[str, str]:
    """Run the Graphomotor pipeline to extract features from spiral drawings.

    Args:
        input_path: Path to the input CSV file with spiral drawing data.
        output_path: Path to save extracted features. If None, features aren't saved. If
            path has an extension, features are saved to that file. If path points to a
            directory, a file is created with participant ID, task, hand, and date in
            the filename.
        feature_categories: Feature categories to extract. If None (the default), all
            available categories are extracted:
            - "duration": Task duration
            - "velocity": Velocity-based metrics
            - "hausdorff": Hausdorff distance metrics
            - "AUC": Area under the curve metric
        config_params: Optional dictionary with custom spiral configuration parameters.
            These parameters control reference spiral generation and spiral centering.
            If None, default configuration is used. Supported parameters are:
            - "center_x" (float): X-coordinate of the spiral center. Default is 50.
            - "center_y" (float): Y-coordinate of the spiral center. Default is 50.
            - "start_radius" (float): Starting radius of the spiral. Default is 0.
            - "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
            - "start_angle" (float): Starting angle of the spiral. Default is 0.
            - "end_angle" (float): Ending angle of the spiral. Default is 8π.
            - "num_points" (int): Number of points in the spiral. Default is 10000.

    Returns:
        Dictionary of extracted features.
    """
    # A list literal as the default would be created once and shared by every
    # call; build a fresh list per call from the FeatureCategories literal so
    # the default can never be mutated and never drifts from the type.
    if feature_categories is None:
        feature_categories = list(typing.get_args(FeatureCategories))

    logger.info("Starting Graphomotor pipeline")
    logger.info(f"Input path: {input_path}")
    logger.info(f"Output path: {output_path}")
    logger.info(f"Feature categories: {feature_categories}")

    # An empty or missing config_params leaves spiral_config as None, which makes
    # extract_features fall back to the default SpiralConfig.
    spiral_config = None
    if config_params:
        logger.info(f"Custom spiral configuration: {config_params}")
        spiral_config = config.SpiralConfig.add_custom_params(
            typing.cast(dict, config_params)
        )

    features = extract_features(
        input_path, output_path, feature_categories, spiral_config
    )

    logger.info("Graphomotor pipeline completed successfully")
    return features

Run the Graphomotor pipeline to extract features from spiral drawings.

Arguments:
  • input_path: Path to the input CSV file with spiral drawing data.
  • output_path: Path to save extracted features. If None, features aren't saved. If path has an extension, features are saved to that file. If path points to a directory, a file is created with participant ID, task, hand, and date in the filename.
  • feature_categories: Feature categories to extract. Defaults to all available categories:
    • "duration": Task duration
    • "velocity": Velocity-based metrics
    • "hausdorff": Hausdorff distance metrics
    • "AUC": Area under the curve metric
  • config_params: Optional dictionary with custom spiral configuration parameters. These parameters control reference spiral generation and spiral centering. If None, default configuration is used. Supported parameters are:
    • "center_x" (float): X-coordinate of the spiral center. Default is 50.
    • "center_y" (float): Y-coordinate of the spiral center. Default is 50.
    • "start_radius" (float): Starting radius of the spiral. Default is 0.
    • "growth_rate" (float): Growth rate of the spiral. Default is 1.075.
    • "start_angle" (float): Starting angle of the spiral. Default is 0.
    • "end_angle" (float): Ending angle of the spiral. Default is 8π.
    • "num_points" (int): Number of points in the spiral. Default is 10000.
Returns:

Dictionary of extracted features.