Workflow

Workflow object

With the workflow object you can configure and run jobs, and query the existing jobs of this workflow.

Initialize an existing workflow:

workflow = up42.initialize_workflow(workflow_id="12345")

Create a new workflow:

workflow = project.create_workflow(name="new_workflow")


Attributes

info: Dict property readonly

Gets the workflow metadata information.

max_concurrent_jobs: int property readonly

Gets the maximum number of concurrent jobs allowed by the project settings.

workflow_tasks: Dict[str, str] property readonly

Gets the building blocks of the workflow as a dictionary with the structure task_name: block-version.
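
For example, with an initialized workflow object as above (values illustrative):

workflow.info                  # Metadata dictionary of the workflow
workflow.max_concurrent_jobs   # e.g. 10
workflow.workflow_tasks        # e.g. {"sobloo-s2-l1c-aoiclipped:1": "1.18.1"}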

Methods

add_workflow_tasks(self, input_tasks)

Adds or overwrites workflow tasks in a workflow on UP42.

Parameters:

Name Type Description Default
input_tasks Union[List[str], List[Dict]]

The input tasks, specifying the blocks. Can be a list of block ids, block names or block display names (the name shown on the marketplace).

required

Info

With block names or block display names, the most recent version of a block will always be added. Using block ids specifies a specific version of the block that will be added to the workflow.

Examples:

input_tasks = ["sobloo-s2-l1c-aoiclipped", "tiling"]
input_tasks = ["Sentinel-2 L1C MSI AOI clipped",
               "Raster Tiling"]
input_tasks = ['a2daaab4-196d-4226-a018-a810444dcad1',
               '4ed70368-d4e1-4462-bef6-14e768049471']
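
The chosen list is then passed to the method; a minimal sketch using the workflow object initialized above:

workflow.add_workflow_tasks(input_tasks=input_tasks)
# Verify the added tasks as a task_name: block-version dictionary.
print(workflow.get_workflow_tasks(basic=True))
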
Source code in up42/workflow.py
def add_workflow_tasks(self, input_tasks: Union[List[str], List[Dict]]) -> None:
    """
    Adds or overwrites workflow tasks in a workflow on UP42.

    Args:
        input_tasks: The input tasks, specifying the blocks. Can be a list of the
            block ids, block names or block display names (the name shown on the
            [marketplace](https://marketplace.up42.com)).

    !!! Info
        With block names or block display names, the most recent version of a block
        will always be added. Using block ids specifies a specific version of the
        block that will be added to the workflow.

    Example:
        ```python
        input_tasks = ["sobloo-s2-l1c-aoiclipped", "tiling"]
        ```

        ```python
        input_tasks = ["Sentinel-2 L1C MSI AOI clipped",
                       "Raster Tiling"]
        ```

        ```python
        input_tasks = ['a2daaab4-196d-4226-a018-a810444dcad1',
                       '4ed70368-d4e1-4462-bef6-14e768049471']
        ```
    """
    # Relevant when non-linear workflows are introduced:
    # Optional:
    #     The input_tasks can also be provided as the full, detailed workflow task
    #     definition (dict of block id, block name and parent block name). Always use :1
    #     to be able to identify the order when the same workflow task is used twice.
    #     The name is arbitrary, but best use the block name.
    #
    # Example:
    #     ```python
    #     input_tasks_full = [{'name': 'sobloo-s2-l1c-aoiclipped:1',
    #                          'parentName': None,
    #                          'blockId': 'a2daaab4-196d-4226-a018-a810444dcad1'},
    #                         {'name': 'sharpening:1',
    #                          'parentName': 'sobloo-s2-l1c-aoiclipped',
    #                          'blockId': '4ed70368-d4e1-4462-bef6-14e768049471'}]

    # Construct proper task definition from simplified input.
    if isinstance(input_tasks[0], str) and not isinstance(input_tasks[0], dict):
        input_tasks = self._construct_full_workflow_tasks_dict(input_tasks)

    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
        f"{self.workflow_id}/tasks/"
    )
    self.auth._request(request_type="POST", url=url, data=input_tasks)
    logger.info(f"Added tasks to workflow: {input_tasks}")

construct_parameters(self, geometry=None, geometry_operation=None, handle_multiple_features='union', start_date=None, end_date=None, limit=None, scene_ids=None, order_ids=None)

Constructs workflow input parameters with a specified AOI, the default input parameters, and optionally a limit and order ids. Further parameter editing needs to be done manually via dict.update({key: value}).

Parameters:

Name Type Description Default
geometry Union[Dict, geojson.feature.Feature, geojson.feature.FeatureCollection, geojson.geometry.Polygon, List, geopandas.geodataframe.GeoDataFrame, shapely.geometry.polygon.Polygon, shapely.geometry.point.Point]

One of Dict, FeatureCollection, Feature, List, GeoDataFrame, shapely.geometry.Polygon, shapely.geometry.Point. All assume EPSG 4326.

None
geometry_operation str

Desired operation, one of "bbox", "intersects", "contains".

None
handle_multiple_features str

How to handle a geometry with multiple features, by default "union".

'union'
limit int

Maximum number of expected results.

None
start_date str

Query period starting day, format "2020-01-01".

None
end_date str

Query period ending day, format "2020-01-01".

None
scene_ids List

List of scene_ids. If given, all other parameters except geometry are ignored.

None
order_ids List[str]

Optional, can be used to incorporate existing bought imagery on UP42 into new workflows.

None

Returns:

Type Description
Dict

Dictionary of constructed input parameters.

Source code in up42/workflow.py
def construct_parameters(
    self,
    geometry: Union[
        Dict,
        Feature,
        FeatureCollection,
        geojson_Polygon,
        List,
        GeoDataFrame,
        Polygon,
        Point,
    ] = None,
    geometry_operation: str = None,
    handle_multiple_features: str = "union",
    start_date: str = None,
    end_date: str = None,
    limit: int = None,
    scene_ids: List = None,
    order_ids: List[str] = None,
) -> Dict:
    """
    Constructs workflow input parameters with a specified AOI, the default input parameters, and
    optionally a limit and order ids. Further parameter editing needs to be done manually
    via dict.update({key: value}).

    Args:
        geometry: One of Dict, FeatureCollection, Feature, List,
            GeoDataFrame, shapely.geometry.Polygon, shapely.geometry.Point. All
            assume EPSG 4326.
        geometry_operation: Desired operation, one of "bbox", "intersects", "contains".
        handle_multiple_features: How to handle a geometry with multiple features,
            by default "union".
        limit: Maximum number of expected results.
        start_date: Query period starting day, format "2020-01-01".
        end_date: Query period ending day, format "2020-01-01".
        scene_ids: List of scene_ids. If given, all other parameters except geometry are ignored.
        order_ids: Optional, can be used to incorporate existing bought imagery on UP42
            into new workflows.

    Returns:
        Dictionary of constructed input parameters.
    """
    input_parameters = self._get_default_parameters()
    try:
        data_block_name = list(input_parameters.keys())[0]
    except IndexError as e:
        raise ValueError("The Workflow has no workflow tasks.") from e

    if order_ids is not None:
        # Needs to be handled in this function(not run_job) as it is only
        # relevant for the data block.
        input_parameters[data_block_name] = {"order_ids": order_ids}
    else:
        if limit is not None:
            input_parameters[data_block_name]["limit"] = limit

        if scene_ids is not None:
            if not isinstance(scene_ids, list):
                scene_ids = [scene_ids]
            input_parameters[data_block_name]["ids"] = scene_ids
            input_parameters[data_block_name]["limit"] = len(scene_ids)
            input_parameters[data_block_name].pop("time")
        elif start_date is not None and end_date is not None:
            time = f"{start_date}T00:00:00Z/{end_date}T23:59:59Z"
            input_parameters[data_block_name]["time"] = time

        if geometry is not None:
            aoi_fc = any_vector_to_fc(
                vector=geometry,
            )
            aoi_feature = fc_to_query_geometry(
                fc=aoi_fc,
                geometry_operation=geometry_operation,  # type: ignore
                squash_multiple_features=handle_multiple_features,
            )

            input_parameters[data_block_name][geometry_operation] = aoi_feature
    return input_parameters
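
A minimal usage sketch. The AOI and date range are illustrative, and the task name and parameter key in the dict.update edit are placeholders (look up the actual keys via get_parameters_info):

from shapely.geometry import Polygon

aoi = Polygon([(13.35, 52.52), (13.35, 52.53), (13.37, 52.53), (13.37, 52.52)])  # EPSG 4326
input_parameters = workflow.construct_parameters(
    geometry=aoi,
    geometry_operation="bbox",
    start_date="2020-01-01",
    end_date="2020-01-31",
    limit=1,
)
# Further parameter editing is done manually, e.g. on a hypothetical "tiling:1" task:
input_parameters["tiling:1"].update({"tile_width": 768})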

construct_parameters_parallel(self, geometries=None, interval_dates=None, scene_ids=None, limit_per_job=1, geometry_operation='intersects')

Maps a list of geometries and a list of time intervals into a list of workflow input parameters. If you pass 2 geometries and 1 time interval, this results in 2 x 1 = 2 sets of input parameters.

Parameters:

Name Type Description Default
geometries List[Union[Dict, geojson.feature.Feature, geojson.geometry.Polygon, shapely.geometry.polygon.Polygon, shapely.geometry.point.Point]]

List of unit geometries to map with times.

None
interval_dates List[Tuple[str, str]]

List of tuples of start and end dates, i.e. ("2014-01-01","2015-01-01").

None
scene_ids List

List of scene ids, mapped 1:1 to jobs. If given, all other arguments except geometries are ignored.

None
limit_per_job int

Limit to be passed to each individual job's input parameters.

1
geometry_operation str

Geometry operation to be passed to each job parameter.

'intersects'

Returns:

Type Description
List[dict]

List of dictionaries of constructed input parameters.

Source code in up42/workflow.py
def construct_parameters_parallel(
    self,
    geometries: List[
        Union[
            Dict,
            Feature,
            geojson_Polygon,
            Polygon,
            Point,
        ]
    ] = None,
    interval_dates: List[Tuple[str, str]] = None,
    scene_ids: List = None,
    limit_per_job: int = 1,
    geometry_operation: str = "intersects",
) -> List[dict]:
    """
    Maps a list of geometries and a list of time intervals into a list
    of workflow input parameters. If you pass 2 geometries and 1 time
    interval, this results in 2 x 1 = 2 sets of input parameters.

    Args:
        geometries: List of unit geometries to map with times.
        interval_dates: List of tuples of start and end dates,
            i.e. `("2014-01-01","2015-01-01")`.
        scene_ids: List of scene ids, mapped 1:1 to jobs. If given,
            all other arguments except geometries are ignored.
        limit_per_job: Limit to be passed to each individual job's input parameters.
        geometry_operation: Geometry operation to be passed to each job parameter.

    Returns:
        List of dictionaries of constructed input parameters.
    """
    # TODO: Rename arguments
    result_params = []
    # scene_ids mapped to geometries
    if scene_ids is not None and geometries is not None:
        for geo in geometries:
            for scene_id in scene_ids:
                params = self.construct_parameters(
                    geometry=geo,
                    scene_ids=[scene_id],
                    geometry_operation=geometry_operation,
                )
                result_params.append(params)

    # interval_dates mapped to geometries
    elif interval_dates is not None and geometries is not None:
        for geo in geometries:
            for start_date, end_date in interval_dates:
                params = self.construct_parameters(
                    geometry=geo,
                    geometry_operation=geometry_operation,
                    start_date=start_date,
                    end_date=end_date,
                    limit=limit_per_job,
                )
                result_params.append(params)

    # only scene_ids
    elif scene_ids is not None:
        for scene_id in scene_ids:
            result_params.append(
                self.construct_parameters(
                    geometry=None,
                    scene_ids=[scene_id],
                )
            )
    else:
        raise ValueError(
            "Please provide geometries and scene_ids, geometries "
            "and interval_dates, or scene_ids."
        )

    return result_params
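
For example, two geometries and one time interval yield two sets of input parameters. aoi_1 and aoi_2 are hypothetical shapely Polygons, constructed as in the construct_parameters sketch above:

input_parameters_list = workflow.construct_parameters_parallel(
    geometries=[aoi_1, aoi_2],
    interval_dates=[("2020-01-01", "2020-02-01")],
    limit_per_job=1,
)
len(input_parameters_list)  # 2 x 1 = 2 parameter sets, one per job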

delete(self)

Deletes the workflow and sets the Python object to None.

Source code in up42/workflow.py
def delete(self) -> None:
    """
    Deletes the workflow and sets the Python object to None.
    """
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
        f"{self.workflow_id}"
    )
    self.auth._request(request_type="DELETE", url=url, return_text=False)
    logger.info(f"Successfully deleted workflow: {self.workflow_id}")
    del self

get_compatible_blocks(self)

Gets all compatible blocks for the current workflow. If the workflow is empty, it provides all data blocks; if the workflow already has workflow tasks, it provides the blocks compatible with the last workflow task in the workflow.

Currently no data blocks can be attached to other data blocks.

Source code in up42/workflow.py
def get_compatible_blocks(self) -> Dict:
    """
    Gets all compatible blocks for the current workflow. If the workflow is empty,
    it provides all data blocks; if the workflow already has workflow tasks, it
    provides the blocks compatible with the last workflow task in the workflow.

    Currently no data blocks can be attached to other data blocks.
    """
    last_task = list(self.get_workflow_tasks(basic=True).keys())[-1]  # type: ignore
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/{self.workflow_id}/"
        f"compatible-blocks?parentTaskName={last_task}"
    )
    response_json = self.auth._request(request_type="GET", url=url)
    compatible_blocks = response_json["data"]["blocks"]
    compatible_blocks = {
        block["name"]: block["blockId"] for block in compatible_blocks
    }
    return compatible_blocks
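
For example (the returned block names and ids depend on the available marketplace blocks):

compatible_blocks = workflow.get_compatible_blocks()
# e.g. {"tiling": "<block-id>", "sharpening": "<block-id>", ...}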

get_jobs(self, return_json=False, test_jobs=True, real_jobs=True)

Get all jobs associated with the workflow as a JobCollection or json.

Parameters:

Name Type Description Default
return_json bool

If true, returns the job info jsons instead of a JobCollection.

False
test_jobs bool

If true, includes test jobs (test queries).

True
real_jobs bool

If true, includes real jobs.

True

Returns:

Type Description
Union[up42.jobcollection.JobCollection, List[Dict]]

A JobCollection, or alternatively the jobs info as json.

Source code in up42/workflow.py
def get_jobs(
    self, return_json: bool = False, test_jobs: bool = True, real_jobs: bool = True
) -> Union[JobCollection, List[Dict]]:
    """
    Get all jobs associated with the workflow as a JobCollection or json.

    Args:
        return_json: If true, returns the job info jsons instead of a JobCollection.
        test_jobs: If true, includes test jobs (test queries).
        real_jobs: If true, includes real jobs.

    Returns:
        A JobCollection, or alternatively the jobs info as json.
    """
    url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs"
    response_json = self.auth._request(request_type="GET", url=url)
    jobs_json = filter_jobs_on_mode(response_json["data"], test_jobs, real_jobs)

    jobs_workflow_json = [
        j for j in jobs_json if j["workflowId"] == self.workflow_id
    ]

    logger.info(
        f"Got {len(jobs_workflow_json)} jobs for workflow "
        f"{self.workflow_id} in project {self.project_id}."
    )
    if return_json:
        return jobs_workflow_json
    else:
        jobs = [
            Job(self.auth, job_id=job["id"], project_id=self.project_id)
            for job in tqdm(jobs_workflow_json)
        ]
        jobcollection = JobCollection(
            auth=self.auth, project_id=self.project_id, jobs=jobs
        )
        return jobcollection
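
For instance, to retrieve only the real (non-test) jobs of this workflow:

jobcollection = workflow.get_jobs(test_jobs=False, real_jobs=True)
# Or the raw job metadata instead of a JobCollection:
jobs_json = workflow.get_jobs(return_json=True)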

get_parameters_info(self)

Gets information about the workflow parameters of each block in the current workflow, to make it easier to construct the desired parameters.

Returns:

Type Description
Dict

Workflow parameters info json.

Source code in up42/workflow.py
def get_parameters_info(self) -> Dict:
    """
    Gets information about the workflow parameters of each block in the current
    workflow, to make it easier to construct the desired parameters.

    Returns:
        Workflow parameters info json.
    """
    workflow_parameters_info = {}
    workflow_tasks = self.get_workflow_tasks()
    for task in workflow_tasks:
        task_name = task["name"]
        task_default_parameters = task["block"]["parameters"]
        workflow_parameters_info[task_name] = task_default_parameters
    return workflow_parameters_info
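
For example (the output structure is illustrative; the actual parameter schema depends on the blocks in the workflow):

workflow.get_parameters_info()
# e.g. {"sobloo-s2-l1c-aoiclipped:1": {"ids": {...}, "time": {...}, ...},
#       "tiling:1": {"tile_width": {...}, ...}}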

get_workflow_tasks(self, basic=False)

Get the workflow tasks of the workflow (blocks in a workflow are called workflow tasks).

Parameters:

Name Type Description Default
basic bool

If true, returns a simplified task_name: block-version dictionary.

False

Returns:

Type Description
Union[List, Dict]

The workflow task info.

Source code in up42/workflow.py
def get_workflow_tasks(self, basic: bool = False) -> Union[List, Dict]:
    """
    Get the workflow tasks of the workflow (blocks in a workflow are called workflow tasks).

    Args:
        basic: If true, returns a simplified task_name: block-version dictionary.

    Returns:
        The workflow task info.
    """
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
        f"{self.workflow_id}/tasks"
    )
    response_json = self.auth._request(request_type="GET", url=url)
    tasks = response_json["data"]
    logger.info(f"Got {len(tasks)} tasks/blocks in workflow {self.workflow_id}.")

    if basic:
        return {task["name"]: task["blockVersionTag"] for task in tasks}
    else:
        return tasks
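
For example (version tags illustrative):

workflow.get_workflow_tasks(basic=True)
# e.g. {"sobloo-s2-l1c-aoiclipped:1": "1.18.1", "tiling:1": "2.2.3"}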

run_job(self, input_parameters=None, track_status=False, name=None)

Creates and runs a new job.

Parameters:

Name Type Description Default
input_parameters Union[Dict, str, pathlib.Path]

Either a dictionary or JSON string of workflow parameters, or a file path to a JSON file.

None
track_status bool

Automatically attaches workflow.track_status which queries the job status every 30 seconds.

False
name str

The job name. Optional, by default the workflow name is assigned.

None

Returns:

Type Description
Job

The spawned job object.

Source code in up42/workflow.py
def run_job(
    self,
    input_parameters: Union[Dict, str, Path] = None,
    track_status: bool = False,
    name: str = None,
) -> "Job":
    """
    Creates and runs a new job.

    Args:
        input_parameters: Either a dictionary or JSON string of workflow parameters,
            or a file path to a JSON file.
        track_status: Automatically attaches workflow.track_status which queries
            the job status every 30 seconds.
        name: The job name. Optional, by default the workflow name is assigned.

    Returns:
        The spawned job object.
    """
    return self._helper_run_job(
        input_parameters=input_parameters, track_status=track_status, name=name
    )
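
A minimal sketch, reusing the input_parameters constructed with construct_parameters above; the job name is a hypothetical example:

job = workflow.run_job(
    input_parameters=input_parameters,
    track_status=True,  # Queries the job status every 30 seconds until completion.
    name="my_job",      # Optional; defaults to the workflow name.
)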

run_jobs_parallel(self, input_parameters_list=None, name=None, max_concurrent_jobs=10)

Create and run jobs in parallel.

Parameters:

Name Type Description Default
input_parameters_list List[Dict]

List of dictionaries of input parameters.

None
name str

The job name. Optional, by default the workflow name is assigned.

None
max_concurrent_jobs int

The maximum number of jobs to run in parallel, capped by the maximum concurrent jobs allowed by the project settings.

10

Returns:

Type Description
JobCollection

The spawned JobCollection object.

Exceptions:

Type Description
ValueError

When max_concurrent_jobs is greater than the maximum number of concurrent jobs allowed by the project settings.

Source code in up42/workflow.py
def run_jobs_parallel(
    self,
    input_parameters_list: List[Dict] = None,
    name: str = None,
    max_concurrent_jobs: int = 10,
) -> "JobCollection":
    """
    Create and run jobs in parallel.

    Args:
        input_parameters_list: List of dictionaries of input parameters.
        name: The job name. Optional, by default the workflow name is assigned.
        max_concurrent_jobs: The maximum number of jobs to run in parallel,
            capped by the maximum concurrent jobs allowed by the project settings.

    Returns:
        The spawned JobCollection object.

    Raises:
        ValueError: When max_concurrent_jobs is greater than the maximum number of
            concurrent jobs allowed by the project settings.
    """
    jobcollection = self._helper_run_parallel_jobs(
        input_parameters_list=input_parameters_list,
        max_concurrent_jobs=max_concurrent_jobs,
        name=name,
    )
    return jobcollection
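
For example, running the parameter sets constructed with construct_parameters_parallel above. The concurrency must stay within the project settings, which can be checked via the max_concurrent_jobs property:

jobcollection = workflow.run_jobs_parallel(
    input_parameters_list=input_parameters_list,
    max_concurrent_jobs=workflow.max_concurrent_jobs,  # Stay within the project limit.
)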

test_job(self, input_parameters=None, track_status=False, name=None)

Create and run a new test job (test query). A test query does not consume any data or processing credits, but provides a preview of the job result.

Parameters:

Name Type Description Default
input_parameters Union[Dict, str, pathlib.Path]

Either a dictionary or JSON string of workflow parameters, or a file path to a JSON file.

None
track_status bool

Automatically attaches workflow.track_status which queries the job status every 30 seconds.

False
name str

The job name. Optional, by default the workflow name is assigned.

None

Returns:

Type Description
Job

The spawned test job object.

Source code in up42/workflow.py
def test_job(
    self,
    input_parameters: Union[Dict, str, Path] = None,
    track_status: bool = False,
    name: str = None,
) -> "Job":
    """
    Create and run a new test job (test query). A test query does not consume any
    data or processing credits, but provides a preview of the job result.

    Args:
        input_parameters: Either a dictionary or JSON string of workflow parameters,
            or a file path to a JSON file.
        track_status: Automatically attaches workflow.track_status which queries
            the job status every 30 seconds.
        name: The job name. Optional, by default the workflow name is assigned.

    Returns:
        The spawned test job object.
    """
    return self._helper_run_job(
        input_parameters=input_parameters,
        test_job=True,
        track_status=track_status,
        name=name,
    )
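
A common pattern is to preview the result with a free test query before spending credits on a real run:

testjob = workflow.test_job(input_parameters=input_parameters, track_status=True)
job = workflow.run_job(input_parameters=input_parameters, track_status=True)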

test_jobs_parallel(self, input_parameters_list=None, name=None, max_concurrent_jobs=10)

Create and run test jobs (test queries) in parallel. Test queries do not consume any data or processing credits, but provide a preview of the job results.

Parameters:

Name Type Description Default
input_parameters_list List[Dict]

List of dictionaries of input parameters.

None
name str

The job name. Optional, by default the workflow name is assigned.

None
max_concurrent_jobs int

The maximum number of jobs to run in parallel, capped by the maximum concurrent jobs allowed by the project settings.

10

Returns:

Type Description
JobCollection

The spawned test JobCollection object.

Exceptions:

Type Description
ValueError

When max_concurrent_jobs is greater than the maximum number of concurrent jobs allowed by the project settings.

Source code in up42/workflow.py
def test_jobs_parallel(
    self,
    input_parameters_list: List[Dict] = None,
    name: str = None,
    max_concurrent_jobs: int = 10,
) -> "JobCollection":
    """
    Create and run test jobs (test queries) in parallel. Test queries do not consume
    any data or processing credits, but provide a preview of the job results.

    Args:
        input_parameters_list: List of dictionaries of input parameters.
        name: The job name. Optional, by default the workflow name is assigned.
        max_concurrent_jobs: The maximum number of jobs to run in parallel,
            capped by the maximum concurrent jobs allowed by the project settings.

    Returns:
        The spawned test JobCollection object.

    Raises:
        ValueError: When max_concurrent_jobs is greater than the maximum number of
            concurrent jobs allowed by the project settings.
    """
    return self._helper_run_parallel_jobs(
        input_parameters_list=input_parameters_list,
        max_concurrent_jobs=max_concurrent_jobs,
        test_job=True,
        name=name,
    )
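
Analogous to run_jobs_parallel above, a minimal sketch:

test_jobcollection = workflow.test_jobs_parallel(
    input_parameters_list=input_parameters_list,
    max_concurrent_jobs=workflow.max_concurrent_jobs,
)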

update_name(self, name=None, description=None)

Updates the workflow name and description.

Parameters:

Name Type Description Default
name str

New name of the workflow.

None
description str

New description of the workflow.

None
Source code in up42/workflow.py
def update_name(self, name: str = None, description: str = None) -> None:
    """
    Updates the workflow name and description.

    Args:
        name: New name of the workflow.
        description: New description of the workflow.
    """
    properties_to_update = {"name": name, "description": description}
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
        f"{self.workflow_id}"
    )
    self.auth._request(request_type="PUT", url=url, data=properties_to_update)
    # TODO: Renew info
    logger.info(f"Updated workflow name: {properties_to_update}")