
up42

authenticate(cfg_file=None, project_id=None, project_api_key=None, **kwargs)

Authenticate with UP42, either via project_id & project_api_key, or a config JSON file containing both. Also see the documentation https://sdk.up42.com/authentication/

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `cfg_file` | `Union[str, Path]` | A JSON file containing project_id & project_api_key. | `None` |
| `project_id` | `Optional[str]` | The UP42 project id. | `None` |
| `project_api_key` | `Optional[str]` | The UP42 project api key. | `None` |

Examples:

up42.authenticate(
    project_id="your-project-ID",
    project_api_key="your-project-API-key"
)
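
Authentication via a config JSON file works analogously (a sketch; assumes a local config.json containing both project_id and project_api_key):

```python
up42.authenticate(cfg_file="config.json")
```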
Source code in up42/main.py
def authenticate(
    cfg_file: Union[str, Path] = None,
    project_id: Optional[str] = None,
    project_api_key: Optional[str] = None,
    **kwargs,
):
    """
    Authenticate with UP42, either via project_id & project_api_key, or a config JSON file containing both.
    Also see the documentation https://sdk.up42.com/authentication/

    Args:
        cfg_file: A JSON file containing project_id & project_api_key
        project_id: The UP42 project id.
        project_api_key: The UP42 project api key.

    Examples:
        ```python
        up42.authenticate(
            project_id="your-project-ID",
            project_api_key="your-project-API-key"
        )
        ```
    """
    global _auth
    _auth = Auth(
        cfg_file=cfg_file,
        project_id=project_id,
        project_api_key=project_api_key,
        **kwargs,
    )

SDK convenience functionality that is made available on the up42 import object (in the init) and is not directly related to API calls.

get_example_aoi(location='Berlin', as_dataframe=False)

Gets a predefined, small, rectangular example AOI for the selected location.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `location` | `str` | Location, one of Berlin, Washington. | `'Berlin'` |
| `as_dataframe` | `bool` | Returns a dataframe instead of a dict FeatureCollection (default). | `False` |

Returns:

| Type | Description |
| --- | --- |
| `Union[dict, GeoDataFrame]` | Feature collection JSON with the selected AOI. |
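
For example, to get the Washington AOI as a GeoDataFrame:

```python
aoi = up42.get_example_aoi(location="Washington", as_dataframe=True)
```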

Source code in up42/tools.py
def get_example_aoi(
    location: str = "Berlin", as_dataframe: bool = False
) -> Union[dict, GeoDataFrame]:
    """
    Gets a predefined, small, rectangular example AOI for the selected location.

    Args:
        location: Location, one of Berlin, Washington.
        as_dataframe: Returns a dataframe instead of a dict FeatureCollection
            (default).

    Returns:
        Feature collection JSON with the selected AOI.
    """
    logger.info(f"Getting small example AOI in location '{location}'.")
    if location == "Berlin":
        example_aoi = read_vector_file(
            f"{str(Path(__file__).resolve().parent)}/data/aoi_berlin.geojson"
        )
    elif location == "Washington":
        example_aoi = read_vector_file(
            f"{str(Path(__file__).resolve().parent)}/data/aoi_washington.geojson"
        )
    else:
        raise ValueError(
            "Please select one of 'Berlin' or 'Washington' as the location!"
        )

    if as_dataframe:
        df = GeoDataFrame.from_features(example_aoi, crs=4326)
        return df
    else:
        return example_aoi

get_logger(name, level=logging.INFO, verbose=False)

Use level=logging.CRITICAL to disable temporarily.
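
A minimal usage sketch (the import path assumes direct use of the utils module):

```python
import logging
from up42.utils import get_logger

logger = get_logger(__name__, level=logging.INFO, verbose=True)
logger.info("Verbose logging enabled.")
```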

Source code in up42/utils.py
def get_logger(
    name: str,
    level=logging.INFO,
    verbose: bool = False,
):
    """
    Use level=logging.CRITICAL to disable temporarily.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # create console handler and set level to debug
    ch = logging.StreamHandler()
    ch.setLevel(level)
    if verbose:
        log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    else:
        # hide logger module & level, truncate log messages > 2000 characters (e.g. huge geometries)
        log_format = "%(asctime)s - %(message).2000s"
    formatter = logging.Formatter(log_format)
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.propagate = False
    return logger

read_vector_file(filename='aoi.geojson', as_dataframe=False)

Reads vector files (geojson, shapefile, kml, wkt) to a feature collection, for use as the AOI geometry in the workflow input parameters (see get_input_parameters).

Example AOI files are provided, e.g. example/data/aoi_Berlin.geojson

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `filename` | `str` | File path of the vector file. | `'aoi.geojson'` |
| `as_dataframe` | `bool` | Return type: FeatureCollection dict by default, GeoDataFrame if True. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `Union[dict, GeoDataFrame]` | Feature Collection |
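
For example, reading a GeoJSON file into a feature collection:

```python
fc = up42.read_vector_file(filename="aoi.geojson")
```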

Source code in up42/tools.py
def read_vector_file(
    filename: str = "aoi.geojson", as_dataframe: bool = False
) -> Union[dict, GeoDataFrame]:
    """
    Reads vector files (geojson, shapefile, kml, wkt) to a feature collection,
    for use as the AOI geometry in the workflow input parameters
    (see get_input_parameters).

    Example AOI files are provided, e.g. example/data/aoi_Berlin.geojson

    Args:
        filename: File path of the vector file.
        as_dataframe: Return type: FeatureCollection dict by default, GeoDataFrame if True.

    Returns:
        Feature Collection
    """
    suffix = Path(filename).suffix

    if suffix == ".kml":
        # pylint: disable=no-member
        gpd.io.file.fiona.drvsupport.supported_drivers["KML"] = "rw"
        df = gpd.read_file(filename, driver="KML")
    elif suffix == ".wkt":
        with open(filename) as wkt_file:
            wkt = wkt_file.read()
            df = pd.DataFrame({"geometry": [wkt]})
            df["geometry"] = df["geometry"].apply(shapely.wkt.loads)
            df = GeoDataFrame(df, geometry="geometry", crs=4326)
    else:
        df = gpd.read_file(filename)

    if df.crs.to_string() != "EPSG:4326":
        df = df.to_crs(epsg=4326)
    if as_dataframe:
        return df
    else:
        return df.__geo_interface__

settings(log=True)

Configures logging and other settings when using the up42-py package.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `log` | `bool` | Activates/deactivates logging. The default True enables logging. | `True` |
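
For example, to disable logging:

```python
up42.settings(log=False)
```
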
Source code in up42/tools.py
def settings(log: bool = True) -> None:
    """
    Configures logging and other settings when using the up42-py package.
    Args:
        log: Activates/deactivates logging. The default True enables logging.
    """
    if log:
        logger.info(
            "Logging enabled (default) - use up42.settings(log=False) to disable."
        )
    else:
        logger.info("Logging disabled - use up42.settings(log=True) to reactivate.")

    for name in logging.root.manager.loggerDict:
        setattr(logging.getLogger(name), "disabled", not log)

Visualization tools available in various objects

draw_aoi()

Displays an interactive map to draw an AOI by hand, returns the folium object if not run in a Jupyter notebook.

Export the drawn AOI via the export button, then read the geometries via up42.read_aoi_file().

Requires installation of up42-py[viz] extra dependencies.
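
A usage sketch (in a Jupyter notebook the returned map renders inline; calling it on the up42 object is an assumption based on the other convenience functions):

```python
m = up42.draw_aoi()
m  # displays the folium map in a notebook
```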

Source code in up42/viztools.py
@requires_viz
def draw_aoi() -> "folium.Map":
    """
    Displays an interactive map to draw an AOI by hand, returns the folium object if
    not run in a Jupyter notebook.

    Export the drawn AOI via the export button, then read the geometries via
    up42.read_aoi_file().

    Requires installation of up42-py[viz] extra dependencies.
    """
    m = folium_base_map(layer_control=True)
    DrawFoliumOverride(
        export=True,
        filename="aoi.geojson",
        position="topleft",
        draw_options={
            "rectangle": {"repeatMode": False, "showArea": True},
            "polygon": {"showArea": True, "allowIntersection": False},
            "polyline": False,
            "circle": False,
            "marker": False,
            "circlemarker": False,
        },
        edit_options={"polygon": {"allowIntersection": False}},
    ).add_to(m)
    return m

Auth
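
The Auth object is normally created for you via up42.authenticate(), but it can also be instantiated directly (a sketch; the import path is an assumption):

```python
from up42.auth import Auth

auth = Auth(project_id="your-project-ID", project_api_key="your-project-API-key")
```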

Source code in up42/auth.py
class Auth:
    def __init__(
        self,
        cfg_file: Union[str, Path, None] = None,
        project_id: Optional[str] = None,
        project_api_key: Optional[str] = None,
        **kwargs,
    ):
        """
        The Auth class handles the authentication with UP42.

        Info:
            Authentication is possible via the credentials of a specific project (project_id &
            project_api_key). To get your **project id** and **project api key**, follow
            the instructions in the docs authentication chapter.

        Args:
            cfg_file: File path to the cfg.json with {project_id: "...", project_api_key: "..."}.
            project_id: The unique identifier of the project.
            project_api_key: The project-specific API key.
        """
        self.cfg_file = cfg_file
        self.project_id = project_id
        self.project_api_key = project_api_key
        self.workspace_id: Optional[str] = None
        self.token: Optional[str] = None

        try:
            self.env: str = kwargs["env"]
        except KeyError:
            self.env = "com"
        try:
            self.authenticate: bool = kwargs["authenticate"]
        except KeyError:
            self.authenticate = True

        if self.authenticate:
            self._find_credentials()
            self._get_token()
            self._get_workspace()
            logger.info("Authentication with UP42 successful!")

    def __repr__(self):
        env_string = f", {self.env}" if self.env != "com" else ""
        return f"UP42ProjectAuth(project_id={self.project_id}{env_string})"

    def _find_credentials(self) -> None:
        """
        Sources the project credentials from a provided config file, error handling
        if no credentials are provided in arguments or config file.
        """
        if self.project_id is None or self.project_api_key is None:
            if self.cfg_file is None:
                raise ValueError(
                    "Provide project_id and project_api_key via arguments or config file!"
                )

            # Source credentials from config file.
            try:
                with open(self.cfg_file) as src:
                    config = json.load(src)
                    try:
                        self.project_id = config["project_id"]
                        self.project_api_key = config["project_api_key"]
                    except KeyError as e:
                        raise ValueError(
                            "Provided config file does not contain project_id and "
                            "project_api_key!"
                        ) from e
                logger.info("Got credentials from config file.")
            except FileNotFoundError as e:
                raise ValueError("Selected config file does not exist!") from e

        elif all(
            v is not None
            for v in [self.cfg_file, self.project_id, self.project_api_key]
        ):
            logger.info(
                "Credentials are provided via arguments and config file, "
                "now using the argument credentials."
            )

    def _endpoint(self) -> str:
        """Gets the endpoint."""
        return f"https://api.up42.{self.env}"

    def _get_token(self):
        """Project specific authentication via project id and project api key."""
        try:
            client = BackendApplicationClient(
                client_id=self.project_id, client_secret=self.project_api_key
            )
            auth = HTTPBasicAuth(self.project_id, self.project_api_key)
            get_token_session = OAuth2Session(client=client)
            token_response = get_token_session.fetch_token(
                token_url=self._endpoint() + "/oauth/token", auth=auth
            )
        except MissingTokenError as err:
            raise ValueError(
                "Authentication was not successful, check the provided project credentials."
            ) from err

        self.token = token_response["data"]["accessToken"]

    def _get_workspace(self) -> None:
        """Get workspace id belonging to authenticated project."""
        url = f"https://api.up42.{self.env}/projects/{self.project_id}"
        resp = self._request("GET", url)
        self.workspace_id = resp["data"]["workspaceId"]  # type: ignore

    @staticmethod
    def _generate_headers(token: str) -> Dict[str, str]:
        version = (
            Path(__file__)
            .resolve()
            .parent.joinpath("_version.txt")
            .read_text(encoding="utf-8")
        )
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
            "cache-control": "no-cache",
            "User-Agent": f"up42-py/{version} (https://github.com/up42/up42-py)",
        }
        return headers

    # pylint: disable=dangerous-default-value
    @retry(  # type: ignore
        retry=retry_if_429_rate_limit(),
        wait=wait_random_exponential(multiplier=0.5, max=180),
        reraise=True,
    )
    def _request_helper(
        self, request_type: str, url: str, data: dict = {}, querystring: dict = {}
    ) -> requests.Response:
        """
        Helper function for the request, running the actual request with the correct headers.

        Args:
            request_type: 'GET', 'POST', 'PUT', 'PATCH', 'DELETE'
            url: The requests url.
            data: The payload, e.g. dictionary with job parameters etc.
            querystring: The querystring.

        Returns:
            The request response.
        """
        headers = self._generate_headers(self.token)  # type: ignore
        if querystring == {}:
            response: requests.Response = requests.request(
                method=request_type, url=url, data=json.dumps(data), headers=headers
            )
        else:
            response = requests.request(
                method=request_type,
                url=url,
                data=json.dumps(data),
                headers=headers,
                params=querystring,
            )
        logger.debug(response)
        logger.debug(data)
        response.raise_for_status()
        return response

    def _request(
        self,
        request_type: str,
        url: str,
        data: Union[dict, list] = {},
        querystring: dict = {},
        return_text: bool = True,
    ):  # Union[str, dict, requests.Response]:
        """
        Handles retrying the request and automatically retries and gets a new token if
        the old is invalid.

        Retry is enabled by default and can be set to False via the kwargs of Auth.

        In addition to this retry mechanic, 429-errors (too many requests) are retried
        more extensively in _request_helper.

        Args:
            request_type: 'GET', 'POST', 'PUT', 'PATCH', 'DELETE'
            url: The url to request.
            data: The payload, e.g. dictionary with job parameters etc.
            querystring: The querystring.
            return_text: If true returns response text/json, false returns response.
            retry: If False, requests with an invalid token (e.g. expired after
                5 minutes) will return 401 errors instead of being retried.

        Returns:
            The API response.
        """
        retryer_token = Retrying(
            stop=stop_after_attempt(2),  # Original attempt + one retry
            wait=wait_fixed(0.5),
            retry=(
                retry_if_401_invalid_token()
                | retry_if_exception_type(requests.exceptions.ConnectionError)
            ),
            after=lambda retry_state: self._get_token(),  # type:ignore
            reraise=True,
            # after final failed attempt, raises last attempt's exception instead of RetryError.
        )

        try:
            response: requests.Response = retryer_token(
                self._request_helper, request_type, url, data, querystring
            )

        # There are two UP42 API versions:
        # v1 endpoints return the format {"data": ..., "error": ...}, where data is e.g.
        # a dict or list, and error is a str, dict or None (if no error).
        # v1 always returns a response; an error is indicated by the error key.
        # v2 endpoints follow RFC 7807: {"title": ..., "status": 404}, with optional
        # "detail" and "type" keys. v2 either returns the above positive response, or
        # fails with an HTTPError (then check err.response.json() for the above fields).

        except requests.exceptions.RequestException as err:  # Base error class
            # Raising the original `err` error would not surface the relevant error message (contained in API response)
            err_message = err.response.json()
            logger.error(f"Error {err_message}")
            raise requests.exceptions.RequestException(err_message) from err

        # Handle response text.
        if return_text:
            try:
                response_text = json.loads(response.text)
            except json.JSONDecodeError:  # e.g. JobTask logs are str format.
                response_text = response.text

            # Handle api error messages here before handling it in every single function.
            try:
                if response_text["error"] is not None and response_text["data"] is None:
                    raise ValueError(response_text["error"])
                return response_text
            except (
                KeyError,
                TypeError,
            ):  # Catalog search, JobTask logs etc. does not have the usual {"data":"",
                # "error":""} format.
                return response_text

        else:  # E.g. for DELETE
            return response

Webhook

Webhook

Webhook class to control a specific UP42 webhook, e.g. modify, test or delete the specific webhook.

webhook.trigger_test_events()
Source code in up42/webhooks.py
class Webhook:
    """
    # Webhook

    Webhook class to control a specific UP42 webhook, e.g. modify, test or delete the specific webhook.

    ```python
    webhook.trigger_test_events()
    ```
    """

    def __init__(self, auth: Auth, webhook_id: str, webhook_info: dict = None):
        self.auth = auth
        self.workspace_id = auth.workspace_id
        self.webhook_id = webhook_id
        if webhook_info is not None:
            self._info = webhook_info
        else:
            self._info = self.info

    def __repr__(self):
        return f"Webhook(name: {self._info['name']}, webhook_id: {self.webhook_id}, active: {self._info['active']}"

    @property
    def info(self) -> dict:
        """
        Gets and updates the webhook metadata information.
        """
        url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}"
        response_json = self.auth._request(request_type="GET", url=url)
        self._info = response_json["data"]
        return self._info

    def trigger_test_events(self) -> dict:
        """
        Triggers webhook test event to test your receiving side. The UP42 server will send test
        messages for each subscribed event to the specified webhook URL.

        Returns:
            A dict with information about the test events.
        """
        url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}/tests"
        response_json = self.auth._request(
            request_type="POST",
            url=url,
        )
        return response_json["data"]

    def update(
        self,
        name: Optional[str] = None,
        url: Optional[str] = None,
        events: Optional[List[str]] = None,
        active: Optional[bool] = None,
        secret: Optional[str] = None,
    ) -> "Webhook":
        """
        Updates a registered webhook.

        Args:
            name: Updated webhook name
            url: Updated unique URL where the webhook will send the message (HTTPS required)
            events: Updated list of event types [order.status, job.status].
            active: Updated webhook status.
            secret: Updated string that acts as signature to the https request sent to the url.

        Returns:
            The updated webhook object.
        """
        self.info  # _info could be outdated. #pylint: disable=pointless-statement
        input_parameters = {
            "name": name if name is not None else self._info["name"],
            "url": url if url is not None else self._info["url"],
            "events": events if events is not None else self._info["events"],
            "secret": secret if secret is not None else self._info["secret"],
            "active": active if active is not None else self._info["active"],
        }
        url_put = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}"
        response_json = self.auth._request(
            request_type="PUT", url=url_put, data=input_parameters
        )
        self._info = response_json["data"]
        logger.info(f"Updated webhook {self}")
        return self

    def delete(self) -> None:
        """
        Deletes a registered webhook.
        """
        url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}"
        self.auth._request(request_type="DELETE", url=url)
        logger.info(f"Successfully deleted Webhook: {self.webhook_id}")

delete()

Deletes a registered webhook.
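
For example, on a webhook obtained via up42.initialize_webhook():

```python
webhook.delete()
```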

Source code in up42/webhooks.py
def delete(self) -> None:
    """
    Deletes a registered webhook.
    """
    url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}"
    self.auth._request(request_type="DELETE", url=url)
    logger.info(f"Successfully deleted Webhook: {self.webhook_id}")

info: dict property

Gets and updates the webhook metadata information.

trigger_test_events()

Triggers webhook test event to test your receiving side. The UP42 server will send test messages for each subscribed event to the specified webhook URL.

Returns:

| Type | Description |
| --- | --- |
| `dict` | A dict with information about the test events. |
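
For example:

```python
test_events = webhook.trigger_test_events()
```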

Source code in up42/webhooks.py
def trigger_test_events(self) -> dict:
    """
    Triggers webhook test event to test your receiving side. The UP42 server will send test
    messages for each subscribed event to the specified webhook URL.

    Returns:
        A dict with information about the test events.
    """
    url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}/tests"
    response_json = self.auth._request(
        request_type="POST",
        url=url,
    )
    return response_json["data"]

update(name=None, url=None, events=None, active=None, secret=None)

Updates a registered webhook.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `Optional[str]` | Updated webhook name. | `None` |
| `url` | `Optional[str]` | Updated unique URL where the webhook will send the message (HTTPS required). | `None` |
| `events` | `Optional[List[str]]` | Updated list of event types [order.status, job.status]. | `None` |
| `active` | `Optional[bool]` | Updated webhook status. | `None` |
| `secret` | `Optional[str]` | Updated string that acts as signature to the https request sent to the url. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Webhook` | The updated webhook object. |
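
For example, renaming a webhook and activating it:

```python
webhook = webhook.update(name="new-webhook-name", active=True)
```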

Source code in up42/webhooks.py
def update(
    self,
    name: Optional[str] = None,
    url: Optional[str] = None,
    events: Optional[List[str]] = None,
    active: Optional[bool] = None,
    secret: Optional[str] = None,
) -> "Webhook":
    """
    Updates a registered webhook.

    Args:
        name: Updated webhook name
        url: Updated unique URL where the webhook will send the message (HTTPS required)
        events: Updated list of event types [order.status, job.status].
        active: Updated webhook status.
        secret: Updated string that acts as signature to the https request sent to the url.

    Returns:
        The updated webhook object.
    """
    self.info  # _info could be outdated. #pylint: disable=pointless-statement
    input_parameters = {
        "name": name if name is not None else self._info["name"],
        "url": url if url is not None else self._info["url"],
        "events": events if events is not None else self._info["events"],
        "secret": secret if secret is not None else self._info["secret"],
        "active": active if active is not None else self._info["active"],
    }
    url_put = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}"
    response_json = self.auth._request(
        request_type="PUT", url=url_put, data=input_parameters
    )
    self._info = response_json["data"]
    logger.info(f"Updated webhook {self}")
    return self

Webhooks

Contains UP42 webhooks functionality to set up a custom callback, e.g. when an order is finished the webhook is triggered and an event notification is transmitted via HTTPS to a specific URL.

Also see the full webhook documentation.

Create a new webhook or query existing ones via the up42 object, e.g.

webhooks = up42.get_webhooks()
webhook = up42.initialize_webhook(webhook_id = "...")

The resulting Webhook object lets you modify, test or delete the specific webhook, e.g.

webhook.trigger_test_events()

Source code in up42/webhooks.py
class Webhooks:
    """
    Contains UP42 webhooks functionality to set up a custom callback, e.g. when an order is finished
    the webhook is triggered and an event notification is transmitted via HTTPS to a specific URL.

    Also see the [full webhook documentation](https://docs.up42.com/account/webhooks).

    Create a new webhook or query existing ones via the `up42` object, e.g.
    ```python
    webhooks = up42.get_webhooks()
    ```
    ```python
    webhook = up42.initialize_webhook(webhook_id = "...")
    ```

    The resulting Webhook object lets you modify, test or delete the specific webhook, e.g.
    ```python
    webhook.trigger_test_events()
    ```
    """

    def __init__(self, auth: Auth):
        self.auth = auth
        self.workspace_id = auth.workspace_id

    def get_webhook_events(self) -> dict:
        """
        Gets all available webhook events.

        Returns:
            A dict of the available webhook events.
        """
        url = f"{self.auth._endpoint()}/webhooks/events"
        response_json = self.auth._request(request_type="GET", url=url)
        return response_json["data"]

    def get_webhooks(self, return_json: bool = False) -> List[Webhook]:
        """
        Gets all registered webhooks for this workspace.

        Args:
            return_json: If true returns the webhooks information as JSON instead of webhook class objects.

        Returns:
            A list of the registered webhooks for this workspace.
        """
        url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks"
        response_json = self.auth._request(request_type="GET", url=url)
        logger.info(f"Queried {len(response_json['data'])} webhooks.")

        if return_json:
            return response_json["data"]
        webhooks = [
            Webhook(
                auth=self.auth, webhook_id=webhook_info["id"], webhook_info=webhook_info
            )
            for webhook_info in response_json["data"]
        ]
        return webhooks

    def create_webhook(
        self,
        name: str,
        url: str,
        events: List[str],
        active: bool = False,
        secret: Optional[str] = None,
    ) -> Webhook:
        """
        Registers a new webhook in the system.

        Args:
            name: Webhook name
            url: Unique URL where the webhook will send the message (HTTPS required)
            events: List of event types e.g. [order.status, job.status]
            active: Webhook status.
            secret: String that acts as signature to the https request sent to the url.

        Returns:
            The registered webhook object.
        """
        input_parameters = {
            "name": name,
            "url": url,
            "events": events,
            "secret": secret,
            "active": active,
        }
        url_post = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks"
        response_json = self.auth._request(
            request_type="POST", url=url_post, data=input_parameters
        )
        webhook = Webhook(
            auth=self.auth,
            webhook_id=response_json["data"]["id"],
            webhook_info=response_json["data"],
        )
        logger.info(f"Created webhook {webhook}")
        return webhook

create_webhook(name, url, events, active=False, secret=None)

Registers a new webhook in the system.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Webhook name. | required |
| `url` | `str` | Unique URL where the webhook will send the message (HTTPS required). | required |
| `events` | `List[str]` | List of event types e.g. [order.status, job.status]. | required |
| `active` | `bool` | Webhook status. | `False` |
| `secret` | `Optional[str]` | String that acts as signature to the https request sent to the url. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Webhook` | The registered webhook object. |
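
A usage sketch via the up42 convenience function (the URL is a placeholder for an HTTPS endpoint you control):

```python
webhook = up42.create_webhook(
    name="new-webhook",
    url="https://receiving-url.com",
    events=["order.status"],
    active=True,
)
```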

Source code in up42/webhooks.py
def create_webhook(
    self,
    name: str,
    url: str,
    events: List[str],
    active: bool = False,
    secret: Optional[str] = None,
) -> Webhook:
    """
    Registers a new webhook in the system.

    Args:
        name: Webhook name
        url: Unique URL where the webhook will send the message (HTTPS required)
        events: List of event types e.g. [order.status, job.status]
        active: Webhook status.
        secret: String that acts as signature to the https request sent to the url.

    Returns:
        The registered webhook object.
    """
    input_parameters = {
        "name": name,
        "url": url,
        "events": events,
        "secret": secret,
        "active": active,
    }
    url_post = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks"
    response_json = self.auth._request(
        request_type="POST", url=url_post, data=input_parameters
    )
    webhook = Webhook(
        auth=self.auth,
        webhook_id=response_json["data"]["id"],
        webhook_info=response_json["data"],
    )
    logger.info(f"Created webhook {webhook}")
    return webhook

get_webhook_events()

Gets all available webhook events.

Returns:

| Type | Description |
| --- | --- |
| `dict` | A dict of the available webhook events. |
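
For example:

```python
events = up42.get_webhook_events()
```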

Source code in up42/webhooks.py
def get_webhook_events(self) -> dict:
    """
    Gets all available webhook events.

    Returns:
        A dict of the available webhook events.
    """
    url = f"{self.auth._endpoint()}/webhooks/events"
    response_json = self.auth._request(request_type="GET", url=url)
    return response_json["data"]

get_webhooks(return_json=False)

Gets all registered webhooks for this workspace.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `return_json` | `bool` | If true returns the webhooks information as JSON instead of webhook class objects. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `List[Webhook]` | A list of the registered webhooks for this workspace. |
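
For example:

```python
webhooks = up42.get_webhooks()
```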

Source code in up42/webhooks.py
def get_webhooks(self, return_json: bool = False) -> List[Webhook]:
    """
    Gets all registered webhooks for this workspace.

    Args:
        return_json: If true returns the webhooks information as JSON instead of webhook class objects.

    Returns:
        A list of the registered webhooks for this workspace.
    """
    url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks"
    response_json = self.auth._request(request_type="GET", url=url)
    logger.info(f"Queried {len(response_json['data'])} webhooks.")

    if return_json:
        return response_json["data"]
    webhooks = [
        Webhook(
            auth=self.auth, webhook_id=webhook_info["id"], webhook_info=webhook_info
        )
        for webhook_info in response_json["data"]
    ]
    return webhooks

create_webhook(name, url, events, active=False, secret=None)

Registers a new webhook in the system.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Webhook name. | required |
| `url` | `str` | Unique URL where the webhook will send the message (HTTPS required). | required |
| `events` | `List[str]` | List of event types (order status / job task status). | required |
| `active` | `bool` | Webhook status. | `False` |
| `secret` | `Optional[str]` | String that acts as signature to the https request sent to the url. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Webhook` | The registered webhook object. |

Source code in up42/main.py
@_check_auth
def create_webhook(
    name: str,
    url: str,
    events: List[str],
    active: bool = False,
    secret: Optional[str] = None,
):
    """
    Registers a new webhook in the system.

    Args:
        name: Webhook name
        url: Unique URL where the webhook will send the message (HTTPS required)
        events: List of event types (order status / job task status)
        active: Webhook status.
        secret: String that acts as signature to the https request sent to the url.
    Returns:
        The registered webhook object.
    """
    webhook = Webhooks(auth=_auth).create_webhook(
        name=name, url=url, events=events, active=active, secret=secret
    )
    return webhook

format_time(date, set_end_of_day=False)

Formats a datetime object or date isostring to the datetime string format "%Y-%m-%dT%H:%M:%SZ".

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `date` | `Optional[Union[str, datetime]]` | datetime object or isodatetime string e.g. "YYYY-MM-DD" or "YYYY-MM-DDTHH:MM:SS". | required |
| `set_end_of_day` | | Sets the date to end of day, as required for most image archive searches. Only applies for type date string without time, e.g. "YYYY-MM-DD", not explicit datetime object or time of day. | `False` |
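
A worked example of the resulting format:

```python
format_time("2021-12-01")                       # '2021-12-01T00:00:00Z'
format_time("2021-12-01", set_end_of_day=True)  # '2021-12-01T23:59:59Z'
```
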
Source code in up42/utils.py
def format_time(date: Optional[Union[str, datetime]], set_end_of_day=False):
    """
    Formats a datetime object or date isostring to the datetime string format "%Y-%m-%dT%H:%M:%SZ".

    Args:
        date: datetime object or isodatetime string e.g. "YYYY-MM-DD" or "YYYY-MM-DDTHH:MM:SS".
        set_end_of_day: Sets the date to end of day, as required for most image archive searches. Only applies for
            type date string without time, e.g. "YYYY-MM-DD", not explicit datetime object or time of day.
    """
    if isinstance(date, str):
        has_time_of_day = len(date) > 11
        date = datetime.fromisoformat(date)  # type: ignore
        if not has_time_of_day and set_end_of_day:
            date = datetime.combine(date.date(), datetime_time(23, 59, 59, 999999))
    elif isinstance(date, datetime):
        pass
    else:
        raise ValueError("date needs to be of type datetime or isoformat date string!")

    return date.strftime("%Y-%m-%dT%H:%M:%SZ")

get_block_coverage(block_id)

Gets the spatial coverage of a data/processing block as url or GeoJson Feature Collection.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `block_id` | `str` | The block id. | required |

Returns:

| Type | Description |
| --- | --- |
| `dict` | A dict of the spatial coverage for the specific block. |
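
A usage sketch (the block id is hypothetical; look up real ids via up42.get_blocks()):

```python
coverage = up42.get_block_coverage(block_id="045019bb-06fc-4fa1-b703-318725b4d8af")
```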

Source code in up42/main.py
@_check_auth
def get_block_coverage(block_id: str) -> dict:
    """
    Gets the spatial coverage of a data/processing block as
    url or GeoJson Feature Collection.

    Args:
        block_id: The block id.

    Returns:
        A dict of the spatial coverage for the specific block.
    """
    url = f"{_auth._endpoint()}/blocks/{block_id}/coverage"
    response_json = _auth._request(request_type="GET", url=url)
    details_json = response_json["data"]
    response_coverage = requests.get(details_json["url"]).json()
    return response_coverage

get_block_details(block_id, as_dataframe=False)

Gets the detailed information about a specific public block from the server, includes all manifest.json and marketplace.json contents. Cannot access custom blocks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `block_id` | `str` | The block id. | required |
| `as_dataframe` | `bool` | Returns a dataframe instead of JSON (default). | `False` |

Returns:

| Type | Description |
| --- | --- |
| `dict` | A dict of the block details metadata for the specific block. |
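
A usage sketch (the block id is hypothetical):

```python
details = up42.get_block_details(block_id="045019bb-06fc-4fa1-b703-318725b4d8af")
```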

Source code in up42/main.py
@_check_auth
def get_block_details(block_id: str, as_dataframe: bool = False) -> dict:
    """
    Gets the detailed information about a specific public block from
    the server, includes all manifest.json and marketplace.json contents.
    Cannot access custom blocks.

    Args:
        block_id: The block id.
        as_dataframe: Returns a dataframe instead of JSON (default).

    Returns:
        A dict of the block details metadata for the specific block.
    """
    url = f"{_auth._endpoint()}/blocks/{block_id}"  # public blocks
    response_json = _auth._request(request_type="GET", url=url)
    details_json = response_json["data"]

    if as_dataframe:
        return pd.DataFrame.from_dict(details_json, orient="index").transpose()
    else:
        return details_json

get_blocks(block_type=None, basic=True, as_dataframe=False)

Gets a list of all public blocks on the marketplace. Cannot access custom blocks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `block_type` | `Optional[str]` | Optionally filters to "data" or "processing" blocks, default None. | `None` |
| `basic` | `bool` | Optionally returns the simple version {block_name: block_id}. | `True` |
| `as_dataframe` | `bool` | Returns a dataframe instead of JSON (default). | `False` |

Returns:

| Type | Description |
| --- | --- |
| `Union[List[Dict], dict]` | A list of the public blocks and their metadata, or optionally a simpler {block_name: block_id} dict. |
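
For example, listing only the data blocks:

```python
blocks = up42.get_blocks(block_type="data", basic=True)
```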

Source code in up42/main.py
@_check_auth
def get_blocks(
    block_type: Optional[str] = None,
    basic: bool = True,
    as_dataframe: bool = False,
) -> Union[List[Dict], dict]:
    """
    Gets a list of all public blocks on the marketplace. Cannot access custom blocks.

    Args:
        block_type: Optionally filters to "data" or "processing" blocks, default None.
        basic: Optionally returns the simple version {block_name: block_id}
        as_dataframe: Returns a dataframe instead of JSON (default).

    Returns:
        A list of the public blocks and their metadata, or optionally a simpler
        {block_name: block_id} dict.
    """
    try:
        block_type = block_type.lower()  # type: ignore
    except AttributeError:
        pass
    url = f"{_auth._endpoint()}/blocks"
    response_json = _auth._request(request_type="GET", url=url)
    public_blocks_json = response_json["data"]

    if block_type == "data":
        logger.info("Getting only data blocks.")
        blocks_json = [block for block in public_blocks_json if block["type"] == "DATA"]
    elif block_type == "processing":
        logger.info("Getting only processing blocks.")
        blocks_json = [
            block for block in public_blocks_json if block["type"] == "PROCESSING"
        ]
    else:
        blocks_json = public_blocks_json

    if basic:
        logger.info(
            "Getting blocks name and id, use basic=False for all block details."
        )
        blocks_basic = {block["name"]: block["id"] for block in blocks_json}
        if as_dataframe:
            return pd.DataFrame.from_dict(blocks_basic, orient="index")
        else:
            return blocks_basic

    else:
        if as_dataframe:
            return pd.DataFrame(blocks_json)
        else:
            return blocks_json

get_credits_balance()

Displays the overall credits available in your account.

Returns:

| Type | Description |
| --- | --- |
| `dict` | A dict with the balance of credits available in your account. |
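
For example:

```python
balance = up42.get_credits_balance()
```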

Source code in up42/main.py
@_check_auth
def get_credits_balance() -> dict:
    """
    Displays the overall credits available in your account.

    Returns:
        A dict with the balance of credits available in your account.
    """
    endpoint_url = f"{_auth._endpoint()}/accounts/me/credits/balance"
    response_json = _auth._request(request_type="GET", url=endpoint_url)
    details_json = response_json["data"]
    return details_json

get_credits_history(start_date=None, end_date=None)

Displays the overall credits history consumed in your account. The consumption history will be returned for all workspace_ids on your account.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start_date` | `Optional[Union[str, datetime]]` | The start date for the credit consumption search, datetime or isoformat string e.g. 2021-12-01. Default start_date None uses 2000-01-01. | `None` |
| `end_date` | `Optional[Union[str, datetime]]` | The end date for the credit consumption search, datetime or isoformat string e.g. 2021-12-31. Default end_date None uses current date. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, Union[str, int, Dict]]` | A dict with the information of the credit consumption records for all the users linked by the account_id. |
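
For example, querying one month of consumption:

```python
history = up42.get_credits_history(start_date="2021-12-01", end_date="2021-12-31")
```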

Source code in up42/main.py
@_check_auth
def get_credits_history(
    start_date: Optional[Union[str, datetime]] = None,
    end_date: Optional[Union[str, datetime]] = None,
) -> Dict[str, Union[str, int, Dict]]:
    """
    Displays the overall credits history consumed in your account.
    The consumption history will be returned for all workspace_ids on your account.

    Args:
        start_date: The start date for the credit consumption search, datetime or isoformat string e.g.
            2021-12-01. Default start_date None uses 2000-01-01.
        end_date: The end date for the credit consumption search, datetime or isoformat string e.g.
            2021-12-31. Default end_date None uses current date.

    Returns:
        A dict with the information of the credit consumption records for all the users linked by the account_id.
    """
    if start_date is None:
        start_date = "2000-01-01"
    if end_date is None:
        tomorrow_date = date.today() + timedelta(days=1)
        tomorrow_datetime = datetime(
            year=tomorrow_date.year,
            month=tomorrow_date.month,
            day=tomorrow_date.day,
        )
        end_date = tomorrow_datetime.strftime("%Y-%m-%d")

    search_parameters = dict(
        {
            "from": format_time(start_date),
            "to": format_time(end_date, set_end_of_day=True),
            "size": 2000,  # 2000 is the maximum page size for this call
            "page": 0,
        }
    )
    endpoint_url = f"{_auth._endpoint()}/accounts/me/credits/history"
    response_json: dict = _auth._request(
        request_type="GET", url=endpoint_url, querystring=search_parameters
    )
    isLastPage = response_json["data"]["last"]
    credit_history = response_json["data"]["content"].copy()
    result = dict(response_json["data"])
    del result["content"]
    while not isLastPage:
        search_parameters["page"] += 1
        response_json = _auth._request(
            request_type="GET", url=endpoint_url, querystring=search_parameters
        )
        isLastPage = response_json["data"]["last"]
        credit_history.extend(response_json["data"]["content"].copy())
    result["content"] = credit_history
    return result

get_logger(name, level=logging.INFO, verbose=False)

Use level=logging.CRITICAL to disable temporarily.

Source code in up42/utils.py
def get_logger(
    name: str,
    level=logging.INFO,
    verbose: bool = False,
):
    """
    Use level=logging.CRITICAL to disable temporarily.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # create console handler and set level to debug
    ch = logging.StreamHandler()
    ch.setLevel(level)
    if verbose:
        log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    else:
        # hide logger module & level, truncate log messages > 2000 characters (e.g. huge geometries)
        log_format = "%(asctime)s - %(message).2000s"
    formatter = logging.Formatter(log_format)
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    logger.propagate = False
    return logger

get_webhook_events()

Gets all available webhook events.

Returns:

| Type | Description |
| --- | --- |
| `dict` | A dict of the available webhook events. |

Source code in up42/main.py
@_check_auth
def get_webhook_events() -> dict:
    """
    Gets all available webhook events.

    Returns:
        A dict of the available webhook events.
    """
    webhook_events = Webhooks(auth=_auth).get_webhook_events()
    return webhook_events

get_webhooks(return_json=False)

Gets all registered webhooks for this workspace.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `return_json` | `bool` | If true returns the webhooks information as JSON instead of webhook class objects. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `List[Webhook]` | A list of the registered webhooks for this workspace. |

Source code in up42/main.py
@_check_auth
def get_webhooks(return_json: bool = False) -> List[Webhook]:
    """
    Gets all registered webhooks for this workspace.

    Args:
        return_json: If true returns the webhooks information as JSON instead of webhook class objects.
    Returns:
        A list of the registered webhooks for this workspace.
    """
    webhooks = Webhooks(auth=_auth).get_webhooks(return_json=return_json)
    return webhooks

validate_manifest(path_or_json)

Validates a block manifest JSON.

The block manifest is required to build a custom block on UP42 and contains the metadata about the block as well as block input and output capabilities.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path_or_json` | `Union[str, Path, dict]` | The input manifest, either a file path or a dict. | required |

Returns:

| Type | Description |
| --- | --- |
| `dict` | A dictionary with the validation results and potential validation errors. |
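
A usage sketch, assuming a local manifest.json file:

```python
result = up42.validate_manifest(path_or_json="manifest.json")
```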

Source code in up42/main.py
@_check_auth
def validate_manifest(path_or_json: Union[str, Path, dict]) -> dict:
    """
    Validates a block manifest JSON.

    The [block manifest](https://docs.up42.com/processing-platform/custom-blocks/manifest)
    is required to build a custom block on UP42 and contains
    the metadata about the block as well as block input and output capabilities.

    Args:
        path_or_json: The input manifest, either a file path or a dict.

    Returns:
        A dictionary with the validation results and potential validation errors.
    """
    if isinstance(path_or_json, (str, Path)):
        with open(path_or_json) as src:
            manifest_json = json.load(src)
    else:
        manifest_json = path_or_json
    url = f"{_auth._endpoint()}/validate-schema/block"
    response_json = _auth._request(request_type="POST", url=url, data=manifest_json)
    logger.info("The manifest is valid.")
    return response_json["data"]

Asset

The Asset class enables access to the UP42 assets in the storage. Assets are results of orders or results of jobs with download blocks.

Use an existing asset:

asset = up42.initialize_asset(asset_id="8c2dfb4d-bd35-435f-8667-48aea0dce2da")

Source code in up42/asset.py
class Asset:
    """
    The Asset class enables access to the UP42 assets in the storage. Assets are results
    of orders or results of jobs with download blocks.

    Use an existing asset:
    ```python
    asset = up42.initialize_asset(asset_id="8c2dfb4d-bd35-435f-8667-48aea0dce2da")
    ```
    """

    def __init__(self, auth: Auth, asset_id: str, asset_info: Optional[dict] = None):
        self.auth = auth
        self.asset_id = asset_id
        self.results: Union[List[str], None] = None
        if asset_info is not None:
            self._info = asset_info
        else:
            self._info = self.info

    def __repr__(self):
        representation = (
            f"Asset(name: {self._info['name']}, asset_id: {self.asset_id}, createdAt: {self._info['createdAt']}, "
            f"size: {self._info['size']})"
        )
        if "source" in self._info:
            representation += f", source: {self._info['source']}"
        if "contentType" in self._info:
            representation += f", contentType: {self._info['contentType']}"
        return representation

    @property
    def info(self) -> dict:
        """
        Gets and updates the asset metadata information.
        """
        url = f"{self.auth._endpoint()}/v2/assets/{self.asset_id}/metadata"
        response_json = self.auth._request(request_type="GET", url=url)
        self._info = response_json
        return self._info

    @property
    def stac_info(self) -> Union[dict, None]:
        """
        Gets the storage STAC information for the asset as a FeatureCollection.

        One asset can contain multiple STAC items (e.g. the pan- and multispectral images).
        """
        stac_search_parameters = {
            "max_items": 50,
            "limit": 50,
            "filter": {
                "op": "=",
                "args": [{"property": "asset_id"}, self.asset_id],
            },
        }
        url = f"{self.auth._endpoint()}/v2/assets/stac/search"
        stac_results = self.auth._request(
            request_type="POST", url=url, data=stac_search_parameters
        )
        stac_results.pop("links", None)
        if not stac_results["features"]:
            logger.info(
                "No STAC metadata information available for this asset's items!"
            )
        return stac_results

    def update_metadata(
        self, title: str = None, tags: List[str] = None, **kwargs
    ) -> dict:
        """
        Update the metadata of the asset.

        Args:
            title: The title string to be assigned to the asset.
            tags: A list of tag strings to be assigned to the asset.

        Returns:
            The updated asset metadata information
        """
        url = f"{self.auth._endpoint()}/v2/assets/{self.asset_id}/metadata"
        body_update = {"title": title, "tags": tags, **kwargs}
        response_json = self.auth._request(
            request_type="POST", url=url, data=body_update
        )
        self._info = response_json
        return self._info

    def _get_download_url(self) -> str:
        url = f"{self.auth._endpoint()}/v2/assets/{self.asset_id}/download-url"
        response_json = self.auth._request(request_type="POST", url=url)
        download_url = response_json["url"]
        return download_url

    def download(
        self, output_directory: Union[str, Path, None] = None, unpacking: bool = True
    ) -> List[str]:
        """
        Downloads the asset. By default the downloaded file is unpacked.

        Args:
            output_directory: The file output directory, defaults to the current working
                directory.
            unpacking: By default the downloaded TGZ/TAR or ZIP archive file will be unpacked.

        Returns:
            List of the downloaded asset filepaths.
        """
        logger.info(f"Downloading asset {self.asset_id}")

        if output_directory is None:
            output_directory = (
                Path.cwd() / f"project_{self.auth.project_id}/asset_{self.asset_id}"
            )
        else:
            output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Download directory: {str(output_directory)}")

        download_url = self._get_download_url()
        if unpacking:
            out_filepaths = download_from_gcs_unpack(
                download_url=download_url,
                output_directory=output_directory,
            )
        else:
            out_filepaths = download_gcs_not_unpack(
                download_url=download_url,
                output_directory=output_directory,
            )

        self.results = out_filepaths
        return out_filepaths

download(output_directory=None, unpacking=True)

Downloads the asset. By default the downloaded file is unpacked.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output_directory` | `Union[str, Path, None]` | The file output directory, defaults to the current working directory. | `None` |
| `unpacking` | `bool` | By default the downloaded TGZ/TAR or ZIP archive file will be unpacked. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `List[str]` | List of the downloaded asset filepaths. |
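
A usage sketch (the output directory is a placeholder):

```python
filepaths = asset.download(output_directory="/tmp/up42_assets", unpacking=True)
```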

Source code in up42/asset.py
def download(
    self, output_directory: Union[str, Path, None] = None, unpacking: bool = True
) -> List[str]:
    """
    Downloads the asset. By default the downloaded file is unpacked.

    Args:
        output_directory: The file output directory, defaults to the current working
            directory.
        unpacking: By default the downloaded TGZ/TAR or ZIP archive file will be unpacked.

    Returns:
        List of the downloaded asset filepaths.
    """
    logger.info(f"Downloading asset {self.asset_id}")

    if output_directory is None:
        output_directory = (
            Path.cwd() / f"project_{self.auth.project_id}/asset_{self.asset_id}"
        )
    else:
        output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(output_directory)}")

    download_url = self._get_download_url()
    if unpacking:
        out_filepaths = download_from_gcs_unpack(
            download_url=download_url,
            output_directory=output_directory,
        )
    else:
        out_filepaths = download_gcs_not_unpack(
            download_url=download_url,
            output_directory=output_directory,
        )

    self.results = out_filepaths
    return out_filepaths
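
A minimal usage sketch (assuming an authenticated session; the asset id and output path are placeholders, and the asset object can come e.g. from `up42.initialize_asset`):

```python
import up42

# Initialize an existing storage asset by its id (placeholder value).
asset = up42.initialize_asset(asset_id="8c2dfb4d-bd35-435f-8667-48aea0dce2da")

# Download into a custom directory; the archive is unpacked by default.
filepaths = asset.download(output_directory="./asset_data", unpacking=True)
print(filepaths)
```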

info: dict property

Gets and updates the asset metadata information.

stac_info: Union[dict, None] property

Gets the storage STAC information for the asset as a FeatureCollection.

One asset can contain multiple STAC items (e.g. the pan- and multispectral images).

update_metadata(title=None, tags=None, **kwargs)

Update the metadata of the asset.

Parameters:

Name Type Description Default
title str

The title string to be assigned to the asset.

None
tags List[str]

A list of tag strings to be assigned to the asset.

None

Returns:

Type Description
dict

The updated asset metadata information

Source code in up42/asset.py
def update_metadata(
    self, title: str = None, tags: List[str] = None, **kwargs
) -> dict:
    """
    Update the metadata of the asset.

    Args:
        title: The title string to be assigned to the asset.
        tags: A list of tag strings to be assigned to the asset.

    Returns:
        The updated asset metadata information
    """
    url = f"{self.auth._endpoint()}/v2/assets/{self.asset_id}/metadata"
    body_update = {"title": title, "tags": tags, **kwargs}
    response_json = self.auth._request(
        request_type="POST", url=url, data=body_update
    )
    self._info = response_json
    return self._info
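
A minimal sketch of updating an asset's metadata (assumes an initialized asset object as above; title and tag values are placeholders):

```python
# Assign a new title and tags; the call returns the updated metadata dict.
updated_info = asset.update_metadata(title="Berlin optical scene", tags=["berlin", "optical"])
print(updated_info)
```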

Catalog

The Catalog class enables access to the UP42 catalog functionality (data archive search & ordering).

Use catalog:

catalog = up42.initialize_catalog()

Source code in up42/catalog.py
class Catalog(CatalogBase, VizTools):
    """
    The Catalog class enables access to the UP42 catalog functionality (data archive search & ordering).

    Use catalog:
    ```python
    catalog = up42.initialize_catalog()
    ```
    """

    def __init__(self, auth: Auth):
        self.auth = auth
        self.quicklooks = None
        self.type = "ARCHIVE"
        self.data_products: Union[None, dict] = None

    def __repr__(self):
        return f"Catalog(auth={self.auth})"

    def estimate_order(self, order_parameters: Union[dict, None], **kwargs) -> int:
        """
        Estimate the cost of an order.

        Args:
            order_parameters: A dictionary like {dataProduct: ..., "params": {"id": ..., "aoi": ...}}

        Returns:
            int: An estimated cost for the order in UP42 credits.

        Warning "Deprecated order parameters"
            The use of the 'scene' and 'geometry' parameters for the data estimation is deprecated. Please use the new
            order_parameters parameter as described above.
        """
        if "scene" in kwargs or "geometry" in kwargs:
            # Deprecated, to be removed, use order_parameters.
            message = (
                "The use of the 'scene' and 'geometry' parameters for the data estimation is deprecated. "
                "Please use the new 'order_parameters' parameter."
            )
            warnings.warn(message, DeprecationWarning, stacklevel=2)
        elif order_parameters is None:
            raise ValueError("Please provide the 'order_parameters' parameter!")
        return Order.estimate(self.auth, order_parameters)  # type: ignore

    @deprecation("construct_search_parameters", "0.25.0")
    def construct_parameters(self, **kwargs):  # pragma: no cover
        """Deprecated, see construct_search_parameters"""
        return self.construct_search_parameters(**kwargs)

    @staticmethod
    def construct_search_parameters(
        geometry: Union[FeatureCollection, Feature, dict, list, GeoDataFrame, Polygon],
        collections: List[str],
        start_date: str = "2020-01-01",
        end_date: str = "2020-01-30",
        usage_type: List[str] = None,
        limit: int = 10,
        max_cloudcover: Optional[int] = None,
        sortby: str = "acquisitionDate",
        ascending: bool = True,
    ) -> dict:
        """
        Helps construct the parameters dictionary required for the search.

        Args:
            geometry: The search geometry, default a Polygon. One of FeatureCollection, Feature,
                dict (geojson geometry), list (bounds coordinates), GeoDataFrame, shapely.Polygon, shapely.Point.
                All assume EPSG 4326!
            collections: The satellite sensor collections to search for, e.g. ["phr"] or ["phr", "spot"].
                Also see catalog.get_collections().
            start_date: Query period starting day, format "2020-01-01".
            end_date: Query period ending day, format "2020-01-01".
            usage_type: Optional. Filter for imagery that can be purchased & downloaded or also
                processed. ["DATA"] (can only be downloaded), ["ANALYTICS"] (can be downloaded
                or used directly with a processing algorithm), ["DATA", "ANALYTICS"]
                (can be any combination). The filter is inclusive, using ["DATA"] can
                also result in results with ["DATA", "ANALYTICS"].
            limit: The maximum number of search results to return (1 to max. 500).
            max_cloudcover: Optional. Maximum cloud coverage percent - e.g. 100 will return all scenes,
                8.4 will return all scenes with 8.4 or less cloud coverage.
            sortby: The property to sort by, "cloudCoverage", "acquisitionDate",
                "acquisitionIdentifier", "incidenceAngle", "snowCover".
            ascending: Ascending sort order by default, descending if False.

        Returns:
            The constructed parameters dictionary.
        """
        time_period = (
            f"{format_time(start_date)}/{format_time(end_date, set_end_of_day=True)}"
        )
        aoi_fc = any_vector_to_fc(
            vector=geometry,
        )
        aoi_geometry = fc_to_query_geometry(fc=aoi_fc, geometry_operation="intersects")
        sort_order = "asc" if ascending else "desc"

        query_filters: Dict[Any, Any] = {}
        if max_cloudcover is not None:
            query_filters["cloudCoverage"] = {"lte": max_cloudcover}  # type: ignore

        if usage_type is not None:
            if usage_type == ["DATA"]:
                query_filters["up42:usageType"] = {"in": ["DATA"]}
            elif usage_type == ["ANALYTICS"]:
                query_filters["up42:usageType"] = {"in": ["ANALYTICS"]}
            elif usage_type == ["DATA", "ANALYTICS"]:
                query_filters["up42:usageType"] = {"in": ["DATA", "ANALYTICS"]}
            else:
                raise ValueError("Select correct `usage_type`")

        search_parameters = {
            "datetime": time_period,
            "intersects": aoi_geometry,
            "limit": limit,
            "collections": collections,
            "query": query_filters,
            "sortby": [{"field": f"properties.{sortby}", "direction": sort_order}],
        }

        return search_parameters

    def search(
        self, search_parameters: dict, as_dataframe: bool = True
    ) -> Union[GeoDataFrame, dict]:
        """
        Searches the catalog with the given search parameters and returns the metadata of
        the matching scenes.

        Args:
            search_parameters: The catalog search parameters, see example.
            as_dataframe: return type, GeoDataFrame if True (default), FeatureCollection if False.

        Returns:
            The search results as a GeoDataFrame, optionally as JSON dict.

        Example:
            ```python
                search_parameters={
                    "datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
                    "collections": ["phr"],
                    "intersects": {
                        "type": "Polygon",
                        "coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
                        [13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
                        52.73971768]]]},
                    "limit": 10,
                    "sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
                    }
            ```
        """
        logger.info(f"Searching catalog with search_parameters: {search_parameters}")

        # The API request would fail with a limit above 500, thus 500 is forced in the initial
        # request but additional results are handled below via pagination.
        try:
            max_limit = search_parameters["limit"]
        except KeyError:
            logger.info("No `limit` parameter in search_parameters, using default 500.")
            max_limit = 500

        if max_limit > 500:
            search_parameters["limit"] = 500

        # UP42 API can query multiple collections of the same host at once.
        if self.data_products is None:
            self.data_products = self.get_data_products(basic=True)  # type: ignore
        hosts = [
            v["host"]
            for v in self.data_products.values()  # type: ignore
            if v["collection"] in search_parameters["collections"]
        ]
        if not hosts:
            raise ValueError(
                f"Selected collections {search_parameters['collections']} are not valid. See "
                f"catalog.get_collections."
            )
        if len(set(hosts)) > 1:
            raise ValueError(
                "Only collections with the same host can be searched at the same time. Please adjust the "
                "collections in the search_parameters!"
            )
        host = hosts[0]

        url = f"{self.auth._endpoint()}/catalog/hosts/{host}/stac/search"
        response_json: dict = self.auth._request("POST", url, search_parameters)
        features = response_json["features"]

        # Search results with more than 500 items are given as 50-per-page additional pages.
        while len(features) < max_limit:
            pagination_exhausted = len(response_json["links"]) == 1
            if pagination_exhausted:
                break
            next_page_url = response_json["links"][1]["href"]
            response_json = self.auth._request("POST", next_page_url, search_parameters)
            features += response_json["features"]

        features = features[:max_limit]
        if not features:
            df = GeoDataFrame(columns=["geometry"], geometry="geometry")
        else:
            df = GeoDataFrame.from_features(
                FeatureCollection(features=features), crs="EPSG:4326"
            )

        logger.info(f"{df.shape[0]} results returned.")
        if as_dataframe:
            return df
        else:
            return df.__geo_interface__

    def construct_order_parameters(
        self,
        data_product_id: str,
        image_id: str,
        aoi: Union[
            dict,
            Feature,
            FeatureCollection,
            list,
            GeoDataFrame,
            Polygon,
        ] = None,
    ):
        """
        Helps construct the parameters dictionary required for the catalog order. Some collections have
        additional parameters that are added to the output dictionary with value None. The potential values to
        select from are given in the logs; for more detail on the parameter use `catalog.get_data_product_schema()`.

        Args:
            data_product_id: Id of the desired UP42 data product, see `catalog.get_data_products`
            image_id: The id of the desired image (from search results)
            aoi: The geometry of the order, one of dict, Feature, FeatureCollection,
                list, GeoDataFrame, Polygon. Optional for "full-image products".
        Returns:
            The order parameters dictionary.

        Example:
            ```python
            order_parameters = catalog.construct_order_parameters(
                data_product_id='647780db-5a06-4b61-b525-577a8b68bb54',
                image_id='6434e7af-2d41-4ded-a789-fb1b2447ac92',
                aoi={'type': 'Polygon',
                'coordinates': (((13.375966, 52.515068),
                  (13.375966, 52.516639),
                  (13.378314, 52.516639),
                  (13.378314, 52.515068),
                  (13.375966, 52.515068)),)})
            ```
        """
        order_parameters = {
            "dataProduct": data_product_id,
            "params": {"id": image_id},
        }
        logger.info(
            "See `catalog.get_data_product_schema(data_product_id)` for more detail on the parameter options."
        )
        schema = self.get_data_product_schema(data_product_id)
        order_parameters = autocomplete_order_parameters(order_parameters, schema)

        # Some catalog orders, e.g. Capella don't require AOI (full image order)
        # Handled on API level, don't manipulate in SDK, providers might accept geometries in the future.
        if aoi is not None:
            aoi = any_vector_to_fc(vector=aoi)
            aoi = fc_to_query_geometry(fc=aoi, geometry_operation="intersects")
            order_parameters["params"]["aoi"] = aoi  # type: ignore

        return order_parameters

    def download_quicklooks(
        self,
        image_ids: List[str],
        collection: str,
        output_directory: Union[str, Path, None] = None,
    ) -> List[str]:
        """
        Gets the quicklooks of scenes from a single sensor. After download, they can
        be plotted via catalog.map_quicklooks() or catalog.plot_quicklooks().

        Args:
            image_ids: List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"].
                Access the search results id column via `list(search_results.id)`.
            collection: The data collection corresponding to the image ids.
            output_directory: The file output directory, defaults to the current working
                directory.

        Returns:
            List of quicklook image output file paths.
        """
        if self.data_products is None:
            self.data_products = self.get_data_products(basic=True)  # type: ignore
        host = [
            v["host"]
            for v in self.data_products.values()  # type: ignore
            if v["collection"] == collection
        ]
        if not host:
            raise ValueError(
                f"Selected collections {collection} is not valid. See catalog.get_collections."
            )
        host = host[0]
        logger.info(f"Downloading quicklooks from provider {host}.")

        if output_directory is None:
            output_directory = Path.cwd() / f"project_{self.auth.project_id}/catalog"
        else:
            output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Download directory: {str(output_directory)}")

        if isinstance(image_ids, str):
            image_ids = [image_ids]

        out_paths: List[str] = []
        for image_id in tqdm(image_ids):
            try:
                url = (
                    f"{self.auth._endpoint()}/catalog/{host}/image/{image_id}/quicklook"
                )
                response = self.auth._request(
                    request_type="GET", url=url, return_text=False
                )
                out_path = output_directory / f"quicklook_{image_id}.jpg"
                out_paths.append(str(out_path))
                with open(out_path, "wb") as dst:
                    for chunk in response:
                        dst.write(chunk)
            except ValueError:
                logger.warning(
                    f"Image with id {image_id} does not have quicklook available. Skipping ..."
                )

        self.quicklooks = out_paths  # pylint: disable=attribute-defined-outside-init
        return out_paths

construct_order_parameters(data_product_id, image_id, aoi=None)

Helps construct the parameters dictionary required for the catalog order. Some collections have additional parameters that are added to the output dictionary with value None. The potential values to select from are given in the logs; for more detail on the parameter use catalog.get_data_product_schema().

Parameters:

Name Type Description Default
data_product_id str

Id of the desired UP42 data product, see catalog.get_data_products

required
image_id str

The id of the desired image (from search results)

required
aoi Union[dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon]

The geometry of the order, one of dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon. Optional for "full-image products".

None

Returns:

Type Description

The order parameters dictionary.

Example
order_parameters = catalog.construct_order_parameters(
    data_product_id='647780db-5a06-4b61-b525-577a8b68bb54',
    image_id='6434e7af-2d41-4ded-a789-fb1b2447ac92',
    aoi={'type': 'Polygon',
    'coordinates': (((13.375966, 52.515068),
      (13.375966, 52.516639),
      (13.378314, 52.516639),
      (13.378314, 52.515068),
      (13.375966, 52.515068)),)})
Source code in up42/catalog.py
def construct_order_parameters(
    self,
    data_product_id: str,
    image_id: str,
    aoi: Union[
        dict,
        Feature,
        FeatureCollection,
        list,
        GeoDataFrame,
        Polygon,
    ] = None,
):
    """
    Helps construct the parameters dictionary required for the catalog order. Some collections have
    additional parameters that are added to the output dictionary with value None. The potential values to
    select from are given in the logs; for more detail on the parameter use `catalog.get_data_product_schema()`.

    Args:
        data_product_id: Id of the desired UP42 data product, see `catalog.get_data_products`
        image_id: The id of the desired image (from search results)
        aoi: The geometry of the order, one of dict, Feature, FeatureCollection,
            list, GeoDataFrame, Polygon. Optional for "full-image products".
    Returns:
        The order parameters dictionary.

    Example:
        ```python
        order_parameters = catalog.construct_order_parameters(
            data_product_id='647780db-5a06-4b61-b525-577a8b68bb54',
            image_id='6434e7af-2d41-4ded-a789-fb1b2447ac92',
            aoi={'type': 'Polygon',
            'coordinates': (((13.375966, 52.515068),
              (13.375966, 52.516639),
              (13.378314, 52.516639),
              (13.378314, 52.515068),
              (13.375966, 52.515068)),)})
        ```
    """
    order_parameters = {
        "dataProduct": data_product_id,
        "params": {"id": image_id},
    }
    logger.info(
        "See `catalog.get_data_product_schema(data_product_id)` for more detail on the parameter options."
    )
    schema = self.get_data_product_schema(data_product_id)
    order_parameters = autocomplete_order_parameters(order_parameters, schema)

    # Some catalog orders, e.g. Capella don't require AOI (full image order)
    # Handled on API level, don't manipulate in SDK, providers might accept geometries in the future.
    if aoi is not None:
        aoi = any_vector_to_fc(vector=aoi)
        aoi = fc_to_query_geometry(fc=aoi, geometry_operation="intersects")
        order_parameters["params"]["aoi"] = aoi  # type: ignore

    return order_parameters
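
The constructed dictionary can then be used for cost estimation and ordering. A sketch, assuming a `place_order` method on the catalog object as in the SDK's ordering workflow (all ids are placeholders):

```python
# A GeoJSON Polygon dict, as in the docstring example above.
aoi_geometry = {
    "type": "Polygon",
    "coordinates": [[[13.375966, 52.515068], [13.375966, 52.516639],
                     [13.378314, 52.516639], [13.378314, 52.515068],
                     [13.375966, 52.515068]]],
}
order_parameters = catalog.construct_order_parameters(
    data_product_id="647780db-5a06-4b61-b525-577a8b68bb54",
    image_id="6434e7af-2d41-4ded-a789-fb1b2447ac92",
    aoi=aoi_geometry,
)
estimated_credits = catalog.estimate_order(order_parameters)
order = catalog.place_order(order_parameters)
```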

construct_parameters(**kwargs)

Deprecated, see construct_search_parameters

Source code in up42/catalog.py
@deprecation("construct_search_parameters", "0.25.0")
def construct_parameters(self, **kwargs):  # pragma: no cover
    """Deprecated, see construct_search_parameters"""
    return self.construct_search_parameters(**kwargs)

construct_search_parameters(geometry, collections, start_date='2020-01-01', end_date='2020-01-30', usage_type=None, limit=10, max_cloudcover=None, sortby='acquisitionDate', ascending=True) staticmethod

Helps construct the parameters dictionary required for the search.

Parameters:

Name Type Description Default
geometry Union[FeatureCollection, Feature, dict, list, GeoDataFrame, Polygon]

The search geometry, default a Polygon. One of FeatureCollection, Feature, dict (geojson geometry), list (bounds coordinates), GeoDataFrame, shapely.Polygon, shapely.Point. All assume EPSG 4326!

required
collections List[str]

The satellite sensor collections to search for, e.g. ["phr"] or ["phr", "spot"]. Also see catalog.get_collections().

required
start_date str

Query period starting day, format "2020-01-01".

'2020-01-01'
end_date str

Query period ending day, format "2020-01-01".

'2020-01-30'
usage_type List[str]

Optional. Filter for imagery that can be purchased & downloaded or also processed. ["DATA"] (can only be downloaded), ["ANALYTICS"] (can be downloaded or used directly with a processing algorithm), ["DATA", "ANALYTICS"] (can be any combination). The filter is inclusive, using ["DATA"] can also result in results with ["DATA", "ANALYTICS"].

None
limit int

The maximum number of search results to return (1 to max. 500).

10
max_cloudcover Optional[int]

Optional. Maximum cloud coverage percent - e.g. 100 will return all scenes, 8.4 will return all scenes with 8.4 or less cloud coverage.

None
sortby str

The property to sort by, "cloudCoverage", "acquisitionDate", "acquisitionIdentifier", "incidenceAngle", "snowCover".

'acquisitionDate'
ascending bool

Ascending sort order by default, descending if False.

True

Returns:

Type Description
dict

The constructed parameters dictionary.

Source code in up42/catalog.py
@staticmethod
def construct_search_parameters(
    geometry: Union[FeatureCollection, Feature, dict, list, GeoDataFrame, Polygon],
    collections: List[str],
    start_date: str = "2020-01-01",
    end_date: str = "2020-01-30",
    usage_type: List[str] = None,
    limit: int = 10,
    max_cloudcover: Optional[int] = None,
    sortby: str = "acquisitionDate",
    ascending: bool = True,
) -> dict:
    """
    Helps construct the parameters dictionary required for the search.

    Args:
        geometry: The search geometry, default a Polygon. One of FeatureCollection, Feature,
            dict (geojson geometry), list (bounds coordinates), GeoDataFrame, shapely.Polygon, shapely.Point.
            All assume EPSG 4326!
        collections: The satellite sensor collections to search for, e.g. ["phr"] or ["phr", "spot"].
            Also see catalog.get_collections().
        start_date: Query period starting day, format "2020-01-01".
        end_date: Query period ending day, format "2020-01-01".
        usage_type: Optional. Filter for imagery that can be purchased & downloaded or also
            processed. ["DATA"] (can only be downloaded), ["ANALYTICS"] (can be downloaded
            or used directly with a processing algorithm), ["DATA", "ANALYTICS"]
            (can be any combination). The filter is inclusive, using ["DATA"] can
            also result in results with ["DATA", "ANALYTICS"].
        limit: The maximum number of search results to return (1 to max. 500).
        max_cloudcover: Optional. Maximum cloud coverage percent - e.g. 100 will return all scenes,
            8.4 will return all scenes with 8.4 or less cloud coverage.
        sortby: The property to sort by, "cloudCoverage", "acquisitionDate",
            "acquisitionIdentifier", "incidenceAngle", "snowCover".
        ascending: Ascending sort order by default, descending if False.

    Returns:
        The constructed parameters dictionary.
    """
    time_period = (
        f"{format_time(start_date)}/{format_time(end_date, set_end_of_day=True)}"
    )
    aoi_fc = any_vector_to_fc(
        vector=geometry,
    )
    aoi_geometry = fc_to_query_geometry(fc=aoi_fc, geometry_operation="intersects")
    sort_order = "asc" if ascending else "desc"

    query_filters: Dict[Any, Any] = {}
    if max_cloudcover is not None:
        query_filters["cloudCoverage"] = {"lte": max_cloudcover}  # type: ignore

    if usage_type is not None:
        if usage_type == ["DATA"]:
            query_filters["up42:usageType"] = {"in": ["DATA"]}
        elif usage_type == ["ANALYTICS"]:
            query_filters["up42:usageType"] = {"in": ["ANALYTICS"]}
        elif usage_type == ["DATA", "ANALYTICS"]:
            query_filters["up42:usageType"] = {"in": ["DATA", "ANALYTICS"]}
        else:
            raise ValueError("Select correct `usage_type`")

    search_parameters = {
        "datetime": time_period,
        "intersects": aoi_geometry,
        "limit": limit,
        "collections": collections,
        "query": query_filters,
        "sortby": [{"field": f"properties.{sortby}", "direction": sort_order}],
    }

    return search_parameters
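
A minimal sketch of constructing search parameters and running the search (assumes an initialized catalog object as above; the example AOI helper is part of the SDK's convenience functionality):

```python
# Construct search parameters for a small example AOI and run the search.
aoi = up42.get_example_aoi(location="Berlin")
search_parameters = catalog.construct_search_parameters(
    geometry=aoi,
    collections=["phr"],
    start_date="2020-01-01",
    end_date="2020-01-30",
    max_cloudcover=20,
    limit=5,
)
search_results = catalog.search(search_parameters)
```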

download_quicklooks(image_ids, collection, output_directory=None)

Gets the quicklooks of scenes from a single sensor. After download, they can be plotted via catalog.map_quicklooks() or catalog.plot_quicklooks().

Parameters:

Name Type Description Default
image_ids List[str]

List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"]. Access the search results id column via list(search_results.id).

required
collection str

The data collection corresponding to the image ids.

required
output_directory Union[str, Path, None]

The file output directory, defaults to the current working directory.

None

Returns:

Type Description
List[str]

List of quicklook image output file paths.

Source code in up42/catalog.py
def download_quicklooks(
    self,
    image_ids: List[str],
    collection: str,
    output_directory: Union[str, Path, None] = None,
) -> List[str]:
    """
    Gets the quicklooks of scenes from a single sensor. After download, they can
    be plotted via catalog.map_quicklooks() or catalog.plot_quicklooks().

    Args:
        image_ids: List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"].
            Access the search results id column via `list(search_results.id)`.
        collection: The data collection corresponding to the image ids.
        output_directory: The file output directory, defaults to the current working
            directory.

    Returns:
        List of quicklook image output file paths.
    """
    if self.data_products is None:
        self.data_products = self.get_data_products(basic=True)  # type: ignore
    host = [
        v["host"]
        for v in self.data_products.values()  # type: ignore
        if v["collection"] == collection
    ]
    if not host:
        raise ValueError(
            f"Selected collections {collection} is not valid. See catalog.get_collections."
        )
    host = host[0]
    logger.info(f"Downloading quicklooks from provider {host}.")

    if output_directory is None:
        output_directory = Path.cwd() / f"project_{self.auth.project_id}/catalog"
    else:
        output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(output_directory)}")

    if isinstance(image_ids, str):
        image_ids = [image_ids]

    out_paths: List[str] = []
    for image_id in tqdm(image_ids):
        try:
            url = (
                f"{self.auth._endpoint()}/catalog/{host}/image/{image_id}/quicklook"
            )
            response = self.auth._request(
                request_type="GET", url=url, return_text=False
            )
            out_path = output_directory / f"quicklook_{image_id}.jpg"
            out_paths.append(str(out_path))
            with open(out_path, "wb") as dst:
                for chunk in response:
                    dst.write(chunk)
        except ValueError:
            logger.warning(
                f"Image with id {image_id} does not have quicklook available. Skipping ..."
            )

    self.quicklooks = out_paths  # pylint: disable=attribute-defined-outside-init
    return out_paths
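
A minimal sketch (assumes the `search_results` GeoDataFrame from the search sketch above; the output directory is a placeholder):

```python
# Download quicklooks for the search results of the "phr" collection.
quicklook_paths = catalog.download_quicklooks(
    image_ids=list(search_results.id),  # provider image ids from the search results
    collection="phr",
    output_directory="./quicklooks",
)
catalog.plot_quicklooks()  # or catalog.map_quicklooks()
```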

estimate_order(order_parameters, **kwargs)

Estimate the cost of an order.

Parameters:

Name Type Description Default
order_parameters Union[dict, None]

A dictionary like {dataProduct: ..., "params": {"id": ..., "aoi": ...}}

required

Returns:

Name Type Description
int int

An estimated cost for the order in UP42 credits.

Warning "Deprecated order parameters" The use of the 'scene' and 'geometry' parameters for the data estimation is deprecated. Please use the new order_parameters parameter as described above.

Source code in up42/catalog.py
def estimate_order(self, order_parameters: Union[dict, None], **kwargs) -> int:
    """
    Estimate the cost of an order.

    Args:
        order_parameters: A dictionary like {dataProduct: ..., "params": {"id": ..., "aoi": ...}}

    Returns:
        int: An estimated cost for the order in UP42 credits.

    Warning "Deprecated order parameters"
        The use of the 'scene' and 'geometry' parameters for the data estimation is deprecated. Please use the new
        order_parameters parameter as described above.
    """
    if "scene" in kwargs or "geometry" in kwargs:
        # Deprecated, to be removed, use order_parameters.
        message = (
            "The use of the 'scene' and 'geometry' parameters for the data estimation is deprecated. "
            "Please use the new 'order_parameters' parameter."
        )
        warnings.warn(message, DeprecationWarning, stacklevel=2)
    elif order_parameters is None:
        raise ValueError("Please provide the 'order_parameters' parameter!")
    return Order.estimate(self.auth, order_parameters)  # type: ignore
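
A minimal sketch using the documented parameters shape (all ids are placeholders):

```python
# Estimate the order cost in UP42 credits before placing it.
order_parameters = {
    "dataProduct": "647780db-5a06-4b61-b525-577a8b68bb54",  # placeholder id
    "params": {
        "id": "6434e7af-2d41-4ded-a789-fb1b2447ac92",  # placeholder image id
        "aoi": {
            "type": "Polygon",
            "coordinates": [[[13.375966, 52.515068], [13.375966, 52.516639],
                             [13.378314, 52.516639], [13.378314, 52.515068],
                             [13.375966, 52.515068]]],
        },
    },
}
estimated_credits = catalog.estimate_order(order_parameters)
```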

search(search_parameters, as_dataframe=True)

Searches the catalog with the given search parameters and returns the metadata of the matching scenes.

Parameters:

Name Type Description Default
search_parameters dict

The catalog search parameters, see example.

required
as_dataframe bool

return type, GeoDataFrame if True (default), FeatureCollection if False.

True

Returns:

Type Description
Union[GeoDataFrame, dict]

The search results as a GeoDataFrame, optionally as JSON dict.

Example
    search_parameters={
        "datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
        "collections": ["phr"],
        "intersects": {
            "type": "Polygon",
            "coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
            [13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
            52.73971768]]]},
        "limit": 10,
        "sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
        }
Source code in up42/catalog.py
def search(
    self, search_parameters: dict, as_dataframe: bool = True
) -> Union[GeoDataFrame, dict]:
    """
    Searches the catalog with the given search parameters and returns the metadata of
    the matching scenes.

    Args:
        search_parameters: The catalog search parameters, see example.
        as_dataframe: return type, GeoDataFrame if True (default), FeatureCollection if False.

    Returns:
        The search results as a GeoDataFrame, optionally as JSON dict.

    Example:
        ```python
            search_parameters={
                "datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
                "collections": ["phr"],
                "intersects": {
                    "type": "Polygon",
                    "coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
                    [13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
                    52.73971768]]]},
                "limit": 10,
                "sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
                }
        ```
    """
    logger.info(f"Searching catalog with search_parameters: {search_parameters}")

    # The API request would fail with a limit above 500, thus 500 is forced in the initial
    # request but additional results are handled below via pagination.
    try:
        max_limit = search_parameters["limit"]
    except KeyError:
        logger.info("No `limit` parameter in search_parameters, using default 500.")
        max_limit = 500

    if max_limit > 500:
        search_parameters["limit"] = 500

    # UP42 API can query multiple collections of the same host at once.
    if self.data_products is None:
        self.data_products = self.get_data_products(basic=True)  # type: ignore
    hosts = [
        v["host"]
        for v in self.data_products.values()  # type: ignore
        if v["collection"] in search_parameters["collections"]
    ]
    if not hosts:
        raise ValueError(
            f"Selected collections {search_parameters['collections']} are not valid. See "
            f"catalog.get_collections."
        )
    if len(set(hosts)) > 1:
        raise ValueError(
            "Only collections with the same host can be searched at the same time. Please adjust the "
            "collections in the search_parameters!"
        )
    host = hosts[0]

    url = f"{self.auth._endpoint()}/catalog/hosts/{host}/stac/search"
    response_json: dict = self.auth._request("POST", url, search_parameters)
    features = response_json["features"]

    # Search results with more than 500 items are given as 50-per-page additional pages.
    while len(features) < max_limit:
        pagination_exhausted = len(response_json["links"]) == 1
        if pagination_exhausted:
            break
        next_page_url = response_json["links"][1]["href"]
        response_json = self.auth._request("POST", next_page_url, search_parameters)
        features += response_json["features"]

    features = features[:max_limit]
    if not features:
        df = GeoDataFrame(columns=["geometry"], geometry="geometry")
    else:
        df = GeoDataFrame.from_features(
            FeatureCollection(features=features), crs="EPSG:4326"
        )

    logger.info(f"{df.shape[0]} results returned.")
    if as_dataframe:
        return df
    else:
        return df.__geo_interface__
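
A short sketch of the alternative return type (reusing `search_parameters` from the construct_search_parameters sketch above):

```python
# Retrieve the results as a GeoJSON-style FeatureCollection dict instead of a GeoDataFrame.
results_fc = catalog.search(search_parameters, as_dataframe=False)
print(len(results_fc["features"]))
```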

Job

The Job class is the result of running a workflow. It lets you download, visualize and manipulate the results of the job, and keep track of the job status or cancel the job while it is still running.

Run a new job:

job = workflow.run_job(name="new_job", input_parameters={...})

Use an existing job:

job = up42.initialize_job(job_id="de5806aa-5ef1-4dc9-ab1d-06d7ec1a5021")

Source code in up42/job.py
class Job(VizTools):
    """
    The Job class is the result of running a workflow. It lets you download, visualize and
        manipulate the results of the job, and keep track of the job status or cancel the
        job while it is still running.

    Run a new job:
    ```python
    job = workflow.run_job(name="new_job", input_parameters={...})
    ```

    Use an existing job:
    ```python
    job = up42.initialize_job(job_id="de5806aa-5ef1-4dc9-ab1d-06d7ec1a5021")
    ```
    """

    def __init__(
        self, auth: Auth, project_id: str, job_id: str, job_info: Optional[dict] = None
    ):
        self.auth = auth
        self.project_id = project_id
        self.job_id = job_id
        self.quicklooks = None
        self.results = None
        if job_info is not None:
            self._info = job_info
        else:
            self._info = self.info

    def __repr__(self):
        return (
            f"Job(name: {self._info['name']}, job_id: {self.job_id}, mode: {self._info['mode']}, "
            f"status: {self._info['status']}, startedAt: {self._info['startedAt']}, "
            f"finishedAt: {self._info['finishedAt']}, workflow_name: {self._info['workflowName']}, "
            f"input_parameters: {self._info['inputs']}"
        )

    @property
    def info(self) -> dict:
        """
        Gets and updates the job metadata information.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
        response_json = self.auth._request(request_type="GET", url=url)
        self._info = response_json["data"]
        return self._info

    @property
    def status(self) -> str:
        """
        Gets the job progress status. One of `SUCCEEDED`, `NOT STARTED`, `PENDING`,
            `RUNNING`, `CANCELLED`, `CANCELLING`, `FAILED`, `ERROR`.
        """
        status = self.info["status"]
        logger.info(f"Job is {status}")
        return status

    @property
    def is_succeeded(self) -> bool:
        """
        Gets `True` if the job succeeded, `False` otherwise.
        Also see [status attribute](job-reference.md#up42.job.Job.status).
        """
        return self.status == "SUCCEEDED"

    def track_status(self, report_time: int = 30) -> str:
        """`
        Continuously gets the job status until job has finished or failed.

        Internally checks the status every five seconds and prints the log at the
        time interval given in the report_time argument.

        Args:
            report_time: The interval (in seconds) at which the job status is reported.
        """
        logger.info(
            f"Tracking job status continuously, reporting every {report_time} seconds...",
        )
        status = "NOT STARTED"
        time_asleep = 0

        while status != "SUCCEEDED":
            logger.setLevel(logging.CRITICAL)
            status = self.status
            logger.setLevel(logging.INFO)

            if status in ["NOT STARTED", "PENDING", "RUNNING"]:
                if time_asleep != 0 and time_asleep % report_time == 0:
                    logger.info(f"Job is {status}! - {self.job_id}")
            elif status in ["FAILED", "ERROR"]:
                logger.info(f"Job is {status}! - {self.job_id} - Printing logs ...")
                self.get_logs(as_print=True)
                raise ValueError("Job has failed! See the above log.")
            elif status in ["CANCELLED", "CANCELLING"]:
                logger.info(f"Job is {status}! - {self.job_id}")
                raise ValueError("Job has been cancelled!")
            elif status == "SUCCEEDED":
                logger.info(f"Job finished successfully! - {self.job_id}")

            sleep(5)
            time_asleep += 5

        return status

    def cancel_job(self) -> None:
        """Cancels a pending or running job."""
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}/cancel/"
        self.auth._request(request_type="POST", url=url)
        logger.info(f"Job canceled: {self.job_id}")

    def download_quicklooks(
        self, output_directory: Union[str, Path, None] = None
    ) -> List[str]:
        """
        Convenience function that downloads the quicklooks of the data (first) jobtask.

        After download, they can be plotted via job.plot_quicklooks().
        """
        # Currently only the first/data task produces quicklooks.
        logger.setLevel(logging.CRITICAL)
        data_task = self.get_jobtasks()[0]
        logger.setLevel(logging.INFO)

        out_paths: List[str] = data_task.download_quicklooks(  # type: ignore
            output_directory=output_directory
        )  # type: ignore
        self.quicklooks = out_paths  # pylint: disable=attribute-defined-outside-init
        return out_paths

    def get_results_json(self, as_dataframe: bool = False) -> Union[dict, GeoDataFrame]:
        """
        Gets the Job results data.json.

        Args:
            as_dataframe: Return type, FeatureCollection by default, GeoDataFrame if True.

        Returns:
            The job data.json.
        """
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/outputs/data-json/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        logger.info(f"Retrieved {len(response_json['features'])} features.")

        if as_dataframe:
            # UP42 results are always in EPSG 4326
            df = GeoDataFrame.from_features(response_json, crs=4326)
            return df
        else:
            return response_json

    def _get_download_url(self) -> str:
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/downloads/results/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        download_url = response_json["data"]["url"]
        return download_url

    def download_results(
        self, output_directory: Union[str, Path, None] = None, unpacking: bool = True
    ) -> List[str]:
        """
        Downloads the job results. Unpacking the final file happens by default.

        Args:
            output_directory: The file output directory, defaults to the current working
                directory.
            unpacking: By default the final result which is in TAR archive format will be unpacked.

        Returns:
            List of the downloaded results' filepaths.
        """
        logger.info(f"Downloading results of job {self.job_id}")

        if output_directory is None:
            output_directory = (
                Path.cwd() / f"project_{self.auth.project_id}/job_{self.job_id}"
            )
        else:
            output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Download directory: {str(output_directory)}")

        download_url = self._get_download_url()
        if unpacking:
            out_filepaths = download_from_gcs_unpack(
                download_url=download_url,
                output_directory=output_directory,
            )
        else:
            out_filepaths = download_gcs_not_unpack(
                download_url=download_url,
                output_directory=output_directory,
            )

        self.results = out_filepaths
        return out_filepaths

    def upload_results_to_bucket(
        self,
        gs_client,
        bucket,
        folder: str,
        extension: str = ".tgz",
        version: str = "v0",
    ) -> None:
        """
        Uploads the results of a job directly to a custom google cloud storage bucket.
        """
        download_url = self._get_download_url()
        r = requests.get(download_url)
        blob = bucket.blob(
            str(Path(version) / Path(folder) / Path(self.job_id + extension))
        )
        logger.info(f"Upload job {self.job_id} results to {blob.name} ...")
        blob.upload_from_string(
            data=r.content,
            content_type="application/octet-stream",
            client=gs_client,
        )
        logger.info("Uploaded!")

    def get_logs(
        self, as_print: bool = True, as_return: bool = False
    ) -> Optional[dict]:
        """
        Convenience function to print or return the logs of all job tasks.

        Args:
            as_print: Prints the logs, no return.
            as_return: Also returns the log strings.

        Returns:
            The log strings (only if as_return was selected).
        """
        job_logs = {}

        jobtasks: List[dict] = self.get_jobtasks(return_json=True)  # type: ignore
        jobtasks_ids = [task["id"] for task in jobtasks]

        logger.info(f"Getting logs for {len(jobtasks_ids)} job tasks: {jobtasks_ids}")
        if as_print:
            print(
                f"Printing logs of {len(jobtasks_ids)} JobTasks in Job with job_id "
                f"{self.job_id}:\n"
            )

        for idx, jobtask_id in enumerate(jobtasks_ids):
            url = (
                f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/"
                f"{self.job_id}/tasks/{jobtask_id}/logs"
            )
            response_json = self.auth._request(request_type="GET", url=url)

            job_logs[jobtask_id] = response_json

            if as_print:
                print("----------------------------------------------------------")
                print(f"JobTask {idx+1} with jobtask_id {jobtask_id}:\n")
                print(response_json)
        if as_return:
            return job_logs
        else:
            return None

    def get_jobtasks(
        self, return_json: bool = False
    ) -> Union[List["JobTask"], List[dict]]:
        """
        Get the individual items of the job as a list of JobTask objects or JSON.

        Args:
            return_json: If True returns the JSON information of the job tasks.

        Returns:
            The job task objects in a list.
        """
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/"
        )
        logger.info(f"Getting job tasks: {self.job_id}")
        response_json = self.auth._request(request_type="GET", url=url)
        jobtasks_json: List[dict] = response_json["data"]

        if return_json:
            return jobtasks_json
        else:
            jobtasks = [
                JobTask(
                    auth=self.auth,
                    project_id=self.project_id,
                    job_id=self.job_id,
                    jobtask_id=task["id"],
                )
                for task in jobtasks_json
            ]
            return jobtasks

    def get_jobtasks_results_json(self) -> dict:
        """
        Convenience function to get the resulting data.json of all job tasks
        in a dictionary of strings.

        Returns:
            The data.json of all single job tasks.
        """
        jobtasks: List[dict] = self.get_jobtasks(return_json=True)  # type: ignore
        jobtasks_ids = [task["id"] for task in jobtasks]
        jobtasks_results_json = {}
        for jobtask_id in jobtasks_ids:
            url = (
                f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
                f"/tasks/{jobtask_id}/outputs/data-json"
            )
            response_json = self.auth._request(request_type="GET", url=url)

            jobtasks_results_json[jobtask_id] = response_json
        return jobtasks_results_json

    def get_credits(self) -> dict:
        """
        Gets the credit costs of the job.

        Returns:
            The consumed credits for the job.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}/credits"
        response_json = self.auth._request(request_type="GET", url=url)
        credits_used = response_json["data"]["creditsUsed"]
        credits_used_dict = {"creditsUsed": credits_used}

        return credits_used_dict
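
A sketch of a typical job lifecycle using the methods documented below (the job id is a placeholder):

```python
import up42

# Initialize an existing job by its id, then follow it to completion.
job = up42.initialize_job(job_id="de5806aa-5ef1-4dc9-ab1d-06d7ec1a5021")
job.track_status(report_time=30)  # polls until SUCCEEDED, raises on failure/cancellation
if job.is_succeeded:
    filepaths = job.download_results(output_directory="./job_results")
    print(job.get_credits())
```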

cancel_job()

Cancels a pending or running job.

Source code in up42/job.py
def cancel_job(self) -> None:
    """Cancels a pending or running job."""
    url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}/cancel/"
    self.auth._request(request_type="POST", url=url)
    logger.info(f"Job canceled: {self.job_id}")

download_quicklooks(output_directory=None)

Convenience function that downloads the quicklooks of the data (first) jobtask.

After download, they can be plotted via job.plot_quicklooks().

Source code in up42/job.py
def download_quicklooks(
    self, output_directory: Union[str, Path, None] = None
) -> List[str]:
    """
    Convenience function that downloads the quicklooks of the data (first) jobtask.

    After download, they can be plotted via job.plot_quicklooks().
    """
    # Currently only the first/data task produces quicklooks.
    logger.setLevel(logging.CRITICAL)
    data_task = self.get_jobtasks()[0]
    logger.setLevel(logging.INFO)

    out_paths: List[str] = data_task.download_quicklooks(  # type: ignore
        output_directory=output_directory
    )  # type: ignore
    self.quicklooks = out_paths  # pylint: disable=attribute-defined-outside-init
    return out_paths

download_results(output_directory=None, unpacking=True)

Downloads the job results. Unpacking the final file happens by default.

Parameters:

Name Type Description Default
output_directory Union[str, Path, None]

The file output directory, defaults to the current working directory.

None
unpacking bool

By default the final result which is in TAR archive format will be unpacked.

True

Returns:

Type Description
List[str]

List of the downloaded results' filepaths.

Source code in up42/job.py
def download_results(
    self, output_directory: Union[str, Path, None] = None, unpacking: bool = True
) -> List[str]:
    """
    Downloads the job results. Unpacking the final file happens by default.

    Args:
        output_directory: The file output directory, defaults to the current working
            directory.
        unpacking: By default the final result which is in TAR archive format will be unpacked.

    Returns:
        List of the downloaded results' filepaths.
    """
    logger.info(f"Downloading results of job {self.job_id}")

    if output_directory is None:
        output_directory = (
            Path.cwd() / f"project_{self.auth.project_id}/job_{self.job_id}"
        )
    else:
        output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(output_directory)}")

    download_url = self._get_download_url()
    if unpacking:
        out_filepaths = download_from_gcs_unpack(
            download_url=download_url,
            output_directory=output_directory,
        )
    else:
        out_filepaths = download_gcs_not_unpack(
            download_url=download_url,
            output_directory=output_directory,
        )

    self.results = out_filepaths
    return out_filepaths
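
A short sketch of the non-unpacking variant (the output path is a placeholder):

```python
# Keep the original TAR archive instead of unpacking it.
archive_paths = job.download_results(output_directory="./job_results", unpacking=False)
```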

get_credits()

Gets the credit costs of the job.

Returns:

Type Description
dict

The consumed credits for the job.

Source code in up42/job.py
def get_credits(self) -> dict:
    """
    Gets the credit costs of the job.

    Returns:
        The consumed credits for the job.
    """
    url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}/credits"
    response_json = self.auth._request(request_type="GET", url=url)
    credits_used = response_json["data"]["creditsUsed"]
    credits_used_dict = {"creditsUsed": credits_used}

    return credits_used_dict

get_jobtasks(return_json=False)

Get the individual items of the job as a list of JobTask objects or JSON.

Parameters:

Name Type Description Default
return_json bool

If True returns the JSON information of the job tasks.

False

Returns:

Type Description
Union[List[JobTask], List[dict]]

The job task objects in a list.

Source code in up42/job.py
def get_jobtasks(
    self, return_json: bool = False
) -> Union[List["JobTask"], List[dict]]:
    """
    Get the individual items of the job as a list of JobTask objects or JSON.

    Args:
        return_json: If True returns the JSON information of the job tasks.

    Returns:
        The job task objects in a list.
    """
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
        f"/tasks/"
    )
    logger.info(f"Getting job tasks: {self.job_id}")
    response_json = self.auth._request(request_type="GET", url=url)
    jobtasks_json: List[dict] = response_json["data"]

    if return_json:
        return jobtasks_json
    else:
        jobtasks = [
            JobTask(
                auth=self.auth,
                project_id=self.project_id,
                job_id=self.job_id,
                jobtask_id=task["id"],
            )
            for task in jobtasks_json
        ]
        return jobtasks
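
A minimal sketch of both return types (assumes an initialized job object as above):

```python
# List the job tasks as JobTask objects and as raw JSON metadata.
jobtasks = job.get_jobtasks()
jobtasks_json = job.get_jobtasks(return_json=True)
task_ids = [task["id"] for task in jobtasks_json]
```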

get_jobtasks_results_json()

Convenience function to get the resulting data.json of all job tasks in a dictionary of strings.

Returns:

Type Description
dict

The data.json of all single job tasks.

Source code in up42/job.py
def get_jobtasks_results_json(self) -> dict:
    """
    Convenience function to get the resulting data.json of all job tasks
    in a dictionary of strings.

    Returns:
        The data.json of all single job tasks.
    """
    jobtasks: List[dict] = self.get_jobtasks(return_json=True)  # type: ignore
    jobtasks_ids = [task["id"] for task in jobtasks]
    jobtasks_results_json = {}
    for jobtask_id in jobtasks_ids:
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{jobtask_id}/outputs/data-json"
        )
        response_json = self.auth._request(request_type="GET", url=url)

        jobtasks_results_json[jobtask_id] = response_json
    return jobtasks_results_json
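
Usage sketch, assuming an initialized `job` object:

```python
results_by_task = job.get_jobtasks_results_json()
for jobtask_id, data_json in results_by_task.items():
    # Each value is the job task's data.json feature collection.
    print(jobtask_id, len(data_json["features"]))
```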

get_logs(as_print=True, as_return=False)

Convenience function to print or return the logs of all job tasks.

Parameters:

Name Type Description Default
as_print bool

Prints the logs, no return.

True
as_return bool

Also returns the log strings.

False

Returns:

Type Description
Optional[dict]

The log strings (only if as_return was selected).

Source code in up42/job.py, lines 249-290
def get_logs(
    self, as_print: bool = True, as_return: bool = False
) -> Optional[dict]:
    """
    Convenience function to print or return the logs of all job tasks.

    Args:
        as_print: Prints the logs, no return.
        as_return: Also returns the log strings.

    Returns:
        The log strings (only if as_return was selected).
    """
    job_logs = {}

    jobtasks: List[dict] = self.get_jobtasks(return_json=True)  # type: ignore
    jobtasks_ids = [task["id"] for task in jobtasks]

    logger.info(f"Getting logs for {len(jobtasks_ids)} job tasks: {jobtasks_ids}")
    if as_print:
        print(
            f"Printing logs of {len(jobtasks_ids)} JobTasks in Job with job_id "
            f"{self.job_id}:\n"
        )

    for idx, jobtask_id in enumerate(jobtasks_ids):
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/"
            f"{self.job_id}/tasks/{jobtask_id}/logs"
        )
        response_json = self.auth._request(request_type="GET", url=url)

        job_logs[jobtask_id] = response_json

        if as_print:
            print("----------------------------------------------------------")
            print(f"JobTask {idx+1} with jobtask_id {jobtask_id}:\n")
            print(response_json)
    if as_return:
        return job_logs
    else:
        return None
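
Usage sketch, assuming an initialized `job` object:

```python
# Print the logs of all job tasks and also keep them in a dictionary:
logs = job.get_logs(as_print=True, as_return=True)
```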

get_results_json(as_dataframe=False)

Gets the Job results data.json.

Parameters:

Name Type Description Default
as_dataframe bool

If True, returns a GeoDataFrame instead of the default Feature Collection dict.

False

Returns:

Type Description
Union[dict, GeoDataFrame]

The job data.json.

Source code in up42/job.py, lines 152-174
def get_results_json(self, as_dataframe: bool = False) -> Union[dict, GeoDataFrame]:
    """
    Gets the Job results data.json.

    Args:
        as_dataframe: If True, returns a GeoDataFrame instead of the default Feature Collection dict.

    Returns:
        The job data.json.
    """
    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
        f"/outputs/data-json/"
    )
    response_json = self.auth._request(request_type="GET", url=url)
    logger.info(f"Retrieved {len(response_json['features'])} features.")

    if as_dataframe:
        # UP42 results are always in EPSG 4326
        df = GeoDataFrame.from_features(response_json, crs=4326)
        return df
    else:
        return response_json
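
Usage sketch, assuming an initialized `job` object:

```python
fc = job.get_results_json()  # Feature Collection dict (default)
df = job.get_results_json(as_dataframe=True)  # GeoDataFrame in EPSG 4326
```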

info: dict property

Gets and updates the job metadata information.

is_succeeded: bool property

Gets True if the job succeeded, False otherwise. Also see status attribute.

status: str property

Gets the job progress status. One of SUCCEEDED, NOT STARTED, PENDING, RUNNING, CANCELLED, CANCELLING, FAILED, ERROR.

track_status(report_time=30)

Continuously gets the job status until the job has finished or failed.

Internally checks the status every five seconds and prints a log message at the interval given by the report_time argument.

Parameters:

Name Type Description Default
report_time int

The interval (in seconds) at which to report the job status.

30
Source code in up42/job.py, lines 88-125
def track_status(self, report_time: int = 30) -> str:
    """
    Continuously gets the job status until the job has finished or failed.

    Internally checks the status every five seconds and prints a log message
    at the interval given by the report_time argument.

    Args:
        report_time: The interval (in seconds) at which to report the job status.

    Returns:
        The final job status.
    """
    logger.info(
        f"Tracking job status continuously, reporting every {report_time} seconds...",
    )
    status = "NOT STARTED"
    time_asleep = 0

    while status != "SUCCEEDED":
        logger.setLevel(logging.CRITICAL)
        status = self.status
        logger.setLevel(logging.INFO)

        if status in ["NOT STARTED", "PENDING", "RUNNING"]:
            if time_asleep != 0 and time_asleep % report_time == 0:
                logger.info(f"Job is {status}! - {self.job_id}")
        elif status in ["FAILED", "ERROR"]:
            logger.info(f"Job is {status}! - {self.job_id} - Printing logs ...")
            self.get_logs(as_print=True)
            raise ValueError("Job has failed! See the above log.")
        elif status in ["CANCELLED", "CANCELLING"]:
            logger.info(f"Job is {status}! - {self.job_id}")
            raise ValueError("Job has been cancelled!")
        elif status == "SUCCEEDED":
            logger.info(f"Job finished successfully! - {self.job_id}")

        sleep(5)
        time_asleep += 5

    return status
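
Usage sketch, assuming an initialized `job` object. The call blocks until the job succeeds and raises a ValueError if it fails or is cancelled:

```python
status = job.track_status(report_time=60)  # report every 60 seconds
```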

upload_results_to_bucket(gs_client, bucket, folder, extension='.tgz', version='v0')

Uploads the results of a job directly to a custom google cloud storage bucket.

Source code in up42/job.py, lines 225-247
def upload_results_to_bucket(
    self,
    gs_client,
    bucket,
    folder: str,
    extension: str = ".tgz",
    version: str = "v0",
) -> None:
    """
    Uploads the results of a job directly to a custom google cloud storage bucket.
    """
    download_url = self._get_download_url()
    r = requests.get(download_url)
    blob = bucket.blob(
        str(Path(version) / Path(folder) / Path(self.job_id + extension))
    )
    logger.info(f"Upload job {self.job_id} results to {blob.name} ...")
    blob.upload_from_string(
        data=r.content,
        content_type="application/octet-stream",
        client=gs_client,
    )
    logger.info("Uploaded!")
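
A sketch of the bucket upload, assuming the google-cloud-storage package is installed and configured; the bucket name and folder are placeholders:

```python
from google.cloud import storage

gs_client = storage.Client()  # uses your configured Google Cloud credentials
bucket = gs_client.get_bucket("your-bucket-name")  # placeholder bucket name
job.upload_results_to_bucket(gs_client=gs_client, bucket=bucket, folder="up42-results")
```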

JobCollection

The JobCollection class provides facilities for handling and downloading the results of multiple jobs as one object.

A jobcollection is created as the result of e.g. running multiple jobs in parallel:

jobcollection = workflow.run_jobs_parallel()

Initialize a jobcollection from existing jobs:

jobcollection = up42.initialize_jobcollection(job_ids=["12345", "6789"])

Source code in up42/jobcollection.py, lines 16-172
class JobCollection(VizTools):
    """
    The JobCollection class provides facilities for handling and downloading
    the results of multiple jobs as one object.

    A jobcollection is created as the result of e.g. running multiple jobs in parallel:
    ```python
    jobcollection = workflow.run_jobs_parallel()
    ```

    Initialize a jobcollection from existing jobs:
    ```python
    jobcollection = up42.initialize_jobcollection(job_ids=["12345", "6789"])
    ```
    """

    def __init__(self, auth: Auth, project_id: str, jobs: List[Job]):
        self.auth = auth
        self.project_id = project_id
        self.jobs = jobs
        if jobs is not None:
            self.jobs_id = [job.job_id for job in jobs]
        else:
            self.jobs_id = None

    def __repr__(self):
        return f"JobCollection(len: {len(self.jobs)}, jobs: {self.jobs})"

    def __getitem__(self, index: int) -> Job:
        return self.jobs[index]

    def __iter__(self):
        for job in self.jobs:
            yield job

    @property
    def info(self) -> Dict[str, dict]:
        """
        Gets and updates the metadata information for each job in the jobcollection,
        a dictionary of job_id : job_information.
        """
        return self.apply(lambda job: job.info, only_succeeded=False)

    @property
    def status(self) -> Dict[str, str]:
        """
        Gets the status for each job in the jobcollection, a dictionary with
        job_id : job status.
        """
        return self.apply(lambda job: job.status, only_succeeded=False)

    def apply(
        self, worker: Callable, only_succeeded: bool = True, **kwargs
    ) -> Dict[str, Any]:
        """
        Helper function to apply `worker` on all jobs in the collection.
        `worker` needs to accept `Job` as first argument. For example, a
        lambda function that returns the job info:
        ```python
        self.apply(lambda job: job.info)
        ```

        Args:
            worker: A function to apply on all jobs in the collection.
            only_succeeded: Only apply to succeeded jobs (default is `True`).
            kwargs: additional keyword arguments to pass to `worker`.
        Returns:
            Dictionary where the key is the job id and the value is the return
            of `worker`.
        """
        if not self.jobs:
            raise ValueError(
                "This is an empty JobCollection. Cannot apply over an empty job list."
            )

        out_dict = {}
        for job in self.jobs:
            if only_succeeded:
                if job.is_succeeded:
                    out_dict[job.job_id] = worker(job, **kwargs)
            else:
                out_dict[job.job_id] = worker(job, **kwargs)

        if not out_dict:
            raise ValueError(
                "All jobs have failed! Cannot apply over an empty succeeded job list."
            )

        return out_dict

    # TODO: Add method to get logs of failed jobs

    def download_results(
        self,
        output_directory: Union[str, Path, None] = None,
        merge: bool = True,
        unpacking: bool = True,
    ) -> Dict[str, List[str]]:
        """
        Downloads the job results. The final results are downloaded individually,
        and by default a merged data.json is generated with all the results in a
        single feature collection. Unpacking of the final results happens by default.

        Args:
            output_directory: The file output directory, defaults to the current working
                directory.
            merge: Whether to generate a merged data.json with all results.
            unpacking: Whether to unpack the final result, which is in TAR archive
                format (default True).

        Returns:
            Dict of the job_ids and the jobs' downloaded results filepaths. In addition,
            a key merged_result is added with the path to the merged
            data.json.
        """
        if output_directory is None:
            output_directory = Path.cwd() / f"project_{self.auth.project_id}"
        else:
            output_directory = Path(output_directory)

        def download_results_worker(job, output_directory, unpacking):
            out_dir = output_directory / f"job_{job.job_id}"
            out_filepaths_job = job.download_results(
                output_directory=out_dir, unpacking=unpacking
            )
            return out_filepaths_job

        out_filepaths = self.apply(
            download_results_worker,
            output_directory=output_directory,
            unpacking=unpacking,
        )

        if merge:
            merged_data_json = output_directory / "data.json"
            with open(merged_data_json, "w") as dst:
                out_features = []
                for job_id in out_filepaths:
                    all_files = out_filepaths[job_id]
                    data_json = [d for d in all_files if Path(d).name == "data.json"][0]
                    with open(data_json) as src:
                        data_json_fc = geojson.load(src)
                        for feat in data_json_fc.features:
                            feat.properties["job_id"] = job_id
                            try:
                                feat.properties[
                                    "up42.data_path"
                                ] = f"job_{job_id}/{feat.properties['up42.data_path']}"
                            except KeyError:
                                logger.warning(
                                    "data.json does not contain up42.data_path, skipping..."
                                )
                            out_features.append(feat)
                geojson.dump(FeatureCollection(out_features), dst)

            out_filepaths["merged_result"] = [str(merged_data_json)]

        self.results = out_filepaths
        return out_filepaths

apply(worker, only_succeeded=True, **kwargs)

Helper function to apply worker on all jobs in the collection. worker needs to accept Job as first argument. For example, a lambda function that returns the job info:

self.apply(lambda job: job.info)

Parameters:

Name Type Description Default
worker Callable

A function to apply on all jobs in the collection.

required
only_succeeded bool

Only apply to succeeded jobs (default is True).

True
kwargs

additional keyword arguments to pass to worker.

{}

Returns:

Type Description
Dict[str, Any]

Dictionary where the key is the job id and the value is the return of worker.

Source code in up42/jobcollection.py, lines 67-104
def apply(
    self, worker: Callable, only_succeeded: bool = True, **kwargs
) -> Dict[str, Any]:
    """
    Helper function to apply `worker` on all jobs in the collection.
    `worker` needs to accept `Job` as first argument. For example, a
    lambda function that returns the job info:
    ```python
    self.apply(lambda job: job.info)
    ```

    Args:
        worker: A function to apply on all jobs in the collection.
        only_succeeded: Only apply to succeeded jobs (default is `True`).
        kwargs: additional keyword arguments to pass to `worker`.
    Returns:
        Dictionary where the key is the job id and the value is the return
        of `worker`.
    """
    if not self.jobs:
        raise ValueError(
            "This is an empty JobCollection. Cannot apply over an empty job list."
        )

    out_dict = {}
    for job in self.jobs:
        if only_succeeded:
            if job.is_succeeded:
                out_dict[job.job_id] = worker(job, **kwargs)
        else:
            out_dict[job.job_id] = worker(job, **kwargs)

    if not out_dict:
        raise ValueError(
            "All jobs have failed! Cannot apply over an empty succeeded job list."
        )

    return out_dict
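
Usage sketch, assuming an initialized `jobcollection` object:

```python
# Collect the data.json of every succeeded job:
results = jobcollection.apply(lambda job: job.get_results_json())

# Include failed jobs as well, e.g. to inspect their status:
statuses = jobcollection.apply(lambda job: job.status, only_succeeded=False)
```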

download_results(output_directory=None, merge=True, unpacking=True)

Downloads the job results. The final results are downloaded individually, and by default a merged data.json is generated with all the results in a single feature collection. Unpacking of the final results happens by default.

Parameters:

Name Type Description Default
output_directory Union[str, Path, None]

The file output directory, defaults to the current working directory.

None
merge bool

Whether to generate a merged data.json with all results.

True
unpacking bool

Whether to unpack the final result, which is in TAR archive format (default True).

True

Returns:

Type Description
Dict[str, List[str]]

Dict of the job_ids and the jobs' downloaded results filepaths. In addition, a key merged_result is added with the path to the merged data.json.

Source code in up42/jobcollection.py, lines 108-172
def download_results(
    self,
    output_directory: Union[str, Path, None] = None,
    merge: bool = True,
    unpacking: bool = True,
) -> Dict[str, List[str]]:
    """
    Downloads the job results. The final results are downloaded individually,
    and by default a merged data.json is generated with all the results in a
    single feature collection. Unpacking of the final results happens by default.

    Args:
        output_directory: The file output directory, defaults to the current working
            directory.
        merge: Whether to generate a merged data.json with all results.
        unpacking: Whether to unpack the final result, which is in TAR archive
            format (default True).

    Returns:
        Dict of the job_ids and the jobs' downloaded results filepaths. In addition,
        a key merged_result is added with the path to the merged
        data.json.
    """
    if output_directory is None:
        output_directory = Path.cwd() / f"project_{self.auth.project_id}"
    else:
        output_directory = Path(output_directory)

    def download_results_worker(job, output_directory, unpacking):
        out_dir = output_directory / f"job_{job.job_id}"
        out_filepaths_job = job.download_results(
            output_directory=out_dir, unpacking=unpacking
        )
        return out_filepaths_job

    out_filepaths = self.apply(
        download_results_worker,
        output_directory=output_directory,
        unpacking=unpacking,
    )

    if merge:
        merged_data_json = output_directory / "data.json"
        with open(merged_data_json, "w") as dst:
            out_features = []
            for job_id in out_filepaths:
                all_files = out_filepaths[job_id]
                data_json = [d for d in all_files if Path(d).name == "data.json"][0]
                with open(data_json) as src:
                    data_json_fc = geojson.load(src)
                    for feat in data_json_fc.features:
                        feat.properties["job_id"] = job_id
                        try:
                            feat.properties[
                                "up42.data_path"
                            ] = f"job_{job_id}/{feat.properties['up42.data_path']}"
                        except KeyError:
                            logger.warning(
                                "data.json does not contain up42.data_path, skipping..."
                            )
                        out_features.append(feat)
            geojson.dump(FeatureCollection(out_features), dst)

        out_filepaths["merged_result"] = [str(merged_data_json)]

    self.results = out_filepaths
    return out_filepaths
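
Usage sketch, assuming an initialized `jobcollection`; the output path is a placeholder:

```python
out_filepaths = jobcollection.download_results(output_directory="./results")
print(out_filepaths["merged_result"])  # path to the merged data.json
```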

info: Dict[str, dict] property

Gets and updates the metadata information for each job in the jobcollection, a dictionary of job_id : job_information.

status: Dict[str, str] property

Gets the status for each job in the jobcollection, a dictionary with job_id : job status.

JobTask

The JobTask class provides access to the result of a specific block in the workflow. Each job contains one or multiple JobTasks, one for each block.

Use an existing jobtask:

jobtask = up42.initialize_jobtask(jobtask_id="3f772637-09aa-4164-bded-692fcd746d20",
                                  job_id="de5806aa-5ef1-4dc9-ab1d-06d7ec1a5021")

Source code in up42/jobtask.py, lines 14-184
class JobTask(VizTools):
    """
    The JobTask class provides access to the result of a specific block in the workflow.
    Each job contains one or multiple JobTasks, one for each block.

    Use an existing jobtask:
    ```python
    jobtask = up42.initialize_jobtask(jobtask_id="3f772637-09aa-4164-bded-692fcd746d20",
                                      job_id="de5806aa-5ef1-4dc9-ab1d-06d7ec1a5021")
    ```
    """

    def __init__(
        self,
        auth: Auth,
        project_id: str,
        job_id: str,
        jobtask_id: str,
    ):
        self.auth = auth
        self.project_id = project_id
        self.job_id = job_id
        self.jobtask_id = jobtask_id
        self.quicklooks = None
        self.results = None
        self._info = self.info

    def __repr__(self):
        return (
            f"JobTask(name: {self._info['name']}, jobtask_id: {self.jobtask_id}, "
            f"status: {self._info['status']}, startedAt: {self._info['startedAt']}, "
            f"finishedAt: {self._info['finishedAt']}, job_name: {self._info['name']}, "
            f"block_name: {self._info['block']['name']}, block_version: {self._info['blockVersion']}"
        )

    @property
    def info(self) -> dict:
        """
        Gets and updates the jobtask metadata information.
        """
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        info_all_jobtasks = response_json["data"]
        self._info = next(
            item for item in info_all_jobtasks if item["id"] == self.jobtask_id
        )
        return self._info

    def get_results_json(self, as_dataframe: bool = False) -> Union[dict, GeoDataFrame]:
        """
        Gets the Jobtask results data.json.

        Args:
            as_dataframe: If True, returns a GeoDataFrame instead of the Feature Collection dict.

        Returns:
            JSON of the results, alternatively a GeoDataFrame.
        """
        url = (
            f"{self.auth._endpoint()}/projects/{self.auth.project_id}/jobs/{self.job_id}"
            f"/tasks/{self.jobtask_id}/outputs/data-json/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        logger.info(f"Retrieved {len(response_json['features'])} features.")

        if as_dataframe:
            # UP42 results are always in EPSG 4326
            df = GeoDataFrame.from_features(response_json, crs=4326)
            return df
        else:
            return response_json

    def _get_download_url(self):
        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{self.jobtask_id}/downloads/results/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        download_url = response_json["data"]["url"]
        return download_url

    def download_results(
        self, output_directory: Union[str, Path, None] = None
    ) -> List[str]:
        """
        Downloads and unpacks the jobtask results to the output directory.

        Args:
            output_directory: The file output directory, defaults to the current working
                directory.
        Returns:
            List of the downloaded results' filepaths.
        """
        logger.info(f"Downloading results of jobtask {self.jobtask_id}")

        if output_directory is None:
            output_directory = (
                Path.cwd()
                / f"project_{self.auth.project_id}/job_{self.job_id}/jobtask_{self.jobtask_id}"
            )
        else:
            output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Download directory: {str(output_directory)}")

        download_url = self._get_download_url()
        out_filepaths = download_from_gcs_unpack(
            download_url=download_url,
            output_directory=output_directory,
        )

        self.results = out_filepaths
        return out_filepaths

    def download_quicklooks(
        self,
        output_directory: Union[str, Path, None] = None,
    ) -> List[str]:
        """
        Downloads quicklooks of the job task to disk.

        After download, can be plotted via jobtask.plot_quicklooks().

        Args:
            output_directory: The file output directory, defaults to the current working
                directory.

        Returns:
            The quicklooks filepaths.
        """
        if output_directory is None:
            # Deliberately download the quicklooks to the job folder and not the
            # jobtask folder, since they are only relevant for the data block task
            # and this is clearer for job.download_quicklooks.
            output_directory = (
                Path.cwd() / f"project_{self.auth.project_id}" / f"job_{self.job_id}"
            )
        else:
            output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Download directory: {str(output_directory)}")

        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{self.jobtask_id}/outputs/quicklooks/"
        )
        response_json = self.auth._request(request_type="GET", url=url)
        quicklooks_ids = response_json["data"]

        out_paths: List[str] = []
        for ql_id in tqdm(quicklooks_ids):
            out_path = output_directory / f"quicklook_{ql_id}"  # No suffix required.
            out_paths.append(str(out_path))

            url = (
                f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
                f"/tasks/{self.jobtask_id}/outputs/quicklooks/{ql_id}"
            )
            response = self.auth._request(
                request_type="GET", url=url, return_text=False
            )

            with open(out_path, "wb") as dst:
                for chunk in response:
                    dst.write(chunk)

        self.quicklooks = out_paths  # pylint: disable=attribute-defined-outside-init
        return out_paths

download_quicklooks(output_directory=None)

Downloads quicklooks of the job task to disk.

After download, can be plotted via jobtask.plot_quicklooks().

Parameters:

Name Type Description Default
output_directory Union[str, Path, None]

The file output directory, defaults to the current working directory.

None

Returns:

Type Description
List[str]

The quicklooks filepaths.

Source code in up42/jobtask.py, lines 131-184
def download_quicklooks(
    self,
    output_directory: Union[str, Path, None] = None,
) -> List[str]:
    """
    Downloads quicklooks of the job task to disk.

    After download, can be plotted via jobtask.plot_quicklooks().

    Args:
        output_directory: The file output directory, defaults to the current working
            directory.

    Returns:
        The quicklooks filepaths.
    """
    if output_directory is None:
        # Deliberately download the quicklooks to the job folder and not the
        # jobtask folder, since they are only relevant for the data block task
        # and this is clearer for job.download_quicklooks.
        output_directory = (
            Path.cwd() / f"project_{self.auth.project_id}" / f"job_{self.job_id}"
        )
    else:
        output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(output_directory)}")

    url = (
        f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
        f"/tasks/{self.jobtask_id}/outputs/quicklooks/"
    )
    response_json = self.auth._request(request_type="GET", url=url)
    quicklooks_ids = response_json["data"]

    out_paths: List[str] = []
    for ql_id in tqdm(quicklooks_ids):
        out_path = output_directory / f"quicklook_{ql_id}"  # No suffix required.
        out_paths.append(str(out_path))

        url = (
            f"{self.auth._endpoint()}/projects/{self.project_id}/jobs/{self.job_id}"
            f"/tasks/{self.jobtask_id}/outputs/quicklooks/{ql_id}"
        )
        response = self.auth._request(
            request_type="GET", url=url, return_text=False
        )

        with open(out_path, "wb") as dst:
            for chunk in response:
                dst.write(chunk)

    self.quicklooks = out_paths  # pylint: disable=attribute-defined-outside-init
    return out_paths
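
Usage sketch, assuming `jobtask` is an initialized JobTask of a data block:

```python
quicklook_paths = jobtask.download_quicklooks()
jobtask.plot_quicklooks()  # plot the downloaded quicklooks
```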

download_results(output_directory=None)

Downloads and unpacks the jobtask results to the output directory.

Parameters:

Name Type Description Default
output_directory Union[str, Path, None]

The file output directory, defaults to the current working directory.

None

Returns:

Type Description
List[str]

List of the downloaded results' filepaths.

Source code in up42/jobtask.py, lines 98-129
def download_results(
    self, output_directory: Union[str, Path, None] = None
) -> List[str]:
    """
    Downloads and unpacks the jobtask results to the output directory.

    Args:
        output_directory: The file output directory, defaults to the current working
            directory.
    Returns:
        List of the downloaded results' filepaths.
    """
    logger.info(f"Downloading results of jobtask {self.jobtask_id}")

    if output_directory is None:
        output_directory = (
            Path.cwd()
            / f"project_{self.auth.project_id}/job_{self.job_id}/jobtask_{self.jobtask_id}"
        )
    else:
        output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(output_directory)}")

    download_url = self._get_download_url()
    out_filepaths = download_from_gcs_unpack(
        download_url=download_url,
        output_directory=output_directory,
    )

    self.results = out_filepaths
    return out_filepaths
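
Usage sketch, assuming an initialized `jobtask`; the output path is a placeholder:

```python
out_filepaths = jobtask.download_results(output_directory="./jobtask_results")
```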

get_results_json(as_dataframe=False)

Gets the Jobtask results data.json.

Parameters:

Name Type Description Default
as_dataframe bool

If True, returns a GeoDataFrame instead of the Feature Collection dict.

False

Returns:

Type Description
Union[dict, GeoDataFrame]

JSON of the results, alternatively a GeoDataFrame.

Source code in up42/jobtask.py, lines 65-87
def get_results_json(self, as_dataframe: bool = False) -> Union[dict, GeoDataFrame]:
    """
    Gets the Jobtask results data.json.

    Args:
        as_dataframe: If True, returns a GeoDataFrame instead of the Feature Collection dict.

    Returns:
        JSON of the results, alternatively a GeoDataFrame.
    """
    url = (
        f"{self.auth._endpoint()}/projects/{self.auth.project_id}/jobs/{self.job_id}"
        f"/tasks/{self.jobtask_id}/outputs/data-json/"
    )
    response_json = self.auth._request(request_type="GET", url=url)
    logger.info(f"Retrieved {len(response_json['features'])} features.")

    if as_dataframe:
        # UP42 results are always in EPSG 4326
        df = GeoDataFrame.from_features(response_json, crs=4326)
        return df
    else:
        return response_json
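
Usage sketch, assuming an initialized `jobtask`:

```python
fc = jobtask.get_results_json()  # Feature Collection dict (default)
df = jobtask.get_results_json(as_dataframe=True)  # GeoDataFrame in EPSG 4326
```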

info: dict property

Gets and updates the jobtask metadata information.

Order

The Order class enables you to place, inspect and get information on orders.

Use an existing order:

order = up42.initialize_order(order_id="ea36dee9-fed6-457e-8400-2c20ebd30f44")

Source code in up42/order.py, lines 13-167
class Order:
    """
    The Order class enables you to place, inspect and get information on orders.

    Use an existing order:
    ```python
    order = up42.initialize_order(order_id="ea36dee9-fed6-457e-8400-2c20ebd30f44")
    ```
    """

    def __init__(
        self,
        auth: Auth,
        order_id: str,
        order_parameters: Optional[dict] = None,
        order_info: Optional[dict] = None,
    ):
        self.auth = auth
        self.workspace_id = auth.workspace_id
        self.order_id = order_id
        self.order_parameters = order_parameters
        if order_info is not None:
            self._info = order_info
        else:
            self._info = self.info

    def __repr__(self):
        return (
            f"Order(order_id: {self.order_id}, assets: {self._info['assets']}, "
            f"dataProvider: {self._info['dataProvider']}, status: {self._info['status']}, "
            f"createdAt: {self._info['createdAt']}, updatedAt: {self._info['updatedAt']})"
        )

    @property
    def info(self) -> dict:
        """
        Gets and updates the order information.
        """
        url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/orders/{self.order_id}"
        response_json = self.auth._request(request_type="GET", url=url)
        self._info = response_json["data"]
        return self._info

    @property
    def status(self) -> str:
        """
        Gets the Order status. One of `PLACED`, `FAILED`, `FULFILLED`, `BEING_FULFILLED`, `FAILED_PERMANENTLY`.
        """
        status = self.info["status"]
        logger.info(f"Order is {status}")
        return status

    @property
    def is_fulfilled(self) -> bool:
        """
        Gets `True` if the order is fulfilled, `False` otherwise.
        Also see [status attribute](order-reference.md#up42.order.Order.status).
        """
        return self.status == "FULFILLED"

    def get_assets(self) -> List[Asset]:
        """
        Gets the Order assets or results.
        """
        if self.is_fulfilled:
            assets: List[str] = self.info["assets"]
            return [Asset(self.auth, asset_id=asset) for asset in assets]
        raise ValueError(
            f"Order {self.order_id} is not FULFILLED! Status is {self.status}"
        )

    @classmethod
    def place(cls, auth: Auth, order_parameters: dict) -> "Order":
        """
        Places an order.

        Args:
            auth: An authentication object.
            order_parameters: A dictionary like {"dataProduct": ..., "params": {"id": ..., "aoi": ...}}

        Returns:
            Order: The placed order.
        """
        url = f"{auth._endpoint()}/workspaces/{auth.workspace_id}/orders"
        response_json = auth._request(
            request_type="POST", url=url, data=order_parameters
        )
        try:
            order_id = response_json["data"]["id"]  # type: ignore
        except KeyError as e:
            raise ValueError(f"Order was not placed: {response_json}") from e
        order = cls(auth=auth, order_id=order_id, order_parameters=order_parameters)
        logger.info(f"Order {order.order_id} is now {order.status}.")
        return order

    @staticmethod
    def estimate(auth: Auth, order_parameters: dict) -> int:
        """
        Returns an estimation of the cost of an order.

        Args:
            auth: An authentication object.
            order_parameters: A dictionary like {"dataProduct": ..., "params": {"id": ..., "aoi": ...}}

        Returns:
            int: The estimated cost of the order
        """
        url = f"{auth._endpoint()}/workspaces/{auth.workspace_id}/orders/estimate"

        response_json = auth._request(
            request_type="POST", url=url, data=order_parameters
        )
        estimated_credits: int = response_json["data"]["credits"]  # type: ignore
        logger.info(
            f"Order is estimated to cost {estimated_credits} UP42 credits (order_parameters: {order_parameters})"
        )
        return estimated_credits

    def track_status(self, report_time: int = 120) -> str:
        """
        Continuously gets the order status until order is fulfilled or failed.

        Internally checks the status every `report_time` seconds and prints the log.

        Warning:
            When placing orders of items that are in archive or cold storage,
            the order fulfillment can happen up to **24h after order placement**.
            In such cases,
            please make sure to set an appropriate `report_time`.

        Args:
            report_time: The interval (in seconds) when to get the order status.

        Returns:
            str: The final order status.
        """
        logger.info(
            f"Tracking order status, reporting every {report_time} seconds...",
        )
        time_asleep = 0

        while not self.is_fulfilled:
            status = self.status
            if status in ["PLACED", "BEING_FULFILLED"]:
                if time_asleep != 0 and time_asleep % report_time == 0:
                    logger.info(f"Order is {status}! - {self.order_id}")
            elif status in ["FAILED", "FAILED_PERMANENTLY"]:
                logger.info(f"Order is {status}! - {self.order_id}")
                raise ValueError("Order has failed!")

            sleep(report_time)
            time_asleep += report_time

        logger.info(f"Order is fulfilled successfully! - {self.order_id}")
        return self.status

estimate(auth, order_parameters) staticmethod

Returns an estimation of the cost of an order.

Parameters:

Name Type Description Default
auth Auth

An authentication object.

required
order_parameters dict

A dictionary like {"dataProduct": ..., "params": {"id": ..., "aoi": ...}}

required

Returns:

Name Type Description
int int

The estimated cost of the order

Source code in up42/order.py, lines 108-129
@staticmethod
def estimate(auth: Auth, order_parameters: dict) -> int:
    """
    Returns an estimation of the cost of an order.

    Args:
        auth: An authentication object.
        order_parameters: A dictionary like {"dataProduct": ..., "params": {"id": ..., "aoi": ...}}

    Returns:
        int: The estimated cost of the order
    """
    url = f"{auth._endpoint()}/workspaces/{auth.workspace_id}/orders/estimate"

    response_json = auth._request(
        request_type="POST", url=url, data=order_parameters
    )
    estimated_credits: int = response_json["data"]["credits"]  # type: ignore
    logger.info(
        f"Order is estimated to cost {estimated_credits} UP42 credits (order_parameters: {order_parameters})"
    )
    return estimated_credits
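
A usage sketch; the credentials, data product id, image id and AOI geometry below are all placeholders:

```python
from up42.auth import Auth
from up42.order import Order

auth = Auth(project_id="your-project-ID", project_api_key="your-project-API-key")
aoi_geojson = {  # placeholder GeoJSON polygon
    "type": "Polygon",
    "coordinates": [
        [[13.35, 52.52], [13.36, 52.52], [13.36, 52.53], [13.35, 52.53], [13.35, 52.52]]
    ],
}
order_parameters = {
    "dataProduct": "your-data-product-id",
    "params": {"id": "your-image-id", "aoi": aoi_geojson},
}
estimated_credits = Order.estimate(auth=auth, order_parameters=order_parameters)
```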

get_assets()

Gets the Order assets or results.

Source code in up42/order.py, lines 73-82
def get_assets(self) -> List[Asset]:
    """
    Gets the Order assets or results.
    """
    if self.is_fulfilled:
        assets: List[str] = self.info["assets"]
        return [Asset(self.auth, asset_id=asset) for asset in assets]
    raise ValueError(
        f"Order {self.order_id} is not FULFILLED! Status is {self.status}"
    )

info: dict property

Gets and updates the order information.

is_fulfilled: bool property

Gets True if the order is fulfilled, False otherwise. Also see status attribute.

place(auth, order_parameters) classmethod

Places an order.

Parameters:

Name Type Description Default
auth Auth

An authentication object.

required
order_parameters dict

A dictionary like {"dataProduct": ..., "params": {"id": ..., "aoi": ...}}

required

Returns:

Name Type Description
Order Order

The placed order.

Source code in up42/order.py, lines 84-106
@classmethod
def place(cls, auth: Auth, order_parameters: dict) -> "Order":
    """
    Places an order.

    Args:
        auth: An authentication object.
        order_parameters: A dictionary like {"dataProduct": ..., "params": {"id": ..., "aoi": ...}}

    Returns:
        Order: The placed order.
    """
    url = f"{auth._endpoint()}/workspaces/{auth.workspace_id}/orders"
    response_json = auth._request(
        request_type="POST", url=url, data=order_parameters
    )
    try:
        order_id = response_json["data"]["id"]  # type: ignore
    except KeyError as e:
        raise ValueError(f"Order was not placed: {response_json}") from e
    order = cls(auth=auth, order_id=order_id, order_parameters=order_parameters)
    logger.info(f"Order {order.order_id} is now {order.status}.")
    return order
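
A usage sketch, assuming the same `auth` object and `order_parameters` dictionary as in the estimate example above:

```python
order = Order.place(auth=auth, order_parameters=order_parameters)
print(order.order_id)
```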

status: str property

Gets the Order status. One of PLACED, FAILED, FULFILLED, BEING_FULFILLED, FAILED_PERMANENTLY.

track_status(report_time=120)

Continuously gets the order status until order is fulfilled or failed.

Internally checks the status every report_time seconds and prints the log.

Warning

When placing orders of items that are in archive or cold storage, the order fulfillment can happen up to 24h after order placement. In such cases, please make sure to set an appropriate report_time.

Parameters:

Name Type Description Default
report_time int

The interval (in seconds) when to get the order status.

120

Returns:

Name Type Description
str str

The final order status.

Source code in up42/order.py, lines 131-167
def track_status(self, report_time: int = 120) -> str:
    """
    Continuously gets the order status until order is fulfilled or failed.

    Internally checks the status every `report_time` seconds and prints the log.

    Warning:
        When placing orders of items that are in archive or cold storage,
        the order fulfillment can happen up to **24h after order placement**.
        In such cases,
        please make sure to set an appropriate `report_time`.

    Args:
        report_time: The interval (in seconds) when to get the order status.

    Returns:
        str: The final order status.
    """
    logger.info(
        f"Tracking order status, reporting every {report_time} seconds...",
    )
    time_asleep = 0

    while not self.is_fulfilled:
        status = self.status
        if status in ["PLACED", "BEING_FULFILLED"]:
            if time_asleep != 0 and time_asleep % report_time == 0:
                logger.info(f"Order is {status}! - {self.order_id}")
        elif status in ["FAILED", "FAILED_PERMANENTLY"]:
            logger.info(f"Order is {status}! - {self.order_id}")
            raise ValueError("Order has failed!")

        sleep(report_time)
        time_asleep += report_time

    logger.info(f"Order is fulfilled successfully! - {self.order_id}")
    return self.status
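
Usage sketch; the order id is a placeholder. Archive or cold storage orders can take up to 24h, so a generous report_time is chosen here:

```python
order = up42.initialize_order(order_id="your-order-ID")
order.track_status(report_time=300)
assets = order.get_assets()  # available once the order is FULFILLED
```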

Project

The Project is the top-level class of the UP42 hierarchy. With it you can create new workflows, query already existing workflows & jobs in the project and manage the project settings.

Create a new project on the UP42 Console website.

Use an existing project:

up42.authenticate(project_id="uz92-8uo0-4dc9-ab1d-06d7ec1a5321",
                  project_api_key="9i7uec8a-45be-41ad-a50f-98bewb528b10")
project = up42.initialize_project()

Source code in up42/project.py, lines 13-265
class Project:
    """
    The Project is the top-level class of the UP42 hierarchy. With it you can create
    new workflows, query already existing workflows & jobs in the project and manage
    the project settings.

    Create a new project on the
    [**UP42 Console website**](https://sdk.up42.com/authentication/#get-your-project-credentials).

    Use an existing project:
    ```python
    up42.authenticate(project_id="uz92-8uo0-4dc9-ab1d-06d7ec1a5321",
                      project_api_key="9i7uec8a-45be-41ad-a50f-98bewb528b10")
    project = up42.initialize_project()
    ```
    """

    def __init__(self, auth: Auth, project_id: str):
        self.auth = auth
        self.project_id = project_id
        self._info = self.info

    def __repr__(self):
        env = ", env: dev" if self.auth.env == "dev" else ""
        return (
            f"Project(name: {self._info['name']}, project_id: {self.project_id}, "
            f"description: {self._info['description']}, createdAt: {self._info['createdAt']}"
            f"{env})"
        )

    @property
    def info(self) -> dict:
        """
        Gets and updates the project metadata information.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}"
        response_json = self.auth._request(request_type="GET", url=url)
        self._info = response_json["data"]
        return self._info

    def create_workflow(
        self, name: str, description: str = "", use_existing: bool = False
    ) -> "Workflow":
        """
        Creates a new workflow and returns a workflow object.

        Args:
            name: Name of the new workflow.
            description: Description of the new workflow.
            use_existing: If True, instead of creating a new workflow, uses the
                most recent workflow with the same name & description.

        Returns:
            The workflow object.
        """
        if use_existing:
            logger.info("Getting existing workflows in project ...")
            logging.getLogger("up42.workflow").setLevel(logging.CRITICAL)
            existing_workflows: list = self.get_workflows(return_json=True)
            logging.getLogger("up42.workflow").setLevel(logging.INFO)

            matching_workflows: list = [
                workflow
                for workflow in existing_workflows
                if workflow["name"] == name and workflow["description"] == description
            ]
            if matching_workflows:
                existing_workflow = Workflow(
                    self.auth,
                    project_id=self.project_id,
                    workflow_id=matching_workflows[0]["id"],
                )
                logger.info(
                    f"Using existing workflow: {name} - {existing_workflow.workflow_id}"
                )
                return existing_workflow

        url = f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
        payload = {"name": name, "description": description}
        response_json = self.auth._request(request_type="POST", url=url, data=payload)
        workflow_id = response_json["data"]["id"]
        logger.info(f"Created new workflow: {workflow_id}")
        workflow = Workflow(
            self.auth, project_id=self.project_id, workflow_id=workflow_id
        )
        return workflow

    def get_workflows(
        self, return_json: bool = False
    ) -> Union[List["Workflow"], List[dict]]:
        """
        Gets all workflows in a project as workflow objects or JSON.

        Args:
            return_json: True returns infos of workflows as JSON instead of workflow objects.

        Returns:
            List of Workflow objects in the project or alternatively JSON info of the workflows.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/workflows"
        response_json = self.auth._request(request_type="GET", url=url)
        workflows_json = response_json["data"]
        logger.info(
            f"Got {len(workflows_json)} workflows for project {self.project_id}."
        )

        if return_json:
            return workflows_json
        else:
            workflows = [
                Workflow(
                    self.auth,
                    project_id=self.project_id,
                    workflow_id=workflow_json["id"],
                    workflow_info=workflow_json,
                )
                for workflow_json in workflows_json
            ]
            return workflows

    def get_jobs(
        self,
        return_json: bool = False,
        test_jobs: bool = True,
        real_jobs: bool = True,
        limit: int = 500,
        sortby: str = "createdAt",
        descending: bool = True,
    ) -> Union[JobCollection, List[dict]]:
        """
        Get all jobs in the project as a JobCollection or JSON.

        Use Workflow().get_jobs() to get a JobCollection with jobs associated with a
        specific workflow.

        Args:
            return_json: If True, returns the job info JSONs instead of a JobCollection.
            test_jobs: Return test jobs (also called test queries).
            real_jobs: Return real jobs.
            limit: Only return the first n jobs according to the sorting criteria and
                order, default 500.
            sortby: The sorting criteria, one of "createdAt", "name", "id", "mode", "status", "startedAt", "finishedAt".
            descending: The sorting order, True for descending (default), False for ascending.

        Returns:
            All job objects in a JobCollection, or alternatively the jobs info as JSON.
        """
        allowed_sorting_criteria = [
            "createdAt",
            "name",
            "id",
            "mode",
            "status",
            "startedAt",
            "finishedAt",
        ]
        if sortby not in allowed_sorting_criteria:
            raise ValueError(
                f"sortby parameter must be one of {allowed_sorting_criteria}!"
            )
        sort = f"{sortby},{'desc' if descending else 'asc'}"

        page = 0
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs?page={page}&sort={sort}"
        response_json = self.auth._request(request_type="GET", url=url)
        jobs_json = filter_jobs_on_mode(response_json["data"], test_jobs, real_jobs)

        # API get jobs pagination exhaustion is indicated by empty next page (no last page flag)
        logging.getLogger("up42.utils").setLevel(logging.CRITICAL)
        while len(response_json["data"]) > 0 and len(jobs_json) < limit:
            page += 1
            url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs?page={page}&sort={sort}"
            response_json = self.auth._request(request_type="GET", url=url)
            if len(response_json["data"]) > 0:
                jobs_json.extend(
                    filter_jobs_on_mode(response_json["data"], test_jobs, real_jobs)
                )
        logging.getLogger("up42.utils").setLevel(logging.INFO)
        jobs_json = jobs_json[:limit]
        logger.info(
            f"Got {len(jobs_json)} jobs (limit parameter {limit}) in project {self.project_id}."
        )
        if return_json:
            return jobs_json
        else:
            jobs = [
                Job(
                    self.auth,
                    job_id=job_json["id"],
                    project_id=self.project_id,
                    job_info=job_json,
                )
                for job_json in jobs_json
            ]
            jobcollection = JobCollection(
                auth=self.auth, project_id=self.project_id, jobs=jobs
            )
            return jobcollection

    def get_project_settings(self) -> List[Dict[str, str]]:
        """
        Gets the project settings.

        Returns:
            The project settings.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/settings"
        response_json = self.auth._request(request_type="GET", url=url)
        project_settings = response_json["data"]
        return project_settings

    @property
    def max_concurrent_jobs(self) -> int:
        """
        Gets the maximum number of concurrent jobs allowed by the project settings.
        """
        project_settings = self.get_project_settings()
        project_settings_dict = {d["name"]: int(d["value"]) for d in project_settings}
        return project_settings_dict["MAX_CONCURRENT_JOBS"]

    def update_project_settings(
        self,
        max_aoi_size: Optional[int] = None,
        max_concurrent_jobs: Optional[int] = None,
        number_of_images: Optional[int] = None,
    ) -> None:
        """
        Updates a project's settings.

        Args:
            max_aoi_size: The maximum area of interest geometry size, from 1-1000 sqkm, default 10 sqkm.
            max_concurrent_jobs: The maximum number of concurrent jobs, from 1-10, default 1.
            number_of_images: The maximum number of images returned with each job, from 1-20, default 10.
        """
        # The ranges and default values of the project settings can potentially get
        # increased, so need to be dynamically queried from the server.
        current_settings = {d["name"]: d for d in self.get_project_settings()}

        url = f"{self.auth._endpoint()}/projects/{self.project_id}/settings"
        payload: Dict = {"settings": {}}
        desired_settings = {
            "JOB_QUERY_MAX_AOI_SIZE": max_aoi_size,
            "MAX_CONCURRENT_JOBS": max_concurrent_jobs,
            "JOB_QUERY_LIMIT_PARAMETER_MAX_VALUE": number_of_images,
        }

        for name, desired_setting in desired_settings.items():
            if desired_setting is None:
                payload["settings"][name] = str(current_settings[name]["value"])
            else:
                payload["settings"][name] = str(desired_setting)

        self.auth._request(request_type="POST", url=url, data=payload)
        logger.info(f"Updated project settings: {payload}")

create_workflow(name, description='', use_existing=False)

Creates a new workflow and returns a workflow object.

Parameters:

Name Type Description Default
name str

Name of the new workflow.

required
description str

Description of the new workflow.

''
use_existing bool

If True, instead of creating a new workflow, uses the most recent workflow with the same name & description.

False

Returns:

Type Description
Workflow

The workflow object.

Source code in up42/project.py, lines 53-98
def create_workflow(
    self, name: str, description: str = "", use_existing: bool = False
) -> "Workflow":
    """
    Creates a new workflow and returns a workflow object.

    Args:
        name: Name of the new workflow.
        description: Description of the new workflow.
        use_existing: If True, instead of creating a new workflow, uses the
            most recent workflow with the same name & description.

    Returns:
        The workflow object.
    """
    if use_existing:
        logger.info("Getting existing workflows in project ...")
        logging.getLogger("up42.workflow").setLevel(logging.CRITICAL)
        existing_workflows: list = self.get_workflows(return_json=True)
        logging.getLogger("up42.workflow").setLevel(logging.INFO)

        matching_workflows: list = [
            workflow
            for workflow in existing_workflows
            if workflow["name"] == name and workflow["description"] == description
        ]
        if matching_workflows:
            existing_workflow = Workflow(
                self.auth,
                project_id=self.project_id,
                workflow_id=matching_workflows[0]["id"],
            )
            logger.info(
                f"Using existing workflow: {name} - {existing_workflow.workflow_id}"
            )
            return existing_workflow

    url = f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
    payload = {"name": name, "description": description}
    response_json = self.auth._request(request_type="POST", url=url, data=payload)
    workflow_id = response_json["data"]["id"]
    logger.info(f"Created new workflow: {workflow_id}")
    workflow = Workflow(
        self.auth, project_id=self.project_id, workflow_id=workflow_id
    )
    return workflow

get_jobs(return_json=False, test_jobs=True, real_jobs=True, limit=500, sortby='createdAt', descending=True)

Get all jobs in the project as a JobCollection or JSON.

Use Workflow().get_jobs() to get a JobCollection with the jobs associated with a specific workflow.

Parameters:

Name Type Description Default
return_json bool

If true, returns the job info JSONs instead of JobCollection.

False
test_jobs bool

Return test jobs (test queries).

True
real_jobs bool

Return real jobs.

True
limit int

Only return the first n jobs by sorting criteria and order, default 500.

500
sortby str

The sorting criteria, one of "createdAt", "name", "id", "mode", "status", "startedAt", "finishedAt".

'createdAt'
descending bool

The sorting order, True for descending (default), False for ascending.

True

Returns:

Type Description
Union[JobCollection, List[dict]]

All job objects in a JobCollection, or alternatively the jobs info as JSON.
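
For example, to get the 20 most recent real jobs as a JobCollection, assuming an initialized project object:

```python
jobs = project.get_jobs(
    test_jobs=False,  # exclude test jobs (test queries)
    limit=20,
    sortby="createdAt",
    descending=True,
)
```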

Source code in up42/project.py
def get_jobs(
    self,
    return_json: bool = False,
    test_jobs: bool = True,
    real_jobs: bool = True,
    limit: int = 500,
    sortby: str = "createdAt",
    descending: bool = True,
) -> Union[JobCollection, List[dict]]:
    """
    Get all jobs in the project as a JobCollection or JSON.

    Use Workflow().get_jobs() to get a JobCollection with the jobs associated with a
    specific workflow.

    Args:
        return_json: If true, returns the job info JSONs instead of JobCollection.
        test_jobs: Return test jobs (test queries).
        real_jobs: Return real jobs.
        limit: Only return the first n jobs by sorting criteria and order, default 500.
        sortby: The sorting criteria, one of "createdAt", "name", "id", "mode", "status", "startedAt", "finishedAt".
        descending: The sorting order, True for descending (default), False for ascending.

    Returns:
        All job objects in a JobCollection, or alternatively the jobs info as JSON.
    """
    allowed_sorting_criteria = [
        "createdAt",
        "name",
        "id",
        "mode",
        "status",
        "startedAt",
        "finishedAt",
    ]
    if sortby not in allowed_sorting_criteria:
        raise ValueError(
            f"sortby parameter must be one of {allowed_sorting_criteria}!"
        )
    sort = f"{sortby},{'desc' if descending else 'asc'}"

    page = 0
    url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs?page={page}&sort={sort}"
    response_json = self.auth._request(request_type="GET", url=url)
    jobs_json = filter_jobs_on_mode(response_json["data"], test_jobs, real_jobs)

    # Pagination exhaustion of the get-jobs endpoint is indicated by an empty next page (there is no last-page flag).
    logging.getLogger("up42.utils").setLevel(logging.CRITICAL)
    while len(response_json["data"]) > 0 and len(jobs_json) < limit:
        page += 1
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs?page={page}&sort={sort}"
        response_json = self.auth._request(request_type="GET", url=url)
        if len(response_json["data"]) > 0:
            jobs_json.extend(
                filter_jobs_on_mode(response_json["data"], test_jobs, real_jobs)
            )
    logging.getLogger("up42.utils").setLevel(logging.INFO)
    jobs_json = jobs_json[:limit]
    logger.info(
        f"Got {len(jobs_json)} jobs (limit parameter {limit}) in project {self.project_id}."
    )
    if return_json:
        return jobs_json
    else:
        jobs = [
            Job(
                self.auth,
                job_id=job_json["id"],
                project_id=self.project_id,
                job_info=job_json,
            )
            for job_json in jobs_json
        ]
        jobcollection = JobCollection(
            auth=self.auth, project_id=self.project_id, jobs=jobs
        )
        return jobcollection

get_project_settings()

Gets the project settings.

Returns:

Type Description
List[Dict[str, str]]

The project settings.
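
The settings are returned as a list of name/value dictionaries; a sketch of reading one value, assuming an initialized project object (the value shown is illustrative):

```python
settings = project.get_project_settings()
# e.g. [{"name": "MAX_CONCURRENT_JOBS", "value": "1"}, ...]
settings_by_name = {d["name"]: d["value"] for d in settings}
max_concurrent_jobs = int(settings_by_name["MAX_CONCURRENT_JOBS"])
```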

Source code in up42/project.py
def get_project_settings(self) -> List[Dict[str, str]]:
    """
    Gets the project settings.

    Returns:
        The project settings.
    """
    url = f"{self.auth._endpoint()}/projects/{self.project_id}/settings"
    response_json = self.auth._request(request_type="GET", url=url)
    project_settings = response_json["data"]
    return project_settings

get_workflows(return_json=False)

Gets all workflows in a project as workflow objects or JSON.

Parameters:

Name Type Description Default
return_json bool

If True, returns the workflow info as JSON instead of workflow objects.

False

Returns:

Type Description
Union[List[Workflow], List[dict]]

List of Workflow objects in the project or alternatively JSON info of the workflows.
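
For example, assuming an initialized project object:

```python
workflows = project.get_workflows()
# Or get the raw workflow metadata instead:
workflows_json = project.get_workflows(return_json=True)
```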

Source code in up42/project.py
def get_workflows(
    self, return_json: bool = False
) -> Union[List["Workflow"], List[dict]]:
    """
    Gets all workflows in a project as workflow objects or JSON.

    Args:
        return_json: If True, returns the workflow info as JSON instead of workflow objects.

    Returns:
        List of Workflow objects in the project or alternatively JSON info of the workflows.
    """
    url = f"{self.auth._endpoint()}/projects/{self.project_id}/workflows"
    response_json = self.auth._request(request_type="GET", url=url)
    workflows_json = response_json["data"]
    logger.info(
        f"Got {len(workflows_json)} workflows for project {self.project_id}."
    )

    if return_json:
        return workflows_json
    else:
        workflows = [
            Workflow(
                self.auth,
                project_id=self.project_id,
                workflow_id=workflow_json["id"],
                workflow_info=workflow_json,
            )
            for workflow_json in workflows_json
        ]
        return workflows

info: dict property

Gets and updates the project metadata information.

max_concurrent_jobs: int property

Gets the maximum number of concurrent jobs allowed by the project settings.

update_project_settings(max_aoi_size=None, max_concurrent_jobs=None, number_of_images=None)

Updates a project's settings.

Parameters:

Name Type Description Default
max_aoi_size Optional[int]

The maximum area of interest geometry size, from 1-1000 sqkm, default 10 sqkm.

None
max_concurrent_jobs Optional[int]

The maximum number of concurrent jobs, from 1-10, default 1.

None
number_of_images Optional[int]

The maximum number of images returned with each job, from 1-20, default 10.

None
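
For example, to raise only the concurrency limit, assuming an initialized project object (the other settings are re-sent with their current server-side values):

```python
project.update_project_settings(max_concurrent_jobs=5)
```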
Source code in up42/project.py
def update_project_settings(
    self,
    max_aoi_size: Optional[int] = None,
    max_concurrent_jobs: Optional[int] = None,
    number_of_images: Optional[int] = None,
) -> None:
    """
    Updates a project's settings.

    Args:
        max_aoi_size: The maximum area of interest geometry size, from 1-1000 sqkm, default 10 sqkm.
        max_concurrent_jobs: The maximum number of concurrent jobs, from 1-10, default 1.
        number_of_images: The maximum number of images returned with each job, from 1-20, default 10.
    """
    # The ranges and default values of the project settings can potentially get
    # increased, so need to be dynamically queried from the server.
    current_settings = {d["name"]: d for d in self.get_project_settings()}

    url = f"{self.auth._endpoint()}/projects/{self.project_id}/settings"
    payload: Dict = {"settings": {}}
    desired_settings = {
        "JOB_QUERY_MAX_AOI_SIZE": max_aoi_size,
        "MAX_CONCURRENT_JOBS": max_concurrent_jobs,
        "JOB_QUERY_LIMIT_PARAMETER_MAX_VALUE": number_of_images,
    }

    for name, desired_setting in desired_settings.items():
        if desired_setting is None:
            payload["settings"][name] = str(current_settings[name]["value"])
        else:
            payload["settings"][name] = str(desired_setting)

    self.auth._request(request_type="POST", url=url, data=payload)
    logger.info(f"Updated project settings: {payload}")

Storage

The Storage class enables access to the UP42 storage. You can list your assets and orders within an UP42 workspace.

Use the storage:

storage = up42.initialize_storage()

Source code in up42/storage.py
class Storage:
    """
    The Storage class enables access to the UP42 storage. You can list
    your assets and orders within an UP42 workspace.

    Use the storage:
    ```python
    storage = up42.initialize_storage()
    ```
    """

    def __init__(self, auth: Auth):
        self.auth = auth
        self.workspace_id = auth.workspace_id

    def __repr__(self):
        env = ", env: dev" if self.auth.env == "dev" else ""
        return f"Storage(workspace_id: {self.workspace_id}{env})"

    @property
    def pystac_client(self):
        """
        PySTAC client, a Python package for working with the UP42 STAC API and accessing storage assets.
        For more information, see [PySTAC Client Documentation](https://pystac-client.readthedocs.io/).
        """

        def _authenticate_client():
            url = f"{self.auth._endpoint()}/v2/assets/stac"
            authenticated_client = pystac_client.Client.open(
                url=url,
                headers={
                    "Authorization": f"Bearer {self.auth.token}",
                },
            )
            return authenticated_client

        try:
            up42_pystac_client = _authenticate_client()
        except pystac_client.exceptions.APIError:
            self.auth._get_token()
            up42_pystac_client = _authenticate_client()

        return up42_pystac_client

    def _query_paginated_endpoints(
        self, url: str, limit: Optional[int] = None, size: int = 50
    ) -> List[dict]:
        """
        Helper to fetch a list of items from a paginated endpoint, e.g. assets or orders.

        Args:
            url (str): The base url of the paginated endpoint.
            limit: Return the n first elements sorted by date of creation, optional.
            size: Number of results per pagination page. Trade-off between the number
                of results per page and the API response time to query one page. Default 50.

        Returns:
            List[dict]: List of all paginated items.
        """
        url = url + f"&size={size}"

        first_page_response = self.auth._request(request_type="GET", url=url)
        # The UP42 API v2 convention omits the "data" key, but some endpoints
        # (e.g. the get order endpoint) still return it.
        if "data" in first_page_response:
            first_page_response = first_page_response["data"]
        num_pages = first_page_response["totalPages"]
        num_elements = first_page_response["totalElements"]
        results_list = first_page_response["content"]

        if limit is None:
            # Also covers single page (without limit)
            num_pages_to_query = num_pages
        elif limit <= size:
            return results_list[:limit]
        else:
            # Also covers single page (with limit)
            num_pages_to_query = math.ceil(min(limit, num_elements) / size)

        for page in range(1, num_pages_to_query):
            response_json = self.auth._request(
                request_type="GET", url=url + f"&page={page}"
            )
            if "data" in response_json:
                response_json = response_json["data"]
            results_list += response_json["content"]
        return results_list[:limit]

    def _query_paginated_stac_search(
        self,
        url: str,
        stac_search_parameters: dict,
    ) -> list:
        """
        Helper to fetch a list of items from a paginated STAC search endpoint, e.g. STAC asset search.

        Args:
            url (str): The base url of the paginated endpoint.
            stac_search_parameters (dict): The parameters required for the STAC search.

        Returns:
            List of storage STAC results features.
        """
        response_features: list = []
        response_features_limit = stac_search_parameters["limit"]
        while len(response_features) < response_features_limit:
            stac_results = self.auth._request(
                request_type="POST", url=url, data=stac_search_parameters
            )
            response_features.extend(stac_results["features"])
            token_list = [
                link["body"]["token"]
                for link in stac_results["links"]
                if link["rel"] == "next"
            ]
            if token_list:
                stac_search_parameters["token"] = token_list[0]
            else:
                break
        return response_features

    def _search_stac(
        self,
        acquired_after: Optional[Union[str, datetime]] = None,
        acquired_before: Optional[Union[str, datetime]] = None,
        geometry: Optional[
            Union[
                dict,
                Feature,
                FeatureCollection,
                list,
                GeoDataFrame,
                Polygon,
            ]
        ] = None,
        custom_filter=None,
    ) -> list:
        """
        Search query for storage STAC collection items.

        Args:
            acquired_after: Search for assets that contain data acquired after the specified timestamp,\
                in `"YYYY-MM-DD"` format.
            acquired_before: Search for assets that contain data acquired before the specified timestamp,\
                in `"YYYY-MM-DD"` format.
            geometry: Search for assets that contain STAC items intersecting the provided geometry,\
                in EPSG:4326 (WGS84) format.\
                    For more information on STAC items,\
                        see [Introduction to STAC](https://docs.up42.com/developers/api-assets/stac-about).
            custom_filter:\
                CQL2 filters used to search for assets that contain STAC items with specific property values.\
                For more information on filters,\
                see [PySTAC Client Documentation — CQL2 Filtering]\
                    (https://pystac-client.readthedocs.io/en/stable/tutorials/cql2-filter.html#CQL2-Filters).\
                For more information on STAC items, see [Introduction to STAC]\
                    (https://docs.up42.com/developers/api-assets/stac-about).

        Returns:
            A list of STAC items.
        """
        stac_search_parameters: Dict[str, Any] = {
            "max_items": 100,
            "limit": 10000,
        }
        if geometry is not None:
            geometry = any_vector_to_fc(vector=geometry)
            geometry = fc_to_query_geometry(
                fc=geometry, geometry_operation="intersects"
            )
            stac_search_parameters["intersects"] = geometry
        if custom_filter is not None:
            # e.g. {"op": "gte","args": [{"property": "eo:cloud_cover"}, 10]}
            stac_search_parameters["filter"] = custom_filter

        datetime_filter = None
        if acquired_after is not None:
            datetime_filter = f"{format_time(acquired_after)}/.."
        if acquired_before is not None:
            datetime_filter = f"../{format_time(acquired_before)}"
        if acquired_after is not None and acquired_before is not None:
            datetime_filter = (
                f"{format_time(acquired_after)}/{format_time(acquired_before)}"
            )
        stac_search_parameters["datetime"] = datetime_filter  # type: ignore

        url = f"{self.auth._endpoint()}/v2/assets/stac/search"

        features = self._query_paginated_stac_search(url, stac_search_parameters)
        return features

    def get_assets(
        self,
        created_after: Optional[Union[str, datetime]] = None,
        created_before: Optional[Union[str, datetime]] = None,
        acquired_after: Optional[Union[str, datetime]] = None,
        acquired_before: Optional[Union[str, datetime]] = None,
        geometry: Optional[
            Union[dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon]
        ] = None,
        workspace_id: Optional[str] = None,
        collection_names: List[str] = None,
        producer_names: List[str] = None,
        tags: List[str] = None,
        sources: List[str] = None,
        search: str = None,
        custom_filter: dict = None,
        limit: Optional[int] = None,
        sortby: str = "createdAt",
        descending: bool = True,
        return_json: bool = False,
    ) -> Union[List[Asset], dict]:
        """
        Gets a list of assets in storage as [Asset](https://sdk.up42.com/structure/#functionality_1)
        objects or in JSON format.
        Args:
            created_after: Search for assets created after the specified timestamp,\
                in `"YYYY-MM-DD"` format.
            created_before: Search for assets created before the specified timestamp,\
                in `"YYYY-MM-DD"` format.
            acquired_after: Search for assets that contain data acquired after the specified timestamp,\
                in `"YYYY-MM-DD"` format.
            acquired_before: Search for assets that contain data acquired before the specified timestamp,\
            in `"YYYY-MM-DD"` format.
            geometry: Search for assets that contain STAC items intersecting the provided geometry,\
                in EPSG:4326 (WGS84) format.\
                For more information on STAC items,\
                see [Introduction to STAC](https://docs.up42.com/developers/api-assets/stac-about).
            workspace_id: Search by the workspace ID.
            collection_names: Search for assets from any of the provided geospatial collections.
            producer_names: Search for assets from any of the provided producers.
            tags: Search for assets with any of the provided tags.
            sources: Search for assets from any of the provided sources.\
                The allowed values: `"ARCHIVE"`, `"TASKING"`, `"ANALYTICS"`, `"USER"`.
            search: Search for assets that contain the provided search query in their name,\
                title, or order ID.
            custom_filter: CQL2 filters used to search for assets that contain STAC\
            items with specific property values.\
                For more information on filters,\
                    see \
                        [CQL2 Filtering](https://pystac-client.readthedocs.io/en/stable/tutorials/cql2-filter.html).\
                    For more information on STAC items,\
                        see [Introduction to STAC](https://docs.up42.com/developers/api-assets/stac-about).
            limit: The maximum number of results to return.
            sortby: The property to sort by.
            descending: The sorting order: <ul><li>`true` — descending</li><li>`false` — ascending</li></ul>
            return_json: If `true`, returns a JSON dictionary. If `false`,\
                returns a list of [Asset](https://sdk.up42.com/structure/#functionality_1) objects.

        Returns:
            A list of Asset objects.
        """
        sort = f"{sortby},{'desc' if descending else 'asc'}"
        url = f"{self.auth._endpoint()}/v2/assets?sort={sort}"
        if created_before is not None:
            url += f"&createdBefore={format_time(created_before)}"
        if created_after is not None:
            url += f"&createdAfter={format_time(created_after)}"
        if workspace_id is not None:
            url += f"&workspaceId={workspace_id}"
        if collection_names is not None:
            url += f"&collectionNames={collection_names}"
        if producer_names is not None:
            url += f"&producerNames={producer_names}"
        if tags is not None:
            url += f"&tags={tags}"
        if sources is not None:
            url += f"&sources={','.join(sources)}"
        if search is not None:
            url += f"&search={search}"

        assets_json = self._query_paginated_endpoints(url=url, limit=limit)

        # Compare the asset results with the storage STAC search results, which relate to the assets via their asset id.
        if (
            acquired_before is not None
            or acquired_after is not None
            or geometry is not None
            or custom_filter is not None
        ):
            stac_features = self._search_stac(
                acquired_after=acquired_after,
                acquired_before=acquired_before,
                geometry=geometry,
                custom_filter=custom_filter,
            )
            stac_assets_ids = [
                feature["properties"]["up42-system:asset_id"]
                for feature in stac_features
            ]
            assets_json = [
                asset_json
                for asset_json in assets_json
                if asset_json["id"] in stac_assets_ids
            ]

        if workspace_id is not None:
            logger.info(
                f"Queried {len(assets_json)} assets for workspace {self.workspace_id}."
            )
        else:
            logger.info(
                f"Queried {len(assets_json)} assets from all workspaces in account."
            )

        if return_json:
            return assets_json  # type: ignore
        else:
            assets = [
                Asset(self.auth, asset_id=asset_json["id"], asset_info=asset_json)
                for asset_json in assets_json
            ]
            return assets

    def get_orders(
        self,
        return_json: bool = False,
        limit: Optional[int] = None,
        sortby: str = "createdAt",
        descending: bool = True,
    ) -> Union[List[Order], dict]:
        """
        Gets all orders in the workspace as Order objects or JSON.

        Args:
            return_json: If set to True, returns JSON object.
            limit: Optional, only return the first n orders by sorting criteria and order.
                Recommended if your workspace contains many orders.
            sortby: The sorting criteria, one of "createdAt", "updatedAt", "status", "dataProvider", "type".
            descending: The sorting order, True for descending (default), False for ascending.

        Returns:
            Order objects in the workspace or alternatively JSON info of the orders.
        """
        allowed_sorting_criteria = [
            "createdAt",
            "updatedAt",
            "type",
            "status",
            "dataProvider",
        ]
        if sortby not in allowed_sorting_criteria:
            raise ValueError(
                f"sortby parameter must be one of {allowed_sorting_criteria}!"
            )
        sort = f"{sortby},{'desc' if descending else 'asc'}"
        url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/orders?format=paginated&sort={sort}"
        orders_json = self._query_paginated_endpoints(url=url, limit=limit)
        logger.info(f"Got {len(orders_json)} orders for workspace {self.workspace_id}.")

        if return_json:
            return orders_json  # type: ignore
        else:
            orders = [
                Order(self.auth, order_id=order_json["id"], order_info=order_json)
                for order_json in orders_json
            ]
            return orders

get_assets(created_after=None, created_before=None, acquired_after=None, acquired_before=None, geometry=None, workspace_id=None, collection_names=None, producer_names=None, tags=None, sources=None, search=None, custom_filter=None, limit=None, sortby='createdAt', descending=True, return_json=False)

Gets a list of assets in storage as Asset objects or in JSON format.

Parameters:

Name Type Description Default
created_after Optional[Union[str, datetime]]

Search for assets created after the specified timestamp, in "YYYY-MM-DD" format.

None
created_before Optional[Union[str, datetime]]

Search for assets created before the specified timestamp, in "YYYY-MM-DD" format.

None
acquired_after Optional[Union[str, datetime]]

Search for assets that contain data acquired after the specified timestamp, in "YYYY-MM-DD" format.

None
acquired_before Optional[Union[str, datetime]]

Search for assets that contain data acquired before the specified timestamp, in "YYYY-MM-DD" format.

None
geometry Optional[Union[dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon]]

Search for assets that contain STAC items intersecting the provided geometry, in EPSG:4326 (WGS84) format. For more information on STAC items, see Introduction to STAC.

None
workspace_id Optional[str]

Search by the workspace ID.

None
collection_names List[str]

Search for assets from any of the provided geospatial collections.

None
producer_names List[str]

Search for assets from any of the provided producers.

None
tags List[str]

Search for assets with any of the provided tags.

None
sources List[str]

Search for assets from any of the provided sources. The allowed values: "ARCHIVE", "TASKING", "ANALYTICS", "USER".

None
search str

Search for assets that contain the provided search query in their name, title, or order ID.

None
custom_filter dict

CQL2 filters used to search for assets that contain STAC items with specific property values. For more information on filters, see CQL2 Filtering. For more information on STAC items, see Introduction to STAC.

None
limit Optional[int]

The maximum number of results to return.

None
sortby str

The property to sort by.

'createdAt'
descending bool

The sorting order:

  • true — descending
  • false — ascending

True
return_json bool

If true, returns a JSON dictionary. If false, returns a list of Asset objects.

False

Returns:

Type Description
Union[List[Asset], dict]

A list of Asset objects.
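
A sketch combining several of the filters, assuming an initialized storage object (all filter values are illustrative; the CQL2 filter mirrors the example in the source comment):

```python
assets = storage.get_assets(
    created_after="2022-01-01",
    sources=["ARCHIVE"],
    # CQL2 filter: STAC items with cloud cover >= 10
    custom_filter={"op": "gte", "args": [{"property": "eo:cloud_cover"}, 10]},
    limit=100,
)
```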

Source code in up42/storage.py
def get_assets(
    self,
    created_after: Optional[Union[str, datetime]] = None,
    created_before: Optional[Union[str, datetime]] = None,
    acquired_after: Optional[Union[str, datetime]] = None,
    acquired_before: Optional[Union[str, datetime]] = None,
    geometry: Optional[
        Union[dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon]
    ] = None,
    workspace_id: Optional[str] = None,
    collection_names: List[str] = None,
    producer_names: List[str] = None,
    tags: List[str] = None,
    sources: List[str] = None,
    search: str = None,
    custom_filter: dict = None,
    limit: Optional[int] = None,
    sortby: str = "createdAt",
    descending: bool = True,
    return_json: bool = False,
) -> Union[List[Asset], dict]:
    """
    Gets a list of assets in storage as [Asset](https://sdk.up42.com/structure/#functionality_1)
    objects or in JSON format.
    Args:
        created_after: Search for assets created after the specified timestamp,\
            in `"YYYY-MM-DD"` format.
        created_before: Search for assets created before the specified timestamp,\
            in `"YYYY-MM-DD"` format.
        acquired_after: Search for assets that contain data acquired after the specified timestamp,\
            in `"YYYY-MM-DD"` format.
        acquired_before: Search for assets that contain data acquired before the specified timestamp,\
        in `"YYYY-MM-DD"` format.
        geometry: Search for assets that contain STAC items intersecting the provided geometry,\
            in EPSG:4326 (WGS84) format.\
            For more information on STAC items,\
            see [Introduction to STAC](https://docs.up42.com/developers/api-assets/stac-about).
        workspace_id: Search by the workspace ID.
        collection_names: Search for assets from any of the provided geospatial collections.
        producer_names: Search for assets from any of the provided producers.
        tags: Search for assets with any of the provided tags.
        sources: Search for assets from any of the provided sources.\
            The allowed values: `"ARCHIVE"`, `"TASKING"`, `"ANALYTICS"`, `"USER"`.
        search: Search for assets that contain the provided search query in their name,\
            title, or order ID.
        custom_filter: CQL2 filters used to search for assets that contain STAC\
        items with specific property values.\
            For more information on filters,\
                see \
                    [CQL2 Filtering](https://pystac-client.readthedocs.io/en/stable/tutorials/cql2-filter.html).\
                For more information on STAC items,\
                    see [Introduction to STAC](https://docs.up42.com/developers/api-assets/stac-about).
        limit: The maximum number of results to return.
        sortby: The property to sort by.
        descending: The sorting order: <ul><li>`true` — descending</li><li>`false` — ascending</li></ul>
        return_json: If `true`, returns a JSON dictionary. If `false`,\
            returns a list of [Asset](https://sdk.up42.com/structure/#functionality_1) objects.

    Returns:
        A list of Asset objects.
    """
    sort = f"{sortby},{'desc' if descending else 'asc'}"
    url = f"{self.auth._endpoint()}/v2/assets?sort={sort}"
    if created_before is not None:
        url += f"&createdBefore={format_time(created_before)}"
    if created_after is not None:
        url += f"&createdAfter={format_time(created_after)}"
    if workspace_id is not None:
        url += f"&workspaceId={workspace_id}"
    if collection_names is not None:
        url += f"&collectionNames={collection_names}"
    if producer_names is not None:
        url += f"&producerNames={producer_names}"
    if tags is not None:
        url += f"&tags={tags}"
    if sources is not None:
        url += f"&sources={','.join(sources)}"
    if search is not None:
        url += f"&search={search}"

    assets_json = self._query_paginated_endpoints(url=url, limit=limit)

    # Compare the asset results with the storage STAC search results, which relate to the assets via their asset id.
    if (
        acquired_before is not None
        or acquired_after is not None
        or geometry is not None
        or custom_filter is not None
    ):
        stac_features = self._search_stac(
            acquired_after=acquired_after,
            acquired_before=acquired_before,
            geometry=geometry,
            custom_filter=custom_filter,
        )
        stac_assets_ids = [
            feature["properties"]["up42-system:asset_id"]
            for feature in stac_features
        ]
        assets_json = [
            asset_json
            for asset_json in assets_json
            if asset_json["id"] in stac_assets_ids
        ]

    if workspace_id is not None:
        logger.info(
            f"Queried {len(assets_json)} assets for workspace {self.workspace_id}."
        )
    else:
        logger.info(
            f"Queried {len(assets_json)} assets from all workspaces in account."
        )

    if return_json:
        return assets_json  # type: ignore
    else:
        assets = [
            Asset(self.auth, asset_id=asset_json["id"], asset_info=asset_json)
            for asset_json in assets_json
        ]
        return assets

get_orders(return_json=False, limit=None, sortby='createdAt', descending=True)

Gets all orders in the workspace as Order objects or JSON.

Parameters:

Name Type Description Default
return_json bool

If set to True, returns JSON object.

False
limit Optional[int]

Optional, only return the first n orders by sorting criteria and order. Recommended if your workspace contains many orders.

None
sortby str

The sorting criteria, one of "createdAt", "updatedAt", "status", "dataProvider", "type".

'createdAt'
descending bool

The sorting order, True for descending (default), False for ascending.

True

Returns:

Type Description
Union[List[Order], dict]

Order objects in the workspace or alternatively JSON info of the orders.
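
For example, to get the ten most recently created orders, assuming an initialized storage object:

```python
orders = storage.get_orders(limit=10, sortby="createdAt", descending=True)
```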

Source code in up42/storage.py
def get_orders(
    self,
    return_json: bool = False,
    limit: Optional[int] = None,
    sortby: str = "createdAt",
    descending: bool = True,
) -> Union[List[Order], dict]:
    """
    Gets all orders in the workspace as Order objects or JSON.

    Args:
        return_json: If set to True, returns JSON object.
        limit: Optional, only return the first n orders by sorting criteria and order.
            Recommended if your workspace contains many orders.
        sortby: The sorting criteria, one of "createdAt", "updatedAt", "status", "dataProvider", "type".
        descending: The sorting order, True for descending (default), False for ascending.

    Returns:
        Order objects in the workspace or alternatively JSON info of the orders.
    """
    allowed_sorting_criteria = [
        "createdAt",
        "updatedAt",
        "type",
        "status",
        "dataProvider",
    ]
    if sortby not in allowed_sorting_criteria:
        raise ValueError(
            f"sortby parameter must be one of {allowed_sorting_criteria}!"
        )
    sort = f"{sortby},{'desc' if descending else 'asc'}"
    url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/orders?format=paginated&sort={sort}"
    orders_json = self._query_paginated_endpoints(url=url, limit=limit)
    logger.info(f"Got {len(orders_json)} orders for workspace {self.workspace_id}.")

    if return_json:
        return orders_json  # type: ignore
    else:
        orders = [
            Order(self.auth, order_id=order_json["id"], order_info=order_json)
            for order_json in orders_json
        ]
        return orders

pystac_client property

PySTAC client, a Python package for working with the UP42 STAC API and accessing storage assets. For more information, see PySTAC Client Documentation.
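
A minimal sketch of using the authenticated client, assuming an initialized storage object; get_collections() is a standard pystac-client method:

```python
client = storage.pystac_client
# List the STAC collections available in storage.
for collection in client.get_collections():
    print(collection.id)
```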

Tasking

The Tasking class enables access to the UP42 tasking functionality.

Use tasking:

tasking = up42.initialize_tasking()

Source code in up42/tasking.py
class Tasking(CatalogBase):
    """
    The Tasking class enables access to the UP42 tasking functionality.

    Use tasking:
    ```python
    tasking = up42.initialize_tasking()
    ```
    """

    def __init__(self, auth: Auth):
        self.auth = auth
        self.type = "TASKING"

    def construct_order_parameters(
        self,
        data_product_id: str,
        name: str,
        acquisition_start: Union[str, datetime],
        acquisition_end: Union[str, datetime],
        geometry: Union[
            FeatureCollection, Feature, dict, list, GeoDataFrame, Polygon, Point
        ],
    ):
        """
        Helps construct the parameters dictionary required for the tasking order. Each sensor has additional
        parameters that are added to the output dictionary with value None. The potential values to select from
        are given in the logs; for more detail on the parameters use `tasking.get_data_product_schema()`.

        Args:
            data_product_id: Id of the desired UP42 data product, see `tasking.get_data_products`
            name: Name of the tasking order project.
            acquisition_start: Start date of the acquisition period, datetime or isoformat string e.g. "2022-11-01"
            acquisition_end: End date of the acquisition period, datetime or isoformat string e.g. "2022-11-01"
            geometry: Geometry of the area to be captured, default a Polygon. Allows Point feature for specific
                data products. One of FeatureCollection, Feature, dict (geojson geometry), list (bounds coordinates),
                GeoDataFrame, shapely.Polygon, shapely.Point. All assume EPSG 4326!

        Returns:
            The constructed order parameters dictionary.

        Example:
            ```python
            order_parameters = tasking.construct_order_parameters(
                data_product_id='647780db-5a06-4b61-b525-577a8b68bb54',
                name="My tasking order",
                acquisition_start="2022-11-01",
                acquisition_end="2022-12-01",
                geometry={'type': 'Polygon',
                   'coordinates': (((13.375966, 52.515068),
                     (13.375966, 52.516639),
                     (13.378314, 52.516639),
                     (13.378314, 52.515068),
                     (13.375966, 52.515068)),)}
                )
            ```
        """
        order_parameters = {
            "dataProduct": data_product_id,
            "params": {
                "displayName": name,
                "acquisitionStart": format_time(acquisition_start),
                "acquisitionEnd": format_time(acquisition_end, set_end_of_day=True),
            },
        }

        schema = self.get_data_product_schema(data_product_id)
        logger.info(
            "See `tasking.get_data_product_schema(data_product_id)` for more detail on the parameter options."
        )
        order_parameters = autocomplete_order_parameters(order_parameters, schema)

        geometry = any_vector_to_fc(vector=geometry)
        if geometry["features"][0]["geometry"]["type"] == "Point":
            # Tasking (e.g. Blacksky) can require Point geometry.
            order_parameters["params"]["geometry"] = geometry["features"][0]["geometry"]  # type: ignore
        else:
            geometry = fc_to_query_geometry(
                fc=geometry, geometry_operation="intersects"
            )
            order_parameters["params"]["geometry"] = geometry  # type: ignore

        return order_parameters

    def __repr__(self):
        return f"Tasking(auth={self.auth})"

construct_order_parameters(data_product_id, name, acquisition_start, acquisition_end, geometry)

Helps construct the parameters dictionary required for the tasking order. Each sensor has additional parameters that are added to the output dictionary with value None. The potential values to select from are given in the logs; for more detail on the parameters use tasking.get_data_product_schema().

Parameters:

Name Type Description Default
data_product_id str

Id of the desired UP42 data product, see tasking.get_data_products

required
name str

Name of the tasking order project.

required
acquisition_start Union[str, datetime]

Start date of the acquisition period, datetime or isoformat string e.g. "2022-11-01"

required
acquisition_end Union[str, datetime]

End date of the acquisition period, datetime or isoformat string e.g. "2022-11-01"

required
geometry Union[FeatureCollection, Feature, dict, list, GeoDataFrame, Polygon, Point]

Geometry of the area to be captured, default a Polygon. Allows Point feature for specific data products. One of FeatureCollection, Feature, dict (geojson geometry), list (bounds coordinates), GeoDataFrame, shapely.Polygon, shapely.Point. All assume EPSG 4326!

required

Returns:

Type Description

The constructed order parameters dictionary.

Example
order_parameters = tasking.construct_order_parameters(
    data_product_id='647780db-5a06-4b61-b525-577a8b68bb54',
    name="My tasking order",
    acquisition_start="2022-11-01",
    acquisition_end="2022-12-01",
    geometry={'type': 'Polygon',
       'coordinates': (((13.375966, 52.515068),
         (13.375966, 52.516639),
         (13.378314, 52.516639),
         (13.378314, 52.515068),
         (13.375966, 52.515068)),)}
    )
Source code in up42/tasking.py
def construct_order_parameters(
    self,
    data_product_id: str,
    name: str,
    acquisition_start: Union[str, datetime],
    acquisition_end: Union[str, datetime],
    geometry: Union[
        FeatureCollection, Feature, dict, list, GeoDataFrame, Polygon, Point
    ],
):
    """
    Helps construct the parameters dictionary required for the tasking order. Each sensor has additional
    parameters that are added to the output dictionary with value None. The potential values to select from
    are given in the logs; for more detail on the parameters use `tasking.get_data_product_schema()`.

    Args:
        data_product_id: Id of the desired UP42 data product, see `tasking.get_data_products`
        name: Name of the tasking order project.
        acquisition_start: Start date of the acquisition period, datetime or isoformat string e.g. "2022-11-01"
        acquisition_end: End date of the acquisition period, datetime or isoformat string e.g. "2022-11-01"
        geometry: Geometry of the area to be captured, default a Polygon. Allows Point feature for specific
            data products. One of FeatureCollection, Feature, dict (geojson geometry), list (bounds coordinates),
            GeoDataFrame, shapely.Polygon, shapely.Point. All assume EPSG 4326!

    Returns:
        The constructed order parameters dictionary.

    Example:
        ```python
        order_parameters = tasking.construct_order_parameters(
            data_product_id='647780db-5a06-4b61-b525-577a8b68bb54',
            name="My tasking order",
            acquisition_start="2022-11-01",
            acquisition_end="2022-12-01",
            geometry={'type': 'Polygon',
               'coordinates': (((13.375966, 52.515068),
                 (13.375966, 52.516639),
                 (13.378314, 52.516639),
                 (13.378314, 52.515068),
                 (13.375966, 52.515068)),)}
            )
        ```
    """
    order_parameters = {
        "dataProduct": data_product_id,
        "params": {
            "displayName": name,
            "acquisitionStart": format_time(acquisition_start),
            "acquisitionEnd": format_time(acquisition_end, set_end_of_day=True),
        },
    }

    schema = self.get_data_product_schema(data_product_id)
    logger.info(
        "See `tasking.get_data_product_schema(data_product_id)` for more detail on the parameter options."
    )
    order_parameters = autocomplete_order_parameters(order_parameters, schema)

    geometry = any_vector_to_fc(vector=geometry)
    if geometry["features"][0]["geometry"]["type"] == "Point":
        # Tasking (e.g. Blacksky) can require Point geometry.
        order_parameters["params"]["geometry"] = geometry["features"][0]["geometry"]  # type: ignore
    else:
        geometry = fc_to_query_geometry(
            fc=geometry, geometry_operation="intersects"
        )
        order_parameters["params"]["geometry"] = geometry  # type: ignore

    return order_parameters

Webhook

Webhook

Webhook class to control a specific UP42 webhook, e.g. to modify, test, or delete it.

webhook.trigger_test_events()
Source code in up42/webhooks.py
class Webhook:
    """
    # Webhook

    Webhook class to control a specific UP42 webhook, e.g. to modify, test, or delete it.

    ```python
    webhook.trigger_test_events()
    ```
    """

    def __init__(self, auth: Auth, webhook_id: str, webhook_info: dict = None):
        self.auth = auth
        self.workspace_id = auth.workspace_id
        self.webhook_id = webhook_id
        if webhook_info is not None:
            self._info = webhook_info
        else:
            self._info = self.info

    def __repr__(self):
        return f"Webhook(name: {self._info['name']}, webhook_id: {self.webhook_id}, active: {self._info['active']}"

    @property
    def info(self) -> dict:
        """
        Gets and updates the webhook metadata information.
        """
        url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}"
        response_json = self.auth._request(request_type="GET", url=url)
        self._info = response_json["data"]
        return self._info

    def trigger_test_events(self) -> dict:
        """
        Triggers a webhook test event to test your receiving side. The UP42 server will send test
        messages for each subscribed event to the specified webhook URL.

        Returns:
            A dict with information about the test events.
        """
        url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}/tests"
        response_json = self.auth._request(
            request_type="POST",
            url=url,
        )
        return response_json["data"]

    def update(
        self,
        name: Optional[str] = None,
        url: Optional[str] = None,
        events: Optional[List[str]] = None,
        active: Optional[bool] = None,
        secret: Optional[str] = None,
    ) -> "Webhook":
        """
        Updates a registered webhook.

        Args:
            name: Updated webhook name
            url: Updated unique URL where the webhook will send the message (HTTPS required)
            events: Updated list of event types [order.status, job.status].
            active: Updated webhook status.
            secret: Updated string that acts as a signature for the HTTPS request sent to the URL.

        Returns:
            The updated webhook object.
        """
        self.info  # _info could be outdated. #pylint: disable=pointless-statement
        input_parameters = {
            "name": name if name is not None else self._info["name"],
            "url": url if url is not None else self._info["url"],
            "events": events if events is not None else self._info["events"],
            "secret": secret if secret is not None else self._info["secret"],
            "active": active if active is not None else self._info["active"],
        }
        url_put = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}"
        response_json = self.auth._request(
            request_type="PUT", url=url_put, data=input_parameters
        )
        self._info = response_json["data"]
        logger.info(f"Updated webhook {self}")
        return self

    def delete(self) -> None:
        """
        Deletes a registered webhook.
        """
        url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}"
        self.auth._request(request_type="DELETE", url=url)
        logger.info(f"Successfully deleted Webhook: {self.webhook_id}")

delete()

Deletes a registered webhook.

Source code in up42/webhooks.py
def delete(self) -> None:
    """
    Deletes a registered webhook.
    """
    url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}"
    self.auth._request(request_type="DELETE", url=url)
    logger.info(f"Successfully deleted Webhook: {self.webhook_id}")

info: dict property

Gets and updates the webhook metadata information.

trigger_test_events()

Triggers a webhook test event to test your receiving side. The UP42 server will send test messages for each subscribed event to the specified webhook URL.

Returns:

Type Description
dict

A dict with information about the test events.
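
For example, assuming an initialized webhook object:

```python
test_events_info = webhook.trigger_test_events()
```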

Source code in up42/webhooks.py
def trigger_test_events(self) -> dict:
    """
    Triggers a webhook test event to test your receiving side. The UP42 server will send test
    messages for each subscribed event to the specified webhook URL.

    Returns:
        A dict with information about the test events.
    """
    url = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}/tests"
    response_json = self.auth._request(
        request_type="POST",
        url=url,
    )
    return response_json["data"]

update(name=None, url=None, events=None, active=None, secret=None)

Updates a registered webhook.

Parameters:

Name Type Description Default
name Optional[str]

Updated webhook name

None
url Optional[str]

Updated unique URL where the webhook will send the message (HTTPS required)

None
events Optional[List[str]]

Updated list of event types [order.status, job.status].

None
active Optional[bool]

Updated webhook status.

None
secret Optional[str]

Updated string that acts as a signature for the HTTPS request sent to the URL.

None

Returns:

Type Description
Webhook

The updated webhook object.
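
For example, to rename a webhook and deactivate it, assuming an initialized webhook object:

```python
webhook = webhook.update(name="new-name", active=False)
```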

Source code in up42/webhooks.py
def update(
    self,
    name: Optional[str] = None,
    url: Optional[str] = None,
    events: Optional[List[str]] = None,
    active: Optional[bool] = None,
    secret: Optional[str] = None,
) -> "Webhook":
    """
    Updates a registered webhook.

    Args:
        name: Updated webhook name
        url: Updated unique URL where the webhook will send the message (HTTPS required)
        events: Updated list of event types [order.status, job.status].
        active: Updated webhook status.
        secret: Updated string that acts as a signature for the HTTPS request sent to the URL.

    Returns:
        The updated webhook object.
    """
    self.info  # _info could be outdated. #pylint: disable=pointless-statement
    input_parameters = {
        "name": name if name is not None else self._info["name"],
        "url": url if url is not None else self._info["url"],
        "events": events if events is not None else self._info["events"],
        "secret": secret if secret is not None else self._info["secret"],
        "active": active if active is not None else self._info["active"],
    }
    url_put = f"{self.auth._endpoint()}/workspaces/{self.workspace_id}/webhooks/{self.webhook_id}"
    response_json = self.auth._request(
        request_type="PUT", url=url_put, data=input_parameters
    )
    self._info = response_json["data"]
    logger.info(f"Updated webhook {self}")
    return self

Workflow

The Workflow class lets you configure & run jobs and query existing jobs related to this workflow.

Create a new workflow:

workflow = project.create_workflow(name="new_workflow")

Use an existing workflow:

workflow = up42.initialize_workflow(workflow_id="7fb2ec8a-45be-41ad-a50f-98ba6b528b98")

Source code in up42/workflow.py