Skip to content

Catalog

The Catalog class enables access to the UP42 catalog search. You can search for satellite image scenes (for different sensors and criteria like cloud cover), plot the scene coverage and download and plot the scene quicklooks.

Use the catalog:

catalog = up42.initialize_catalog()

Source code in up42/catalog.py
class Catalog(VizTools):
    """
    The Catalog class enables access to the UP42 catalog search. You can search
    for satellite image scenes (for different sensors and criteria like cloud cover),
    plot the scene coverage and download and plot the scene quicklooks.

    Use the catalog:
    ```python
    catalog = up42.initialize_catalog()
    ```
    """

    def __init__(self, auth: Auth):
        self.auth = auth
        self.quicklooks = None

    def __repr__(self):
        return f"Catalog(auth={self.auth})"

    def get_collections(self) -> Union[Dict, List]:
        """
        Get the available data collections.
        """
        url = f"{self.auth._endpoint()}/collections"
        json_response = self.auth._request("GET", url)
        return json_response["data"]

    # pylint: disable=dangerous-default-value
    @staticmethod
    def construct_parameters(
        geometry: Union[
            dict,
            Feature,
            FeatureCollection,
            list,
            GeoDataFrame,
            Polygon,
        ],
        collections: List[str],
        start_date: str = "2020-01-01",
        end_date: str = "2020-01-30",
        usage_type: List[str] = ["DATA", "ANALYTICS"],
        limit: int = 10,
        max_cloudcover: float = 100,
        sortby: str = "acquisitionDate",
        ascending: bool = True,
    ) -> dict:
        """
        Follows STAC principles and property names.

        Args:
            geometry: The search geometry, one of dict, Feature, FeatureCollection,
                list, GeoDataFrame, Polygon.
            collections: The satellite sensor collections to search for, e.g. ["PHR"] or ["PHR", "SPOT"].
                Also see catalog.get_collections().
            start_date: Query period starting day, format "2020-01-01".
            end_date: Query period ending day, format "2020-01-01".
            usage_type: Filter for imagery that can just be purchased & downloaded or also
                processes. ["DATA"] (can only be download), ["ANALYTICS"] (can be downloaded
                or used directly with a processing algorithm), ["DATA", "ANALYTICS"]
                (can be any combination). The filter is inclusive, using ["DATA"] can
                also result in results with ["DATA", "ANALYTICS"].
            limit: The maximum number of search results to return (1-max.500).
            max_cloudcover: Maximum cloudcover % - e.g. 100 will return all scenes,
                8.4 will return all scenes with 8.4 or less cloudcover.
                Ignored for collections that have no cloudcover (e.g. sentinel1).
            sortby: The property to sort by, "cloudCoverage", "acquisitionDate",
                "acquisitionIdentifier", "incidenceAngle", "snowCover".
            ascending: Ascending sort order by default, descending if False.

        Returns:
            The constructed parameters dictionary.
        """
        time_period = format_time_period(start_date=start_date, end_date=end_date)
        aoi_fc = any_vector_to_fc(
            vector=geometry,
        )
        aoi_geometry = fc_to_query_geometry(fc=aoi_fc, geometry_operation="intersects")
        sort_order = "asc" if ascending else "desc"

        query_filters: Dict[Any, Any] = {}
        if not "Sentinel-1" in collections:
            query_filters["cloudCoverage"] = {"lte": max_cloudcover}  # type: ignore

        if usage_type == ["DATA"]:
            query_filters["up42:usageType"] = {"in": ["DATA"]}
        elif usage_type == ["ANALYTICS"]:
            query_filters["up42:usageType"] = {"in": ["ANALYTICS"]}
        elif usage_type == ["DATA", "ANALYTICS"]:
            query_filters["up42:usageType"] = {"in": ["DATA", "ANALYTICS"]}
        else:
            raise ValueError("Select correct `usage_type`")

        search_parameters = {
            "datetime": time_period,
            "intersects": aoi_geometry,
            "limit": limit,
            "collections": collections,
            "query": query_filters,
            "sortby": [{"field": f"properties.{sortby}", "direction": sort_order}],
        }

        return search_parameters

    def search(
        self, search_parameters: dict, as_dataframe: bool = True
    ) -> Union[GeoDataFrame, dict]:
        """
        Searches the catalog for the the search parameters and returns the metadata of
        the matching scenes.

        Args:
            search_parameters: The catalog search parameters, see example.
            as_dataframe: return type, GeoDataFrame if True (default), FeatureCollection if False.

        Returns:
            The search results as a GeoDataFrame, optionally as json dict.

        Example:
            ```python
                search_parameters={
                    "datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
                    "intersects": {
                        "type": "Polygon",
                        "coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
                        [13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
                        52.73971768]]]},
                    "limit": 10,
                    "sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
                    }
            ```
        """
        logger.info(f"Searching catalog with search_parameters: {search_parameters}")

        # The API request would fail with a limit above 500, thus 500 is forced in the initial
        # request but additional results are handled below via pagination.
        max_limit = search_parameters["limit"]
        if max_limit > 500:
            search_parameters = dict(search_parameters)
            search_parameters["limit"] = 500

        url = f"{self.auth._endpoint()}/catalog/stac/search"
        response_json: dict = self.auth._request("POST", url, search_parameters)
        features = response_json["features"]

        # A request with no results will still include a (non-exhausted) pagination token.
        # Only the first pagination token request will then indicate search exhausted.

        # Search results with more than 500 items are given as 50-per-page additional pages.
        while len(features) < max_limit:
            page_url = response_json["links"][0]["href"]
            next_page_url = response_json["links"][1]["href"]
            pagination_exhausted = next_page_url == page_url
            if pagination_exhausted:
                break
            response_json = self.auth._request("POST", next_page_url, search_parameters)
            features += response_json["features"]

        features = features[:max_limit]
        df = GeoDataFrame.from_features(
            FeatureCollection(features=features), crs="EPSG:4326"
        )

        logger.info(f"{df.shape[0]} results returned.")
        if as_dataframe:
            return df
        else:
            return df.__geo_interface__

    def download_quicklooks(
        self,
        image_ids: List[str],
        sensor: str,
        output_directory: Union[str, Path, None] = None,
    ) -> List[str]:
        """
        Gets the quicklooks of scenes from a single sensor. After download, can
        be plotted via catalog.plot_quicklooks() or catalog.map_quicklooks().

        Args:
            image_ids: List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"].
                Access the search results id column via `list(search_results.id)`.
            sensor: The satellite sensor of the image_ids, one of "pleiades", "spot",
                "sentinel1", "sentinel2", "sentinel3", "sentinel5p".
            output_directory: The file output directory, defaults to the current working
                directory.

        Returns:
            List of quicklook image output file paths.
        """
        supported_sensors = {
            "pleiades": "oneatlas",
            "spot": "oneatlas",
            "sentinel1": "sobloo-image",
            "sentinel2": "sobloo-image",
            "sentinel3": "sobloo-image",
            "sentinel5p": "sobloo-image",
        }

        if sensor not in list(supported_sensors.keys()):
            raise ValueError(
                f"Currently only these sensors are supported: "
                f"{list(supported_sensors.keys())}"
            )
        provider = supported_sensors[sensor]
        logger.info(
            f"Getting quicklooks from provider {provider} for image_ids: "
            f"{image_ids}"
        )

        if output_directory is None:
            output_directory = Path.cwd() / f"project_{self.auth.project_id}/catalog"
        else:
            output_directory = Path(output_directory)
        output_directory.mkdir(parents=True, exist_ok=True)
        logger.info(f"Download directory: {str(output_directory)}")

        if isinstance(image_ids, str):
            image_ids = [image_ids]

        out_paths: List[str] = []
        for image_id in tqdm(image_ids):
            try:
                url = f"{self.auth._endpoint()}/catalog/{provider}/image/{image_id}/quicklook"

                response = self.auth._request(
                    request_type="GET", url=url, return_text=False
                )
                out_path = output_directory / f"quicklook_{image_id}.jpg"
                out_paths.append(str(out_path))
                with open(out_path, "wb") as dst:
                    for chunk in response:
                        dst.write(chunk)
            except ValueError:
                logger.warning(
                    f"Image with id {image_id} does not have quicklook available. Skipping ..."
                )

        self.quicklooks = out_paths  # pylint: disable=attribute-defined-outside-init
        return out_paths

    @staticmethod
    def _order_payload(
        geometry: Union[
            dict,
            Feature,
            FeatureCollection,
            list,
            GeoDataFrame,
            Polygon,
        ],
        scene: Series,
    ) -> Tuple[str, dict]:
        """
        Helper that constructs necessary parameters for `Order.place` and `Order.estimate`.

        Args:
            geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
                GeoDataFrame, Polygon.
            scene: A geopandas series with a  single item/row of the result of `Catalog.search`. For instance,
                search_results.loc[0] for the first scene of a catalog search result.

        Returns:
            str, dict: A tuple including a provider name and order parameters.
        """
        if not isinstance(scene, Series):
            raise ValueError(
                "`scene` parameter must be a GeoSeries, or a single item/row of a GeoDataFrame. "
                "For instance, search_results.loc[0] returns a GeoSeries."
            )
        aoi_fc = any_vector_to_fc(
            vector=geometry,
        )
        aoi_geometry = fc_to_query_geometry(fc=aoi_fc, geometry_operation="intersects")
        data_provider_name = scene.providerName
        order_params = {"id": scene.id, "aoi": aoi_geometry}
        return data_provider_name, order_params

    def estimate_order(
        self,
        geometry: Union[
            dict,
            Feature,
            FeatureCollection,
            list,
            GeoDataFrame,
            Polygon,
        ],
        scene: Series,
    ) -> int:
        """
        Estimate the cost of an order from an item/row in a result of `Catalog.search`.

        Args:
            geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
                GeoDataFrame, Polygon.
            scene: A geopandas series with a  single item/row of the result of `Catalog.search`. For instance,
                search_results.loc[0] for the first scene of a catalog search result.

        Returns:
            int: An estimated cost for the order in UP42 credits.
        """
        data_provider_name, order_params = self._order_payload(geometry, scene)
        return Order.estimate(self.auth, data_provider_name, order_params)

    def place_order(
        self,
        geometry: Union[
            dict,
            Feature,
            FeatureCollection,
            list,
            GeoDataFrame,
            Polygon,
        ],
        scene: Series,
        track_status: bool = False,
        report_time: int = 120,
    ) -> "Order":
        """
        Place an order from an item/row in a result of `Catalog.search`.

        Args:
            geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
                GeoDataFrame, Polygon.
            scene: A geopandas series with a  single item/row of the result of `Catalog.search`. For instance,
                search_results.loc[0] for the first scene of a catalog search result.
            track_status (bool): If set to True, will only return the Order once it is `FULFILLED` or `FAILED`.
            report_time (int): The interval (in seconds) to query the order status if `track_status` is True.

         Warning:
            When placing orders of items that are in archive or cold storage,
            the order fulfillment can happen up to **24h after order placement**.
            In such cases, please make sure to set an appropriate `report_time`.
            You can also use `Order.track_status` on the returned object to track the status later.

        Returns:
            Order: The placed order.
        """
        data_provider_name, order_params = self._order_payload(geometry, scene)
        order = Order.place(self.auth, data_provider_name, order_params)
        if track_status:
            order.track_status(report_time)
        return order

Methods

construct_parameters(geometry, collections, start_date='2020-01-01', end_date='2020-01-30', usage_type=['DATA', 'ANALYTICS'], limit=10, max_cloudcover=100, sortby='acquisitionDate', ascending=True) staticmethod

Follows STAC principles and property names.

Parameters:

Name Type Description Default
geometry Union[dict, geojson.feature.Feature, geojson.feature.FeatureCollection, list, geopandas.geodataframe.GeoDataFrame, shapely.geometry.polygon.Polygon]

The search geometry, one of dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon.

required
collections List[str]

The satellite sensor collections to search for, e.g. ["PHR"] or ["PHR", "SPOT"]. Also see catalog.get_collections().

required
start_date str

Query period starting day, format "2020-01-01".

'2020-01-01'
end_date str

Query period ending day, format "2020-01-01".

'2020-01-30'
usage_type List[str]

Filter for imagery that can just be purchased & downloaded or also processes. ["DATA"] (can only be download), ["ANALYTICS"] (can be downloaded or used directly with a processing algorithm), ["DATA", "ANALYTICS"] (can be any combination). The filter is inclusive, using ["DATA"] can also result in results with ["DATA", "ANALYTICS"].

['DATA', 'ANALYTICS']
limit int

The maximum number of search results to return (1-max.500).

10
max_cloudcover float

Maximum cloudcover % - e.g. 100 will return all scenes, 8.4 will return all scenes with 8.4 or less cloudcover. Ignored for collections that have no cloudcover (e.g. sentinel1).

100
sortby str

The property to sort by, "cloudCoverage", "acquisitionDate", "acquisitionIdentifier", "incidenceAngle", "snowCover".

'acquisitionDate'
ascending bool

Ascending sort order by default, descending if False.

True

Returns:

Type Description
dict

The constructed parameters dictionary.

Source code in up42/catalog.py
@staticmethod
def construct_parameters(
    geometry: Union[
        dict,
        Feature,
        FeatureCollection,
        list,
        GeoDataFrame,
        Polygon,
    ],
    collections: List[str],
    start_date: str = "2020-01-01",
    end_date: str = "2020-01-30",
    usage_type: List[str] = ["DATA", "ANALYTICS"],
    limit: int = 10,
    max_cloudcover: float = 100,
    sortby: str = "acquisitionDate",
    ascending: bool = True,
) -> dict:
    """
    Follows STAC principles and property names.

    Args:
        geometry: The search geometry, one of dict, Feature, FeatureCollection,
            list, GeoDataFrame, Polygon.
        collections: The satellite sensor collections to search for, e.g. ["PHR"] or ["PHR", "SPOT"].
            Also see catalog.get_collections().
        start_date: Query period starting day, format "2020-01-01".
        end_date: Query period ending day, format "2020-01-01".
        usage_type: Filter for imagery that can just be purchased & downloaded or also
            processes. ["DATA"] (can only be download), ["ANALYTICS"] (can be downloaded
            or used directly with a processing algorithm), ["DATA", "ANALYTICS"]
            (can be any combination). The filter is inclusive, using ["DATA"] can
            also result in results with ["DATA", "ANALYTICS"].
        limit: The maximum number of search results to return (1-max.500).
        max_cloudcover: Maximum cloudcover % - e.g. 100 will return all scenes,
            8.4 will return all scenes with 8.4 or less cloudcover.
            Ignored for collections that have no cloudcover (e.g. sentinel1).
        sortby: The property to sort by, "cloudCoverage", "acquisitionDate",
            "acquisitionIdentifier", "incidenceAngle", "snowCover".
        ascending: Ascending sort order by default, descending if False.

    Returns:
        The constructed parameters dictionary.
    """
    time_period = format_time_period(start_date=start_date, end_date=end_date)
    aoi_fc = any_vector_to_fc(
        vector=geometry,
    )
    aoi_geometry = fc_to_query_geometry(fc=aoi_fc, geometry_operation="intersects")
    sort_order = "asc" if ascending else "desc"

    query_filters: Dict[Any, Any] = {}
    if not "Sentinel-1" in collections:
        query_filters["cloudCoverage"] = {"lte": max_cloudcover}  # type: ignore

    if usage_type == ["DATA"]:
        query_filters["up42:usageType"] = {"in": ["DATA"]}
    elif usage_type == ["ANALYTICS"]:
        query_filters["up42:usageType"] = {"in": ["ANALYTICS"]}
    elif usage_type == ["DATA", "ANALYTICS"]:
        query_filters["up42:usageType"] = {"in": ["DATA", "ANALYTICS"]}
    else:
        raise ValueError("Select correct `usage_type`")

    search_parameters = {
        "datetime": time_period,
        "intersects": aoi_geometry,
        "limit": limit,
        "collections": collections,
        "query": query_filters,
        "sortby": [{"field": f"properties.{sortby}", "direction": sort_order}],
    }

    return search_parameters

download_quicklooks(self, image_ids, sensor, output_directory=None)

Gets the quicklooks of scenes from a single sensor. After download, can be plotted via catalog.plot_quicklooks() or catalog.map_quicklooks().

Parameters:

Name Type Description Default
image_ids List[str]

List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"]. Access the search results id column via list(search_results.id).

required
sensor str

The satellite sensor of the image_ids, one of "pleiades", "spot", "sentinel1", "sentinel2", "sentinel3", "sentinel5p".

required
output_directory Union[str, pathlib.Path]

The file output directory, defaults to the current working directory.

None

Returns:

Type Description
List[str]

List of quicklook image output file paths.

Source code in up42/catalog.py
def download_quicklooks(
    self,
    image_ids: List[str],
    sensor: str,
    output_directory: Union[str, Path, None] = None,
) -> List[str]:
    """
    Gets the quicklooks of scenes from a single sensor. After download, can
    be plotted via catalog.plot_quicklooks() or catalog.map_quicklooks().

    Args:
        image_ids: List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"].
            Access the search results id column via `list(search_results.id)`.
        sensor: The satellite sensor of the image_ids, one of "pleiades", "spot",
            "sentinel1", "sentinel2", "sentinel3", "sentinel5p".
        output_directory: The file output directory, defaults to the current working
            directory.

    Returns:
        List of quicklook image output file paths.
    """
    supported_sensors = {
        "pleiades": "oneatlas",
        "spot": "oneatlas",
        "sentinel1": "sobloo-image",
        "sentinel2": "sobloo-image",
        "sentinel3": "sobloo-image",
        "sentinel5p": "sobloo-image",
    }

    if sensor not in list(supported_sensors.keys()):
        raise ValueError(
            f"Currently only these sensors are supported: "
            f"{list(supported_sensors.keys())}"
        )
    provider = supported_sensors[sensor]
    logger.info(
        f"Getting quicklooks from provider {provider} for image_ids: "
        f"{image_ids}"
    )

    if output_directory is None:
        output_directory = Path.cwd() / f"project_{self.auth.project_id}/catalog"
    else:
        output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    logger.info(f"Download directory: {str(output_directory)}")

    if isinstance(image_ids, str):
        image_ids = [image_ids]

    out_paths: List[str] = []
    for image_id in tqdm(image_ids):
        try:
            url = f"{self.auth._endpoint()}/catalog/{provider}/image/{image_id}/quicklook"

            response = self.auth._request(
                request_type="GET", url=url, return_text=False
            )
            out_path = output_directory / f"quicklook_{image_id}.jpg"
            out_paths.append(str(out_path))
            with open(out_path, "wb") as dst:
                for chunk in response:
                    dst.write(chunk)
        except ValueError:
            logger.warning(
                f"Image with id {image_id} does not have quicklook available. Skipping ..."
            )

    self.quicklooks = out_paths  # pylint: disable=attribute-defined-outside-init
    return out_paths

estimate_order(self, geometry, scene)

Estimate the cost of an order from an item/row in a result of Catalog.search.

Parameters:

Name Type Description Default
geometry Union[dict, geojson.feature.Feature, geojson.feature.FeatureCollection, list, geopandas.geodataframe.GeoDataFrame, shapely.geometry.polygon.Polygon]

The intended output AOI of the order, one of dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon.

required
scene Series

A geopandas series with a single item/row of the result of Catalog.search. For instance, search_results.loc[0] for the first scene of a catalog search result.

required

Returns:

Type Description
int

An estimated cost for the order in UP42 credits.

Source code in up42/catalog.py
def estimate_order(
    self,
    geometry: Union[
        dict,
        Feature,
        FeatureCollection,
        list,
        GeoDataFrame,
        Polygon,
    ],
    scene: Series,
) -> int:
    """
    Estimate the cost of an order from an item/row in a result of `Catalog.search`.

    Args:
        geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
            GeoDataFrame, Polygon.
        scene: A geopandas series with a  single item/row of the result of `Catalog.search`. For instance,
            search_results.loc[0] for the first scene of a catalog search result.

    Returns:
        int: An estimated cost for the order in UP42 credits.
    """
    data_provider_name, order_params = self._order_payload(geometry, scene)
    return Order.estimate(self.auth, data_provider_name, order_params)

get_collections(self)

Get the available data collections.

Source code in up42/catalog.py
def get_collections(self) -> Union[Dict, List]:
    """
    Get the available data collections.
    """
    url = f"{self.auth._endpoint()}/collections"
    json_response = self.auth._request("GET", url)
    return json_response["data"]

map_quicklooks(self, scenes, aoi=None, show_images=True, show_features=False, filepaths=None, name_column='id', save_html=None) inherited

Plots the downloaded quicklooks (filepaths saved to self.quicklooks of the respective object, e.g. job, catalog).

Parameters:

Name Type Description Default
scenes GeoDataFrame

GeoDataFrame of scenes, results of catalog.search()

required
aoi Optional[geopandas.geodataframe.GeoDataFrame]

GeoDataFrame of aoi.

None
show_images bool

Shows images if True (default).

True
show_features bool

Shows no features if False (default).

False
filepaths Optional[list]

Paths to images to plot. Optional, by default picks up the last downloaded results.

None
name_column str

Name of the feature property that provides the Feature/Layer name.

'id'
save_html Optional[pathlib.Path]

The path for saving folium map as html file. With default None, no file is saved.

None
Source code in up42/catalog.py
def map_quicklooks(
    self,
    scenes: GeoDataFrame,
    aoi: Optional[GeoDataFrame] = None,
    show_images: bool = True,
    show_features: bool = False,
    filepaths: Optional[list] = None,
    name_column: str = "id",
    save_html: Optional[Path] = None,
) -> folium.Map:
    """
    TODO: Currently only implemented for catalog!

    Plots the downloaded quicklooks (filepaths saved to self.quicklooks of the
    respective object, e.g. job, catalog).

    Args:
            scenes: GeoDataFrame of scenes, results of catalog.search()
            aoi: GeoDataFrame of aoi.
            show_images: Shows images if True (default).
            show_features: Shows no features if False (default).
            filepaths: Paths to images to plot. Optional, by default picks up the last
                    downloaded results.
            name_column: Name of the feature property that provides the Feature/Layer name.
            save_html: The path for saving folium map as html file. With default None, no file is saved.
    """
    if filepaths is None:
        if self.quicklooks is None:
            raise ValueError("You first need to download the quicklooks!")
        filepaths = self.quicklooks

    m = self._map_images(
        plot_file_format=[".jpg", ".jpeg", ".png"],
        result_df=scenes,
        filepaths=filepaths,
        aoi=aoi,
        show_images=show_images,
        show_features=show_features,
        name_column=name_column,
        save_html=save_html,
    )
    return m

place_order(self, geometry, scene, track_status=False, report_time=120)

Place an order from an item/row in a result of Catalog.search.

Parameters:

Name Type Description Default
geometry Union[dict, geojson.feature.Feature, geojson.feature.FeatureCollection, list, geopandas.geodataframe.GeoDataFrame, shapely.geometry.polygon.Polygon]

The intended output AOI of the order, one of dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon.

required
scene Series

A geopandas series with a single item/row of the result of Catalog.search. For instance, search_results.loc[0] for the first scene of a catalog search result.

required
track_status bool

If set to True, will only return the Order once it is FULFILLED or FAILED.

False
report_time int

The interval (in seconds) to query the order status if track_status is True.

120

Returns:

Type Description
Order

The placed order.

Source code in up42/catalog.py
def place_order(
    self,
    geometry: Union[
        dict,
        Feature,
        FeatureCollection,
        list,
        GeoDataFrame,
        Polygon,
    ],
    scene: Series,
    track_status: bool = False,
    report_time: int = 120,
) -> "Order":
    """
    Place an order from an item/row in a result of `Catalog.search`.

    Args:
        geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
            GeoDataFrame, Polygon.
        scene: A geopandas series with a  single item/row of the result of `Catalog.search`. For instance,
            search_results.loc[0] for the first scene of a catalog search result.
        track_status (bool): If set to True, will only return the Order once it is `FULFILLED` or `FAILED`.
        report_time (int): The interval (in seconds) to query the order status if `track_status` is True.

     Warning:
        When placing orders of items that are in archive or cold storage,
        the order fulfillment can happen up to **24h after order placement**.
        In such cases, please make sure to set an appropriate `report_time`.
        You can also use `Order.track_status` on the returned object to track the status later.

    Returns:
        Order: The placed order.
    """
    data_provider_name, order_params = self._order_payload(geometry, scene)
    order = Order.place(self.auth, data_provider_name, order_params)
    if track_status:
        order.track_status(report_time)
    return order

plot_coverage(scenes, aoi=None, legend_column='sceneId', figsize=(12, 16)) inherited

Plots a coverage map of a dataframe with geometries e.g. the results of catalog.search())

Parameters:

Name Type Description Default
scenes GeoDataFrame

GeoDataFrame of scenes, results of catalog.search()

required
aoi Optional[geopandas.geodataframe.GeoDataFrame]

GeoDataFrame of aoi.

None
legend_column str

Dataframe column set to legend, default is "sceneId". Legend entries are sorted and this determines plotting order.

'sceneId'
figsize

Matplotlib figure size.

(12, 16)
Source code in up42/catalog.py
@staticmethod
def plot_coverage(
    scenes: GeoDataFrame,
    aoi: Optional[GeoDataFrame] = None,
    legend_column: str = "sceneId",
    figsize=(12, 16),
) -> None:
    """
    Plots a coverage map of a dataframe with geometries e.g. the results of catalog.search())
    Args:
            scenes: GeoDataFrame of scenes, results of catalog.search()
            aoi: GeoDataFrame of aoi.
            legend_column: Dataframe column set to legend, default is "sceneId".
                    Legend entries are sorted and this determines plotting order.
            figsize: Matplotlib figure size.
    """
    if legend_column not in scenes.columns:
        legend_column = None  # type: ignore
        logger.info(
            "Given legend_column name not in scene dataframe, "
            "plotting without legend."
        )

    try:
        ax = scenes.plot(
            legend_column,
            categorical=True,
            figsize=figsize,
            cmap="Set3",
            legend=True,
            alpha=0.7,
            legend_kwds=dict(loc="upper left", bbox_to_anchor=(1, 1)),
        )
        if aoi is not None:
            aoi.plot(color="r", ax=ax, fc="None", edgecolor="r", lw=1)
    except AttributeError as e:
        raise TypeError(
            "'scenes' and 'aoi' (optional) have to be a GeoDataFrame."
        ) from e
    ax.set_axis_off()
    plt.show()

plot_quicklooks(self, figsize=(8, 8), titles=None, filepaths=None) inherited

Plots the downloaded quicklooks (filepaths saved to self.quicklooks of the respective object, e.g. job, catalog).

Parameters:

Name Type Description Default
figsize Tuple[int, int]

matplotlib figure size.

(8, 8)
filepaths Optional[list]

Paths to images to plot. Optional, by default picks up the last downloaded results.

None
titles Optional[List[str]]

List of titles for the subplots, optional.

None
Source code in up42/catalog.py
def plot_quicklooks(
    self,
    figsize: Tuple[int, int] = (8, 8),
    titles: Optional[List[str]] = None,
    filepaths: Optional[list] = None,
) -> None:
    """
    Plots the downloaded quicklooks (filepaths saved to self.quicklooks of the
    respective object, e.g. job, catalog).

    Args:
            figsize: matplotlib figure size.
            filepaths: Paths to images to plot. Optional, by default picks up the last
                    downloaded results.
            titles: List of titles for the subplots, optional.

    """
    if filepaths is None:
        if self.quicklooks is None:
            raise ValueError("You first need to download the quicklooks!")
        filepaths = self.quicklooks

    self.plot_results(
        plot_file_format=[".jpg", ".jpeg", ".png"],
        figsize=figsize,
        filepaths=filepaths,
        titles=titles,
    )

search(self, search_parameters, as_dataframe=True)

Searches the catalog for the the search parameters and returns the metadata of the matching scenes.

Parameters:

Name Type Description Default
search_parameters dict

The catalog search parameters, see example.

required
as_dataframe bool

return type, GeoDataFrame if True (default), FeatureCollection if False.

True

Returns:

Type Description
Union[geopandas.geodataframe.GeoDataFrame, dict]

The search results as a GeoDataFrame, optionally as json dict.

Examples:

    search_parameters={
        "datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
        "intersects": {
            "type": "Polygon",
            "coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
            [13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
            52.73971768]]]},
        "limit": 10,
        "sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
        }
Source code in up42/catalog.py
def search(
    self, search_parameters: dict, as_dataframe: bool = True
) -> Union[GeoDataFrame, dict]:
    """
    Searches the catalog for the the search parameters and returns the metadata of
    the matching scenes.

    Args:
        search_parameters: The catalog search parameters, see example.
        as_dataframe: return type, GeoDataFrame if True (default), FeatureCollection if False.

    Returns:
        The search results as a GeoDataFrame, optionally as json dict.

    Example:
        ```python
            search_parameters={
                "datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
                "intersects": {
                    "type": "Polygon",
                    "coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
                    [13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
                    52.73971768]]]},
                "limit": 10,
                "sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
                }
        ```
    """
    logger.info(f"Searching catalog with search_parameters: {search_parameters}")

    # The API request would fail with a limit above 500, thus 500 is forced in the initial
    # request but additional results are handled below via pagination.
    max_limit = search_parameters["limit"]
    if max_limit > 500:
        search_parameters = dict(search_parameters)
        search_parameters["limit"] = 500

    url = f"{self.auth._endpoint()}/catalog/stac/search"
    response_json: dict = self.auth._request("POST", url, search_parameters)
    features = response_json["features"]

    # A request with no results will still include a (non-exhausted) pagination token.
    # Only the first pagination token request will then indicate search exhausted.

    # Search results with more than 500 items are given as 50-per-page additional pages.
    while len(features) < max_limit:
        page_url = response_json["links"][0]["href"]
        next_page_url = response_json["links"][1]["href"]
        pagination_exhausted = next_page_url == page_url
        if pagination_exhausted:
            break
        response_json = self.auth._request("POST", next_page_url, search_parameters)
        features += response_json["features"]

    features = features[:max_limit]
    df = GeoDataFrame.from_features(
        FeatureCollection(features=features), crs="EPSG:4326"
    )

    logger.info(f"{df.shape[0]} results returned.")
    if as_dataframe:
        return df
    else:
        return df.__geo_interface__