# Source code for dsgrid.config.project_config

import itertools
import logging
from collections import defaultdict
from pathlib import Path
from typing import Any, Generator, Iterable, Optional, Type

import pandas as pd
from pydantic import conlist, field_validator, model_validator, Field

from dsgrid.config.common import make_base_dimension_template
from dsgrid.config.dataset_config import DatasetConfig
from dsgrid.config.dimension_config import (
    DimensionBaseConfig,
    DimensionBaseConfigWithFiles,
)
from dsgrid.config.mapping_tables import MappingTableConfig
from dsgrid.config.time_dimension_base_config import TimeDimensionBaseConfig
from dsgrid.data_models import DSGBaseModel, DSGBaseDatabaseModel, make_model_config
from dsgrid.dimension.base_models import (
    check_required_dimensions,
    check_timezone_in_geography,
    DimensionCategory,
    DimensionType,
)
from dsgrid.dimension.time import TimeDimensionType
from dsgrid.exceptions import (
    DSGInvalidDataset,
    DSGInvalidField,
    DSGInvalidDimension,
    DSGInvalidOperation,
    DSGInvalidParameter,
    DSGValueNotRegistered,
)
from dsgrid.registry.common import (
    ConfigKey,
    ProjectRegistryStatus,
    DatasetRegistryStatus,
    check_config_id_strict,
)
from dsgrid.spark.types import (
    DataFrame,
)
from dsgrid.utils.scratch_dir_context import ScratchDirContext
from dsgrid.utils.spark import (
    cross_join_dfs,
    create_dataframe_from_product,
)
from dsgrid.utils.timing import timer_stats_collector, track_timing
from dsgrid.utils.utilities import check_uniqueness
from dsgrid.config.config_base import ConfigBase
from dsgrid.config.dataset_config import InputDatasetType
from dsgrid.config.supplemental_dimension import SupplementalDimensionModel
from dsgrid.config.dimension_mapping_base import DimensionMappingReferenceModel
from dsgrid.config.dimensions import (
    DimensionsListModel,
    DimensionReferenceModel,
    DimensionModel,
)
from dsgrid.dimension.time import (
    TimeBasedDataAdjustmentModel,
    DaylightSavingSpringForwardType,
    DaylightSavingFallBackType,
)


logger = logging.getLogger(__name__)


class SubsetDimensionSelectorModel(DSGBaseModel):
    """Defines a subset dimension selector inclusive of the subset's records and information
    required to define the selector as a record within the supplemental dimension defined by the
    subset dimension group.
    """

    name: str
    description: str
    column_values: dict[str, str] = Field(
        title="column_values",
        description="Optional columns to populate in the subset dimension group's supplemental "
        "dimension records table. For example, if each selector in the group defines the end "
        "uses for one sector (e.g., commercial_end_uses, transportation_end_uses), the "
        "supplemental dimension records table needs to define the 'fuel_id' and 'unit' fields of "
        "the EnergyEndUse data model.",
        default={},
    )
    records: list[str] = Field(
        title="records",
        description="Table of values populated by reading the parent subset dimension records "
        "file. Should not be populated by the user.",
        default=[],
        json_schema_extra={
            "dsgrid_internal": True,
        },
    )


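# Illustrative sketch, not part of the dsgrid source: constructs a selector by hand.
# The selector name, description, and column values below are invented for
# illustration; `records` is normally populated by dsgrid from the group's records
# file rather than by the user, so it is omitted here.
def _example_subset_selector() -> SubsetDimensionSelectorModel:
    return SubsetDimensionSelectorModel(
        name="commercial_end_uses",  # hypothetical selector name
        description="End uses reported by commercial-sector datasets",
        column_values={"fuel_id": "electricity", "unit": "MWh"},  # hypothetical values
    )

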
class SubsetDimensionGroupModel(DSGBaseModel):
    """Defines one or more subset dimension selectors for a dimension type."""

    name: str
    description: str
    dimension_type: DimensionType = Field(
        title="dimension_type",
        alias="type",
        description="Type of the dimension",
        json_schema_extra={
            "options": DimensionType.format_for_docs(),
        },
    )
    filename: Optional[str] = Field(
        default=None,
        title="filename",
        alias="file",
        description="Filename containing dimension records. Only populated for initial "
        "registration. Each selector's records are stored as JSON objects in the dsgrid "
        "registry.",
    )
    selectors: list[SubsetDimensionSelectorModel] = Field(
        title="selectors",
        description="Dimension selectors",
    )
    selector_references: list[DimensionReferenceModel] = Field(
        title="selector_references",
        description="References to the subset dimensions generated by dsgrid during "
        "registration.",
        default=[],
    )
    create_supplemental_dimension: bool = Field(
        title="create_supplemental_dimension",
        description="Auto-generate supplemental dimensions in order to allow aggregations on "
        "the subsets.",
        default=True,
    )
    base_dimension_name: Optional[str] = Field(
        default=None,
        title="base_dimension_name",
        description="Name of the base dimension for the supplemental dimension mapping, if "
        "create_supplemental_dimension is true. Required if there are multiple base dimensions "
        "for this type.",
    )
    record_ids: set[str] = set()

    @field_validator("selectors")
    @classmethod
    def check_selectors(cls, selectors):
        """Check that the selectors are defined consistently."""
        if len(selectors) > 1:
            first = sorted(selectors[0].column_values.keys())
            for selector in selectors[1:]:
                columns = sorted(selector.column_values.keys())
                if columns != first:
                    raise ValueError(
                        f"All selectors must define the same columns: {first=} {columns=}"
                    )
        return selectors

    @model_validator(mode="after")
    def load_records(self) -> "SubsetDimensionGroupModel":
        """Load the records for each subset dimension selector."""
        if self.filename is None:
            return self

        record_ids, mappings = load_subset_dimensions(Path(self.filename))
        self.record_ids.update(record_ids)
        selector_names = check_uniqueness(
            [x.name for x in self.selectors], "subset dimension selector"
        )
        diff = selector_names.symmetric_difference(mappings)
        if diff:
            raise ValueError(
                f"subset dimension {self.name} selectors have a mismatch with the records file "
                f"column names: {diff}"
            )
        for dim in self.selectors:
            dim.records = mappings[dim.name]
        self.filename = None
        return self

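
# Illustrative sketch, not part of the dsgrid source: a minimal subset dimension
# group, assuming a metric-type dimension and the hypothetical selector from
# _example_subset_selector above. With no records file given, load_records is a
# no-op; during a real registration, `file` points at the records CSV.
def _example_subset_group() -> SubsetDimensionGroupModel:
    return SubsetDimensionGroupModel(
        name="end_uses_by_sector",  # hypothetical group name
        description="Groups end uses by the sector that reports them",
        type=DimensionType.METRIC,  # alias for dimension_type
        selectors=[_example_subset_selector()],
    )
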
class SubsetDimensionGroupListModel(DSGBaseModel):
    """Defines a list of subset dimensions."""

    subset_dimensions: conlist(SubsetDimensionGroupModel, min_length=1) = Field(
        description="List of subset dimensions to be registered"
    )

class DimensionsModel(DSGBaseModel):
    """Contains dimensions defined by a project."""

    base_dimensions: DimensionsListModel = Field(
        title="base_dimensions",
        description="List of dimensions for a project's base dimensions. They will be "
        "automatically registered during project registration and then converted to "
        "base_dimension_references.",
        json_schema_extra={
            "requirements": (
                "All base :class:`dsgrid.dimensions.base_model.DimensionType` must be defined "
                "and only one dimension reference per type is allowed.",
            ),
        },
        default=[],
    )
    base_dimension_references: list[DimensionReferenceModel] = Field(
        title="base_dimension_references",
        description="List of registry references (``DimensionReferenceModel``) for a project's "
        "base dimensions.",
        json_schema_extra={
            "requirements": (
                "All base :class:`dsgrid.dimensions.base_model.DimensionType` must be defined "
                "and only one dimension reference per type is allowed.",
            ),
        },
        default=[],
    )
    subset_dimensions: list[SubsetDimensionGroupModel] = Field(
        title="subset_dimensions",
        description="List of subset dimension groups",
        json_schema_extra={
            "notes": (
                "Subset dimension groups are used to specify subsets of base dimension records "
                "that a dataset must support, dimensionality of derived datasets, and query "
                "filters. Subset dimension groups also define a new supplemental dimension "
                "whose records correspond to the table columns/subset selectors, such that "
                "defining a subset dimension group can be a convenient way to define reporting "
                "at a different level of aggregation as compared to the project's base "
                "dimensions.",
            ),
        },
        default=[],
    )
    supplemental_dimensions: list[SupplementalDimensionModel] = Field(
        title="supplemental_dimensions",
        description="List of supplemental dimensions. They will be automatically registered "
        "during project registration and then converted to supplemental_dimension_references.",
        json_schema_extra={
            "notes": (
                "Supplemental dimensions are used to support additional querying and "
                "transformations (e.g., aggregations, disaggregations, filtering, scaling, "
                "etc.) of the project's base data.",
            ),
        },
        default=[],
    )
    supplemental_dimension_references: list[DimensionReferenceModel] = Field(
        title="supplemental_dimension_references",
        description="List of registry references for a project's supplemental dimensions.",
        json_schema_extra={
            "requirements": (
                "Dimension references of the same "
                ":class:`dsgrid.dimensions.base_model.DimensionType` are allowed for "
                "supplemental dimension references (i.e., multiple `Geography` types are "
                "allowed).",
            ),
            "notes": (
                "Supplemental dimensions are used to support additional querying and "
                "transformations (e.g., aggregations, disaggregations, filtering, scaling, "
                "etc.) of the project's base data.",
            ),
        },
        default=[],
    )

    @model_validator(mode="after")
    def check_dimensions(self) -> "DimensionsModel":
        """Validate that the dimensions are complete and consistent."""
        dimensions = itertools.chain(self.base_dimensions, self.base_dimension_references)
        check_required_dimensions(dimensions, "project base dimensions")
        return self

    @model_validator(mode="before")
    @classmethod
    def pre_check_values(cls, values: dict) -> dict:
        """Checks that base dimensions are defined."""
        if not values.get("base_dimensions", []) and not values.get(
            "base_dimension_references", []
        ):
            raise ValueError("Either base_dimensions or base_dimension_references must be defined")
        return values

    @field_validator("base_dimensions")
    @classmethod
    def check_files(cls, values: list) -> list:
        """Validate that dimension record files are unique across all dimensions."""
        check_uniqueness(
            (x.filename for x in values if isinstance(x, DimensionModel)),
            "dimension record filename",
        )
        return values

    @field_validator("base_dimensions")
    @classmethod
    def check_names(cls, values: list) -> list:
        """Validate that dimension names are unique across all dimensions."""
        check_uniqueness(
            [dim.name for dim in values],
            "dimension record name",
        )
        return values

    @field_validator("base_dimensions")
    @classmethod
    def check_time_zone(cls, values: list) -> list:
        """Validate the time zone column in geography records."""
        for dimension in values:
            if dimension.dimension_type == DimensionType.GEOGRAPHY:
                check_timezone_in_geography(
                    dimension,
                    err_msg="Project geography dimension records must include a time_zone "
                    "column",
                )
        return values

    @field_validator("subset_dimensions")
    @classmethod
    def check_subset_dimensions(cls, subset_dimensions):
        """Check that each subset dimension has a unique name."""
        check_uniqueness([x.name for x in subset_dimensions], "subset dimensions name")
        return subset_dimensions

    @model_validator(mode="after")
    def check_dimension_names(self) -> "DimensionsModel":
        """Check that all dimension query names are unique."""
        names: set[str] = set()

        def add_name(name):
            if name in names:
                raise ValueError(f"dimension_name={name} is not unique in the project")
            names.add(name)

        for dim in self.base_dimensions:
            add_name(dim.name)
        for dim in self.supplemental_dimensions:
            add_name(dim.name)
        for group in self.subset_dimensions:
            add_name(group.name)
            for selector in group.selectors:
                add_name(selector.name)
        return self

class RequiredSubsetDimensionRecordsModel(DSGBaseModel):
    name: str = Field(description="Name of a subset dimension")
    selectors: list[str] = Field(description="One or more selectors in the subset dimension")

class RequiredSupplementalDimensionRecordsModel(DSGBaseModel):
    name: str = Field(description="Name of a supplemental dimension")
    record_ids: list[str] = Field(
        description="One or more record IDs in the supplemental dimension"
    )

class RequiredBaseDimensionModel(DSGBaseModel):
    record_ids: list[str] = []
    dimension_name: Optional[str] = Field(
        default=None,
        description="Identifies which base dimension contains the record IDs. Required if there "
        "is more than one base dimension for a given dimension type.",
    )

class RequiredDimensionRecordsByTypeModel(DSGBaseModel):
    base: RequiredBaseDimensionModel = RequiredBaseDimensionModel()
    base_missing: RequiredBaseDimensionModel = RequiredBaseDimensionModel()
    subset: list[RequiredSubsetDimensionRecordsModel] = []

    @model_validator(mode="before")
    @classmethod
    def handle_legacy_format(cls, values: dict[str, Any]) -> dict[str, Any]:
        # 1. base and base_missing used to be list[str] because we used to allow a single base
        #    dimension.
        # 2. We used to allow supplemental dimension requirements.
        # This allows backwards compatibility with old files and databases.
        # This can be removed once we've updated existing dsgrid project repositories.
        for field in ("base", "base_missing"):
            if field in values and isinstance(values[field], list):
                values[field] = {"record_ids": values[field]}
        values.pop("supplemental", None)
        return values

    @model_validator(mode="after")
    def check_base(self) -> "RequiredDimensionRecordsByTypeModel":
        if self.base.record_ids and self.base_missing.record_ids:
            msg = (
                "base and base_missing cannot both contain record_ids: "
                f"{self.base=} {self.base_missing=}"
            )
            raise ValueError(msg)
        return self

    def defines_dimension_requirement(self) -> bool:
        """Returns True if the model defines a dimension requirement."""
        return (
            bool(self.base.record_ids) or bool(self.base_missing.record_ids) or bool(self.subset)
        )

class RequiredDimensionRecordsModel(DSGBaseModel):
    # This is here because Pydantic doesn't like fields that start with 'model_'
    model_config = make_model_config(protected_namespaces=())

    # time is excluded
    geography: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    metric: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    model_year: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    scenario: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    sector: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    subsector: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()
    weather_year: RequiredDimensionRecordsByTypeModel = RequiredDimensionRecordsByTypeModel()

class RequiredDimensionsModel(DSGBaseModel):
    """Defines required record IDs that must exist for each dimension in a dataset.

    Record IDs can reside in the project's base or subset dimensions. Requirements can be
    specified for a single dimension or a combination of dimensions.

    For example, if a project includes commercial, residential, and transportation sectors but
    the dataset has only transportation sector records, it should specify a single_dimensional
    requirement that is a subset of the project's base dimension:

    `{"single_dimensional": {"sector": {"base": {"record_ids": ["transportation"]}}}}`

    If a dataset's requirements span multiple dimensions, such as when it does not have some
    metric records for some geography records, then a multi_dimensional requirement should be
    specified. (By default, a full cross join is assumed to be present.)

    `{"multi_dimensional": [
        {
            "geography": {"base": {"record_ids": ["12345"]}},
            "metric": {"base": {"record_ids": ["electricity_cooling"]}}
        }
    ]}`

    If a dataset specifies a dimension type within a multi_dimensional section and wants to use
    all records from a project base dimension, it can specify `base.record_ids = ["__all__"]`
    as a shorthand notation.

    Requirements for a dimension cannot be defined in both single_dimensional and
    multi_dimensional sections. If no records are listed for a dimension, then all project base
    records are required.

    It might be easier for a dataset to specify what it does not have rather than what it does
    have. In that case, it is recommended to use the
    RequiredDimensionRecordsByTypeModel.base_missing field. dsgrid will compute the difference
    of the base dimension records and the base_missing records to determine the dataset's
    required records.

    If a project has multiple base dimensions of the same type,
    RequiredBaseDimensionModel.dimension_name must be specified to identify the base dimension
    that contains the record IDs.

    If a dataset contains a subset of project base dimension records that are defined in the
    project's subset dimensions, it is recommended to use that specification. dsgrid will
    substitute base records for mapped subset records at runtime.
    """

    single_dimensional: RequiredDimensionRecordsModel = Field(
        description="Required records for a single dimension.",
        default=RequiredDimensionRecordsModel(),
    )
    multi_dimensional: list[RequiredDimensionRecordsModel] = Field(
        description="Required records for a combination of dimensions. For example, there may "
        "be a dataset requirement for only one subsector for a given sector instead of a cross "
        "product.",
        default=[],
    )

    @model_validator(mode="after")
    def check_for_duplicates(self) -> "RequiredDimensionsModel":
        """
        1. Ensure that the same dimension does not have requirements in both single and multi
           dimensional sections.
        2. Set any dimensions that do not have specifications to require all base dimension
           records (as long as there is only one project base dimension).
        """
        single_dimensional: set[str] = set()
        multi_dimensional: set[str] = set()
        for field in RequiredDimensionRecordsModel.model_fields:
            req = getattr(self.single_dimensional, field)
            if req.defines_dimension_requirement():
                single_dimensional.add(field)

        dim_combos: set[tuple[str, ...]] = set()
        for item in self.multi_dimensional:
            dims = []
            for field in RequiredDimensionRecordsModel.model_fields:
                req = getattr(item, field)
                if req.defines_dimension_requirement():
                    if field in single_dimensional:
                        msg = (
                            "dimensions cannot be defined in both single_dimensional and "
                            f"multi_dimensional sections: {field}"
                        )
                        raise ValueError(msg)
                    dims.append(field)
                    multi_dimensional.add(field)
            if len(dims) < 2:
                msg = (
                    "A multi_dimensional dimension requirement must contain at least two "
                    f"dimensions: {item}"
                )
                raise ValueError(msg)
            dim_combo = tuple(sorted(dims))
            if dim_combo not in dim_combos:
                for other in dim_combos:
                    if set(dim_combo).intersection(other):
                        msg = (
                            "All descriptors in the multi-dimensional requirements with an "
                            "intersection of dimensions must have a full intersection. "
                            f"dimension_set1 = {other} dimension_set2 = {dim_combo}"
                        )
                        raise ValueError(msg)
                dim_combos.add(dim_combo)

        not_covered = (
            set([x.value for x in DimensionType]) - multi_dimensional - single_dimensional
        )
        for field in not_covered:
            if field != DimensionType.TIME.value:
                getattr(self.single_dimensional, field).base.record_ids = ["__all__"]
        return self

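
# Illustrative sketch, not part of the dsgrid source: dimension requirements matching
# the docstring above. The dataset covers only the transportation sector, plus a
# single hypothetical geography/metric combination; all record IDs are invented.
def _example_required_dimensions() -> RequiredDimensionsModel:
    return RequiredDimensionsModel(
        single_dimensional={"sector": {"base": {"record_ids": ["transportation"]}}},
        multi_dimensional=[
            {
                "geography": {"base": {"record_ids": ["12345"]}},
                "metric": {"base": {"record_ids": ["electricity_cooling"]}},
            }
        ],
    )
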
class DatasetBaseDimensionNamesModel(DSGBaseModel):
    """Defines the query names for project base dimensions to which datasets will be mapped.
    This is important for cases where a project has multiple base dimensions of the same type.
    """

    # This is here because Pydantic doesn't like fields that start with 'model_'
    model_config = make_model_config(protected_namespaces=())

    geography: Optional[str] = None
    metric: Optional[str] = None
    model_year: Optional[str] = None
    scenario: Optional[str] = None
    sector: Optional[str] = None
    subsector: Optional[str] = None
    time: Optional[str] = None
    weather_year: Optional[str] = None

class InputDatasetModel(DSGBaseModel):
    """Defines an input dataset for the project config."""

    dataset_id: str = Field(
        title="dataset_id",
        description="Unique dataset identifier.",
        json_schema_extra={
            "updateable": False,
        },
    )
    dataset_type: InputDatasetType = Field(
        title="dataset_type",
        description="Dataset type.",
        json_schema_extra={
            "options": InputDatasetType.format_for_docs(),
            "updateable": False,
        },
    )
    version: Optional[str] = Field(
        title="version",
        description="Version of the registered dataset",
        default=None,
        json_schema_extra={
            "requirements": (
                # TODO: add notes about warnings for outdated versions DSGRID-189 & DSGRID-148
                # TODO: need to assume the latest version. DSGRID-190
                "The version specification is optional. If no version is supplied, then the "
                "latest version in the registry is assumed.",
                "The version string must be in semver format (e.g., '1.0.0') and it must be a "
                "valid/existing version in the registry.",
            ),
            "updateable": False,
            # TODO: add notes about warnings for outdated versions? DSGRID-189.
        },
    )
    required_dimensions: RequiredDimensionsModel = Field(
        title="required_dimensions",
        description="Defines required record IDs that must exist for each dimension.",
        default=RequiredDimensionsModel(),
    )
    mapping_references: list[DimensionMappingReferenceModel] = Field(
        title="mapping_references",
        description="Defines how to map the dataset dimensions to the project. "
        "Auto-populated during submission.",
        default=[],
    )
    base_dimension_names: DatasetBaseDimensionNamesModel = Field(
        title="base_dimension_names",
        description="Defines the project base dimensions to which the dataset will map itself. "
        "Auto-populated during submission.",
        default=DatasetBaseDimensionNamesModel(),
    )
    status: DatasetRegistryStatus = Field(
        title="status",
        description="Registration status of the dataset, added by dsgrid.",
        default=DatasetRegistryStatus.UNREGISTERED,
        json_schema_extra={
            "dsgrid_internal": True,
            "notes": ("status is ",),
            "updateable": False,
        },
    )
    wrap_time_allowed: bool = Field(
        title="wrap_time_allowed",
        description="Whether to allow dataset time to be wrapped to project time, if different.",
        default=False,
    )
    time_based_data_adjustment: TimeBasedDataAdjustmentModel = Field(
        title="time_based_data_adjustment",
        description="Defines how the rest of the dataframe is adjusted with respect to time, "
        "e.g., whether to drop associated data when dropping a leap-day timestamp.",
        default=TimeBasedDataAdjustmentModel(),
    )

    @field_validator("time_based_data_adjustment")
    @classmethod
    def check_data_adjustment(cls, time_based_data_adjustment):
        """Check daylight saving adjustment"""
        sfh = time_based_data_adjustment.daylight_saving_adjustment.spring_forward_hour
        fbh = time_based_data_adjustment.daylight_saving_adjustment.fall_back_hour
        if fbh == DaylightSavingFallBackType.NONE and sfh == DaylightSavingSpringForwardType.NONE:
            return time_based_data_adjustment
        if fbh != DaylightSavingFallBackType.NONE and sfh != DaylightSavingSpringForwardType.NONE:
            return time_based_data_adjustment
        msg = (
            "mismatch between spring_forward_hour and fall_back_hour, "
            f"{time_based_data_adjustment=}."
        )
        raise ValueError(msg)

    # TODO: write validation that if daylight_saving_adjustment is specified, the dataset time
    # config must be IndexTimeDimensionConfig.

class DimensionMappingsModel(DSGBaseModel):
    """Defines all dimension mappings associated with a dsgrid project, including
    base-to-supplemental mappings and dataset-to-project mappings.
    """

    base_to_supplemental_references: list[DimensionMappingReferenceModel] = Field(
        title="base_to_supplemental_references",
        description="Base dimension to supplemental dimension mappings (e.g., county-to-state)"
        " used to support various queries and dimension transformations.",
        default=[],
    )
    dataset_to_project: dict[str, list[DimensionMappingReferenceModel]] = Field(
        title="dataset_to_project",
        description="Dataset-to-project mappings map dataset dimensions to project dimensions.",
        default={},
        json_schema_extra={
            "dsgrid_internal": True,
            "notes": (
                "Once a dataset is submitted to a project, dsgrid adds the dataset-to-project "
                "mappings to the project config.",
                "Some projects may not have any dataset-to-project mappings. "
                "Dataset-to-project mappings are only supplied if a dataset's dimensions do "
                "not match the project's dimensions.",
            ),
            "updateable": False,
        },
        # TODO: need to document missing dimension records, fill values, etc. DSGRID-191.
    )

class ProjectConfigModel(DSGBaseDatabaseModel):
    """Represents project configurations."""

    project_id: str = Field(
        title="project_id",
        description="A unique project identifier that is project-specific (e.g., "
        "'standard_scenarios_2021').",
        json_schema_extra={
            "requirements": ("Must not contain any dashes (`-`)",),
            "updateable": False,
        },
    )
    name: str = Field(
        title="name",
        description="A project name to accompany the ID.",
    )
    description: str = Field(
        title="description",
        description="Detailed project description.",
        json_schema_extra={
            "notes": (
                "The description will get stored in the project registry and may be used for"
                " searching.",
            ),
        },
    )
    status: ProjectRegistryStatus = Field(
        title="status",
        description="Project registry status.",
        default=ProjectRegistryStatus.INITIAL_REGISTRATION,
        json_schema_extra={
            "dsgrid_internal": True,
            "updateable": False,
        },
    )
    datasets: list[InputDatasetModel] = Field(
        title="datasets",
        description="List of input datasets for the project.",
    )
    dimensions: DimensionsModel = Field(
        title="dimensions",
        description="List of `base` and `supplemental` dimensions.",
    )
    dimension_mappings: DimensionMappingsModel = Field(
        title="dimension_mappings",
        description="List of project mappings. Initialized with base-to-base and"
        " base-to-supplemental mappings. dataset-to-project mappings are added by dsgrid as"
        " datasets get registered with the project.",
        default=DimensionMappingsModel(),
        json_schema_extra={
            "notes": ("`[dimension_mappings]` are optional at the project level.",),
        },
    )

    @field_validator("project_id")
    @classmethod
    def check_project_id_handle(cls, project_id):
        """Check for valid characters in the project ID."""
        if "-" in project_id:
            raise ValueError('invalid character "-" in project id')
        check_config_id_strict(project_id, "Project")
        return project_id

def make_unvalidated_project_config(
    project_id: str,
    dataset_ids: Iterable[str],
    name: str | None = None,
    description: str | None = None,
    time_type: TimeDimensionType = TimeDimensionType.DATETIME,
) -> dict[str, Any]:
    """Create a project config as a dictionary, skipping validation."""
    return {
        "project_id": project_id,
        "name": name or "",
        "description": description or "",
        "dimensions": {
            "base_dimensions": make_base_dimension_template(time_type=time_type),
            "subset_dimensions": [],
            "supplemental_dimensions": [],
        },
        "datasets": [
            {
                "dataset_id": x,
                "dataset_type": "",
                "version": "",
                "required_dimensions": {},
            }
            for x in dataset_ids
        ],
    }

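
# Illustrative usage sketch, not part of the dsgrid source: create a template config
# for two hypothetical dataset IDs. The result is a plain dict; it must be completed
# (dimensions, dataset types, etc.) before it can validate as a ProjectConfigModel.
def _example_project_template() -> dict[str, Any]:
    return make_unvalidated_project_config(
        project_id="my_project",  # hypothetical; dashes are not allowed in project IDs
        dataset_ids=["dataset_a", "dataset_b"],  # hypothetical dataset IDs
        name="My Project",
        description="A template awaiting dimension definitions",
    )
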
class DimensionsByCategoryModel(DSGBaseModel):
    """Defines the query names by base, subset, and supplemental category."""

    base: list[str]
    subset: list[str]
    supplemental: list[str]

class ProjectDimensionNamesModel(DSGBaseModel):
    """Defines the query names for all base and supplemental dimensions in the project."""

    # This is here because Pydantic doesn't like fields that start with 'model_'
    model_config = make_model_config(protected_namespaces=())

    geography: DimensionsByCategoryModel
    metric: DimensionsByCategoryModel
    model_year: DimensionsByCategoryModel
    scenario: DimensionsByCategoryModel
    sector: DimensionsByCategoryModel
    subsector: DimensionsByCategoryModel
    time: DimensionsByCategoryModel
    weather_year: DimensionsByCategoryModel

class ProjectConfig(ConfigBase):
    """Provides an interface to a ProjectConfigModel."""

    def __init__(self, model: ProjectConfigModel):
        super().__init__(model)
        self._base_dimensions: dict[ConfigKey, DimensionBaseConfig] = {}
        self._subset_dimensions: dict[
            DimensionType, dict[str, dict[ConfigKey, DimensionBaseConfigWithFiles]]
        ] = {}
        self._supplemental_dimensions: dict[ConfigKey, DimensionBaseConfig] = {}
        self._base_to_supplemental_mappings: dict[ConfigKey, MappingTableConfig] = {}
        self._dimensions_by_name: dict[str, DimensionBaseConfig] = {}

    @staticmethod
    def model_class() -> Type:
        return ProjectConfigModel

    @staticmethod
    def config_filename() -> str:
        return "project.json5"

    def get_base_dimension(
        self, dimension_type: DimensionType, dimension_name: Optional[str] = None
    ) -> DimensionBaseConfig:
        """Return the base dimension matching dimension_type. If there is more than one base
        dimension of the given type, dimension_name is required.

        See also
        --------
        list_base_dimensions
        """
        if dimension_name is None:
            return self._get_single_base_dimension(dimension_type)

        for dim in self._iter_base_dimensions():
            if dim.model.dimension_type == dimension_type and dim.model.name == dimension_name:
                return dim
        msg = f"Did not find a dimension of {dimension_type=} with {dimension_name=}"
        raise DSGValueNotRegistered(msg)

    def get_base_time_dimension(self) -> TimeDimensionBaseConfig:
        """Return the base dimension for time."""
        dim = self._get_single_base_dimension(DimensionType.TIME)
        assert isinstance(dim, TimeDimensionBaseConfig)
        return dim

    def _get_single_base_dimension(self, dimension_type: DimensionType) -> DimensionBaseConfig:
        """Return the base dimension."""
        dims = [
            x for x in self._iter_base_dimensions() if x.model.dimension_type == dimension_type
        ]
        if not dims:
            msg = f"base dimension {dimension_type=} not found"
            raise DSGValueNotRegistered(msg)
        if len(dims) > 1:
            qnames = " ".join([x.model.name for x in dims])
            msg = (
                f"Found multiple base dimensions for {dimension_type=}: {qnames}. "
                "Call get_base_dimension() with a specific name."
            )
            raise DSGInvalidDimension(msg)
        return dims[0]

    def get_base_dimension_and_version(
        self, dimension_type: DimensionType, dimension_name: Optional[str] = None
    ) -> tuple[DimensionBaseConfig, str]:
        """Return the base dimension and version matching dimension_type."""
        res: tuple[DimensionBaseConfig, str] | None = None
        for key, dim in self.base_dimensions.items():
            if dim.model.dimension_type == dimension_type:
                if dimension_name is None or dim.model.name == dimension_name:
                    if res is not None:
                        msg = (
                            f"Found multiple base dimensions for {dimension_type=}. "
                            "You must specify a dimension query name to remove ambiguity."
                        )
                        raise DSGInvalidOperation(msg)
                    res = dim, key.version
        if res is None:
            msg = f"Did not find a dimension with {dimension_type=} {dimension_name=}"
            raise DSGValueNotRegistered(msg)
        return res

    def get_dimension(self, name: str) -> DimensionBaseConfig:
        """Return the dimension with name."""
        dim = self._dimensions_by_name.get(name)
        if dim is None:
            raise DSGValueNotRegistered(f"dimension_name={name} is not stored")
        return dim

    def get_time_dimension(self, name: str) -> TimeDimensionBaseConfig:
        """Return the time dimension with name."""
        dim = self.get_dimension(name)
        if not isinstance(dim, TimeDimensionBaseConfig):
            msg = f"{dim.model.label} is not a time dimension"
            raise DSGInvalidParameter(msg)
        return dim

    def get_dimension_by_name(self, name: str) -> DimensionBaseConfig:
        """Return the base dimension with name."""
        for dim in self._iter_base_dimensions():
            if dim.model.name == name:
                return dim
        msg = f"No base dimension with {name=} is stored."
        raise DSGValueNotRegistered(msg)

    def get_dimension_with_records(self, name: str) -> DimensionBaseConfigWithFiles:
        """Return a dimension config matching name that has records."""
        dim = self._dimensions_by_name.get(name)
        if dim is None:
            raise DSGInvalidDimension(f"{name=} is not stored")
        if not isinstance(dim, DimensionBaseConfigWithFiles):
            msg = f"{dim.model.label} does not have records"
            raise DSGInvalidParameter(msg)
        return dim

    def get_dimension_records(self, name: str) -> DataFrame:
        """Return a DataFrame containing the records for a dimension."""
        return self.get_dimension_with_records(name).get_records_dataframe()

    def get_dimension_record_ids(self, name: str) -> set[str]:
        """Return the record IDs for the dimension identified by name."""
        return self.get_dimension_with_records(name).get_unique_ids()

    def get_dimension_reference(self, dimension_id: str) -> DimensionReferenceModel:
        """Return the reference of the dimension matching dimension_id."""
        for ref in itertools.chain(
            self.model.dimensions.base_dimension_references,
            self.model.dimensions.supplemental_dimension_references,
        ):
            if ref.dimension_id == dimension_id:
                return ref
        raise DSGInvalidDimension(f"{dimension_id} is not stored")

    def list_base_dimensions(
        self, dimension_type: Optional[DimensionType] = None
    ) -> list[DimensionBaseConfig]:
        """Return all base dimensions, optionally filtering to the dimension_type.

        See also
        --------
        get_base_dimension
        """
        if dimension_type is None:
            return list(self._iter_base_dimensions())
        return [
            x for x in self._iter_base_dimensions() if x.model.dimension_type == dimension_type
        ]

    def list_base_dimensions_with_records(
        self, dimension_type: DimensionType
    ) -> list[DimensionBaseConfigWithFiles]:
        """Return all base dimensions of the given dimension_type that have records.

        See also
        --------
        get_base_dimension
        """
        return [
            x
            for x in self._iter_base_dimensions()
            if x.model.dimension_type == dimension_type
            and isinstance(x, DimensionBaseConfigWithFiles)
        ]

    def list_supplemental_dimensions(
        self, dimension_type: DimensionType, sort_by=None
    ) -> list[DimensionBaseConfigWithFiles]:
        """Return the supplemental dimensions matching dimension_type (if any).

        Parameters
        ----------
        dimension_type : DimensionType
        sort_by : str | None
            If set, sort the dimensions by this dimension attribute.
        """
        dims = [
            x
            for x in self.supplemental_dimensions.values()
            if x.model.dimension_type == dimension_type
        ]
        if sort_by is not None:
            dims.sort(key=lambda x: getattr(x.model, sort_by))
        return dims

    def get_matching_subset_dimension(
        self, dimension_type: DimensionType, unique_data_records: set[str]
    ) -> DimensionReferenceModel | None:
        """Return a dimension reference if there is a matching subset dimension, otherwise
        None.
        """
        for group in self.model.dimensions.subset_dimensions:
            if group.dimension_type == dimension_type:
                for ref in group.selector_references:
                    key = ConfigKey(ref.dimension_id, ref.version)
                    records = self._subset_dimensions[dimension_type][group.name][
                        key
                    ].get_unique_ids()
                    if not unique_data_records.symmetric_difference(records):
                        logger.info("Found matching subset dimension: %s", group.name)
                        return ref
        return None

    def get_base_to_supplemental_dimension_mappings_by_types(
        self, dimension_type: DimensionType
    ) -> list[MappingTableConfig]:
        """Return the base-to-supplemental dimension mappings for the dimension (if any)."""
        return [
            x
            for x in self._base_to_supplemental_mappings.values()
            if x.model.from_dimension.dimension_type == dimension_type
        ]

    def get_base_to_supplemental_config(
        self, base_dim: DimensionBaseConfigWithFiles, supp_dim: DimensionBaseConfigWithFiles
    ) -> MappingTableConfig:
        """Return the project's base-to-supplemental dimension mapping config for the given
        base and supplemental dimensions.
        """
        self._check_not_base_dimension(supp_dim)
        for mapping in self._base_to_supplemental_mappings.values():
            if (
                mapping.model.from_dimension.dimension_id == base_dim.model.dimension_id
                and mapping.model.to_dimension.dimension_id == supp_dim.model.dimension_id
            ):
                return mapping

        msg = (
            f"No mapping is stored for base = {base_dim.model.label}, "
            f"supplemental = {supp_dim.model.label}"
        )
        raise DSGValueNotRegistered(msg)

    def get_base_to_supplemental_mapping_records(
        self, base_dim: DimensionBaseConfigWithFiles, supp_dim: DimensionBaseConfigWithFiles
    ) -> DataFrame:
        """Return the project's base-to-supplemental dimension mapping records.
        Excludes rows with NULL to_id values.
        """
        config = self.get_base_to_supplemental_config(base_dim, supp_dim)
        return config.get_records_dataframe().filter("to_id is not NULL")

    def has_base_to_supplemental_dimension_mapping_types(self, dimension_type) -> bool:
        """Return True if the config has these base-to-supplemental mappings."""
        return self._has_mapping(
            dimension_type,
            dimension_type,
            self._base_to_supplemental_mappings,
        )

    def get_base_dimension_by_id(self, dimension_id: str) -> DimensionBaseConfig:
        """Return the base dimension with dimension_id."""
        for dim in self._iter_base_dimensions():
            if dim.model.dimension_id == dimension_id:
                return dim
        msg = f"Did not find a base dimension with {dimension_id=}"
        raise DSGValueNotRegistered(msg)

    def get_base_dimension_records_by_id(self, dimension_id: str) -> DataFrame:
        """Return the records for the base dimension with dimension_id."""
        dim = self.get_base_dimension_by_id(dimension_id)
        if not isinstance(dim, DimensionBaseConfigWithFiles):
            msg = f"{dim.model.label} does not have records"
            raise DSGInvalidParameter(msg)
        return dim.get_records_dataframe()

    def _check_not_base_dimension(self, dim: DimensionBaseConfig) -> None:
        """Check that the dimension is not a base dimension."""
        for base_dim in self.list_base_dimensions(dimension_type=dim.model.dimension_type):
            if dim.model.dimension_id == base_dim.model.dimension_id:
                msg = f"Cannot pass base dimension: {dim.model.label}"
                raise DSGInvalidParameter(msg)

    @staticmethod
    def _has_mapping(
        from_dimension_type: DimensionType, to_dimension_type: DimensionType, mapping: dict
    ) -> bool:
        for config in mapping.values():
            if (
                config.model.from_dimension.dimension_type == from_dimension_type
                and config.model.to_dimension.dimension_type == to_dimension_type
            ):
                return True
        return False

    def list_dimension_names(self, category: DimensionCategory | None = None) -> list[str]:
        """Return query names for all dimensions in the project.

        Parameters
        ----------
        category : DimensionCategory | None
            Optionally, filter the return by category.
        """
        if category is None:
            return sorted(self._dimensions_by_name.keys())

        match category:
            case DimensionCategory.BASE:
                method = self._iter_base_dimensions
            case DimensionCategory.SUBSET:
                method = self._iter_subset_dimensions
            case DimensionCategory.SUPPLEMENTAL:
                method = self._iter_supplemental_dimensions
            case _:
                raise NotImplementedError(f"{category=}")
        return sorted((x.model.name for x in method()))

    def list_dimension_names_by_type(self, dimension_type: DimensionType) -> list[str]:
        """List the query names available for a dimension type."""
        return [
            x.model.name
            for x in self.iter_dimensions()
            if x.model.dimension_type == dimension_type
        ]

    def get_dimension_names_mapped_to_type(self) -> dict[str, DimensionType]:
        """Return a dict of query names mapped to their dimension type."""
        return {x.model.name: x.model.dimension_type for x in self.iter_dimensions()}

    def get_dimension_type_to_base_name_mapping(self) -> dict[DimensionType, list[str]]:
        """Return a mapping of DimensionType to query names for base dimensions."""
        query_names: dict[DimensionType, list[str]] = {}
        for dimension_type in DimensionType:
            query_names[dimension_type] = [
                x.model.name for x in self.list_base_dimensions(dimension_type=dimension_type)
            ]
        return query_names

    def get_subset_dimension_to_name_mapping(self) -> dict[DimensionType, list[str]]:
        """Return a mapping of DimensionType to query names for subset dimensions."""
        query_names = defaultdict(list)
        for dimension_type in DimensionType:
            if dimension_type in self._subset_dimensions:
                for selectors in self._subset_dimensions[dimension_type].values():
                    for dim in selectors.values():
                        query_names[dimension_type].append(dim.model.name)
        return query_names

    def get_supplemental_dimension_to_name_mapping(self) -> dict[DimensionType, list[str]]:
        """Return a mapping of DimensionType to query names for supplemental dimensions."""
        query_names = {}
        for dimension_type in DimensionType:
            query_names[dimension_type] = [
                x.model.name
                for x in self.list_supplemental_dimensions(dimension_type, sort_by="name")
            ]
        return query_names

    def get_dimension_names_model(self) -> ProjectDimensionNamesModel:
        """Return an instance of ProjectDimensionNamesModel for the project."""
        base_names_by_type = self.get_dimension_type_to_base_name_mapping()
        subset_names_by_type = self.get_subset_dimension_to_name_mapping()
        supp_names_by_type = self.get_supplemental_dimension_to_name_mapping()
        model: dict[str, Any] = {}
        for dimension_type in DimensionType:
            model[dimension_type.value] = {
                "base": base_names_by_type[dimension_type],
                "subset": subset_names_by_type[dimension_type],
                "supplemental": supp_names_by_type[dimension_type],
            }
        return ProjectDimensionNamesModel(**model)

    def set_dimensions(
        self,
        base_dimensions: dict[ConfigKey, DimensionBaseConfig],
        subset_dimensions: dict[
            DimensionType, dict[str, dict[ConfigKey, DimensionBaseConfigWithFiles]]
        ],
        supplemental_dimensions: dict[ConfigKey, DimensionBaseConfig],
    ) -> None:
        self._base_dimensions.clear()
        self._subset_dimensions.clear()
        self._supplemental_dimensions.clear()
        self._base_dimensions.update(base_dimensions)
        self._subset_dimensions.update(subset_dimensions)
        self._supplemental_dimensions.update(supplemental_dimensions)
        self._dimensions_by_name.clear()
        for dim in self.iter_dimensions():
            if dim.model.name in self._dimensions_by_name:
                raise DSGInvalidDimension(
                    f"name={dim.model.name} exists multiple times in project {self.config_id}"
                )
            self._dimensions_by_name[dim.model.name] = dim

    def set_dimension_mappings(
        self, base_to_supplemental_mappings: dict[ConfigKey, MappingTableConfig]
    ) -> None:
        self._base_to_supplemental_mappings.clear()
        self._base_to_supplemental_mappings.update(base_to_supplemental_mappings)
        # TODO: Once we start using these we may need to store by (from, to) as key instead.

    def add_dataset_dimension_mappings(
        self, dataset_config: DatasetConfig, references: list[DimensionMappingReferenceModel]
    ):
        """Add a dataset's dimension mappings to the project.

        Raises
        ------
        DSGInvalidDimensionMapping
            Raised if a requirement is violated.
        """
        if dataset_config.model.dataset_id not in self.model.dimension_mappings.dataset_to_project:
            self.model.dimension_mappings.dataset_to_project[dataset_config.model.dataset_id] = []
        mappings = self.model.dimension_mappings.dataset_to_project[
            dataset_config.model.dataset_id
        ]
        existing_ids = set((x.mapping_id for x in mappings))
        for reference in references:
            if reference.mapping_id not in existing_ids:
                mappings.append(reference)
                logger.info(
                    "Added dimension mapping for dataset=%s: %s",
                    dataset_config.model.dataset_id,
                    reference.mapping_id,
                )

    def add_dataset_base_dimension_names(
        self, dataset_id: str, base_dimension_names: DatasetBaseDimensionNamesModel
    ):
        """Add the project base dimension query names represented in the dataset."""
        for field in base_dimension_names.model_fields:
            if getattr(base_dimension_names, field) is None:
                msg = f"DatasetBaseDimensionNamesModel {field} cannot be None"
                raise DSGInvalidParameter(msg)
        dataset = self.get_dataset(dataset_id)
        dataset.base_dimension_names = base_dimension_names

    def get_dataset_base_dimension_names(self, dataset_id: str) -> DatasetBaseDimensionNamesModel:
        """Return the project base dimension query names represented in the dataset."""
        return self.get_dataset(dataset_id).base_dimension_names

    @property
    def config_id(self) -> str:
        return self._model.project_id

    def get_dataset(self, dataset_id) -> InputDatasetModel:
        """Return a dataset by ID."""
        for dataset in self.model.datasets:
            if dataset.dataset_id == dataset_id:
                return dataset
        raise DSGInvalidField(
            f"project_id={self._model.project_id} does not have dataset_id={dataset_id}"
        )

    def has_dataset(
        self, dataset_id: str, status: DatasetRegistryStatus | None = None
    ) -> bool:
        """Return True if the dataset_id is present in the configuration.

        Parameters
        ----------
        dataset_id : str
        status : None | DatasetRegistryStatus
            If set, only return True if the status matches.
        """
        for dataset in self.iter_datasets():
            if dataset.dataset_id == dataset_id:
                if status is None or dataset.status == status:
                    return True
                return False

        # TODO: what about benchmark and historical?
        return False

    def get_load_data_time_columns(self, name: str) -> list[str]:
        """Return the time dimension columns expected in the load data table for this query
        name.
        """
        dim = self.get_time_dimension(name)
        time_columns = dim.get_load_data_time_columns()
        return time_columns

    def iter_datasets(self) -> Generator[InputDatasetModel, None, None]:
        for dataset in self.model.datasets:
            yield dataset

    def _iter_base_dimensions(self) -> Generator[DimensionBaseConfig, None, None]:
        yield from self._base_dimensions.values()

    def _iter_subset_dimensions(self) -> Generator[DimensionBaseConfig, None, None]:
        for x in self._subset_dimensions.values():
            for y in x.values():
                for z in y.values():
                    yield z

    def _iter_supplemental_dimensions(self) -> Generator[DimensionBaseConfig, None, None]:
        yield from self._supplemental_dimensions.values()

    def iter_dimensions(self) -> Iterable[DimensionBaseConfig]:
        """Return an iterator over all dimensions of the project.

        Yields
        ------
        DimensionConfig
        """
        return itertools.chain(
            self._iter_base_dimensions(),
            self._iter_subset_dimensions(),
            self._iter_supplemental_dimensions(),
        )

    def list_registered_dataset_ids(self) -> list[str]:
        """List registered datasets associated with the project."""
        status = DatasetRegistryStatus.REGISTERED
        return [x.dataset_id for x in self._iter_datasets_by_status(status)]

    def list_unregistered_dataset_ids(self) -> list[str]:
        """List unregistered datasets associated with the project registry."""
        status = DatasetRegistryStatus.UNREGISTERED
        return [x.dataset_id for x in self._iter_datasets_by_status(status)]

    def _iter_datasets_by_status(
        self, status: DatasetRegistryStatus
    ) -> Generator[InputDatasetModel, None, None]:
        for dataset in self.iter_datasets():
            if dataset.status == status:
                yield dataset

    def get_required_dimension_record_ids(
        self, dataset_id: str, dimension_type: DimensionType
    ) -> set[str]:
        """Return the required base dimension record IDs for the dataset and dimension type."""
        dataset = self.get_dataset(dataset_id)
        req = getattr(dataset.required_dimensions.single_dimensional, dimension_type.value)
        record_ids = self._get_required_dimension_record_ids(req)
        for multi_req in dataset.required_dimensions.multi_dimensional:
            req = getattr(multi_req, dimension_type.value)
            record_ids.update(self._get_required_dimension_record_ids(req))
        return record_ids

    def _build_multi_dim_requirement_associations(
        self, multi_dim_reqs: list[RequiredDimensionRecordsModel], context: ScratchDirContext
    ):
        dfs_by_dim_combo: dict[tuple[str, ...], DataFrame] = {}
        # Example: Partial sector and subsector combinations are required.
        # [
        #     {"sector": {"base": ["com"]},
        #      "subsector": {"supplemental":
        #          {"name": "commercial-subsectors",
        #           "record_ids": ["commercial_subsectors"]}}},
        #     {"sector": {"base": ["res"]}, "subsector": {"base": ["MidriseApartment"]}},
        # ]
        # This code will replace supplemental records with base records and return a list of
        # dataframes of those combinations - one per unique combination of dimensions.
        for multi_req in multi_dim_reqs:
            dim_combo = []
            columns = {}
            for field in sorted(RequiredDimensionRecordsModel.model_fields):
                dim_type = DimensionType(field)
                req = getattr(multi_req, field)
                record_ids = self._get_required_dimension_record_ids(req)
                if record_ids:
                    columns[field] = list(record_ids)
                    dim_combo.append(dim_type.value)
            df = create_dataframe_from_product(columns, context)
            df = df.select(*sorted(df.columns))
            dim_combo_tp = tuple(sorted(dim_combo))
            if dim_combo_tp in dfs_by_dim_combo:
                dfs_by_dim_combo[dim_combo_tp] = dfs_by_dim_combo[dim_combo_tp].union(df)
            else:
                dfs_by_dim_combo[dim_combo_tp] = df

        return list(dfs_by_dim_combo.values())

    def _get_required_dimension_record_ids(
        self, reqs: RequiredDimensionRecordsByTypeModel
    ) -> set[str]:
        """Return the required record IDs for a dimension based on the specification in the
        project config.
        """
        record_ids = self._get_required_base_dimension_record_ids(reqs)
        record_ids.update(self._get_required_record_ids_from_subsets(reqs))
        return record_ids

    def _get_required_base_dimension_record_ids(
        self, reqs: RequiredDimensionRecordsByTypeModel
    ) -> set[str]:
        """Return the required record IDs for a base dimension based on the specification in
        the project config.
        """
        record_ids: set[str] = set()
        if not reqs.base.record_ids and not reqs.base_missing.record_ids:
            return record_ids

        base_dim_query_name = reqs.base.dimension_name or reqs.base_missing.dimension_name
        assert base_dim_query_name is not None
        all_base_record_ids = self.get_dimension_record_ids(base_dim_query_name)
        if reqs.base.record_ids == ["__all__"]:
            assert reqs.base.dimension_name is not None
            record_ids = all_base_record_ids
        elif reqs.base.record_ids:
            record_ids = set(reqs.base.record_ids)
            if diff := record_ids - all_base_record_ids:
                msg = (
                    "The project config requires these record IDs in the dataset's 'base' "
                    "field, but they are not in the base dimension records: "
                    f"name={base_dim_query_name}: {diff=}"
                )
                raise DSGInvalidDataset(msg)
        elif reqs.base_missing.record_ids:
            assert reqs.base_missing.dimension_name is not None
            missing_ids = set(reqs.base_missing.record_ids)
            if diff := missing_ids - all_base_record_ids:
                msg = (
                    "The project config requires these record IDs in the dataset's "
                    "'base_missing' field, but they are not in the base dimension records: "
                    f"name={base_dim_query_name}: {diff=}"
                )
                raise DSGInvalidDataset(msg)
            record_ids = all_base_record_ids - missing_ids
        return record_ids

    def _get_subset_dimension_records(self, name: str, selector_name):
        for group in self.model.dimensions.subset_dimensions:
            if group.name == name:
                for ref in group.selector_references:
                    key = ConfigKey(ref.dimension_id, ref.version)
                    dim = self._subset_dimensions[group.dimension_type][group.name][key]
                    if dim.model.name == selector_name:
                        assert isinstance(dim, DimensionBaseConfigWithFiles)
                        return dim.get_unique_ids()

        msg = f"subset dimension selector not found: {name=} {selector_name=}"
        raise DSGInvalidDimension(msg)

    def _get_required_record_ids_from_subsets(self, req: RequiredDimensionRecordsByTypeModel):
        record_ids = set()
        for subset in req.subset:
            for selector_name in subset.selectors:
                record_ids.update(self._get_subset_dimension_records(subset.name, selector_name))
        return record_ids

    @track_timing(timer_stats_collector)
    def make_dimension_association_table(
        self, dataset_id, context: ScratchDirContext
    ) -> DataFrame:
        """Build a table that includes all combinations of dimension records that must be
        provided by the dataset.
        """
        required_dimensions = self.get_dataset(dataset_id).required_dimensions
        multi_dfs = self._build_multi_dim_requirement_associations(
            required_dimensions.multi_dimensional, context
        )
        # Project config construction asserts that there is no intersection of dimensions in
        # multi and single.
        existing = set()
        for df in multi_dfs:
            existing.update(set(df.columns))

        single_dfs = {}
        for field in (x for x in RequiredDimensionRecordsModel.model_fields if x not in existing):
            req = getattr(required_dimensions.single_dimensional, field)
            record_ids = self._get_required_dimension_record_ids(req)
            single_dfs[field] = list(record_ids)

        single_df = create_dataframe_from_product(single_dfs, context)
        return cross_join_dfs(multi_dfs + [single_df])

    def are_all_datasets_submitted(self) -> bool:
        """Return True if all datasets have been submitted."""
        return not self.list_unregistered_dataset_ids()

    def set_status(self, status: ProjectRegistryStatus) -> None:
        """Set the project status to the given value."""
        self.model.status = status
        logger.info("Set project_id=%s status=%s", self.config_id, status)

    def set_dataset_status(self, dataset_id: str, status: DatasetRegistryStatus):
        """Set the dataset status to the given value.

        Raises
        ------
        ValueError
            Raised if dataset_id is not stored.
        """
        dataset = self.get_dataset(dataset_id)
        dataset.status = status
        logger.info(
            "Set dataset_id=%s status=%s for project_id=%s",
            dataset_id,
            status,
            self._model.project_id,
        )

    @property
    def base_dimensions(self) -> dict:
        """Return the base dimensions.

        Returns
        -------
        dict
            dict of DimensionConfig keyed by ConfigKey
        """
        return self._base_dimensions

    @property
    def supplemental_dimensions(self) -> dict:
        """Return the supplemental dimensions.

        Returns
        -------
        dict
            dict of DimensionConfig keyed by ConfigKey
        """
        return self._supplemental_dimensions

def load_subset_dimensions(filename: Path) -> tuple[set[str], dict[str, list[str]]]:
    """Return the full set of record IDs and a mapping of subset dimension selector name to
    record IDs.
    """
    df = pd.read_csv(filename, index_col="id")
    if len(df.columns) == 0:
        raise DSGInvalidDimension(
            "A subset dimension records file must have at least one dimension column."
        )
    record_ids = set(df.index.values)
    subset_by_dim_name = {x: df[x].dropna().index.to_list() for x in df.columns}
    return record_ids, subset_by_dim_name

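
# Illustrative sketch, not part of the dsgrid source: demonstrates the file shape that
# load_subset_dimensions expects. Each non-"id" column is a selector; a non-empty cell
# marks the record as a member of that selector. All names here are invented.
def _example_load_subset_dimensions(tmp_path: Path) -> None:
    filename = tmp_path / "subsets.csv"
    filename.write_text(
        "id,commercial_end_uses,transportation_end_uses\n"
        "electricity_cooling,x,\n"
        "electricity_ev_charging,,x\n"
    )
    record_ids, subsets = load_subset_dimensions(filename)
    assert record_ids == {"electricity_cooling", "electricity_ev_charging"}
    assert subsets["commercial_end_uses"] == ["electricity_cooling"]
    assert subsets["transportation_end_uses"] == ["electricity_ev_charging"]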