
Job

Job object

The Job object is the result of running a workflow. It lets you download, visualize, and manipulate the job results, keep track of the job status, and cancel a job while it is still running.

Initialize an existing job:

job = up42.initialize_job(job_id="12345")

Run a new job:

job = workflow.run_job(name="new_job", input_parameters={...})
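
For example, after starting a job as above, a typical pattern is to wait for it to finish and then download the results:

job.track_status()  # blocks until the job has finished; raises ValueError on failure or cancellation
results = job.download_results()  # list of the downloaded filepaths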


Attributes

info: Dict property readonly

Gets the job metadata.

status: str property readonly

Gets the job progress status. One of SUCCEEDED, NOT STARTED, PENDING, RUNNING, CANCELLED, CANCELLING, FAILED, ERROR.
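
For example, a single non-blocking status check (assuming a job initialized as above):

if job.status == "SUCCEEDED":
    results = job.download_results()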

Methods

cancel_job(self)

Cancels a pending or running job.

Source code in up42/job.py
def cancel_job(self) -> None:
    """Cancels a pending or running job."""
    url = f"{self.auth._endpoint()}/jobs/{self.job_id}/cancel/"
    self.auth._request(request_type="POST", url=url)
    logger.info(f"Job canceled: {self.job_id}")

download_quicklooks(self, output_directory=None)

Convenience function that downloads the quicklooks of the data (first) jobtask.

After download, they can be plotted via job.plot_quicklooks().

Source code in up42/job.py
def download_quicklooks(
    self, output_directory: Union[str, Path, None] = None
) -> List[str]:
    """
    Convenience function that downloads the quicklooks of the data (first) jobtask.

    After download, they can be plotted via job.plot_quicklooks().
    """
    # Currently only the first/data task produces quicklooks.
    logger.setLevel(logging.CRITICAL)
    data_task = self.get_jobtasks()[0]
    logger.setLevel(logging.INFO)

    out_paths: List[str] = data_task.download_quicklooks(  # type: ignore
        output_directory=output_directory
    )  # type: ignore
    self.quicklooks = out_paths  # pylint: disable=attribute-defined-outside-init
    return out_paths
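
Usage sketch (output_directory defaults to the current working directory):

quicklook_paths = job.download_quicklooks()
job.plot_quicklooks()  # plots the quicklooks just downloaded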

download_results(self, output_directory=None, unpacking=True)

Downloads the job results. By default, the final TAR archive is unpacked.

Parameters:

Name Type Description Default
output_directory Optional[Union[str, pathlib.Path]]

The file output directory, defaults to the current working directory.

None
unpacking bool

By default, the final result, which is in TAR archive format, will be unpacked.

True

Returns:

Type Description
List[str]

List of the downloaded results' filepaths.

Source code in up42/job.py
def download_results(
    self, output_directory: Union[str, Path, None] = None, unpacking: bool = True
) -> List[str]:
    """
    Downloads the job results. By default, the final TAR archive is unpacked.

    Args:
        output_directory: The file output directory, defaults to the current working
            directory.
        unpacking: By default, the final result, which is in TAR archive format,
            will be unpacked.

    Returns:
        List of the downloaded results' filepaths.
    """
    logger.info(f"Downloading results of job {self.job_id}")

    if output_directory is None:
        output_directory = (
            Path.cwd() / f"project_{self.auth.project_id}/job_{self.job_id}"
        )
    else:
        output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(output_directory)}")

    download_url = self._get_download_url()
    if unpacking:
        out_filepaths = download_results_from_gcs(
            download_url=download_url,
            output_directory=output_directory,
        )
    else:
        out_filepaths = download_results_from_gcs_without_unpacking(
            download_url=download_url,
            output_directory=output_directory,
        )

    self.results = out_filepaths
    return out_filepaths
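
Usage sketch (the output directory is a placeholder):

out_filepaths = job.download_results(output_directory="/tmp/up42_results")
# with unpacking=False the TAR archive is kept instead of being extracted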

get_jobtasks(self, return_json=False)

Get the individual items of the job as JobTask objects or json.

Parameters:

Name Type Description Default
return_json bool

If True returns the json information of the job tasks.

False

Returns:

Type Description
Union[List[JobTask], List[Dict]]

The job task objects in a list.

Source code in up42/job.py
def get_jobtasks(
    self, return_json: bool = False
) -> Union[List["JobTask"], List[Dict]]:
    """
    Get the individual items of the job as JobTask objects or json.

    Args:
        return_json: If True returns the json information of the job tasks.

    Returns:
        The job task objects in a list.
    """
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
        f"/tasks/"
    )
    logger.info(f"Getting job tasks: {self.job_id}")
    response_json = self.auth._request(request_type="GET", url=url)
    jobtasks_json: List[Dict] = response_json["data"]

    if return_json:
        return jobtasks_json
    else:
        jobtasks = [
            JobTask(
                auth=self.auth,
                project_id=self.project_id,
                job_id=self.job_id,
                jobtask_id=task["id"],
            )
            for task in jobtasks_json
        ]
        return jobtasks
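
Usage sketch; the second call mirrors how the method itself extracts the task ids:

jobtasks = job.get_jobtasks()  # list of JobTask objects
tasks_json = job.get_jobtasks(return_json=True)
task_ids = [task["id"] for task in tasks_json]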

get_jobtasks_results_json(self)

Convenience function to get the resulting data.json of all job tasks as a dictionary keyed by jobtask id.

Returns:

Type Description
Dict

The data.json of all individual job tasks.

Source code in up42/job.py
def get_jobtasks_results_json(self) -> Dict:
    """
    Convenience function to get the resulting data.json of all job tasks
    as a dictionary keyed by jobtask id.

    Returns:
        The data.json of all individual job tasks.
    """
    jobtasks: List[Dict] = self.get_jobtasks(return_json=True)  # type: ignore
    jobtasks_ids = [task["id"] for task in jobtasks]
    jobtasks_results_json = {}
    for jobtask_id in jobtasks_ids:
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{jobtask_id}/outputs/data-json"
        )
        response_json = self.auth._request(request_type="GET", url=url)

        jobtasks_results_json[jobtask_id] = response_json
    return jobtasks_results_json
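
Usage sketch, assuming each data.json is a GeoJSON Feature Collection as in get_results_json:

results_json = job.get_jobtasks_results_json()
for jobtask_id, data_json in results_json.items():
    print(jobtask_id, len(data_json.get("features", [])))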

get_logs(self, as_print=True, as_return=False)

Convenience function to print or return the logs of all job tasks.

Parameters:

Name Type Description Default
as_print bool

Prints the logs, no return.

True
as_return bool

Also returns the log strings.

False

Returns:

Type Description
Optional[Dict]

The log strings (only if as_return was selected).

Source code in up42/job.py
def get_logs(
    self, as_print: bool = True, as_return: bool = False
) -> Optional[Dict]:
    """
    Convenience function to print or return the logs of all job tasks.

    Args:
        as_print: Prints the logs, no return.
        as_return: Also returns the log strings.

    Returns:
        The log strings (only if as_return was selected).
    """
    job_logs = {}

    jobtasks: List[Dict] = self.get_jobtasks(return_json=True)  # type: ignore
    jobtasks_ids = [task["id"] for task in jobtasks]

    logger.info(f"Getting logs for {len(jobtasks_ids)} job tasks: {jobtasks_ids}")
    if as_print:
        print(
            f"Printing logs of {len(jobtasks_ids)} JobTasks in Job with job_id "
            f"{self.job_id}:\n"
        )

    for idx, jobtask_id in enumerate(jobtasks_ids):
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/"
            f"{self.job_id}/tasks/{jobtask_id}/logs"
        )
        response_json = self.auth._request(request_type="GET", url=url)

        job_logs[jobtask_id] = response_json

        if as_print:
            print("----------------------------------------------------------")
            print(f"JobTask {idx+1} with jobtask_id {jobtask_id}:\n")
            print(response_json)
    if as_return:
        return job_logs
    else:
        return None
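
Usage sketch; with as_return=True the logs come back as a dictionary keyed by jobtask id:

job.get_logs()  # prints the logs of all job tasks
logs = job.get_logs(as_print=False, as_return=True)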

get_results_json(self, as_dataframe=False)

Gets the Job results data.json.

Parameters:

Name Type Description Default
as_dataframe bool

If True, returns the results as a GeoDataFrame instead of the default GeoJSON Feature Collection.

False

Returns:

Type Description
Union[Dict, geopandas.geodataframe.GeoDataFrame]

The job results data.json.

Source code in up42/job.py
def get_results_json(self, as_dataframe: bool = False) -> Union[Dict, GeoDataFrame]:
    """
    Gets the Job results data.json.

    Args:
        as_dataframe: If True, returns the results as a GeoDataFrame instead of
            the default GeoJSON Feature Collection.

    Returns:
        The job results data.json.
    """
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
        f"/outputs/data-json/"
    )
    response_json = self.auth._request(request_type="GET", url=url)
    logger.info(f"Retrieved {len(response_json['features'])} features.")

    if as_dataframe:
        # UP42 results are always in EPSG 4326
        df = GeoDataFrame.from_features(response_json, crs=4326)
        return df
    else:
        return response_json
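
Usage sketch:

feature_collection = job.get_results_json()  # GeoJSON Feature Collection as a dict
df = job.get_results_json(as_dataframe=True)  # GeoDataFrame, always in EPSG:4326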

map_results(self, aoi=None, show_images=True, show_features=True, name_column='uid', save_html=None) inherited

Displays the data.json and, if available, one or multiple result geotiffs.

Parameters:

Name Type Description Default
aoi GeoDataFrame

GeoDataFrame of aoi.

None
show_images bool

Shows images if True (default).

True
show_features bool

Shows features if True (default).

True
name_column str

Name of the feature property that provides the Feature/Layer name.

'uid'
save_html

The path for saving the folium map as an HTML file. With the default None, no file is saved.

None
Source code in up42/job.py
def map_results(
    self,
    aoi: GeoDataFrame = None,
    show_images: bool = True,
    show_features: bool = True,
    name_column: str = "uid",
    save_html=None,
) -> folium.Map:
    """
    Displays the data.json and, if available, one or multiple result geotiffs.

    Args:
        aoi: GeoDataFrame of aoi.
        show_images: Shows images if True (default).
        show_features: Shows features if True (default).
        name_column: Name of the feature property that provides the Feature/Layer name.
        save_html: The path for saving the folium map as an HTML file.
            With the default None, no file is saved.
    """
    # TODO: Surface optional filepaths? Or remove option altogether?
    if self.results is None:
        raise ValueError(
            "You first need to download the results via job.download_results()!"
        )

    # Add features to map.
    # Some blocks store vector results in an additional geojson file.
    # pylint: disable=not-an-iterable
    json_fp = [fp for fp in self.results if fp.endswith(".geojson")]
    if json_fp:
        json_fp = json_fp[0]
    else:
        # pylint: disable=not-an-iterable
        json_fp = [fp for fp in self.results if fp.endswith(".json")][0]
    df: GeoDataFrame = gpd.read_file(json_fp)

    # Add image to map.
    m = self._map_images(
        plot_file_format=[".tif"],
        result_df=df,
        filepaths=self.results,
        aoi=aoi,
        show_images=show_images,
        show_features=show_features,
        name_column=name_column,
        save_html=save_html,
    )
    return m
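
Usage sketch; the results must be downloaded first, and the returned folium.Map renders inline in e.g. a Jupyter notebook:

job.download_results()
m = job.map_results(name_column="uid")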

plot_quicklooks(self, figsize=(8, 8), filepaths=None, titles=None) inherited

Plots the downloaded quicklooks (filepaths saved to self.quicklooks of the respective object, e.g. job, catalog).

Parameters:

Name Type Description Default
figsize Tuple[int, int]

matplotlib figure size.

(8, 8)
filepaths List

Paths to images to plot. Optional, by default picks up the last downloaded quicklooks.

None
titles List[str]

List of titles for the subplots, optional.

None
Source code in up42/job.py
def plot_quicklooks(
    self,
    figsize: Tuple[int, int] = (8, 8),
    filepaths: List = None,
    titles: List[str] = None,
) -> None:
    """
    Plots the downloaded quicklooks (filepaths saved to self.quicklooks of the
    respective object, e.g. job, catalog).

    Args:
        figsize: matplotlib figure size.
        filepaths: Paths to images to plot. Optional, by default picks up the last
            downloaded quicklooks.
        titles: List of titles for the subplots, optional.

    """
    if filepaths is None:
        if self.quicklooks is None:
            raise ValueError("You first need to download the quicklooks!")
        filepaths = self.quicklooks

    warnings.filterwarnings(
        "ignore", category=rasterio.errors.NotGeoreferencedWarning
    )
    self.plot_results(
        plot_file_format=[".jpg", ".jpeg", ".png"],
        figsize=figsize,
        filepaths=filepaths,
        titles=titles,
    )
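
Usage sketch with explicit filepaths and titles (the file path and title are placeholders):

job.plot_quicklooks(figsize=(6, 6), filepaths=["quicklook_1.jpg"], titles=["Scene 1"])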

plot_results(self, figsize=(14, 8), filepaths=None, titles=None, plot_file_format=['.tif']) inherited

Plots image data (quicklooks or results).

Parameters:

Name Type Description Default
plot_file_format List[str]

List of accepted image file formats e.g. [".tif"]

['.tif']
figsize Tuple[int, int]

matplotlib figure size.

(14, 8)
filepaths Union[List[Union[str, pathlib.Path]], Dict]

Paths to images to plot. Optional, by default picks up the last downloaded results.

None
titles List[str]

Optional list of titles for the subplots.

None
Source code in up42/job.py
def plot_results(
    self,
    figsize: Tuple[int, int] = (14, 8),
    filepaths: Union[List[Union[str, Path]], Dict] = None,
    titles: List[str] = None,
    # pylint: disable=dangerous-default-value
    plot_file_format: List[str] = [".tif"],
) -> None:
    """
    Plots image data (quicklooks or results).

    Args:
        plot_file_format: List of accepted image file formats e.g. [".tif"]
        figsize: matplotlib figure size.
        filepaths: Paths to images to plot. Optional, by default picks up the last
            downloaded results.
        titles: Optional list of titles for the subplots.

    """
    if filepaths is None:
        if self.results is None:
            raise ValueError("You first need to download the results!")
        filepaths = self.results
        # Unpack results path dict in case of jobcollection.
        if isinstance(filepaths, dict):
            filepaths_lists = list(filepaths.values())
            filepaths = [item for sublist in filepaths_lists for item in sublist]

    if not isinstance(filepaths, list):
        filepaths = [filepaths]  # type: ignore
    filepaths = [Path(path) for path in filepaths]

    imagepaths = [
        path for path in filepaths if str(path.suffix) in plot_file_format  # type: ignore
    ]
    if not imagepaths:
        raise ValueError(
            f"This function only plots files of format {plot_file_format}."
        )

    if not titles:
        titles = [Path(fp).stem for fp in imagepaths]
    if not isinstance(titles, list):
        titles = [titles]  # type: ignore

    if len(imagepaths) < 2:
        nrows, ncols = 1, 1
    else:
        ncols = 3
        nrows = int(math.ceil(len(imagepaths) / float(ncols)))

    _, axs = plt.subplots(nrows=nrows, ncols=ncols, figsize=figsize)
    if len(imagepaths) > 1:
        axs = axs.ravel()
    else:
        axs = [axs]

    for idx, (fp, title) in enumerate(zip(imagepaths, titles)):
        with rasterio.open(fp) as src:
            img_array = src.read()[:3, :, :]
            # TODO: Handle more band configurations.
            # TODO: add histogram equalization?
            show(
                img_array,
                transform=src.transform,
                title=title,
                ax=axs[idx],
                aspect="auto",
            )
        axs[idx].set_axis_off()
    plt.axis("off")
    plt.tight_layout()
    plt.show()
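
Usage sketch (picks up the last downloaded results when filepaths is not given):

job.download_results()
job.plot_results(figsize=(10, 10))  # plots the downloaded .tif results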

track_status(self, report_time=30)

Continuously gets the job status until the job has finished or failed.

Internally checks the status every five seconds and logs a report at the interval given by the report_time argument.

Parameters:

Name Type Description Default
report_time int

The interval (in seconds) at which to report the job status.

30
Source code in up42/job.py
def track_status(self, report_time: int = 30) -> str:
    """`
    Continuously gets the job status until job has finished or failed.

    Internally checks every five seconds for the status, prints the log every
    time interval given in report_time argument.

    Args:
        report_time: The intervall (in seconds) when to query the job status.
    """
    logger.info(
        f"Tracking job status continuously, reporting every {report_time} seconds...",
    )
    status = "NOT STARTED"
    time_asleep = 0

    while status != "SUCCEEDED":
        logger.setLevel(logging.CRITICAL)
        status = self.status
        logger.setLevel(logging.INFO)

        # TODO: Add statuses as constants (maybe objects?)
        if status in ["NOT STARTED", "PENDING", "RUNNING"]:
            if time_asleep != 0 and time_asleep % report_time == 0:
                logger.info(f"Job is {status}! - {self.job_id}")
        elif status in ["FAILED", "ERROR"]:
            logger.info(f"Job is {status}! - {self.job_id} - Printing logs ...")
            self.get_logs(as_print=True)
            raise ValueError("Job has failed! See the above log.")
        elif status in ["CANCELLED", "CANCELLING"]:
            logger.info(f"Job is {status}! - {self.job_id}")
            raise ValueError("Job has been cancelled!")
        elif status == "SUCCEEDED":
            logger.info(f"Job finished successfully! - {self.job_id}")

        sleep(5)
        time_asleep += 5

    return status
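
Usage sketch; note that a FAILED, ERROR or CANCELLED job raises a ValueError:

status = job.track_status(report_time=60)  # reports every 60 seconds
# returns "SUCCEEDED" once the job has finished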

upload_results_to_bucket(self, gs_client, bucket, folder, extension='.tgz', version='v0')

Uploads the results of a job directly to a custom Google Cloud Storage bucket.

Source code in up42/job.py
def upload_results_to_bucket(
    self, gs_client, bucket, folder, extension: str = ".tgz", version: str = "v0"
) -> None:
    """
    Uploads the results of a job directly to a custom Google Cloud Storage bucket.
    """
    download_url = self._get_download_url()
    r = requests.get(download_url)

    if self.order_ids is not None:
        blob = bucket.blob(
            str(Path(version) / Path(folder) / Path(self.order_ids[0] + extension))
        )
        logger.info(
            f"Upload job {self.job_id} results with order_ids to "
            f"{blob.name} ..."
        )
    else:
        blob = bucket.blob(
            str(Path(version) / Path(folder) / Path(self.job_id + extension))
        )
        logger.info(f"Upload job {self.job_id} results to {blob.name} ...")
    blob.upload_from_string(
        data=r.content,
        content_type="application/octet-stream",
        client=gs_client,
    )
    logger.info("Uploaded!")