Skip to content

JobTask

Bases: VizTools

The JobTask class provides access to the result of a specific block in the workflow. Each job contains one or multiple JobTasks, one for each block.

Use an existing jobtask:

jobtask = up42.initialize_jobtask(jobtask_id="3f772637-09aa-4164-bded-692fcd746d20",
                                  job_id="de5806aa-5ef1-4dc9-ab1d-06d7ec1a5021")

Source code in up42/jobtask.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class JobTask(VizTools):
    """
    The JobTask class provides access to the result of a specific block in the workflow.
    Each job contains one or multiple JobTasks, one for each block.

    Use an existing jobtask:
    ```python
    jobtask = up42.initialize_jobtask(jobtask_id="3f772637-09aa-4164-bded-692fcd746d20",
                                      job_id="de5806aa-5ef1-4dc9-ab1d-06d7ec1a5021")
    ```
    """

    def __init__(
        self,
        auth: Auth,
        project_id: str,
        job_id: str,
        jobtask_id: str,
    ):
        self.auth = auth
        self.project_id = project_id
        self.job_id = job_id
        self.jobtask_id = jobtask_id
        # Filled by download_quicklooks() / download_results() respectively.
        self.quicklooks = None
        self.results = None
        # Reading the `info` property performs a request and caches the
        # metadata in `self._info`.
        self._info = self.info

    def __repr__(self):
        # NOTE(review): `job_name` reads the same key as `name`; confirm
        # whether the task metadata carries a separate job name.
        return (
            f"JobTask(name: {self._info['name']}, jobtask_id: {self.jobtask_id}, "
            f"status: {self._info['status']}, startedAt: {self._info['startedAt']}, "
            f"finishedAt: {self._info['finishedAt']}, job_name: {self._info['name']}, "
            f"block_name: {self._info['block']['name']}, block_version: {self._info['blockVersion']})"
        )

    @property
    def info(self) -> dict:
        """
        Gets and updates the jobtask metadata information.

        Requests the task list of the job and picks the entry whose "id"
        equals this jobtask's id. The entry is cached in `self._info`.

        Returns:
            The metadata dict of this jobtask.

        Raises:
            ValueError: If the job's task list contains no entry with this
                jobtask id.
        """
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        info_all_jobtasks = response_json["data"]
        # Use a default so a missing id does not leak StopIteration, which is
        # swallowed or converted when raised inside iteration contexts.
        jobtask_info = next(
            (item for item in info_all_jobtasks if item["id"] == self.jobtask_id),
            None,
        )
        if jobtask_info is None:
            raise ValueError(
                f"Jobtask {self.jobtask_id} was not found in job {self.job_id}."
            )
        self._info = jobtask_info
        return self._info

    def get_results_json(self, as_dataframe: bool = False) -> Union[dict, GeoDataFrame]:
        """
        Gets the Jobtask results data.json.

        Args:
            as_dataframe: If True, return the results as a GeoDataFrame,
                otherwise return the response json as is.

        Returns:
            Json of the results, alternatively geodataframe.
        """
        # Use the jobtask's own project id, consistent with all other endpoints
        # of this class.
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{self.jobtask_id}/outputs/data-json/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        logger.info(f"Retrieved {len(response_json['features'])} features.")

        if as_dataframe:
            # UP42 results are always in EPSG 4326
            return GeoDataFrame.from_features(response_json, crs=4326)
        return response_json

    def _get_download_url(self):
        """
        Requests the download url of the jobtask results.

        Returns:
            The value found under ["data"]["url"] of the response.
        """
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{self.jobtask_id}/downloads/results/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        return response_json["data"]["url"]

    def download_results(
        self, output_directory: Union[str, Path, None] = None
    ) -> List[str]:
        """
        Downloads and unpacks the jobtask results.

        Args:
            output_directory: The file output directory. Defaults to the folder
                `project_<id>/job_<id>/jobtask_<id>` inside the current working
                directory. The directory is created if it does not exist.

        Returns:
            List of the downloaded results' filepaths.
        """
        logger.info(f"Downloading results of jobtask {self.jobtask_id}")

        if output_directory is None:
            output_directory = (
                Path.cwd()
                / f"project_{self.auth.project_id}/job_{self.job_id}/jobtask_{self.jobtask_id}"
            )
        else:
            output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Download directory: {str(output_directory)}")

        download_url = self._get_download_url()
        out_filepaths = download_from_gcs_unpack(
            download_url=download_url,
            output_directory=output_directory,
        )

        # Keep the paths on the instance for later use.
        self.results = out_filepaths
        return out_filepaths

    def download_quicklooks(
        self,
        output_directory: Union[str, Path, None] = None,
    ) -> List[str]:
        """
        Downloads quicklooks of the job task to disk.

        After download, can be plotted via jobtask.plot_quicklooks().

        Args:
            output_directory: The file output directory. Defaults to the folder
                `project_<id>/job_<id>` inside the current working directory.
                The directory is created if it does not exist.

        Returns:
            The quicklooks filepaths.
        """
        if output_directory is None:
            # On purpose downloading the quicklooks to the jobs folder and not the
            # jobtasks folder, since only relevant for data block task. And clearer
            # for job.download_quicklooks.
            output_directory = (
                Path.cwd() / f"project_{self.auth.project_id}" / f"job_{self.job_id}"
            )
        else:
            output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Download directory: {str(output_directory)}")

        # The listing endpoint is also the prefix of every single quicklook url,
        # so it is built once instead of once per quicklook.
        quicklooks_url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{self.jobtask_id}/outputs/quicklooks/"
        )
        response_json = self.auth._request(request_type="GET", url=quicklooks_url)
        quicklooks_ids = response_json["data"]

        out_paths: List[str] = []
        for ql_id in tqdm(quicklooks_ids):
            out_path = output_directory / f"quicklook_{ql_id}"  # No suffix required.
            out_paths.append(str(out_path))

            response = self.auth._request(
                request_type="GET", url=f"{quicklooks_url}{ql_id}", return_text=False
            )

            # The response is written chunk by chunk as it is iterated.
            with open(out_path, "wb") as dst:
                for chunk in response:
                    dst.write(chunk)

        self.quicklooks = out_paths
        return out_paths

info: dict property

Gets and updates the jobtask metadata information.

download_quicklooks(output_directory=None)

Downloads quicklooks of the job task to disk.

After download, can be plotted via jobtask.plot_quicklooks().

Parameters:

Name Type Description Default
output_directory Union[str, Path, None]

The file output directory, defaults to the current working directory.

None

Returns:

Type Description
List[str]

The quicklooks filepaths.

Source code in up42/jobtask.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def download_quicklooks(
    self,
    output_directory: Union[str, Path, None] = None,
) -> List[str]:
    """
    Saves the quicklooks of this job task to disk.

    The downloaded quicklooks can afterwards be plotted via
    jobtask.plot_quicklooks().

    Args:
        output_directory: Folder to write the quicklooks to. When omitted, the
            folder `project_<id>/job_<id>` inside the current working directory
            is used. The folder is created if it does not exist.

    Returns:
        The filepaths of the quicklooks, as strings.
    """
    # The default is deliberately the job folder rather than the jobtask folder:
    # quicklooks only matter for the data block task, and this keeps
    # job.download_quicklooks clearer.
    target_dir = (
        Path(output_directory)
        if output_directory is not None
        else Path.cwd() / f"project_{self.auth.project_id}" / f"job_{self.job_id}"
    )
    target_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(target_dir)}")

    # First request: list the ids of all available quicklooks.
    listing_url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
        f"/tasks/{self.jobtask_id}/outputs/quicklooks/"
    )
    quicklook_ids = self.auth._request(request_type="GET", url=listing_url)["data"]

    saved_paths: List[str] = []
    for quicklook_id in tqdm(quicklook_ids):
        # The file name carries no suffix.
        destination = target_dir / f"quicklook_{quicklook_id}"
        saved_paths.append(str(destination))

        # One further request per quicklook fetches its content.
        quicklook_url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{self.jobtask_id}/outputs/quicklooks/{quicklook_id}"
        )
        payload = self.auth._request(
            request_type="GET", url=quicklook_url, return_text=False
        )

        with open(destination, "wb") as dst:
            for chunk in payload:
                dst.write(chunk)

    # Keep the paths on the instance for later use.
    self.quicklooks = saved_paths
    return saved_paths

download_results(output_directory=None)

Downloads and unpacks the jobtask results. By default, they are saved to the folder project_<id>/job_<id>/jobtask_<id> inside the current working directory.

Parameters:

Name Type Description Default
output_directory Union[str, Path, None]

The file output directory, defaults to the current working directory.

None
Source code in up42/jobtask.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def download_results(
    self, output_directory: Union[str, Path, None] = None
) -> List[str]:
    """
    Downloads and unpacks the jobtask results.

    Args:
        output_directory: The file output directory. Defaults to the folder
            `project_<id>/job_<id>/jobtask_<id>` inside the current working
            directory. The directory is created if it does not exist.

    Returns:
        List of the downloaded results' filepaths, as returned by
        `download_from_gcs_unpack`.
    """
    logger.info(f"Downloading results of jobtask {self.jobtask_id}")

    if output_directory is None:
        output_directory = (
            Path.cwd()
            / f"project_{self.auth.project_id}/job_{self.job_id}/jobtask_{self.jobtask_id}"
        )
    else:
        # Accept plain strings as well as Path objects.
        output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(output_directory)}")

    out_filepaths = download_from_gcs_unpack(
        download_url=self._get_download_url(),
        output_directory=output_directory,
    )

    # Keep the paths on the instance for later use.
    self.results = out_filepaths
    return out_filepaths

get_results_json(as_dataframe=False)

Gets the Jobtask results data.json.

Parameters:

Name Type Description Default
as_dataframe bool

If True, the results are returned as a GeoDataFrame; if False, as a FeatureCollection dict.

False

Returns:

Type Description
Union[dict, GeoDataFrame]

The results as a JSON dict or, if as_dataframe is True, as a GeoDataFrame.

Source code in up42/jobtask.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def get_results_json(self, as_dataframe: bool = False) -> Union[dict, GeoDataFrame]:
    """
    Gets the Jobtask results data.json.

    Args:
        as_dataframe: If True, return the results as a GeoDataFrame, otherwise
            return the response json as is.

    Returns:
        Json of the results, alternatively geodataframe.
    """
    # Use the jobtask's own project id, as the other jobtask endpoints do,
    # instead of the project id stored on the auth object.
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
        f"/tasks/{self.jobtask_id}/outputs/data-json/"
    )
    response_json = self.auth._request(request_type="GET", url=url)
    logger.info(f"Retrieved {len(response_json['features'])} features.")

    if as_dataframe:
        # UP42 results are always in EPSG 4326
        return GeoDataFrame.from_features(response_json, crs=4326)
    return response_json