
Project

The Project is the top-level class of the UP42 hierarchy. With it you can create new workflows, query existing workflows and jobs in the project, and manage the project settings.

Create a new project on the UP42 Console website.

Use an existing project:

```python
up42.authenticate(project_id="uz92-8uo0-4dc9-ab1d-06d7ec1a5321",
                  project_api_key="9i7uec8a-45be-41ad-a50f-98bewb528b10")
project = up42.initialize_project()
```
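
Once initialized, the project object gives access to its metadata, workflows and jobs. A minimal, illustrative sketch (the limit value is arbitrary):

```python
print(project.info["name"])          # project metadata
workflows = project.get_workflows()  # list of Workflow objects in the project
jobs = project.get_jobs(limit=10)    # JobCollection of the 10 most recent jobs
```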

Source code in up42/project.py
class Project:
    """
    The Project is the top-level class of the UP42 hierarchy. With it you can create
    new workflows, query already existing workflows & jobs in the project and manage
    the project settings.

    Create a new project on the
    [**UP42 Console website**](https://sdk.up42.com/authentication/#get-your-project-credentials).

    Use an existing project:
    ```python
    up42.authenticate(project_id="uz92-8uo0-4dc9-ab1d-06d7ec1a5321",
                      project_api_key="9i7uec8a-45be-41ad-a50f-98bewb528b10")
    project = up42.initialize_project()
    ```
    """

    def __init__(self, auth: Auth, project_id: str):
        self.auth = auth
        self.project_id = project_id
        self._info = self.info

    def __repr__(self):
        env = ", env: dev" if self.auth.env == "dev" else ""
        return (
            f"Project(name: {self._info['name']}, project_id: {self.project_id}, "
            f"description: {self._info['description']}, createdAt: {self._info['createdAt']}"
            f"{env})"
        )

    @property
    def info(self) -> dict:
        """
        Gets and updates the project metadata information.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}"
        response_json = self.auth._request(request_type="GET", url=url)
        self._info = response_json["data"]
        return self._info

    def create_workflow(
        self, name: str, description: str = "", use_existing: bool = False
    ) -> "Workflow":
        """
        Creates a new workflow and returns a workflow object.

        Args:
            name: Name of the new workflow.
            description: Description of the new workflow.
            use_existing: If True, instead of creating a new workflow, uses the
                most recent workflow with the same name & description.

        Returns:
            The workflow object.
        """
        if use_existing:
            logger.info("Getting existing workflows in project ...")
            logging.getLogger("up42.workflow").setLevel(logging.CRITICAL)
            existing_workflows: list = self.get_workflows(return_json=True)
            logging.getLogger("up42.workflow").setLevel(logging.INFO)

            matching_workflows: list = [
                workflow
                for workflow in existing_workflows
                if workflow["name"] == name and workflow["description"] == description
            ]
            if matching_workflows:
                existing_workflow = Workflow(
                    self.auth,
                    project_id=self.project_id,
                    workflow_id=matching_workflows[0]["id"],
                )
                logger.info(
                    f"Using existing workflow: {name} - {existing_workflow.workflow_id}"
                )
                return existing_workflow

        url = f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
        payload = {"name": name, "description": description}
        response_json = self.auth._request(request_type="POST", url=url, data=payload)
        workflow_id = response_json["data"]["id"]
        logger.info(f"Created new workflow: {workflow_id}")
        workflow = Workflow(
            self.auth, project_id=self.project_id, workflow_id=workflow_id
        )
        return workflow

    def get_workflows(
        self, return_json: bool = False
    ) -> Union[List["Workflow"], List[dict]]:
        """
        Gets all workflows in a project as workflow objects or JSON.

        Args:
            return_json: True returns infos of workflows as JSON instead of workflow objects.

        Returns:
            List of Workflow objects in the project or alternatively JSON info of the workflows.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/workflows"
        response_json = self.auth._request(request_type="GET", url=url)
        workflows_json = response_json["data"]
        logger.info(
            f"Got {len(workflows_json)} workflows for project {self.project_id}."
        )

        if return_json:
            return workflows_json
        else:
            workflows = [
                Workflow(
                    self.auth,
                    project_id=self.project_id,
                    workflow_id=workflow_json["id"],
                    workflow_info=workflow_json,
                )
                for workflow_json in workflows_json
            ]
            return workflows

    def get_jobs(
        self,
        return_json: bool = False,
        test_jobs: bool = True,
        real_jobs: bool = True,
        limit: int = 500,
        sortby: str = "createdAt",
        descending: bool = True,
    ) -> Union[JobCollection, List[dict]]:
        """
        Get all jobs in the project as a JobCollection or JSON.

        Use Workflow().get_job() to get a JobCollection with jobs associated with a
        specific workflow.

        Args:
            return_json: If true, returns the job info JSONs instead of JobCollection.
            test_jobs: Return test jobs or test queries.
            real_jobs: Return real jobs.
            limit: Only return n first jobs by sorting criteria and order, default 500.
            sortby: The sorting criteria, one of "createdAt", "name", "id", "mode", "status", "startedAt", "finishedAt".
            descending: The sorting order, True for descending (default), False for ascending.

        Returns:
            All job objects in a JobCollection, or alternatively the jobs info as JSON.
        """
        allowed_sorting_criteria = [
            "createdAt",
            "name",
            "id",
            "mode",
            "status",
            "startedAt",
            "finishedAt",
        ]
        if sortby not in allowed_sorting_criteria:
            raise ValueError(
                f"sortby parameter must be one of {allowed_sorting_criteria}!"
            )
        sort = f"{sortby},{'desc' if descending else 'asc'}"

        page = 0
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs?page={page}&sort={sort}"
        response_json = self.auth._request(request_type="GET", url=url)
        jobs_json = filter_jobs_on_mode(response_json["data"], test_jobs, real_jobs)

        # API get jobs pagination exhaustion is indicated by empty next page (no last page flag)
        logging.getLogger("up42.utils").setLevel(logging.CRITICAL)
        while len(response_json["data"]) > 0 and len(jobs_json) < limit:
            page += 1
            url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs?page={page}&sort={sort}"
            response_json = self.auth._request(request_type="GET", url=url)
            if len(response_json["data"]) > 0:
                jobs_json.extend(
                    filter_jobs_on_mode(response_json["data"], test_jobs, real_jobs)
                )
        logging.getLogger("up42.utils").setLevel(logging.INFO)
        jobs_json = jobs_json[:limit]
        logger.info(
            f"Got {len(jobs_json)} jobs (limit parameter {limit}) in project {self.project_id}."
        )
        if return_json:
            return jobs_json
        else:
            jobs = [
                Job(
                    self.auth,
                    job_id=job_json["id"],
                    project_id=self.project_id,
                    job_info=job_json,
                )
                for job_json in jobs_json
            ]
            jobcollection = JobCollection(
                auth=self.auth, project_id=self.project_id, jobs=jobs
            )
            return jobcollection

    def get_project_settings(self) -> List[Dict[str, str]]:
        """
        Gets the project settings.

        Returns:
            The project settings.
        """
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/settings"
        response_json = self.auth._request(request_type="GET", url=url)
        project_settings = response_json["data"]
        return project_settings

    @property
    def max_concurrent_jobs(self) -> int:
        """
        Gets the maximum number of concurrent jobs allowed by the project settings.
        """
        project_settings = self.get_project_settings()
        project_settings_dict = {d["name"]: int(d["value"]) for d in project_settings}
        return project_settings_dict["MAX_CONCURRENT_JOBS"]

    def update_project_settings(
        self,
        max_aoi_size: Optional[int] = None,
        max_concurrent_jobs: Optional[int] = None,
        number_of_images: Optional[int] = None,
    ) -> None:
        """
        Updates a project's settings.

        Args:
            max_aoi_size: The maximum area of interest geometry size, from 1-1000 sqkm, default 10 sqkm.
            max_concurrent_jobs: The maximum number of concurrent jobs, from 1-10, default 1.
            number_of_images: The maximum number of images returned with each job, from 1-20, default 10.
        """
        # The ranges and default values of the project settings can potentially get
        # increased, so need to be dynamically queried from the server.
        current_settings = {d["name"]: d for d in self.get_project_settings()}

        url = f"{self.auth._endpoint()}/projects/{self.project_id}/settings"
        payload: Dict = {"settings": {}}
        desired_settings = {
            "JOB_QUERY_MAX_AOI_SIZE": max_aoi_size,
            "MAX_CONCURRENT_JOBS": max_concurrent_jobs,
            "JOB_QUERY_LIMIT_PARAMETER_MAX_VALUE": number_of_images,
        }

        for name, desired_setting in desired_settings.items():
            if desired_setting is None:
                payload["settings"][name] = str(current_settings[name]["value"])
            else:
                payload["settings"][name] = str(desired_setting)

        self.auth._request(request_type="POST", url=url, data=payload)
        logger.info(f"Updated project settings: {payload}")

Attributes

info: dict property

Gets and updates the project metadata information.

max_concurrent_jobs: int property

Gets the maximum number of concurrent jobs allowed by the project settings.
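
Both properties are read directly from the project object; a quick, illustrative check:

```python
print(project.info["description"])   # refreshed project metadata
print(project.max_concurrent_jobs)   # e.g. 1 on a project with default settings
```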

Functions

create_workflow(name, description='', use_existing=False)

Creates a new workflow and returns a workflow object.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| name | str | Name of the new workflow. | required |
| description | str | Description of the new workflow. | '' |
| use_existing | bool | If True, instead of creating a new workflow, uses the most recent workflow with the same name & description. | False |

Returns:

| Type | Description |
|------|-------------|
| Workflow | The workflow object. |

Source code in up42/project.py
def create_workflow(
    self, name: str, description: str = "", use_existing: bool = False
) -> "Workflow":
    """
    Creates a new workflow and returns a workflow object.

    Args:
        name: Name of the new workflow.
        description: Description of the new workflow.
        use_existing: If True, instead of creating a new workflow, uses the
            most recent workflow with the same name & description.

    Returns:
        The workflow object.
    """
    if use_existing:
        logger.info("Getting existing workflows in project ...")
        logging.getLogger("up42.workflow").setLevel(logging.CRITICAL)
        existing_workflows: list = self.get_workflows(return_json=True)
        logging.getLogger("up42.workflow").setLevel(logging.INFO)

        matching_workflows: list = [
            workflow
            for workflow in existing_workflows
            if workflow["name"] == name and workflow["description"] == description
        ]
        if matching_workflows:
            existing_workflow = Workflow(
                self.auth,
                project_id=self.project_id,
                workflow_id=matching_workflows[0]["id"],
            )
            logger.info(
                f"Using existing workflow: {name} - {existing_workflow.workflow_id}"
            )
            return existing_workflow

    url = f"{self.auth._endpoint()}/projects/{self.project_id}/workflows/"
    payload = {"name": name, "description": description}
    response_json = self.auth._request(request_type="POST", url=url, data=payload)
    workflow_id = response_json["data"]["id"]
    logger.info(f"Created new workflow: {workflow_id}")
    workflow = Workflow(
        self.auth, project_id=self.project_id, workflow_id=workflow_id
    )
    return workflow
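
A short usage sketch; the workflow name and description are placeholders:

```python
workflow = project.create_workflow(
    name="my-workflow",                      # placeholder name
    description="Example processing chain",  # placeholder description
    use_existing=True,  # reuse the most recent identical workflow if one exists
)
print(workflow.workflow_id)
```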

get_jobs(return_json=False, test_jobs=True, real_jobs=True, limit=500, sortby='createdAt', descending=True)

Get all jobs in the project as a JobCollection or JSON.

Use Workflow().get_job() to get a JobCollection with jobs associated with a specific workflow.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| return_json | bool | If True, returns the job info JSONs instead of a JobCollection. | False |
| test_jobs | bool | Return test jobs (test queries). | True |
| real_jobs | bool | Return real jobs. | True |
| limit | int | Only return the first n jobs by sorting criteria and order, default 500. | 500 |
| sortby | str | The sorting criteria, one of "createdAt", "name", "id", "mode", "status", "startedAt", "finishedAt". | 'createdAt' |
| descending | bool | The sorting order, True for descending (default), False for ascending. | True |

Returns:

| Type | Description |
|------|-------------|
| Union[JobCollection, List[dict]] | All job objects in a JobCollection, or alternatively the jobs info as JSON. |

Source code in up42/project.py
def get_jobs(
    self,
    return_json: bool = False,
    test_jobs: bool = True,
    real_jobs: bool = True,
    limit: int = 500,
    sortby: str = "createdAt",
    descending: bool = True,
) -> Union[JobCollection, List[dict]]:
    """
    Get all jobs in the project as a JobCollection or JSON.

    Use Workflow().get_job() to get a JobCollection with jobs associated with a
    specific workflow.

    Args:
        return_json: If true, returns the job info JSONs instead of JobCollection.
        test_jobs: Return test jobs or test queries.
        real_jobs: Return real jobs.
        limit: Only return n first jobs by sorting criteria and order, default 500.
        sortby: The sorting criteria, one of "createdAt", "name", "id", "mode", "status", "startedAt", "finishedAt".
        descending: The sorting order, True for descending (default), False for ascending.

    Returns:
        All job objects in a JobCollection, or alternatively the jobs info as JSON.
    """
    allowed_sorting_criteria = [
        "createdAt",
        "name",
        "id",
        "mode",
        "status",
        "startedAt",
        "finishedAt",
    ]
    if sortby not in allowed_sorting_criteria:
        raise ValueError(
            f"sortby parameter must be one of {allowed_sorting_criteria}!"
        )
    sort = f"{sortby},{'desc' if descending else 'asc'}"

    page = 0
    url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs?page={page}&sort={sort}"
    response_json = self.auth._request(request_type="GET", url=url)
    jobs_json = filter_jobs_on_mode(response_json["data"], test_jobs, real_jobs)

    # API get jobs pagination exhaustion is indicated by empty next page (no last page flag)
    logging.getLogger("up42.utils").setLevel(logging.CRITICAL)
    while len(response_json["data"]) > 0 and len(jobs_json) < limit:
        page += 1
        url = f"{self.auth._endpoint()}/projects/{self.project_id}/jobs?page={page}&sort={sort}"
        response_json = self.auth._request(request_type="GET", url=url)
        if len(response_json["data"]) > 0:
            jobs_json.extend(
                filter_jobs_on_mode(response_json["data"], test_jobs, real_jobs)
            )
    logging.getLogger("up42.utils").setLevel(logging.INFO)
    jobs_json = jobs_json[:limit]
    logger.info(
        f"Got {len(jobs_json)} jobs (limit parameter {limit}) in project {self.project_id}."
    )
    if return_json:
        return jobs_json
    else:
        jobs = [
            Job(
                self.auth,
                job_id=job_json["id"],
                project_id=self.project_id,
                job_info=job_json,
            )
            for job_json in jobs_json
        ]
        jobcollection = JobCollection(
            auth=self.auth, project_id=self.project_id, jobs=jobs
        )
        return jobcollection
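
A usage sketch showing both return modes; the parameter values are illustrative:

```python
# Only real (non-test) jobs, oldest first, capped at 20.
jobcollection = project.get_jobs(
    test_jobs=False,
    real_jobs=True,
    limit=20,
    sortby="createdAt",
    descending=False,
)

# Raw job metadata instead of a JobCollection.
jobs_info = project.get_jobs(return_json=True)
```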

get_project_settings()

Gets the project settings.

Returns:

| Type | Description |
|------|-------------|
| List[Dict[str, str]] | The project settings. |

Source code in up42/project.py
def get_project_settings(self) -> List[Dict[str, str]]:
    """
    Gets the project settings.

    Returns:
        The project settings.
    """
    url = f"{self.auth._endpoint()}/projects/{self.project_id}/settings"
    response_json = self.auth._request(request_type="GET", url=url)
    project_settings = response_json["data"]
    return project_settings
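
Each entry in the returned list is a name/value pair (the shape that max_concurrent_jobs relies on); an illustrative sketch:

```python
settings = project.get_project_settings()
# e.g. [{"name": "MAX_CONCURRENT_JOBS", "value": "1"}, ...]
for setting in settings:
    print(setting["name"], setting["value"])
```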

get_workflows(return_json=False)

Gets all workflows in a project as workflow objects or JSON.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| return_json | bool | If True, returns the workflow info as JSON instead of Workflow objects. | False |

Returns:

| Type | Description |
|------|-------------|
| Union[List[Workflow], List[dict]] | List of Workflow objects in the project, or alternatively JSON info of the workflows. |

Source code in up42/project.py
def get_workflows(
    self, return_json: bool = False
) -> Union[List["Workflow"], List[dict]]:
    """
    Gets all workflows in a project as workflow objects or JSON.

    Args:
        return_json: True returns infos of workflows as JSON instead of workflow objects.

    Returns:
        List of Workflow objects in the project or alternatively JSON info of the workflows.
    """
    url = f"{self.auth._endpoint()}/projects/{self.project_id}/workflows"
    response_json = self.auth._request(request_type="GET", url=url)
    workflows_json = response_json["data"]
    logger.info(
        f"Got {len(workflows_json)} workflows for project {self.project_id}."
    )

    if return_json:
        return workflows_json
    else:
        workflows = [
            Workflow(
                self.auth,
                project_id=self.project_id,
                workflow_id=workflow_json["id"],
                workflow_info=workflow_json,
            )
            for workflow_json in workflows_json
        ]
        return workflows
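
A usage sketch of both return modes; printing the first workflow name assumes the project contains at least one workflow:

```python
# Workflow objects by default.
for workflow in project.get_workflows():
    print(workflow.workflow_id)

# Raw workflow metadata as JSON.
workflows_info = project.get_workflows(return_json=True)
print(workflows_info[0]["name"])
```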

update_project_settings(max_aoi_size=None, max_concurrent_jobs=None, number_of_images=None)

Updates a project's settings.

Parameters:

Name Type Description Default
max_aoi_size Optional[int]

The maximum area of interest geometry size, from 1-1000 sqkm, default 10 sqkm.

None
max_concurrent_jobs Optional[int]

The maximum number of concurrent jobs, from 1-10, default 1.

None
number_of_images Optional[int]

The maximum number of images returned with each job, from 1-20, default 10.

None
Source code in up42/project.py
def update_project_settings(
    self,
    max_aoi_size: Optional[int] = None,
    max_concurrent_jobs: Optional[int] = None,
    number_of_images: Optional[int] = None,
) -> None:
    """
    Updates a project's settings.

    Args:
        max_aoi_size: The maximum area of interest geometry size, from 1-1000 sqkm, default 10 sqkm.
        max_concurrent_jobs: The maximum number of concurrent jobs, from 1-10, default 1.
        number_of_images: The maximum number of images returned with each job, from 1-20, default 10.
    """
    # The ranges and default values of the project settings can potentially get
    # increased, so need to be dynamically queried from the server.
    current_settings = {d["name"]: d for d in self.get_project_settings()}

    url = f"{self.auth._endpoint()}/projects/{self.project_id}/settings"
    payload: Dict = {"settings": {}}
    desired_settings = {
        "JOB_QUERY_MAX_AOI_SIZE": max_aoi_size,
        "MAX_CONCURRENT_JOBS": max_concurrent_jobs,
        "JOB_QUERY_LIMIT_PARAMETER_MAX_VALUE": number_of_images,
    }

    for name, desired_setting in desired_settings.items():
        if desired_setting is None:
            payload["settings"][name] = str(current_settings[name]["value"])
        else:
            payload["settings"][name] = str(desired_setting)

    self.auth._request(request_type="POST", url=url, data=payload)
    logger.info(f"Updated project settings: {payload}")