Database

StreamFlow relies on a persistent Database to store all the metadata regarding a workflow execution. These metadata are used for fault tolerance, provenance collection, and reporting. All StreamFlow entities interacting with the Database extend the PersistableEntity interface, adhering to the Object-Relational Mapping (ORM) programming model:

def __init__(self):
    self.persistent_id: int | None = None

@classmethod
async def load(
    cls,
    context: StreamFlowContext,
    persistent_id: int,
    loading_context: DatabaseLoadingContext,
) -> PersistableEntity:
    ...

async def save(
    self, context: StreamFlowContext
) -> None:
    ...

Each PersistableEntity is identified by a unique numerical persistent_id related to the corresponding Database record. Two methods, save and load, allow persisting the entity in the Database and retrieving it from the persistent record. Note that load is a class method, as it must construct a new instance.

The load method receives three input parameters: the current execution context, the persistent_id of the instance that should be loaded, and a loading_context (see DatabaseLoadingContext). Note that the load method should not directly assign the persistent_id to the new entity, as this operation is in charge to the DatabaseLoadingContext class.

Persistence

The Database interface, defined in the streamflow.core.persistence module, contains all the methods to create, modify, and retrieve this metadata. Data deletion is unnecessary, as StreamFlow never removes existing records. Internally, the save and load methods call one or more of these methods to perform the desired operations.

async def add_dependency(
    self, step: int, port: int, type: DependencyType, name: str
) -> None:
    ...

async def add_deployment(
    self,
    name: str,
    type: str,
    config: str,
    external: bool,
    lazy: bool,
    workdir: str | None,
) -> int:
    ...

async def add_execution(
    self, step_id: int, tag: str, cmd: str
) -> int:
    ...

async def add_port(
    self,
    name: str,
    workflow_id: int,
    type: type[Port],
    params: MutableMapping[str, Any],
) -> int:
    ...

async def add_provenance(
    self, inputs: MutableSequence[int], token: int
) -> None:
    ...

async def add_step(
    self,
    name: str,
    workflow_id: int,
    status: int,
    type: type[Step],
    params: MutableMapping[str, Any],
) -> int:
    ...

async def add_target(
    self,
    deployment: int,
    type: type[Target],
    params: MutableMapping[str, Any],
    locations: int = 1,
    service: str | None = None,
    workdir: str | None = None,
) -> int:
    ...

async def add_token(
    self, tag: str, type: type[Token], value: Any, port: int | None = None
) -> int:
    ...

async def add_workflow(
    self, name: str, params: MutableMapping[str, Any], status: int, type: str
) -> int:
    ...

async def close(self) -> None:
    ...

async def get_dependees(
    self, token_id: int
) -> MutableSequence[MutableMapping[str, Any]]:
    ...

async def get_dependers(
    self, token_id: int
) -> MutableSequence[MutableMapping[str, Any]]:
    ...

async def get_deployment(
    self, deployment_id: int
) -> MutableMapping[str, Any]:
    ...

async def get_execution(
    self, execution_id: int
) -> MutableMapping[str, Any]:
    ...

async def get_executions_by_step(
    self, step_id: int
) -> MutableSequence[MutableMapping[str, Any]]:
    ...

async def get_input_ports(
    self, step_id: int
) -> MutableSequence[MutableMapping[str, Any]]:
    ...

async def get_input_steps(
    self, port_id: int
) -> MutableSequence[MutableMapping[str, Any]]:
    ...

async def get_output_ports(
    self, step_id: int
) -> MutableSequence[MutableMapping[str, Any]]:
    ...

async def get_output_steps(
    self, port_id: int
) -> MutableSequence[MutableMapping[str, Any]]:
    ...

async def get_port(
    self, port_id: int
) -> MutableMapping[str, Any]:
    ...

async def get_port_from_token(
    self, token_id: int
) -> MutableMapping[str, Any]:
    ...

async def get_port_tokens(
    self, port_id: int
) -> MutableSequence[int]:
    ...

async def get_reports(
    self, workflow: str, last_only: bool = False
) -> MutableSequence[MutableSequence[MutableMapping[str, Any]]]:
    ...

async def get_step(
    self, step_id: int
) -> MutableMapping[str, Any]:
    ...

async def get_target(
    self, target_id: int
) -> MutableMapping[str, Any]:
    ...

async def get_token(
    self, token_id: int
) -> MutableMapping[str, Any]:
    ...

async def get_workflow(
    self, workflow_id: int
) -> MutableMapping[str, Any]:
    ...

async def get_workflow_ports(
    self, workflow_id: int
) -> MutableSequence[MutableMapping[str, Any]]:
    ...

async def get_workflow_steps(
    self, workflow_id: int
) -> MutableSequence[MutableMapping[str, Any]]:
    ...

async def get_workflows_by_name(
    self, workflow_name: str, last_only: bool = False
) -> MutableSequence[MutableMapping[str, Any]]:
    ...

async def get_workflows_list(
    self, name: str | None
) -> MutableSequence[MutableMapping[str, Any]]:
    ...

async def update_deployment(
    self, deployment_id: int, updates: MutableMapping[str, Any]
) -> int:
    ...

async def update_execution(
    self, execution_id: int, updates: MutableMapping[str, Any]
) -> int:
    ...

async def update_port(
    self, port_id: int, updates: MutableMapping[str, Any]
) -> int:
    ...

async def update_step(
    self, step_id: int, updates: MutableMapping[str, Any]
) -> int:
    ...

async def update_target(
    self, target_id: str, updates: MutableMapping[str, Any]
) -> int:
    ...

async def update_workflow(
    self, workflow_id: int, updates: MutableMapping[str, Any]
) -> int:
    ...

There are three families of methods in the Database interface: add_entity, update_entity, and get_data. All these methods are generic to avoid changing the interface whenever the internals of an entity slightly change.

Each add_entity method receives in input the parameter values for each entity attribute and returns the numeric persistent_id of the created entity. Some methods also accept a type field, which identifies a particular class of entities, and a params field, a dictionary of additional entity parameters. Combined, these two features allow reusing the same method (and, optionally, the same database record structure) to store a whole hierarchy of entities inheriting from a base class.

Each update_entity method receives in input the persistent_id of the entity that should be modified and a dictionary, called updates, with the names of fields to be updated as keys and their new contents as values. All of them return the numeric persistent_id of the updated entity.

Each get_data method receives in input the identifier (commonly the persistent_id) of an entity and returns all the data related to that entity. Some methods also accept a boolean last_only parameter, which states if all the entities should be returned or just the most recent. All get_data methods return generic data structures, i.e., lists or dictionaries. The shape of each dictionary varies from one method to another and is documented in the source code.

The close method receives no input parameter and does not return anything. It frees stateful resources potentially allocated during the object’s lifetime, e.g., network or database connections.

Implementations

Type

Class

sqlite

streamflow.persistence.sqlite.SqliteDatabase

By default, StreamFlow uses a local SqliteDatabase instance for metadata persistence. The connection directive can be set to :memory: to avoid disk I/O and improve performance. However, in this case, all the metadata will be erased when the workflow execution terminates.

properties

connection (required)

The path where the sqlite file resides

type

string

default

${HOME}/.streamflow/$(streamflow version)/sqlite.db

timeout

The time (in seconds) after which a thread waiting for the database lock raises an exception

type

integer

default

20

The database schema is structured as follows:

CREATE TABLE IF NOT EXISTS workflow
(
    id         INTEGER PRIMARY KEY,
    name       TEXT,
    params     TEXT,
    status     INTEGER,
    type       TEXT,
    start_time INTEGER,
    end_time   INTEGER
);

CREATE TABLE IF NOT EXISTS step
(
    id       INTEGER PRIMARY KEY,
    name     TEXT,
    workflow INTEGER,
    status   INTEGER,
    type     TEXT,
    params   TEXT,
    FOREIGN KEY (workflow) REFERENCES workflow (id)
);

CREATE TABLE IF NOT EXISTS port
(
    id       INTEGER PRIMARY KEY,
    name     TEXT,
    workflow INTEGER,
    type     TEXT,
    params   TEXT,
    FOREIGN KEY (workflow) REFERENCES workflow (id)
);


CREATE TABLE IF NOT EXISTS dependency
(
    step INTEGER,
    port INTEGER,
    type INTEGER,
    name TEXT,
    PRIMARY KEY (step, port, type, name),
    FOREIGN KEY (step) REFERENCES step (id),
    FOREIGN KEY (port) REFERENCES port (id)
);


CREATE TABLE IF NOT EXISTS execution
(
    id         INTEGER PRIMARY KEY,
    step       INTEGER,
    tag        TEXT,
    cmd        TEXT,
    status     INTEGER,
    start_time INTEGER,
    end_time   INTEGER,
    FOREIGN KEY (step) REFERENCES step (id)
);


CREATE TABLE IF NOT EXISTS token
(
    id    INTEGER PRIMARY KEY,
    port  INTEGER,
    tag   TEXT,
    type  TEXT,
    value BLOB,
    FOREIGN KEY (port) REFERENCES port (id)
);


CREATE TABLE IF NOT EXISTS provenance
(
    dependee INTEGER,
    depender INTEGER,
    PRIMARY KEY (dependee, depender),
    FOREIGN KEY (dependee) REFERENCES token (id),
    FOREIGN KEY (depender) REFERENCES token (id)
);


CREATE TABLE IF NOT EXISTS deployment
(
    id       INTEGER PRIMARY KEY,
    name     TEXT,
    type     TEXT,
    config   TEXT,
    external INTEGER,
    lazy     INTEGER,
    workdir  TEXT,
    wraps    TEXT
);


CREATE TABLE IF NOT EXISTS target
(
    id         INTEGER PRIMARY KEY,
    deployment INTEGER,
    type       TEXT,
    locations  INTEGER,
    service    TEXT,
    workdir    TEXT,
    params     TEXT,
    FOREIGN KEY (deployment) REFERENCES deployment (id)
);


CREATE TABLE IF NOT EXISTS filter
(
    id              INTEGER PRIMARY KEY,
    name            TEXT,
    type            TEXT,
    config          TEXT
);

DatabaseLoadingContext

Workflow loading is a delicate operation. If not managed properly, it can be costly in terms of time and memory and lead to deadlocks in case of circular references. The DatabaseLoadingContext interface allows to define classes in charge of managing these aspects. Users should always rely on these classes to load entities, instead of directly calling load methods from PersistableEntity instances.

def add_deployment(self, persistent_id: int, deployment: DeploymentConfig):
    ...

def add_filter(self, persistent_id: int, filter_config: FilterConfig):
    ...

def add_port(self, persistent_id: int, port: Port):
    ...

def add_step(self, persistent_id: int, step: Step):
    ...

def add_target(self, persistent_id: int, target: Target):
    ...

def add_token(self, persistent_id: int, token: Token):
    ...

def add_workflow(self, persistent_id: int, workflow: Workflow):
    ...

async def load_deployment(self, context: StreamFlowContext, persistent_id: int):
    ...

async def load_filter(self, context: StreamFlowContext, persistent_id: int):
    ...

async def load_port(self, context: StreamFlowContext, persistent_id: int):
    ...

async def load_step(self, context: StreamFlowContext, persistent_id: int):
    ...

async def load_target(self, context: StreamFlowContext, persistent_id: int):
    ...

async def load_token(self, context: StreamFlowContext, persistent_id: int):
    ...

async def load_workflow(self, context: StreamFlowContext, persistent_id: int):
    ...

Implementations

Name

Class

DefaultDatabaseLoadingContext

streamflow.persistent.loading_context.DefaultDatabaseLoadingContext

WorkflowBuilder

streamflow.persistent.loading_context.WorkflowBuilder

DefaultDatabaseLoadingContext

The DefaultDatabaseLoadingContext keeps track of all the objects already loaded in the current transaction, serving as a cache to efficiently load nested entities and prevent deadlocks when dealing with circular references. Furthermore, it is in charge of assigning the persistent_id when an entity is added to the cache through an add_* method.

WorkflowBuilder

The WorkflowBuilder class loads the steps and ports of an existing workflow from a Database and inserts them into a new workflow object received as a constructor argument. It extends the DefaultDatabaseLoadingContext class and overrides only the methods involving step, port, and workflow entities. In particular, the add_* methods of these entities must not set the persistent_id, as they are dealing with a newly-created workflow, and the load_* methods should reset the internal state of their entities to the initial value (e.g., reset the status to Status.WAITING and clear the terminated flag).

The load_workflow method must behave in two different ways, depending on whether it is called directly from a user or in the internal logic of another entity’s load method. In the first case, it should load all the entities related to the original workflow, identified by the persistent_id argument, into the new one. In the latter case it should simply return the new workflow entity being built.

Other entities, such as deployment and target objects, can be safely shared between the old and the new workflows, as their internal state does not need to be modified. Therefore, they can be loaded following the common path implemented in the DefaultDatabaseLoadingContext class.