# -*- coding: utf-8 -*-
""" CLI to process imagery products to tiles and index in database
"""
from collections import defaultdict
import concurrent.futures
import logging
import os
import click
import six
# TODO: hide many of these imports to improve CLI startup speed
from . import cliutils, options
from .. import multiprocess, products
from .._util import decompress_to, include_bands, mkdir_p
from ..errors import FillValueException
from ..geoutils import reproject_as_needed, reproject_bounds
from ..stores import destination_path, STORAGE_TYPES
[docs]def ingest_source(config, source, overwrite, log_name):
""" Ingest (tile and index) a source
Table entries for indexing are created and returned by this function so
that database writes can be performed in parent process/context.
"""
mlogger = multiprocess.get_logger_multiproc(name=os.path.basename(source),
filename=log_name)
echoer = cliutils.Echoer(logger=mlogger)
spec, storage_name, database, cube, dataset = (
cliutils.config_to_resources(config))
echoer.info('Decompressing: {}'.format(os.path.basename(source)))
with decompress_to(source) as tmpdir:
# Find product and get dataset database resource
product = products.registry.sniff_product_type(tmpdir)
collection_name = product.description
# Subset bands
product_config = config.get('products', {}).get(collection_name, {})
if not product_config:
echoer.warning('No inclusion filter specified for product. '
'Ingesting all bands in product.')
desired_bands = product.bands
else:
band_filter = product_config.copy().get('include_filter', {})
band_filter_regex = band_filter.pop('regex', False)
desired_bands = include_bands(product.bands, band_filter,
regex=band_filter_regex)
# Reprojection option
resampling = product_config.get('resampling', 'nearest')
# Retrieve bounding box in tilespec's CRS
bbox = reproject_bounds(product.bounds, 'EPSG:4326', spec.crs)
# Find tiles for product & IDs of these tiles in database
tiles = list(spec.bounds_to_tiles(bbox))
tiles_id = [
cube.ensure_tile(
collection_name, tile.horizontal, tile.vertical)
for tile in tiles
]
tiles_product = {
tile_id: database.get_product_by_name(
tile_id, product.timeseries_id)
for tile_id in tiles_id
}
indexed_products, indexed_bands = {}, defaultdict(list)
for band in desired_bands:
echoer.info('Reprojecting band: {}'.format(band))
with reproject_as_needed(band.src, spec, resampling) as src:
band.src = src
echoer.process('Tiling: {}'.format(band.long_name))
for tile, tile_id in zip(tiles, tiles_id):
db_product = tiles_product[tile_id]
if db_product:
# If product is in DB, check if we have bands to add
_band_names = [b.standard_name for b
in db_product.bands]
if band.standard_name in _band_names and not overwrite:
echoer.item('Already tiled -- skipping')
continue
else:
# Product not in DB -- need to create
db_product = database.create_product(product)
db_product.tile_id = tile_id
tiles_product[tile_id] = db_product
# Setup dataset store
path = destination_path(config, tile, product)
store_cls = STORAGE_TYPES[config['store']['name']]
store = store_cls(path, tile,
meta_options=config['store']['co'])
# Save and record path
try:
dst_path = store.store_variable(
product, band,
img_pattern=config['store']['tile_imgpattern'],
overwrite=overwrite)
except FillValueException:
# TODO: skip tile but complain
continue
band.path = dst_path
# Copy over metadata files
for md_name, md_file in six.iteritems(
product.metadata_files):
if md_file:
dst_path = store.store_file(product, md_file)
product.metadata_files[md_name] = dst_path
# Update index with new product/band entry
if db_product.id:
db_band = (
database.get_band_by_name(db_product.id,
band.standard_name)
or database.create_band(band)
)
else:
db_product = database.create_product(product)
db_product.tile_id = tile_id
db_band = database.create_band(band)
indexed_products[tile_id] = db_product
indexed_bands[tile_id].append(db_band)
# TODO: delete file if index went bad
echoer.item('Tiled band for tile {}'.format(
tile.str_format(config['store']['tile_dirpattern'])
))
# Make sure to close database connection
database.session.close()
return indexed_products, indexed_bands
@click.command(short_help='Ingest known products into tile dataset format')
@options.opt_multiprocess_method
@options.opt_multiprocess_njob
@click.option('--log_dir', 'log_dir',
type=click.Path(exists=False, dir_okay=True, writable=True,
resolve_path=True),
help='Log ingests to this directory (otherwise to stdout)')
@click.option('--overwrite', is_flag=True,
help='Overwriting existing tiled data')
@options.arg_sources
@click.pass_context
def ingest(ctx, sources, overwrite, log_dir, njob, executor):
config = options.fetch_config(ctx)
logger = logging.getLogger('tilez')
echoer = cliutils.Echoer(logger)
spec, storage_name, database, cube, dataset = (
cliutils.config_to_resources(config))
echoer.info('Ingesting {} products'.format(len(sources)))
if log_dir:
mkdir_p(log_dir)
product_ids, band_ids = [], []
futures = {
executor.submit(ingest_source, config, src, overwrite,
log_dir and os.path.join(
log_dir, os.path.basename(src) + '.log')): src
for src in sources
}
sources_indexed = 0
for future in concurrent.futures.as_completed(futures):
src = futures[future]
try:
indexed_products, indexed_bands = future.result()
for k in indexed_products:
prod = indexed_products[k]
with database.scope() as txn:
txn.merge(prod) if prod.id else txn.add(prod)
txn.flush()
for b in indexed_bands[k]:
b.product_id = prod.id
txn.merge(b) if b.id else txn.add(b)
product_ids.append(prod.id)
band_ids.extend([b.id for b in indexed_bands[k]])
except Exception as exc:
echoer.warning('Ingest of {} produced exception: {}'
.format(src, exc))
else:
echoer.item('Ingested: {} (product IDs: {})'
.format(src, [p.id for p in indexed_products.values()])
)
sources_indexed += 1
echoer.process('Indexed {nprod} products to {ntile} tiles of {nband} bands'
.format(nprod=sources_indexed,
ntile=len(set(product_ids)),
nband=len(band_ids)))