
Workflow

The Workflow class lets you configure and run jobs, and query existing jobs related to this workflow.

Create a new workflow:

workflow = project.create_workflow(name="new_workflow")

Use an existing workflow:

workflow = up42.initialize_workflow(workflow_id="7fb2ec8a-45be-41ad-a50f-98ba6b528b98")

Attributes

info: dict property readonly

Gets the workflow metadata information.

max_concurrent_jobs: int property readonly

Gets the maximum number of concurrent jobs allowed by the project settings.

workflow_tasks: Dict[str, str] property readonly

Gets the building blocks of the workflow as a dictionary in the form `task_name: block_version`.
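
A quick usage sketch of these attributes (assuming `workflow` is an initialized Workflow object; the printed values are illustrative):

workflow.info  # e.g. {"id": "7fb2ec8a-...", "name": "new_workflow", ...}
workflow.max_concurrent_jobs  # e.g. 5
workflow.workflow_tasks  # e.g. {"sentinelhub-s2:1": "0.2.0", "tiling:1": "2.2.3"}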

Methods

add_workflow_tasks(self, input_tasks)

Adds or overwrites workflow tasks in a workflow on UP42.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_tasks` | `Union[List[str], List[dict]]` | The blocks to be added to the workflow. Can be a list of block names, block ids (use `up42.get_blocks()`), or block display names (the names shown on the UP42 marketplace). | required |

Info

With block names or block display names, the most recent version of the block is added. Use block ids to add a specific version of a block.

Examples:

# Block names
input_tasks = ["sentinelhub-s2", "tiling"]
# Block Display names
input_tasks = ["Sentinel-2 Level 2 (BOA) AOI clipped",
               "Raster Tiling"]
# Block Ids
input_tasks = ['018dfb34-fc19-4334-8125-14fd7535f979',
               '4ed70368-d4e1-4462-bef6-14e768049471']

Using Custom Blocks

To use a custom block in your workspace, provide the custom block id directly in the full workflow definition (a dict of block id, block name, and parent block name). See the example below.

Examples:

# Full workflow definition
input_tasks = [{'name': 'sentinelhub-s2:1',
                'parentName': None,
                'blockId': '018dfb34-fc19-4334-8125-14fd7535f979'},
               {'name': 'tiling:1',
                'parentName': 'sentinelhub-s2:1',
                'blockId': '4ed70368-d4e1-4462-bef6-14e768049471'}]
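
The task list is then passed to the method; a minimal usage sketch (assuming `workflow` is an initialized Workflow object):

workflow.add_workflow_tasks(input_tasks=input_tasks)
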
Source code in up42/workflow.py
def add_workflow_tasks(self, input_tasks: Union[List[str], List[dict]]) -> None:
    # pylint: disable=line-too-long
    """
    Adds or overwrites workflow tasks in a workflow on UP42.

    Args:
        input_tasks: The blocks to be added to the workflow. Can be a list of the
            block names, block ids (use `up42.get_blocks()`) or block display names
            (The names shown on the [UP42 marketplace](https://marketplace.up42.com)).

    !!! Info
        With block names or block display names, the most recent version of a block
        will be added. Using block ids specifies a specific version of a block.

    Examples:
        ```python
        # Block names
        input_tasks = ["sentinelhub-s2", "tiling"]
        ```

        ```python
        # Block Display names
        input_tasks = ["Sentinel-2 Level 2 (BOA) AOI clipped",
                       "Raster Tiling"]
        ```

        ```python
        # Block Ids
        input_tasks = ['018dfb34-fc19-4334-8125-14fd7535f979',
                       '4ed70368-d4e1-4462-bef6-14e768049471']
        ```

    !!! Info "Using Custom Blocks"
        To use a custom block in your workspace, you need to provide the custom block
        id directly in the [full workflow definition](https://docs.up42.com/going-further/api-walkthrough.html#creating-the-the-second-task-processing-block-addition)
        (dict of block id, block name and parent block name). See example below.

    Examples:
        ```python
        # Full workflow definition
        input_tasks = [{'name': 'sentinelhub-s2:1',
                        'parentName': None,
                        'blockId': '018dfb34-fc19-4334-8125-14fd7535f979'},
                       {'name': 'tiling:1',
                        'parentName': 'sentinelhub-s2:1',
                        'blockId': '4ed70368-d4e1-4462-bef6-14e768049471'}]
        ```
    """
    # Construct proper task definition from simplified input.
    if isinstance(input_tasks[0], str) and not isinstance(input_tasks[0], dict):
        input_tasks = self._construct_full_workflow_tasks_dict(input_tasks)

    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
        f"{self.workflow_id}/tasks/"
    )
    self.auth._request(request_type="POST", url=url, data=input_tasks)
    logger.info(f"Added tasks to workflow: {input_tasks}")

construct_parameters(self, geometry=None, geometry_operation=None, handle_multiple_features='union', start_date=None, end_date=None, limit=None, scene_ids=None, assets=None)

Constructs workflow input parameters with a specified AOI, the default input parameters, and optionally limit and order ids. Further parameter editing needs to be done manually via `dict.update({key: value})`.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `geometry` | `Union[dict, geojson.feature.Feature, geojson.feature.FeatureCollection, geojson.geometry.Polygon, list, geopandas.geodataframe.GeoDataFrame, shapely.geometry.polygon.Polygon, shapely.geometry.point.Point]` | One of dict, FeatureCollection, Feature, list, GeoDataFrame, shapely Polygon, or shapely Point. All assume EPSG 4326. | `None` |
| `geometry_operation` | `Optional[str]` | Desired operation, one of "bbox", "intersects", "contains". | `None` |
| `limit` | `Optional[int]` | Maximum number of expected results. | `None` |
| `start_date` | `Optional[str]` | Query period starting day, format "2020-01-01". | `None` |
| `end_date` | `Optional[str]` | Query period ending day, format "2020-01-01". | `None` |
| `scene_ids` | `Optional[list]` | List of scene_ids; if provided, all other parameters except geometry are ignored. | `None` |
| `assets` | `Optional[List[up42.asset.Asset]]` | Optional, can be used to incorporate existing assets in Storage (e.g. the results of orders) into new workflows. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `dict` | Dictionary of constructed input parameters. |
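
A minimal usage sketch (the AOI coordinates and the "tiling:1" task name and parameter are illustrative assumptions, not values from this reference):

# Illustrative AOI; construct_parameters accepts a GeoJSON-style dict
aoi = {"type": "Polygon",
       "coordinates": [[[13.37, 52.49], [13.39, 52.49], [13.39, 52.51],
                        [13.37, 52.51], [13.37, 52.49]]]}
input_parameters = workflow.construct_parameters(geometry=aoi,
                                                 geometry_operation="bbox",
                                                 start_date="2020-01-01",
                                                 end_date="2020-01-31",
                                                 limit=1)
# Further parameter editing is manual, e.g. (hypothetical task name and key):
input_parameters["tiling:1"].update({"tile_width": 768})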

Source code in up42/workflow.py
def construct_parameters(
    self,
    geometry: Union[
        dict,
        Feature,
        FeatureCollection,
        geojson_Polygon,
        list,
        GeoDataFrame,
        Polygon,
        Point,
    ] = None,
    geometry_operation: Optional[str] = None,
    handle_multiple_features: str = "union",
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    limit: Optional[int] = None,
    scene_ids: Optional[list] = None,
    assets: Optional[List[Asset]] = None,
) -> dict:
    """
    Constructs workflow input parameters with a specified aoi, the default input parameters, and
    optionally limit and order-ids. Further parameter editing needs to be done manually
    via dict.update({key:value}).

    Args:
        geometry: One of Dict, FeatureCollection, Feature, List,
            GeoDataFrame, shapely.geometry.Polygon, shapely.geometry.Point. All
            assume EPSG 4326.
        geometry_operation: Desired operation, One of "bbox", "intersects", "contains".
        limit: Maximum number of expected results.
        start_date: Query period starting day, format "2020-01-01".
        end_date: Query period ending day, format "2020-01-01".
        scene_ids: List of scene_ids, if given ignores all other parameters except geometry.
        assets: Optional, can be used to incorporate existing assets in Storage (result
            of Orders for instance) into new workflows.

    Returns:
        Dictionary of constructed input parameters.
    """
    input_parameters = self._get_default_parameters()
    try:
        data_block_name = list(input_parameters.keys())[0]
    except IndexError as e:
        raise ValueError("The Workflow has no workflow tasks.") from e

    if assets is not None:
        # Needs to be handled in this function(not run_job) as it is only
        # relevant for the data block.
        asset_ids = [asset.asset_id for asset in assets if asset.source == "BLOCK"]
        if not asset_ids:
            raise ValueError(
                "None of the assets are usable in a workflow since the source is not `BLOCK`."
            )
        input_parameters[data_block_name] = {"asset_ids": asset_ids}
    else:
        if limit is not None:
            input_parameters[data_block_name]["limit"] = limit

        if scene_ids is not None:
            if not isinstance(scene_ids, list):
                scene_ids = [scene_ids]
            input_parameters[data_block_name]["ids"] = scene_ids
            input_parameters[data_block_name]["limit"] = len(scene_ids)
            input_parameters[data_block_name].pop("time")
        elif start_date is not None and end_date is not None:
            time = f"{start_date}T00:00:00Z/{end_date}T23:59:59Z"
            input_parameters[data_block_name]["time"] = time

        if geometry is not None:
            aoi_fc = any_vector_to_fc(
                vector=geometry,
            )
            aoi_feature = fc_to_query_geometry(
                fc=aoi_fc,
                geometry_operation=geometry_operation,  # type: ignore
                squash_multiple_features=handle_multiple_features,
            )

            input_parameters[data_block_name][geometry_operation] = aoi_feature
    return input_parameters

construct_parameters_parallel(self, geometries=None, interval_dates=None, scene_ids=None, limit_per_job=1, geometry_operation='intersects')

Maps a list of geometries and a list of time intervals into a list of workflow input parameters. For example, passing 2 geometries and 1 time interval results in 2 x 1 = 2 sets of input parameters.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `geometries` | `List[Union[dict, geojson.feature.Feature, geojson.geometry.Polygon, shapely.geometry.polygon.Polygon, shapely.geometry.point.Point]]` | List of unit geometries to map with times. | `None` |
| `interval_dates` | `List[Tuple[str, str]]` | List of tuples of start and end dates, e.g. `("2014-01-01", "2015-01-01")`. | `None` |
| `scene_ids` | `List[str]` | List of scene ids, mapped 1:1 to each job. If given, all other arguments except geometries are ignored. | `None` |
| `limit_per_job` | `int` | Limit to be passed to each individual job parameter. | `1` |
| `geometry_operation` | `str` | Geometry operation to be passed to each job parameter. | `'intersects'` |

Returns:

| Type | Description |
| --- | --- |
| `List[dict]` | List of dictionaries of constructed input parameters. |
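
A usage sketch, reusing the illustrative aoi from above. Two geometries and one interval yield 2 x 1 = 2 parameter sets:

input_parameters_list = workflow.construct_parameters_parallel(
    geometries=[aoi, aoi],
    interval_dates=[("2020-01-01", "2020-02-01")],
    limit_per_job=1)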

Source code in up42/workflow.py
def construct_parameters_parallel(
    self,
    geometries: List[
        Union[
            dict,
            Feature,
            geojson_Polygon,
            Polygon,
            Point,
        ]
    ] = None,
    interval_dates: List[Tuple[str, str]] = None,
    scene_ids: List[str] = None,
    limit_per_job: int = 1,
    geometry_operation: str = "intersects",
) -> List[dict]:
    """
    Maps a list of geometries and a list of time series into a list
    of input parameters of a workflow. If you pass 2 geometries and 1 time
    interval this will result in 2 x 1 input parameters.

    Args:
        geometries: List of unit geometries to map with times.
        interval_dates: List of tuples of start and end dates,
            i.e. `("2014-01-01","2015-01-01")`.
        scene_ids: List of scene ids. Will be mapped 1:1 to each job.
            All other arguments are ignored except geometries if passed.
        limit_per_job: Limit to be passed to each individual job parameter.
        geometry_operation: Geometry operation to be passed to each job parameter.

    Returns:
        List of dictionary of constructed input parameters.
    """
    # TODO: Rename arguments
    result_params = []
    # scene_ids mapped to geometries
    if scene_ids is not None and geometries is not None:
        for geo in geometries:
            for scene_id in scene_ids:
                params = self.construct_parameters(
                    geometry=geo,
                    scene_ids=[scene_id],
                    geometry_operation=geometry_operation,
                )
                result_params.append(params)

    # interval_dates mapped to geometries
    elif interval_dates is not None and geometries is not None:
        for geo in geometries:
            for start_date, end_date in interval_dates:
                params = self.construct_parameters(
                    geometry=geo,
                    geometry_operation=geometry_operation,
                    start_date=start_date,
                    end_date=end_date,
                    limit=limit_per_job,
                )
                result_params.append(params)

    # only scene_ids
    elif scene_ids is not None:
        for scene_id in scene_ids:
            result_params.append(
                self.construct_parameters(
                    geometry=None,
                    scene_ids=[scene_id],
                )
            )
    else:
        raise ValueError(
            "Please provides geometries and scene_ids, geometries"
            "and time_interval or scene_ids."
        )

    return result_params

delete(self)

Deletes the workflow and sets the Python object to None.

Source code in up42/workflow.py
def delete(self) -> None:
    """
    Deletes the workflow and sets the Python object to None.
    """
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
        f"{self.workflow_id}"
    )
    self.auth._request(request_type="DELETE", url=url, return_text=False)
    logger.info(f"Successfully deleted workflow: {self.workflow_id}")
    del self

estimate_job(self, input_parameters=None)

Estimates the price and duration of the workflow for the provided input parameters.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_parameters` | `Union[dict, str, pathlib.Path]` | Workflow parameters as a dict, a JSON string, or a filepath to a JSON file. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `dict` | A dictionary with the estimation for each task in the workflow. |
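
A usage sketch, reusing the input_parameters constructed above:

estimation = workflow.estimate_job(input_parameters=input_parameters)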

Source code in up42/workflow.py
def estimate_job(self, input_parameters: Union[dict, str, Path] = None) -> dict:
    """
    Estimation of price and duration of the workflow for the provided input parameters.

    Args:
        input_parameters: Either json string of workflow parameters or filepath to json.

    Returns:
        A dictionary of estimation for each task in the workflow.
    """
    if input_parameters is None:
        raise ValueError(
            "Select the job_parameters, use workflow.construct_parameters()!"
        )

    workflow_tasks = self.workflow_tasks
    block_names = [task_name.split(":")[0] for task_name in workflow_tasks.keys()]
    input_tasks = self._construct_full_workflow_tasks_dict(block_names)
    for task in input_tasks:
        task["blockVersionTag"] = workflow_tasks[task["name"]]

    estimation = Estimation(
        auth=self.auth, input_parameters=input_parameters, input_tasks=input_tasks
    ).estimate()

    min_credits, max_credits, min_duration, max_duration = [], [], [], []
    for e in estimation.values():
        min_credits.append(e["blockConsumption"]["credit"]["min"])
        max_credits.append(e["blockConsumption"]["credit"]["max"])
        min_credits.append(e["machineConsumption"]["credit"]["min"])
        max_credits.append(e["machineConsumption"]["credit"]["max"])

        min_duration.append(e["machineConsumption"]["duration"]["min"])
        max_duration.append(e["machineConsumption"]["duration"]["max"])

    logger.info(
        f"Estimated: {sum(min_credits)}-{sum(max_credits)} Credits, "
        f"Duration: {int(sum(min_duration) / 60)}-{int(sum(max_duration) / 60)} min."
    )

    return estimation

get_compatible_blocks(self)

Gets all compatible blocks for the current workflow. If the workflow is empty, it returns all data blocks; if the workflow already has workflow tasks, it returns the blocks compatible with the last workflow task.

Currently, data blocks cannot be attached to other data blocks.
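
Usage sketch; the result maps block names to block ids:

compatible_blocks = workflow.get_compatible_blocks()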

Source code in up42/workflow.py
def get_compatible_blocks(self) -> dict:
    """
    Gets all compatible blocks for the current workflow. If the workflow is empty
    it will provide all data blocks, if the workflow already has workflow tasks, it
    will provide the compatible blocks for the last workflow task in the workflow.

    Currently no data blocks can be attached to other data blocks.
    """
    last_task = list(self.get_workflow_tasks(basic=True).keys())[-1]  # type: ignore
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/{self.workflow_id}/"
        f"compatible-blocks?parentTaskName={last_task}"
    )
    response_json = self.auth._request(request_type="GET", url=url)
    compatible_blocks = response_json["data"]["blocks"]
    compatible_blocks = {
        block["name"]: block["blockId"] for block in compatible_blocks
    }
    return compatible_blocks

get_jobs(self, return_json=False, test_jobs=True, real_jobs=True)

Get all jobs associated with the workflow as a JobCollection or json.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `return_json` | `bool` | If true, returns the job info jsons instead of a JobCollection. | `False` |
| `test_jobs` | `bool` | Return test jobs (test queries). | `True` |
| `real_jobs` | `bool` | Return real jobs. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `Union[up42.jobcollection.JobCollection, List[Dict]]` | A JobCollection, or alternatively the jobs info as json. |
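
Usage sketch:

# Only real (non-test) jobs of this workflow, as a JobCollection
jobs = workflow.get_jobs(test_jobs=False, real_jobs=True)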

Source code in up42/workflow.py
def get_jobs(
    self, return_json: bool = False, test_jobs: bool = True, real_jobs: bool = True
) -> Union[JobCollection, List[Dict]]:
    """
    Get all jobs associated with the workflow as a JobCollection or json.

    Args:
        return_json: If true, returns the job info jsons instead of a JobCollection.
        test_jobs: Return test jobs or test queries.
        real_jobs: Return real jobs.

    Returns:
        A JobCollection, or alternatively the jobs info as json.
    """
    url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs"
    response_json = self.auth._request(request_type="GET", url=url)
    jobs_json = filter_jobs_on_mode(response_json["data"], test_jobs, real_jobs)

    jobs_workflow_json = [
        j for j in jobs_json if j["workflowId"] == self.workflow_id
    ]

    logger.info(
        f"Got {len(jobs_workflow_json)} jobs for workflow "
        f"{self.workflow_id} in project {self.project_id}."
    )
    if return_json:
        return jobs_workflow_json
    else:
        jobs = [
            Job(self.auth, job_id=job["id"], project_id=self.project_id)
            for job in tqdm(jobs_workflow_json)
        ]
        jobcollection = JobCollection(
            auth=self.auth, project_id=self.project_id, jobs=jobs
        )
        return jobcollection

get_parameters_info(self)

Gets information about the workflow parameters of each block in the current workflow, making it easier to construct the desired parameters.

Returns:

| Type | Description |
| --- | --- |
| `dict` | Workflow parameters info json. |
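
Usage sketch:

parameters_info = workflow.get_parameters_info()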

Source code in up42/workflow.py
def get_parameters_info(self) -> dict:
    """
    Gets infos about the workflow parameters of each block in the current workflow
    to make it easy to construct the desired parameters.

    Returns:
        Workflow parameters info json.
    """
    workflow_parameters_info = {}
    workflow_tasks = self.get_workflow_tasks()
    for task in workflow_tasks:
        task_name = task["name"]
        task_default_parameters = task["block"]["parameters"]
        workflow_parameters_info[task_name] = task_default_parameters
    return workflow_parameters_info

get_workflow_tasks(self, basic=False)

Get the workflow-tasks of the workflow (Blocks in a workflow are called workflow_tasks).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `basic` | `bool` | If True, returns a simplified `task_name: block_version` dict. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `Union[List, dict]` | The workflow task info. |
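
Usage sketch:

# Simplified {task_name: block_version} mapping
workflow_tasks = workflow.get_workflow_tasks(basic=True)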

Source code in up42/workflow.py
def get_workflow_tasks(self, basic: bool = False) -> Union[List, dict]:
    """
    Get the workflow-tasks of the workflow (Blocks in a workflow are called workflow_tasks).

    Args:
        basic: If selected returns a simplified task-name : block-version dict.

    Returns:
        The workflow task info.
    """
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
        f"{self.workflow_id}/tasks"
    )

    response_json = self.auth._request(request_type="GET", url=url)
    tasks = response_json["data"]
    logger.info(f"Got {len(tasks)} tasks/blocks in workflow {self.workflow_id}.")

    if basic:
        return {task["name"]: task["blockVersionTag"] for task in tasks}
    else:
        return tasks

run_job(self, input_parameters=None, track_status=False, name=None)

Creates and runs a new job.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_parameters` | `Union[Dict, str, pathlib.Path]` | Workflow parameters as a dict, a JSON string, or a filepath to a JSON file. | `None` |
| `track_status` | `bool` | Automatically attaches workflow.track_status, which queries the job status every 30 seconds. | `False` |
| `name` | `str` | The job name. Optional, by default the workflow name is assigned. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Job` | The spawned job object. |
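
A usage sketch, reusing the input_parameters constructed above:

job = workflow.run_job(input_parameters=input_parameters, track_status=True)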

Source code in up42/workflow.py
def run_job(
    self,
    input_parameters: Union[Dict, str, Path] = None,
    track_status: bool = False,
    name: str = None,
) -> "Job":
    """
    Creates and runs a new job.

    Args:
        input_parameters: Either json string of workflow parameters or filepath to json.
        track_status: Automatically attaches workflow.track_status which queries
            the job status every 30 seconds.
        name: The job name. Optional, by default the workflow name is assigned.

    Returns:
        The spawned job object.
    """
    return self._helper_run_job(
        input_parameters=input_parameters, track_status=track_status, name=name
    )

run_jobs_parallel(self, input_parameters_list=None, name=None, max_concurrent_jobs=10)

Create and run jobs in parallel.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_parameters_list` | `Optional[List[dict]]` | List of dictionaries of input parameters. | `None` |
| `name` | `str` | The job name. Optional, by default the workflow name is assigned. | `None` |
| `max_concurrent_jobs` | `int` | The maximum number of jobs to run in parallel, as defined in the project settings. | `10` |

Returns:

| Type | Description |
| --- | --- |
| `JobCollection` | The spawned JobCollection object. |

Exceptions:

| Type | Description |
| --- | --- |
| `ValueError` | When `max_concurrent_jobs` exceeds the maximum number of concurrent jobs allowed by the project settings. |
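
A usage sketch, reusing the input_parameters_list constructed above:

jobcollection = workflow.run_jobs_parallel(input_parameters_list=input_parameters_list)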

Source code in up42/workflow.py
def run_jobs_parallel(
    self,
    input_parameters_list: Optional[List[dict]] = None,
    name: str = None,
    max_concurrent_jobs: int = 10,
) -> "JobCollection":
    """
    Create and run jobs in parallel.

    Args:
        input_parameters_list: List of dictionaries of input parameters.
        name: The job name. Optional, by default the workflow name is assigned.
        max_concurrent_jobs: The maximum number of jobs to run in parallel. This is defined in the project settings.

    Returns:
        The spawned JobCollection object.

    Raises:
        ValueError: When max_concurrent_jobs exceeds the maximum number of concurrent jobs allowed by the project settings.
    """
    jobcollection = self._helper_run_parallel_jobs(
        input_parameters_list=input_parameters_list,
        max_concurrent_jobs=max_concurrent_jobs,
        name=name,
    )
    return jobcollection

test_job(self, input_parameters=None, track_status=False, name=None, get_estimation=False)

Creates and runs a new test job (test query). With a test query you are not charged any data or processing credits, but get a preview of the job result.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_parameters` | `Union[Dict, str, pathlib.Path]` | Workflow parameters as a dict, a JSON string, or a filepath to a JSON file. | `None` |
| `track_status` | `bool` | Automatically attaches workflow.track_status, which queries the job status every 30 seconds. | `False` |
| `name` | `str` | The job name. Optional, by default the workflow name is assigned. | `None` |
| `get_estimation` | `bool` | Also runs a price and duration estimation (see estimate_job) before the test job. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `Job` | The spawned test job object. |
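
A usage sketch, reusing the input_parameters constructed above:

test_job = workflow.test_job(input_parameters=input_parameters, track_status=True)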

Source code in up42/workflow.py
def test_job(
    self,
    input_parameters: Union[Dict, str, Path] = None,
    track_status: bool = False,
    name: str = None,
    get_estimation: bool = False,
) -> "Job":
    """
    Create and run a new test job (Test Query). With this test query you will not be
    charged any data or processing credits, but get a preview of the job result.

    Args:
        input_parameters: Either json string of workflow parameters or filepath to json.
        track_status: Automatically attaches workflow.track_status which queries
            the job status every 30 seconds.
        name: The job name. Optional, by default the workflow name is assigned.

    Returns:
        The spawned test job object.
    """
    if get_estimation:
        self.estimate_job(input_parameters)

    return self._helper_run_job(
        input_parameters=input_parameters,
        test_job=True,
        track_status=track_status,
        name=name,
    )

test_jobs_parallel(self, input_parameters_list=None, name=None, max_concurrent_jobs=10)

Creates and runs test jobs (test queries) in parallel. With test queries you are not charged any data or processing credits, but get a preview of the job results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_parameters_list` | `List[dict]` | List of dictionaries of input parameters. | `None` |
| `name` | `str` | The job name. Optional, by default the workflow name is assigned. | `None` |
| `max_concurrent_jobs` | `int` | The maximum number of jobs to run in parallel, as defined in the project settings. | `10` |

Returns:

| Type | Description |
| --- | --- |
| `JobCollection` | The spawned test JobCollection object. |

Exceptions:

| Type | Description |
| --- | --- |
| `ValueError` | When `max_concurrent_jobs` exceeds the maximum number of concurrent jobs allowed by the project settings. |
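
A usage sketch, reusing the input_parameters_list constructed above:

test_jobcollection = workflow.test_jobs_parallel(input_parameters_list=input_parameters_list)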

Source code in up42/workflow.py
def test_jobs_parallel(
    self,
    input_parameters_list: List[dict] = None,
    name: str = None,
    max_concurrent_jobs: int = 10,
) -> "JobCollection":
    """
    Create and run test jobs (Test Query) in parallel. With these test queries you will
    not be charged any data or processing credits, but get a preview of the job results.

    Args:
        input_parameters_list: List of dictionaries of input parameters.
        name: The job name. Optional, by default the workflow name is assigned.
        max_concurrent_jobs: The maximum number of jobs to run in parallel.
            This is defined in the project settings.

    Returns:
        The spawned test JobCollection object.

    Raises:
        ValueError: When max_concurrent_jobs exceeds the maximum number of concurrent jobs allowed by the project settings.
    """
    return self._helper_run_parallel_jobs(
        input_parameters_list=input_parameters_list,
        max_concurrent_jobs=max_concurrent_jobs,
        test_job=True,
        name=name,
    )

update_name(self, name=None, description=None)

Updates the workflow name and description.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `Optional[str]` | New name of the workflow. | `None` |
| `description` | `Optional[str]` | New description of the workflow. | `None` |
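
Usage sketch:

workflow.update_name(name="new_workflow_name", description="A new description.")
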
Source code in up42/workflow.py
def update_name(
    self, name: Optional[str] = None, description: Optional[str] = None
) -> None:
    """
    Updates the workflow name and description.

    Args:
        name: New name of the workflow.
        description: New description of the workflow.
    """
    properties_to_update = {"name": name, "description": description}
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
        f"{self.workflow_id}"
    )
    self.auth._request(request_type="PUT", url=url, data=properties_to_update)
    # TODO: Renew info
    logger.info(f"Updated workflow name: {properties_to_update}")