Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 137 additions & 1 deletion public/common/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def get_status(job_id):
def s3_upload(path, s3_path=None):
import s3fs
fs = s3fs.S3FileSystem()
if not s3_path:
if not s3_path or s3_path=='s3':
s3_path = s3_tmp_path(path, folder=None)
fs.put(path, s3_path)
return s3_path
Expand Down Expand Up @@ -3813,3 +3813,139 @@ def udf_to_json(udf):
except:
udf_nail_json = func_to_udf(udf).json()
return udf_nail_json


def to_pmtiles(gdf, zooms: list, output_path=None, args='--force -r1', verbose=True):
import tempfile
geojson_path = tempfile.NamedTemporaryFile(suffix=".geojson", delete=False).name
if output_path is None:
upload_path=None
output_path = geojson_path.replace('geojson','pmtiles')
elif output_path.startswith('s3'):
upload_path=output_path
output_path = geojson_path.replace('geojson','pmtiles')
else: #todo: add GCS option
import os
os.makedirs(os.path.dirname(output_path), exist_ok=True)
upload_path=None
gdf.to_file(geojson_path, driver="GeoJSON")
if verbose: print(f"{geojson_path} done!")
cmd = f"tippecanoe -o {output_path} {args} -Z{min(zooms)} -z{max(zooms)} {geojson_path}"
result = run_cmd(cmd, communicate=True)
if verbose: print(result[1][:100], result[1][-100:])
if upload_path:
return s3_upload(output_path, s3_path=upload_path)
else:
return output_path

def pmtile_metadata(pmtile_path: str, key: str = "description"):
import pandas as pd
import json
from shapely.geometry import Point
from pmtiles.reader import deserialize_header
import fsspec, json, gzip

with fsspec.filesystem("s3").open(pmtile_path, "rb") as f:
header_bytes = f.read(127)
header = deserialize_header(header_bytes)

f.seek(header["metadata_offset"])
metadata_bytes = f.read(header["metadata_length"])

metadata_str = (
gzip.decompress(metadata_bytes).decode("utf-8")
if metadata_bytes[0:2] == b"\x1f\x8b"
else metadata_bytes.decode("utf-8")
)
metadata = json.loads(metadata_str)
result_text = metadata.get(key) or (json.dumps(metadata, indent=2) if isinstance(metadata, dict) else str(metadata))
return result_text


def read_pmtile(bounds, pmtile_path: str):
x, y, z = get_tiles(bounds)[["x", "y", "z"]].iloc[0]
from pmtiles.reader import Reader
from shapely.geometry import Point
import geopandas as gpd

from pmtiles.reader import deserialize_header
import fsspec, json, gzip

with fsspec.filesystem("s3").open(pmtile_path, "rb") as f:
file_data = f.read()

header = deserialize_header(file_data[0:127])
metadata_bytes = file_data[header["metadata_offset"] : header["metadata_offset"] + header["metadata_length"]]
metadata_str = (
gzip.decompress(metadata_bytes).decode("utf-8")
if metadata_bytes[0:2] == b"\x1f\x8b"
else metadata_bytes.decode("utf-8")
)
metadata = json.loads(metadata_str)

if not (header.get("min_zoom", 0) <= z <= header.get("max_zoom", 18)):
return gpd.GeoDataFrame(geometry=[], crs="EPSG:4326")
reader = Reader(lambda offset, length: file_data[offset : offset + length])
tile_data=None
for method_name in ["get_tile", "get", "getZxy", "get_zxy"]:
if hasattr(reader, method_name):
try:
tile_data = getattr(reader, method_name)(z, x, y)
except:
continue

if not tile_data:
print('no data was returned')
return gpd.GeoDataFrame(geometry=[], crs="EPSG:4326")

if header.get("tile_type", 0) == 1 or metadata.get("type") == "overlay":
try:
features = _decode_mvt_tile(tile_data, z, x, y)
import geopandas as gpd
from shapely.geometry import shape

if not features:
print('no data was returned')
return gpd.GeoDataFrame(geometry=[], crs="EPSG:4326")

geometries = [shape(f["geometry"]) for f in features]
properties = [{k: v for k, v in f.items() if k != "geometry"} for f in features]
return gpd.GeoDataFrame(properties, geometry=geometries, crs="EPSG:3857").to_crs("EPSG:4326")
except Exception as e:
print(f"Error decoding MVT: {e}")

return gpd.GeoDataFrame(
{"tile_type": ["raster"], "size_bytes": [len(tile_data)]}, geometry=[Point(0, 0)], crs="EPSG:4326"
)


def _transform_pmtile_geometry(geom, z, x, y, tile_extent=4096):
"""Convert tile-local coordinates to EPSG:3857 (Web Mercator)."""
import mercantile

b = mercantile.xy_bounds(x, y, z)
sx, sy = (b.right - b.left) / tile_extent, (b.top - b.bottom) / tile_extent

def transform(coords):
if isinstance(coords[0], (int, float)):
return [coords[0] * sx + b.left, (tile_extent - coords[1]) * sy + b.bottom]
return [transform(c) for c in coords]

geom["coordinates"] = transform(geom["coordinates"])
return geom

def _decode_mvt_tile(tile_data, z, x, y):
from mapbox_vector_tile import decode as mvt_decode
import gzip

if len(tile_data) >= 2 and tile_data[0:2] == b"\x1f\x8b":
tile_data = gzip.decompress(tile_data)

features = []
for layer_name, layer_data in mvt_decode(tile_data, y_coord_down=True).items():
for feature in layer_data["features"]:
props = feature["properties"].copy()
props["_layer"] = layer_name
features.append({"geometry": _transform_pmtile_geometry(feature["geometry"], z, x, y), **props})
return features