wei.core.workflow
The module that initializes and runs the step-by-step WEI workflow
Functions
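cancel_active_workflow_runs() | Cancels all currently running workflow runs
cancel_workflow_run(wf_run) | Cancels the workflow run
create_run(workflow, workcell, experiment_id[, payload, simulate]) | Pulls the workcell and builds a list of dictionary steps to be executed
inject_payload(payload, step) | Injects the payload into the step args
insert_parameter_values(workflow, parameters) | Replace the parameter strings in the workflow with the provided values
replace_positions(workcell, step) | Replaces the positions in the step with the actual positions from the workcell
save_workflow_files(wf_run, files) | Saves the files to the workflow run directory, and updates the step files to point to the new location
value_substitution(input_string, input_parameters) | Performs $-string substitution on the input string and returns the string with substituted values
walk_and_replace(args, input_parameters) | Recursively walk the arguments and replace all parameters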
- wei.core.workflow.cancel_active_workflow_runs() → None
Cancels all currently running workflow runs
- wei.core.workflow.cancel_workflow_run(wf_run: WorkflowRun) → None
Cancels the workflow run
- wei.core.workflow.create_run(workflow: Workflow, workcell: Workcell, experiment_id: str, payload: Dict[str, Any] | None = None, simulate: bool = False) → WorkflowRun
Pulls the workcell and builds a list of dictionary steps to be executed
- Parameters:
workflow (Workflow) – The workflow data loaded from the workflow YAML file
workcell (Workcell) – The workcell data loaded from the workcell YAML file
payload (Dict) – The input to the workflow
experiment_id (str) – The ID of the experiment that the workflow run belongs to
simulate (bool) – Whether to simulate the run instead of using real robots
- Returns:
steps (WorkflowRun) – a completely initialized workflow run
- wei.core.workflow.inject_payload(payload: Dict[str, Any], step: Step) → None
Injects the payload into the step args
- wei.core.workflow.insert_parameter_values(workflow: Workflow, parameters: Dict[str, Any])
Replace the parameter strings in the workflow with the provided values
- wei.core.workflow.replace_positions(workcell: Workcell, step: Step) → None
Replaces the positions in the step with the actual positions from the workcell
- wei.core.workflow.save_workflow_files(wf_run: WorkflowRun, files: List[UploadFile]) → WorkflowRun
Saves the files to the workflow run directory, and updates the step files to point to the new location
- wei.core.workflow.value_substitution(input_string: str, input_parameters: Dict[str, Any])
Performs $-string substitution on the input string and returns the string with substituted values
- wei.core.workflow.walk_and_replace(args: Dict[str, Any], input_parameters: Dict[str, Any])
Recursively walk the arguments and replace all parameters
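
The two parameter helpers above, value_substitution and walk_and_replace, can be illustrated with a minimal sketch. This is not the WEI implementation: it assumes that "$-string substitution" behaves like Python's standard string.Template, and the names substitute_values, walk_and_replace_sketch, step_args, and parameters are hypothetical stand-ins chosen for illustration. Unlike the documented function, whose return behavior is not stated here, the sketch returns a new dictionary rather than modifying its input.

```python
from string import Template
from typing import Any, Dict


def substitute_values(input_string: str, input_parameters: Dict[str, Any]) -> str:
    """Hypothetical stand-in for value_substitution, built on string.Template."""
    # safe_substitute leaves unknown $placeholders untouched instead of raising.
    return Template(input_string).safe_substitute(input_parameters)


def walk_and_replace_sketch(
    args: Dict[str, Any], input_parameters: Dict[str, Any]
) -> Dict[str, Any]:
    """Hypothetical stand-in for walk_and_replace; returns a new dictionary."""
    replaced: Dict[str, Any] = {}
    for key, value in args.items():
        if isinstance(value, str):
            # Strings are the only values that can carry $-placeholders.
            replaced[key] = substitute_values(value, input_parameters)
        elif isinstance(value, dict):
            # Recurse into nested argument dictionaries.
            replaced[key] = walk_and_replace_sketch(value, input_parameters)
        else:
            # Numbers, booleans, and other values pass through unchanged.
            replaced[key] = value
    return replaced


# Illustrative step arguments and parameter values (both invented for this sketch).
step_args = {
    "source": "$plate_location",
    "options": {"volume": "${volume_ul} uL", "speed": 3},
}
parameters = {"plate_location": "deck.slot_1", "volume_ul": 50}
result = walk_and_replace_sketch(step_args, parameters)
print(result)
```

In this sketch, placeholders with no matching parameter are left in place, which makes missing values easy to spot in the resulting arguments; a stricter variant could call Template.substitute instead, which raises KeyError on a missing key.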