Scheduling

StreamFlow lets user implement their scheduling infrastructure. There are two extension points related to scheduling: Scheduler and Policy. The Scheduler interface implements all the scheduling infrastructure, including data structures, to store the global current allocation status. The Policy interface implements a specific placement strategy to map jobs onto available locations. Both interfaces are specified in the streamflow.core.scheduling module.

In StreamFlow, the Job object is the allocation unit. Each workflow step generates zero or more Job objects sent to the scheduling infrastructure for placement.

class Job:
    def __init__(
        self,
        name: str,
        workflow_id: int,
        inputs: MutableMapping[str, Token],
        input_directory: str | None,
        output_directory: str | None,
        tmp_directory: str | None,
    ):
        ...

In practice, a Job is a data structure containing a unique name, the workflow_id pointing to the workflow it belongs to, a dictionary of inputs containing the input data needed for execution, and three folder paths pointing to a potentially remote filesystem: input_directory, output_directory, tmp_directory. Since the actual paths depend on the chosen execution location, these parameters are not specified before the scheduling phase.

Scheduler

The Scheduler interface contains three abstract methods: schedule, notify_status, and close.

async def schedule(
    self, job: Job, binding_config: BindingConfig, hardware_requirement: Hardware
) -> None:
    ...

async def notify_status(
    self, job_name: str, status: Status
) -> None:
    ...

async def close(
    self
) -> None:
    ...

The schedule method tries to allocate one or more available locations for a new Job object. It receives three input parameters: a new Job object to be allocated, a BindingConfig object containing the list of potential allocation targets for the Job and a list of BindingFilter objects, and a HardwareRequirement object specifying the resource requirements of the Job. Resource requirements are extracted automatically from the workflow specification, e.g., CWL files. Conversely, the BindingFilter object derives from the StreamFlow file.

The notify_status method is called whenever a Job object changes its status, e.g., when it starts, completes, or fails. It receives two input parameters, the name of an existing Job and its new Status, and returns nothing. When a Job reaches a final status (i.e., FAILED, COMPLETED, or CANCELLED), its related locations are marked as available, and the Scheduler starts a new scheduling attempt.

The close method receives no input parameter and does not return anything. It frees stateful resources potentially allocated during the object’s lifetime, e.g., network or database connections.

Implementations

Type

Class

default

streamflow.scheduling.scheduler.DefaultScheduler

In the DefaultScheduler implementation, scheduling attempts follow a simple First Come, First Served (FCFS) approach. The schedule method demands the allocation strategy to a Policy object specified in the StreamFlow file’s deployments section through the deployment object’s scheduling_policy directive. If no available allocation configuration can be found for a given Job, it is queued until the next scheduling attempt.

As discussed above, a scheduling attempt occurs whenever a Job reaches a final state. Plus, to account for dynamic resource creation and deletion in remote execution environments (e.g., through the Kubernetes HorizontalPodAutoscaler) the DefaultScheduler can automatically perform a scheduling attempt for each queued Job at regular intervals. The duration of such intervals can be configured through the retry_delay parameter. A value of 0 (the default) turns off this behaviour.

properties

retry_delay

Time (in seconds) to wait before retrying to schedule a task.

type

integer

default

0

Policy

The Policy interface contains a single method get_location, which returns the AvailableLocation chosen for placement or None if there is no available location.

async def get_location(
    self,
    context: StreamFlowContext,
    job: Job,
    hardware_requirement: Hardware,
    available_locations: MutableMapping[str, AvailableLocation],
    jobs: MutableMapping[str, JobAllocation],
    locations: MutableMapping[str, MutableMapping[str, LocationAllocation]],
) -> AvailableLocation | None:
    ...

The get_location method receives much information about the current execution context, enabling it to cover a broad class of potential scheduling strategies. In particular, the context parameter can query all the StreamFlow’s relevant data structures, such as the Database, the DataManager, and the DeploymentManager.

The Job parameter contains the Job object to be allocated, and the hardware_requirement parameter is a HardwareRequirement object specifying the Job’s resource requirements. The available_locations parameter contains the list of locations available for placement in the target deployment. They are obtained by calling the get_available_locations method of the related Connector object.

An AvailableLocation object should specify the stacked flag to state if the location relies on hardware resources from the underlying wrapped locations (e.g., a Docker container) or not (e.g., a Slurm submission to a remote node). When stacked is true, a Scheduler implementation should take into account the lower levels of the stack to validate if the execution environment provides enough resources for running a Job. Conversely, when stacked is false (the default value), the Scheduler should only consider hardware information provided by the current AvailableLocation object.

Note that a Scheduler implementation should run these checks before calling the get_location method of a scheduling Policy, properly filtering the available_locations parameter before propagating it. Therefore, the available_locations parameter should only contain the locations that satisfy the hardware requirements of the processed Job object.

The jobs and locations parameters describe the current status of the workflow execution. The jobs parameter is a dictionary of JobAllocation objects, containing information about all the previously allocated Job objects, indexed by their unique name. Each JobAllocation structure contains the Job name, its target, the list of locations associated with the Job execution, the current Status of the Job, and the hardware resources allocated for its execution on each selected location.

The locations parameter is the set of locations allocated to at least one Job in the past, indexed by their deployment and unique name. Each LocationAllocation object contains the location name, the name of its deployment, and the list of Job objects allocated to it, identified by their unique name.

Note that when multiple locations are stacked through the wraps directive and specify the stacked flag, a LocationAllocation object contains also the jobs allocated to the locations that wrap the associated AvailableLocation object, either directly or indirectly. Conversely, JobAllocation objects only register direct allocations.

Implementations

Type

Class

data_locality

streamflow.scheduling.policy.data_locality.DataLocalityPolicy

The DataLocalityPolicy is the default scheduling policy in StreamFlow. The adopted strategy is the following:

  1. File input tokens are sorted by weight in descending order;

  2. All the locations containing the related files are retrieved from the DataManager for each token. If data are already present in one of the available locations, that location is chosen for placement;

  3. If data-driven allocation is not possible, one location is randomly picked up from the remaining ones;

  4. If there are no available locations, return None (and queue the Job).