from collections import defaultdict, OrderedDict
from distutils.version import StrictVersion
from itertools import repeat
import h5py
import logging
import numpy as np
import pandas as pd
from dsgrid import __version__ as VERSION
from dsgrid import DSGridError, DSGridNotImplemented
from dsgrid.helpers import h5Reader
from dsgrid.dataformat.enumeration import (
SectorEnumeration, GeographyEnumeration,
EndUseEnumerationBase, TimeEnumeration)
logger = logging.getLogger(__name__)

# Sentinel index marking enumeration values that have no data in a dataset.
# 2**32 - 1 is the maximum value of a 32-bit unsigned integer, which limits
# dataset size to 2**32 - 2 entries along each dimension.
NULL_IDX = 2**32 - 1

# Structured dtype for Datamap entries: an index into the dataset's
# sub-enumeration plus a multiplicative scaling factor.
enum_datamap_dtype = np.dtype([
    ("idx", "u4"),   # 32-bit unsigned integer index
    ("scale", "f4")  # 32-bit float scaling factor
])
class Datamap(object):
    """
    Map between Datafile-level enumeration (enum) and Sectordataset-level
    sub-enumeration (enum_ids). Sub-enumeration may also have non-unity scaling
    factors. Per Sectordataset, these Datamaps link each enumeration value with:

        a) a particular index in the dataset along the Enumeration's
           dimension; and
        b) a scaling parameter to apply to the associated underlying data.

    Multiple enumeration values can refer to the same index in the dataset's
    enumeration dimension, with the option to apply different scaling factors.
    The index is represented as a 32-bit unsigned integer, which limits
    dataset size to 2^32 - 2 in each dimension, with NULL_IDX (2^32 - 1)
    serving as the sentinel value assigned to enumeration values not described
    in the dataset (looking up data associated with such an enumeration value
    will simply return zeros).

    Attributes
    ----------
    value : numpy.ndarray
        datamap vector of length len(enum.ids) with 'idx' and 'scale'
        dimensions. For the example of (j, scale) = self.value[i],

            - i = position of enum_id in enum.ids (datafile-level enum)
            - j = position of enum_id in enum_ids (sectordataset-level
              sub-enum)
            - scale = scaling factor to apply to this enumeration element

        We also have j = self.value[i]['idx'], scale = self.value[i]['scale'].
    """

    def __init__(self, value):
        """
        Parameters
        ----------
        value : numpy.ndarray
            structured array of enum_datamap_dtype ('idx' and 'scale' fields)
        """
        assert isinstance(value, np.ndarray), type(value)
        self.value = value

    @classmethod
    def create(cls, enum, enum_ids, enum_scales=None):
        """
        Create a Datamap linking enum to the sub-enumeration enum_ids.

        Parameters
        ----------
        enum : dsgrid.enumeration.Enumeration
        enum_ids : list
            List of items in enum.ids
        enum_scales : None or list
            if list, is list of floats the same length as enum_ids

        Returns
        -------
        Datamap

        Raises
        ------
        ValueError
            if enum_scales is non-empty but not the same length as enum_ids
        """
        datamap = np.empty(len(enum.ids), enum_datamap_dtype)
        # Default every entry to "no data": sentinel index, zero scale.
        datamap["idx"] = NULL_IDX
        datamap["scale"] = 0.
        if enum_scales:
            if len(enum_ids) != len(enum_scales):
                # BUGFIX: the original concatenated int lengths onto str,
                # raising TypeError instead of this ValueError.
                raise ValueError(
                    "{} id list has length {}, but scaling factor list "
                    "has length {}".format(enum.dimension, len(enum_ids),
                                           len(enum_scales)))
        else:
            # Default all scaling factors to unity.
            enum_scales = repeat(1.0, len(enum_ids))
        for i, (enum_id, enum_scale) in enumerate(zip(enum_ids, enum_scales)):
            enum_idx = list(enum.ids).index(enum_id)
            datamap[enum_idx] = (i, enum_scale)
        return cls(datamap)

    @classmethod
    def load(cls, dataset):
        """
        Load a Datamap from its serialized (h5py) form.

        Parameters
        ----------
        dataset : h5py.Dataset
            a Datamap serialized to h5py

        Returns
        -------
        Datamap
        """
        assert isinstance(dataset, h5py.Dataset)
        idx = dataset[:, "idx"]
        scale = dataset[:, "scale"]
        datamap = np.empty(len(idx), enum_datamap_dtype)
        datamap["idx"] = idx
        datamap["scale"] = scale
        return cls(datamap)

    def update(self, dataset):
        """
        Updates dataset with this Datamap's value. Overwrites current
        dataset[:,'idx'] and dataset[:,'scale'].

        Parameters
        ----------
        dataset : h5py.Dataset
            a Datamap serialized to h5py
        """
        dataset[:, "idx"] = self.value["idx"]
        dataset[:, "scale"] = self.value["scale"]

    @property
    def num_entries(self):
        """
        The number of distinct non-null idx values in this Datamap's value.
        Corresponds, e.g., to the number of distinct (up to a scaling factor)
        entries along a dimension.
        """
        original_idxes = np.flatnonzero(self.value["idx"] != NULL_IDX)
        return len(set(self.value["idx"][original_idxes]))

    def get_subenum(self, enum):
        """
        Parameters
        ----------
        enum : dsgrid.dataformat.enumeration.Enumeration
            Datafile-level enumeration

        Returns
        -------
        list of enum.ids
            ids in enum for which there is data in this Sectordataset,
            returned in the order imposed by this Datamap's 'idx' values,
            i.e., in the correct order for this Sectordataset
        """
        full_enum_ids = list(enum.ids)
        original_idxes = np.flatnonzero(self.value["idx"] != NULL_IDX)
        # Sort by position in the sub-enumeration, not by position in enum.
        original_idxes = sorted(original_idxes,
                                key=lambda x: self.value[x]['idx'])
        return [full_enum_ids[i] for i in original_idxes]

    def is_empty(self, enum_id, enum):
        """
        Parameters
        ----------
        enum_id : str
            element of enum.ids
        enum : dsgrid.dataformat.enumeration.Enumeration
            Datafile-level enumeration

        Returns
        -------
        bool
            True if the dataset has no data for enum_id, False otherwise
        """
        id_idx = list(enum.ids).index(enum_id)
        return self.value[id_idx]['idx'] == NULL_IDX

    def get_map(self, enum):
        """
        Get the data in this map in ordered, hashed form, with the
        dataset-level idx as the key.

        Parameters
        ----------
        enum : dsgrid.dataformat.enumeration.Enumeration
            Datafile-level enumeration

        Returns
        -------
        OrderedDict
            idx: (list of enum.ids, scales)
        """
        result = OrderedDict()
        original_idxes = np.flatnonzero(self.value["idx"] != NULL_IDX)
        unique_idxes = sorted(list(set(self.value["idx"][original_idxes])))
        for i in unique_idxes:
            result[i] = (self.ids(i, enum), self.scales(i))
        return result

    def ids(self, idx, enum):
        """
        Returns the enum.ids that are mapped to the dataset idx.

        Parameters
        ----------
        idx : int
            dataset-level sub-enumeration index
        enum : dsgrid.dataformat.Enumeration
            datafile-level enumeration the sub-enum is based on

        Returns
        -------
        list of enum.ids
            in particular, the ones that are mapped to idx, in the order
            specified by enum.ids
        """
        orig_idxs = [i for i, val in enumerate(self.value['idx']) if val == idx]
        all_ids = list(enum.ids)
        return [all_ids[i] for i in orig_idxs]

    def scales(self, idx):
        """
        Returns the scaling factors that correspond to the dataset idx.

        Parameters
        ----------
        idx : int
            dataset-level sub-enumeration index

        Returns
        -------
        list of float
            one for each of the .ids(idx,enum), and in the same order
        """
        orig_idxs = [i for i, val in enumerate(self.value['idx']) if val == idx]
        return [self.value[i]['scale'] for i in orig_idxs]

    def append_element(self, new_elem_idx, enum_ids, enum, scalings=None):
        """
        Appends a new non-null element for this Datamap that defines the index
        (new_elem_idx) for data that corresponds to enum_ids in enum.

        Parameters
        ----------
        new_elem_idx : int
            index value for the new element
        enum_ids : list
            list of distinct elements in enum.ids
        enum : dsgrid.dataformat.enumeration.Enumeration
            should be same Enumeration originally used to .create this Datamap
        scalings : None, scalar, list, or numpy.ndarray of numeric
            if None or empty, will be defaulted to 1.0. otherwise should be
            the same length as enum_ids

        Raises
        ------
        ValueError
            if scalings is non-empty but not the same length as enum_ids
        """
        # BUGFIX: default was a mutable list ([]); use a None sentinel.
        if scalings is None:
            scalings = []
        if not isinstance(scalings, (list, np.ndarray)):
            # A bare scalar is treated as a one-element list.
            scalings = [scalings]
        if len(scalings) == 0:
            scalings = [1 for _ in enum_ids]
        elif len(scalings) != len(enum_ids):
            raise ValueError("Enum id and scale factor list lengths must " +
                "match, but len(enum_ids) = {} and len(scalings) = {}, ".format(len(enum_ids), len(scalings)) +
                "where enum_ids = {}, scalings = {}.".format(enum_ids, scalings))
        id_idxs = np.array([enum.ids.index(enum_id) for enum_id in enum_ids])
        scalings = np.array(scalings)
        try:
            self.value["idx"][id_idxs] = new_elem_idx
        except Exception:
            # BUGFIX: was a bare except; narrow to Exception so
            # KeyboardInterrupt/SystemExit are not caught. Log and re-raise.
            logger.error("Unable to set the selected enumeration ids to new_elem_idx. " +
                "enum_ids: {}, id_idxs: {}, new_elem_ids: {}, datamap: {}".format(repr(enum_ids), repr(id_idxs), repr(new_elem_idx), repr(self.value)))
            raise
        self.value["scale"][id_idxs] = scalings
        return
def append_element_to_dataset_dimension(dataset, new_elem_idx, enum_ids, enum,
                                        scalings=None):
    """
    Helper method to do all the work of adding a new element to a
    SectorDataset dimension: load the serialized Datamap, append the element,
    and write the Datamap back.

    Parameters
    ----------
    dataset : h5py.Dataset
        a Datamap serialized to h5py
    new_elem_idx : int
        index value for the new element
    enum_ids : list
        list of distinct elements in enum.ids
    enum : dsgrid.dataformat.enumeration.Enumeration
        should be same Enumeration originally used to .create this Datamap
    scalings : None or list of numeric
        if None or empty, will be defaulted to 1.0. otherwise should be the
        same length as enum_ids
    """
    # BUGFIX: default was a mutable list ([]); normalize the None sentinel
    # to [] before delegating so Datamap.append_element sees the same value
    # it always has.
    if scalings is None:
        scalings = []
    datamap = Datamap.load(dataset)
    datamap.append_element(new_elem_idx, enum_ids, enum, scalings=scalings)
    datamap.update(dataset)
class SectorDataset(object):
    """
    Data for one sector of the parent Datafile, stored in HDF5 under
    'data/<sector_id>'. The underlying array is 3-dimensional
    (geography x enduse x time) and grows one geography slice at a time;
    geography, enduse, and time Datamaps alongside it link Datafile-level
    enumerations to array indices.
    """

    def __init__(self, datafile, sector_id, enduses, times):
        """
        Creates a SectorDataset object. Note that this does not read from
        or write to datafile in any way, and should generally not be called
        directly. Instead, use the SectorDataset.load or SectorDataset.new
        class methods.

        Raises
        ------
        ValueError
            if sector_id, enduses, or times are not consistent with
            datafile's enumerations
        """
        if sector_id not in datafile.sector_enum.ids:
            raise ValueError("Sector ID " + sector_id + " is not in " +
                             "the Datafile's SectorEnumeration")
        if not set(enduses).issubset(set(datafile.enduse_enum.ids)):
            raise ValueError("Supplied enduses (" + str(enduses) +
                             ") are not a subset of the " +
                             "Datafile's EndUseEnumeration ids (" +
                             str(datafile.enduse_enum.ids) + ")")
        if not set(times).issubset(set(datafile.time_enum.ids)):
            raise ValueError("Supplied times are not a subset of the " +
                             "Datafile's TimeEnumeration")
        self.sector_id = sector_id
        self.datafile = datafile
        self.enduses = enduses
        self.times = times
        self.n_geos = 0  # data is inserted by geography

    @classmethod
    def new(cls, datafile, sector_id, enduses=None, times=None):
        """
        Create a new SectorDataset and its (empty, extendable) HDF5 group
        in datafile. enduses/times default to the full Datafile-level
        enumerations when not given.
        """
        if not enduses:
            enduses = datafile.enduse_enum.ids
        if not times:
            times = datafile.time_enum.ids
        sdset = cls(datafile, sector_id, enduses, times)
        n_enduses = len(enduses)
        n_times = len(times)
        # Geography dimension starts empty and is unlimited (maxshape None);
        # one chunk per geography slice.
        shape = (0, n_enduses, n_times)
        max_shape = (None, n_enduses, n_times)
        chunk_shape = (1, n_enduses, n_times)
        with h5py.File(datafile.h5path, "r+", driver='core') as f:
            dgroup = f["data"].create_group(sector_id)
            dset = dgroup.create_dataset(
                "data",
                shape=shape,
                maxshape=max_shape,
                chunks=chunk_shape,
                compression="gzip")
            dgroup["geographies"] = Datamap.create(datafile.geo_enum, []).value
            dgroup["enduses"] = Datamap.create(datafile.enduse_enum, enduses).value
            dgroup["times"] = Datamap.create(datafile.time_enum, times).value
        return sdset

    @classmethod
    def load(cls, datafile, f, sector_id):
        """
        Load an existing SectorDataset from open h5py.File f, recovering
        its enduses, times, and geography count from the stored Datamaps.
        """
        dgroup = f["data/" + sector_id]
        datamap = Datamap.load(dgroup["enduses"])
        enduses = datamap.get_subenum(datafile.enduse_enum)
        datamap = Datamap.load(dgroup["times"])
        times = datamap.get_subenum(datafile.time_enum)
        result = cls(datafile, sector_id, enduses, times)
        datamap = Datamap.load(dgroup["geographies"])
        result.n_geos = datamap.num_entries
        return result

    @classmethod
    def loadall(cls, datafile, f, _upgrade_class=None):
        """
        Yield (sector_id, SectorDataset) for every sector group in open
        h5py.File f. If _upgrade_class is given, its load_sectordataset is
        used instead (for format upgrades).
        """
        for sector_id, sector_group in f["data"].items():
            if _upgrade_class is not None:
                yield sector_id, _upgrade_class.load_sectordataset(datafile, f, sector_id)
                continue
            assert isinstance(sector_group, h5py.Group)
            yield sector_id, SectorDataset.load(datafile, f, sector_id)

    def __eq__(self, other):
        # BUGFIX: previously compared self.__dict__ to itself, so any two
        # SectorDataset instances compared equal.
        return (
            isinstance(other, self.__class__) and
            self.__dict__ == other.__dict__
        )

    def __repr__(self):
        return "%s(%r)" % (self.__class__, self.__dict__)

    def __str__(self):
        return self.__repr__()

    def add_data_batch(self, dataframes, geo_ids, scalings=None,
                       full_validation=True):
        """
        Add a batch of new data to this SectorDataset. Uses the basic
        add_data functionality, but handles the h5 file so as to write data
        to memory first and only write to disk upon closing.

        Parameters
        ----------
        dataframes : iterable
            One dataframe per call to add_data
        geo_ids : list
            List of geo_ids arguments to pass to add_data
        scalings : None or list of lists
            If None, [] will be passed in to each call of add_data.
            Otherwise, this must be a list of the scalings arguments to
            pass, which must be lists of the same size as the geo_ids
            argument for that call.
        full_validation : bool
            If true, checks that all enumeration ids (time, enduse, and
            geography) are valid. If false, does this, but only for the
            first item.
        """
        with h5py.File(self.datafile.h5path, "r+", driver='core') as f:
            first = True
            for i, dataframe in enumerate(dataframes):
                self.add_data(
                    dataframe,
                    geo_ids[i],
                    scalings=[] if scalings is None else scalings[i],
                    # When full_validation is False, still validate the
                    # first item as a spot check.
                    full_validation=full_validation if full_validation else first,
                    _batch_file_object=f)
                first = False

    def add_data(self, dataframe, geo_ids, scalings=None, full_validation=True,
                 _batch_file_object=None):
        """
        Add new data to this SectorDataset, as part of the self.datafile
        HDF5.

        Parameters
        ----------
        dataframe : pandas.DataFrame
            Data to add, indexed by times, and with columns equal to enduses.
        geo_ids : id or list of ids
            Ids map to datafile.geo_enum
        scalings : None or list of float
            If non-empty, must be same length as geo_ids and represents the
            scaling factors for the geo_ids in order. Otherwise, a uniform
            value of 1.0 is assumed for all geo_ids.
        full_validation : bool
            If true, checks that all enumeration ids (time, enduse, and
            geography) are valid.
        _batch_file_object : None or h5py.File
            Internal: already-open file to write into (see add_data_batch).

        Raises
        ------
        ValueError
            if any id fails validation, or if the DataFrame's index or
            columns are not unique
        """
        # BUGFIX: default was a mutable list ([]); normalize None to [].
        if scalings is None:
            scalings = []
        if type(geo_ids) is not list:
            geo_ids = [geo_ids]
        if len(geo_ids) == 0:
            logger.info("Skipping call to add_data because geo_ids is empty.")
            if not dataframe.empty:
                logger.warning("Although geo_ids is empty, dataframe is not:\n{}".format(dataframe))
            return
        if full_validation:
            for geo_id in geo_ids:
                if geo_id not in self.datafile.geo_enum.ids:
                    raise ValueError("Geography ID must be in the DataFile's " +
                                     "GeographyEnumeration, but is {!r}".format(geo_id))
        if len(dataframe.index.unique()) != len(dataframe.index):
            raise ValueError("DataFrame row indices must be unique")
        if full_validation:
            for time in dataframe.index:
                if time not in self.times:
                    raise ValueError("Time ID (DataFrame row index) {!r}".format(time) +
                        " is invalid: time IDs must both exist in the DataFile's TimeEnumeration and "
                        "have been defined as a sector-relevant time during the SectorDataset's creation.")
        if len(dataframe.columns.unique()) != len(dataframe.columns):
            raise ValueError("DataFrame column names must be unique")
        if full_validation:
            for enduse in dataframe.columns:
                if enduse not in self.enduses:
                    raise ValueError("End-use ID (DataFrame column name) " + enduse +
                        " is invalid: end-use IDs must both exist in the DataFile's EndUseEnumeration and "
                        "have been defined as a sector-relevant end-use during the SectorDataset's creation.")
        # Reorder to this dataset's canonical (enduse, time) layout and
        # replace NaNs with zeros.
        data = np.array(dataframe.loc[self.times, self.enduses]).T
        data = np.nan_to_num(data)

        def append_to_h5file(f):
            # Grow the geography dimension by one slice and record the
            # geo_ids that point at it.
            dgroup = f["data/" + self.sector_id]
            dset = dgroup["data"]
            new_geo_idx = self.n_geos
            self.n_geos += 1
            dset.resize(self.n_geos, 0)
            dset[new_geo_idx, :, :] = data
            append_element_to_dataset_dimension(dgroup["geographies"],
                new_geo_idx, geo_ids, self.datafile.geo_enum, scalings=scalings)

        if _batch_file_object is None:
            with h5py.File(self.datafile.h5path, "r+", driver='core') as f:
                append_to_h5file(f)
        else:
            append_to_h5file(_batch_file_object)

    def __setitem__(self, geo_ids, dataframe):
        self.add_data(dataframe, geo_ids)

    def __getitem__(self, geo_id):
        """
        Return a (times x enduses) DataFrame of this sector's data for
        geo_id, scaled by the stored geography scaling factor. Returns all
        zeros for geographies with no data.
        """
        id_idx = self.datafile.geo_enum.ids.index(geo_id)
        with h5Reader(self.datafile.h5path) as f:
            dgroup = f["data/" + self.sector_id]
            dset = dgroup["data"]
            geo_idx, geo_scale = dgroup["geographies"][id_idx]
            if geo_idx == NULL_IDX:
                # No data: DataFrame constructor broadcasts 0 to full shape.
                data = 0
            else:
                data = dset[geo_idx, :, :].T * geo_scale
        return pd.DataFrame(data,
                            index=self.times,
                            columns=self.enduses,
                            dtype="float32")

    def has_data(self, geo_id):
        """
        Return True if this sector has data for geo_id, False otherwise.
        """
        id_idx = self.datafile.geo_enum.ids.index(geo_id)
        with h5Reader(self.datafile.h5path) as f:
            dgroup = f["data/" + self.sector_id]
            geo_idx, geo_scale = dgroup["geographies"][id_idx]
            if geo_idx == NULL_IDX:
                return False
        return True

    def get_datamap(self, dim_key):
        """
        Load and return this sector's Datamap for dim_key, one of
        'geographies', 'enduses', or 'times'.
        """
        with h5Reader(self.datafile.h5path) as f:
            dgroup = f["data"][self.sector_id]
            result = Datamap.load(dgroup[dim_key])
        return result

    def get_data(self, dataset_geo_index):
        """
        Get data in this file's native format.

        Parameters
        ----------
        dataset_geo_index : int
            Index into the geography dimension of this dataset. Is an
            integer in the range [0,self.n_geos) that corresponds to the
            values in this dataset's geographies[:,'idx'] that are not
            equal to NULL_IDX

        Returns
        -------
        pandas.DataFrame
            data indexed by time and differentiated by enduse (as columns)
        list of .datafile.geo_enum.ids
            geographic enum values this data applies to
        list of float
            one scaling factor for each geographic enum value

        Raises
        ------
        ValueError
            if dataset_geo_index is out of range
        """
        if (dataset_geo_index < 0) or (not dataset_geo_index < self.n_geos):
            raise ValueError("dataset_geo_index must be in the range [0,{}), but is {}.".format(self.n_geos, dataset_geo_index))
        with h5Reader(self.datafile.h5path) as f:
            dgroup = f["data"][self.sector_id]
            dset = dgroup["data"]
            data = dset[dataset_geo_index, :, :]
            data = data.T
            df = pd.DataFrame(data,
                              index=self.times,
                              columns=self.enduses,
                              dtype="float32")
            geo_datamap = Datamap.load(dgroup["geographies"])
            geo_ids = geo_datamap.ids(dataset_geo_index, self.datafile.geo_enum)
            scalings = geo_datamap.scales(dataset_geo_index)
        return df, geo_ids, scalings

    def copy_data(self, other_sectordataset, full_validation=True):
        """
        Copy data from this SectorDataset into other_sectordataset.

        Parameters
        ----------
        other_sectordataset : SectorDataset
            target for this SectorDataset's data to be copied into
        full_validation : bool
            flag for SectorDataset.add_data
        """
        batch_dataframes = []; batch_geo_ids = []; batch_scalings = []
        for i in range(self.n_geos):
            # pull data
            df, geo_ids, scalings = self.get_data(i)
            batch_dataframes.append(df)
            batch_geo_ids.append(geo_ids)
            batch_scalings.append(scalings)
        other_sectordataset.add_data_batch(batch_dataframes, batch_geo_ids,
            scalings=batch_scalings, full_validation=False)

    def map_dimension(self, new_datafile, mapping):
        """
        Map one of this dataset's dimensions (geography, time, or enduse,
        selected by the type of mapping.to_enum) into a new SectorDataset
        created in new_datafile, and return that new SectorDataset.

        Raises
        ------
        DSGridNotImplemented
            for temporal or end-use disaggregations
        DSGridError
            if mapping.to_enum is not a recognized enumeration type
        """
        # import ExplicitDisaggregation here to avoid circular import but be
        # able to test and raise DSGridNotImplemented as needed
        from dsgrid.dataformat.dimmap import ExplicitDisaggregation
        result = self.__class__.new(new_datafile, self.sector_id,
            enduses=None if isinstance(mapping.to_enum, EndUseEnumerationBase) else self.enduses,
            times=None if isinstance(mapping.to_enum, TimeEnumeration) else self.times)
        if isinstance(mapping.to_enum, GeographyEnumeration):
            # 1. Figure out how geography is mapped now, where it needs to
            #    get mapped to, and what the new scalings should be on a
            #    per-dataset_geo_index basis
            geo_datamap = self.get_datamap('geographies')
            # dataset_geo_index: (geo_ids, scalings) in THIS dataset
            from_geo_map = geo_datamap.get_map(self.datafile.geo_enum)
            # DITTO, but for mapped dataset, ignoring aggregation for now
            to_geo_map = OrderedDict()
            # new_geo_id: [dataset_geo_indices] so can see what aggregation
            # needs to be done
            new_geo_ids_to_dataset_geo_index_map = defaultdict(list)
            for dataset_geo_index in from_geo_map:
                geo_ids, scalings = from_geo_map[dataset_geo_index]
                new_geo_ids = []
                new_scalings = []
                for i, geo_id in enumerate(geo_ids):
                    new_geo_id = mapping.map(geo_id)
                    if new_geo_id is None:
                        # filtering out; moving on
                        continue
                    if isinstance(new_geo_id, list):
                        # disaggregating
                        new_geo_ids += new_geo_id
                        new_scaling = mapping.get_scalings(new_geo_id)
                        logger.debug("Disaggregating.\n new_geo_id: {}".format(new_geo_id) +
                            "\n new_scaling: {}".format(new_scaling))
                        # NOTE(review): np.concatenate turns new_scalings
                        # into an ndarray; mixing disaggregated and 1:1 ids
                        # for the same index would then hit .append on an
                        # ndarray — presumably that mix never occurs; verify.
                        new_scalings = np.concatenate((new_scalings, (new_scaling * scalings[i])))
                        for newid in new_geo_id:
                            new_geo_ids_to_dataset_geo_index_map[newid].append(dataset_geo_index)
                    else:
                        new_geo_ids.append(new_geo_id)
                        new_scalings.append(scalings[i])
                        new_geo_ids_to_dataset_geo_index_map[new_geo_id].append(dataset_geo_index)
                to_geo_map[dataset_geo_index] = (new_geo_ids, new_scalings)
            # 2. Step through via new_geo_ids_to_dataset_geo_index_map. Pull
            #    out new_geo_ids that are aggregations across
            #    dataset_geo_indices. Then process those new_geo_ids first.
            pulled_data = OrderedDict()  # dataset_geo_index: df

            def get_df(dataset_geo_index, new_geo_id):
                # Return the scaled data for new_geo_id sourced from
                # dataset_geo_index, consuming the matching to_geo_map entry
                # (and cached DataFrame) as a side effect.
                if not (dataset_geo_index in pulled_data):
                    # pull the original data
                    df, geo_ids, scalings = self.get_data(dataset_geo_index)
                    pulled_data[dataset_geo_index] = df
                # pull the scaling factor
                ind = to_geo_map[dataset_geo_index][0].index(new_geo_id)
                scale = to_geo_map[dataset_geo_index][1][ind]
                # calculate the result
                result = pulled_data[dataset_geo_index] * scale
                # delete the items in to_geo_map that have now been handled
                to_geo_map[dataset_geo_index] = (
                    [x for i, x in enumerate(to_geo_map[dataset_geo_index][0]) if not (i == ind)],
                    [x for i, x in enumerate(to_geo_map[dataset_geo_index][1]) if not (i == ind)])
                # if this to_geo_map entry is empty, delete it, and delete
                # the pulled_data too
                assert len(to_geo_map[dataset_geo_index][0]) == len(to_geo_map[dataset_geo_index][1])
                if len(to_geo_map[dataset_geo_index][0]) == 0:
                    del to_geo_map[dataset_geo_index]
                    del pulled_data[dataset_geo_index]
                return result

            require_aggregation = []
            for new_geo_id in new_geo_ids_to_dataset_geo_index_map:
                if len(new_geo_ids_to_dataset_geo_index_map[new_geo_id]) > 1:
                    require_aggregation.append(new_geo_id)
            batch_dataframes = []; batch_geo_ids = []
            for new_geo_id in require_aggregation:
                aggdf = None
                for dataset_geo_index in new_geo_ids_to_dataset_geo_index_map[new_geo_id]:
                    tempdf = get_df(dataset_geo_index, new_geo_id)
                    if aggdf is None:
                        aggdf = tempdf
                    else:
                        aggdf = aggdf.add(tempdf, fill_value=0.0)
                batch_dataframes.append(aggdf)
                batch_geo_ids.append([new_geo_id])
            result.add_data_batch(batch_dataframes, batch_geo_ids, full_validation=False)
            # 3. Now add data that did not need to be aggregated
            batch_dataframes = []; batch_geo_ids = []; batch_scalings = []
            for dataset_geo_index in to_geo_map:
                geo_ids, scalings = to_geo_map[dataset_geo_index]
                if len(geo_ids) == 0:
                    logger.debug("All data from dataset_geo_index {} is being dropped.".format(dataset_geo_index))
                    continue
                if dataset_geo_index in pulled_data:
                    df = pulled_data[dataset_geo_index]
                    del pulled_data[dataset_geo_index]
                else:
                    df, junk1, junk2 = self.get_data(dataset_geo_index)
                batch_dataframes.append(df)
                batch_geo_ids.append(geo_ids)
                batch_scalings.append(scalings)
            result.add_data_batch(batch_dataframes, batch_geo_ids, scalings=batch_scalings, full_validation=False)
        elif isinstance(mapping.to_enum, TimeEnumeration):
            if isinstance(mapping, ExplicitDisaggregation):
                raise DSGridNotImplemented("Temporal disaggregations have not been implemented.")
            # Map dataframe indices
            batch_dataframes = []; batch_geo_ids = []; batch_scalings = []
            for i in range(self.n_geos):
                # pull data
                df, geo_ids, scalings = self.get_data(i)
                # apply the mapping
                cols = df.columns
                df[mapping.to_enum.name] = df.index.to_series().apply(mapping.map)
                # filter out unmapped items
                # NOTE(review): elementwise `== None` on a pandas Series is
                # suspect (E711) — confirm it actually flags unmapped rows,
                # or whether pivot_table's NaN-index dropping does the work.
                df = df[~(df[mapping.to_enum.name] == None)]
                df = df.pivot_table(index=mapping.to_enum.name,
                                    values=cols,
                                    aggfunc=np.sum,
                                    fill_value=0.0)
                # add the mapped data to the new file
                batch_dataframes.append(df)
                batch_geo_ids.append(geo_ids)
                batch_scalings.append(scalings)
            result.add_data_batch(batch_dataframes, batch_geo_ids, scalings=batch_scalings, full_validation=False)
        elif isinstance(mapping.to_enum, EndUseEnumerationBase):
            if isinstance(mapping, ExplicitDisaggregation):
                raise DSGridNotImplemented("End-use disaggregations have not been implemented.")
            # Map dataframe columns
            batch_dataframes = []; batch_geo_ids = []; batch_scalings = []
            for i in range(self.n_geos):
                # pull data
                df, geo_ids, scalings = self.get_data(i)
                # apply scaling (for unit conversion)
                for col in df.columns:
                    if not (mapping.scale_factor(col) == 1.0):
                        df[col] = df[col] * mapping.scale_factor(col)
                # apply the mapping
                df.columns = [mapping.map(col) for col in df.columns]
                # filter out unmapped items
                df = df.iloc[:, [i for i, col in enumerate(df.columns) if col is not None]]
                df = df.groupby(df.columns, axis=1).sum()
                # add the mapped data to the new file
                batch_dataframes.append(df)
                batch_geo_ids.append(geo_ids)
                batch_scalings.append(scalings)
            result.add_data_batch(batch_dataframes, batch_geo_ids, scalings=batch_scalings, full_validation=False)
        else:
            raise DSGridError("SectorDataset is not able to map to {}.".format(mapping.to_enum))
        return result

    def scale_data(self, new_datafile, factor=0.001):
        """
        Scale all the data in self by factor, creating a new HDF5 file and
        corresponding Datafile.

        Parameters
        ----------
        new_datafile : dsgrid.dataformat.datafile.Datafile
            Datafile (backed by a new HDF5 file) the scaled data is
            written to
        factor : float
            Factor by which all the data in the file is to be multiplied.
            The default value of 0.001 corresponds to converting the
            bottom-up data from kWh to MWh.

        Returns
        -------
        SectorDataset
            the new, scaled dataset within new_datafile
        """
        result = self.__class__.new(new_datafile, self.sector_id, self.enduses, self.times)
        # pull data
        batch_dataframes = []; batch_geo_ids = []; batch_scalings = []
        for i in range(self.n_geos):
            df, geo_ids, scalings = self.get_data(i)
            if not geo_ids and not df.empty:
                # This will cause warning later and shouldn't be happening
                raise DSGridError("No geo_ids associated with a non-empty DataFrame. Sector ID: {}, idx: {}".format(self.sector_id, i))
            # apply the scaling (folded into the per-geography scale factors
            # rather than rewriting the data itself)
            scalings = [x * factor for x in scalings]
            # add the mapped data to the new file
            batch_dataframes.append(df)
            batch_geo_ids.append(geo_ids)
            batch_scalings.append(scalings)
        result.add_data_batch(batch_dataframes, batch_geo_ids, scalings=batch_scalings, full_validation=False)
        return result