Skip to content

Loader

get_workflow_from_module(module, var_name=None)

Get workflow from the module.

Get the workflow stored in var_name, or the first workflow defined from the module.

Source code in germinate_ai/core/loader.py
def get_workflow_from_module(module: ModuleType, var_name: str = None) -> Workflow:
    """Get workflow from the module.

    Get the workflow stored in `var_name`, or the first workflow defined from the module.
    """
    workflow = None
    if var_name:
        # get workflow with matching variable name
        workflow = _get_workflow_with_name(module.__dict__, var_name)
    else:
        # first Workflow instance found
        workflow = _get_first_workflow(module.__dict__)

    if workflow is not None:
        return workflow

    raise WorkflowImportException("Could not find any workflows")

import_workflow(workflow_import_path, *, working_dir=None)

Import a workflow instance from the given module.

Example usage

./simple_metagpt.py

import_workflow("simple_metagpt:workflow") import_workflow("simple_metagpt:workflow_v2")

./my_project/workflows/simple_metagpt.py

import_workflow("workflows.simple_metagpt:workflow", working_dir="./my_project")

Source code in germinate_ai/core/loader.py
def import_workflow(
    workflow_import_path: str,
    *,
    working_dir: typ.Optional[str] = None,
):
    """
    Import a workflow instance from the given module.

    Example usage:
        # ./simple_metagpt.py
        import_workflow("simple_metagpt:workflow")
        import_workflow("simple_metagpt:workflow_v2")

        # ./my_project/workflows/simple_metagpt.py
        import_workflow("workflows.simple_metagpt:workflow", working_dir="./my_project")
    """

    module_name, var_name = parse_import_path(workflow_import_path)
    module = import_module(module_name, working_dir=working_dir)

    return get_workflow_from_module(module, var_name=var_name)