wei.core.workflow

The module that initializes and runs the step-by-step WEI workflow

Functions

cancel_active_workflow_runs()

Cancels all currently running workflow runs

cancel_workflow_run(wf_run)

Cancels the workflow run

create_run(workflow, workcell, experiment_id)

Pulls the workcell and builds a list of dictionary steps to be executed

inject_payload(payload, step)

Injects the payload into the step args

insert_parameter_values(workflow, parameters)

Replace the parameter strings in the workflow with the provided values

replace_positions(workcell, step)

Replaces the positions in the step with the actual positions from the workcell

save_workflow_files(wf_run, files)

Saves the files to the workflow run directory, and updates the step files to point to the new location

value_substitution(input_string, ...)

Performs $-string substitution on the input string and returns the string with substituted values

walk_and_replace(args, input_parameters)

Recursively walk the arguments and replace all parameters

wei.core.workflow.cancel_active_workflow_runs() -> None

Cancels all currently running workflow runs

wei.core.workflow.cancel_workflow_run(wf_run: WorkflowRun) -> None

Cancels the workflow run

wei.core.workflow.create_run(workflow: Workflow, workcell: Workcell, experiment_id: str, payload: Dict[str, Any] | None = None, simulate: bool = False) -> WorkflowRun

Pulls the workcell and builds a list of dictionary steps to be executed

Parameters:
  • workflow (Workflow) – The workflow data file loaded in from the workflow yaml file

  • workcell (Workcell) – The Workcell data file loaded in from the workcell yaml file

  • payload (Dict) – The input to the workflow

  • experiment_id (str) – The ID of the experiment the workflow run belongs to

  • simulate (bool) – Whether to simulate the run instead of using real robots

Returns:

steps (WorkflowRun) – a completely initialized workflow run

wei.core.workflow.inject_payload(payload: Dict[str, Any], step: Step) -> None

Injects the payload into the step args
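The idea of payload injection can be sketched with plain dictionaries. This is only an illustration, not the WEI implementation: the helper name inject_payload_sketch, the use of a bare dict in place of a Step, and the "payload.<key>" placeholder convention are all assumptions made for the example.

```python
from typing import Any, Dict


def inject_payload_sketch(payload: Dict[str, Any], step_args: Dict[str, Any]) -> None:
    """Hypothetical helper: replace 'payload.<key>' placeholders in step_args in place."""
    prefix = "payload."  # assumed placeholder convention, not confirmed by the source
    # Iterate over a snapshot so the dict can be updated safely inside the loop.
    for arg_name, arg_value in list(step_args.items()):
        if not isinstance(arg_value, str) or not arg_value.startswith(prefix):
            continue  # non-string and non-placeholder arguments are left untouched
        key = arg_value[len(prefix):]
        if key in payload:
            step_args[arg_name] = payload[key]


step_args = {"protocol": "payload.protocol_file", "cycles": 3, "note": "payload.unknown"}
inject_payload_sketch({"protocol_file": "pcr.yaml"}, step_args)
```

In this sketch, placeholders with no matching payload key are left as they are, so a missing value stays visible rather than being silently dropped.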

wei.core.workflow.insert_parameter_values(workflow: Workflow, parameters: Dict[str, Any])

Replace the parameter strings in the workflow with the provided values

wei.core.workflow.replace_positions(workcell: Workcell, step: Step) -> None

Replaces the positions in the step with the actual positions from the workcell
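A rough sketch of position replacement, using plain dictionaries instead of the real Workcell and Step types. The helper name, the location names and coordinates, and the assumption that only the "source" and "target" arguments hold position names are all hypothetical.

```python
from typing import Any, Dict, List

# Hypothetical workcell data: named locations mapped to coordinates.
LOCATIONS: Dict[str, List[float]] = {
    "sealer.pos": [231.8, -27.0, 313.0],
    "camera.pos": [90.6, 26.2, 66.3],
}


def replace_positions_sketch(
    locations: Dict[str, List[float]], step_args: Dict[str, Any]
) -> None:
    """Hypothetical helper: swap named positions for coordinates, in place."""
    # Assumption: only these two argument names carry position names.
    for arg_name in ("source", "target"):
        name = step_args.get(arg_name)
        if isinstance(name, str) and name in locations:
            step_args[arg_name] = locations[name]


step_args = {"source": "sealer.pos", "target": "camera.pos", "plate_rotation": "narrow"}
replace_positions_sketch(LOCATIONS, step_args)
```

Arguments that do not name a known location, such as plate_rotation here, pass through unchanged.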

wei.core.workflow.save_workflow_files(wf_run: WorkflowRun, files: List[UploadFile]) -> WorkflowRun

Saves the files to the workflow run directory, and updates the step files to point to the new location

wei.core.workflow.value_substitution(input_string: str, input_parameters: Dict[str, Any])

Performs $-string substitution on the input string and returns the string with substituted values
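As an illustration of $-string substitution in general, the sketch below uses string.Template from the Python standard library. Whether value_substitution is built on string.Template is not stated here, so treat substitute_values as a hypothetical stand-in rather than the actual implementation.

```python
from string import Template
from typing import Any, Dict


def substitute_values(input_string: str, input_parameters: Dict[str, Any]) -> str:
    """Hypothetical stand-in for value_substitution, built on string.Template."""
    # safe_substitute leaves unknown $placeholders untouched instead of raising KeyError.
    return Template(input_string).safe_substitute(input_parameters)


result = substitute_values(
    "move plate to $target at ${speed} mm/s",
    {"target": "camera", "speed": 50},
)
print(result)  # move plate to camera at 50 mm/s
```

Both the $name and ${name} forms are recognized; the braced form is useful when the placeholder is followed directly by other identifier characters.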

wei.core.workflow.walk_and_replace(args: Dict[str, Any], input_parameters: Dict[str, Any])

Recursively walks the arguments and replaces all parameters
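A recursive walk of this kind can be sketched as follows. The helper name walk_and_substitute is hypothetical, and two details are assumptions for the example: it returns a new structure rather than modifying args in place, and it uses string.Template for the per-string substitution.

```python
from string import Template
from typing import Any, Dict


def walk_and_substitute(args: Any, input_parameters: Dict[str, Any]) -> Any:
    """Hypothetical helper: return a copy of args with $placeholders replaced."""
    if isinstance(args, dict):
        # Recurse into every value; keys are left as they are.
        return {key: walk_and_substitute(value, input_parameters) for key, value in args.items()}
    if isinstance(args, list):
        return [walk_and_substitute(item, input_parameters) for item in args]
    if isinstance(args, str):
        # Unknown placeholders are kept verbatim by safe_substitute.
        return Template(args).safe_substitute(input_parameters)
    return args  # numbers, booleans, None, etc. pass through unchanged


args = {
    "source": "$src",
    "options": {"speed": "$speed", "retries": 3},
    "tags": ["$src", "fixed"],
}
replaced = walk_and_substitute(args, {"src": "plate_station", "speed": 10})
```

Note that substituted values come back as strings in this sketch (the speed becomes "10"), since the replacement happens inside a string template.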