
API Reference#

Auto-generated reference documentation for Scythe's public Python API.

scythe.base#

Core base classes for experiment input and output schemas.

base #

Models for Simulation Specifications.

BaseSpec #

Bases: FileReferenceMixin

A base spec for running a simulation.

The main features are utilities to fetch files from uris and to generate locally scoped paths for those files according to the experiment_id.

local_path(pth) #

Return the local path of a uri scoped to the experiment_id.

Note that this should only be used for non-ephemeral files.

Parameters:

    pth (AnyUrl): The uri to convert to a local path. Required.

Returns:

    local_path (Path): The local path of the uri.
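A minimal sketch of how a uri could be mapped to an experiment-scoped local path. This is illustrative only: the function name mirrors the method above, but the cache root and path layout are assumptions, not Scythe's actual implementation.

```python
from pathlib import Path
from urllib.parse import urlparse

# Hypothetical sketch: re-root a uri's host and path under a per-experiment
# directory, so files from different experiments never collide.
def local_path(uri: str, experiment_id: str, root: Path = Path("/tmp/scythe")) -> Path:
    parsed = urlparse(uri)
    # Drop the scheme; nest bucket/key under the experiment's directory.
    return root / experiment_id / parsed.netloc / parsed.path.lstrip("/")

p = local_path("s3://my-bucket/inputs/weather.epw", "exp-123")
```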

fetch_uri(uri, use_cache=True) #

Fetch a file from a uri and return the local path.

Parameters:

    uri (AnyUrl): The uri to fetch. Required.
    use_cache (bool): Whether to use the cache. Default: True.

Returns:

    local_path (Path): The local path of the fetched file.

ExperimentIndexNotSerializableError #

Bases: Exception

An error for when an experiment index is not serializable.

__init__(cls) #

Initialize the error.

ExperimentIndexAdditionalDataDoesNotMatchNRowsError #

Bases: Exception

An error for when the additional index data does not match the number of rows.

__init__(n_rows, additional_index_data) #

Initialize the error.

ExperimentIndexAdditionalDataOverlapsWithSpecError #

Bases: Exception

An error for when the additional index data overlaps with the spec.

__init__(overlapping_keys) #

Initialize the error.

ExperimentInputSpec #

Bases: BaseSpec

A spec for running a leaf workflow.

computed_features property #

Scalar features merged into make_multiindex for combined result frames.

Override in subclasses to attach derived index levels (e.g. labels from other fields). Keys must not overlap Pydantic fields included in the index dump.

prefix property #

Get the scoped key for the spec.

make_multiindex(additional_index_data=None, n_rows=1, include_sort_subindex=True, additional_excludes=frozenset()) #

Make a MultiIndex from the spec's fields, along with any other methods which contribute index data.

Note that index data should generally be considered as features or inputs, rather than outputs.

TODO: Feel free to add more args to this method if more values need to be computed.

Returns:

    multi_index (MultiIndex): The MultiIndex.

construct_output_key(field_name, fpath) #

Construct an output key for a file.

ScalarInDataframesError #

Bases: Exception

An error for when a scalar is in the dataframes.

__init__(scalar) #

Initialize the error.

ResultFileRefsInDataframesError #

Bases: Exception

An error for when a result file ref is in the dataframes.

__init__(result_file_refs) #

Initialize the error.

ExperimentOutputSpec #

Bases: FileReferenceMixin

A spec for the output of a leaf workflow.

validate_dataframes(v) #

Validate the dataframes via deserialization.

serialize_dataframes(v) #

Serialize the dataframes.


scythe.registry#

Experiment registration and middleware.

registry #

Register experiments with Scythe.

ExperimentTypeNotFound #

Bases: Exception

An experiment type was not found.

__init__(name) #

Initialize the exception.

ExperimentTypeExists #

Bases: Exception

An experiment type already exists.

__init__(name) #

Initialize the exception.

ExperimentFunctionWithoutTempdir #

Bases: Protocol[TInput, TOutput]

A function that can run experiments without tempdir support.

__call__(input_spec) #

Invoke the experiment function with flexible signature.

__name__() #

The name of the experiment function.

ExperimentFunctionWithTempdir #

Bases: Protocol[TInput, TOutput]

A function that can run experiments, with tempdir support.

__call__(input_spec, tempdir) #

Invoke the experiment function with flexible signature.

__name__() #

The name of the experiment function.

ExperimentRegistry #

An experiment registry for standalone task steps.

Include(task) classmethod #
Include(
    task: Standalone[TInput, TOutput],
) -> ExperimentStandaloneType
Include(task: Workflow[TInput]) -> ExperimentWorkflowType

Register an experiment whose input and output schemas conform to the base types.

Register(*, worker=None, description=None, name=None, desired_worker_labels=None, schedule_timeout=timeout_settings.EXPERIMENT_SCHEDULE, execution_timeout=timeout_settings.EXPERIMENT_EXECUTION, retries=1, inject_workflow_run_id=True, auto_fetch_files=True, local_file_location='cache', **task_config) classmethod #

Decorator to make a standalone experiment from a function.

Usage:

    @ExperimentRegistry.Register(description="desc", ...)
    def my_experiment(input_spec: MyInput) -> MyOutput: ...

get_runnable(name) classmethod #

Get an experiment's Hatchet Standalone.

experiments() classmethod #

Get all experiments.


scythe.experiments#

Experiment allocation, versioning, and result retrieval.

experiments #

Allocate an experiment to a workflow run.

ExperimentSpecsMismatchError #

Bases: Exception

An error raised when the specs for an experiment do not match the expected type.

__init__(expected_type, actual_type) #

Initialize the error.

DuplicateInputArtifactsError #

Bases: Exception

An error raised when a file is duplicated in the input artifacts.

__init__(file_name, field_name) #

Initialize the error.

BaseNotFoundError #

Bases: Exception

An error raised when a resource is not found.

ExperimentNotFoundError #

Bases: BaseNotFoundError

An error raised when an experiment is not found.

__init__(experiment_name) #

Initialize the error.

ExperimentVersionNotFoundError #

Bases: BaseNotFoundError

An error raised when an experiment version is not found.

__init__(experiment_name, version) #

Initialize the error.

ExperimentRunNotFoundError #

Bases: BaseNotFoundError

An error raised when an experiment run is not found.

__init__(experiment_name, version, run_name) #

Initialize the error.

InputArtifactLocations #

Bases: BaseModel

The locations of the input artifacts for an experiment.

ExperimentRunManifest #

Bases: BaseModel

The manifest for an experiment run.

SemVer #

Bases: BaseModel

A semantic version.

FromString(version) classmethod #

Parse a semantic version from a string.

__lt__(other) #

Compare two semantic versions.

__le__(other) #

Compare two semantic versions.

__gt__(other) #

Compare two semantic versions.

__ge__(other) #

Compare two semantic versions.

next_major_version() #

Get the next major version.

next_minor_version() #

Get the next minor version.

next_patch_version() #

Get the next patch version.

__str__() #

Get the string representation of the semantic version.
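The comparison and bumping semantics above can be sketched with a minimal stand-in class. Scythe's SemVer is a Pydantic BaseModel; this dataclass version only illustrates the ordering and next-version rules implied by the method list.

```python
from dataclasses import dataclass
from functools import total_ordering

# Minimal sketch of SemVer semantics; the real class is a Pydantic model.
@total_ordering
@dataclass(frozen=True)
class SemVer:
    major: int
    minor: int
    patch: int

    @classmethod
    def FromString(cls, version: str) -> "SemVer":
        major, minor, patch = (int(p) for p in version.split("."))
        return cls(major, minor, patch)

    def __lt__(self, other: "SemVer") -> bool:
        # Compare component-wise: major, then minor, then patch.
        return (self.major, self.minor, self.patch) < (other.major, other.minor, other.patch)

    def next_major_version(self) -> "SemVer":
        return SemVer(self.major + 1, 0, 0)

    def next_minor_version(self) -> "SemVer":
        return SemVer(self.major, self.minor + 1, 0)

    def next_patch_version(self) -> "SemVer":
        return SemVer(self.major, self.minor, self.patch + 1)

    def __str__(self) -> str:
        return f"{self.major}.{self.minor}.{self.patch}"
```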

SerializableRunnable #

Bases: BaseModel, Generic[TInput, TOutput]

A serializable runnable.

__deepcopy__(memo=None) #

Deep copy the model, handling the underlying runnable, which is not itself copyable.

get_runnable_from_str(v) #

Get the runnable from a string.

serialize_runnable(v, b) #

Serialize the runnable to a string.

BaseExperiment #

Bases: SerializableRunnable[TInput, TOutput]

A base experiment.

base_id property #

The base experiment id.

prefix property #

The prefix for the experiment.

latest_run property #

The latest run for the experiment.

latest_results property #

The latest results for the experiment.

list_versions(s3_client=None) #

Get all of the versions of the experiment.

latest_version(s3_client=None, from_cache=True) #

Get the latest version of the experiment.

latest_results_for_version(version) #

The latest results for a given version.

resolve_next_version(version, s3_client=None) #

Resolve the next version of the experiment.

check_spec_type(spec) #

Check that the type of the spec matches the expected type.

check_spec_types(specs) #

Check that the types of the specs match the expected type.

allocate(specs, version='bumpmajor', overwrite_sort_index=True, overwrite_experiment_id=True, recursion_map=None, s3_client=None) #
allocate(
    specs: Sequence[TInput],
    version: SemVer | VersioningStrategy = "bumpmajor",
    overwrite_sort_index: bool = True,
    overwrite_experiment_id: bool = True,
    recursion_map: RecursionMap | None = None,
    s3_client: S3Client | None = None,
) -> tuple[
    ExperimentRun,
    TaskRunRef[ScatterGatherInput, ScatterGatherResult],
]
allocate(
    specs: TInput,
    version: SemVer | VersioningStrategy = "bumpmajor",
    overwrite_sort_index: bool = True,
    overwrite_experiment_id: bool = True,
    recursion_map: RecursionMap | None = None,
    s3_client: S3Client | None = None,
) -> tuple[ExperimentRun, WorkflowRunRef]

Allocate an experiment to a workflow run.

VersionedExperiment #

Bases: BaseModel, Generic[TInput, TOutput]

A versioned experiment.

base_id property #

The base id for the versioned experiment.

prefix property #

The prefix for the versioned experiment.

latest_run property #

The latest run for the versioned experiment.

latest_run_results property #

The latest run results for the versioned experiment.

list_runs(s3_client=None) #

List all of the runs for the versioned experiment.

ExperimentRun #

Bases: BaseModel, Generic[TInput, TOutput]

An experiment run.

dt_str property #

The timestamp as a string.

experiment_id property #

The base id for the experiment run.

prefix property #

The prefix for the run.

artifact_prefix property #

The prefix for the artifacts for the run.

specs_filename property #

The filename for the specs file.

specs_filekey property #

The key for the specs file.

manifest_filekey property #

The key for the manifest file.

io_spec_filekey property #

The key for the io spec file.

input_artifacts_filekey property #

The key for the input artifacts file.

workflow_spec_filekey property #

The key for the workflow spec file.

final_results_dirkey property #

The key for the final results directory.

scalars_filekey property #

The key for the scalars file.

construct_artifact_key(field_name, file_name) #

Construct the key for an artifact.

construct_specs_filekey(filename) #

Construct the key for the specs file.

as_uri(key) #

Convert a key to a uri.

construct_manifest(workflow_run_id) #

The manifest for the experiment run.

overwrite_spec_meta(specs, overwrite_experiment_id=True, overwrite_sort_index=True) #

Overwrite the metadata for the specs.

list_results_files(s3_client=None) #

List the results files for the experiment run.

upload_input_artifacts(input_artifacts, s3_client, bucket, construct_artifact_key) #

Upload source files to S3.


scythe.scatter_gather#

Recursive scatter/gather workflow implementation.

scatter_gather #

Fanout Handling.

RecursionSpec #

Bases: BaseModel

A spec for recursive calls.

validate_offset_less_than_factor(values) classmethod #

Validate that the offset is less than the factor.

RecursionMap #

Bases: BaseModel

A map of recursion specs to use in recursive calls.

This allows a recursion node to understand where it is in the recursion tree and how to behave.

is_root property #

Check if the recursion map is the root.

validate_path_is_length_ge_1(values) classmethod #

Validate that the path is at least length 1.

GatheredExperimentRuns #

Bases: BaseModel

A class for gathering experiment results.

ScatterGatherInput #

Bases: BaseSpec

Input for the scatter gather workflow.

standalone property #

Get the experiment standalone.

specs cached property #

Fetch the specs and convert to the input type.

is_root property #

Check if the current payload is a root, i.e. the original call.

is_base_case property #

Check if the current payload is a base case, i.e. no recursion needed.

construct_filekey(filename, *, mode, workflow_run_id, suffix) #

Construct an output key for the scatter gather workflow.

add_root_workflow_run_id(root_workflow_run_id) #

Add the root workflow run id to the specs.

run_experiments() async #

Run the actual experiments and collect results.

create_recursion_payloads(parent_workflow_run_id) #

Split the specs into a list of child scatter gather payloads for recursion.
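One plausible way to scatter specs into children, consistent with a RecursionSpec's factor/offset pair (where offset < factor): child `offset` takes every `factor`-th spec. This is a hypothetical sketch of the partitioning, not Scythe's actual recursion logic.

```python
# Hypothetical sketch: partition a flat list of specs into `factor` child
# payloads by stride, so child `offset` receives specs[offset::factor].
def create_recursion_payloads(specs: list, factor: int) -> list[list]:
    return [specs[offset::factor] for offset in range(factor)]

children = create_recursion_payloads(list(range(10)), factor=3)
```

Every spec lands in exactly one child, so gathering the children reconstructs the full set.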

run_or_recurse(ctx) async #

Run the experiments if not a base case, otherwise recurse.

ScatterGatherResult #

Bases: BaseModel

The result of the scatter gather workflow.

to_gathered_experiment_runs() #

Convert the scatter gather result to GatheredExperimentRuns.

scatter_gather(payload, ctx) async #

Run the scatter gather workflow.

sift_results(spec_data, results, make_error_specs=False) #

Sift results into safe and errored results.

Parameters:

    spec_data (list[ZipDataContent]): The list of spec data. Required.
    results (list[ResultDataContent | BaseException]): The list of results. Required.
    make_error_specs (bool): Whether or not to create an error dataframe. Default: False.

Returns:

    safe_results (list[ResultDataContent]): The list of safe results.
    errors (DataFrame): The DataFrame of errored results.
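The sifting step can be sketched as pairing each spec with its result and splitting on whether the result is an exception. Column names and the error representation here are assumptions for illustration.

```python
import pandas as pd

# Illustrative sketch of sift_results: keep successful results, collect
# exceptions (with their specs) into an error DataFrame.
def sift_results(spec_data: list, results: list):
    safe, errors = [], []
    for spec, result in zip(spec_data, results):
        if isinstance(result, BaseException):
            errors.append({"spec": spec, "error": repr(result)})
        else:
            safe.append(result)
    return safe, pd.DataFrame(errors)

safe, errors = sift_results(["a", "b", "c"], [1, ValueError("boom"), 3])
```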

combine_experiment_outputs(experiment_outputs) #

Combine a list of experiment outputs into a single experiment output.


scythe.worker#

Worker configuration and startup.

worker #

Worker for Scythe.

ScytheWorkerLabel #

Bases: StrEnum

Label keys used by Scythe workers for task affinity.

Use these when specifying desired_worker_labels on experiments so tasks are routed to workers with matching capabilities. For example:

    from scythe.worker import ScytheWorkerLabel
    from hatchet_sdk.labels import DesiredWorkerLabel

    @ExperimentRegistry.Register(
        desired_worker_labels=ScytheWorkerLabel.HAS_GPU.worker_label
    )
    def my_gpu_experiment(input_spec: MyInput) -> MyOutput:
        ...

Or for attaching multiple labels:

    @ExperimentRegistry.Register(
        desired_worker_labels={
            **ScytheWorkerLabel.HAS_GPU.worker_label,
            **ScytheWorkerLabel.HIGH_MEMORY.worker_label,
        }
    )
    def my_gpu_experiment(input_spec: MyInput) -> MyOutput:
        ...

worker_label property #

Return the worker label.

yes property #

Return the yes value for the worker label.

WorkerNameConfig #

Bases: BaseSettings

Configuration for the worker name.

in_aws_batch property #

Return whether the worker is running in AWS Batch.

in_aws_copilot property #

Return whether the worker is running in AWS Copilot.

in_aws property #

Return whether the worker is running in AWS.

aws_hosting_str property #

Return the AWS hosting string for the worker.

in_fly property #

Return whether the worker is running in Fly.io.

fly_hosting_str property #

Return the Fly hosting string for the worker.

in_local property #

Return whether the worker is running locally.

hosting_str property #

Return the hosting string for the worker.

name property #

Return the name of the worker.

ScytheWorkerConfig #

Bases: BaseSettings

Configuration for the Scythe worker.

labels property #

Return the labels for the worker.

computed_slots property #

Return the number of slots for the worker.

computed_durable_slots property #

Return the number of durable slots for the worker.

computed_name property #

Return the name of the worker.

start(experiments=None, additional_workflows=None) #

Make and start a worker.


scythe.settings#

Environment-based settings for storage and timeouts.

settings #

A module for Scythe's settings.

TimeoutSettings #

Bases: BaseSettings

A class for Scythe's timeout settings.

ScytheStorageSettings #

Bases: BaseSettings

Storage Settings.


scythe.utils.filesys#

File reference types and URI fetching utilities.

filesys #

Filesystem utilities.

S3Url #

Bases: AnyUrl

A URL for an S3 object.

FileReferenceMixin #

Bases: BaseModel

A mixin for file reference fields.

remote_artifact_file_paths property #

Get the remote source file paths.

fetch_uri(uri, local_path, use_cache=True, s3=s3) #

Fetch a file from a uri and return the local path.

Caching is enabled by default: the file is downloaded only if it does not already exist locally, so the same file is never fetched twice.

Parameters:

    uri (AnyUrl): The uri to fetch. Required.
    local_path (Path): The local path to save the fetched file. Required.
    use_cache (bool): Whether to use the cache. Default: True.
    s3 (S3Client): The S3 client to use. Default: s3_client.

Returns:

    local_path (Path): The local path of the fetched file.
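The cache-before-download behavior can be sketched without S3 by injecting a downloader callable. The real fetch_uri talks to S3 via the s3 client; here `download` is a stand-in so the control flow is self-contained.

```python
from pathlib import Path
from typing import Callable
import tempfile

# Sketch of the caching logic described above: skip the download entirely
# when the file already exists locally and use_cache is enabled.
def fetch_uri(uri: str, local_path: Path,
              download: Callable[[str, Path], None], use_cache: bool = True) -> Path:
    if use_cache and local_path.exists():
        return local_path  # cache hit: no download
    local_path.parent.mkdir(parents=True, exist_ok=True)
    download(uri, local_path)
    return local_path

# Demo with a fake downloader that records each call.
root = Path(tempfile.mkdtemp())
p = root / "cache" / "file.txt"
calls = []

def fake_download(uri: str, path: Path) -> None:
    calls.append(uri)
    path.write_text("payload")

fetch_uri("s3://bucket/file.txt", p, fake_download)
fetch_uri("s3://bucket/file.txt", p, fake_download)  # cache hit, no second download
```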


scythe.utils.results#

DataFrame serialization and S3 upload helpers.

results #

This module contains functions to postprocess and serialize results.

serialize_df_dict(dfs) #

Serialize a dictionary of dataframes into a dictionary of dictionaries.

Parameters:

    dfs (dict[str, DataFrame]): A dictionary of dataframes. Required.

Returns:

    dict[str, dict]: A dictionary of dictionaries.
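A round-trippable version of this serialization can be sketched with a pandas orient; the actual wire format Scythe uses is not specified here, so treat the orient choice as an assumption.

```python
import pandas as pd

# Sketch: each DataFrame becomes a plain dict (assumed "tight" orient,
# which preserves index and column structure for round-tripping).
def serialize_df_dict(dfs: dict[str, pd.DataFrame]) -> dict[str, dict]:
    return {name: df.to_dict(orient="tight") for name, df in dfs.items()}

payload = serialize_df_dict({"results": pd.DataFrame({"x": [1, 2]})})
restored = pd.DataFrame.from_dict(payload["results"], orient="tight")
```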

transpose_dataframe_dict(dataframe_results) #

Transpose a list of dictionaries of dataframes into a dictionary of combined dataframes.
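The transpose can be sketched as turning a list of per-task `{name: df}` dicts into one `{name: combined_df}` dict by concatenating per name. An illustrative sketch, assuming every entry carries the same keys.

```python
import pandas as pd

# Sketch: many tasks each return {name: df}; combine into {name: concat(dfs)}.
def transpose_dataframe_dict(
    dataframe_results: list[dict[str, pd.DataFrame]],
) -> dict[str, pd.DataFrame]:
    keys = dataframe_results[0].keys()
    return {k: pd.concat([d[k] for d in dataframe_results]) for k in keys}

combined = transpose_dataframe_dict([
    {"scalars": pd.DataFrame({"v": [1]})},
    {"scalars": pd.DataFrame({"v": [2]})},
])
```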

make_onerow_multiindex_from_dict(d, n_rows=1) #

Makes a MultiIndex from a dictionary.

This is useful for returning a wide-form dataframe of results for a single task.

Parameters:

    d (dict[str, Any]): The dictionary to make the MultiIndex from. Required.
    n_rows (int): The number of rows to repeat the MultiIndex. Default: 1.

Returns:

    multi_index (MultiIndex): The MultiIndex.
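A sketch matching the description above: one logical row of scalar values becomes a MultiIndex, repeated n_rows times for a wide-form result frame. Illustrative only, not the library's exact implementation.

```python
import pandas as pd

# Sketch: dict keys become index level names; each scalar value is
# repeated n_rows times so the index aligns with an n_rows-long frame.
def make_onerow_multiindex_from_dict(d: dict, n_rows: int = 1) -> pd.MultiIndex:
    return pd.MultiIndex.from_arrays(
        [[v] * n_rows for v in d.values()], names=list(d.keys())
    )

mi = make_onerow_multiindex_from_dict({"experiment_id": "exp-1", "zone": "core"}, n_rows=3)
```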

save_and_upload_parquets(collected_dfs, s3, bucket, output_key_constructor, save_errors=False) #

Save and upload results to s3.


scythe.utils.s3#

S3 client utilities.

s3 #

Utilities for interacting with S3.

check_experiment_exists(s3, bucket, bucket_prefix, experiment_id) #

Check if an experiment exists in S3.

raise_on_forbidden_experiment(s3, bucket, bucket_prefix, experiment_id, existing_artifacts='forbid') #

Raise an error if an experiment exists in S3.
