Skip to content

JobCollection

The JobCollection class provides facilities for handling and downloading multiple jobs results as one object.

A jobcollection is created as the result of e.g. running multiple jobs in parallel:

jobcollection = workflow.run_jobs_parallel()

Initialize a jobcollection from existing jobs:

jobcollection = up42.initialize_jobcollection(job_ids=["12345", "6789"])

Attributes

info: Dict[str, dict] property readonly

Gets the metadata information for each job in the jobcollection, dictionary of job_id : job_information.

status: Dict[str, str] property readonly

Gets the status for each job in the jobcollection, a dictionary with job_id : job status.

Methods

apply(self, worker, only_succeeded=True, **kwargs)

Helper function to apply worker on all jobs in the collection. worker needs to accept Job as first argument. For example, a lambda function that returns the job info:

self.apply(lambda job: job.info)

Parameters:

Name Type Description Default
worker Callable

A function to apply on all jobs in the collection.

required
only_succeeded bool

Only apply to succeeded jobs (default is True).

True
kwargs

additional keyword arguments to pass to worker.

{}

Returns:

Type Description
Dict[str, Any]

Dictionary where the key is the job id and the value the return of worker.

Source code in up42/jobcollection.py
def apply(
    self, worker: Callable, only_succeeded: bool = True, **kwargs
) -> Dict[str, Any]:
    """
    Helper function to apply `worker` on all jobs in the collection.
    `worker` needs to accept `Job` as first argument. For example, a
    lambda function that returns the job info:
    ```python
    self.apply(lambda job: job.info)
    ```

    Args:
        worker: A function to apply on all jobs in the collection.
        only_succeeded: Only apply to succeeded jobs (default is `True`).
        kwargs: additional keyword arguments to pass to `worker`.

    Returns:
        Dictionary where the key is the job id and the value the return
        of `worker`.

    Raises:
        ValueError: If the collection is empty, or if `only_succeeded` is
            `True` and no job in the collection has succeeded.
    """
    if not self.jobs:
        raise ValueError(
            "This is an empty JobCollection. Cannot apply over an empty job list."
        )

    out_dict = {}
    for job in self.jobs:
        # Skip non-succeeded jobs unless the caller explicitly opted in.
        if only_succeeded and not job.is_succeeded:
            continue
        out_dict[job.job_id] = worker(job, **kwargs)

    if not out_dict:
        raise ValueError(
            "All jobs have failed! Cannot apply over an empty succeeded job list."
        )

    return out_dict

download_results(self, output_directory=None, merge=True, unpacking=True)

Downloads the job results. The final results are individually downloaded and by default a merged data.json is generated with all the results in a single feature collection. Unpacking the final result will happen by default.

Parameters:

Name Type Description Default
output_directory Union[str, pathlib.Path]

The file output directory, defaults to the current working directory.

None
merge bool

Whether to generate a merged data.json with all results.

True
unpacking bool

By default the final result which is in TAR archive format will be unpacked.

True

Returns:

Type Description
Dict[str, List[str]]

Dict of the job_ids and jobs' downloaded results filepaths. In addition, a key merged_result is added with the path to the merged data.json.

Source code in up42/jobcollection.py
def download_results(
    self,
    output_directory: Union[str, Path, None] = None,
    merge: bool = True,
    unpacking: bool = True,
) -> Dict[str, List[str]]:
    """
    Downloads the job results. The final results are individually downloaded
    and by default a merged data.json is generated with all the results in a single
    feature collection. Unpacking the final result will happen by default.

    Args:
        output_directory: The file output directory, defaults to the current working
            directory.
        merge: Whether to generate a merged data.json with all results.
        unpacking: By default the final result which is in TAR archive format will be unpacked.

    Returns:
        Dict of the job_ids and jobs' downloaded results filepaths. In addition,
        a key merged_result is added with the path to the merged data.json.
    """
    if output_directory is None:
        output_directory = Path.cwd() / f"project_{self.auth.project_id}"
    else:
        output_directory = Path(output_directory)

    def download_results_worker(job, output_directory, unpacking):
        # Each job's results go into its own subdirectory so files of
        # different jobs never collide.
        out_dir = output_directory / f"job_{job.job_id}"
        return job.download_results(output_directory=out_dir, unpacking=unpacking)

    out_filepaths = self.apply(
        download_results_worker,
        output_directory=output_directory,
        unpacking=unpacking,
    )

    if merge:
        merged_data_json = output_directory / "data.json"
        # JSON is written/read as UTF-8 explicitly, independent of the
        # platform's default encoding.
        with open(merged_data_json, "w", encoding="utf-8") as dst:
            out_features = []
            for job_id, all_files in out_filepaths.items():
                data_json = [d for d in all_files if Path(d).name == "data.json"][0]
                with open(data_json, encoding="utf-8") as src:
                    data_json_fc = geojson.load(src)
                    for feat in data_json_fc.features:
                        # Tag each feature with its originating job and rewrite
                        # the data path relative to output_directory.
                        feat.properties["job_id"] = job_id
                        try:
                            feat.properties[
                                "up42.data_path"
                            ] = f"job_{job_id}/{feat.properties['up42.data_path']}"
                        except KeyError:
                            logger.warning(
                                "data.json does not contain up42.data_path, skipping..."
                            )
                        out_features.append(feat)
            geojson.dump(FeatureCollection(out_features), dst)

        out_filepaths["merged_result"] = [str(merged_data_json)]

    self.results = out_filepaths
    return out_filepaths

map_results(self, bands=[1, 2, 3], aoi=None, show_images=True, show_features=True, name_column='uid', save_html=None) inherited

Displays data.json, and if available, one or multiple results geotiffs.

Parameters:

Name Type Description Default
bands

Image bands and order to plot, default [1,2,3]. First band is 1.

[1, 2, 3]
aoi GeoDataFrame

Optional visualization of aoi boundaries when given GeoDataFrame of aoi.

None
show_images bool

Shows images if True (default).

True
show_features bool

Shows features if True (default).

True
name_column str

Name of the feature property that provides the Feature/Layer name.

'uid'
save_html Path

The path for saving folium map as html file. With default None, no file is saved.

None
Source code in up42/jobcollection.py
def map_results(
    self,
    bands=None,
    aoi: GeoDataFrame = None,
    show_images: bool = True,
    show_features: bool = True,
    name_column: str = "uid",
    save_html: Path = None,
) -> folium.Map:
    """
    Displays data.json, and if available, one or multiple results geotiffs.

    Args:
        bands: Image bands and order to plot, default [1,2,3]. First band is 1.
        aoi: Optional visualization of aoi boundaries when given GeoDataFrame of aoi.
        show_images: Shows images if True (default).
        show_features: Shows features if True (default).
        name_column: Name of the feature property that provides the Feature/Layer name.
        save_html: The path for saving folium map as html file. With default None, no file is saved.

    Raises:
        ValueError: If no results have been downloaded yet.
    """
    # Avoid a mutable default argument; None stands in for [1, 2, 3].
    if bands is None:
        bands = [1, 2, 3]
    # TODO: Surface optional filepaths? or remove option alltogether?
    if self.results is None:
        raise ValueError(
            "You first need to download the results via job.download_results()!"
        )

    f_paths = []
    if isinstance(self.results, list):
        # Add features to map.
        # Some blocks store vector results in an additional geojson file.
        # pylint: disable=not-an-iterable
        json_fp = [fp for fp in self.results if fp.endswith(".geojson")]
        if json_fp:
            json_fp = json_fp[0]  # why only one element is selected?
        else:
            # pylint: disable=not-an-iterable
            json_fp = [fp for fp in self.results if fp.endswith(".json")][0]
        f_paths = self.results

    elif isinstance(self.results, dict):
        # Use the merged feature collection produced by download_results.
        # pylint: disable=unsubscriptable-object
        json_fp = self.results["merged_result"][0]

        # One geotiff per job; skip the merged_result entry itself.
        f_paths = []
        for k, v in self.results.items():
            if k != "merged_result":
                f_paths.append([i for i in v if i.endswith(".tif")][0])

    df: GeoDataFrame = gpd.read_file(json_fp)

    # Add image to map.
    m = self._map_images(
        bands=bands,
        plot_file_format=[".tif"],
        result_df=df,
        filepaths=f_paths,
        aoi=aoi,
        show_images=show_images,
        show_features=show_features,
        name_column=name_column,
        save_html=save_html,
    )

    return m

plot_results(self, figsize=(14, 8), bands=[1, 2, 3], titles=None, filepaths=None, plot_file_format=['.tif'], **kwargs) inherited

Plots image data (quicklooks or results)

Parameters:

Name Type Description Default
figsize Tuple[int, int]

matplotlib figure size.

(14, 8)
bands List[int]

Image bands and order to plot, default [1,2,3]. First band is 1.

[1, 2, 3]
titles Optional[List[str]]

Optional list of titles for the subplots.

None
filepaths Union[List[Union[str, pathlib.Path]], dict]

Paths to images to plot. Optional, by default picks up the last downloaded results.

None
plot_file_format List[str]

List of accepted image file formats e.g. [".tif"]

['.tif']
kwargs

Accepts any additional args and kwargs of rasterio.plot.show, e.g. matplotlib cmap etc.

{}
Source code in up42/jobcollection.py
def plot_results(
    self,
    figsize: Tuple[int, int] = (14, 8),
    bands: Optional[List[int]] = None,
    titles: Optional[List[str]] = None,
    filepaths: Union[List[Union[str, Path]], dict, None] = None,
    plot_file_format: Optional[List[str]] = None,
    **kwargs,
) -> None:
    # pylint: disable=line-too-long
    """
    Plots image data (quicklooks or results)

    Args:
        figsize: matplotlib figure size.
        bands: Image bands and order to plot, default [1,2,3]. First band is 1.
        titles: Optional list of titles for the subplots.
        filepaths: Paths to images to plot. Optional, by default picks up the last
            downloaded results.
        plot_file_format: List of accepted image file formats e.g. [".tif"]
        kwargs: Accepts any additional args and kwargs of
            [rasterio.plot.show](https://rasterio.readthedocs.io/en/latest/api/rasterio.plot.html#rasterio.plot.show),
             e.g. matplotlib cmap etc.

    Raises:
        ValueError: If no results were downloaded, if no file matches
            `plot_file_format`, or if `bands` has neither one nor three entries.
    """
    # Avoid mutable default arguments; None stands in for the documented defaults.
    if bands is None:
        bands = [1, 2, 3]
    if plot_file_format is None:
        plot_file_format = [".tif"]

    if filepaths is None:
        if self.results is None:
            raise ValueError("You first need to download the results!")
        filepaths = self.results
        # Unpack results path dict in case of jobcollection.
        if isinstance(filepaths, dict):
            filepaths_lists = list(filepaths.values())
            filepaths = [item for sublist in filepaths_lists for item in sublist]

    if not isinstance(filepaths, list):
        filepaths = [filepaths]  # type: ignore
    filepaths = [Path(path) for path in filepaths]

    imagepaths = [
        path for path in filepaths if str(path.suffix) in plot_file_format  # type: ignore
    ]
    if not imagepaths:
        raise ValueError(
            f"This function only plots files of format {plot_file_format}."
        )

    if not titles:
        titles = [Path(fp).stem for fp in imagepaths]
    if not isinstance(titles, list):
        titles = [titles]  # type: ignore

    # Up to 3 columns; rows grow with the number of images.
    if len(imagepaths) < 2:
        nrows, ncols = 1, 1
    else:
        ncols = 3
        nrows = int(math.ceil(len(imagepaths) / float(ncols)))

    _, axs = plt.subplots(nrows=nrows, ncols=ncols, figsize=figsize)
    if len(imagepaths) > 1:
        axs = axs.ravel()
    else:
        axs = [axs]

    if len(bands) != 3:
        if len(bands) == 1:
            # Single-band plots default to a grayscale colormap.
            if "cmap" not in kwargs:
                kwargs["cmap"] = "gray"
        else:
            raise ValueError("Parameter bands can only contain one or three bands.")
    for idx, (fp, title) in enumerate(zip(imagepaths, titles)):
        with rasterio.open(fp) as src:
            img_array = src.read(bands)
            show(
                img_array,
                transform=src.transform,
                title=title,
                ax=axs[idx],
                aspect="auto",
                **kwargs,
            )
        axs[idx].set_axis_off()
    plt.axis("off")
    plt.tight_layout()
    plt.show()