Connector

StreamFlow delegates the lifecycle management of each execution environment to a specific implementation of the Connector interface. In particular, a single Connector object is created for each deployment object described in the StreamFlow file.

The streamflow.core.deployment module defines the Connector interface, which exposes the following public methods:

async def copy_local_to_remote(
    self,
    src: str,
    dst: str,
    locations: MutableSequence[ExecutionLocation],
    read_only: bool = False,
) -> None:
    ...

async def copy_remote_to_local(
    self,
    src: str,
    dst: str,
    locations: MutableSequence[ExecutionLocation],
    read_only: bool = False,
) -> None:
    ...

async def copy_remote_to_remote(
    self,
    src: str,
    dst: str,
    locations: MutableSequence[ExecutionLocation],
    source_location: ExecutionLocation,
    source_connector: Connector | None = None,
    read_only: bool = False,
) -> None:
    ...

async def deploy(
    self, external: bool
) -> None:
    ...

async def get_available_locations(
    self,
    service: str | None = None,
    input_directory: str | None = None,
    output_directory: str | None = None,
    tmp_directory: str | None = None,
) -> MutableMapping[str, AvailableLocation]:
    ...

async def get_stream_reader(
    self,
    command: MutableSequence[str],
    location: ExecutionLocation,
) -> StreamWrapperContextManager:
    ...

async def get_stream_writer(
    self,
    command: MutableSequence[str],
    location: ExecutionLocation,
) -> StreamWrapperContextManager:
    ...

async def run(
    self,
    location: ExecutionLocation,
    command: MutableSequence[str],
    environment: MutableMapping[str, str] | None = None,
    workdir: str | None = None,
    stdin: int | str | None = None,
    stdout: int | str = asyncio.subprocess.STDOUT,
    stderr: int | str = asyncio.subprocess.STDOUT,
    capture_output: bool = False,
    timeout: int | None = None,
    job_name: str | None = None,
) -> tuple[Any | None, int] | None:
    ...

async def undeploy(
    self, external: bool
) -> None:
    ...

The deploy method instantiates the remote execution environment, making it ready to receive requests for data transfers and command executions. A deployment object can be marked as external in the StreamFlow file. In that case, the Connector should assume that the execution environment is already up and running, and the deploy method should only open the necessary connections to communicate with it.

The undeploy method destroys the remote execution environment, potentially cleaning up all the temporary resources instantiated during the workflow execution (e.g., intermediate results). If a deployment object is marked as external, the undeploy method should not destroy it but just close all the connections opened by the deploy method.
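
As a minimal sketch of this behaviour, a Connector honoring the external flag could be structured as follows. The MyEnvironmentConnector class and its placeholder helper methods are hypothetical and not part of StreamFlow:

from streamflow.deployment.connector.base import BaseConnector


class MyEnvironmentConnector(BaseConnector):
    # Hypothetical helpers, shown as placeholders
    async def _start_environment(self) -> None: ...
    async def _destroy_environment(self) -> None: ...
    async def _open_connections(self) -> None: ...
    async def _close_connections(self) -> None: ...

    async def deploy(self, external: bool) -> None:
        if not external:
            # Instantiate the execution environment from scratch
            await self._start_environment()
        # In both cases, open the channels used to communicate with it
        await self._open_connections()

    async def undeploy(self, external: bool) -> None:
        await self._close_connections()
        if not external:
            # Only destroy environments deployed by StreamFlow itself
            await self._destroy_environment()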

The get_available_locations method is used in the scheduling phase to obtain the locations available for job execution, identified by their unique names (see here). The method receives some optional input parameters to filter valid locations. The service parameter identifies a specific subset of locations in a deployment, and its precise meaning differs for each deployment type (see here). The other three parameters (input_directory, output_directory, and tmp_directory) allow the Connector to return correct disk usage values for each of the three folders in the case of remote instances with multiple volumes attached.
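
For example, a hypothetical single-node Connector could expose one location keyed by its unique name, as sketched below. The AvailableLocation constructor arguments shown are indicative only; check streamflow.core.scheduling for the exact signature:

from collections.abc import MutableMapping

from streamflow.core.scheduling import AvailableLocation


async def get_available_locations(
    self,
    service: str | None = None,
    input_directory: str | None = None,
    output_directory: str | None = None,
    tmp_directory: str | None = None,
) -> MutableMapping[str, AvailableLocation]:
    # Hypothetical single-node deployment: one location, keyed by name
    return {
        "node0": AvailableLocation(
            name="node0",
            deployment=self.deployment_name,
            hostname="node0.example.com",
        )
    }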

The get_stream_reader and get_stream_writer methods return a StreamWrapperContextManager instance, obtained by executing the given command on the given location, to read or write data through a stream (see here). Streams must be read and written in chunks that respect the size of the available buffer, which is defined by the transferBufferSize attribute of the Connector instance. These methods improve the performance of data copies between pairs of remote locations.

The copy methods perform a data transfer from a src path to a dst path in one or more destination locations of the execution environment controlled by the Connector. The read_only parameter notifies the Connector whether the destination files will be modified in place or not. This parameter prevents unintended side effects (e.g., symlink optimizations on the remote locations). The copy_remote_to_remote method accepts two additional parameters: a source_location and an optional source_connector. The latter identifies the Connector instance that controls the source_location and defaults to self when not specified.
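
For example, a caller could transfer a local input file to two locations of the same deployment as follows. The paths are hypothetical, and the connector and ExecutionLocation objects are assumed to have been obtained elsewhere:

# Copy a local file to two destination locations; read_only=True tells the
# Connector that the file will not be modified in place, enabling
# optimizations such as symlinking on the remote side
await connector.copy_local_to_remote(
    src="/local/data/input.csv",
    dst="/remote/workdir/input.csv",
    locations=[location_a, location_b],
    read_only=True,
)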

The run method performs a command execution on a remote location. The command parameter is a list of arguments, mimicking the Python subprocess abstraction. Several optional parameters can be passed to the run method. The environment parameter is a dictionary of environment variables, which should be defined in the remote execution context before executing the command. The workdir parameter identifies the remote working directory. The stdin, stdout, and stderr parameters are used for remote stream redirection. The capture_output parameter specifies whether the command output should be retrieved: if it is set to True, the run method returns a tuple with the command output and its return code; otherwise, it returns None. The timeout parameter specifies a maximum completion time for the remote execution, after which the run method raises a WorkflowExecutionException. Finally, the job_name parameter is the unique identifier of a StreamFlow job, which is used for debugging purposes.
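
A caller-side sketch, assuming connector and location objects obtained elsewhere:

# Execute a command remotely and capture its output
result = await connector.run(
    location=location,
    command=["grep", "-c", "pattern", "results.txt"],
    environment={"LC_ALL": "C"},
    workdir="/remote/workdir",
    capture_output=True,
    timeout=60,
    job_name="example-job",
)
if result is not None:
    # With capture_output=True, run returns (output, return_code)
    output, return_code = result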

BaseConnector

Users who want to implement their own Connector class should extend the BaseConnector whenever possible. The StreamFlow BaseConnector implementation, defined in the streamflow.deployment.connector.base module, already provides essential support for logging and tar-based streaming data transfers. Plus, it correctly handles FutureConnector instances by extending the FutureAware base class. However, the BaseConnector does not allow wrapping inner connectors using the wraps directive (see here). Indeed, only connectors extending the ConnectorWrapper interface support the wraps directive.

LocalConnector

The LocalConnector class is a special subtype of the Connector interface that identifies the StreamFlow local node. As discussed above, data transfers that involve the local node are treated differently from remote-to-remote data movements. In general, several StreamFlow classes adopt different strategies when an action involves the local node rather than a remote one, and these decisions rely on verifying whether a Connector object extends the LocalConnector class. For this reason, users who want to provide their own version of a local Connector must extend the LocalConnector class and not the BaseConnector as in other cases.

FutureConnector

In the eager setting, all the Connector objects deploy their related execution environment at the beginning of a workflow execution. However, to save resources, it is sometimes desirable to adopt a lazy approach, deploying each execution environment only when it receives the first request from the StreamFlow control plane. Users can switch between these behaviours by setting the lazy attribute of each target object to True (the default) or False in the StreamFlow file.

A FutureConnector instance wraps an actual Connector instance and implements the lazy behaviour: the deploy method does nothing, and every other method first calls the deploy method on the inner Connector to initialize it and then delegates the action. The main drawback of this implementation is that type checking on a FutureConnector instance will return the wrong connector type. The FutureAware class solves this issue by transparently returning the type of the inner Connector. All custom Connector instances defined by users should extend the FutureAware class, directly or indirectly, by extending the BaseConnector or ConnectorWrapper classes.
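
The general technique is illustrated below with a plain-Python sketch of transparent type reporting; this is not StreamFlow's actual FutureAware implementation, just the underlying idea:

class Real:
    pass


class Proxy:
    """Reports the type of the wrapped object instead of its own."""

    def __init__(self, inner):
        self._inner = inner

    @property
    def __class__(self):
        # isinstance consults __class__, so type checks see the inner type
        return type(self._inner)


assert isinstance(Proxy(Real()), Real)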

ConnectorWrapper

StreamFlow supports stacked locations using the wraps directive. However, not all Connector instances support inner connectors, but only those that extend the ConnectorWrapper interface. By default, a ConnectorWrapper instance receives an internal Connector object as a constructor parameter and delegates all method calls to the wrapped Connector. Plus, it already extends the FutureAware class, correctly handling FutureConnector instances. Users who want to create a custom Connector instance with support for the wraps directive must extend the ConnectorWrapper class and not the BaseConnector as in other cases.
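
As a minimal sketch, a custom wrapper could override a single method and rely on the base class for delegation. The class name is hypothetical, and the import path is indicative; check your StreamFlow version for the exact module:

from streamflow.deployment.wrapper import ConnectorWrapper


class LoggingConnectorWrapper(ConnectorWrapper):
    """Hypothetical wrapper that logs commands before delegating them."""

    async def run(self, location, command, **kwargs):
        print(f"Running {command} on {location}")
        # Every method not overridden here is delegated to the wrapped
        # Connector by the ConnectorWrapper base class
        return await super().run(location, command, **kwargs)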

BatchConnector

Some Connector instances implement remote executions through batch systems (e.g., Slurm, PBS, or AWS Batch). These connectors should extend the BatchConnector base class to notify users that they cannot manage the deployment, execution, and undeployment operations of an inner Connector instance as separate phases of its life-cycle (see QueueManagerConnector). On the other hand, ConnectorWrapper implementers can explicitly disallow their class to wrap inner BatchConnector classes by failing fast during object construction (see ContainerConnector).
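
A hedged sketch of the fail-fast pattern follows; the constructor parameters, exception class, and import paths are indicative only:

from streamflow.core.exception import WorkflowDefinitionException
from streamflow.deployment.connector.base import BatchConnector
from streamflow.deployment.wrapper import ConnectorWrapper


class MyContainerWrapper(ConnectorWrapper):
    def __init__(self, deployment_name, config_dir, connector):
        if isinstance(connector, BatchConnector):
            # Fail fast: this wrapper cannot wrap batch-based connectors
            raise WorkflowDefinitionException(
                f"{self.__class__.__name__} cannot wrap a BatchConnector"
            )
        super().__init__(deployment_name, config_dir, connector)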

Streaming

StreamFlow uses tar streams as the primary way to transfer data between locations. The main reason is that the tar command is so standard nowadays that it can be found out of the box in almost all execution environments, and its API does not vary significantly across implementations.

To ensure compatibility between different Connector instances when performing data transfers, StreamFlow provides two abstractions: the StreamWrapper interface, which exposes a unified API to read and write data streams, and the get_stream_reader and get_stream_writer methods, which obtain StreamWrapper objects from a Connector instance.

The StreamWrapper interface is straightforward. It is reported below:

def __init__(self, stream: Any):
    self.stream: Any = stream

@abstractmethod
async def close(self):
    ...

@abstractmethod
async def read(self, size: int | None = None):
    ...

@abstractmethod
async def write(self, data: Any):
    ...

The constructor receives an internal stream object, which can be of Any type. The read, write, and close methods wrap the APIs of the native stream object to provide a unified API to interact with streams. In particular, the read method reads up to size bytes from the internal stream. The write method writes the content of the data parameter into the internal stream. The close method closes the inner stream.

Each Connector instance can implement its own StreamWrapper classes by extending the BaseStreamWrapper class. In particular, it can be helpful to further specialize the StreamWrapper interface to implement unidirectional streams. This can be achieved by extending the StreamReaderWrapper and StreamWriterWrapper base classes, which raise a NotImplementedError if the stream is used in the wrong direction.
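
For instance, a hypothetical read-only wrapper over the stdout stream of an asyncio subprocess could extend StreamReaderWrapper as follows; the import path is indicative for your StreamFlow version:

from streamflow.deployment.stream import StreamReaderWrapper


class ProcessStdoutWrapper(StreamReaderWrapper):
    """Hypothetical wrapper reading from an asyncio.StreamReader."""

    async def read(self, size: int | None = None):
        # Read up to size bytes (or until EOF when size is None)
        return await self.stream.read(size if size is not None else -1)

    async def close(self):
        # Nothing to close: the stream ends with its owning process
        pass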

The StreamWrapperContextManager interface provides the Asynchronous Context Manager primitives for the StreamWrapper object, allowing it to be used inside async with statements.
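
Putting the pieces together, a hedged sketch of a remote-to-remote transfer through tar streams might look like this; the connectors, locations, and remote paths are assumptions, not StreamFlow's actual transfer code:

# Stream a tar archive from a source location to a destination location,
# reading and writing in chunks bounded by transferBufferSize
async with await src_connector.get_stream_reader(
    ["tar", "chf", "-", "-C", "/remote/src", "data"], src_location
) as reader:
    async with await dst_connector.get_stream_writer(
        ["tar", "xf", "-", "-C", "/remote/dst"], dst_location
    ) as writer:
        while chunk := await reader.read(src_connector.transferBufferSize):
            await writer.write(chunk)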

Implementations

Name            Class
docker          streamflow.deployment.connector.docker.DockerConnector
docker-compose  streamflow.deployment.connector.docker.DockerComposeConnector
flux            streamflow.deployment.connector.queue_manager.FluxConnector
helm            streamflow.deployment.connector.kubernetes.Helm3Connector
helm3           streamflow.deployment.connector.kubernetes.Helm3Connector
kubernetes      streamflow.deployment.connector.kubernetes.KubernetesConnector
occam           streamflow.deployment.connector.occam.OccamConnector
pbs             streamflow.deployment.connector.queue_manager.PBSConnector
singularity     streamflow.deployment.connector.singularity.SingularityConnector
slurm           streamflow.deployment.connector.queue_manager.SlurmConnector
ssh             streamflow.deployment.connector.ssh.SSHConnector