API Reference#
Auto-generated reference documentation for Scythe's public Python API.
scythe.base#
Core base classes for experiment input and output schemas.
base
#
Models for Simulation Specifications.
BaseSpec
#
Bases: FileReferenceMixin
A base spec for running a simulation.
The main features are utilities to fetch files from URIs and generate a locally scoped path for the files according to the experiment_id.
local_path(pth)
#
Return the local path of a uri scoped to the experiment_id.
Note that this should only be used for non-ephemeral files.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pth | AnyUrl | The uri to convert to a local path | required |

Returns:

| Name | Type | Description |
|---|---|---|
| local_path | Path | The local path of the uri |
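Conceptually, scoping a remote URI under the experiment's cache directory might look like the sketch below. The function name matches the method above, but the layout and `cache_root` default are illustrative assumptions, not the actual implementation:

```python
from pathlib import Path
from urllib.parse import urlparse

def local_path(uri: str, experiment_id: str, cache_root: Path = Path("cache")) -> Path:
    """Map a URI to a deterministic local path scoped by experiment_id."""
    parsed = urlparse(uri)
    # Scoping under the experiment_id keeps files from different runs from colliding.
    return cache_root / experiment_id / parsed.netloc / parsed.path.lstrip("/")

p = local_path("s3://my-bucket/inputs/weather.epw", experiment_id="exp-123")
```

Because the mapping is deterministic, repeated calls for the same URI and experiment resolve to the same path, which is what makes the caching described under `fetch_uri` possible.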
fetch_uri(uri, use_cache=True)
#
Fetch a file from a uri and return the local path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| uri | AnyUrl | The uri to fetch | required |
| use_cache | bool | Whether to use the cache | True |

Returns:

| Name | Type | Description |
|---|---|---|
| local_path | Path | The local path of the fetched file |
ExperimentIndexNotSerializableError
#
Bases: Exception
An error for when an experiment index is not serializable.
__init__(cls)
#
Initialize the error.
ExperimentIndexAdditionalDataDoesNotMatchNRowsError
#
Bases: Exception
An error for when the additional index data does not match the number of rows.
__init__(n_rows, additional_index_data)
#
Initialize the error.
ExperimentIndexAdditionalDataOverlapsWithSpecError
#
Bases: Exception
An error for when the additional index data overlaps with the spec.
__init__(overlapping_keys)
#
Initialize the error.
ExperimentInputSpec
#
Bases: BaseSpec
A spec for running a leaf workflow.
computed_features
property
#
Scalar features merged into make_multiindex for combined result frames.
Override in subclasses to attach derived index levels (e.g. labels derived from other fields). Keys must not overlap the Pydantic fields included in the index dump.
prefix
property
#
Get the scoped key for the spec.
make_multiindex(additional_index_data=None, n_rows=1, include_sort_subindex=True, additional_excludes=frozenset())
#
Make a MultiIndex from the spec and any other methods that might create index data.
Note that index data should generally be treated as features or inputs rather than outputs.
TODO: Feel free to add more args to this method if more values need to be computed.
Returns:

| Name | Type | Description |
|---|---|---|
| multi_index | MultiIndex | The MultiIndex. |
construct_output_key(field_name, fpath)
#
Construct an output key for a file.
ScalarInDataframesError
#
Bases: Exception
An error for when a scalar is in the dataframes.
__init__(scalar)
#
Initialize the error.
ResultFileRefsInDataframesError
#
Bases: Exception
An error for when a result file ref is in the dataframes.
__init__(result_file_refs)
#
Initialize the error.
ExperimentOutputSpec
#
Bases: FileReferenceMixin
A spec for the output of a leaf workflow.
scythe.registry#
Experiment registration and middleware.
registry
#
Register experiments with Scythe.
ExperimentTypeNotFound
#
ExperimentTypeExists
#
ExperimentFunctionWithoutTempdir
#
ExperimentFunctionWithTempdir
#
ExperimentRegistry
#
An experiment registry for standalone task steps.
Include(task)
classmethod
#
Register an experiment whose input and output schemas conform to the base types.
Register(*, worker=None, description=None, name=None, desired_worker_labels=None, schedule_timeout=timeout_settings.EXPERIMENT_SCHEDULE, execution_timeout=timeout_settings.EXPERIMENT_EXECUTION, retries=1, inject_workflow_run_id=True, auto_fetch_files=True, local_file_location='cache', **task_config)
classmethod
#
Decorator to make a standalone experiment from a function.
Usage:

```python
@ExperimentRegistry.Register(description="desc", ...)
def my_experiment(input_spec: MyInput) -> MyOutput: ...
```
get_runnable(name)
classmethod
#
Get an experiment's Hatchet Standalone.
experiments()
classmethod
#
Get all experiments.
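The Register/get_runnable flow can be illustrated with a minimal, self-contained registry sketch. The real ExperimentRegistry additionally wires experiments into Hatchet standalone tasks with timeouts, worker labels, and file auto-fetching; the code below only mirrors the decorator-and-lookup pattern, and all names in it are illustrative:

```python
class Registry:
    """Toy registry: register functions by name, look them up later."""
    _experiments: dict = {}

    @classmethod
    def register(cls, *, name=None, description=None):
        def deco(fn):
            key = name or fn.__name__
            # Mirrors ExperimentTypeExists: duplicate registration is an error.
            if key in cls._experiments:
                raise ValueError(f"experiment {key!r} already registered")
            cls._experiments[key] = {"fn": fn, "description": description}
            return fn
        return deco

    @classmethod
    def get_runnable(cls, name):
        # Mirrors ExperimentTypeNotFound: unknown names raise.
        try:
            return cls._experiments[name]["fn"]
        except KeyError:
            raise KeyError(f"experiment {name!r} not found") from None

@Registry.register(description="double the input")
def my_experiment(x: int) -> int:
    return x * 2

result = Registry.get_runnable("my_experiment")(21)
```

The decorator returns the original function unchanged, so registered experiments remain directly callable in tests.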
scythe.experiments#
Experiment allocation, versioning, and result retrieval.
experiments
#
Allocate an experiment to a workflow run.
ExperimentSpecsMismatchError
#
Bases: Exception
An error raised when the specs for an experiment do not match the expected type.
__init__(expected_type, actual_type)
#
Initialize the error.
DuplicateInputArtifactsError
#
Bases: Exception
An error raised when a file is duplicated in the input artifacts.
__init__(file_name, field_name)
#
Initialize the error.
BaseNotFoundError
#
Bases: Exception
An error raised when a resource is not found.
ExperimentNotFoundError
#
Bases: BaseNotFoundError
An error raised when an experiment is not found.
__init__(experiment_name)
#
Initialize the error.
ExperimentVersionNotFoundError
#
Bases: BaseNotFoundError
An error raised when an experiment version is not found.
__init__(experiment_name, version)
#
Initialize the error.
ExperimentRunNotFoundError
#
Bases: BaseNotFoundError
An error raised when an experiment run is not found.
__init__(experiment_name, version, run_name)
#
Initialize the error.
InputArtifactLocations
#
Bases: BaseModel
The locations of the input artifacts for an experiment.
ExperimentRunManifest
#
Bases: BaseModel
The manifest for an experiment run.
SemVer
#
Bases: BaseModel
A semantic version.
FromString(version)
classmethod
#
Parse a semantic version from a string.
__lt__(other)
#
Compare two semantic versions.
__le__(other)
#
Compare two semantic versions.
__gt__(other)
#
Compare two semantic versions.
__ge__(other)
#
Compare two semantic versions.
next_major_version()
#
Get the next major version.
next_minor_version()
#
Get the next minor version.
next_patch_version()
#
Get the next patch version.
__str__()
#
Get the string representation of the semantic version.
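The comparison and bumping behavior described above can be sketched with a plain dataclass. The real SemVer is a Pydantic model, and the method casing here (`from_string` vs. the documented `FromString`) is adjusted to be idiomatic for the sketch:

```python
from dataclasses import dataclass

@dataclass(frozen=True, order=True)
class SemVer:
    # Field order gives lexicographic comparison: major, then minor, then patch.
    major: int
    minor: int
    patch: int

    @classmethod
    def from_string(cls, version: str) -> "SemVer":
        major, minor, patch = (int(p) for p in version.split("."))
        return cls(major, minor, patch)

    def next_major_version(self) -> "SemVer":
        return SemVer(self.major + 1, 0, 0)

    def next_minor_version(self) -> "SemVer":
        return SemVer(self.major, self.minor + 1, 0)

    def next_patch_version(self) -> "SemVer":
        return SemVer(self.major, self.minor, self.patch + 1)

    def __str__(self) -> str:
        return f"{self.major}.{self.minor}.{self.patch}"

v = SemVer.from_string("1.4.2")
```

Note that numeric (not string) comparison matters: "1.10.0" sorts after "1.9.9", which naive string comparison would get wrong.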
SerializableRunnable
#
BaseExperiment
#
Bases: SerializableRunnable[TInput, TOutput]
A base experiment.
base_id
property
#
The base experiment id.
prefix
property
#
The prefix for the experiment.
latest_run
property
#
The latest run for the experiment.
latest_results
property
#
The latest results for the experiment.
list_versions(s3_client=None)
#
Get all of the versions of the experiment.
latest_version(s3_client=None, from_cache=True)
#
Get the latest version of the experiment.
latest_results_for_version(version)
#
The latest results for a given version.
resolve_next_version(version, s3_client=None)
#
Resolve the next version of the experiment.
check_spec_type(spec)
#
Check that the type of the spec matches the expected type.
check_spec_types(specs)
#
Check that the types of the specs match the expected type.
allocate(specs, version='bumpmajor', overwrite_sort_index=True, overwrite_experiment_id=True, recursion_map=None, s3_client=None)
#
```python
allocate(
    specs: Sequence[TInput],
    version: SemVer | VersioningStrategy = "bumpmajor",
    overwrite_sort_index: bool = True,
    overwrite_experiment_id: bool = True,
    recursion_map: RecursionMap | None = None,
    s3_client: S3Client | None = None,
) -> tuple[
    ExperimentRun,
    TaskRunRef[ScatterGatherInput, ScatterGatherResult],
]
```
Allocate an experiment to a workflow run.
VersionedExperiment
#
Bases: BaseModel, Generic[TInput, TOutput]
A versioned experiment.
base_id
property
#
The base id for the versioned experiment.
prefix
property
#
The prefix for the versioned experiment.
latest_run
property
#
The latest run for the versioned experiment.
latest_run_results
property
#
The latest run results for the versioned experiment.
list_runs(s3_client=None)
#
List all of the runs for the versioned experiment.
ExperimentRun
#
Bases: BaseModel, Generic[TInput, TOutput]
An experiment run.
dt_str
property
#
The timestamp as a string.
experiment_id
property
#
The base id for the experiment run.
prefix
property
#
The prefix for the run.
artifact_prefix
property
#
The prefix for the artifacts for the run.
specs_filename
property
#
The filename for the specs file.
specs_filekey
property
#
The key for the specs file.
manifest_filekey
property
#
The key for the manifest file.
io_spec_filekey
property
#
The key for the io spec file.
input_artifacts_filekey
property
#
The key for the input artifacts file.
workflow_spec_filekey
property
#
The key for the workflow spec file.
final_results_dirkey
property
#
The key for the final results directory.
scalars_filekey
property
#
The key for the scalars file.
construct_artifact_key(field_name, file_name)
#
Construct the key for an artifact.
construct_specs_filekey(filename)
#
Construct the key for the specs file.
as_uri(key)
#
Convert a key to a uri.
construct_manifest(workflow_run_id)
#
The manifest for the experiment run.
overwrite_spec_meta(specs, overwrite_experiment_id=True, overwrite_sort_index=True)
#
Overwrite the metadata for the specs.
list_results_files(s3_client=None)
#
List the results files for the experiment run.
upload_input_artifacts(input_artifacts, s3_client, bucket, construct_artifact_key)
#
Upload source files to S3.
scythe.scatter_gather#
Recursive scatter/gather workflow implementation.
scatter_gather
#
Fanout Handling.
RecursionSpec
#
Bases: BaseModel
A spec for recursive calls.
validate_offset_less_than_factor(values)
classmethod
#
Validate that the offset is less than the factor.
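The invariant being validated is that a node's offset (which child slice it handles) stays below the fanout factor. A sketch of the check with a plain dataclass follows; the real RecursionSpec is a Pydantic model with a model validator, so the mechanism differs even though the invariant is the same:

```python
from dataclasses import dataclass

@dataclass
class RecursionSpec:
    factor: int  # how many children each node fans out into
    offset: int  # which child slice this node handles

    def __post_init__(self):
        # An offset >= factor would address a child that does not exist.
        if self.offset >= self.factor:
            raise ValueError(
                f"offset ({self.offset}) must be less than factor ({self.factor})"
            )
```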
RecursionMap
#
Bases: BaseModel
A map of recursion specs to use in recursive calls.
This allows a recursion node to understand where it is in the recursion tree and how to behave.
GatheredExperimentRuns
#
Bases: BaseModel
A class for gathering experiment results.
ScatterGatherInput
#
Bases: BaseSpec
Input for the scatter gather workflow.
standalone
property
#
Get the experiment standalone.
specs
cached
property
#
Fetch the specs and convert to the input type.
is_root
property
#
Check if the current payload is a root, i.e. the original call.
is_base_case
property
#
Check if the current payload is a base case, i.e. no recursion needed.
construct_filekey(filename, *, mode, workflow_run_id, suffix)
#
Construct an output key for the scatter gather workflow.
add_root_workflow_run_id(root_workflow_run_id)
#
Add the root workflow run id to the specs.
run_experiments()
async
#
Run the actual experiments and collect results.
create_recursion_payloads(parent_workflow_run_id)
#
Split the specs into a list of child scatter-gather payloads for recursion.
run_or_recurse(ctx)
async
#
Run the experiments if not a base case, otherwise recurse.
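One plausible splitting strategy consistent with the factor/offset fields is strided slicing: child `offset` takes every `factor`-th spec starting at its offset, so the children partition the specs with no overlap. This is an illustrative assumption about create_recursion_payloads, not its confirmed implementation:

```python
def create_recursion_payloads(specs: list, factor: int) -> list[list]:
    """Partition specs into factor strided slices, one per child payload."""
    # specs[offset::factor] selects items at positions offset, offset+factor, ...
    return [specs[offset::factor] for offset in range(factor)]

children = create_recursion_payloads(list(range(10)), factor=3)
```

Each child then recurses with its own slice until the base case (few enough specs to run directly) is reached, and the parent gathers the children's results on the way back up.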
ScatterGatherResult
#
Bases: BaseModel
The result of the scatter gather workflow.
to_gathered_experiment_runs()
#
Convert the scatter-gather result to gathered experiment runs.
scatter_gather(payload, ctx)
async
#
Run the scatter gather workflow.
sift_results(spec_data, results, make_error_specs=False)
#
Sift results into safe and errored results.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| spec_data | list[ZipDataContent] | The list of spec data. | required |
| results | list[ResultDataContent \| BaseException] | The list of results. | required |
| make_error_specs | bool | Whether or not to create an error DataFrame. | False |

Returns:

| Name | Type | Description |
|---|---|---|
| safe_results | list[ResultDataContent] | The list of safe results. |
| errors | DataFrame | The DataFrame of errored results. |
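The core sifting logic can be sketched as below, assuming results align positionally with spec_data (as when gathered via `asyncio.gather(..., return_exceptions=True)`). The real helper returns the errors as a DataFrame; plain dicts stand in for that here:

```python
def sift_results(spec_data: list, results: list):
    """Split results into successes and per-spec error records."""
    safe_results, errors = [], []
    for spec, result in zip(spec_data, results):
        if isinstance(result, BaseException):
            # Pair the failure with the spec that produced it for diagnostics.
            errors.append({"spec": spec, "error": str(result)})
        else:
            safe_results.append(result)
    return safe_results, errors

safe, errs = sift_results(
    ["a", "b", "c"],
    [1, ValueError("boom"), 3],
)
```

Keeping the spec alongside each error is what allows make_error_specs to rebuild a DataFrame of failed inputs for retry or inspection.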
combine_experiment_outputs(experiment_outputs)
#
Combine a list of experiment outputs into a single experiment output.
scythe.worker#
Worker configuration and startup.
worker
#
Worker for Scythe.
ScytheWorkerLabel
#
Bases: StrEnum
Label keys used by Scythe workers for task affinity.
Use these when specifying desired_worker_labels on experiments so tasks are routed to workers with matching capabilities. For example:

```python
from scythe.worker import ScytheWorkerLabel
from hatchet_sdk.labels import DesiredWorkerLabel

@ExperimentRegistry.Register(
    desired_worker_labels=ScytheWorkerLabel.HAS_GPU.worker_label
)
def my_gpu_experiment(input_spec: MyInput) -> MyOutput:
    ...
```

Multiple labels can be attached in the same way.
WorkerNameConfig
#
Bases: BaseSettings
Configuration for the worker name.
in_aws_batch
property
#
Return whether the worker is running in AWS Batch.
in_aws_copilot
property
#
Return whether the worker is running in AWS Copilot.
in_aws
property
#
Return whether the worker is running in AWS.
aws_hosting_str
property
#
Return the AWS hosting string for the worker.
in_fly
property
#
Return whether the worker is running in Fly.io.
fly_hosting_str
property
#
Return the Fly hosting string for the worker.
in_local
property
#
Return whether the worker is running locally.
hosting_str
property
#
Return the hosting string for the worker.
name
property
#
Return the name of the worker.
ScytheWorkerConfig
#
Bases: BaseSettings
Configuration for the Scythe worker.
labels
property
#
Return the labels for the worker.
computed_slots
property
#
Return the number of slots for the worker.
computed_durable_slots
property
#
Return the number of durable slots for the worker.
computed_name
property
#
Return the name of the worker.
start(experiments=None, additional_workflows=None)
#
Make a worker.
scythe.settings#
Environment-based settings for storage and timeouts.
settings
#
scythe.utils.filesys#
File reference types and URI fetching utilities.
filesys
#
Filesystem utilities.
S3Url
#
Bases: AnyUrl
A URL for an S3 object.
FileReferenceMixin
#
Bases: BaseModel
A mixin for file reference fields.
remote_artifact_file_paths
property
#
Get the remote source file paths.
fetch_uri(uri, local_path, use_cache=True, s3=s3)
#
Fetch a file from a uri and return the local path.
Caching is enabled by default and works by checking if the file exists locally before downloading it to avoid downloading the same file multiple times.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| uri | AnyUrl | The uri to fetch | required |
| local_path | Path | The local path to save the fetched file | required |
| use_cache | bool | Whether to use the cache | True |
| s3 | S3Client | The S3 client to use | s3_client |

Returns:

| Name | Type | Description |
|---|---|---|
| local_path | Path | The local path of the fetched file |
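The caching behavior described above (check for a local copy before downloading) can be sketched as follows; the `downloader` callable is a stand-in for the real S3 fetch, and the signature is simplified for illustration:

```python
from pathlib import Path
import tempfile

def fetch_uri(uri: str, local_path: Path, downloader, use_cache: bool = True) -> Path:
    """Download uri to local_path, skipping the download on a cache hit."""
    if use_cache and local_path.exists():
        return local_path  # cache hit: the file was fetched previously
    local_path.parent.mkdir(parents=True, exist_ok=True)
    downloader(uri, local_path)
    return local_path

# Demonstrate that the second fetch is served from the cache.
calls = []
def fake_download(uri, path):
    calls.append(uri)
    path.write_text("data")

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "cache" / "file.txt"
    fetch_uri("s3://bucket/file.txt", target, fake_download)
    fetch_uri("s3://bucket/file.txt", target, fake_download)  # no second download
```

Note the trade-off this implies: existence is the only cache key, so a changed remote file at the same URI will not be re-fetched unless use_cache=False.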
scythe.utils.results#
DataFrame serialization and S3 upload helpers.
results
#
This module contains functions to postprocess and serialize results.
serialize_df_dict(dfs)
#
Serialize a dictionary of dataframes into a dictionary of dictionaries.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| dfs | dict[str, DataFrame] | A dictionary of dataframes | required |

Returns:

| Type | Description |
|---|---|
| dict[str, dict] | A dictionary of dictionaries |
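A minimal sketch of this shape transformation, assuming pandas and a column-oriented serialization (the real helper may use a different orientation):

```python
import pandas as pd

def serialize_df_dict(dfs: dict[str, pd.DataFrame]) -> dict[str, dict]:
    """Convert each DataFrame to a plain dict so the mapping is JSON-friendly."""
    return {name: df.to_dict(orient="list") for name, df in dfs.items()}

out = serialize_df_dict({"energy": pd.DataFrame({"kwh": [1.0, 2.0]})})
```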
transpose_dataframe_dict(dataframe_results)
#
Transpose a list of dictionaries of dataframes into a dictionary of combined dataframes.
make_onerow_multiindex_from_dict(d, n_rows=1)
#
Make a MultiIndex from a dictionary.
This is useful for returning a wide-form dataframe of results for a single task.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| d | dict[str, Any] | The dictionary to make the MultiIndex from. | required |
| n_rows | int | The number of rows to repeat the MultiIndex. | 1 |

Returns:

| Name | Type | Description |
|---|---|---|
| multi_index | MultiIndex | The MultiIndex. |
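The documented behavior (each dict key becomes an index level, each value repeated n_rows times) can be sketched with pandas directly; this is an illustrative reimplementation, not the library's actual code:

```python
import pandas as pd

def make_onerow_multiindex_from_dict(d: dict, n_rows: int = 1) -> pd.MultiIndex:
    """Build a MultiIndex whose levels are the dict keys, values repeated n_rows times."""
    return pd.MultiIndex.from_arrays(
        [[v] * n_rows for v in d.values()],
        names=list(d.keys()),
    )

idx = make_onerow_multiindex_from_dict({"city": "Boston", "year": 2024}, n_rows=3)
```

Attaching such an index to a single task's wide-form results lets them be concatenated with other tasks' frames while keeping the task's scalar features queryable as index levels.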
save_and_upload_parquets(collected_dfs, s3, bucket, output_key_constructor, save_errors=False)
#
Save and upload results to s3.
scythe.utils.s3#
S3 client utilities.
s3
#