"""Upgrade routines for dsgrid datafiles: dsgrid.dataformat.upgrade."""

from collections import OrderedDict
from distutils.version import StrictVersion
import logging

import h5py
import numpy as np

from dsgrid import __version__ as VERSION
from dsgrid.dataformat.datafile import Datafile
from dsgrid.dataformat.enumeration import (SectorEnumeration,
    GeographyEnumeration, EndUseEnumeration, EndUseEnumerationBase, 
    SingleFuelEndUseEnumeration, TimeEnumeration)
from dsgrid.dataformat.sectordataset import Datamap, SectorDataset, NULL_IDX

logger = logging.getLogger(__name__)

class UpgradeDatafile(object):
    """Base class for a single-step upgrade of a dsgrid Datafile format.

    Subclasses set :attr:`from_version` and :attr:`to_version` and implement
    :meth:`_transform` (plus, when the old on-disk layout requires it,
    :meth:`load_sectordataset`).
    """

    # Format version this upgrade starts from / produces.
    from_version = None
    to_version = None

    @classmethod
    def upgrade(cls, datafile, f):
        """Apply this upgrade in place and advance the recorded version.

        Parameters
        ----------
        datafile : dsgrid.dataformat.datafile.Datafile
            (partially) loaded Datafile, currently at ``cls.from_version``
        f : h5py.File
            open, writable HDF5 file backing ``datafile``

        Returns
        -------
        dsgrid.dataformat.datafile.Datafile
            the same object, now marked as ``cls.to_version``
        """
        assert StrictVersion(datafile.version) == StrictVersion(cls.from_version)
        cls._transform(datafile, f)
        # Record the new format version both on disk and in memory.
        f.attrs["dsgrid"] = cls.to_version
        datafile.version = cls.to_version
        return datafile

    @classmethod
    def _transform(cls, datafile, f):
        # Subclasses mutate f here to migrate the on-disk layout.
        pass

    @classmethod
    def load_datafile(cls, filepath):
        """
        Load enough to return a Datafile object. Object should not be
        expected to be fully functional.

        Parameters
        ----------
        filepath : str
            path to Datafile

        Returns
        -------
        dsgrid.dataformat.datafile.Datafile
            (partially) loaded Datafile in old format
        """
        with h5py.File(filepath, "r") as f:
            enum_group = f["enumerations"]
            result = Datafile(
                filepath,
                SectorEnumeration.load(enum_group),
                GeographyEnumeration.load(enum_group),
                EndUseEnumerationBase.load(enum_group),
                TimeEnumeration.load(enum_group),
                loading=True,
                version=cls.from_version)
            # Delegate per-sector loading back to this upgrade class so the
            # old on-disk layout can be read.
            for sector_id, sector_dataset in SectorDataset.loadall(
                    result, f, _upgrade_class=cls):
                result.sectordata[sector_id] = sector_dataset
        return result

    @classmethod
    def load_sectordataset(cls, datafile, f, sector_id):
        """
        Load enough to return a SectorDataset object. Object should not be
        expected to be fully functional.
        """
        pass
class DSG_0_1_0(UpgradeDatafile):
    """Upgrade a dsgrid datafile from format v0.1.0 to v0.2.0."""

    from_version = '0.1.0'
    to_version = '0.2.0'

    # v0.1.0 used 65535 as the "no geography" sentinel in geo_mappings;
    # v0.2.0 uses NULL_IDX instead.
    ZERO_IDX = 65535

    @classmethod
    def _transform(cls, datafile, f):
        # from v0.1.0 to v0.2.0
        # - f['data'][sector_id] is no longer a h5py.Dataset with attributes
        # - f['data'][sector_id] is now a h5py.Group containing
        #   - f['data'][sector_id]['data']
        #   - f['data'][sector_id]['geographies']
        #   - f['data'][sector_id]['enduses']
        #   - f['data'][sector_id]['times']
        for sector_id, sectordataset in datafile.sectordata.items():
            # Move the old dataset aside so its name can be reused for the
            # new group.
            tmp_name = sector_id + '_temp'
            f['data'][tmp_name] = f['data'][sector_id]
            orig_dset = f['data'][tmp_name]
            del f['data'][sector_id]

            dgroup = f['data'].create_group(sector_id)

            dgroup['geographies'] = Datamap.create(datafile.geo_enum, []).value
            # Missing HDF5 attributes raise KeyError; some old files (Joe's
            # format) stored the mappings as top-level datasets instead.
            try:
                geo_map = orig_dset.attrs['geo_mappings'].astype("u4")
            except KeyError:
                geo_map = f['geo_mappings'][()].astype("u4")
            # Translate the old null sentinel into the current NULL_IDX.
            if np.any(geo_map == cls.ZERO_IDX):
                null_pos = (geo_map == cls.ZERO_IDX)
                geo_map[null_pos] = NULL_IDX
            dgroup['geographies'][:, 'idx'] = geo_map
            try:
                scalings = orig_dset.attrs['geo_scalings']
            except KeyError:
                # Handle Joe's format
                scalings = f['geo_scalings'][()]
            dgroup['geographies'][:, 'scale'] = scalings

            # already loaded these sub-enums as part of the backward
            # compatible load process
            dgroup['enduses'] = Datamap.create(
                datafile.enduse_enum, sectordataset.enduses).value
            dgroup['times'] = Datamap.create(
                datafile.time_enum, sectordataset.times).value

            # The mapping attributes now live in the group's datasets; make
            # sure nothing unexpected remains attached to the raw data.
            for attr_name in ['geo_mappings', 'geo_scalings',
                              'enduse_mappings', 'time_mappings']:
                if attr_name in orig_dset.attrs:
                    del orig_dset.attrs[attr_name]
            assert len(orig_dset.attrs) == 0, \
                "There are attrs left in orig_dset: {}".format(orig_dset.attrs)

            # Hard-link the raw data into the new group and drop the
            # temporary name.
            dgroup['data'] = orig_dset
            del f['data'][tmp_name]

    @classmethod
    def load_sectordataset(cls, datafile, f, sector_id):
        """
        Load enough to return a SectorDataset object. Object should not be
        expected to be fully functional.
        """
        dset = f["data/" + sector_id]

        enduses = list(datafile.enduse_enum.ids)
        times = np.array(datafile.time_enum.ids)

        # In v0.1.0 the enduse/time subsets were stored as index arrays in
        # the dataset's attributes.
        enduses = [enduses[i] for i in dset.attrs['enduse_mappings'][:]]
        times = [times[i] for i in dset.attrs['time_mappings'][:]]

        result = SectorDataset(datafile, sector_id, enduses, times)
        try:
            geo_mappings = dset.attrs["geo_mappings"]
        except KeyError:
            # handles Joe's files
            geo_mappings = f["geo_mappings"]
        # Count distinct geography slots, excluding the null sentinel.
        geo_ptrs = [x for x in geo_mappings if not (x == cls.ZERO_IDX)]
        result.n_geos = len(set(geo_ptrs))
        return result
# Registry mapping an old on-disk format version to the upgrade step that
# starts from it; ordered so upgrades can be chained version by version.
OLD_VERSIONS = OrderedDict([
    (DSG_0_1_0.from_version, DSG_0_1_0),
])
def make_fuel_and_units_explicit(datafile, filepath, fuel='Electricity', units='MWh'):
    """Copy ``datafile`` to ``filepath``, replacing its old-style
    EndUseEnumeration with a SingleFuelEndUseEnumeration that states fuel
    and units explicitly.

    Parameters
    ----------
    datafile : dsgrid.dataformat.datafile.Datafile
        source datafile; its enduse_enum must be an old-style
        EndUseEnumeration
    filepath : str
        where the converted copy is written
    fuel : str
        fuel label applied to every end-use
    units : str
        units label applied to every end-use

    Returns
    -------
    dsgrid.dataformat.datafile.Datafile
        new Datafile backed by ``filepath``
    """
    old_enduse_enum = datafile.enduse_enum
    assert isinstance(old_enduse_enum, EndUseEnumeration), \
        "This upgrade method is for datafiles with old-style EndUseEnumerations"
    new_enduse_enum = SingleFuelEndUseEnumeration(
        old_enduse_enum.name,
        old_enduse_enum.ids,
        old_enduse_enum.names,
        fuel=fuel,
        units=units)

    result = Datafile(
        filepath,
        datafile.sector_enum,
        datafile.geo_enum,
        new_enduse_enum,
        datafile.time_enum)
    for sector_id in datafile:
        old_sector = datafile[sector_id]
        new_sector = result.add_sector(
            sector_id,
            enduses=old_sector.enduses,
            times=old_sector.times)
        # Gather every geography's data, then write it in one batch with
        # full validation disabled (the data was already validated when the
        # source file was created).
        batch_dataframes = []
        batch_geo_ids = []
        batch_scalings = []
        for geo_idx in range(old_sector.n_geos):
            df, geo_ids, scalings = old_sector.get_data(geo_idx)
            batch_dataframes.append(df)
            batch_geo_ids.append(geo_ids)
            batch_scalings.append(scalings)
        new_sector.add_data_batch(
            batch_dataframes,
            batch_geo_ids,
            scalings=batch_scalings,
            full_validation=False)
    return result