Skip to content

Commit bf7f451

Browse files
refactor(attribute-completeness): query trino
1 parent 737e42e commit bf7f451

File tree

3 files changed

+56
-135
lines changed

3 files changed

+56
-135
lines changed

ohsome_quality_api/indicators/attribute_completeness/indicator.py

+34-134
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
1+
import json
12
import logging
2-
import time
3+
import os
34
from string import Template
45

56
import plotly.graph_objects as go
6-
import requests
77
from geojson import Feature
8-
from shapely import to_wkt
9-
from shapely.geometry import shape
108

119
from ohsome_quality_api.attributes.definitions import (
1210
build_attribute_filter,
1311
get_attribute,
1412
)
1513
from ohsome_quality_api.indicators.base import BaseIndicator
1614
from ohsome_quality_api.topics.models import BaseTopic as Topic
15+
from ohsome_quality_api.trino import client as trino_client
16+
17+
WORKING_DIR = os.path.dirname(os.path.abspath(__file__))
1718

1819

1920
class AttributeCompleteness(BaseIndicator):
@@ -76,142 +77,41 @@ def __init__(
7677
)
7778

7879
async def preprocess(self) -> None:
79-
80-
TRINO_HOST = ""
81-
TRINO_PORT =
82-
TRINO_USER = ""
83-
TRINO_CATALOG = ""
84-
TRINO_SCHEMA = ""
85-
86-
URL = f"http://{TRINO_HOST}:{TRINO_PORT}/v1/statement"
87-
88-
HEADERS = {
89-
"X-Trino-User": TRINO_USER,
90-
"X-Trino-Catalog": TRINO_CATALOG,
91-
"X-Trino-Schema": TRINO_SCHEMA,
92-
}
93-
94-
AUTH = None
95-
96-
QUERY_TEMPLATE = """
97-
SELECT
98-
SUM(
99-
CASE
100-
WHEN ST_Within(ST_GeometryFromText(a.geometry), b.geometry) THEN length
101-
ELSE CAST(st_length(ST_Intersection(ST_GeometryFromText(a.geometry), b.geometry)) AS integer)
102-
END
103-
) AS total_road_length,
104-
105-
SUM(
106-
CASE
107-
WHEN element_at(tags, 'name') IS NULL THEN 0
108-
WHEN ST_Within(ST_GeometryFromText(a.geometry), b.geometry) THEN length
109-
ELSE CAST(st_length(ST_Intersection(ST_GeometryFromText(a.geometry), b.geometry)) AS integer)
110-
END
111-
) AS total_road_length_with_name,
112-
113-
(
114-
SUM(
115-
CASE
116-
WHEN element_at(tags, 'name') IS NULL THEN 0
117-
WHEN ST_Within(ST_GeometryFromText(a.geometry), b.geometry) THEN length
118-
ELSE CAST(st_length(ST_Intersection(ST_GeometryFromText(a.geometry), b.geometry)) AS integer)
119-
END
120-
)
121-
/
122-
SUM(
123-
CASE
124-
WHEN ST_Within(ST_GeometryFromText(a.geometry), b.geometry) THEN length
125-
ELSE CAST(st_length(ST_Intersection(ST_GeometryFromText(a.geometry), b.geometry)) AS integer)
126-
END
127-
)
128-
) AS ratio
129-
130-
FROM contributions a, (VALUES {aoi_values}) AS b(id, geometry)
131-
WHERE 'herfort' != 'kwakye'
132-
AND status = 'latest'
133-
AND element_at(a.tags, 'highway') IS NOT NULL
134-
AND a.tags['highway'] IN (
135-
'motorway', 'trunk', 'motorway_link', 'trunk_link', 'primary', 'primary_link',
136-
'secondary', 'secondary_link', 'tertiary', 'tertiary_link', 'unclassified', 'residential'
137-
)
138-
AND (bbox.xmax >= 8.629761 AND bbox.xmin <= 8.742371)
139-
AND (bbox.ymax >= 49.379556 AND bbox.ymin <= 49.437890)
140-
AND ST_Intersects(ST_GeometryFromText(a.geometry), b.geometry)
141-
GROUP BY b.id
80+
filter_1 = """
81+
element_at (contributions.tags, 'highway') IS NOT NULL
82+
AND contributions.tags['highway'] IN ('motorway', 'trunk',
83+
'motorway_link', 'trunk_link', 'primary', 'primary_link', 'secondary',
84+
'secondary_link', 'tertiary', 'tertiary_link', 'unclassified',
85+
'residential')
86+
"""
87+
filter_2 = """
88+
element_at (contributions.tags, 'highway') IS NOT NULL
89+
AND element_at (contributions.tags, 'name') IS NOT NULL
90+
AND contributions.tags['highway'] IN ('motorway', 'trunk',
91+
'motorway_link', 'trunk_link', 'primary', 'primary_link', 'secondary',
92+
'secondary_link', 'tertiary', 'tertiary_link', 'unclassified',
93+
'residential')
14294
"""
14395

144-
def extract_geometry(feature):
145-
geometry = feature.get("geometry")
146-
if not geometry:
147-
raise ValueError("Feature does not contain a geometry")
148-
geom_shape = shape(geometry)
149-
return to_wkt(geom_shape)
150-
151-
def format_aoi_values(geom_wkt):
152-
return f"('AOI', ST_GeometryFromText('{geom_wkt}'))"
153-
154-
def execute_query(query):
155-
try:
156-
response = requests.post(URL, data=query, headers=HEADERS, auth=AUTH)
157-
response.raise_for_status()
158-
return response.json()
159-
except requests.exceptions.RequestException as e:
160-
print(f"Error submitting query: {e}")
161-
return None
162-
163-
def poll_query(next_uri):
164-
"""Poll the query's nextUri until results are ready."""
165-
results = []
166-
while next_uri:
167-
try:
168-
response = requests.get(next_uri, headers=HEADERS, auth=AUTH)
169-
response.raise_for_status()
170-
data = response.json()
171-
172-
state = data["stats"]["state"]
173-
print(f"Query state: {state}")
174-
175-
if state == "FINISHED":
176-
if "data" in data:
177-
results.extend(data["data"])
178-
print("Query completed successfully!")
179-
break
180-
elif state in {"FAILED", "CANCELLED"}:
181-
print(f"Query failed or was cancelled: {data}")
182-
break
183-
184-
next_uri = data.get("nextUri")
185-
except requests.exceptions.RequestException as e:
186-
print(f"Error polling query: {e}")
187-
break
188-
time.sleep(1)
189-
190-
return results
191-
192-
193-
geom_wkt = extract_geometry(self.feature)
194-
195-
aoi_values = format_aoi_values(geom_wkt)
196-
197-
query = QUERY_TEMPLATE.format(aoi_values=aoi_values)
198-
199-
initial_response = execute_query(query)
200-
if not initial_response:
201-
return
202-
next_uri = initial_response.get("nextUri")
203-
if not next_uri:
204-
print("No nextUri found. Query might have failed immediately.")
205-
print(initial_response)
206-
return
207-
208-
response = poll_query(next_uri)
96+
file_path = os.path.join(WORKING_DIR, "query.sql")
97+
with open(file_path, "r") as file:
98+
template = file.read()
99+
sql = template.format(
100+
filter=filter_1, geometry=json.dumps(self.feature["geometry"])
101+
)
102+
query = await trino_client.query(sql)
103+
results = await trino_client.fetch(query)
104+
self.absolute_value_1 = results[0][0]
209105

106+
sql = template.format(
107+
filter=filter_2, geometry=json.dumps(self.feature["geometry"])
108+
)
109+
query = await trino_client.query(sql)
110+
results = await trino_client.fetch(query)
111+
self.absolute_value_2 = results[0][0]
210112

211113
# timestamp = response["ratioResult"][0]["timestamp"]
212114
# self.result.timestamp_osm = dateutil.parser.isoparse(timestamp)
213-
self.absolute_value_1 = response[0][0]
214-
self.absolute_value_2 = response[0][1]
215115
self.result.value = self.absolute_value_2 / self.absolute_value_1
216116

217117
def calculate(self) -> None:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
WITH bpoly AS (
2+
SELECT
3+
to_geometry (from_geojson_geometry ('{geometry}')) AS geometry
4+
)
5+
SELECT
6+
Sum(
7+
CASE WHEN ST_Within (ST_GeometryFromText (contributions.geometry), bpoly.geometry) THEN
8+
length
9+
ELSE
10+
Cast(ST_Length (ST_Intersection (ST_GeometryFromText
11+
(contributions.geometry), bpoly.geometry)) AS integer)
12+
END) AS length
13+
FROM
14+
bpoly,
15+
sotm2024_iceberg.geo_sort.contributions AS contributions
16+
WHERE
17+
status = 'latest'
18+
AND ST_Intersects (bpoly.geometry, ST_GeometryFromText (contributions.geometry))
19+
AND {filter}
20+
AND (bbox.xmax >= 8.629761 AND bbox.xmin <= 8.742371)
21+
AND (bbox.ymax >= 49.379556 AND bbox.ymin <= 49.437890)

tests/integrationtests/indicators/test_attribute_completeness.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121

2222
class TestPreprocess:
23-
@oqapi_vcr.use_cassette
23+
# @oqapi_vcr.use_cassette
2424
def test_preprocess_attribute_keys_single(
2525
self, topic_building_count, feature_germany_heidelberg, attribute_key
2626
):

0 commit comments

Comments
 (0)