# Source code for dsgrid.dataformat.enumeration

import datetime as dt
import copy
from enum import Enum
import os
import logging
import pytz
import re

import numpy as np
import pandas as pd

from dsgrid import DSGridRuntimeError, DSGridValueError
from dsgrid.dataformat import ENCODING, get_str

logger = logging.getLogger(__name__)


class Enumeration(object):
    """Named collection of parallel (id, name) pairs describing one dataset dimension."""

    # Fixed-width byte-string limits used by the HDF5 persistence format.
    max_id_len = 64
    max_name_len = 128

    enum_dtype = np.dtype([
        ("id", "S" + str(max_id_len)),
        ("name", "S" + str(max_name_len))
    ])

    # Name of the dimension this enumeration describes; set by subclasses.
    dimension = None

    def __init__(self, name, ids, names):
        """
        Parameters
        ----------
        name : str
            label for this enumeration
        ids : list of str
            unique identifiers, each at most max_id_len characters
        names : list of str
            human-readable names, parallel to ids
        """
        self.name = name
        self.ids = ids
        self.names = names
        self.checkvalues()
[docs] def checkvalues(self): ids = list(self.ids); names = list(self.names) n_ids = len(ids); n_names = len(names) if n_ids != n_names: raise DSGridValueError("Number of ids (" + str(n_ids) + ") must match number of names (" + str(n_names) + ")") if len(set(ids)) != n_ids: raise DSGridValueError("Enumeration ids must be unique") if max(len(value) for value in ids) > self.max_id_len: raise DSGridValueError("Enumeration ids cannot exceed " + "{} characters".format(self.max_id_len)) if max(len(value) for value in names) > self.max_name_len: raise DSGridValueError("Enumeration names cannot exceed " + "{} characters".format(self.max_name_len))
def __eq__(self, other): return ( isinstance(other, self.__class__) and self.__dict__ == other.__dict__ ) def __len__(self): return len(list(self.ids)) def __repr__(self): return "%s(%r)" % (self.__class__, self.__dict__) def __str__(self): if len(self.ids) == 1: return (f"{self.__class__.__name__}({self.name}, [{self.ids[0]}], " f"[{self.names[0]}])") return (f"{self.__class__.__name__}({self.name}, [{self.ids[0]}, ...], " f"[{self.names[0]}, ...])")
[docs] def get_name(self,id): ind = list(self.ids).index(id) return self.names[ind]
[docs] def create_subset_enum(self,ids): """ Returns a new enumeration that is a subset of this one, based on keeping the items in ids. Parameters ---------- ids : list subset of self.ids that should be kept in the new enumeration Returns ------- self.__class__ """ _ids, _names = self._get_subset_ids_names(ids) return self.__class__(self.name + ' Subset',_ids,_names)
def _get_subset_ids_names(self,ids): n = len(ids) _ids = [None] * n; _names = [None] * n for i, full_id in enumerate(self.ids): if full_id in ids: j = ids.index(full_id) logger.debug("Found info for {}, which is entry {} of {}".format(full_id,j,len(_ids))) _ids[j] = self.ids[i] _names[j] = self.names[i] if len([x for x in _ids if x is None]): raise DSGridRuntimeError("At least one of {} is not in {}".format(ids,self.ids)) return _ids, _names
[docs] def is_subset(self,other_enum): """ Returns true if this Enumeration is a subset of other_enum. """ if not isinstance(other_enum,self.__class__): return False for my_id in self.ids: if not (my_id in other_enum.ids): return False return True
[docs] def persist(self, h5group): dset = h5group.create_dataset( self.dimension, dtype=self.enum_dtype, shape=(len(self),)) dset.attrs["name"] = self.name dset["id"] = np.array(self.ids) dset["name"] = np.array([name.encode(ENCODING) for name in self.names]) return dset
[docs] @classmethod def load(cls, h5group): h5dset = h5group[cls.dimension] return cls( get_str(h5dset.attrs["name"]), [get_str(vid) for vid in h5dset["id"]], [get_str(vname) for vname in h5dset["name"]] )
[docs] @classmethod def read_csv(cls, filepath, name=None): enum = pd.read_csv(filepath, dtype=str) name = cls._name_from_filepath(filepath) if name is None else name return cls(name, list(enum.id), list(enum.name))
[docs] def to_csv(self, filedir=None, filepath=None, overwrite=False): p = self._default_filepath() if filepath is not None: p = filepath elif filedir is not None: p = os.path.join(filedir,self._default_filename()) if not overwrite and os.path.exists(p): msg = "{} already exists".format(p) logger.error(msg) raise DSGridRuntimeError(msg) df = pd.DataFrame(list(zip(self.ids,self.names)),columns=['id','name']) df.to_csv(p,index=False)
@classmethod def _name_from_filepath(cls,filepath): return os.path.splitext(os.path.basename(filepath))[0].replace("_"," ").title() def _default_filepath(self): return os.path.join(enumdata_folder,self._default_filename()) def _default_filename(self): return self.name.lower().replace(' ','_') + '.csv'
# Define standard dimensions
class SectorEnumeration(Enumeration):
    """Enumeration describing the sector dimension."""
    dimension = "sector"
class GeographyEnumeration(Enumeration):
    """Enumeration describing the geography dimension."""
    dimension = "geography"
class EndUseEnumerationBase(Enumeration):
    """Base class for end-use enumerations; adds fuel and units lookups."""
    dimension = "enduse"

    def fuel(self, id):
        """Return the fuel type for end-use id; implemented by subclasses."""
        pass

    def units(self, id):
        """Return the units for end-use id; implemented by subclasses."""
        pass

    @classmethod
    def load(cls, h5group):
        """Load the appropriate EndUseEnumerationBase subclass from h5group.

        Create correct type of EndUseEnumerationBase depending on auxiliary
        data: a stored fuel enumeration means MultiFuelEndUseEnumeration;
        'fuel'/'units' attributes mean SingleFuelEndUseEnumeration; otherwise
        the legacy EndUseEnumeration.
        """
        if FuelEnumeration.dimension in h5group:
            return MultiFuelEndUseEnumeration.load(h5group)

        h5dset = h5group[cls.dimension]
        name = get_str(h5dset.attrs["name"])
        ids = [get_str(vid) for vid in h5dset["id"]]
        names = [get_str(vname) for vname in h5dset["name"]]

        if 'fuel' in h5dset.attrs:
            return SingleFuelEndUseEnumeration(
                name, ids, names,
                fuel=h5dset.attrs['fuel'],
                units=h5dset.attrs['units'])
        return EndUseEnumeration(name, ids, names)

    @classmethod
    def read_csv(cls, filepath, name=None):
        """
        Infer and read into the correct derived class.
        """
        columns = pd.read_csv(filepath, dtype=str).columns
        if 'fuel' in columns:
            return SingleFuelEndUseEnumeration.read_csv(filepath, name=name)
        if 'fuel_id' in columns:
            return MultiFuelEndUseEnumeration.read_csv(filepath, name=name)
        return EndUseEnumeration.read_csv(filepath, name=name)
class TimeEnumeration(Enumeration):
    """Enumeration whose ids are timestamps or other time-period labels."""
    dimension = "time"

    # Where a timestamp sits relative to the period it describes.
    TIMESTAMP_POSITION = Enum('TIMESTAMP_POSITION',
                              ['period_beginning', 'period_midpoint', 'period_ending'])

    # Standard (fixed-offset) pytz timezones mapped to common display names.
    TIMEZONE_DISPLAY_NAMES = {
        'Etc/GMT+5': 'EST',
        'Etc/GMT+6': 'CST',
        'Etc/GMT+7': 'MST',
        'Etc/GMT+8': 'PST'
    }

    # Reverse mapping: display name -> pytz timezone string.
    TIMEZONE_LOOKUP = {val: key for key, val in TIMEZONE_DISPLAY_NAMES.items()}
[docs] @classmethod def create(cls,enum_name,start,duration,resolution, extent_timezone=pytz.timezone('UTC'), store_timezone=None, timestamp_position=TIMESTAMP_POSITION['period_ending']): """ Create a new time enumeration based on the specified temporal extents, resolution, and timezone. Parameters ---------- enum_name : str name for this enumeration, ideally descriptive of the parameters used for creation start : datetime.datetime beginning of the time period to be represented by the timestamps duration : datetime.timedelta total length of time to be covered resolution : datetime.timedelta timestep for the enumeration extent_timezone : pytz.timezone timezone that should be used to interpret the extent parameters store_timezone : None or pytz.timezone timezone to write the ids and names in. If None, extent_timezone is used. timestamp_position : TimeEnumeration.TIMESTAMP_POSITION or convertable str whether timestamps are placed at the beginning, ending, or midpoint of the time period being described Returns ------- TimeEnumeration """ num_steps = duration / resolution if not (num_steps == int(num_steps)): logger.warning("Duration {} is not divided cleanly into steps of size {}".format(duration,resolution)) extent_timezone = cls._timezone_object(extent_timezone) store_timezone = cls._timezone_object(store_timezone,extent_timezone) end = start + duration ts_pos = timestamp_position if isinstance(timestamp_position,cls.TIMESTAMP_POSITION) else cls.TIMESTAMP_POSITION[timestamp_position] next_stamp = start if ts_pos == cls.TIMESTAMP_POSITION['period_ending']: next_stamp = start + resolution elif ts_pos == cls.TIMESTAMP_POSITION['period_midpoint']: next_stamp = start + (resolution / 2) last_stamp = end if ts_pos == cls.TIMESTAMP_POSITION['period_beginning']: last_stamp = end - resolution elif ts_pos == cls.TIMESTAMP_POSITION['period_midpoint']: last_stamp = end - (resolution / 2) ids = [] while next_stamp <= last_stamp: 
ids.append(str(extent_timezone.localize(next_stamp).astimezone(store_timezone))) next_stamp = next_stamp + resolution return cls(enum_name,ids,ids)
    @classmethod
    def _timezone_object(cls, timezone, default=None):
        # Normalize a timezone argument to a pytz timezone object. Accepts
        # None (-> default), a display name like 'EST' (-> TIMEZONE_LOOKUP),
        # a pytz timezone string, or an already-constructed timezone object
        # (returned as-is).
        result = timezone
        if timezone is None:
            result = default
        if result in cls.TIMEZONE_LOOKUP:
            result = cls.TIMEZONE_LOOKUP[result]
        if isinstance(result, str):
            result = pytz.timezone(result)
        return result

    @property
    def store_timezone(self):
        """
        Examines the first id to determine what timezone this TimeEnumeration
        is stored in. Assumes the usage of datetime, pytz, and the "standard"
        timezones, e.g.,

        - pytz.timezone('Etc/GMT+5') = EST
        - pytz.timezone('Etc/GMT+6') = CST
        - pytz.timezone('Etc/GMT+7') = MST
        - pytz.timezone('Etc/GMT+8') = PST

        Raises
        ------
        DSGridValueError
            If there are no ids or the first id does not look like a
            timestamp.
        """
        if not self.ids:
            raise DSGridValueError('No instances in this {}. Cannot determine a timezone.'.format(type(self)))
        # Timestamps look like 'YYYY-MM-DD HH:MM:SS' with an optional
        # '+HH:MM' / '-HH:MM' UTC-offset suffix (captured in group 1).
        m = re.match(r'[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}([+-][0-9]{2}:[0-9]{2})?', self.ids[0])
        if not m:
            raise DSGridValueError('Not able to interpret {} as a timestamp'.format(self.ids[0]))
        if m.group(1) is None:
            logger.warning('Explicit timezone not found in timestamp {}, assuming UTC'.format(self.ids[0]))
            return pytz.timezone('UTC')
        # Only whole-hour offsets are supported.
        assert m.group(1)[3:] == ':00', m.group(1)
        # Etc/GMT zone names invert the sign of the UTC offset
        # (e.g. UTC-05:00 is 'Etc/GMT+5').
        tz_str = 'Etc/GMT'
        tz_str += '+' if m.group(1)[0] == '-' else '-'
        tz_str += str(int(m.group(1)[1:3]))
        return pytz.timezone(tz_str)

    @property
    def store_timezone_display_name(self):
        """
        Interprets self.ids[0] to report what timezone this enumeration is
        stored in. Converts from pytz strings to what we typically use,
        namely EST, CST, MST, or PST.

        Returns
        -------
        str
            timezone this TimeEnumeration is stored in, per
            self.store_timezone and self.TIMEZONE_DISPLAY_NAMES
        """
        result = str(self.store_timezone)
        if result in self.TIMEZONE_DISPLAY_NAMES:
            result = self.TIMEZONE_DISPLAY_NAMES[result]
        return result

    @property
    def resolution(self):
        """
        The resolution of this TimeEnumeration.

        Returns
        -------
        dt.timedelta or array of dt.timedelta
            Returns a single value if the intervals are all of the same
            length. Returns a vector of values if they are different.
        """
        # Differences between consecutive timestamps.
        ind = self.to_datetime_index()
        result = (ind[1:] - ind[:-1])
        unique_vals = result.unique()
        if len(unique_vals) == 1:
            return unique_vals.to_pytimedelta()[0]
        return result.to_pytimedelta()
[docs] def get_extents(self,report_timezone=None, timestamp_position=TIMESTAMP_POSITION['period_ending']): """ Returns the inclusive temporal extents represented in this TimeEnumeration. That interpretation requires knowledge of the timestamp_postion--beginning, end, or midpoint of the period being described. Parameters ---------- report_timezone : pytz.timezone Timezone in which to report out the result Returns ------- (datetime.datetime,datetime.datetime) Tuple of start and end times, inclusive of all time represented based on the timestamp position, and in report_timezone. """ ind = self.to_datetime_index(return_timezone=report_timezone) res = self.resolution bres = res; eres = res if not isinstance(res,dt.timedelta): logger.warning("Temporal resolution is not uniform. Reported extents may be inaccurate.") bres = res[0]; eres = res[-1] start = ind[0].to_pydatetime(); end = ind[-1].to_pydatetime() ts_pos = timestamp_position if isinstance(timestamp_position,self.TIMESTAMP_POSITION) else self.TIMESTAMP_POSITION[timestamp_position] if ts_pos == self.TIMESTAMP_POSITION['period_beginning']: end = end + eres elif ts_pos == self.TIMESTAMP_POSITION['period_midpoint']: start = start - (bres / 2) end = end + (eres / 2) elif ts_pos == self.TIMESTAMP_POSITION['period_ending']: start = start - bres return (start, end)
[docs] def to_datetime_index(self,return_timezone=None): """ Return a Pandas DatetimeIndex corresponding to this TimeEnumeration. By default, localizes the timestamps to the timezone inferred based on the text of the first enumeration id. If return_timezpone is None, this is what is returned. If return_timezone is not None, the index is converted to that timezone before being returned. Parameters ---------- return_timezone : None or pytz.timezone timezone of the returned index. If None, this is inferred from self.ids[0] Returns ------- pandas.DatetimeIndex same length as self.ids, but strings are converted to datetime.datetime objects and localized to a timezone. """ df = pd.DataFrame([],index=self.ids) return_timezone = self._timezone_object(return_timezone,default=self.store_timezone) logger.info("Stored timezone is {}. Returning in timezone {}.".format(self.store_timezone,return_timezone)) try: df.index = pd.to_datetime(df.index).tz_localize('UTC').tz_convert(return_timezone) except: # pandas version issue df.index = pd.to_datetime(df.index).tz_convert(return_timezone) df.index.name = 'time' return df.index
[docs] def get_datetime_map(self,return_timezone=None): """ Converts self.ids and result of to_datetime_index into dict that can be used to map ids to datetimes in contexts other than a single DataFrame index. Parameters ---------- return_timezone : None or pytz.timezone timezone of the returned index. If None, this is inferred from self.ids[0] Returns ------- dict {id: localized datetime} """ index = self.to_datetime_index(return_timezone=return_timezone) result = {} for i, _id in enumerate(self.ids): result[_id] = index[i] return result
# Define data units -- these are ultimately associated with end-uses
class EndUseEnumeration(EndUseEnumerationBase):
    """
    Provided for backward compatibility with dsgrid v0.1.0 datasets.
    """

    def fuel(self, id):
        """Always 'Electricity'; legacy datasets did not record fuel type."""
        logger.warning("Deprecated: Fuel type has not been explicitly specified. Returning default value.")
        return 'Electricity'

    def units(self, id):
        """Always 'MWh'; legacy datasets did not record units."""
        logger.warning("Deprecated: Units have not been explicitly specified. Returning default value.")
        return 'MWh'

    @classmethod
    def read_csv(cls, filepath, name=None):
        """Read an end-use enumeration from a CSV with id and name columns."""
        table = pd.read_csv(filepath, dtype=str)
        if name is None:
            name = cls._name_from_filepath(filepath)
        return cls(name, list(table.id), list(table.name))
class SingleFuelEndUseEnumeration(EndUseEnumerationBase):
    """
    If the end-use enumeration only applies to a single fuel type, and
    all the data is in the same units, just give the fuel and units.
    """

    def __init__(self, name, ids, names, fuel='Electricity', units='MWh'):
        super(SingleFuelEndUseEnumeration, self).__init__(name, ids, names)
        self._fuel = fuel
        self._units = units

    def __str__(self):
        # Abbreviated display, ellipsis when more than one entry.
        suffix = "" if len(self.ids) == 1 else ", ..."
        return (f"{self.__class__.__name__}({self.name}, [{self.ids[0]}{suffix}], "
                f"[{self.names[0]}{suffix}], fuel = {self._fuel!r}, units = {self._units!r})")
    def fuel(self, id):
        """Return the single fuel type shared by every end use (id is ignored)."""
        return self._fuel
    def units(self, id):
        """Return the units shared by every end use (id is ignored)."""
        return self._units
[docs] def create_subset_enum(self,ids): """ Returns a new enumeration that is a subset of this one, based on keeping the items in ids. Parameters ---------- ids : list subset of self.ids that should be kept in the new enumeration Returns ------- self.__class__ """ _ids, _names = self._get_subset_ids_names(ids) return self.__class__(self.name + ' Subset',_ids,_names,fuel=self._fuel,units=self._units)
    def persist(self, h5group):
        """Persist as the base class does, adding fuel and units as attributes."""
        dset = super(SingleFuelEndUseEnumeration, self).persist(h5group)
        dset.attrs["fuel"] = self._fuel
        dset.attrs["units"] = self._units
        return dset
[docs] @classmethod def read_csv(cls, filepath, name=None, fuel='Electricity', units='MWh'): enum = pd.read_csv(filepath , dtype=str) if ('fuel' in enum.columns): assert len(enum['fuel'].unique()) == 1, "There must be exactly 1 fuel, but {} are listed".format(len(enum.fuel.unique())) fuel = enum['fuel'].unique()[0] if ('units' in enum.columns): assert len(enum['units'].unique()) == 1, "There must be exactly 1 units, but {} are listed".format(len(enum.units.unique())) units = enum['units'].unique()[0] name = cls._name_from_filepath(filepath) if name is None else name return cls(name, list(enum.id), list(enum.name), fuel=fuel, units=units)
[docs] def to_csv(self, filedir=None, filepath=None, overwrite=False): p = self._default_filepath() if filepath is not None: p = filepath elif filedir is not None: p = os.path.join(filedir,self._default_filename()) if not overwrite and os.path.exists(p): msg = "{} already exists".format(p) logger.error(msg) raise DSGridRuntimeError(msg) data = [list(x) + [self._fuel, self._units] for x in zip(self.ids,self.names)] df = pd.DataFrame(data,columns=['id','name','fuel','units']) df.to_csv(p,index=False)
class FuelEnumeration(Enumeration):
    """Enumeration of fuel types; each fuel carries its units."""
    dimension = "fuel"

    # Adds a fixed-width units field to the base id/name record.
    enum_dtype = np.dtype([
        ("id", "S" + str(Enumeration.max_id_len)),
        ("name", "S" + str(Enumeration.max_name_len)),
        ("units", "S" + str(Enumeration.max_id_len))
    ])

    def __init__(self, name, ids, names, units):
        # units must be set before the base __init__ runs checkvalues,
        # which validates units as well.
        self.units = units
        super(FuelEnumeration, self).__init__(name, ids, names)

    def __str__(self):
        return f"{self.__class__.__name__}({self.name}, {self.ids}, {self.names}, {self.units})"
[docs] def checkvalues(self): super(FuelEnumeration, self).checkvalues() # make sure units is as long as ids ids = list(self.ids); units = list(self.units) n_ids = len(ids); n_units = len(units) if n_ids != n_units: raise DSGridValueError("Number of units (" + str(n_units) + ") must match number of ids (" + str(n_ids) + ")") if max(len(unit) for unit in units) > self.max_id_len: raise DSGridValueError("Enumeration units cannot exceed " + "{} characters".format(self.max_id_len))
    def get_units(self, id):
        """Return the units associated with fuel id.

        Raises ValueError if id is not in self.ids.
        """
        ind = list(self.ids).index(id)
        return self.units[ind]
[docs] def create_subset_enum(self,ids): """ Returns a new enumeration that is a subset of this one, based on keeping the items in ids. Parameters ---------- ids : list subset of self.ids that should be kept in the new enumeration Returns ------- self.__class__ """ n = len(ids) _ids = [None] * n; _names = [None] * n; _units = [None] * n for i, full_id in enumerate(self.ids): if full_id in ids: j = ids.index(full_id) logger.debug("Found info for {}, which is entry {} of {}".format(full_id,j,len(_ids))) _ids[j] = self.ids[i] _names[j] = self.names[i] _units[j] = self.units[i] if len([x for x in _ids if x is None]): raise DSGridRuntimeError("At least one of {} is not in {}".format(ids,self.ids)) return self.__class__(self.name + ' Subset',_ids,_names,_units)
    def persist(self, h5group):
        """Persist as the base class does, then fill the units column."""
        dset = super(FuelEnumeration, self).persist(h5group)
        dset["units"] = np.array(self.units)
        return dset
[docs] @classmethod def load(cls, h5group): h5dset = h5group[cls.dimension] return cls( get_str(h5dset.attrs["name"]), [get_str(vid) for vid in h5dset["id"]], [get_str(vname) for vname in h5dset["name"]], [get_str(vunits) for vunits in h5dset["units"]] )
[docs] @classmethod def read_csv(cls, filepath, name=None): enum = pd.read_csv(filepath , dtype=str) name = cls._name_from_filepath(filepath) if name is None else name return cls(name, list(enum.id), list(enum.name), list(enum.units))
[docs] def to_csv(self, filedir=None, filepath=None, overwrite=False): p = self._default_filepath() if filepath is not None: p = filepath elif filedir is not None: p = os.path.join(filedir,self._default_filename()) if not overwrite and os.path.exists(p): msg = "{} already exists".format(p) logger.error(msg) raise DSGridRuntimeError(msg) df = pd.DataFrame(list(zip(self.ids,self.names,self.units)), columns=['id','name','units']) df.to_csv(p,index=False)
class MultiFuelEndUseEnumeration(EndUseEnumerationBase):
    """End-use enumeration whose entries each reference a FuelEnumeration.

    Logical ids are (enduse_id, fuel_id) tuples; see the ids property.
    """

    enum_dtype = np.dtype([
        ("id", "S" + str(Enumeration.max_id_len)),
        ("name", "S" + str(Enumeration.max_name_len)),
        ("fuel_id", "S" + str(Enumeration.max_id_len))
    ])

    def __init__(self, name, ids, names, fuel_enum, fuel_ids):
        """
        Parameters
        ----------
        name : str
        ids : list of str
            end-use identifiers
        names : list of str
            end-use names, parallel to ids
        fuel_enum : FuelEnumeration
        fuel_ids : list of str
            one fuel_enum id per end use, parallel to ids
        """
        self.name = name
        self._ids = ids
        self._names = names
        self.fuel_enum = fuel_enum
        self._fuel_ids = fuel_ids
        self.checkvalues()

    def __str__(self):
        return (f"{self.__class__.__name__}({self.name}, [{self._ids[0]}, ...], "
                f"[{self._names[0]}, ...], {self.fuel_enum}, [{self._fuel_ids[0]}, ...])")
[docs] def checkvalues(self): ids = self._ids; fuel_ids = self._fuel_ids; fuel_enum = self.fuel_enum n_ids = len(ids); n_fuel_ids = len(fuel_ids) # make sure fuel_ids is as long as ids if n_fuel_ids != n_ids: raise DSGridValueError("Number of fuel ids (" + str(n_fuel_ids) + ") must match number of ids (" + str(n_ids) + ")") if not isinstance(fuel_enum,FuelEnumeration): raise DSGridValueError("The fuel_enum must be of type " + "{}, but is instead of type {}".format(FuelEnumeration.__class__, type(fuel_enum))) # make sure fuel_ids are in fuel enum for fuel_id in set(fuel_ids): if fuel_id not in fuel_enum.ids: raise DSGridValueError("The fuel_ids must each be an id in the fuel_enum." + "fuel_id: {}, fuel_enum.ids: {}".format(fuel_id,fuel_enum.ids)) super(MultiFuelEndUseEnumeration, self).checkvalues() return
@property def ids(self): return list(zip(self._ids,self._fuel_ids)) @property def names(self): for i, _id in enumerate(self._ids): yield "{} ({})".format(self._names[i],self.fuel((_id,self._fuel_ids[i])))
[docs] def fuel(self,id): assert isinstance(id,tuple) & (len(id) == 2), "The ids for MultiFuelEndUseEnumerations are (enduse_id, fuel_id). Got {!r}".format(id) return self.fuel_enum.names[self.fuel_enum.ids.index(id[1])]
[docs] def units(self,id): assert isinstance(id,tuple) & (len(id) == 2), "The ids for MultiFuelEndUseEnumerations are (enduse_id, fuel_id). Got {!r}".format(id) return self.fuel_enum.units[self.fuel_enum.ids.index(id[1])]
[docs] def create_subset_enum(self,ids): """ Returns a new enumeration that is a subset of this one, based on keeping the items in ids. Parameters ---------- ids : list of 2-tuples subset of self.ids that should be kept in the new enumeration Returns ------- MultiFuelEndUseEnumeration """ n = len(ids) _ids = [None] * n; _names = [None] * n; _fuel_ids = [None] * n for i, full_id in enumerate(self.ids): if full_id in ids: j = ids.index(full_id) logger.debug("Found info for {}, which is entry {} of {}".format(full_id,j,len(_ids))) _ids[j] = self._ids[i] _fuel_ids[j] = self._fuel_ids[i] _names[j] = self._names[i] if len([x for x in _ids if x is None]): raise DSGridRuntimeError("At least one of {} is not in {}".format(ids,self.ids)) fuel_enum = copy.deepcopy(self.fuel_enum) return self.__class__(self.name + ' Subset',_ids,_names,fuel_enum,_fuel_ids)
    def persist(self, h5group):
        """Write end uses (id, name, fuel_id) to h5group, then persist the
        companion fuel enumeration alongside."""
        dset = h5group.create_dataset(
            self.dimension,
            dtype=self.enum_dtype,
            shape=(len(self),))
        dset.attrs["name"] = self.name

        dset["id"] = np.array(self._ids)
        dset["name"] = np.array([name.encode(ENCODING) for name in self._names])
        dset["fuel_id"] = np.array(self._fuel_ids)

        self.fuel_enum.persist(h5group)

        return dset
[docs] @classmethod def load(cls, h5group): fuel_enum = FuelEnumeration.load(h5group) h5dset = h5group[cls.dimension] return cls( get_str(h5dset.attrs["name"]), [get_str(vid) for vid in h5dset["id"]], [get_str(vname) for vname in h5dset["name"]], fuel_enum, [get_str(vfuel_id) for vfuel_id in h5dset["fuel_id"]] )
[docs] @classmethod def read_csv(cls, filepath, name=None, fuel_enum=None): """ id, name, fuel_id + pass in file_enum or id, name, fuel_id, fuel_name, units or id, name, fuel_id, units (and fuel_name will be guessed from fuel_id) """ enum = pd.read_csv(filepath , dtype=str) name = cls._name_from_filepath(filepath) if name is None else name if fuel_enum is None: fuel_enum_name = name + ' Fuels' if 'fuel_name' in enum.columns: # fuel enum fully defined in this file fuel_enum = enum[["fuel_id","fuel_name","units"]].drop_duplicates() fuel_enum = FuelEnumeration( fuel_enum_name, list(fuel_enum.fuel_id), list(fuel_enum.fuel_name), list(fuel_enum.units)) else: # create fuel enum names from fuel enum ids fuel_enum = enum[["fuel_id","units"]].drop_duplicates() fuel_ids = list(fuel_enum.fuel_id) fuel_names = [fuel_id.replace("_"," ").title() for fuel_id in fuel_ids] fuel_enum = FuelEnumeration( fuel_enum_name, fuel_ids, fuel_names, list(fuel_enum.units)) assert fuel_enum is not None return cls(name, list(enum.id), list(enum.name), fuel_enum, list(enum.fuel_id))
[docs] def to_csv(self, filedir=None, filepath=None, overwrite=False): p = self._default_filepath() if filepath is not None: p = filepath elif filedir is not None: p = os.path.join(filedir,self._default_filename()) if not overwrite and os.path.exists(p): msg = "{} already exists".format(p) logger.error(msg) raise DSGridRuntimeError(msg) simple_fuel_name = True for fuel_id in self.fuel_enum.ids: if not (fuel_id.replace("_"," ").title() == self.fuel_enum.get_name(fuel_id)): simple_fuel_name = False break data = list(zip(self._ids,self._names,self._fuel_ids)) cols = ['id','name','fuel_id'] if not simple_fuel_name: data = [list(x) + [self.fuel_enum.get_name(x[2])] for x in data] cols += ['fuel_name'] data = [list(x) + [self.fuel_enum.get_units(x[2])] for x in data] cols += ['units'] df = pd.DataFrame(data,columns=cols) df.to_csv(p,index=False)
# Define standard enumerations enumdata_folder = os.path.join(os.path.dirname(__file__), "enumeration_data/") ## Sectors sectors_subsectors = SectorEnumeration.read_csv( enumdata_folder + "sectors_subsectors.csv", "standard_sector_subsectors") mecs_subsectors = SectorEnumeration.read_csv( enumdata_folder + "mecs_subsectors.csv", "mecs_subsectors") sectors = SectorEnumeration.read_csv( enumdata_folder + "sectors.csv", "standard_sectors") sectors_eia_extended = SectorEnumeration.read_csv( enumdata_folder + "sectors_eia_extended.csv", "sectors_eia_extended") allsectors = SectorEnumeration("all_sectors", ["All"], ["All Sectors"]) ## Geographies counties = GeographyEnumeration.read_csv( enumdata_folder + "counties.csv", "counties") conus_counties = GeographyEnumeration.read_csv( os.path.join(enumdata_folder,'conus_counties.csv')) states = GeographyEnumeration.read_csv( enumdata_folder + "states.csv", "states") conus_states = GeographyEnumeration.read_csv( os.path.join(enumdata_folder,'conus_states.csv')) census_divisions = GeographyEnumeration.read_csv( enumdata_folder + "census_divisions.csv", "census_divisions") res_state_groups = GeographyEnumeration.read_csv( enumdata_folder + "res_state_groups.csv", "state_groups") loss_state_groups = GeographyEnumeration.read_csv( enumdata_folder + "loss_state_groups.csv", "loss_state_groups") census_regions = GeographyEnumeration.read_csv( enumdata_folder + "census_regions.csv", "census_regions") conus = GeographyEnumeration("conus", ["conus"], ["Continental United States"]) ## End Uses enduses = EndUseEnumeration.read_csv( enumdata_folder + "enduses.csv", "standard_enduses") gaps_enduses = EndUseEnumeration.read_csv( enumdata_folder + "gaps_enduses.csv", "gaps_enduses") fuel_types = EndUseEnumeration.read_csv( enumdata_folder + "fuel_types.csv", "fuel_types") deprecated_allenduses = EndUseEnumeration("all_enduses", ["All"], ["All End-uses"]) allenduses = SingleFuelEndUseEnumeration("all_enduses", ["All"], ["All End-uses"]) 
loss_factor = SingleFuelEndUseEnumeration('Loss Factor',['loss_factor'], ['Loss Factor'],fuel='N/A',units='dimensionless') # Time hourly2012 = TimeEnumeration.read_csv( os.path.join(enumdata_folder,'hourly2012.csv')) daily2012 = TimeEnumeration.read_csv( os.path.join(enumdata_folder,'daily2012.csv')) weekdays = TimeEnumeration.read_csv( os.path.join(enumdata_folder,'weekdays.csv')) daytypes = TimeEnumeration.read_csv( os.path.join(enumdata_folder,'day_types.csv')) weekly2012 = TimeEnumeration.read_csv( os.path.join(enumdata_folder,'weekly2012.csv')) seasons = TimeEnumeration.read_csv( os.path.join(enumdata_folder,'seasons.csv')) annual = TimeEnumeration("annual", ["Annual"], ["Annual"])