diff --git a/public/common/common.py b/public/common/common.py index 466acd2d..cea7210d 100644 --- a/public/common/common.py +++ b/public/common/common.py @@ -222,7 +222,7 @@ def get_status(job_id): def s3_upload(path, s3_path=None): import s3fs fs = s3fs.S3FileSystem() - if not s3_path: + if not s3_path or s3_path=='s3': s3_path = s3_tmp_path(path, folder=None) fs.put(path, s3_path) return s3_path @@ -3813,3 +3813,139 @@ def udf_to_json(udf): except: udf_nail_json = func_to_udf(udf).json() return udf_nail_json + + +def to_pmtiles(gdf, zooms: list, output_path=None, args='--force -r1', verbose=True): + import tempfile + geojson_path = tempfile.NamedTemporaryFile(suffix=".geojson", delete=False).name + if output_path is None: + upload_path=None + output_path = geojson_path.replace('geojson','pmtiles') + elif output_path.startswith('s3'): + upload_path=output_path + output_path = geojson_path.replace('geojson','pmtiles') + else: #todo: add GCS option + import os + os.makedirs(os.path.dirname(output_path), exist_ok=True) + upload_path=None + gdf.to_file(geojson_path, driver="GeoJSON") + if verbose: print(f"{geojson_path} done!") + cmd = f"tippecanoe -o {output_path} {args} -Z{min(zooms)} -z{max(zooms)} {geojson_path}" + result = run_cmd(cmd, communicate=True) + if verbose: print(result[1][:100], result[1][-100:]) + if upload_path: + return s3_upload(output_path, s3_path=upload_path) + else: + return output_path + +def pmtile_metadata(pmtile_path: str, key: str = "description"): + import pandas as pd + import json + from shapely.geometry import Point + from pmtiles.reader import deserialize_header + import fsspec, json, gzip + + with fsspec.filesystem("s3").open(pmtile_path, "rb") as f: + header_bytes = f.read(127) + header = deserialize_header(header_bytes) + + f.seek(header["metadata_offset"]) + metadata_bytes = f.read(header["metadata_length"]) + + metadata_str = ( + gzip.decompress(metadata_bytes).decode("utf-8") + if metadata_bytes[0:2] == b"\x1f\x8b" + else metadata_bytes.decode("utf-8") + ) + metadata = json.loads(metadata_str) + result_text = metadata.get(key) or (json.dumps(metadata, indent=2) if isinstance(metadata, dict) else str(metadata)) + return result_text + + +def read_pmtile(bounds, pmtile_path: str): + x, y, z = get_tiles(bounds)[["x", "y", "z"]].iloc[0] + from pmtiles.reader import Reader + from shapely.geometry import Point + import geopandas as gpd + + from pmtiles.reader import deserialize_header + import fsspec, json, gzip + + with fsspec.filesystem("s3").open(pmtile_path, "rb") as f: + file_data = f.read() + + header = deserialize_header(file_data[0:127]) + metadata_bytes = file_data[header["metadata_offset"] : header["metadata_offset"] + header["metadata_length"]] + metadata_str = ( + gzip.decompress(metadata_bytes).decode("utf-8") + if metadata_bytes[0:2] == b"\x1f\x8b" + else metadata_bytes.decode("utf-8") + ) + metadata = json.loads(metadata_str) + + if not (header.get("min_zoom", 0) <= z <= header.get("max_zoom", 18)): + return gpd.GeoDataFrame(geometry=[], crs="EPSG:4326") + reader = Reader(lambda offset, length: file_data[offset : offset + length]) + tile_data=None + for method_name in ["get_tile", "get", "getZxy", "get_zxy"]: + if hasattr(reader, method_name): + try: + tile_data = getattr(reader, method_name)(z, x, y) + except: + continue + + if not tile_data: + print('no data was returned') + return gpd.GeoDataFrame(geometry=[], crs="EPSG:4326") + + if header.get("tile_type", 0) == 1 or metadata.get("type") == "overlay": + try: + features = _decode_mvt_tile(tile_data, z, x, y) + import geopandas as gpd + from shapely.geometry import shape + + if not features: + print('no data was returned') + return gpd.GeoDataFrame(geometry=[], crs="EPSG:4326") + + geometries = [shape(f["geometry"]) for f in features] + properties = [{k: v for k, v in f.items() if k != "geometry"} for f in features] + return gpd.GeoDataFrame(properties, geometry=geometries, crs="EPSG:3857").to_crs("EPSG:4326") + except Exception as e: + print(f"Error decoding MVT: {e}") + + return gpd.GeoDataFrame( + {"tile_type": ["raster"], "size_bytes": [len(tile_data)]}, geometry=[Point(0, 0)], crs="EPSG:4326" + ) + + +def _transform_pmtile_geometry(geom, z, x, y, tile_extent=4096): + """Convert tile-local coordinates to EPSG:3857 (Web Mercator).""" + import mercantile + + b = mercantile.xy_bounds(x, y, z) + sx, sy = (b.right - b.left) / tile_extent, (b.top - b.bottom) / tile_extent + + def transform(coords): + if isinstance(coords[0], (int, float)): + return [coords[0] * sx + b.left, (tile_extent - coords[1]) * sy + b.bottom] + return [transform(c) for c in coords] + + geom["coordinates"] = transform(geom["coordinates"]) + return geom + +def _decode_mvt_tile(tile_data, z, x, y): + from mapbox_vector_tile import decode as mvt_decode + import gzip + + if len(tile_data) >= 2 and tile_data[0:2] == b"\x1f\x8b": + tile_data = gzip.decompress(tile_data) + + features = [] + for layer_name, layer_data in mvt_decode(tile_data, y_coord_down=True).items(): + for feature in layer_data["features"]: + props = feature["properties"].copy() + props["_layer"] = layer_name + features.append({"geometry": _transform_pmtile_geometry(feature["geometry"], z, x, y), **props}) + return features +