# Source code for dsgrid.dataformat.dimmap

from collections import defaultdict
import os

import numpy as np
import pandas as pd

from dsgrid import DSGridError, DSGridNotImplemented
from dsgrid.dataformat.datatable import Datatable
from dsgrid.dataformat.enumeration import (
    SectorEnumeration, GeographyEnumeration, EndUseEnumeration, 
    EndUseEnumerationBase, FuelEnumeration, MultiFuelEndUseEnumeration, 
    SingleFuelEndUseEnumeration, TimeEnumeration,
    allenduses,allsectors,annual,census_divisions,census_regions,conus,
    conus_counties,conus_states,counties,daily2012,daytypes,enduses,
    enumdata_folder,fuel_types,hourly2012,loss_state_groups,seasons,sectors,
    sectors_subsectors,states,weekdays,weekly2012)


class DimensionMap(object):
    """
    Base class for objects that map ids of one enumeration onto another.

    The constructor stores the two enumerations as ``from_enum`` and
    ``to_enum``. This base implementation maps every id to None and
    applies a scale factor of 1.0.
    """

    def __init__(self, from_enum, to_enum):
        self.from_enum, self.to_enum = from_enum, to_enum

    def map(self, from_id):
        """
        Returns the appropriate to_id.

        In this base class the result is None for every from_id.
        """
        return None

    def scale_factor(self, from_id):
        """
        Returns the multiplier associated with from_id, which is always
        1.0 in this base class.
        """
        return 1.0

    def __repr__(self):
        return "{}({}, {})".format(
            type(self).__name__, self.from_enum, self.to_enum)
class TautologyMapping(DimensionMap):
    """
    Identity map: the same enumeration is used as both from_enum and
    to_enum, and every id maps to itself.
    """

    def __init__(self, from_to_enum):
        super().__init__(from_enum=from_to_enum, to_enum=from_to_enum)

    def map(self, from_id):
        """
        Returns from_id unchanged.
        """
        return from_id
class FullAggregationMap(DimensionMap):
    """
    Map that sends every id of from_enum to the single id of to_enum,
    except for ids listed in exclude_list, which map to None.
    """

    def __init__(self, from_enum, to_enum, exclude_list=None):
        """
        Parameters
        ----------
        from_enum : dsgrid.dataformat.enumeration.Enumeration
        to_enum : dsgrid.dataformat.enumeration.Enumeration
            Class must correspond to the same dimension as from_enum, and
            the enumeration must have exactly one element
        exclude_list : list of from_enum.ids, optional
            from_enum values that should be dropped from the aggregation.
            Defaults to no exclusions.

        Raises
        ------
        DSGridError
            If to_enum does not have exactly one element, or if
            exclude_list contains an id that is not in from_enum.
        """
        super().__init__(from_enum, to_enum)

        num_targets = len(to_enum.ids)
        if num_targets == 0:
            # Previously this case escaped as a bare IndexError from
            # to_enum.ids[0] below.
            raise DSGridError("FullAggregationMaps aggregate up to one quantity. "
                "to_enum {} contains no items.".format(repr(to_enum)))
        if num_targets > 1:
            raise DSGridError("FullAggregationMaps are aggregates that may exclude " +
                "some items, but otherwise aggretate up to one quantity. " +
                "to_enum {} contains too many items.".format(repr(to_enum)))
        self.to_id = to_enum.ids[0]

        # A fresh list per instance; a literal [] default would be shared by
        # every map constructed without an explicit exclude_list.
        self.exclude_list = [] if exclude_list is None else exclude_list
        for exclude_item in self.exclude_list:
            if exclude_item not in from_enum.ids:
                raise DSGridError("exclude_list must contain ids in from_enum " +
                    "that are to be exluded from the overall aggregation. "
                    "Found {} in exclude list, which is not in {}.".format(exclude_item, from_enum))

    def map(self, from_id):
        """
        Returns None if from_id is in exclude_list, otherwise the single
        to_enum id.
        """
        if from_id in self.exclude_list:
            return None
        return self.to_id
class FilterToSubsetMap(DimensionMap):
    """
    Map that keeps the ids present in to_enum unchanged and maps all
    other from_enum ids to None.
    """

    def __init__(self, from_enum, to_enum):
        """
        Arguments:
            - to_enum (Enumeration) - should be a subset of from_enum

        Raises DSGridError if any id in to_enum is missing from from_enum.
        """
        super().__init__(from_enum, to_enum)
        if any(to_id not in from_enum.ids for to_id in to_enum.ids):
            raise DSGridError("to_enum should be a subset of from_enum")

    def map(self, from_id):
        """
        Returns from_id if it is in to_enum, otherwise None.
        """
        return from_id if from_id in self.to_enum.ids else None
class FilterToSingleFuelMap(DimensionMap):
    """
    Map from a MultiFuelEndUseEnumeration to a SingleFuelEndUseEnumeration
    built here from the entries whose fuel id equals fuel_to_keep. Entries
    for any other fuel map to None.
    """

    def __init__(self, from_enum, fuel_to_keep):
        """
        Parameters
        ----------
        from_enum : MultiFuelEndUseEnumeration
            Its ids are indexed as (enduse_id, fuel_id) pairs
        fuel_to_keep : id in from_enum.fuel_enum.ids
            Fuel whose end-uses are retained
        """
        assert isinstance(from_enum, MultiFuelEndUseEnumeration), "This map only applies to MultiFuelEndUseEnumerations"
        assert fuel_to_keep in from_enum.fuel_enum.ids, "{} is not a fuel_id in {}".format(fuel_to_keep, from_enum.fuel_enum)

        to_enum_name = from_enum.name + " ({})".format(
            from_enum.fuel_enum.get_name(fuel_to_keep))

        kept_ids = []
        kept_names = []
        self._map = {}
        for position, enduse_fuel in enumerate(from_enum.ids):
            if enduse_fuel[1] == fuel_to_keep:
                # keep only the end-use part of the id; the fuel is implied
                kept_ids.append(enduse_fuel[0])
                kept_names.append(from_enum._names[position])
                self._map[enduse_fuel] = enduse_fuel[0]
            else:
                self._map[enduse_fuel] = None

        to_enum = SingleFuelEndUseEnumeration(
            to_enum_name,
            kept_ids,
            kept_names,
            fuel=from_enum.fuel_enum.get_name(fuel_to_keep),
            units=from_enum.fuel_enum.get_units(fuel_to_keep))
        super().__init__(from_enum, to_enum)

    def map(self, from_id):
        """
        Returns the end-use id for from_id if its fuel was kept, otherwise
        None. Raises KeyError for ids not in from_enum.
        """
        return self._map[from_id]
class ExplicitMap(DimensionMap):
    """
    Base class for maps backed by an explicit from_id -> to_id lookup.

    This class only creates an empty lookup whose missing keys yield None;
    the dictmap argument is not read here. Subclasses in this module
    populate self._dictmap and implement _make_dictmap.
    """

    def __init__(self, from_enum, to_enum, dictmap):
        super().__init__(from_enum, to_enum)
        self._dictmap = defaultdict(lambda: None)

    def map(self, from_id):
        """
        Returns the stored entry for from_id, or the lookup's default when
        from_id has no entry.
        """
        return self._dictmap[from_id]

    @classmethod
    def create_from_csv(cls, from_enum, to_enum, filepath):
        """
        Builds a map from the csv file at filepath, read with all columns
        as strings and converted with cls._make_dictmap.
        """
        mapdata = pd.read_csv(filepath, dtype=str)
        dictmap = cls._make_dictmap(mapdata)
        return cls(from_enum, to_enum, dictmap)

    @classmethod
    def _make_dictmap(cls, mapdata):
        # Placeholder; subclasses convert mapdata into their dictmap form.
        return None
class ExplicitDisaggregation(ExplicitMap):
    """
    Explicit map in which each from_id maps to a list of to_ids, with
    optional scaling factors derived from a datafile.
    """

    def __init__(self, from_enum, to_enum, dictmap, scaling_datafile=None):
        """
        If no scaling_datafile, scaling factors are assumed to be 1.0.

        Raises DSGridError if dictmap refers to ids missing from from_enum
        or to_enum, or if scaling_datafile does not contain to_enum.
        """
        super().__init__(from_enum, to_enum, dictmap)

        # from_ids without an entry disaggregate to an empty list
        self._dictmap = defaultdict(list)
        for from_id, to_ids in dictmap.items():
            if from_id not in self.from_enum.ids:
                raise DSGridError("Id {} is not in from_enum {}.".format(from_id, self.from_enum))
            for to_id in to_ids:
                if to_id not in self.to_enum.ids:
                    raise DSGridError("Id {} is not in to_enum {}.".format(to_id, self.to_enum))
            self._dictmap[from_id] = to_ids

        # scaling_datafile must have to_enum as one of its dimensions
        if scaling_datafile is not None:
            if not scaling_datafile.contains(to_enum):
                message = ("Datafile {} cannot be used to scale this map ".format(repr(scaling_datafile)) +
                           "because it does not contain to_enum {}.".format(repr(to_enum)))
                raise DSGridError(message)
        self._scaling_datafile = scaling_datafile
        self._scaling_datatable = None

    @property
    def default_scaling(self):
        """True when no scaling_datafile was supplied."""
        return self._scaling_datafile is None

    @property
    def scaling_datatable(self):
        """Datatable built from the scaling datafile on first access."""
        assert not self.default_scaling
        if self._scaling_datatable is None:
            self._scaling_datatable = Datatable(self._scaling_datafile)
        return self._scaling_datatable

    def get_scalings(self, to_ids):
        """
        Return an array of scalings for to_ids.

        With default scaling every entry is 1.0. Otherwise the scaling
        datatable is restricted to to_ids along the dimension matching
        to_enum, summed per id, and normalized so the entries sum to one.
        """
        if self.default_scaling:
            return np.array([1.0 for _ in to_ids])

        # position of to_enum's dimension in the datatable index, and the
        # name of the matching index level
        if isinstance(self.to_enum, SectorEnumeration):
            axis, level = 0, 'sector'
        elif isinstance(self.to_enum, GeographyEnumeration):
            axis, level = 1, 'geography'
        elif isinstance(self.to_enum, EndUseEnumerationBase):
            axis, level = 2, 'enduse'
        else:
            assert isinstance(self.to_enum, TimeEnumeration)
            axis, level = 3, 'time'

        selector = [slice(None)] * 4
        selector[axis] = to_ids
        totals = self.scaling_datatable[tuple(selector)].groupby(level=level).sum()

        # fraction of from_id that should go to each to_id
        shares = totals / totals.sum()
        return np.array([shares[to_id] for to_id in to_ids])

    @classmethod
    def create_from_csv(cls, from_enum, to_enum, filepath, scaling_datafile=None):
        """
        Builds a disaggregation map from the csv file at filepath, read
        with all columns as strings.
        """
        mapdata = pd.read_csv(filepath, dtype=str)
        dictmap = cls._make_dictmap(mapdata)
        return cls(from_enum, to_enum, dictmap, scaling_datafile=scaling_datafile)

    @classmethod
    def _make_dictmap(cls, mapdata):
        # Group the to_id column by from_id, preserving row order.
        grouped = defaultdict(list)
        for from_id, to_id in zip(mapdata.from_id, mapdata.to_id):
            grouped[from_id].append(to_id)
        return grouped
class ExplicitAggregation(ExplicitMap):
    """
    Explicit map in which each from_id maps to exactly one to_id.
    """

    def __init__(self, from_enum, to_enum, dictmap):
        """
        Validates dictmap against both enumerations while copying it into
        the lookup. Raises DSGridError on the first id that is not in the
        corresponding enumeration.
        """
        super().__init__(from_enum, to_enum, dictmap)
        for from_id, to_id in dictmap.items():
            if from_id not in self.from_enum.ids:
                raise DSGridError("Id {} is not in from_enum {}.".format(from_id, self.from_enum))
            if to_id not in self.to_enum.ids:
                raise DSGridError("Id {} is not in to_enum {}.".format(to_id, self.to_enum))
            self._dictmap[from_id] = to_id

    @classmethod
    def _make_dictmap(cls, mapdata):
        # Keys become (id, fuel_id) tuples on whichever side has a
        # *_fuel_id column in mapdata; otherwise the plain id is used.
        has_from_fuel = 'from_fuel_id' in mapdata.columns
        has_to_fuel = 'to_fuel_id' in mapdata.columns

        dictmap = {}
        for record in mapdata.itertuples(index=False):
            source = record.from_id
            if has_from_fuel:
                source = (source, record.from_fuel_id)
            target = record.to_id
            if has_to_fuel:
                target = (target, record.to_fuel_id)
            dictmap[source] = target
        return dictmap
class UnitConversionMap(DimensionMap):
    """
    Map that leaves end-use ids unchanged but converts their units,
    exposing the per-id multiplier through scale_factor.
    """

    # (from_unit, to_unit) -> multiplier. Reverse and chained conversions
    # are derived from these entries by scaling_factor.
    CONVERSION_FACTORS = {
        ('kWh','MWh'): 1.0E-3,
        ('MWh','GWh'): 1.0E-3,
        ('GWh','TWh'): 1.0E-3
    }

    def __init__(self, from_enum, from_units, to_units):
        """
        Convert from_units to to_units.

        Parameters
        ----------
        from_enum : EndUseEnumerationBase
        from_units : list of str
            List of units in from_enum that are to be converted
        to_units : list of str
            List of units to convert to. Same length list as from_units.

        Raises
        ------
        DSGridNotImplemented
            If no conversion factor can be derived for a requested pair
            of units.
        """
        assert isinstance(from_enum, EndUseEnumerationBase), "Unit conversion applies to EndUseEnumerations"
        assert not isinstance(from_enum, EndUseEnumeration), "Old-style end-use enumerations do not include units information"
        assert len(from_units) == len(to_units), "Cannot convert {} to {} since they are of a different number".format(from_units, to_units)
        assert len(from_units) > 0, "from_units is empty. Nothing to do."

        # ids without an explicit entry are left unscaled
        self._scale_map = defaultdict(lambda: 1.0)
        if isinstance(from_enum, SingleFuelEndUseEnumeration):
            assert len(from_units) == 1
            assert from_units[0] == from_enum._units
            # carry the unit change into the enumeration name, in both the
            # given and the lower-case spelling
            to_enum_name = from_enum.name.replace(from_units[0], to_units[0])
            to_enum_name = to_enum_name.replace(from_units[0].lower(), to_units[0].lower())
            to_enum = SingleFuelEndUseEnumeration(to_enum_name,
                                                  from_enum.ids,
                                                  from_enum.names,
                                                  fuel=from_enum._fuel,
                                                  units=to_units[0])
            # single fuel: one factor applies to every id
            self._scale_map = self.scaling_factor(from_units[0], to_units[0])
        else:
            assert isinstance(from_enum, MultiFuelEndUseEnumeration)
            for from_unit in from_units:
                assert from_unit in from_enum.fuel_enum.units, "{} is not a unit in {!r}".format(from_unit, from_enum.fuel_enum)

            to_fuel_enum_units = []
            for unit in from_enum.fuel_enum.units:
                if unit in from_units:
                    to_fuel_enum_units.append(to_units[from_units.index(unit)])
                else:
                    to_fuel_enum_units.append(unit)
            to_fuel_enum = FuelEnumeration(from_enum.fuel_enum.name,
                                           from_enum.fuel_enum.ids,
                                           from_enum.fuel_enum.names,
                                           to_fuel_enum_units)
            to_enum = MultiFuelEndUseEnumeration(from_enum.name,
                                                 from_enum._ids,
                                                 from_enum._names,
                                                 to_fuel_enum,
                                                 from_enum._fuel_ids)

            for enduse_id in from_enum.ids:
                unit = from_enum.units(enduse_id)
                if unit in from_units:
                    self._scale_map[enduse_id] = self.scaling_factor(
                        unit, to_units[from_units.index(unit)])
        super().__init__(from_enum, to_enum)

    def map(self, from_id):
        """
        Returns from_id unchanged.
        """
        # no change in enduse or fuel id
        return from_id

    def scale_factor(self, from_id):
        """
        Returns the unit conversion multiplier for from_id.
        """
        if isinstance(self._scale_map, dict):
            return self._scale_map[from_id]
        # single-fuel case: _scale_map is one number shared by all ids
        return self._scale_map

    @classmethod
    def scaling_factor(cls, from_unit, to_unit):
        """
        Returns the multiplier that converts a quantity in from_unit to
        to_unit.

        Factors are taken from CONVERSION_FACTORS directly, in reverse, or
        by chaining entries. Converting a unit to itself is 1.0 even when
        the unit does not appear in CONVERSION_FACTORS.

        Raises
        ------
        DSGridNotImplemented
            If no chain of known factors connects from_unit to to_unit.
        """
        if from_unit == to_unit:
            # identity needs no table lookup, so unlisted units are fine
            return 1.0

        key = (from_unit, to_unit)
        these_factors = cls._get_all_factors(to_unit, cls.CONVERSION_FACTORS)
        if key in these_factors:
            return these_factors[key]

        # Repeatedly extend the set of units known to reach to_unit by one
        # more hop until the set stops growing.
        something_added = True
        while something_added:
            something_added = False
            to_expand = list(these_factors.items())
            for a_key, factor in to_expand:
                candidates = cls._get_all_factors(a_key[0],
                                                  cls.CONVERSION_FACTORS,
                                                  multiplier=factor)
                for b_key, val in candidates.items():
                    c_key = (b_key[0], to_unit)
                    if c_key not in these_factors:
                        these_factors[c_key] = val
                        something_added = True
            if key in these_factors:
                return these_factors[key]

        raise DSGridNotImplemented("No conversion factor available to go from {} to {}.".format(from_unit, to_unit))

    @classmethod
    def _get_all_factors(cls, to_unit, factors, multiplier = None):
        # Collect every one-hop conversion into to_unit found in factors,
        # keyed (other_unit, to_unit), optionally scaled by multiplier.
        result = {}
        for units, factor in factors.items():
            from_u, to_u = units
            # directly in factors?
            if to_u == to_unit:
                result[units] = factor
            # in factors backward?
            if from_u == to_unit:
                result[(to_u, from_u)] = 1.0 / factor
        if multiplier is not None:
            for key in result:
                result[key] *= multiplier
        return result
class Mappings(object):
    """
    Registry of DimensionMap objects keyed by the names of their
    from_enum and to_enum.
    """

    def __init__(self):
        self._mappings = defaultdict(lambda: None)

    def add_mapping(self, mapping):
        """
        Stores mapping under (from_enum.name, to_enum.name), replacing any
        mapping already registered for that pair of names.
        """
        registry_key = (mapping.from_enum.name, mapping.to_enum.name)
        self._mappings[registry_key] = mapping

    def get_mapping(self, datafile, to_enum):
        """
        Finds a mapping from the matching dimension of datafile to to_enum.

        The lookup tries, in order: a mapping registered under the exact
        pair of enumeration names; a TautologyMapping when the datafile's
        enumeration equals to_enum or from_enum.is_subset(to_enum) holds;
        the first registered mapping into to_enum whose from_enum contains
        every id of the datafile's enumeration. Returns None if none apply.

        Raises DSGridError if to_enum is not one of the four recognized
        enumeration types.
        """
        dimension_attrs = (
            (SectorEnumeration, 'sector_enum'),
            (GeographyEnumeration, 'geo_enum'),
            (EndUseEnumerationBase, 'enduse_enum'),
            (TimeEnumeration, 'time_enum'),
        )
        for enum_class, attr_name in dimension_attrs:
            if isinstance(to_enum, enum_class):
                from_enum = getattr(datafile, attr_name)
                break
        else:
            raise DSGridError("to_enum {} is not a recognized enumeration type.".format(repr(to_enum)))

        registry_key = (from_enum.name, to_enum.name)
        if registry_key in self._mappings:
            return self._mappings[registry_key]

        # No immediate match

        # Is the requested mapping a tautology?
        if from_enum == to_enum or from_enum.is_subset(to_enum):
            return TautologyMapping(to_enum)

        # Are elements in from_enum a subset of a stored mapping.from_enum?
        for names, candidate in self._mappings.items():
            if names[1] != to_enum.name:
                continue
            if all(from_id in candidate.from_enum.ids for from_id in from_enum.ids):
                return candidate

        return None
# Module-level registry of the standard mappings between the enumerations
# shipped in enumdata_folder. Registration order is kept as-is because
# Mappings.get_mapping returns the first suitable candidate it finds.
mappings = Mappings()

# key geography
mappings.add_mapping(ExplicitAggregation.create_from_csv(
    counties, states,
    os.path.join(enumdata_folder, 'counties_to_states.csv')))
conus_states_list = pd.read_csv(
    os.path.join(enumdata_folder, 'conus_to_states.csv'),
    dtype=str)['to_id'].tolist()
mappings.add_mapping(FullAggregationMap(
    states, conus,
    exclude_list=[state_id for state_id in states.ids
                  if state_id not in conus_states_list]))

# full aggregations
for _from_enum, _to_enum in [
        (census_regions, conus),
        (hourly2012, annual),
        (daily2012, annual),
        (weekly2012, annual),
        (seasons, annual),
        (sectors, allsectors),
        (sectors_subsectors, allsectors),
        (enduses, allenduses)]:
    mappings.add_mapping(FullAggregationMap(_from_enum, _to_enum))

# filter down to conus
for _from_enum, _to_enum in [
        (states, conus_states),
        (counties, conus_counties)]:
    mappings.add_mapping(FilterToSubsetMap(_from_enum, _to_enum))

for _from_enum, _to_enum, _filename in [
        # then go back to vanilla enumerations
        (conus_states, states, 'conus_states_to_states.csv'),
        (conus_counties, counties, 'conus_counties_to_counties.csv'),
        # explicit aggregations
        (hourly2012, daily2012, 'hourly2012_to_daily2012.csv'),
        (hourly2012, weekly2012, 'hourly2012_to_weekly2012.csv'),
        (hourly2012, seasons, 'hourly2012_to_seasons.csv'),
        (daily2012, weekly2012, 'daily2012_to_weekly2012.csv'),
        (daily2012, seasons, 'daily2012_to_seasons.csv'),
        (enduses, fuel_types, 'enduses_to_fuel_types.csv'),
        (states, loss_state_groups, 'states_to_loss_state_groups.csv'),
        (states, census_divisions, 'states_to_census_divisions.csv'),
        (census_divisions, census_regions, 'census_divisions_to_census_regions.csv')]:
    mappings.add_mapping(ExplicitAggregation.create_from_csv(
        _from_enum, _to_enum, os.path.join(enumdata_folder, _filename)))

# leave only `mappings` and `conus_states_list` behind in the module namespace
del _from_enum, _to_enum, _filename