Defining Experiments#
This guide covers how to define experiment schemas and register simulation functions with Scythe.
Input Specs#
All experiment inputs inherit from ExperimentInputSpec, which itself inherits from BaseSpec:
from pydantic import Field
from scythe.base import ExperimentInputSpec
class MyInput(ExperimentInputSpec):
temperature: float = Field(..., description="Temperature [K]", ge=0)
pressure: float = Field(..., description="Pressure [Pa]", ge=0)
material: str = Field(..., description="Material identifier")
Inherited Fields#
ExperimentInputSpec provides several fields automatically:
| Field | Type | Description |
|---|---|---|
experiment_id |
str |
Fully resolved experiment identifier (set during allocation) |
sort_index |
int |
Position in the spec list (set during allocation) |
workflow_run_id |
str \| None |
Hatchet workflow run ID (set at task execution time) |
root_workflow_run_id |
str \| None |
Root scatter/gather workflow run ID |
storage_settings |
ScytheStorageSettings \| None |
S3 bucket and prefix (injected by scatter/gather) |
You do not need to set these when creating specs -- they are populated automatically during allocation and execution.
Inherited Methods#
fetch_uri(uri, use_cache=True)-- Fetch a file from S3, HTTP, or the local filesystem and return the local path. Results are cached by default.make_multiindex(n_rows=1, additional_index_data=None)-- Construct apd.MultiIndexfrom the spec fields (plus anycomputed_features), used for indexing output DataFrames.
Logging#
Scythe uses standard Python logging throughout. Recent versions of Hatchet automatically capture Python log output and forward it to the Hatchet dashboard, so no special logging setup is needed. Use a module-level logger in your experiment code:
import logging
logger = logging.getLogger(__name__)
@ExperimentRegistry.Register()
def my_simulation(input_spec: MyInput) -> MyOutput:
logger.info("Running simulation for %s", input_spec.material)
...
Computed Features#
Sometimes you need extra index levels that are derived from existing fields rather than stored as their own Pydantic fields. Override the computed_features property to return a dictionary of scalar values that will be merged into the MultiIndex automatically:
from pydantic import Field
from scythe.base import ComputedFeatureValue, ExperimentInputSpec
class MyInput(ExperimentInputSpec):
temperature: float = Field(..., description="Temperature [K]", ge=0)
pressure: float = Field(..., description="Pressure [Pa]", ge=0)
@property
def computed_features(self) -> dict[str, ComputedFeatureValue]:
return {"temp_bucket": "high" if self.temperature > 500 else "low"}
Computed feature keys must not overlap with Pydantic field names or any keys supplied via additional_index_data. Values must be int, float, or str (the ComputedFeatureValue type alias). In make_multiindex, computed features are inserted after the Pydantic field dump and before additional_index_data.
Output Specs#
Output schemas inherit from ExperimentOutputSpec:
from scythe.base import ExperimentOutputSpec
class MyOutput(ExperimentOutputSpec):
energy: float = Field(..., description="Total energy [J]", ge=0)
efficiency: float = Field(..., description="Efficiency [%]", ge=0, le=100)
Scalar fields (anything that is not a FileReference or dataframes) are automatically collected into a scalars.pq DataFrame at the experiment level.
The dataframes Field#
ExperimentOutputSpec includes a built-in dataframes: dict[str, pd.DataFrame] field for returning additional DataFrames beyond the scalar outputs. This is useful for timeseries, per-timestep metrics, or any tabular data that varies in size across runs:
import pandas as pd
results_df = pd.DataFrame({
"timestep": range(8760),
"temperature": temps,
"load": loads,
})
results_df.index = input_spec.make_multiindex(n_rows=len(results_df))
return MyOutput(
energy=total_energy,
efficiency=eff,
dataframes={"hourly": results_df},
)
Each DataFrame in dataframes is serialized to Parquet and uploaded to S3 individually per task. At the experiment level, all DataFrames with the same key are concatenated across tasks into a single Parquet file in the final/ directory.
Note
The keys scalars and result_file_refs are reserved and will raise an error if used in dataframes.
FileReference#
FileReference is a union type that accepts file paths from multiple sources:
In Input Specs#
When you include a FileReference field in your input spec:
- Local
Pathvalues are uploaded to S3 during allocation and rewritten asS3Urlreferences. This ensures workers can access the files regardless of where they run. S3UrlandHttpUrlvalues are passed through as-is.
During task execution, all remote file references are automatically fetched to a local cache before your function is called (unless you disable this with auto_fetch_files=False).
class SimInput(ExperimentInputSpec):
config_file: FileReference = Field(..., description="Simulation config")
weather_data: FileReference = Field(..., description="Weather data file")
In Output Specs#
When you include a FileReference field in your output spec:
- Local
Pathvalues (typically files written totempdir) are uploaded to S3 after your function returns. The field is rewritten with the S3 URI. - All file reference URIs across tasks are collected into
result_file_refs.pq.
class SimOutput(ExperimentOutputSpec):
report: FileReference = Field(..., description="PDF report")
raw_data: FileReference = Field(..., description="Raw output data")
Registering Experiments#
Use @ExperimentRegistry.Register() to make your function a Scythe experiment:
from scythe.registry import ExperimentRegistry
@ExperimentRegistry.Register()
def my_simulation(input_spec: MyInput) -> MyOutput:
"""Run a simulation."""
...
return MyOutput(energy=42.0, efficiency=95.0)
With a Temporary Directory#
If your simulation needs scratch space for intermediate files, add a tempdir: Path parameter:
from pathlib import Path
@ExperimentRegistry.Register()
def my_simulation(input_spec: MyInput, tempdir: Path) -> MyOutput:
"""Run a simulation that writes intermediate files."""
output_file = tempdir / "results.csv"
# ... write to output_file ...
return MyOutput(
energy=42.0,
efficiency=95.0,
report=output_file, # automatically uploaded to S3
)
The temporary directory is created before your function runs and cleaned up after results are extracted.
Register Options#
The Register decorator accepts several options:
| Parameter | Default | Description |
|---|---|---|
retries |
1 |
Number of retry attempts on failure |
schedule_timeout |
1h |
Maximum time to wait for a worker to pick up the task |
execution_timeout |
1m |
Maximum time for the task to run |
name |
scythe_experiment_<fn_name> |
Custom task name for Hatchet |
description |
Function docstring | Task description in Hatchet |
desired_worker_labels |
None |
Worker affinity labels for routing |
auto_fetch_files |
True |
Whether to automatically fetch remote files before calling your function |
local_file_location |
"cache" |
Where to store fetched files: "cache" (shared) or "copied-to-tempdir" (per-task copy) |
inject_workflow_run_id |
True |
Whether to set workflow_run_id on the input spec |
Production Example#
Here is a more realistic registration with custom timeouts and retries:
@ExperimentRegistry.Register(
retries=2,
schedule_timeout="10h",
execution_timeout="30m",
)
def simulate_building(
input_spec: BuildingSpec, tempdir: Path
) -> BuildingOutput:
"""Run a building energy simulation."""
...
Registering Workflow Runnables#
In addition to standalone functions, you can register Hatchet Workflow objects with ExperimentRegistry.Include(). This is useful for multi-step pipelines or complex DAGs:
Workflow runnables are allocated with a single spec (not a batch) and bypass the scatter/gather system entirely. For a complete guide on defining and running workflows, as well as single-spec allocation for any runnable, see Workflow & Single-Run Experiments.
Multiple Experiments#
You can define multiple experiments in your project. Each gets its own input/output types and registration. The worker will serve all registered experiments:
from experiments.building_energy import * # noqa: F403
from experiments.orbital_dynamics import * # noqa: F403
from experiments.lifespan import * # noqa: F403
from scythe.worker import ScytheWorkerConfig
from experiments import * # noqa: F403
if __name__ == "__main__":
ScytheWorkerConfig().start()
Each experiment allocates independently and writes to its own S3 directory based on the function name.