Scheduling
StreamFlow lets users implement their own scheduling infrastructure. There are two extension points related to scheduling: Scheduler and Policy. The Scheduler interface implements all the scheduling infrastructure, including the data structures that store the current global allocation status. The Policy interface implements a specific placement strategy to map jobs onto available locations. Both interfaces are specified in the streamflow.core.scheduling module.
In StreamFlow, the Job object is the allocation unit. Each workflow step generates zero or more Job objects, which are sent to the scheduling infrastructure for placement.
class Job:
    def __init__(
        self,
        name: str,
        workflow_id: int,
        inputs: MutableMapping[str, Token],
        input_directory: str | None,
        output_directory: str | None,
        tmp_directory: str | None,
    ):
        ...
In practice, a Job is a data structure containing a unique name, the workflow_id pointing to the workflow it belongs to, a dictionary of inputs containing the input data needed for execution, and three folder paths pointing to a potentially remote filesystem: input_directory, output_directory, and tmp_directory. Since the actual paths depend on the chosen execution location, these parameters are not specified before the scheduling phase.
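For illustration only, the snippet below shows how a Job might look when it reaches the scheduling infrastructure. The import path and the name, workflow_id, and inputs values are assumptions made up for the example; the three directories are left unset because they are resolved only after placement.

from streamflow.core.workflow import Job  # assumed import path

job = Job(
    name="/compute/0",      # hypothetical, step-generated unique name
    workflow_id=1,          # hypothetical workflow identifier
    inputs={},              # Token objects keyed by input port name
    input_directory=None,   # resolved only after the scheduling phase
    output_directory=None,
    tmp_directory=None,
)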
Scheduler
The Scheduler interface contains three abstract methods: schedule, notify_status, and close.
async def schedule(
    self, job: Job, binding_config: BindingConfig, hardware_requirement: Hardware
) -> None:
    ...

async def notify_status(
    self, job_name: str, status: Status
) -> None:
    ...

async def close(
    self
) -> None:
    ...
The schedule method tries to allocate one or more available locations for a new Job object. It receives three input parameters: a new Job object to be allocated, a BindingConfig object containing the list of potential allocation targets for the Job and a list of BindingFilter objects, and a HardwareRequirement object specifying the resource requirements of the Job. Resource requirements are extracted automatically from the workflow specification, e.g., CWL files. Conversely, the BindingFilter objects derive from the StreamFlow file.
The notify_status method is called whenever a Job object changes its status, e.g., when it starts, completes, or fails. It receives two input parameters, the name of an existing Job and its new Status, and returns nothing. When a Job reaches a final status (i.e., FAILED, COMPLETED, or CANCELLED), its related locations are marked as available, and the Scheduler starts a new scheduling attempt.
The close method receives no input parameters and does not return anything. It frees stateful resources potentially allocated during the object’s lifetime, e.g., network or database connections.
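Putting the three methods together, a bare-bones custom Scheduler could be structured as in the sketch below. The import paths are assumptions based on the streamflow.core.scheduling module mentioned above, and the allocation logic is intentionally left as comments rather than a real strategy.

from streamflow.core.config import BindingConfig            # assumed import path
from streamflow.core.scheduling import Hardware, Scheduler  # assumed import path
from streamflow.core.workflow import Job, Status            # assumed import path

FINAL_STATUSES = {Status.FAILED, Status.COMPLETED, Status.CANCELLED}


class MyScheduler(Scheduler):
    async def schedule(
        self, job: Job, binding_config: BindingConfig, hardware_requirement: Hardware
    ) -> None:
        # Inspect the targets and filters carried by binding_config (attribute
        # names depend on the concrete API), pick a location that satisfies
        # hardware_requirement, and record the allocation; otherwise, keep the
        # job queued until notify_status frees some resources.
        ...

    async def notify_status(self, job_name: str, status: Status) -> None:
        if status in FINAL_STATUSES:
            # Mark the job's locations as available again and trigger a new
            # scheduling attempt for the queued jobs.
            ...

    async def close(self) -> None:
        # Release stateful resources (e.g., network or database connections).
        ...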
Implementations
| Type | Class |
|---|---|
| default | streamflow.scheduling.scheduler.DefaultScheduler |
In the DefaultScheduler implementation, scheduling attempts follow a simple First Come, First Served (FCFS) approach. The schedule method delegates the allocation strategy to a Policy object, specified in the StreamFlow file’s deployments section through the deployment object’s scheduling_policy directive. If no available allocation configuration can be found for a given Job, it is queued until the next scheduling attempt.
As discussed above, a scheduling attempt occurs whenever a Job reaches a final state. In addition, to account for dynamic resource creation and deletion in remote execution environments (e.g., through the Kubernetes HorizontalPodAutoscaler), the DefaultScheduler can automatically perform a scheduling attempt for each queued Job at regular intervals. The duration of such intervals can be configured through the retry_delay parameter. A value of 0 (the default) turns off this behaviour.
| Property | Description | Type | Default |
|---|---|---|---|
| retry_delay | Time (in seconds) to wait before retrying to schedule a task. | integer | 0 |
Policy
The Policy interface contains a single method, get_location, which returns the AvailableLocation chosen for placement or None if there is no available location.
async def get_location(
    self,
    context: StreamFlowContext,
    job: Job,
    hardware_requirement: Hardware,
    available_locations: MutableMapping[str, AvailableLocation],
    jobs: MutableMapping[str, JobAllocation],
    locations: MutableMapping[str, MutableMapping[str, LocationAllocation]],
) -> AvailableLocation | None:
    ...
The get_location method receives extensive information about the current execution context, enabling it to cover a broad class of potential scheduling strategies. In particular, the context parameter can query all of StreamFlow’s relevant data structures, such as the Database, the DataManager, and the DeploymentManager.
The job parameter contains the Job object to be allocated, and the hardware_requirement parameter is a HardwareRequirement object specifying the Job’s resource requirements. The available_locations parameter contains the list of locations available for placement in the target deployment, obtained by calling the get_available_locations method of the related Connector object.
An AvailableLocation object should specify the stacked flag to state whether the location relies on hardware resources from the underlying wrapped locations (e.g., a Docker container) or not (e.g., a Slurm submission to a remote node). When stacked is true, a Scheduler implementation should take into account the lower levels of the stack to validate whether the execution environment provides enough resources for running a Job. Conversely, when stacked is false (the default value), the Scheduler should only consider the hardware information provided by the current AvailableLocation object.
Note that a Scheduler implementation should run these checks before calling the get_location method of a scheduling Policy, properly filtering the available_locations parameter before propagating it. Therefore, the available_locations parameter should only contain the locations that satisfy the hardware requirements of the processed Job object.
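A possible shape for that filtering step is sketched below. The _satisfies helper is hypothetical (not part of the StreamFlow API) and stands in for the actual hardware check; the surrounding names mirror the get_location signature shown above.

from collections.abc import MutableMapping

from streamflow.core.scheduling import AvailableLocation, Hardware  # assumed import path


def _satisfies(location: AvailableLocation, requirement: Hardware) -> bool:
    # Placeholder: a real check would compare the requirement against the
    # location's hardware and, when location.stacked is true, also validate
    # the underlying wrapped locations of the stack.
    return True


def filter_locations(
    available_locations: MutableMapping[str, AvailableLocation],
    hardware_requirement: Hardware,
) -> dict[str, AvailableLocation]:
    # Keep only the locations that can actually host the processed Job, so
    # that the Policy never sees an under-provisioned candidate.
    return {
        name: loc
        for name, loc in available_locations.items()
        if _satisfies(loc, hardware_requirement)
    }

The filtered dictionary is then what a Scheduler would pass as the available_locations argument of get_location.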
The jobs and locations parameters describe the current status of the workflow execution. The jobs parameter is a dictionary of JobAllocation objects, containing information about all the previously allocated Job objects, indexed by their unique names. Each JobAllocation structure contains the Job name, its target, the list of locations associated with the Job execution, the current Status of the Job, and the hardware resources allocated for its execution on each selected location.
The locations parameter is the set of locations allocated to at least one Job in the past, indexed by their deployment and unique name. Each LocationAllocation object contains the location name, the name of its deployment, and the list of Job objects allocated to it, identified by their unique names.
Note that when multiple locations are stacked through the wraps directive and specify the stacked flag, a LocationAllocation object also contains the jobs allocated to the locations that wrap the associated AvailableLocation object, either directly or indirectly. Conversely, JobAllocation objects only register direct allocations.
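To make the contract concrete, here is a minimal least-loaded Policy sketch that only relies on the structures described above. The import paths, as well as the deployment, name, jobs, and status attribute names, are assumptions about the concrete API and may differ across StreamFlow versions.

from collections.abc import MutableMapping

from streamflow.core.context import StreamFlowContext  # assumed import path
from streamflow.core.scheduling import (                # assumed import path
    AvailableLocation,
    Hardware,
    JobAllocation,
    LocationAllocation,
    Policy,
)
from streamflow.core.workflow import Job, Status        # assumed import path

FINAL_STATUSES = {Status.FAILED, Status.COMPLETED, Status.CANCELLED}


class LeastLoadedPolicy(Policy):
    async def get_location(
        self,
        context: StreamFlowContext,
        job: Job,
        hardware_requirement: Hardware,
        available_locations: MutableMapping[str, AvailableLocation],
        jobs: MutableMapping[str, JobAllocation],
        locations: MutableMapping[str, MutableMapping[str, LocationAllocation]],
    ) -> AvailableLocation | None:
        best, best_load = None, None
        for loc in available_locations.values():
            # Names of the jobs previously allocated to this location
            allocation = locations.get(loc.deployment, {}).get(loc.name)
            allocated = allocation.jobs if allocation else []
            # Count only jobs that have not reached a final status yet
            load = sum(
                1
                for name in allocated
                if name in jobs and jobs[name].status not in FINAL_STATUSES
            )
            if best_load is None or load < best_load:
                best, best_load = loc, load
        return best  # None when available_locations is empty

Ties are broken by iteration order; a real policy could refine this criterion with data-locality or hardware-headroom considerations.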
Implementations
| Type | Class |
|---|---|
| data_locality | streamflow.scheduling.policy.data_locality.DataLocalityPolicy |
The DataLocalityPolicy is the default scheduling policy in StreamFlow. The adopted strategy is the following (see the toy sketch after this list):

1. File input tokens are sorted by weight in descending order;
2. For each token, all the locations containing the related files are retrieved from the DataManager. If the data are already present in one of the available locations, that location is chosen for placement;
3. If data-driven allocation is not possible, one location is randomly picked from the remaining ones;
4. If there are no available locations, None is returned and the Job is queued.
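The following toy function mirrors that decision chain without touching the real DataManager API: file_locations is assumed to be precomputed elsewhere, mapping each input file path (ordered by descending token weight) to the names of the locations already holding it.

import random


def pick_location(
    file_locations: dict[str, set[str]],   # path -> names of locations holding it
    available_locations: list[str],        # names of the candidate locations
) -> str | None:
    # 1. Prefer a location that already holds one of the heaviest input files
    for path, holders in file_locations.items():
        for name in available_locations:
            if name in holders:
                return name
    # 2. Otherwise, fall back to a random available location
    if available_locations:
        return random.choice(available_locations)
    # 3. No available location: the Job stays queued until the next attempt
    return None


print(pick_location({"/data/in.bin": {"nodeA"}}, ["nodeA", "nodeB"]))  # nodeA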