Skip to content

Catalog

The Catalog class enables access to the UP42 catalog search. You can search for satellite image scenes (for different sensors and criteria like cloud cover), plot the scene coverage and download and plot the scene quicklooks.

Use the catalog:

catalog = up42.initialize_catalog()

Source code in up42/catalog.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
class Catalog(VizTools):
    """
    The Catalog class enables access to the UP42 catalog search. You can search
    for satellite image scenes (for different sensors and criteria like cloud cover),
    plot the scene coverage and download and plot the scene quicklooks.

    Use the catalog:
    ```python
    catalog = up42.initialize_catalog()
    ```
    """

    def __init__(self, auth: Auth):
        self.auth = auth
        # Output file paths of the most recent `download_quicklooks` call,
        # None until quicklooks have been downloaded.
        self.quicklooks = None

    def __repr__(self):
        return f"Catalog(auth={self.auth})"

    def get_collections(self) -> Union[Dict, List]:
        """
        Get the available data collections.

        Returns:
            The content of the "data" field of the collections endpoint response.
        """
        url = f"{self.auth._endpoint()}/collections"
        json_response = self.auth._request("GET", url)
        return json_response["data"]

    # The list default below is only read, never mutated, so sharing it between
    # calls is safe.
    # pylint: disable=dangerous-default-value
    @staticmethod
    def construct_parameters(
        geometry: Union[
            dict,
            Feature,
            FeatureCollection,
            list,
            GeoDataFrame,
            Polygon,
        ],
        collections: List[str],
        start_date: str = "2020-01-01",
        end_date: str = "2020-01-30",
        usage_type: List[str] = ["DATA", "ANALYTICS"],
        limit: int = 10,
        max_cloudcover: float = 100,
        sortby: str = "acquisitionDate",
        ascending: bool = True,
    ) -> dict:
        """
        Constructs the search parameters dictionary for `Catalog.search`.
        Follows STAC principles and property names.

        Args:
            geometry: The search geometry, one of dict, Feature, FeatureCollection,
                list, GeoDataFrame, Polygon.
            collections: The satellite sensor collections to search for, e.g. ["phr"] or ["phr", "spot"].
                Also see catalog.get_collections().
            start_date: Query period starting day, format "2020-01-01".
            end_date: Query period ending day, format "2020-01-01".
            usage_type: Filter for imagery that can only be purchased & downloaded or
                can also be processed. ["DATA"] (can only be downloaded), ["ANALYTICS"]
                (can be downloaded or used directly with a processing algorithm),
                ["DATA", "ANALYTICS"] (can be any combination, in any order). The
                filter is inclusive, using ["DATA"] can also result in results with
                ["DATA", "ANALYTICS"].
            limit: The maximum number of search results to return. `Catalog.search`
                requests at most 500 results at once and collects additional
                results via pagination.
            max_cloudcover: Maximum cloudcover % - e.g. 100 will return all scenes,
                8.4 will return all scenes with 8.4 or less cloudcover.
                Not applied when "Sentinel-1" is among the collections.
            sortby: The property to sort by, "cloudCoverage", "acquisitionDate",
                "acquisitionIdentifier", "incidenceAngle", "snowCover".
            ascending: Ascending sort order by default, descending if False.

        Returns:
            The constructed parameters dictionary.

        Raises:
            ValueError: If `usage_type` is not a non-empty list of "DATA" and/or
                "ANALYTICS".
        """
        time_period = format_time_period(start_date=start_date, end_date=end_date)
        aoi_fc = any_vector_to_fc(
            vector=geometry,
        )
        aoi_geometry = fc_to_query_geometry(fc=aoi_fc, geometry_operation="intersects")
        sort_order = "asc" if ascending else "desc"

        query_filters: Dict[Any, Any] = {}
        if "Sentinel-1" not in collections:
            query_filters["cloudCoverage"] = {"lte": max_cloudcover}  # type: ignore

        # Validate independent of element order, so that e.g. ["ANALYTICS", "DATA"]
        # is accepted as well. The filter is always emitted in canonical order.
        allowed_usage_types = ("DATA", "ANALYTICS")
        if (
            not isinstance(usage_type, (list, tuple))
            or not usage_type
            or any(item not in allowed_usage_types for item in usage_type)
        ):
            raise ValueError("Select correct `usage_type`")
        query_filters["up42:usageType"] = {
            "in": [item for item in allowed_usage_types if item in usage_type]
        }

        search_parameters = {
            "datetime": time_period,
            "intersects": aoi_geometry,
            "limit": limit,
            "collections": collections,
            "query": query_filters,
            "sortby": [{"field": f"properties.{sortby}", "direction": sort_order}],
        }

        return search_parameters

    def search(
        self, search_parameters: dict, as_dataframe: bool = True
    ) -> Union[GeoDataFrame, dict]:
        """
        Searches the catalog for the search parameters and returns the metadata of
        the matching scenes.

        Args:
            search_parameters: The catalog search parameters, see example. Must
                contain the "limit" and "collections" keys.
            as_dataframe: return type, GeoDataFrame if True (default), FeatureCollection if False.

        Returns:
            The search results as a GeoDataFrame, optionally as json dict.

        Raises:
            ValueError: If none of the selected collections is known, or if the
                selected collections belong to different hosts.

        Example:
            ```python
                search_parameters={
                    "datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
                    "collections": ["phr"],
                    "intersects": {
                        "type": "Polygon",
                        "coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
                        [13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
                        52.73971768]]]},
                    "limit": 10,
                    "sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
                    }
            ```
        """
        logger.info(f"Searching catalog with search_parameters: {search_parameters}")

        # The API request would fail with a limit above 500, thus 500 is forced in the initial
        # request but additional results are handled below via pagination.
        max_limit = search_parameters["limit"]
        if max_limit > 500:
            # Work on a copy so the caller's dictionary is not modified.
            search_parameters = dict(search_parameters)
            search_parameters["limit"] = 500

        # UP42 API can query multiple collections of the same host at once.
        collections = self.get_collections()
        hosts = [
            c["hostName"]
            for c in collections
            if c["name"] in search_parameters["collections"]
        ]
        if not hosts:
            raise ValueError(
                f"Selected collections {search_parameters['collections']} are not valid. See "
                f"catalog.get_collections."
            )
        if len(set(hosts)) > 1:
            raise ValueError(
                "Only collections with the same host can be searched at the same time. Please adjust the "
                "collections in the search_parameters!"
            )
        host = hosts[0]

        url = f"{self.auth._endpoint()}/catalog/hosts/{host}/stac/search"
        response_json: dict = self.auth._request("POST", url, search_parameters)
        features = list(response_json["features"])

        # Follow the pagination links until enough results are collected.
        while len(features) < max_limit:
            links = response_json.get("links") or []
            # Fewer than two links means there is no further page. This also
            # covers an empty or missing links list.
            if len(links) < 2:
                break
            # NOTE(review): assumes the second link is the next-page link - confirm
            # against the API response format.
            next_page_url = links[1]["href"]
            response_json = self.auth._request("POST", next_page_url, search_parameters)
            page_features = response_json["features"]
            if not page_features:
                # An empty page cannot bring us closer to max_limit; stop instead
                # of requesting pages forever.
                break
            features += page_features

        features = features[:max_limit]
        df = GeoDataFrame.from_features(
            FeatureCollection(features=features), crs="EPSG:4326"
        )

        logger.info(f"{df.shape[0]} results returned.")
        if as_dataframe:
            return df
        else:
            return df.__geo_interface__

    def download_quicklooks(
        self,
        image_ids: List[str],
        sensor: str,
        output_directory: Union[str, Path, None] = None,
    ) -> List[str]:
        """
        Gets the quicklooks of scenes from a single sensor. After download, can
        be plotted via catalog.plot_quicklooks() or catalog.map_quicklooks().

        Args:
            image_ids: List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"].
                Access the search results id column via `list(search_results.id)`.
                A single id string is also accepted.
            sensor: The satellite sensor of the image_ids, one of "pleiades", "spot",
                "sentinel1", "sentinel2", "sentinel3", "sentinel5p".
            output_directory: The file output directory, defaults to the folder
                `project_<project_id>/catalog` in the current working directory.
                Created if it does not exist.

        Returns:
            List of quicklook image output file paths.

        Raises:
            ValueError: If `sensor` is not one of the supported sensors.
        """
        # Maps each supported sensor to the provider name used in the request url.
        supported_sensors = {
            "pleiades": "oneatlas",
            "spot": "oneatlas",
            "sentinel1": "sobloo-image",
            "sentinel2": "sobloo-image",
            "sentinel3": "sobloo-image",
            "sentinel5p": "sobloo-image",
        }

        if sensor not in supported_sensors:
            raise ValueError(
                f"Currently only these sensors are supported: "
                f"{list(supported_sensors.keys())}"
            )
        provider = supported_sensors[sensor]
        logger.info(
            f"Getting quicklooks from provider {provider} for image_ids: "
            f"{image_ids}"
        )

        if output_directory is None:
            output_directory = Path.cwd() / f"project_{self.auth.project_id}/catalog"
        else:
            output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Download directory: {str(output_directory)}")

        if isinstance(image_ids, str):
            image_ids = [image_ids]

        out_paths: List[str] = []
        for image_id in tqdm(image_ids):
            try:
                url = f"{self.auth._endpoint()}/catalog/{provider}/image/{image_id}/quicklook"

                response = self.auth._request(
                    request_type="GET", url=url, return_text=False
                )
                out_path = output_directory / f"quicklook_{image_id}.jpg"
                with open(out_path, "wb") as dst:
                    for chunk in response:
                        dst.write(chunk)
                # Only record the path once the file has been written.
                out_paths.append(str(out_path))
            except ValueError:
                # A ValueError is treated as "no quicklook for this image"; the
                # remaining images are still downloaded.
                logger.warning(
                    f"Image with id {image_id} does not have quicklook available. Skipping ..."
                )

        self.quicklooks = out_paths
        return out_paths

    @staticmethod
    def _order_payload(
        geometry: Union[
            dict,
            Feature,
            FeatureCollection,
            list,
            GeoDataFrame,
            Polygon,
        ],
        scene: Series,
    ) -> Tuple[str, dict]:
        """
        Helper that constructs necessary parameters for `Order.place` and `Order.estimate`.

        Args:
            geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
                GeoDataFrame, Polygon.
            scene: A geopandas series with a single item/row of the result of `Catalog.search`. For instance,
                search_results.loc[0] for the first scene of a catalog search result.

        Returns:
            str, dict: A tuple including a provider name and order parameters.

        Raises:
            ValueError: If `scene` is not a Series.
        """
        if not isinstance(scene, Series):
            raise ValueError(
                "`scene` parameter must be a GeoSeries, or a single item/row of a GeoDataFrame. "
                "For instance, search_results.loc[0] returns a GeoSeries."
            )
        aoi_fc = any_vector_to_fc(
            vector=geometry,
        )
        aoi_geometry = fc_to_query_geometry(fc=aoi_fc, geometry_operation="intersects")
        data_provider_name = scene.providerName
        order_params = {"id": scene.id, "aoi": aoi_geometry}
        return data_provider_name, order_params

    def estimate_order(
        self,
        geometry: Union[
            dict,
            Feature,
            FeatureCollection,
            list,
            GeoDataFrame,
            Polygon,
        ],
        scene: Series,
    ) -> int:
        """
        Estimate the cost of an order from an item/row in a result of `Catalog.search`.

        Args:
            geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
                GeoDataFrame, Polygon.
            scene: A geopandas series with a single item/row of the result of `Catalog.search`. For instance,
                search_results.loc[0] for the first scene of a catalog search result.

        Returns:
            int: An estimated cost for the order in UP42 credits.
        """
        data_provider_name, order_params = self._order_payload(geometry, scene)
        return Order.estimate(self.auth, data_provider_name, order_params)

    def place_order(
        self,
        geometry: Union[
            dict,
            Feature,
            FeatureCollection,
            list,
            GeoDataFrame,
            Polygon,
        ],
        scene: Series,
        track_status: bool = False,
        report_time: int = 120,
    ) -> "Order":
        """
        Place an order from an item/row in a result of `Catalog.search`.

        Args:
            geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
                GeoDataFrame, Polygon.
            scene: A geopandas series with a single item/row of the result of `Catalog.search`. For instance,
                search_results.loc[0] for the first scene of a catalog search result.
            track_status (bool): If set to True, will only return the Order once it is `FULFILLED` or `FAILED`.
            report_time (int): The interval (in seconds) to query the order status if `track_status` is True.

        Warning:
            When placing orders of items that are in archive or cold storage,
            the order fulfillment can happen up to **24h after order placement**.
            In such cases, please make sure to set an appropriate `report_time`.
            You can also use `Order.track_status` on the returned object to track the status later.

        Returns:
            Order: The placed order.
        """
        data_provider_name, order_params = self._order_payload(geometry, scene)
        order = Order.place(self.auth, data_provider_name, order_params)
        if track_status:
            order.track_status(report_time)
        return order

Functions

construct_parameters(geometry, collections, start_date='2020-01-01', end_date='2020-01-30', usage_type=['DATA', 'ANALYTICS'], limit=10, max_cloudcover=100, sortby='acquisitionDate', ascending=True) staticmethod

Follows STAC principles and property names.

Parameters:

Name Type Description Default
geometry Union[dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon]

The search geometry, one of dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon.

required
collections List[str]

The satellite sensor collections to search for, e.g. ["phr"] or ["phr", "spot"]. Also see catalog.get_collections().

required
start_date str

Query period starting day, format "2020-01-01".

'2020-01-01'
end_date str

Query period ending day, format "2020-01-01".

'2020-01-30'
usage_type List[str]

Filter for imagery that can only be purchased & downloaded or can also be processed. ["DATA"] (can only be downloaded), ["ANALYTICS"] (can be downloaded or used directly with a processing algorithm), ["DATA", "ANALYTICS"] (can be any combination). The filter is inclusive: using ["DATA"] can also return results with ["DATA", "ANALYTICS"].

['DATA', 'ANALYTICS']
limit int

The maximum number of search results to return (1-max.500).

10
max_cloudcover float

Maximum cloudcover % - e.g. 100 will return all scenes, 8.4 will return all scenes with 8.4 or less cloudcover. Ignored for collections that have no cloudcover (e.g. sentinel1).

100
sortby str

The property to sort by, "cloudCoverage", "acquisitionDate", "acquisitionIdentifier", "incidenceAngle", "snowCover".

'acquisitionDate'
ascending bool

Ascending sort order by default, descending if False.

True

Returns:

Type Description
dict

The constructed parameters dictionary.

Source code in up42/catalog.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@staticmethod
def construct_parameters(
    geometry: Union[
        dict,
        Feature,
        FeatureCollection,
        list,
        GeoDataFrame,
        Polygon,
    ],
    collections: List[str],
    start_date: str = "2020-01-01",
    end_date: str = "2020-01-30",
    usage_type: List[str] = ["DATA", "ANALYTICS"],
    limit: int = 10,
    max_cloudcover: float = 100,
    sortby: str = "acquisitionDate",
    ascending: bool = True,
) -> dict:
    """
    Constructs the catalog search parameters dictionary.
    Follows STAC principles and property names.

    Args:
        geometry: The search geometry, one of dict, Feature, FeatureCollection,
            list, GeoDataFrame, Polygon.
        collections: The satellite sensor collections to search for, e.g. ["phr"] or ["phr", "spot"].
            Also see catalog.get_collections().
        start_date: Query period starting day, format "2020-01-01".
        end_date: Query period ending day, format "2020-01-01".
        usage_type: Filter for imagery that can only be purchased & downloaded or
            can also be processed. ["DATA"] (can only be downloaded), ["ANALYTICS"]
            (can be downloaded or used directly with a processing algorithm),
            ["DATA", "ANALYTICS"] (can be any combination, in any order). The
            filter is inclusive, using ["DATA"] can also result in results with
            ["DATA", "ANALYTICS"].
        limit: The maximum number of search results to return (1-max.500).
        max_cloudcover: Maximum cloudcover % - e.g. 100 will return all scenes,
            8.4 will return all scenes with 8.4 or less cloudcover.
            Not applied when "Sentinel-1" is among the collections.
        sortby: The property to sort by, "cloudCoverage", "acquisitionDate",
            "acquisitionIdentifier", "incidenceAngle", "snowCover".
        ascending: Ascending sort order by default, descending if False.

    Returns:
        The constructed parameters dictionary.

    Raises:
        ValueError: If `usage_type` is not a non-empty list of "DATA" and/or
            "ANALYTICS".
    """
    time_period = format_time_period(start_date=start_date, end_date=end_date)
    aoi_fc = any_vector_to_fc(
        vector=geometry,
    )
    aoi_geometry = fc_to_query_geometry(fc=aoi_fc, geometry_operation="intersects")
    sort_order = "asc" if ascending else "desc"

    query_filters: Dict[Any, Any] = {}
    if "Sentinel-1" not in collections:
        query_filters["cloudCoverage"] = {"lte": max_cloudcover}  # type: ignore

    # Validate independent of element order, so that e.g. ["ANALYTICS", "DATA"]
    # is accepted as well. The filter is always emitted in canonical order.
    # The list default is only read, never mutated.
    allowed_usage_types = ("DATA", "ANALYTICS")
    if (
        not isinstance(usage_type, (list, tuple))
        or not usage_type
        or any(item not in allowed_usage_types for item in usage_type)
    ):
        raise ValueError("Select correct `usage_type`")
    query_filters["up42:usageType"] = {
        "in": [item for item in allowed_usage_types if item in usage_type]
    }

    search_parameters = {
        "datetime": time_period,
        "intersects": aoi_geometry,
        "limit": limit,
        "collections": collections,
        "query": query_filters,
        "sortby": [{"field": f"properties.{sortby}", "direction": sort_order}],
    }

    return search_parameters

download_quicklooks(image_ids, sensor, output_directory=None)

Gets the quicklooks of scenes from a single sensor. After download, can be plotted via catalog.plot_quicklooks() or catalog.map_quicklooks().

Parameters:

Name Type Description Default
image_ids List[str]

List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"]. Access the search results id column via list(search_results.id).

required
sensor str

The satellite sensor of the image_ids, one of "pleiades", "spot", "sentinel1", "sentinel2", "sentinel3", "sentinel5p".

required
output_directory Union[str, Path, None]

The file output directory. Defaults to the folder `project_<project_id>/catalog` inside the current working directory.

None

Returns:

Type Description
List[str]

List of quicklook image output file paths.

Source code in up42/catalog.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def download_quicklooks(
    self,
    image_ids: List[str],
    sensor: str,
    output_directory: Union[str, Path, None] = None,
) -> List[str]:
    """
    Gets the quicklooks of scenes from a single sensor. After download, can
    be plotted via catalog.plot_quicklooks() or catalog.map_quicklooks().

    Args:
        image_ids: List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"].
            Access the search results id column via `list(search_results.id)`.
            A single id string is also accepted.
        sensor: The satellite sensor of the image_ids, one of "pleiades", "spot",
            "sentinel1", "sentinel2", "sentinel3", "sentinel5p".
        output_directory: The file output directory, defaults to the folder
            `project_<project_id>/catalog` in the current working directory.
            Created if it does not exist.

    Returns:
        List of quicklook image output file paths.

    Raises:
        ValueError: If `sensor` is not one of the supported sensors.
    """
    # Maps each supported sensor to the provider name used in the request url.
    supported_sensors = {
        "pleiades": "oneatlas",
        "spot": "oneatlas",
        "sentinel1": "sobloo-image",
        "sentinel2": "sobloo-image",
        "sentinel3": "sobloo-image",
        "sentinel5p": "sobloo-image",
    }

    if sensor not in supported_sensors:
        raise ValueError(
            f"Currently only these sensors are supported: "
            f"{list(supported_sensors.keys())}"
        )
    provider = supported_sensors[sensor]
    logger.info(
        f"Getting quicklooks from provider {provider} for image_ids: "
        f"{image_ids}"
    )

    if output_directory is None:
        output_directory = Path.cwd() / f"project_{self.auth.project_id}/catalog"
    else:
        output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(output_directory)}")

    if isinstance(image_ids, str):
        image_ids = [image_ids]

    out_paths: List[str] = []
    for image_id in tqdm(image_ids):
        try:
            url = f"{self.auth._endpoint()}/catalog/{provider}/image/{image_id}/quicklook"

            response = self.auth._request(
                request_type="GET", url=url, return_text=False
            )
            out_path = output_directory / f"quicklook_{image_id}.jpg"
            with open(out_path, "wb") as dst:
                for chunk in response:
                    dst.write(chunk)
            # Only record the path once the file has been written.
            out_paths.append(str(out_path))
        except ValueError:
            # A ValueError is treated as "no quicklook for this image"; the
            # remaining images are still downloaded.
            logger.warning(
                f"Image with id {image_id} does not have quicklook available. Skipping ..."
            )

    self.quicklooks = out_paths
    return out_paths

estimate_order(geometry, scene)

Estimate the cost of an order from an item/row in a result of Catalog.search.

Parameters:

Name Type Description Default
geometry Union[dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon]

The intended output AOI of the order, one of dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon.

required
scene Series

A geopandas series with a single item/row of the result of Catalog.search. For instance, search_results.loc[0] for the first scene of a catalog search result.

required

Returns:

Name Type Description
int int

An estimated cost for the order in UP42 credits.

Source code in up42/catalog.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def estimate_order(
    self,
    geometry: Union[
        dict,
        Feature,
        FeatureCollection,
        list,
        GeoDataFrame,
        Polygon,
    ],
    scene: Series,
) -> int:
    """
    Estimate what ordering a single `Catalog.search` result item/row would cost.

    Args:
        geometry: The intended output AOI of the order, one of dict, Feature,
            FeatureCollection, list, GeoDataFrame, Polygon.
        scene: A geopandas series holding a single item/row of the result of
            `Catalog.search`, e.g. search_results.loc[0] for the first scene.

    Returns:
        int: An estimated cost for the order in UP42 credits.
    """
    # Provider name and order parameters are derived by the shared payload helper.
    payload = self._order_payload(geometry, scene)
    provider_name, params = payload
    return Order.estimate(self.auth, provider_name, params)

get_collections()

Get the available data collections.

Source code in up42/catalog.py
47
48
49
50
51
52
53
def get_collections(self) -> Union[Dict, List]:
    """
    Get the available data collections.

    Returns:
        The content of the "data" field of the collections endpoint response.
    """
    endpoint = self.auth._endpoint()
    response = self.auth._request("GET", f"{endpoint}/collections")
    return response["data"]

place_order(geometry, scene, track_status=False, report_time=120)

Place an order from an item/row in a result of Catalog.search.

Parameters:

Name Type Description Default
geometry Union[dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon]

The intended output AOI of the order, one of dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon.

required
scene Series

A geopandas series with a single item/row of the result of Catalog.search. For instance, search_results.loc[0] for the first scene of a catalog search result.

required
track_status bool

If set to True, will only return the Order once it is FULFILLED or FAILED.

False
report_time int

The interval (in seconds) to query the order status if track_status is True.

120

Returns:

Name Type Description
Order Order

The placed order.

Source code in up42/catalog.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def place_order(
    self,
    geometry: Union[
        dict,
        Feature,
        FeatureCollection,
        list,
        GeoDataFrame,
        Polygon,
    ],
    scene: Series,
    track_status: bool = False,
    report_time: int = 120,
) -> "Order":
    """
    Place an order for a single item/row of a `Catalog.search` result.

    Args:
        geometry: The intended output AOI of the order, one of dict, Feature,
            FeatureCollection, list, GeoDataFrame, Polygon.
        scene: A geopandas series holding a single item/row of the result of
            `Catalog.search`, e.g. search_results.loc[0] for the first scene.
        track_status (bool): If set to True, will only return the Order once it is
            `FULFILLED` or `FAILED`.
        report_time (int): The interval (in seconds) to query the order status if
            `track_status` is True.

    Warning:
        When placing orders of items that are in archive or cold storage,
        the order fulfillment can happen up to **24h after order placement**.
        In such cases, please make sure to set an appropriate `report_time`.
        You can also use `Order.track_status` on the returned object to track the status later.

    Returns:
        Order: The placed order.
    """
    provider_name, params = self._order_payload(geometry, scene)
    placed_order = Order.place(self.auth, provider_name, params)
    if not track_status:
        return placed_order
    # Status tracking was requested: follow the order before handing it back.
    placed_order.track_status(report_time)
    return placed_order

search(search_parameters, as_dataframe=True)

Searches the catalog for the search parameters and returns the metadata of the matching scenes.

Parameters:

Name Type Description Default
search_parameters dict

The catalog search parameters, see example.

required
as_dataframe bool

return type, GeoDataFrame if True (default), FeatureCollection if False.

True

Returns:

Type Description
Union[GeoDataFrame, dict]

The search results as a GeoDataFrame, optionally as json dict.

Example
    search_parameters={
        "datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
        "collections": ["phr"],
        "intersects": {
            "type": "Polygon",
            "coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
            [13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
            52.73971768]]]},
        "limit": 10,
        "sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
        }
Source code in up42/catalog.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def search(
    self, search_parameters: dict, as_dataframe: bool = True
) -> Union[GeoDataFrame, dict]:
    """
    Searches the catalog for the search parameters and returns the metadata of
    the matching scenes.

    Args:
        search_parameters: The catalog search parameters, see example. Must
            contain the "limit" and "collections" keys.
        as_dataframe: return type, GeoDataFrame if True (default), FeatureCollection if False.

    Returns:
        The search results as a GeoDataFrame, optionally as json dict.

    Raises:
        ValueError: If none of the selected collections is known, or if the
            selected collections belong to different hosts.

    Example:
        ```python
            search_parameters={
                "datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
                "collections": ["phr"],
                "intersects": {
                    "type": "Polygon",
                    "coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
                    [13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
                    52.73971768]]]},
                "limit": 10,
                "sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
                }
        ```
    """
    logger.info(f"Searching catalog with search_parameters: {search_parameters}")

    # The API request would fail with a limit above 500, thus 500 is forced in the initial
    # request but additional results are handled below via pagination.
    max_limit = search_parameters["limit"]
    if max_limit > 500:
        # Work on a copy so the caller's dictionary is not modified.
        search_parameters = dict(search_parameters)
        search_parameters["limit"] = 500

    # UP42 API can query multiple collections of the same host at once.
    collections = self.get_collections()
    hosts = [
        c["hostName"]
        for c in collections
        if c["name"] in search_parameters["collections"]
    ]
    if not hosts:
        raise ValueError(
            f"Selected collections {search_parameters['collections']} are not valid. See "
            f"catalog.get_collections."
        )
    if len(set(hosts)) > 1:
        raise ValueError(
            "Only collections with the same host can be searched at the same time. Please adjust the "
            "collections in the search_parameters!"
        )
    host = hosts[0]

    url = f"{self.auth._endpoint()}/catalog/hosts/{host}/stac/search"
    response_json: dict = self.auth._request("POST", url, search_parameters)
    features = list(response_json["features"])

    # Follow the pagination links until enough results are collected.
    while len(features) < max_limit:
        links = response_json.get("links") or []
        # Fewer than two links means there is no further page. This also
        # covers an empty or missing links list.
        if len(links) < 2:
            break
        # NOTE(review): assumes the second link is the next-page link - confirm
        # against the API response format.
        next_page_url = links[1]["href"]
        response_json = self.auth._request("POST", next_page_url, search_parameters)
        page_features = response_json["features"]
        if not page_features:
            # An empty page cannot bring us closer to max_limit; stop instead
            # of requesting pages forever.
            break
        features += page_features

    features = features[:max_limit]
    df = GeoDataFrame.from_features(
        FeatureCollection(features=features), crs="EPSG:4326"
    )

    logger.info(f"{df.shape[0]} results returned.")
    if as_dataframe:
        return df
    else:
        return df.__geo_interface__