Catalog¶
The Catalog class enables access to the UP42 catalog search. You can search for satellite image scenes (for different sensors and criteria like cloud cover), plot the scene coverage and download and plot the scene quicklooks.
Use the catalog:
catalog = up42.initialize_catalog()
Source code in up42/catalog.py
class Catalog(VizTools):
"""
The Catalog class enables access to the UP42 catalog search. You can search
for satellite image scenes (for different sensors and criteria like cloud cover),
plot the scene coverage and download and plot the scene quicklooks.
Use the catalog:
```python
catalog = up42.initialize_catalog()
```
"""
def __init__(self, auth: Auth):
self.auth = auth
self.quicklooks = None
def __repr__(self):
return f"Catalog(auth={self.auth})"
def get_collections(self) -> Union[Dict, List]:
"""
Get the available data collections.
"""
url = f"{self.auth._endpoint()}/collections"
json_response = self.auth._request("GET", url)
return json_response["data"]
# pylint: disable=dangerous-default-value
@staticmethod
def construct_parameters(
geometry: Union[
dict,
Feature,
FeatureCollection,
list,
GeoDataFrame,
Polygon,
],
collections: List[str],
start_date: str = "2020-01-01",
end_date: str = "2020-01-30",
usage_type: List[str] = ["DATA", "ANALYTICS"],
limit: int = 10,
max_cloudcover: float = 100,
sortby: str = "acquisitionDate",
ascending: bool = True,
) -> dict:
"""
Follows STAC principles and property names.
Args:
geometry: The search geometry, one of dict, Feature, FeatureCollection,
list, GeoDataFrame, Polygon.
collections: The satellite sensor collections to search for, e.g. ["PHR"] or ["PHR", "SPOT"].
Also see catalog.get_collections().
start_date: Query period starting day, format "2020-01-01".
end_date: Query period ending day, format "2020-01-01".
usage_type: Filter for imagery that can just be purchased & downloaded or also
processes. ["DATA"] (can only be download), ["ANALYTICS"] (can be downloaded
or used directly with a processing algorithm), ["DATA", "ANALYTICS"]
(can be any combination). The filter is inclusive, using ["DATA"] can
also result in results with ["DATA", "ANALYTICS"].
limit: The maximum number of search results to return (1-max.500).
max_cloudcover: Maximum cloudcover % - e.g. 100 will return all scenes,
8.4 will return all scenes with 8.4 or less cloudcover.
Ignored for collections that have no cloudcover (e.g. sentinel1).
sortby: The property to sort by, "cloudCoverage", "acquisitionDate",
"acquisitionIdentifier", "incidenceAngle", "snowCover".
ascending: Ascending sort order by default, descending if False.
Returns:
The constructed parameters dictionary.
"""
time_period = format_time_period(start_date=start_date, end_date=end_date)
aoi_fc = any_vector_to_fc(
vector=geometry,
)
aoi_geometry = fc_to_query_geometry(fc=aoi_fc, geometry_operation="intersects")
sort_order = "asc" if ascending else "desc"
query_filters: Dict[Any, Any] = {}
if not "Sentinel-1" in collections:
query_filters["cloudCoverage"] = {"lte": max_cloudcover} # type: ignore
if usage_type == ["DATA"]:
query_filters["up42:usageType"] = {"in": ["DATA"]}
elif usage_type == ["ANALYTICS"]:
query_filters["up42:usageType"] = {"in": ["ANALYTICS"]}
elif usage_type == ["DATA", "ANALYTICS"]:
query_filters["up42:usageType"] = {"in": ["DATA", "ANALYTICS"]}
else:
raise ValueError("Select correct `usage_type`")
search_parameters = {
"datetime": time_period,
"intersects": aoi_geometry,
"limit": limit,
"collections": collections,
"query": query_filters,
"sortby": [{"field": f"properties.{sortby}", "direction": sort_order}],
}
return search_parameters
def search(
self, search_parameters: dict, as_dataframe: bool = True
) -> Union[GeoDataFrame, dict]:
"""
Searches the catalog for the the search parameters and returns the metadata of
the matching scenes.
Args:
search_parameters: The catalog search parameters, see example.
as_dataframe: return type, GeoDataFrame if True (default), FeatureCollection if False.
Returns:
The search results as a GeoDataFrame, optionally as json dict.
Example:
```python
search_parameters={
"datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
"intersects": {
"type": "Polygon",
"coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
[13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
52.73971768]]]},
"limit": 10,
"sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
}
```
"""
logger.info(f"Searching catalog with search_parameters: {search_parameters}")
# The API request would fail with a limit above 500, thus 500 is forced in the initial
# request but additional results are handled below via pagination.
max_limit = search_parameters["limit"]
if max_limit > 500:
search_parameters = dict(search_parameters)
search_parameters["limit"] = 500
url = f"{self.auth._endpoint()}/catalog/stac/search"
response_json: dict = self.auth._request("POST", url, search_parameters)
features = response_json["features"]
# A request with no results will still include a (non-exhausted) pagination token.
# Only the first pagination token request will then indicate search exhausted.
# Search results with more than 500 items are given as 50-per-page additional pages.
while len(features) < max_limit:
page_url = response_json["links"][0]["href"]
next_page_url = response_json["links"][1]["href"]
pagination_exhausted = next_page_url == page_url
if pagination_exhausted:
break
response_json = self.auth._request("POST", next_page_url, search_parameters)
features += response_json["features"]
features = features[:max_limit]
df = GeoDataFrame.from_features(
FeatureCollection(features=features), crs="EPSG:4326"
)
logger.info(f"{df.shape[0]} results returned.")
if as_dataframe:
return df
else:
return df.__geo_interface__
def download_quicklooks(
self,
image_ids: List[str],
sensor: str,
output_directory: Union[str, Path, None] = None,
) -> List[str]:
"""
Gets the quicklooks of scenes from a single sensor. After download, can
be plotted via catalog.plot_quicklooks() or catalog.map_quicklooks().
Args:
image_ids: List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"].
Access the search results id column via `list(search_results.id)`.
sensor: The satellite sensor of the image_ids, one of "pleiades", "spot",
"sentinel1", "sentinel2", "sentinel3", "sentinel5p".
output_directory: The file output directory, defaults to the current working
directory.
Returns:
List of quicklook image output file paths.
"""
supported_sensors = {
"pleiades": "oneatlas",
"spot": "oneatlas",
"sentinel1": "sobloo-image",
"sentinel2": "sobloo-image",
"sentinel3": "sobloo-image",
"sentinel5p": "sobloo-image",
}
if sensor not in list(supported_sensors.keys()):
raise ValueError(
f"Currently only these sensors are supported: "
f"{list(supported_sensors.keys())}"
)
provider = supported_sensors[sensor]
logger.info(
f"Getting quicklooks from provider {provider} for image_ids: "
f"{image_ids}"
)
if output_directory is None:
output_directory = Path.cwd() / f"project_{self.auth.project_id}/catalog"
else:
output_directory = Path(output_directory)
output_directory.mkdir(parents=True, exist_ok=True)
logger.info(f"Download directory: {str(output_directory)}")
if isinstance(image_ids, str):
image_ids = [image_ids]
out_paths: List[str] = []
for image_id in tqdm(image_ids):
try:
url = f"{self.auth._endpoint()}/catalog/{provider}/image/{image_id}/quicklook"
response = self.auth._request(
request_type="GET", url=url, return_text=False
)
out_path = output_directory / f"quicklook_{image_id}.jpg"
out_paths.append(str(out_path))
with open(out_path, "wb") as dst:
for chunk in response:
dst.write(chunk)
except ValueError:
logger.warning(
f"Image with id {image_id} does not have quicklook available. Skipping ..."
)
self.quicklooks = out_paths # pylint: disable=attribute-defined-outside-init
return out_paths
@staticmethod
def _order_payload(
geometry: Union[
dict,
Feature,
FeatureCollection,
list,
GeoDataFrame,
Polygon,
],
scene: Series,
) -> Tuple[str, dict]:
"""
Helper that constructs necessary parameters for `Order.place` and `Order.estimate`.
Args:
geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
GeoDataFrame, Polygon.
scene: A geopandas series with a single item/row of the result of `Catalog.search`. For instance,
search_results.loc[0] for the first scene of a catalog search result.
Returns:
str, dict: A tuple including a provider name and order parameters.
"""
if not isinstance(scene, Series):
raise ValueError(
"`scene` parameter must be a GeoSeries, or a single item/row of a GeoDataFrame. "
"For instance, search_results.loc[0] returns a GeoSeries."
)
aoi_fc = any_vector_to_fc(
vector=geometry,
)
aoi_geometry = fc_to_query_geometry(fc=aoi_fc, geometry_operation="intersects")
data_provider_name = scene.providerName
order_params = {"id": scene.id, "aoi": aoi_geometry}
return data_provider_name, order_params
def estimate_order(
self,
geometry: Union[
dict,
Feature,
FeatureCollection,
list,
GeoDataFrame,
Polygon,
],
scene: Series,
) -> int:
"""
Estimate the cost of an order from an item/row in a result of `Catalog.search`.
Args:
geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
GeoDataFrame, Polygon.
scene: A geopandas series with a single item/row of the result of `Catalog.search`. For instance,
search_results.loc[0] for the first scene of a catalog search result.
Returns:
int: An estimated cost for the order in UP42 credits.
"""
data_provider_name, order_params = self._order_payload(geometry, scene)
return Order.estimate(self.auth, data_provider_name, order_params)
def place_order(
self,
geometry: Union[
dict,
Feature,
FeatureCollection,
list,
GeoDataFrame,
Polygon,
],
scene: Series,
track_status: bool = False,
report_time: int = 120,
) -> "Order":
"""
Place an order from an item/row in a result of `Catalog.search`.
Args:
geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
GeoDataFrame, Polygon.
scene: A geopandas series with a single item/row of the result of `Catalog.search`. For instance,
search_results.loc[0] for the first scene of a catalog search result.
track_status (bool): If set to True, will only return the Order once it is `FULFILLED` or `FAILED`.
report_time (int): The interval (in seconds) to query the order status if `track_status` is True.
Warning:
When placing orders of items that are in archive or cold storage,
the order fulfillment can happen up to **24h after order placement**.
In such cases, please make sure to set an appropriate `report_time`.
You can also use `Order.track_status` on the returned object to track the status later.
Returns:
Order: The placed order.
"""
data_provider_name, order_params = self._order_payload(geometry, scene)
order = Order.place(self.auth, data_provider_name, order_params)
if track_status:
order.track_status(report_time)
return order
Methods¶
construct_parameters(geometry, collections, start_date='2020-01-01', end_date='2020-01-30', usage_type=['DATA', 'ANALYTICS'], limit=10, max_cloudcover=100, sortby='acquisitionDate', ascending=True)
staticmethod
¶
Follows STAC principles and property names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
geometry |
Union[dict, geojson.feature.Feature, geojson.feature.FeatureCollection, list, geopandas.geodataframe.GeoDataFrame, shapely.geometry.polygon.Polygon] |
The search geometry, one of dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon. |
required |
collections |
List[str] |
The satellite sensor collections to search for, e.g. ["PHR"] or ["PHR", "SPOT"]. Also see catalog.get_collections(). |
required |
start_date |
str |
Query period starting day, format "2020-01-01". |
'2020-01-01' |
end_date |
str |
Query period ending day, format "2020-01-01". |
'2020-01-30' |
usage_type |
List[str] |
Filter for imagery that can just be purchased & downloaded or also processes. ["DATA"] (can only be download), ["ANALYTICS"] (can be downloaded or used directly with a processing algorithm), ["DATA", "ANALYTICS"] (can be any combination). The filter is inclusive, using ["DATA"] can also result in results with ["DATA", "ANALYTICS"]. |
['DATA', 'ANALYTICS'] |
limit |
int |
The maximum number of search results to return (1-max.500). |
10 |
max_cloudcover |
float |
Maximum cloudcover % - e.g. 100 will return all scenes, 8.4 will return all scenes with 8.4 or less cloudcover. Ignored for collections that have no cloudcover (e.g. sentinel1). |
100 |
sortby |
str |
The property to sort by, "cloudCoverage", "acquisitionDate", "acquisitionIdentifier", "incidenceAngle", "snowCover". |
'acquisitionDate' |
ascending |
bool |
Ascending sort order by default, descending if False. |
True |
Returns:
Type | Description |
---|---|
dict |
The constructed parameters dictionary. |
Source code in up42/catalog.py
@staticmethod
def construct_parameters(
geometry: Union[
dict,
Feature,
FeatureCollection,
list,
GeoDataFrame,
Polygon,
],
collections: List[str],
start_date: str = "2020-01-01",
end_date: str = "2020-01-30",
usage_type: List[str] = ["DATA", "ANALYTICS"],
limit: int = 10,
max_cloudcover: float = 100,
sortby: str = "acquisitionDate",
ascending: bool = True,
) -> dict:
"""
Follows STAC principles and property names.
Args:
geometry: The search geometry, one of dict, Feature, FeatureCollection,
list, GeoDataFrame, Polygon.
collections: The satellite sensor collections to search for, e.g. ["PHR"] or ["PHR", "SPOT"].
Also see catalog.get_collections().
start_date: Query period starting day, format "2020-01-01".
end_date: Query period ending day, format "2020-01-01".
usage_type: Filter for imagery that can just be purchased & downloaded or also
processes. ["DATA"] (can only be download), ["ANALYTICS"] (can be downloaded
or used directly with a processing algorithm), ["DATA", "ANALYTICS"]
(can be any combination). The filter is inclusive, using ["DATA"] can
also result in results with ["DATA", "ANALYTICS"].
limit: The maximum number of search results to return (1-max.500).
max_cloudcover: Maximum cloudcover % - e.g. 100 will return all scenes,
8.4 will return all scenes with 8.4 or less cloudcover.
Ignored for collections that have no cloudcover (e.g. sentinel1).
sortby: The property to sort by, "cloudCoverage", "acquisitionDate",
"acquisitionIdentifier", "incidenceAngle", "snowCover".
ascending: Ascending sort order by default, descending if False.
Returns:
The constructed parameters dictionary.
"""
time_period = format_time_period(start_date=start_date, end_date=end_date)
aoi_fc = any_vector_to_fc(
vector=geometry,
)
aoi_geometry = fc_to_query_geometry(fc=aoi_fc, geometry_operation="intersects")
sort_order = "asc" if ascending else "desc"
query_filters: Dict[Any, Any] = {}
if not "Sentinel-1" in collections:
query_filters["cloudCoverage"] = {"lte": max_cloudcover} # type: ignore
if usage_type == ["DATA"]:
query_filters["up42:usageType"] = {"in": ["DATA"]}
elif usage_type == ["ANALYTICS"]:
query_filters["up42:usageType"] = {"in": ["ANALYTICS"]}
elif usage_type == ["DATA", "ANALYTICS"]:
query_filters["up42:usageType"] = {"in": ["DATA", "ANALYTICS"]}
else:
raise ValueError("Select correct `usage_type`")
search_parameters = {
"datetime": time_period,
"intersects": aoi_geometry,
"limit": limit,
"collections": collections,
"query": query_filters,
"sortby": [{"field": f"properties.{sortby}", "direction": sort_order}],
}
return search_parameters
download_quicklooks(self, image_ids, sensor, output_directory=None)
¶
Gets the quicklooks of scenes from a single sensor. After download, can be plotted via catalog.plot_quicklooks() or catalog.map_quicklooks().
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image_ids |
List[str] |
List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"].
Access the search results id column via |
required |
sensor |
str |
The satellite sensor of the image_ids, one of "pleiades", "spot", "sentinel1", "sentinel2", "sentinel3", "sentinel5p". |
required |
output_directory |
Union[str, pathlib.Path] |
The file output directory, defaults to the current working directory. |
None |
Returns:
Type | Description |
---|---|
List[str] |
List of quicklook image output file paths. |
Source code in up42/catalog.py
def download_quicklooks(
self,
image_ids: List[str],
sensor: str,
output_directory: Union[str, Path, None] = None,
) -> List[str]:
"""
Gets the quicklooks of scenes from a single sensor. After download, can
be plotted via catalog.plot_quicklooks() or catalog.map_quicklooks().
Args:
image_ids: List of provider image_ids e.g. ["6dffb8be-c2ab-46e3-9c1c-6958a54e4527"].
Access the search results id column via `list(search_results.id)`.
sensor: The satellite sensor of the image_ids, one of "pleiades", "spot",
"sentinel1", "sentinel2", "sentinel3", "sentinel5p".
output_directory: The file output directory, defaults to the current working
directory.
Returns:
List of quicklook image output file paths.
"""
supported_sensors = {
"pleiades": "oneatlas",
"spot": "oneatlas",
"sentinel1": "sobloo-image",
"sentinel2": "sobloo-image",
"sentinel3": "sobloo-image",
"sentinel5p": "sobloo-image",
}
if sensor not in list(supported_sensors.keys()):
raise ValueError(
f"Currently only these sensors are supported: "
f"{list(supported_sensors.keys())}"
)
provider = supported_sensors[sensor]
logger.info(
f"Getting quicklooks from provider {provider} for image_ids: "
f"{image_ids}"
)
if output_directory is None:
output_directory = Path.cwd() / f"project_{self.auth.project_id}/catalog"
else:
output_directory = Path(output_directory)
output_directory.mkdir(parents=True, exist_ok=True)
logger.info(f"Download directory: {str(output_directory)}")
if isinstance(image_ids, str):
image_ids = [image_ids]
out_paths: List[str] = []
for image_id in tqdm(image_ids):
try:
url = f"{self.auth._endpoint()}/catalog/{provider}/image/{image_id}/quicklook"
response = self.auth._request(
request_type="GET", url=url, return_text=False
)
out_path = output_directory / f"quicklook_{image_id}.jpg"
out_paths.append(str(out_path))
with open(out_path, "wb") as dst:
for chunk in response:
dst.write(chunk)
except ValueError:
logger.warning(
f"Image with id {image_id} does not have quicklook available. Skipping ..."
)
self.quicklooks = out_paths # pylint: disable=attribute-defined-outside-init
return out_paths
estimate_order(self, geometry, scene)
¶
Estimate the cost of an order from an item/row in a result of Catalog.search
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
geometry |
Union[dict, geojson.feature.Feature, geojson.feature.FeatureCollection, list, geopandas.geodataframe.GeoDataFrame, shapely.geometry.polygon.Polygon] |
The intended output AOI of the order, one of dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon. |
required |
scene |
Series |
A geopandas series with a single item/row of the result of |
required |
Returns:
Type | Description |
---|---|
int |
An estimated cost for the order in UP42 credits. |
Source code in up42/catalog.py
def estimate_order(
self,
geometry: Union[
dict,
Feature,
FeatureCollection,
list,
GeoDataFrame,
Polygon,
],
scene: Series,
) -> int:
"""
Estimate the cost of an order from an item/row in a result of `Catalog.search`.
Args:
geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
GeoDataFrame, Polygon.
scene: A geopandas series with a single item/row of the result of `Catalog.search`. For instance,
search_results.loc[0] for the first scene of a catalog search result.
Returns:
int: An estimated cost for the order in UP42 credits.
"""
data_provider_name, order_params = self._order_payload(geometry, scene)
return Order.estimate(self.auth, data_provider_name, order_params)
get_collections(self)
¶
Get the available data collections.
Source code in up42/catalog.py
def get_collections(self) -> Union[Dict, List]:
"""
Get the available data collections.
"""
url = f"{self.auth._endpoint()}/collections"
json_response = self.auth._request("GET", url)
return json_response["data"]
map_quicklooks(self, scenes, aoi=None, show_images=True, show_features=False, filepaths=None, name_column='id', save_html=None)
inherited
¶
Plots the downloaded quicklooks (filepaths saved to self.quicklooks of the respective object, e.g. job, catalog).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
scenes |
GeoDataFrame |
GeoDataFrame of scenes, results of catalog.search() |
required |
aoi |
Optional[geopandas.geodataframe.GeoDataFrame] |
GeoDataFrame of aoi. |
None |
show_images |
bool |
Shows images if True (default). |
True |
show_features |
bool |
Shows no features if False (default). |
False |
filepaths |
Optional[list] |
Paths to images to plot. Optional, by default picks up the last downloaded results. |
None |
name_column |
str |
Name of the feature property that provides the Feature/Layer name. |
'id' |
save_html |
Optional[pathlib.Path] |
The path for saving folium map as html file. With default None, no file is saved. |
None |
Source code in up42/catalog.py
def map_quicklooks(
self,
scenes: GeoDataFrame,
aoi: Optional[GeoDataFrame] = None,
show_images: bool = True,
show_features: bool = False,
filepaths: Optional[list] = None,
name_column: str = "id",
save_html: Optional[Path] = None,
) -> folium.Map:
"""
TODO: Currently only implemented for catalog!
Plots the downloaded quicklooks (filepaths saved to self.quicklooks of the
respective object, e.g. job, catalog).
Args:
scenes: GeoDataFrame of scenes, results of catalog.search()
aoi: GeoDataFrame of aoi.
show_images: Shows images if True (default).
show_features: Shows no features if False (default).
filepaths: Paths to images to plot. Optional, by default picks up the last
downloaded results.
name_column: Name of the feature property that provides the Feature/Layer name.
save_html: The path for saving folium map as html file. With default None, no file is saved.
"""
if filepaths is None:
if self.quicklooks is None:
raise ValueError("You first need to download the quicklooks!")
filepaths = self.quicklooks
m = self._map_images(
plot_file_format=[".jpg", ".jpeg", ".png"],
result_df=scenes,
filepaths=filepaths,
aoi=aoi,
show_images=show_images,
show_features=show_features,
name_column=name_column,
save_html=save_html,
)
return m
place_order(self, geometry, scene, track_status=False, report_time=120)
¶
Place an order from an item/row in a result of Catalog.search
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
geometry |
Union[dict, geojson.feature.Feature, geojson.feature.FeatureCollection, list, geopandas.geodataframe.GeoDataFrame, shapely.geometry.polygon.Polygon] |
The intended output AOI of the order, one of dict, Feature, FeatureCollection, list, GeoDataFrame, Polygon. |
required |
scene |
Series |
A geopandas series with a single item/row of the result of |
required |
track_status |
bool |
If set to True, will only return the Order once it is |
False |
report_time |
int |
The interval (in seconds) to query the order status if |
120 |
Returns:
Type | Description |
---|---|
Order |
The placed order. |
Source code in up42/catalog.py
def place_order(
self,
geometry: Union[
dict,
Feature,
FeatureCollection,
list,
GeoDataFrame,
Polygon,
],
scene: Series,
track_status: bool = False,
report_time: int = 120,
) -> "Order":
"""
Place an order from an item/row in a result of `Catalog.search`.
Args:
geometry: The intended output AOI of the order, one of dict, Feature, FeatureCollection, list,
GeoDataFrame, Polygon.
scene: A geopandas series with a single item/row of the result of `Catalog.search`. For instance,
search_results.loc[0] for the first scene of a catalog search result.
track_status (bool): If set to True, will only return the Order once it is `FULFILLED` or `FAILED`.
report_time (int): The interval (in seconds) to query the order status if `track_status` is True.
Warning:
When placing orders of items that are in archive or cold storage,
the order fulfillment can happen up to **24h after order placement**.
In such cases, please make sure to set an appropriate `report_time`.
You can also use `Order.track_status` on the returned object to track the status later.
Returns:
Order: The placed order.
"""
data_provider_name, order_params = self._order_payload(geometry, scene)
order = Order.place(self.auth, data_provider_name, order_params)
if track_status:
order.track_status(report_time)
return order
plot_coverage(scenes, aoi=None, legend_column='sceneId', figsize=(12, 16))
inherited
¶
Plots a coverage map of a dataframe with geometries e.g. the results of catalog.search())
Parameters:
Name | Type | Description | Default |
---|---|---|---|
scenes |
GeoDataFrame |
GeoDataFrame of scenes, results of catalog.search() |
required |
aoi |
Optional[geopandas.geodataframe.GeoDataFrame] |
GeoDataFrame of aoi. |
None |
legend_column |
str |
Dataframe column set to legend, default is "sceneId". Legend entries are sorted and this determines plotting order. |
'sceneId' |
figsize |
Matplotlib figure size. |
(12, 16) |
Source code in up42/catalog.py
@staticmethod
def plot_coverage(
scenes: GeoDataFrame,
aoi: Optional[GeoDataFrame] = None,
legend_column: str = "sceneId",
figsize=(12, 16),
) -> None:
"""
Plots a coverage map of a dataframe with geometries e.g. the results of catalog.search())
Args:
scenes: GeoDataFrame of scenes, results of catalog.search()
aoi: GeoDataFrame of aoi.
legend_column: Dataframe column set to legend, default is "sceneId".
Legend entries are sorted and this determines plotting order.
figsize: Matplotlib figure size.
"""
if legend_column not in scenes.columns:
legend_column = None # type: ignore
logger.info(
"Given legend_column name not in scene dataframe, "
"plotting without legend."
)
try:
ax = scenes.plot(
legend_column,
categorical=True,
figsize=figsize,
cmap="Set3",
legend=True,
alpha=0.7,
legend_kwds=dict(loc="upper left", bbox_to_anchor=(1, 1)),
)
if aoi is not None:
aoi.plot(color="r", ax=ax, fc="None", edgecolor="r", lw=1)
except AttributeError as e:
raise TypeError(
"'scenes' and 'aoi' (optional) have to be a GeoDataFrame."
) from e
ax.set_axis_off()
plt.show()
plot_quicklooks(self, figsize=(8, 8), titles=None, filepaths=None)
inherited
¶
Plots the downloaded quicklooks (filepaths saved to self.quicklooks of the respective object, e.g. job, catalog).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
figsize |
Tuple[int, int] |
matplotlib figure size. |
(8, 8) |
filepaths |
Optional[list] |
Paths to images to plot. Optional, by default picks up the last downloaded results. |
None |
titles |
Optional[List[str]] |
List of titles for the subplots, optional. |
None |
Source code in up42/catalog.py
def plot_quicklooks(
self,
figsize: Tuple[int, int] = (8, 8),
titles: Optional[List[str]] = None,
filepaths: Optional[list] = None,
) -> None:
"""
Plots the downloaded quicklooks (filepaths saved to self.quicklooks of the
respective object, e.g. job, catalog).
Args:
figsize: matplotlib figure size.
filepaths: Paths to images to plot. Optional, by default picks up the last
downloaded results.
titles: List of titles for the subplots, optional.
"""
if filepaths is None:
if self.quicklooks is None:
raise ValueError("You first need to download the quicklooks!")
filepaths = self.quicklooks
self.plot_results(
plot_file_format=[".jpg", ".jpeg", ".png"],
figsize=figsize,
filepaths=filepaths,
titles=titles,
)
search(self, search_parameters, as_dataframe=True)
¶
Searches the catalog for the the search parameters and returns the metadata of the matching scenes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
search_parameters |
dict |
The catalog search parameters, see example. |
required |
as_dataframe |
bool |
return type, GeoDataFrame if True (default), FeatureCollection if False. |
True |
Returns:
Type | Description |
---|---|
Union[geopandas.geodataframe.GeoDataFrame, dict] |
The search results as a GeoDataFrame, optionally as json dict. |
Examples:
search_parameters={
"datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
"intersects": {
"type": "Polygon",
"coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
[13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
52.73971768]]]},
"limit": 10,
"sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
}
Source code in up42/catalog.py
def search(
self, search_parameters: dict, as_dataframe: bool = True
) -> Union[GeoDataFrame, dict]:
"""
Searches the catalog for the the search parameters and returns the metadata of
the matching scenes.
Args:
search_parameters: The catalog search parameters, see example.
as_dataframe: return type, GeoDataFrame if True (default), FeatureCollection if False.
Returns:
The search results as a GeoDataFrame, optionally as json dict.
Example:
```python
search_parameters={
"datetime": "2019-01-01T00:00:00Z/2019-01-15T23:59:59Z",
"intersects": {
"type": "Polygon",
"coordinates": [[[13.32113746,52.73971768],[13.15981158,52.2092959],
[13.62204483,52.15632025],[13.78859517,52.68655119],[13.32113746,
52.73971768]]]},
"limit": 10,
"sortby": [{"field" : "properties.acquisitionDate", "direction" : "asc"}]
}
```
"""
logger.info(f"Searching catalog with search_parameters: {search_parameters}")
# The API request would fail with a limit above 500, thus 500 is forced in the initial
# request but additional results are handled below via pagination.
max_limit = search_parameters["limit"]
if max_limit > 500:
search_parameters = dict(search_parameters)
search_parameters["limit"] = 500
url = f"{self.auth._endpoint()}/catalog/stac/search"
response_json: dict = self.auth._request("POST", url, search_parameters)
features = response_json["features"]
# A request with no results will still include a (non-exhausted) pagination token.
# Only the first pagination token request will then indicate search exhausted.
# Search results with more than 500 items are given as 50-per-page additional pages.
while len(features) < max_limit:
page_url = response_json["links"][0]["href"]
next_page_url = response_json["links"][1]["href"]
pagination_exhausted = next_page_url == page_url
if pagination_exhausted:
break
response_json = self.auth._request("POST", next_page_url, search_parameters)
features += response_json["features"]
features = features[:max_limit]
df = GeoDataFrame.from_features(
FeatureCollection(features=features), crs="EPSG:4326"
)
logger.info(f"{df.shape[0]} results returned.")
if as_dataframe:
return df
else:
return df.__geo_interface__