JobTask¶
The JobTask class provides access to the result of a specific block in the workflow. Each job contains one or multiple JobTasks, one for each block.
Use an existing jobtask:
jobtask = up42.initialize_jobtask(jobtask_id="3f772637-09aa-4164-bded-692fcd746d20",
job_id="de5806aa-5ef1-4dc9-ab1d-06d7ec1a5021")
Source code in up42/jobtask.py
class JobTask(VizTools):
    """
    The JobTask class provides access to the result of a specific block in the workflow.
    Each job contains one or multiple JobTasks, one for each block.

    Use an existing jobtask:
    ```python
    jobtask = up42.initialize_jobtask(jobtask_id="3f772637-09aa-4164-bded-692fcd746d20",
                                      job_id="de5806aa-5ef1-4dc9-ab1d-06d7ec1a5021")
    ```
    """

    def __init__(
        self,
        auth: Auth,
        project_id: str,
        job_id: str,
        jobtask_id: str,
    ):
        self.auth = auth
        self.project_id = project_id
        self.job_id = job_id
        self.jobtask_id = jobtask_id
        # Filepaths of the last downloads; filled by download_quicklooks()
        # and download_results().
        self.quicklooks = None
        self.results = None
        # Reading the `info` property performs the request and caches the
        # metadata in `self._info` (used by `__repr__`).
        self._info = self.info

    def __repr__(self):
        return (
            f"JobTask(name: {self._info['name']}, jobtask_id: {self.jobtask_id}, "
            f"status: {self._info['status']}, startedAt: {self._info['startedAt']}, "
            f"finishedAt: {self._info['finishedAt']}, job_name: {self._info['name']}, "
            f"block_name: {self._info['block']['name']}, block_version: {self._info['blockVersion']})"
        )

    @property
    def info(self) -> dict:
        """
        Gets and updates the jobtask metadata information.

        Requests the tasks of the job and caches the entry whose id matches
        this jobtask in `self._info`.

        Raises:
            ValueError: If the job has no task with this jobtask id.
        """
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        for item in response_json["data"]:
            if item["id"] == self.jobtask_id:
                self._info = item
                return self._info
        # An explicit error instead of a bare StopIteration leaking out of next().
        raise ValueError(
            f"JobTask {self.jobtask_id} was not found in job {self.job_id}."
        )

    def get_results_json(self, as_dataframe: bool = False) -> Union[dict, GeoDataFrame]:
        """
        Gets the Jobtask results data.json.

        Args:
            as_dataframe: If True, returns the results as a GeoDataFrame; if False
                (default), returns the FeatureCollection dict.

        Returns:
            Json of the results, alternatively geodataframe.
        """
        # Uses self.project_id like all other jobtask endpoints of this class.
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{self.jobtask_id}/outputs/data-json/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        logger.info(f"Retrieved {len(response_json['features'])} features.")

        if as_dataframe:
            # UP42 results are always in EPSG 4326
            return GeoDataFrame.from_features(response_json, crs=4326)
        return response_json

    def _get_download_url(self):
        """Requests the download url of the jobtask results archive."""
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{self.jobtask_id}/downloads/results/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        return response_json["data"]["url"]

    def download_results(
        self, output_directory: Union[str, Path, None] = None
    ) -> List[str]:
        """
        Downloads and unpacks the jobtask results.

        Args:
            output_directory: The file output directory, defaults to a
                project/job/jobtask subfolder of the current working directory.

        Returns:
            List of the downloaded results' filepaths.
        """
        logger.info(f"Downloading results of jobtask {self.jobtask_id}")

        if output_directory is None:
            output_directory = (
                Path.cwd()
                / f"project_{self.project_id}/job_{self.job_id}/jobtask_{self.jobtask_id}"
            )
        else:
            output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Download directory: {str(output_directory)}")

        out_filepaths = download_results_from_gcs(
            download_url=self._get_download_url(),
            output_directory=output_directory,
        )

        # Remembered so the plotting/mapping methods can pick up the last download.
        self.results = out_filepaths
        return out_filepaths

    def download_quicklooks(
        self,
        output_directory: Union[str, Path, None] = None,
    ) -> List[str]:
        """
        Downloads quicklooks of the job task to disk.

        After download, can be plotted via jobtask.plot_quicklooks().

        Args:
            output_directory: The file output directory, defaults to a
                project/job subfolder of the current working directory.

        Returns:
            The quicklooks filepaths.
        """
        if output_directory is None:
            # On purpose downloading the quicklooks to the jobs folder and not the
            # jobtasks folder,since only relevant for data block task. And clearer
            # for job.download_quicklooks.
            output_directory = (
                Path.cwd() / f"project_{self.project_id}" / f"job_{self.job_id}"
            )
        else:
            output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Download directory: {str(output_directory)}")

        quicklooks_url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{self.jobtask_id}/outputs/quicklooks/"
        )
        response_json = self.auth._request(request_type="GET", url=quicklooks_url)
        quicklooks_ids = response_json["data"]

        out_paths: List[str] = []
        for ql_id in tqdm(quicklooks_ids):
            out_path = output_directory / f"quicklook_{ql_id}"  # No suffix required.
            out_paths.append(str(out_path))

            response = self.auth._request(
                request_type="GET", url=f"{quicklooks_url}{ql_id}", return_text=False
            )
            # The response is written to disk chunk by chunk.
            with open(out_path, "wb") as dst:
                for chunk in response:
                    dst.write(chunk)

        self.quicklooks = out_paths  # pylint: disable=attribute-defined-outside-init
        return out_paths
Attributes¶
info: dict
property
readonly
¶
Gets and updates the jobtask metadata information.
Methods¶
download_quicklooks(self, output_directory=None)
¶
Downloads quicklooks of the job task to disk.
After download, can be plotted via jobtask.plot_quicklooks().
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_directory |
Union[str, pathlib.Path] |
The file output directory, defaults to the current working directory. |
None |
Returns:
Type | Description |
---|---|
List[str] |
The quicklooks filepaths. |
Source code in up42/jobtask.py
def download_quicklooks(
    self,
    output_directory: Union[str, Path, None] = None,
) -> List[str]:
    """
    Downloads quicklooks of the job task to disk.

    After download, can be plotted via jobtask.plot_quicklooks().

    Args:
        output_directory: The file output directory, defaults to a
            project/job subfolder of the current working directory.

    Returns:
        The quicklooks filepaths.
    """
    if output_directory is None:
        # On purpose downloading the quicklooks to the jobs folder and not the
        # jobtasks folder,since only relevant for data block task. And clearer
        # for job.download_quicklooks.
        # The folder name uses self.project_id, the same id the request urls
        # below are built from.
        output_directory = (
            Path.cwd() / f"project_{self.project_id}" / f"job_{self.job_id}"
        )
    else:
        output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(output_directory)}")

    quicklooks_url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
        f"/tasks/{self.jobtask_id}/outputs/quicklooks/"
    )
    response_json = self.auth._request(request_type="GET", url=quicklooks_url)
    quicklooks_ids = response_json["data"]

    out_paths: List[str] = []
    for ql_id in tqdm(quicklooks_ids):
        out_path = output_directory / f"quicklook_{ql_id}"  # No suffix required.
        out_paths.append(str(out_path))

        response = self.auth._request(
            request_type="GET", url=f"{quicklooks_url}{ql_id}", return_text=False
        )
        # The response is written to disk chunk by chunk.
        with open(out_path, "wb") as dst:
            for chunk in response:
                dst.write(chunk)

    self.quicklooks = out_paths  # pylint: disable=attribute-defined-outside-init
    return out_paths
download_results(self, output_directory=None)
¶
Downloads and unpacks the jobtask results. By default, downloads to a folder in the current working directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_directory |
Union[str, pathlib.Path] |
The file output directory, defaults to the current working directory. |
None |
Returns:
Type | Description |
---|---|
List[str] |
List of the downloaded results' filepaths. |
Source code in up42/jobtask.py
def download_results(
    self, output_directory: Union[str, Path, None] = None
) -> List[str]:
    """
    Downloads and unpacks the jobtask results.

    Args:
        output_directory: The file output directory, defaults to a
            project/job/jobtask subfolder of the current working directory.

    Returns:
        List of the downloaded results' filepaths.
    """
    logger.info(f"Downloading results of jobtask {self.jobtask_id}")

    if output_directory is None:
        # The folder name uses the jobtask's own project id (self.project_id).
        output_directory = (
            Path.cwd()
            / f"project_{self.project_id}/job_{self.job_id}/jobtask_{self.jobtask_id}"
        )
    else:
        output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(output_directory)}")

    out_filepaths = download_results_from_gcs(
        download_url=self._get_download_url(),
        output_directory=output_directory,
    )

    # Remembered on the instance as the last downloaded results.
    self.results = out_filepaths
    return out_filepaths
get_results_json(self, as_dataframe=False)
¶
Gets the Jobtask results data.json.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
as_dataframe |
bool |
If True, returns the results as a GeoDataFrame; if False (default), returns the FeatureCollection dict. |
False |
Returns:
Type | Description |
---|---|
Union[dict, geopandas.geodataframe.GeoDataFrame] |
Json of the results, alternatively geodataframe. |
Source code in up42/jobtask.py
def get_results_json(self, as_dataframe: bool = False) -> Union[dict, GeoDataFrame]:
    """
    Gets the Jobtask results data.json.

    Args:
        as_dataframe: If True, returns the results as a GeoDataFrame; if False
            (default), returns the FeatureCollection dict.

    Returns:
        Json of the results, alternatively geodataframe.
    """
    # Built from self.project_id, like the other jobtask endpoints
    # (downloads/results, outputs/quicklooks).
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
        f"/tasks/{self.jobtask_id}/outputs/data-json/"
    )
    response_json = self.auth._request(request_type="GET", url=url)
    logger.info(f"Retrieved {len(response_json['features'])} features.")

    if as_dataframe:
        # UP42 results are always in EPSG 4326
        return GeoDataFrame.from_features(response_json, crs=4326)
    return response_json
map_results(self, bands=[1, 2, 3], aoi=None, show_images=True, show_features=True, name_column='uid', save_html=None)
inherited
¶
Displays data.json, and if available, one or multiple results geotiffs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bands |
List[int] |
Image bands and order to plot, default [1,2,3]. First band is 1. |
[1, 2, 3] |
aoi |
GeoDataFrame |
Optional visualization of aoi boundaries when given GeoDataFrame of aoi. |
None |
show_images |
bool |
Shows images if True (default). |
True |
show_features |
bool |
Shows features if True (default). |
True |
name_column |
str |
Name of the feature property that provides the Feature/Layer name. |
'uid' |
save_html |
Path |
The path for saving folium map as html file. With default None, no file is saved. |
None |
Source code in up42/jobtask.py
def map_results(
    self,
    bands=[1, 2, 3],
    aoi: GeoDataFrame = None,
    show_images: bool = True,
    show_features: bool = True,
    name_column: str = "uid",
    save_html: Path = None,
) -> folium.Map:
    """
    Displays data.json, and if available, one or multiple results geotiffs.

    Args:
        bands: Image bands and order to plot, default [1,2,3]. First band is 1.
        aoi: Optional visualization of aoi boundaries when given GeoDataFrame of aoi.
        show_images: Shows images if True (default).
        show_features: Shows features if True (default).
        name_column: Name of the feature property that provides the Feature/Layer name.
        save_html: The path for saving folium map as html file. With default None, no file is saved.

    Returns:
        The map returned by `self._map_images`.

    Raises:
        ValueError: If no results were downloaded yet, if the downloaded results
            contain no .geojson/.json file, or if `self.results` is neither a
            list nor a dict.
    """
    # TODO: Surface optional filepaths? or remove option alltogether?
    if self.results is None:
        raise ValueError(
            "You first need to download the results via job.download_results()!"
        )

    if isinstance(self.results, list):
        # Add features to map.
        # Some blocks store vector results in an additional geojson file, which
        # takes precedence over the .json file.
        # pylint: disable=not-an-iterable
        candidates = [fp for fp in self.results if fp.endswith(".geojson")]
        if not candidates:
            # pylint: disable=not-an-iterable
            candidates = [fp for fp in self.results if fp.endswith(".json")]
        if not candidates:
            # Explicit error instead of an IndexError on an empty list.
            raise ValueError(
                "The downloaded results contain no .geojson or .json file to map."
            )
        # Only the first matching file is used.
        json_fp = candidates[0]
        f_paths = self.results
    elif isinstance(self.results, dict):
        # pylint: disable=unsubscriptable-object
        json_fp = self.results["merged_result"][0]
        f_paths = []
        for key, paths in self.results.items():
            if key == "merged_result":
                continue
            # First geotiff of each entry; entries without a .tif contribute
            # no image instead of raising an IndexError.
            first_tif = next((fp for fp in paths if fp.endswith(".tif")), None)
            if first_tif is not None:
                f_paths.append(first_tif)
    else:
        # Previously fell through with json_fp unbound (UnboundLocalError).
        raise ValueError(
            f"Unsupported results type {type(self.results).__name__}, "
            "expected list or dict."
        )

    df: GeoDataFrame = gpd.read_file(json_fp)

    # Add image to map.
    m = self._map_images(
        bands=bands,
        plot_file_format=[".tif"],
        result_df=df,
        filepaths=f_paths,
        aoi=aoi,
        show_images=show_images,
        show_features=show_features,
        name_column=name_column,
        save_html=save_html,
    )
    return m
plot_quicklooks(self, figsize=(8, 8), titles=None, filepaths=None)
inherited
¶
Plots the downloaded quicklooks (filepaths saved to self.quicklooks of the respective object, e.g. job, catalog).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
figsize |
Tuple[int, int] |
matplotlib figure size. |
(8, 8) |
filepaths |
Optional[list] |
Paths to images to plot. Optional, by default picks up the last downloaded results. |
None |
titles |
Optional[List[str]] |
List of titles for the subplots, optional. |
None |
Source code in up42/jobtask.py
def plot_quicklooks(
    self,
    figsize: Tuple[int, int] = (8, 8),
    titles: Optional[List[str]] = None,
    filepaths: Optional[list] = None,
) -> None:
    """
    Plots quicklook images via `self.plot_results`.

    Args:
        figsize: matplotlib figure size.
        filepaths: Paths to images to plot. Optional, when omitted the paths
            stored in `self.quicklooks` are used.
        titles: List of titles for the subplots, optional.

    Raises:
        ValueError: If no filepaths are given and `self.quicklooks` is None.
    """
    # Explicit filepaths win; otherwise fall back to the stored quicklooks.
    quicklook_paths = filepaths if filepaths is not None else self.quicklooks
    if quicklook_paths is None:
        raise ValueError("You first need to download the quicklooks!")

    self.plot_results(
        plot_file_format=[".jpg", ".jpeg", ".png"],
        figsize=figsize,
        filepaths=quicklook_paths,
        titles=titles,
    )
plot_results(self, figsize=(14, 8), bands=[1, 2, 3], titles=None, filepaths=None, plot_file_format=['.tif'], **kwargs)
inherited
¶
Plots image data (quicklooks or results)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
figsize |
Tuple[int, int] |
matplotlib figure size. |
(14, 8) |
bands |
List[int] |
Image bands and order to plot, default [1,2,3]. First band is 1. |
[1, 2, 3] |
titles |
Optional[List[str]] |
Optional list of titles for the subplots. |
None |
filepaths |
Union[List[Union[str, pathlib.Path]], dict] |
Paths to images to plot. Optional, by default picks up the last downloaded results. |
None |
plot_file_format |
List[str] |
List of accepted image file formats e.g. [".tif"] |
['.tif'] |
kwargs |
Accepts any additional args and kwargs of rasterio.plot.show, e.g. matplotlib cmap etc. |
{} |
Source code in up42/jobtask.py
def plot_results(
    self,
    figsize: Tuple[int, int] = (14, 8),
    bands: List[int] = [1, 2, 3],
    titles: Optional[List[str]] = None,
    filepaths: Union[List[Union[str, Path]], dict, None] = None,
    plot_file_format: List[str] = [".tif"],
    **kwargs,
) -> None:
    # pylint: disable=line-too-long
    """
    Plots image data (quicklooks or results)

    Args:
        figsize: matplotlib figure size.
        bands: Image bands and order to plot, default [1,2,3]. First band is 1.
        titles: Optional list of titles for the subplots. Images without a
            title are titled with their file stem.
        filepaths: Paths to images to plot. Optional, by default picks up the last
            downloaded results.
        plot_file_format: List of accepted image file formats e.g. [".tif"]
        kwargs: Accepts any additional args and kwargs of
            [rasterio.plot.show](https://rasterio.readthedocs.io/en/latest/api/rasterio.plot.html#rasterio.plot.show),
             e.g. matplotlib cmap etc.

    Raises:
        ValueError: If no filepaths are given and no results were downloaded,
            if none of the files has an accepted format, or if `bands` does not
            contain exactly one or three bands.
    """
    # The list defaults in the signature are only read here, never mutated.
    if filepaths is None:
        if self.results is None:
            raise ValueError("You first need to download the results!")
        filepaths = self.results
        # Unpack results path dict in case of jobcollection.
    if isinstance(filepaths, dict):
        filepaths = [item for sublist in filepaths.values() for item in sublist]
    if not isinstance(filepaths, list):
        filepaths = [filepaths]  # type: ignore

    imagepaths = [
        path for path in map(Path, filepaths) if path.suffix in plot_file_format
    ]
    if not imagepaths:
        raise ValueError(
            f"This function only plots files of format {plot_file_format}."
        )

    # Validate the bands before a figure is created, so that invalid input
    # does not leave an empty figure behind.
    if len(bands) == 1:
        kwargs.setdefault("cmap", "gray")
    elif len(bands) != 3:
        raise ValueError("Parameter bands can only contain one or three bands.")

    if not titles:
        titles = []
    elif not isinstance(titles, list):
        titles = [titles]  # type: ignore
    # Fill up missing titles with the file stems; a too short titles list would
    # otherwise make zip() below silently skip the remaining images.
    titles = list(titles) + [path.stem for path in imagepaths[len(titles) :]]

    if len(imagepaths) < 2:
        nrows, ncols = 1, 1
    else:
        ncols = 3
        nrows = math.ceil(len(imagepaths) / ncols)

    _, axs = plt.subplots(nrows=nrows, ncols=ncols, figsize=figsize)
    # plt.subplots returns a single Axes for a 1x1 grid, an array otherwise.
    axs = axs.ravel() if len(imagepaths) > 1 else [axs]

    for ax, fp, title in zip(axs, imagepaths, titles):
        with rasterio.open(fp) as src:
            img_array = src.read(bands)
            show(
                img_array,
                transform=src.transform,
                title=title,
                ax=ax,
                aspect="auto",
                **kwargs,
            )

    # Hide the axis decorations of every cell, including the unused cells of
    # the grid (previously only the last cell was switched off).
    for ax in axs:
        ax.set_axis_off()
    plt.tight_layout()
    plt.show()