Skip to content

Models

Workflow Runs

WorkflowRun

Bases: Base

One particular run of a workflow.

Source code in germinate_ai/data/models/workflow_runs.py
class WorkflowRun(Base):
    """One particular run of a workflow.

    Holds the workflow's identifying columns, an optional pickled copy of its
    state machine, the run's progress state, its JSON-like data columns, and
    links to the `StateInstance` rows that belong to the run.
    """

    __tablename__ = "workflow_runs"

    id: Mapped[UUID] = mapped_column(
        primary_key=True, server_default=text("gen_random_uuid()")
    )
    """Workflow run UUID"""

    workflow_name: Mapped[str] = mapped_column(String())

    workflow_version: Mapped[str] = mapped_column(String())

    workflow_id: Mapped[str] = mapped_column(String())

    # Pickled workflow state machine (serialized with cloudpickle).
    # NOTE(review): unpickling executes arbitrary code; only load rows written
    # by trusted producers.
    workflow_state_machine: Mapped[Workflow] = mapped_column(
        PickleType(pickler=cloudpickle), nullable=True
    )

    state: Mapped[WorkflowRunStateEnum] = mapped_column(
        Enum(WorkflowRunStateEnum), default=WorkflowRunStateEnum.created
    )

    # `default=dict` (a callable) gives every row its own fresh dict; a literal
    # `{}` would be one shared object handed to every instance.
    input: Mapped[dict[str, Any]] = mapped_column(default=dict, server_default="{}")
    output: Mapped[dict[str, Any]] = mapped_column(default=dict, server_default="{}")
    payload: Mapped[dict[str, Any]] = mapped_column(default=dict, server_default="{}")
    attributes: Mapped[dict[str, Any]] = mapped_column(
        default=dict, server_default="{}"
    )

    # All state instances of this run (joined on StateInstance.workflow_run_id).
    state_instances: Mapped[List["StateInstance"]] = relationship(
        back_populates="workflow_run", foreign_keys="StateInstance.workflow_run_id"
    )

    # Track name of initial state
    initial_state_name: Mapped[str] = mapped_column(String())

    current_state_id: Mapped[UUID] = mapped_column(
        ForeignKey("state_instances.id"), nullable=True
    )
    # workflow_runs and state_instances reference each other, so this foreign
    # key is written with a separate UPDATE (post_update) to break the cycle.
    current_state: Mapped["StateInstance"] = relationship(
        foreign_keys=[current_state_id], post_update=True
    )

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    # Only set on UPDATE, so it stays NULL until the row is first modified.
    modified_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        onupdate=func.now(),
        nullable=True,
    )
    completed_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=True
    )

    def __repr__(self) -> str:
        # The model has no `name` column; the workflow's name is `workflow_name`.
        return f"<Workflow Run: {self.workflow_name}>"

    def state_instance_by_name(self, name: str) -> "StateInstance | None":
        """Find related state instance by name.

        Returns the first entry of `state_instances` whose `name` equals
        `name`, or `None` when there is no such entry.
        """
        return next((si for si in self.state_instances if si.name == name), None)

id: Mapped[UUID] = mapped_column(primary_key=True, server_default=text('gen_random_uuid()')) class-attribute instance-attribute

Workflow run UUID

state_instance_by_name(name)

Find related state instance by name.

Source code in germinate_ai/data/models/workflow_runs.py
def state_instance_by_name(self, name: str) -> "StateInstance":
    """Look up one of this run's state instances by its name.

    Scans `self.state_instances` in order and returns the first one whose
    `name` matches; returns `None` if nothing matches.
    """
    for candidate in self.state_instances:
        if candidate.name == name:
            return candidate
    return None

States

StateInstance

Bases: Base

An instance of a state in a Workflow State Machine.

Source code in germinate_ai/data/models/states.py
class StateInstance(Base):
    """An instance of a state in a Workflow State Machine.

    A state's tasks are stored as `sorted_tasks_phases`: an ordered list of
    phases, each phase being a list of task names that can run in parallel.
    `current_phase_index` points at the phase currently being worked on.
    """

    __tablename__ = "state_instances"

    id: Mapped[UUID] = mapped_column(
        primary_key=True, server_default=text("gen_random_uuid()")
    )

    # State name
    name: Mapped[str] = mapped_column(String())

    state: Mapped[StateInstanceStateEnum] = mapped_column(
        Enum(StateInstanceStateEnum), default=StateInstanceStateEnum.created
    )

    # Store sorted DAG generations i.e. array of (array of tasks that can be run in parallel)
    sorted_tasks_phases: Mapped[list[list[str]]] = mapped_column(JSON)
    # index of current running phase
    current_phase_index: Mapped[int] = mapped_column(default=0)

    # Maps transition (condition task) name -> name of the target state.
    # `default=dict` (a callable) gives every row its own fresh dict; a literal
    # `{}` would be one shared object handed to every instance.
    transitions: Mapped[dict[str, Any]] = mapped_column(
        default=dict, server_default="{}"
    )

    input: Mapped[dict[str, Any]] = mapped_column(default=dict, server_default="{}")
    output: Mapped[dict[str, Any]] = mapped_column(default=dict, server_default="{}")
    payload: Mapped[dict[str, Any]] = mapped_column(default=dict, server_default="{}")
    attributes: Mapped[dict[str, Any]] = mapped_column(
        default=dict, server_default="{}"
    )

    workflow_run_id: Mapped[UUID] = mapped_column(ForeignKey("workflow_runs.id"))
    workflow_run: Mapped["WorkflowRun"] = relationship(
        back_populates="state_instances", foreign_keys=[workflow_run_id]
    )

    task_instances: Mapped[List["TaskInstance"]] = relationship(
        back_populates="state_instance"
    )

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    # Only set on UPDATE, so it stays NULL until the row is first modified.
    modified_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), onupdate=func.now(), nullable=True
    )
    completed_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=True
    )

    def __repr__(self) -> str:
        return f"<State Instance: {self.name}>"

    @property
    def phase_task_names(self):
        """Get the names of all the tasks in the current phase.

        Raises:
            IndexError: if `current_phase_index` is outside `sorted_tasks_phases`.
        """
        return set(self.sorted_tasks_phases[self.current_phase_index])

    @property
    def current_phase_complete(self):
        """Are all the tasks in the current phase complete?"""
        phase_names = self.phase_task_names
        completed_names = {
            t.name
            for t in self.task_instances
            if t.state == TaskInstanceStateEnum.completed
        }
        # Complete when every task name of the phase has a completed task instance.
        return phase_names.issubset(completed_names)

    @property
    def all_phases_complete(self):
        """Are all phases in this state's tasks DAG complete?

        True when the current phase is complete and it is the last phase.
        """
        return (
            self.current_phase_complete
            and self.current_phase_index >= len(self.sorted_tasks_phases) - 1
        )

    @property
    def phase_tasks(self) -> set["TaskInstance"]:
        """Return a set of tasks in the current phase."""
        # Evaluate the property once instead of once per task instance.
        names = self.phase_task_names
        return {t for t in self.task_instances if t.name in names}

    def next_phase(self) -> typ.Set["TaskInstance"]:
        """Enter next phase by incrementing the phase index and returning all the tasks in the new phase.

        Note: Commit to persist the change.

        Raises:
            IndexError: if there is no further phase to enter, or if the phase
                just entered is the last one and is already complete. In both
                cases `current_phase_index` is left unchanged.
        """
        next_index = self.current_phase_index + 1
        # Check bounds BEFORE mutating: otherwise the index would be left
        # pointing past the last phase when the lookup below fails.
        if next_index >= len(self.sorted_tasks_phases):
            raise IndexError(f"All phases in state {self.name} already complete")

        self.current_phase_index = next_index

        # sanity check
        if self.all_phases_complete:
            self.current_phase_index -= 1
            raise IndexError(f"All phases in state {self.name} already complete")

        return self.phase_tasks

    def final_phase(self) -> typ.Set["TaskInstance"]:
        """Return the "final" phase of tasks in the state.

        If a state has transitions, then the actual final phase is composed of transition condition evaluation tasks.
        In that case, this function instead returns the tasks in the penultimate phase which are actually the last phase of user defined tasks.

        Note: End states, for example, might not have any transitions into other states.

        Returns an empty set when the requested phase does not exist (no
        phases at all, or only the transition phase).
        """
        phases = self.sorted_tasks_phases
        # With transitions the last phase holds the condition tasks, so step
        # back one more phase to reach the user-defined tasks.
        offset = 2 if self.transitions else 1
        if len(phases) < offset:
            return set()
        final_names = set(phases[-offset])
        return {t for t in self.task_instances if t.name in final_names}

    def state_output(self) -> dict:
        """Returns the merged output from the final phase of tasks (see `final_phase`).

        When several tasks emit the same key, the task whose name sorts first
        wins, so the result does not depend on set iteration order.
        """
        ordered = sorted(self.final_phase(), key=lambda t: t.name)
        # ChainMap gives priority to earlier mappings.
        return dict(ChainMap(*((t.output or {}) for t in ordered)))

    def next_state(self) -> typ.Optional[str]:
        """Figure out the next state to transition to.

        Returns the target state of the first transition (in `transitions`
        order) whose completed condition task evaluated truthy, or `None` if no
        transition is triggered.
        """
        # Completed task instances that correspond to transition conditions.
        completed_conditions = {
            t.name: t
            for t in self.task_instances
            if t.name in self.transitions
            and t.state == TaskInstanceStateEnum.completed
        }

        # check transition condition evaluations in order
        # trigger first triggered transition
        for transition_name, target_state in self.transitions.items():
            task = completed_conditions.get(transition_name)
            if task is None:
                # Condition task missing or not completed: it cannot trigger.
                continue
            output = ConditionOutputSchema.model_validate(task.output)
            if output.condition_evaluation:
                return target_state

        return None

all_phases_complete property

Are all phases in this state's tasks DAG complete?

current_phase_complete property

Are all the tasks in the current phase complete?

phase_task_names property

Get the names of all the tasks in the current phase.

phase_tasks: set[TaskInstance] property

Return a set of tasks in the current phase.

final_phase()

Return the "final" phase of tasks in the state.

If a state has transitions, then the actual final phase is composed of transition condition evaluation tasks. In that case, this function instead returns the tasks in the penultimate phase which are actually the last phase of user defined tasks.

Note: End states, for example, might not have any transitions into other states.

Source code in germinate_ai/data/models/states.py
def final_phase(self) -> typ.Set["TaskInstance"]:
    """Return the "final" phase of tasks in the state.

    If a state has transitions, then the actual final phase is composed of transition condition evaluation tasks.
    In that case, this function instead returns the tasks in the penultimate phase which are actually the last phase of user defined tasks.

    Note: End states, for example, might not have any transitions into other states.

    Returns an empty set when the requested phase does not exist (no phases
    at all, or only the transition phase).
    """
    phases = self.sorted_tasks_phases
    # With transitions the last phase holds the condition tasks, so step back
    # one more phase to reach the user-defined tasks.
    offset = 2 if self.transitions else 1
    if len(phases) < offset:
        # Indexing [-offset] would raise IndexError here.
        return set()
    final_names = set(phases[-offset])
    return {t for t in self.task_instances if t.name in final_names}

next_phase()

Enter next phase by incrementing the phase index and returning all the tasks in the new phase.

Note: Commit to persist the change.

Source code in germinate_ai/data/models/states.py
def next_phase(self) -> typ.Set["TaskInstance"]:
    """Enter next phase by incrementing the phase index and returning all the tasks in the new phase.

    Note: Commit to persist the change.

    Raises:
        IndexError: if there is no further phase to enter, or if the phase
            just entered is the last one and is already complete. In both
            cases `current_phase_index` is left unchanged.
    """
    next_index = self.current_phase_index + 1
    # Check bounds BEFORE mutating: otherwise the index would be left pointing
    # past the last phase when the completeness lookup below fails.
    if next_index >= len(self.sorted_tasks_phases):
        raise IndexError(f"All phases in state {self.name} already complete")

    self.current_phase_index = next_index

    # sanity check
    if self.all_phases_complete:
        self.current_phase_index -= 1
        raise IndexError(f"All phases in state {self.name} already complete")

    return self.phase_tasks

next_state()

Figure out the next state to transition to.

Source code in germinate_ai/data/models/states.py
def next_state(self) -> typ.Optional[str]:
    """Figure out the next state to transition to.

    Returns the target state of the first transition (in `transitions` order)
    whose completed condition task evaluated truthy, or `None` if no
    transition is triggered.
    """
    # Completed task instances that correspond to transition conditions.
    completed_conditions = {
        t.name: t
        for t in self.task_instances
        if t.name in self.transitions and t.state == TaskInstanceStateEnum.completed
    }

    # check transition condition evaluations in order
    # trigger first triggered transition
    for transition_name, target_state in self.transitions.items():
        task = completed_conditions.get(transition_name)
        if task is None:
            # Condition task missing or not completed: it cannot trigger, and
            # indexing the dict directly would raise a bare KeyError.
            continue
        output = ConditionOutputSchema.model_validate(task.output)
        if output.condition_evaluation:
            return target_state

    return None

state_output()

Returns the merged output from the final phase of tasks (see final_phase).

Source code in germinate_ai/data/models/states.py
def state_output(self) -> dict:
    """Returns the merged output from the final phase of tasks (see `final_phase`).

    When several tasks emit the same key, the task whose name sorts first
    wins, so the result does not depend on set iteration order. A task whose
    output is `None` contributes nothing.
    """
    ordered = sorted(self.final_phase(), key=lambda t: t.name)
    # ChainMap gives priority to earlier mappings.
    return dict(ChainMap(*((t.output or {}) for t in ordered)))

Tasks

TaskInstance

Bases: Base

An instance of a task.

Source code in germinate_ai/data/models/tasks.py
class TaskInstance(Base):
    """An instance of a task.

    Belongs to one `StateInstance` and records the task's progress state, its
    executor (by preset name or as a pickled object) and its JSON-like data
    columns.
    """

    __tablename__ = "task_instances"

    id: Mapped[UUID] = mapped_column(
        primary_key=True, server_default=text("gen_random_uuid()")
    )

    name: Mapped[str] = mapped_column(String())

    # Just need the names to build subject and get parent tasks' outputs.
    # `default=list` (a callable) gives every row its own fresh list; a literal
    # `[]` would be one shared object handed to every instance.
    depends_on: Mapped[List[str]] = mapped_column(ARRAY(String), default=list)

    state: Mapped[TaskInstanceStateEnum] = mapped_column(
        Enum(TaskInstanceStateEnum), default=TaskInstanceStateEnum.created
    )

    # Need either a name of a preset, or a pickled executor.
    # NOTE(review): unpickling executes arbitrary code; only load rows written
    # by trusted producers.
    task_executor_name: Mapped[str] = mapped_column(nullable=True)
    task_executor: Mapped[TaskExecutor] = mapped_column(
        PickleType(pickler=cloudpickle), nullable=True
    )

    # `default=dict` for the same reason as `default=list` above.
    input: Mapped[dict[str, Any]] = mapped_column(default=dict, server_default="{}")
    output: Mapped[dict[str, Any]] = mapped_column(default=dict, server_default="{}")
    payload: Mapped[dict[str, Any]] = mapped_column(default=dict, server_default="{}")
    attributes: Mapped[dict[str, Any]] = mapped_column(
        default=dict, server_default="{}"
    )

    state_instance_id: Mapped[UUID] = mapped_column(ForeignKey("state_instances.id"))
    state_instance: Mapped["StateInstance"] = relationship(
        back_populates="task_instances"
    )

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    # Only set on UPDATE, so it stays NULL until the row is first modified.
    modified_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), onupdate=func.now(), nullable=True
    )
    completed_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=True
    )

    def __repr__(self) -> str:
        return f"<Task Instance: {self.name}>"

Enums

StateInstanceStateEnum

Bases: str, Enum

Progress state of a state instance.

Source code in germinate_ai/data/models/enums.py
class StateInstanceStateEnum(str, enum.Enum):
    """Progress state of a state instance.

    Mixing in `str` makes each member compare equal to its string value.
    """

    # NOTE(review): by their names these look like not-yet-finished states — confirm.
    created = "Created"
    queued = "Queued"
    in_progress = "InProgress"

    # NOTE(review): by their names these look like terminal outcomes — confirm.
    completed = "Completed"
    failed = "Failed"
    canceled = "Canceled"

TaskInstanceStateEnum

Bases: str, Enum

Task progress state.

Source code in germinate_ai/data/models/enums.py
class TaskInstanceStateEnum(str, enum.Enum):
    """Progress state of a task instance.

    Mixing in `str` makes each member compare equal to its string value.
    """

    # NOTE(review): by their names these look like not-yet-finished states — confirm.
    created = "Created"
    queued = "Queued"
    in_progress = "InProgress"

    # NOTE(review): by their names these look like terminal outcomes — confirm.
    completed = "Completed"
    failed = "Failed"
    canceled = "Canceled"

WorkflowRunStateEnum

Bases: str, Enum

Workflow Run progress state.

Source code in germinate_ai/data/models/enums.py
class WorkflowRunStateEnum(str, enum.Enum):
    """Progress state of a workflow run.

    Mixing in `str` makes each member compare equal to its string value.
    """

    # NOTE(review): by their names these look like not-yet-finished states — confirm.
    created = "Created"
    queued = "Queued"
    in_progress = "InProgress"

    # NOTE(review): by their names these look like terminal outcomes — confirm.
    completed = "Completed"
    failed = "Failed"
    canceled = "Canceled"