Skip to content

JobCollection

JobCollection object

The JobCollection object provides facilities for downloading and merging multiple jobs results.

Attributes

info: Dict[str, Dict] property readonly

Gets the metadata information for each job in the jobcollection, dictionary of job_id : job_information.

status: Dict[str, str] property readonly

Gets the status for each job in the jobcollection, a dictionary with job_id : job status.

Methods

apply(self, worker, only_succeeded=True, **kwargs)

Helper function to apply worker on all jobs in the collection. worker needs to accept Job as first argument. For example, a lambda function that returns the job info:

self.apply(lambda job: job.info)

Parameters:

Name Type Description Default
worker Callable

A function to apply on all jobs in the collection.

required
only_succeeded bool

Only apply to succeeded jobs (default is True).

True
kwargs

additional keyword arguments to pass to worker.

{}

Returns:

Type Description
Dict[str, Any]

Dictionary where the key is the job id and the value the return of worker.

Source code in up42/jobcollection.py
def apply(
    self, worker: Callable, only_succeeded: bool = True, **kwargs
) -> Dict[str, Any]:
    """
    Apply `worker` to every job in the collection and collect the results.

    `worker` must accept a `Job` as its first argument. For example, a
    lambda function that returns the job info:
    ```python
    self.apply(lambda job: job.info)
    ```

    Args:
        worker: A function to apply on all jobs in the collection.
        only_succeeded: Only apply to succeeded jobs (default is `True`).
        kwargs: additional keyword arguments to pass to `worker`.
    Returns:
        Dictionary mapping each job id to the return value of `worker`.
    """
    if not self.jobs:
        raise ValueError(
            "This is an empty JobCollection. Cannot apply over an empty job list."
        )

    # Single pass over the jobs, optionally skipping non-succeeded ones.
    results = {
        job.job_id: worker(job, **kwargs)
        for job in self.jobs
        if not only_succeeded or job.is_succeeded
    }

    # All jobs filtered out means every job in the collection failed.
    if not results:
        raise ValueError(
            "All jobs have failed! Cannot apply over an empty succeeded job list."
        )

    return results

download_results(self, output_directory=None, merge=True, unpacking=True)

Downloads the job results. The final results are individually downloaded and by default a merged data.json is generated with all the results in a single feature collection. Unpacking the final result will happen by default.

Parameters:

Name Type Description Default
output_directory Optional[Union[str, pathlib.Path]]

The file output directory, defaults to the current working directory.

None
merge bool

Whether to generate a merged data.json with all results.

True
unpacking bool

By default the final result which is in TAR archive format will be unpacked.

True

Returns:

Type Description
Dict[str, List[str]]

Dict of the job_ids and jobs' downloaded results filepaths. In addition, a key merged_result is added with the path to the merged data.json.

Source code in up42/jobcollection.py
def download_results(
    self,
    output_directory: Union[str, Path, None] = None,
    merge: bool = True,
    unpacking: bool = True,
) -> Dict[str, List[str]]:
    """
    Downloads the job results. The final results are individually downloaded
    and by default a merged data.json is generated with all the results in a single
    feature collection. Unpacking the final result will happen by default.

    Args:
        output_directory: The file output directory, defaults to the current working
            directory.
        merge: Whether to generate a merged data.json with all results.
        unpacking: By default the final result which is in TAR archive format will be unpacked.

    Returns:
        Dict of the job_ids and jobs' downloaded results filepaths. In addition,
        a key merged_result is added with the path to the merged data.json.
    """
    if output_directory is None:
        output_directory = Path.cwd() / f"project_{self.auth.project_id}"
    else:
        output_directory = Path(output_directory)

    def download_results_worker(job, output_directory, unpacking):
        # Each job's results land in their own job_<id> subdirectory.
        out_dir = output_directory / f"job_{job.job_id}"
        out_filepaths_job = job.download_results(
            output_directory=out_dir, unpacking=unpacking
        )
        return out_filepaths_job

    out_filepaths = self.apply(
        download_results_worker,
        output_directory=output_directory,
        unpacking=unpacking,
    )

    if merge:
        # Ensure the parent directory exists before writing the merged file:
        # the worker only creates the per-job subdirectories, not necessarily
        # output_directory itself.
        output_directory.mkdir(parents=True, exist_ok=True)
        merged_data_json = output_directory / "data.json"
        with open(merged_data_json, "w") as dst:
            out_features = []
            for job_id, all_files in out_filepaths.items():
                data_json = [d for d in all_files if Path(d).name == "data.json"][0]
                with open(data_json) as src:
                    data_json_fc = geojson.load(src)
                    for feat in data_json_fc.features:
                        feat.properties["job_id"] = job_id
                        try:
                            # Rewrite the data path relative to the merged
                            # file's location (prefix with the job subfolder).
                            feat.properties[
                                "up42.data_path"
                            ] = f"job_{job_id}/{feat.properties['up42.data_path']}"
                        except KeyError:
                            logger.warning(
                                "data.json does not contain up42.data_path, skipping..."
                            )
                        # Feature is kept even without a data path.
                        out_features.append(feat)
            geojson.dump(FeatureCollection(out_features), dst)

        out_filepaths["merged_result"] = [str(merged_data_json)]

    self.results = out_filepaths
    return out_filepaths

map_results(self, aoi=None, show_images=True, show_features=True, name_column='uid', save_html=None) inherited

Displays data.json, and if available, one or multiple results geotiffs.

Parameters:

Name Type Description Default
aoi GeoDataFrame

GeoDataFrame of aoi.

None
show_images bool

Shows images if True (default).

True
show_features bool

Shows features if True (default).

True
name_column str

Name of the feature property that provides the Feature/Layer name.

'uid'
save_html

The path for saving folium map as html file. With default None, no file is saved.

None
Source code in up42/jobcollection.py
def map_results(
    self,
    aoi: GeoDataFrame = None,
    show_images: bool = True,
    show_features: bool = True,
    name_column: str = "uid",
    save_html=None,
) -> folium.Map:
    """
    Displays data.json, and if available, one or multiple results geotiffs.

    Args:
        aoi: GeoDataFrame of aoi.
        show_images: Shows images if True (default).
        show_features: Shows features if True (default).
        name_column: Name of the feature property that provides the Feature/Layer name.
        save_html: The path for saving folium map as html file. With default None, no file is saved.
    """
    # TODO: Surface optional filepaths? or remove option alltogether?
    if self.results is None:
        raise ValueError(
            "You first need to download the results via job.download_results()!"
        )

    # Prefer a dedicated .geojson result file; some blocks store vector
    # results in an additional geojson. Otherwise fall back to the first
    # .json file (the data.json).
    # NOTE(review): for JobCollection, self.results is a dict keyed by
    # job_id, so this iteration yields the keys, not filepaths — confirm
    # against the inherited VizTools usage where results is a flat list.
    # pylint: disable=not-an-iterable
    geojson_matches = [fp for fp in self.results if fp.endswith(".geojson")]
    if geojson_matches:
        vector_fp = geojson_matches[0]
    else:
        # pylint: disable=not-an-iterable
        vector_fp = [fp for fp in self.results if fp.endswith(".json")][0]

    result_df: GeoDataFrame = gpd.read_file(vector_fp)

    # Delegate the raster + vector rendering to the shared map helper.
    return self._map_images(
        plot_file_format=[".tif"],
        result_df=result_df,
        filepaths=self.results,
        aoi=aoi,
        show_images=show_images,
        show_features=show_features,
        name_column=name_column,
        save_html=save_html,
    )

plot_results(self, figsize=(14, 8), filepaths=None, titles=None, plot_file_format=['.tif']) inherited

Plots image data (quicklooks or results)

Parameters:

Name Type Description Default
plot_file_format List[str]

List of accepted image file formats e.g. [".tif"]

['.tif']
figsize Tuple[int, int]

matplotlib figure size.

(14, 8)
filepaths Union[List[Union[str, pathlib.Path]], Dict]

Paths to images to plot. Optional, by default picks up the last downloaded results.

None
titles List[str]

Optional list of titles for the subplots.

None
Source code in up42/jobcollection.py
def plot_results(
    self,
    figsize: Tuple[int, int] = (14, 8),
    filepaths: Union[List[Union[str, Path]], Dict] = None,
    titles: List[str] = None,
    plot_file_format: List[str] = None,
) -> None:
    """
    Plots image data (quicklooks or results)

    Args:
        figsize: matplotlib figure size.
        filepaths: Paths to images to plot. Optional, by default picks up the last
            downloaded results.
        titles: Optional list of titles for the subplots. If fewer titles than
            images are given, the surplus images are not plotted (zip truncation).
        plot_file_format: List of accepted image file formats, defaults to [".tif"].

    Raises:
        ValueError: If no results were downloaded yet, or no file matches
            `plot_file_format`.
    """
    # None sentinel instead of a mutable default argument; behavior is
    # unchanged — the effective default is still [".tif"].
    if plot_file_format is None:
        plot_file_format = [".tif"]

    if filepaths is None:
        if self.results is None:
            raise ValueError("You first need to download the results!")
        filepaths = self.results
        # Unpack results path dict in case of jobcollection.
        if isinstance(filepaths, dict):
            filepaths = [
                item for sublist in filepaths.values() for item in sublist
            ]

    if not isinstance(filepaths, list):
        filepaths = [filepaths]  # type: ignore
    filepaths = [Path(path) for path in filepaths]

    imagepaths = [
        path for path in filepaths if str(path.suffix) in plot_file_format  # type: ignore
    ]
    if not imagepaths:
        raise ValueError(
            f"This function only plots files of format {plot_file_format}."
        )

    # Default subplot titles are the image file stems.
    if not titles:
        titles = [Path(fp).stem for fp in imagepaths]
    if not isinstance(titles, list):
        titles = [titles]  # type: ignore

    # Grid layout: single image fills the figure, otherwise 3 columns.
    if len(imagepaths) < 2:
        nrows, ncols = 1, 1
    else:
        ncols = 3
        nrows = int(math.ceil(len(imagepaths) / float(ncols)))

    _, axs = plt.subplots(nrows=nrows, ncols=ncols, figsize=figsize)
    # Normalize to a flat list of axes so indexing works for both layouts.
    axs = axs.ravel() if len(imagepaths) > 1 else [axs]

    for idx, (fp, title) in enumerate(zip(imagepaths, titles)):
        with rasterio.open(fp) as src:
            # Plot at most the first three bands (assumed RGB-like).
            img_array = src.read()[:3, :, :]
            # TODO: Handle more band configurations.
            # TODO: add histogram equalization?
            show(
                img_array,
                transform=src.transform,
                title=title,
                ax=axs[idx],
                aspect="auto",
            )
        axs[idx].set_axis_off()
    plt.axis("off")
    plt.tight_layout()
    plt.show()