# -*- coding: utf-8 -*-
""" Utilities
"""
import logging
import os
from contextlib import contextmanager
import affine
from osgeo import osr
import rasterio
import shapely
import shapely.geometry
from rasterio import warp
from .core import BoundingBox
logger = logging.getLogger('tilezilla')
osr.UseExceptions()
[docs]def match_to_grid(match, grid, pix_size):
""" Return new postings for input that coalign with grid
Args:
match (iterable): X or Y coordinates of input extent to be repositioned
grid_xy (iterable): X or Y coordinates of grid extent to match
pix_size (iterable): X or Y pixel sizes of data to match
Returns:
tuple: new X/Y coordinate of matched input
"""
new_coords = []
for _grid, _match, ps in zip(grid, match, pix_size):
offset = int(round((_grid - _match) / ps))
new_coords.append(_grid - offset * ps)
return new_coords
[docs]def intersects_bounds(a_bounds, b_bounds):
""" Return True/False if a intersects b
Args:
a_bounds (iterable): bounds of a (left bottom right top)
b_bounds (iterable): bounds of b (left bottom right top)
Returns:
bool: True/False if a intersects b
"""
a = bounds_to_polygon(a_bounds)
b = bounds_to_polygon(b_bounds)
return a.intersects(b)
[docs]def bounds_to_polygon(bounds):
""" Returns Shapely polygon of bounds
Args:
bounds (iterable): bounds (left bottom right top)
Returns:
shapely.geometry.Polygon: polygon of bounds
"""
return shapely.geometry.Polygon([
(bounds[0], bounds[1]),
(bounds[2], bounds[1]),
(bounds[2], bounds[3]),
(bounds[0], bounds[3])
])
[docs]def reproject_bounds(bounds, src_crs, dst_crs):
""" Return bounds reprojected to `dst_crs`
Args:
BoundingBox: Bounding box in `src_crs`
src_crs (str or dict): The coordinate reference system, interpretable
by rasterio
dst_crs (str or dict): The coordinate reference system, interpretable
by rasterio
Returns:
BoundingBox: Bounding box in `dst_crs`
"""
if src_crs != dst_crs:
return BoundingBox(*warp.transform_bounds(src_crs, dst_crs, *bounds))
else:
return bounds
@contextmanager
[docs]def reproject_as_needed(src, tilespec, resampling='nearest'):
""" Return a ``rasterio`` dataset, reprojected if needed
Returns src dataset if reprojection unncessary. Otherwise returns an in
memory ``rasterio`` dataset. Reprojection will snap the bounding
coordinates of the source dataset to align with the tile specification.
Args:
src (rasterio._io.RasterReader): rasterio raster dataset
tilespec (TileSpec): tile specification
resampling (str): reprojection resampling method (default: nearest)
Returns:
rasterio._io.RasterReader: original or reprojected dataset
"""
if src.crs == tilespec.crs:
yield src
else:
# Calculate new transform & size
transform, width, height = warp.calculate_default_transform(
src.crs, tilespec.crs,
src.width, src.height,
*src.bounds, resolution=tilespec.res)
# Snap bounds
transform = snap_transform(transform, tilespec.ul)
dst_meta = src.meta.copy()
dst_meta['driver'] = 'MEM'
dst_meta['crs'] = tilespec.crs
dst_meta['width'] = width
dst_meta['height'] = height
dst_meta['transform'] = transform
with rasterio.open(os.path.basename(src.name), 'w', **dst_meta) as dst:
warp.reproject(
rasterio.band(src, 1),
rasterio.band(dst, 1),
resampling=getattr(warp.Resampling, resampling)
)
yield dst