Source code for dsgrid.config.project_config

import itertools
import logging
from collections import defaultdict
from pathlib import Path
from typing import Optional, Type

import pandas as pd
from pydantic import conlist, field_validator, model_validator, Field, ValidationInfo
from pyspark.sql import DataFrame

from dsgrid.config.dataset_config import DatasetConfig
from dsgrid.config.dimension_config import DimensionBaseConfig
from dsgrid.config.mapping_tables import MappingTableConfig
from dsgrid.data_models import DSGBaseModel, DSGBaseDatabaseModel, make_model_config
from dsgrid.dimension.base_models import (
    check_required_dimensions,
    check_timezone_in_geography,
    DimensionCategory,
    DimensionType,
)
from dsgrid.exceptions import (
    DSGInvalidField,
    DSGInvalidDimension,
    DSGInvalidParameter,
    DSGInvalidDimensionAssociation,
)
from dsgrid.registry.common import (
    ConfigKey,
    ProjectRegistryStatus,
    DatasetRegistryStatus,
    check_config_id_strict,
)
from dsgrid.utils.scratch_dir_context import ScratchDirContext
from dsgrid.utils.spark import (
    get_unique_values,
    cross_join_dfs,
    create_dataframe_from_product,
)
from dsgrid.utils.timing import timer_stats_collector, track_timing
from dsgrid.utils.utilities import check_uniqueness
from dsgrid.config.config_base import ConfigBase
from dsgrid.config.dataset_config import InputDatasetType
from dsgrid.config.supplemental_dimension import SupplementalDimensionModel
from dsgrid.config.dimension_mapping_base import DimensionMappingReferenceModel
from dsgrid.config.dimensions import (
    DimensionsListModel,
    DimensionReferenceModel,
    DimensionModel,
    check_display_name,
    generate_dimension_query_name,
)
from dsgrid.dimension.time import (
    TimeBasedDataAdjustmentModel,
    DaylightSavingSpringForwardType,
    DaylightSavingFallBackType,
)


logger = logging.getLogger(__name__)


class SubsetDimensionSelectorModel(DSGBaseModel):
    """Defines a subset dimension selector inclusive of the subset's records and information
    required to define the selector as a record within the supplemental dimension defined by the
    subset dimension group.
    """

    name: str
    description: str
    column_values: dict[str, str] = Field(
        title="column_values",
        description="Optional columns to populate in the subset dimension group's supplemental "
        "dimension records table. For example, if each selector in the group defines the end "
        "uses for one sector (e.g., commercial_end_uses, transportation_end_uses), the "
        "supplemental dimension records table needs to define the 'fuel_id' and 'unit' fields of "
        "the EnergyEndUse data model.",
        default={},
    )
    records: list[str] = Field(
        title="records",
        description="Table of values populated by reading the parent subset dimension records "
        "file. Should not be populated by the user.",
        default=[],
        json_schema_extra={
            "dsgrid_internal": True,
        },
    )


class SubsetDimensionGroupModel(DSGBaseModel):
    """Defines one or more subset dimension selectors for a dimension type."""

    name: str
    display_name: str
    dimension_query_name: Optional[str] = Field(
        default=None,
        title="dimension_query_name",
        description="Auto-generated query name for SQL queries.",
    )
    description: str
    dimension_type: DimensionType = Field(
        title="dimension_type",
        alias="type",
        description="Type of the dimension",
        json_schema_extra={
            "options": DimensionType.format_for_docs(),
        },
    )
    filename: Optional[str] = Field(
        default=None,
        title="filename",
        alias="file",
        description="Filename containing dimension records. Only populated for initial "
        "registration. Each selector's records are stored as JSON objects in the dsgrid "
        "registry.",
    )
    selectors: list[SubsetDimensionSelectorModel] = Field(
        title="selectors",
        description="Dimension selectors",
    )
    selector_references: list[DimensionReferenceModel] = Field(
        title="selector_references",
        description="References to the subset dimensions generated by dsgrid during "
        "registration.",
        default=[],
    )
    create_supplemental_dimension: bool = Field(
        title="create_supplemental_dimension",
        description="Auto-generate supplemental dimensions in order to allow aggregations on "
        "the subsets.",
        default=True,
    )
    record_ids: set[str] = set()
[docs] @field_validator("display_name") @classmethod def check_display_name(cls, display_name): return check_display_name(display_name)
[docs] @field_validator("dimension_query_name") @classmethod def check_query_name(cls, dimension_query_name, info: ValidationInfo): if "display_name" not in info.data: return dimension_query_name return generate_dimension_query_name(dimension_query_name, info.data["display_name"])
[docs] @field_validator("selectors") @classmethod def check_selectors(cls, selectors): """Check that the selectors are defined consistently.""" if len(selectors) > 1: first = sorted(selectors[0].column_values.keys()) for selector in selectors[1:]: columns = sorted(selector.column_values.keys()) if columns != first: raise ValueError( f"All selectors must define the same columns: {first=} {columns=}" ) return selectors
    @model_validator(mode="after")
    def load_records(self) -> "SubsetDimensionGroupModel":
        """Load the records for each subset dimension selector."""
        if self.filename is None:
            return self
        record_ids, mappings = load_subset_dimensions(Path(self.filename))
        self.record_ids.update(record_ids)
        selector_names = check_uniqueness(
            [x.name for x in self.selectors], "subset dimension selector"
        )
        diff = selector_names.symmetric_difference(mappings)
        if diff:
            raise ValueError(
                f"subset dimension {self.name} selectors have a mismatch with the records "
                f"file column names: {diff}"
            )
        for dim in self.selectors:
            dim.records = mappings[dim.name]
        self.filename = None
        return self
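
# Example of the check_selectors rule (a minimal sketch; all names and values are
# hypothetical). Every selector in a group must define the same column_values keys, so
# validating this group raises a pydantic.ValidationError:
#
#     SubsetDimensionGroupModel.model_validate(
#         {
#             "name": "end_uses_by_sector",
#             "display_name": "End Uses by Sector",
#             "description": "End-use subsets split by sector",
#             "type": "metric",
#             "selectors": [
#                 {
#                     "name": "commercial_end_uses",
#                     "description": "Commercial end uses",
#                     "column_values": {"fuel_id": "electricity", "unit": "MWh"},
#                 },
#                 {
#                     "name": "transportation_end_uses",
#                     "description": "Transportation end uses",
#                     "column_values": {"unit": "MWh"},  # missing fuel_id -> error
#                 },
#             ],
#         }
#     )
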
class SubsetDimensionGroupListModel(DSGBaseModel):
    """Defines a list of subset dimension groups."""

    subset_dimensions: conlist(SubsetDimensionGroupModel, min_length=1) = Field(
        description="List of subset dimensions to be registered"
    )
class DimensionsModel(DSGBaseModel):
    """Contains dimensions defined by a project."""

    base_dimensions: DimensionsListModel = Field(
        title="base_dimensions",
        description="List of dimensions for a project's base dimensions. They will be "
        "automatically registered during project registration and then converted to "
        "base_dimension_references.",
        json_schema_extra={
            "requirements": (
                "All base :class:`dsgrid.dimensions.base_model.DimensionType` must be defined"
                " and only one dimension reference per type is allowed.",
            ),
        },
        default=[],
    )
    base_dimension_references: list[DimensionReferenceModel] = Field(
        title="base_dimension_references",
        description="List of registry references (``DimensionReferenceModel``) for a project's "
        "base dimensions.",
        json_schema_extra={
            "requirements": (
                "All base :class:`dsgrid.dimensions.base_model.DimensionType` must be defined"
                " and only one dimension reference per type is allowed.",
            ),
        },
        default=[],
    )
    subset_dimensions: list[SubsetDimensionGroupModel] = Field(
        title="subset_dimensions",
        description="List of subset dimension groups",
        json_schema_extra={
            "notes": (
                "Subset dimension groups are used to specify subsets of base dimension records"
                " that a dataset must support, dimensionality of derived datasets, and query"
                " filters. Subset dimension groups also define a new supplemental dimension"
                " whose records correspond to the table columns/subset selectors, such that"
                " defining a subset dimension group can be a convenient way to define reporting"
                " at a different level of aggregation as compared to the project's base"
                " dimensions.",
            ),
        },
        default=[],
    )
    supplemental_dimensions: list[SupplementalDimensionModel] = Field(
        title="supplemental_dimensions",
        description="List of supplemental dimensions. They will be automatically registered "
        "during project registration and then converted to supplemental_dimension_references.",
        json_schema_extra={
            "notes": (
                "Supplemental dimensions are used to support additional querying and"
                " transformations (e.g., aggregations, disaggregations, filtering, scaling,"
                " etc.) of the project's base data.",
            ),
        },
        default=[],
    )
    supplemental_dimension_references: list[DimensionReferenceModel] = Field(
        title="supplemental_dimension_references",
        description="List of registry references for a project's supplemental dimensions.",
        json_schema_extra={
            "requirements": (
                "Dimension references of the same"
                " :class:`dsgrid.dimensions.base_model.DimensionType` are allowed for"
                " supplemental dimension references (i.e., multiple `Geography` types are"
                " allowed).",
            ),
            "notes": (
                "Supplemental dimensions are used to support additional querying and"
                " transformations (e.g., aggregations, disaggregations, filtering, scaling,"
                " etc.) of the project's base data.",
            ),
        },
        default=[],
    )
    @model_validator(mode="after")
    def check_dimensions(self) -> "DimensionsModel":
        """Validate that the dimensions are complete and consistent."""
        dimensions = itertools.chain(self.base_dimensions, self.base_dimension_references)
        check_required_dimensions(dimensions, "project base dimensions")
        return self
    @model_validator(mode="before")
    @classmethod
    def pre_check_values(cls, values: dict) -> dict:
        """Checks that base dimensions are defined."""
        if not values.get("base_dimensions", []) and not values.get(
            "base_dimension_references", []
        ):
            raise ValueError("Either base_dimensions or base_dimension_references must be defined")
        return values
[docs] @field_validator("base_dimensions") @classmethod def check_files(cls, values: list) -> list: """Validate dimension files are unique across all dimensions""" check_uniqueness( (x.filename for x in values if isinstance(x, DimensionModel)), "dimension record filename", ) return values
[docs] @field_validator("base_dimensions") @classmethod def check_names(cls, values: list) -> list: """Validate dimension names are unique across all dimensions.""" check_uniqueness( [dim.name for dim in values], "dimension record name", ) return values
[docs] @field_validator("base_dimensions") @classmethod def check_time_zone(cls, values: list) -> list: """Validate the time zone column in geography records.""" for dimension in values: if dimension.dimension_type == DimensionType.GEOGRAPHY: check_timezone_in_geography( dimension, err_msg="Project geography dimension records must include a time_zone column", ) return values
[docs] @field_validator("subset_dimensions") @classmethod def check_subset_dimensions(cls, subset_dimensions): """Check that each subset dimension has a unique name and display_name.""" check_uniqueness([x.name for x in subset_dimensions], "subset dimensions name") check_uniqueness( [x.display_name for x in subset_dimensions], "subset dimensions display_name" ) return subset_dimensions
    @model_validator(mode="after")
    def check_dimension_query_names(self) -> "DimensionsModel":
        """Check that all dimension query names are unique."""
        query_names = set()

        def add_name(name):
            if name in query_names:
                raise ValueError(f"dimension_query_name={name} is not unique in the project")
            query_names.add(name)

        for dim in self.base_dimensions:
            add_name(dim.dimension_query_name)
        for dim in self.supplemental_dimensions:
            add_name(dim.dimension_query_name)
        for group in self.subset_dimensions:
            add_name(group.display_name)
            for selector in group.selectors:
                add_name(selector.name)
        return self
class RequiredSubsetDimensionRecordsModel(DSGBaseModel):
    name: str = Field(description="Name of a subset dimension")
    selectors: list[str] = Field(description="One or more selectors in the subset dimension")
class RequiredSupplementalDimensionRecordsModel(DSGBaseModel):
    name: str = Field(description="Name of a supplemental dimension")
    record_ids: list[str] = Field(
        description="One or more record IDs in the supplemental dimension"
    )
class RequiredDimensionRecordsByTypeModel(DSGBaseModel):
    base: list[str] = []
    base_missing: list[str] = []
    subset: list[RequiredSubsetDimensionRecordsModel] = []
    supplemental: list[RequiredSupplementalDimensionRecordsModel] = []
    @model_validator(mode="after")
    def check_base(self) -> "RequiredDimensionRecordsByTypeModel":
        if self.base and self.base_missing:
            msg = f"base and base_missing cannot both be set: {self.base=} {self.base_missing=}"
            raise ValueError(msg)
        return self
    def defines_dimension_requirement(self) -> bool:
        """Return True if the model defines a dimension requirement."""
        return (
            bool(self.base)
            or bool(self.base_missing)
            or bool(self.subset)
            or bool(self.supplemental)
        )
class RequiredDimensionRecordsModel(DSGBaseModel):
    # This is here because Pydantic doesn't like fields that start with 'model_'.
    model_config = make_model_config(protected_namespaces=())

    # time is excluded
    geography: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    metric: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    model_year: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    scenario: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    sector: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    subsector: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    weather_year: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
class RequiredDimensionsModel(DSGBaseModel):
    """Defines required record IDs that must exist for each dimension in a dataset.

    Record IDs can reside in the project's base, subset, or supplemental dimensions. Using
    subset dimensions is recommended. dsgrid will substitute base records for mapped subset
    records at runtime. If no records are listed for a dimension, then all project base
    records are required.
    """

    single_dimensional: RequiredDimensionRecordsModel = Field(
        description="Required records for a single dimension.",
        default=RequiredDimensionRecordsModel(),
    )
    multi_dimensional: list[RequiredDimensionRecordsModel] = Field(
        description="Required records for a combination of dimensions. For example, there may be "
        "a dataset requirement for only one subsector for a given sector instead of a cross "
        "product.",
        default=[],
    )
    @model_validator(mode="after")
    def check_for_duplicates(self) -> "RequiredDimensionsModel":
        single_dimensional = set()
        for field in RequiredDimensionRecordsModel.model_fields:
            req = getattr(self.single_dimensional, field)
            if req.defines_dimension_requirement():
                single_dimensional.add(field)

        dim_combos = set()
        for item in self.multi_dimensional:
            dims = []
            for field in RequiredDimensionRecordsModel.model_fields:
                req = getattr(item, field)
                if req.defines_dimension_requirement():
                    if field in single_dimensional:
                        msg = (
                            "dimensions cannot be defined in both single_dimensional and "
                            f"multi_dimensional sections: {field}"
                        )
                        raise ValueError(msg)
                    dims.append(field)
            if len(dims) < 2:
                msg = (
                    "A multi_dimensional dimension requirement must contain at least two "
                    f"dimensions: {item}"
                )
                raise ValueError(msg)
            dim_combo = tuple(sorted(dims))
            if dim_combo not in dim_combos:
                for other in dim_combos:
                    if set(dim_combo).intersection(other):
                        msg = (
                            "All descriptors in the multi-dimensional requirements with an "
                            "intersection of dimensions must have a full intersection. "
                            f"dimension_set1 = {other} dimension_set2 = {dim_combo}"
                        )
                        raise ValueError(msg)
                dim_combos.add(dim_combo)
        return self
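
# Example of a valid requirements definition (a minimal sketch; the record IDs are
# hypothetical). The multi_dimensional entry couples sector and subsector so that only
# the listed combination is required instead of a full cross product:
#
#     requirements = RequiredDimensionsModel(
#         multi_dimensional=[
#             {
#                 "sector": {"base": ["res"]},
#                 "subsector": {"base": ["MidriseApartment"]},
#             }
#         ]
#     )
#
# Defining "sector" again under single_dimensional would fail check_for_duplicates.
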
class InputDatasetModel(DSGBaseModel):
    """Defines an input dataset for the project config."""

    dataset_id: str = Field(
        title="dataset_id",
        description="Unique dataset identifier.",
        json_schema_extra={
            "updateable": False,
        },
    )
    dataset_type: InputDatasetType = Field(
        title="dataset_type",
        description="Dataset type.",
        json_schema_extra={
            "options": InputDatasetType.format_for_docs(),
            "updateable": False,
        },
    )
    version: str = Field(
        title="version",
        description="Version of the registered dataset",
        default=None,
        json_schema_extra={
            "requirements": (
                # TODO: add notes about warnings for outdated versions DSGRID-189 & DSGRID-148
                # TODO: need to assume the latest version. DSGRID-190
                "The version specification is optional. If no version is supplied, then the"
                " latest version in the registry is assumed.",
                "The version string must be in semver format (e.g., '1.0.0') and it must be a"
                " valid/existing version in the registry.",
            ),
            "updateable": False,
            # TODO: add notes about warnings for outdated versions? DSGRID-189.
        },
    )
    required_dimensions: RequiredDimensionsModel = Field(
        title="required_dimensions",
        description="Defines required record IDs that must exist for each dimension.",
        default=RequiredDimensionsModel(),
    )
    mapping_references: list[DimensionMappingReferenceModel] = Field(
        title="mapping_references",
        description="Defines how to map the dataset dimensions to the project.",
        default=[],
    )
    status: DatasetRegistryStatus = Field(
        title="status",
        description="Registration status of the dataset, added by dsgrid.",
        default=DatasetRegistryStatus.UNREGISTERED,
        json_schema_extra={
            "dsgrid_internal": True,
            "notes": ("status is "),
            "updateable": False,
        },
    )
    wrap_time_allowed: bool = Field(
        title="wrap_time_allowed",
        description="Whether to allow dataset time to be wrapped to project time if different",
        default=False,
    )
    time_based_data_adjustment: TimeBasedDataAdjustmentModel = Field(
        title="time_based_data_adjustment",
        description="Defines how the rest of the dataframe is adjusted with respect to time, "
        "e.g., whether to drop associated data when dropping a leap-day timestamp.",
        default=TimeBasedDataAdjustmentModel(),
    )
[docs] @field_validator("time_based_data_adjustment") @classmethod def check_data_adjustment(cls, time_based_data_adjustment): """Check daylight saving adjustment""" sfh = time_based_data_adjustment.daylight_saving_adjustment.spring_forward_hour fbh = time_based_data_adjustment.daylight_saving_adjustment.fall_back_hour if fbh == DaylightSavingFallBackType.NONE and sfh == DaylightSavingSpringForwardType.NONE: return time_based_data_adjustment if fbh != DaylightSavingFallBackType.NONE and sfh != DaylightSavingSpringForwardType.NONE: return time_based_data_adjustment msg = f"mismatch between spring_forward_hour and fall_back_hour, {time_based_data_adjustment=}." raise ValueError(msg)
    # TODO: write validation that if daylight_saving_adjustment is specified, the dataset
    # time config must be IndexTimeDimensionConfig.
class DimensionMappingsModel(DSGBaseModel):
    """Defines all dimension mappings associated with a dsgrid project, including
    base-to-supplemental mappings and dataset-to-project mappings.
    """

    base_to_supplemental_references: list[DimensionMappingReferenceModel] = Field(
        title="base_to_supplemental_references",
        description="Base dimension to supplemental dimension mappings (e.g., county-to-state)"
        " used to support various queries and dimension transformations.",
        default=[],
    )
    dataset_to_project: dict[str, list[DimensionMappingReferenceModel]] = Field(
        title="dataset_to_project",
        description="Dataset-to-project mappings map dataset dimensions to project dimensions.",
        default={},
        json_schema_extra={
            "dsgrid_internal": True,
            "notes": (
                "Once a dataset is submitted to a project, dsgrid adds the dataset-to-project"
                " mappings to the project config.",
                "Some projects may not have any dataset-to-project mappings. Dataset-to-project"
                " mappings are only supplied if a dataset's dimensions do not match the"
                " project's dimensions.",
            ),
            "updateable": False,
        },
        # TODO: need to document missing dimension records, fill values, etc. DSGRID-191.
    )
class ProjectConfigModel(DSGBaseDatabaseModel):
    """Represents project configurations."""

    project_id: str = Field(
        title="project_id",
        description="A unique project identifier that is project-specific (e.g., "
        "'standard_scenarios_2021').",
        json_schema_extra={
            "requirements": ("Must not contain any dashes (`-`)",),
            "updateable": False,
        },
    )
    name: str = Field(
        title="name",
        description="A project name to accompany the ID.",
    )
    description: str = Field(
        title="description",
        description="Detailed project description.",
        json_schema_extra={
            "notes": (
                "The description will get stored in the project registry and may be used for"
                " searching",
            ),
        },
    )
    status: ProjectRegistryStatus = Field(
        title="status",
        description="project registry status",
        default=ProjectRegistryStatus.INITIAL_REGISTRATION,
        json_schema_extra={
            "dsgrid_internal": True,
            "updateable": False,
        },
    )
    datasets: list[InputDatasetModel] = Field(
        title="datasets",
        description="List of input datasets for the project.",
    )
    dimensions: DimensionsModel = Field(
        title="dimensions",
        description="List of `base` and `supplemental` dimensions.",
    )
    dimension_mappings: DimensionMappingsModel = Field(
        title="dimension_mappings",
        description="List of project mappings. Initialized with base-to-base and"
        " base-to-supplemental mappings. dataset-to-project mappings are added by dsgrid as"
        " datasets get registered with the project.",
        default=DimensionMappingsModel(),
        json_schema_extra={
            "notes": ("`[dimension_mappings]` are optional at the project level.",),
        },
    )
[docs] @field_validator("project_id") @classmethod def check_project_id_handle(cls, project_id): """Check for valid characters in project id""" if "-" in project_id: raise ValueError('invalid character "-" in project id') check_config_id_strict(project_id, "Project") return project_id
class DimensionsByCategoryModel(DSGBaseModel):
    """Defines the query names by base, subset, and supplemental category."""

    base: str
    subset: list[str]
    supplemental: list[str]
class ProjectDimensionQueryNamesModel(DSGBaseModel):
    """Defines the query names for all base, subset, and supplemental dimensions in the
    project.
    """

    # This is here because Pydantic doesn't like fields that start with 'model_'.
    model_config = make_model_config(protected_namespaces=())

    geography: DimensionsByCategoryModel
    metric: DimensionsByCategoryModel
    model_year: DimensionsByCategoryModel
    scenario: DimensionsByCategoryModel
    sector: DimensionsByCategoryModel
    subsector: DimensionsByCategoryModel
    time: DimensionsByCategoryModel
    weather_year: DimensionsByCategoryModel
class ProjectConfig(ConfigBase):
    """Provides an interface to a ProjectConfigModel."""

    def __init__(self, model):
        super().__init__(model)
        self._base_dimensions = {}  # ConfigKey to DimensionConfig
        # {DimensionType: {subset_name: {ConfigKey: DimensionConfig}}}
        self._subset_dimensions = {}
        self._supplemental_dimensions = {}  # ConfigKey to DimensionConfig
        self._base_to_supplemental_mappings = {}
        self._dimensions_by_query_name = {}

    @staticmethod
    def model_class() -> Type:
        return ProjectConfigModel

    @staticmethod
    def config_filename() -> str:
        return "project.json5"
    def get_base_dimension(self, dimension_type: DimensionType) -> DimensionBaseConfig:
        """Return the base dimension matching dimension_type."""
        for dim_config in self.base_dimensions.values():
            if dim_config.model.dimension_type == dimension_type:
                return dim_config
        assert False, dimension_type
    def get_base_dimension_and_version(
        self, dimension_type: DimensionType
    ) -> tuple[DimensionBaseConfig, str]:
        """Return the base dimension and version matching dimension_type."""
        for key, dim_config in self.base_dimensions.items():
            if dim_config.model.dimension_type == dimension_type:
                return dim_config, key.version
        assert False, dimension_type
    def get_dimension(self, dimension_query_name: str) -> DimensionBaseConfig:
        """Return an instance of DimensionBaseConfig."""
        dim = self._dimensions_by_query_name.get(dimension_query_name)
        if dim is None:
            raise DSGInvalidDimension(
                f"dimension_query_name={dimension_query_name} is not stored"
            )
        return dim
    def get_dimension_records(self, dimension_query_name: str) -> DataFrame:
        """Return a DataFrame containing the records for a dimension."""
        return self.get_dimension(dimension_query_name).get_records_dataframe()
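
    # Example usage (a minimal sketch; "county" is a hypothetical query name and the
    # selected columns assume a geography dimension with a time_zone column):
    #
    #     records = project_config.get_dimension_records("county")
    #     records.select("id", "time_zone").show()
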
    def get_dimension_reference(self, dimension_id: str) -> DimensionReferenceModel:
        """Return the reference of the dimension matching dimension_id."""
        for ref in itertools.chain(
            self.model.dimensions.base_dimension_references,
            self.model.dimensions.supplemental_dimension_references,
        ):
            if ref.dimension_id == dimension_id:
                return ref
        raise DSGInvalidDimension(f"{dimension_id} is not stored")
    def list_supplemental_dimensions(
        self, dimension_type: DimensionType, sort_by=None
    ) -> list[DimensionBaseConfig]:
        """Return the supplemental dimensions matching dimension_type (if any).

        Parameters
        ----------
        dimension_type : DimensionType
        sort_by : str | None
            If set, sort the dimensions by this dimension attribute.
        """
        dims = [
            x
            for x in self.supplemental_dimensions.values()
            if x.model.dimension_type == dimension_type
        ]
        if sort_by is not None:
            dims.sort(key=lambda x: getattr(x.model, sort_by))
        return dims
    def get_matching_subset_dimension(
        self, dimension_type: DimensionType, unique_data_records: set[str]
    ) -> DimensionReferenceModel | None:
        """Return a dimension reference if there is a matching subset dimension, otherwise
        None.
        """
        for group in self.model.dimensions.subset_dimensions:
            if group.dimension_type == dimension_type:
                for ref in group.selector_references:
                    key = ConfigKey(ref.dimension_id, ref.version)
                    records = self._subset_dimensions[dimension_type][group.name][
                        key
                    ].get_unique_ids()
                    if not unique_data_records.symmetric_difference(records):
                        logger.info("Found matching subset dimension: %s", group.name)
                        return ref
        return None
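
    # Example usage (a minimal sketch; the record IDs are hypothetical). A reference is
    # returned only when the dataset's unique records exactly match a selector's records:
    #
    #     ref = project_config.get_matching_subset_dimension(
    #         DimensionType.METRIC, {"cooling", "heating"}
    #     )
    #     if ref is not None:
    #         print(ref.dimension_id)
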
    def get_base_to_supplemental_dimension_mappings_by_types(
        self, dimension_type: DimensionType
    ) -> list[MappingTableConfig]:
        """Return the base-to-supplemental dimension mappings for the dimension (if any)."""
        return [
            x
            for x in self._base_to_supplemental_mappings.values()
            if x.model.from_dimension.dimension_type == dimension_type
        ]
    def get_base_to_supplemental_config(
        self, dimension_query_name: str
    ) -> tuple[ConfigKey, MappingTableConfig]:
        """Return the project's base-to-supplemental dimension mapping config."""
        dim = self.get_dimension(dimension_query_name)
        dimension_type = dim.model.dimension_type
        base_dim = self.get_base_dimension(dimension_type)
        if dim.model.dimension_id == base_dim.model.dimension_id:
            raise DSGInvalidParameter(
                f"Cannot pass base dimension: {dimension_type}/{dimension_query_name}"
            )
        for key, mapping in self._base_to_supplemental_mappings.items():
            if mapping.model.to_dimension.dimension_id == dim.model.dimension_id:
                return key, mapping
        raise DSGInvalidParameter(
            f"No mapping is stored for {dimension_type}/{dimension_query_name}"
        )
    def get_base_to_supplemental_mapping_records(self, dimension_query_name: str) -> DataFrame:
        """Return the project's base-to-supplemental dimension mapping records."""
        _, config = self.get_base_to_supplemental_config(dimension_query_name)
        return config.get_records_dataframe().filter("to_id is not NULL")
    def has_base_to_supplemental_dimension_mapping_types(self, dimension_type) -> bool:
        """Return True if the config has these base-to-supplemental mappings."""
        return self._has_mapping(
            dimension_type,
            dimension_type,
            self._base_to_supplemental_mappings,
        )

    @staticmethod
    def _has_mapping(from_dimension_type, to_dimension_type, mapping):
        for config in mapping.values():
            if (
                config.model.from_dimension.dimension_type == from_dimension_type
                and config.model.to_dimension.dimension_type == to_dimension_type
            ):
                return True
        return False
    def list_dimension_query_names(self, category: DimensionCategory | None = None) -> list[str]:
        """Return query names for all dimensions in the project.

        Parameters
        ----------
        category : DimensionCategory | None
            Optionally, filter the return by category.
        """
        if category is None:
            return sorted(self._dimensions_by_query_name.keys())

        match category:
            case DimensionCategory.BASE:
                method = self._iter_base_dimensions
            case DimensionCategory.SUBSET:
                method = self._iter_subset_dimensions
            case DimensionCategory.SUPPLEMENTAL:
                method = self._iter_supplemental_dimensions
            case _:
                raise NotImplementedError(f"{category=}")
        return sorted(x.model.dimension_query_name for x in method())
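
    # Example usage (a minimal sketch): list every query name, then only the base ones.
    #
    #     all_names = project_config.list_dimension_query_names()
    #     base_names = project_config.list_dimension_query_names(
    #         category=DimensionCategory.BASE
    #     )
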
    def list_dimension_query_names_by_type(self, dimension_type: DimensionType) -> list[str]:
        """List the query names available for a dimension type."""
        return [
            x.model.dimension_query_name
            for x in self.iter_dimensions()
            if x.model.dimension_type == dimension_type
        ]
    def get_base_dimension_to_query_name_mapping(self) -> dict[DimensionType, str]:
        """Return a mapping of DimensionType to query name for base dimensions."""
        query_names = {}
        for dimension_type in DimensionType:
            dim = self.get_base_dimension(dimension_type)
            query_names[dimension_type] = dim.model.dimension_query_name
        return query_names
    def get_subset_dimension_to_query_name_mapping(self) -> dict[DimensionType, list[str]]:
        """Return a mapping of DimensionType to query name for subset dimensions."""
        query_names = defaultdict(list)
        for dimension_type in DimensionType:
            if dimension_type in self._subset_dimensions:
                for selectors in self._subset_dimensions[dimension_type].values():
                    for dim in selectors.values():
                        query_names[dimension_type].append(dim.model.dimension_query_name)
        return query_names
    def get_supplemental_dimension_to_query_name_mapping(self) -> dict[DimensionType, list[str]]:
        """Return a mapping of DimensionType to query name for supplemental dimensions."""
        query_names = {}
        for dimension_type in DimensionType:
            query_names[dimension_type] = [
                x.model.dimension_query_name
                for x in self.list_supplemental_dimensions(
                    dimension_type, sort_by="dimension_query_name"
                )
            ]
        return query_names
    def get_dimension_query_names_model(self) -> ProjectDimensionQueryNamesModel:
        """Return an instance of ProjectDimensionQueryNamesModel for the project."""
        base_query_names_by_type = self.get_base_dimension_to_query_name_mapping()
        subset_query_names_by_type = self.get_subset_dimension_to_query_name_mapping()
        supp_query_names_by_type = self.get_supplemental_dimension_to_query_name_mapping()
        model = {}
        for dimension_type in DimensionType:
            model[dimension_type.value] = {
                "base": base_query_names_by_type[dimension_type],
                "subset": subset_query_names_by_type[dimension_type],
                "supplemental": supp_query_names_by_type[dimension_type],
            }
        return ProjectDimensionQueryNamesModel(**model)

    def update_dimensions(self, base_dimensions, subset_dimensions, supplemental_dimensions):
        self._base_dimensions.update(base_dimensions)
        self._subset_dimensions.update(subset_dimensions)
        self._supplemental_dimensions.update(supplemental_dimensions)
        self._dimensions_by_query_name.clear()
        for dim in self.iter_dimensions():
            if dim.model.dimension_query_name in self._dimensions_by_query_name:
                raise DSGInvalidDimension(
                    f"dimension_query_name={dim.model.dimension_query_name} exists multiple "
                    f"times in project {self.config_id}"
                )
            self._dimensions_by_query_name[dim.model.dimension_query_name] = dim

    def update_dimension_mappings(self, base_to_supplemental_mappings):
        self._base_to_supplemental_mappings.update(base_to_supplemental_mappings)
        # TODO: Once we start using these we may need to store by (from, to) as key instead.

    @track_timing(timer_stats_collector)
    def add_dataset_dimension_mappings(
        self, dataset_config: DatasetConfig, references: list[DimensionMappingReferenceModel]
    ):
        """Add a dataset's dimension mappings to the project.

        Raises
        ------
        DSGInvalidDimensionMapping
            Raised if a requirement is violated.
        """
        if dataset_config.model.dataset_id not in self.model.dimension_mappings.dataset_to_project:
            self.model.dimension_mappings.dataset_to_project[dataset_config.model.dataset_id] = []
        mappings = self.model.dimension_mappings.dataset_to_project[
            dataset_config.model.dataset_id
        ]
        existing_ids = set(x.mapping_id for x in mappings)
        for reference in references:
            if reference.mapping_id not in existing_ids:
                mappings.append(reference)
                logger.info(
                    "Added dimension mapping for dataset=%s: %s",
                    dataset_config.model.dataset_id,
                    reference.mapping_id,
                )

    @property
    def config_id(self) -> str:
        return self._model.project_id

    def get_dataset(self, dataset_id) -> InputDatasetModel:
        """Return a dataset by ID."""
        for dataset in self.model.datasets:
            if dataset.dataset_id == dataset_id:
                return dataset
        raise DSGInvalidField(
            f"project_id={self._model.project_id} does not have dataset_id={dataset_id}"
        )

    def has_dataset(self, dataset_id: str, status: None | DatasetRegistryStatus = None) -> bool:
        """Return True if the dataset_id is present in the configuration.

        Parameters
        ----------
        dataset_id : str
        status : None | DatasetRegistryStatus
            If set, only return True if the status matches.
        """
        for dataset in self.iter_datasets():
            if dataset.dataset_id == dataset_id:
                if status is None or dataset.status == status:
                    return True
                return False
        # TODO: what about benchmark and historical?
        return False

    def get_load_data_time_columns(self, dimension_query_name: str) -> list[str]:
        """Return the time dimension columns expected in the load data table for this query
        name.
        """
        dim = self.get_dimension(dimension_query_name)
        time_columns = dim.get_load_data_time_columns()
        return time_columns

    def iter_datasets(self):
        for dataset in self.model.datasets:
            yield dataset

    def _iter_base_dimensions(self):
        return self._base_dimensions.values()

    def _iter_subset_dimensions(self):
        for x in self._subset_dimensions.values():
            for y in x.values():
                for z in y.values():
                    yield z

    def _iter_supplemental_dimensions(self):
        return self._supplemental_dimensions.values()

    def iter_dimensions(self):
        """Return an iterator over all dimensions of the project.

        Yields
        ------
        DimensionConfig
        """
        return itertools.chain(
            self._iter_base_dimensions(),
            self._iter_subset_dimensions(),
            self._iter_supplemental_dimensions(),
        )
    def list_registered_dataset_ids(self) -> list[str]:
        """List registered datasets associated with the project."""
        status = DatasetRegistryStatus.REGISTERED
        return [x.dataset_id for x in self._iter_datasets_by_status(status)]
    def list_unregistered_dataset_ids(self) -> list[str]:
        """List unregistered datasets associated with the project registry."""
        status = DatasetRegistryStatus.UNREGISTERED
        return [x.dataset_id for x in self._iter_datasets_by_status(status)]
    def _iter_datasets_by_status(self, status: DatasetRegistryStatus) -> InputDatasetModel:
        for dataset in self.iter_datasets():
            if dataset.status == status:
                yield dataset
    def get_required_dimension_record_ids(
        self, dataset_id: str, dimension_type: DimensionType
    ) -> set[str]:
        """Return the required base dimension record IDs for the dataset and dimension type."""
        dataset = self.get_dataset(dataset_id)
        requirements = getattr(
            dataset.required_dimensions.single_dimensional, dimension_type.value
        )
        record_ids = self._get_required_record_ids_from_base(requirements, dimension_type)
        record_ids.update(self._get_required_record_ids_from_subsets(requirements))
        record_ids.update(
            self._get_required_record_ids_from_supplementals(requirements, dimension_type)
        )
        for multi_req in dataset.required_dimensions.multi_dimensional:
            req = getattr(multi_req, dimension_type.value)
            record_ids.update(req.base)
            record_ids.update(self._get_required_record_ids_from_subsets(req))
            record_ids.update(
                self._get_required_record_ids_from_supplementals(req, dimension_type)
            )
        return record_ids
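
    # Example usage (a minimal sketch; "comstock" is a hypothetical dataset ID): collect
    # the base geography record IDs that the dataset must provide, combining single- and
    # multi-dimensional requirements.
    #
    #     county_ids = project_config.get_required_dimension_record_ids(
    #         "comstock", DimensionType.GEOGRAPHY
    #     )
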
    def _build_multi_dim_requirement_associations(
        self, multi_dim_reqs: list[RequiredDimensionRecordsModel], context: ScratchDirContext
    ):
        dfs_by_dim_combo = {}
        # Example: Partial sector and subsector combinations are required.
        # [
        #     {"sector": {"base": ["com"]},
        #      "subsector": {"supplemental":
        #          {"name": "commercial-subsectors",
        #           "record_ids": ["commercial_subsectors"]}}},
        #     {"sector": {"base": ["res"]}, "subsector": {"base": ["MidriseApartment"]}},
        # ]
        # This code will replace supplemental records with base records and return a list of
        # dataframes of those combinations - one per unique combination of dimensions.
        for multi_req in multi_dim_reqs:
            dim_combo = []
            columns = {}
            for field in sorted(RequiredDimensionRecordsModel.model_fields):
                dim_type = DimensionType(field)
                req = getattr(multi_req, field)
                if req.base == ["__all__"]:
                    record_ids = self.get_base_dimension(dim_type).get_unique_ids()
                elif req.base_missing:
                    record_ids = (
                        self.get_base_dimension(dim_type)
                        .get_unique_ids()
                        .difference(req.base_missing)
                    )
                else:
                    record_ids = set(req.base)
                record_ids.update(self._get_required_record_ids_from_subsets(req))
                record_ids.update(self._get_required_record_ids_from_supplementals(req, dim_type))
                if record_ids:
                    columns[field] = list(record_ids)
                    dim_combo.append(dim_type.value)
            df = create_dataframe_from_product(columns, context)
            df = df.select(*sorted(df.columns))
            dim_combo = tuple(sorted(dim_combo))
            if dim_combo in dfs_by_dim_combo:
                dfs_by_dim_combo[dim_combo] = dfs_by_dim_combo[dim_combo].union(df)
            else:
                dfs_by_dim_combo[dim_combo] = df
        return list(dfs_by_dim_combo.values())

    def _get_required_record_ids_from_base(
        self, req: RequiredDimensionRecordsByTypeModel, dimension_type: DimensionType
    ):
        if req.base:
            record_ids = set(req.base)
        elif not req.subset and not req.supplemental:
            record_ids = self.get_base_dimension(dimension_type).get_unique_ids()
        else:
            record_ids = set()
        return record_ids

    def _get_subset_dimension_records(self, name: str, selector_name):
        for group in self.model.dimensions.subset_dimensions:
            if group.name == name:
                for ref in group.selector_references:
                    key = ConfigKey(ref.dimension_id, ref.version)
                    dim = self._subset_dimensions[group.dimension_type][group.name][key]
                    if dim.model.name == selector_name:
                        return dim.get_unique_ids()
        msg = f"subset dimension selector not found: {name=} {selector_name=}"
        raise DSGInvalidDimension(msg)

    def _get_required_record_ids_from_subsets(self, req: RequiredDimensionRecordsByTypeModel):
        record_ids = set()
        for subset in req.subset:
            for selector_name in subset.selectors:
                record_ids.update(self._get_subset_dimension_records(subset.name, selector_name))
        return record_ids

    def _get_required_record_ids_from_supplementals(
        self, req: RequiredDimensionRecordsByTypeModel, dimension_type: DimensionType
    ):
        record_ids = set()
        supp_name_to_dim = {
            x.model.name: x for x in self.list_supplemental_dimensions(dimension_type)
        }
        for supp in req.supplemental:
            dim = supp_name_to_dim.get(supp.name)
            if dim is None:
                raise DSGInvalidDimensionAssociation(
                    f"Supplemental dimension of type={dimension_type} with name={supp.name} "
                    "does not exist"
                )
            supp_replacements = self._get_record_ids_from_one_supplemental(supp, dim)
            record_ids.update(supp_replacements)
        return record_ids

    def _get_record_ids_from_one_supplemental(
        self, req: RequiredSupplementalDimensionRecordsModel, dim
    ):
        record_ids = set()
        for supplemental_record_id in req.record_ids:
            base_record_ids = self._map_supplemental_record_to_base_records(
                dim,
                supplemental_record_id,
            )
            record_ids.update(base_record_ids)
        return record_ids

    @track_timing(timer_stats_collector)
    def make_dimension_association_table(
        self, dataset_id, context: ScratchDirContext
    ) -> DataFrame:
        """Build a table that includes all combinations of dimension records that must be
        provided by the dataset.
        """
        required_dimensions = self.get_dataset(dataset_id).required_dimensions
        multi_dfs = self._build_multi_dim_requirement_associations(
            required_dimensions.multi_dimensional, context
        )
        # Project config construction asserts that there is no intersection of dimensions in
        # multi and single.
        existing = set()
        for df in multi_dfs:
            existing.update(set(df.columns))
        single_dfs = {}
        for field in (x for x in RequiredDimensionRecordsModel.model_fields if x not in existing):
            dimension_type = DimensionType(field)
            req = getattr(required_dimensions.single_dimensional, field)
            record_ids = self._get_required_record_ids_from_base(req, dimension_type)
            record_ids.update(self._get_required_record_ids_from_subsets(req))
            record_ids.update(
                self._get_required_record_ids_from_supplementals(req, dimension_type)
            )
            single_dfs[field] = list(record_ids)
        single_df = create_dataframe_from_product(single_dfs, context)
        return cross_join_dfs(multi_dfs + [single_df])

    def _map_supplemental_record_to_base_records(self, dim, supplemental_id):
        mapping_records = self.get_base_to_supplemental_mapping_records(
            dim.model.dimension_query_name
        ).filter(f"to_id == '{supplemental_id}'")
        if mapping_records.rdd.isEmpty():
            raise DSGInvalidDimensionAssociation(
                f"Did not find {dim.model.dimension_type} supplemental dimension with record ID "
                f"{supplemental_id} while attempting to substitute the records with base records."
            )
        if get_unique_values(mapping_records, "from_fraction") != {1}:
            raise DSGInvalidDimensionAssociation(
                "Supplemental dimensions used for associations must all have fraction=1"
            )
        return get_unique_values(mapping_records, "from_id")

    def are_all_datasets_submitted(self) -> bool:
        """Return True if all datasets have been submitted."""
        return not self.list_unregistered_dataset_ids()

    def set_status(self, status: ProjectRegistryStatus) -> None:
        """Set the project status to the given value."""
        self.model.status = status
        logger.info("Set project_id=%s status=%s", self.config_id, status)

    def set_dataset_status(self, dataset_id: str, status: DatasetRegistryStatus):
        """Set the dataset status to the given value.

        Raises
        ------
        ValueError
            Raised if dataset_id is not stored.
        """
        dataset = self.get_dataset(dataset_id)
        dataset.status = status
        logger.info(
            "Set dataset_id=%s status=%s for project_id=%s",
            dataset_id,
            status,
            self._model.project_id,
        )

    @property
    def base_dimensions(self) -> dict:
        """Return the base dimensions.

        Returns
        -------
        dict
            dict of DimensionConfig keyed by ConfigKey
        """
        return self._base_dimensions

    @property
    def supplemental_dimensions(self) -> dict:
        """Return the supplemental dimensions.

        Returns
        -------
        dict
            dict of DimensionConfig keyed by ConfigKey
        """
        return self._supplemental_dimensions
def load_subset_dimensions(filename: Path) -> tuple[set[str], dict[str, list[str]]]:
    """Return the set of record IDs and a mapping of subset dimension name to record IDs."""
    df = pd.read_csv(filename, index_col="id")
    if len(df.columns) == 0:
        raise DSGInvalidDimension(
            "A subset dimension records file must contain at least one dimension column."
        )
    record_ids = set(df.index.values)
    subset_by_dim_name = {x: df[x].dropna().index.to_list() for x in df.columns}
    return record_ids, subset_by_dim_name
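
# Example records-file layout (a minimal sketch with hypothetical values). Given a CSV
#
#     id,commercial_end_uses,transportation_end_uses
#     cooling,cooling,
#     ev_charging,,ev_charging
#
# load_subset_dimensions returns
#
#     record_ids = {"cooling", "ev_charging"}
#     subset_by_dim_name = {
#         "commercial_end_uses": ["cooling"],
#         "transportation_end_uses": ["ev_charging"],
#     }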