Source code for gridstatus.ercot

import datetime
import io
import time
from dataclasses import dataclass
from enum import Enum
from typing import BinaryIO, Callable, List, Literal
from zipfile import ZipFile

import numpy as np
import pandas as pd
import pytz
import requests
import tqdm
from bs4 import BeautifulSoup
from pytz.exceptions import NonExistentTimeError

from gridstatus import utils
from gridstatus.base import (
    GridStatus,
    InterconnectionQueueStatus,
    ISOBase,
    Markets,
    NoDataFoundException,
    NotSupported,
)
from gridstatus.decorators import support_date_range
from gridstatus.ercot_60d_utils import (
    DAM_ENERGY_BID_AWARDS_KEY,
    DAM_ENERGY_BIDS_KEY,
    DAM_ENERGY_ONLY_OFFER_AWARDS_KEY,
    DAM_ENERGY_ONLY_OFFERS_KEY,
    DAM_ESR_AS_OFFERS_KEY,
    DAM_ESR_KEY,
    DAM_GEN_RESOURCE_AS_OFFERS_KEY,
    DAM_GEN_RESOURCE_KEY,
    DAM_LOAD_RESOURCE_AS_OFFERS_KEY,
    DAM_LOAD_RESOURCE_KEY,
    DAM_PTP_OBLIGATION_BID_AWARDS_KEY,
    DAM_PTP_OBLIGATION_BIDS_KEY,
    DAM_PTP_OBLIGATION_OPTION_AWARDS_KEY,
    DAM_PTP_OBLIGATION_OPTION_KEY,
    SCED_AS_OFFER_UPDATES_IN_OP_HOUR_KEY,
    SCED_ESR_KEY,
    SCED_GEN_RESOURCE_KEY,
    SCED_LOAD_RESOURCE_KEY,
    SCED_RESOURCE_AS_OFFERS_KEY,
    SCED_SMNE_KEY,
    CurveOutputFormat,
    process_dam_energy_bid_awards,
    process_dam_energy_bids,
    process_dam_energy_only_offer_awards,
    process_dam_energy_only_offers,
    process_dam_esr,
    process_dam_esr_as_offers,
    process_dam_gen,
    process_dam_load,
    process_dam_or_gen_load_as_offers,
    process_dam_ptp_obligation_bid_awards,
    process_dam_ptp_obligation_bids,
    process_dam_ptp_obligation_option,
    process_dam_ptp_obligation_option_awards,
    process_sced_as_offer_updates_in_op_hour,
    process_sced_esr,
    process_sced_gen,
    process_sced_load,
    process_sced_resource_as_offers,
)
from gridstatus.ercot_constants import (
    LOAD_FORECAST_BY_MODEL_COLUMNS,
    SOLAR_ACTUAL_AND_FORECAST_BY_GEOGRAPHICAL_REGION_COLUMNS,
    SOLAR_ACTUAL_AND_FORECAST_COLUMNS,
    WIND_ACTUAL_AND_FORECAST_BY_GEOGRAPHICAL_REGION_COLUMNS,
    WIND_ACTUAL_AND_FORECAST_COLUMNS,
)
from gridstatus.gs_logging import logger
from gridstatus.lmp_config import lmp_config

[docs] LOCATION_TYPE_HUB = "Trading Hub"
[docs] LOCATION_TYPE_RESOURCE_NODE = "Resource Node"
[docs] LOCATION_TYPE_ZONE = "Load Zone"
[docs] LOCATION_TYPE_ZONE_EW = "Load Zone Energy Weighted"
[docs] LOCATION_TYPE_ZONE_DC = "Load Zone DC Tie"
[docs] LOCATION_TYPE_ZONE_DC_EW = "Load Zone DC Tie Energy Weighted"
[docs] ELECTRICAL_BUS_LOCATION_TYPE = "Electrical Bus"
[docs] SETTLEMENT_POINT_LOCATION_TYPE = "Settlement Point"
""" Report Type IDs """ # DAM System Lambda # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-523-CD
[docs] DAM_SYSTEM_LAMBDA_RTID = 13113
# SCED System Lambda # https://www.ercot.com/mp/data-products/data-product-details?id=NP6-322-CD
[docs] SCED_SYSTEM_LAMBDA_RTID = 13114
# DAM Clearing Prices for Capacity # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-188-CD
[docs] DAM_CLEARING_PRICES_FOR_CAPACITY_RTID = 12329
# DAM Ancillary Service Plan # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-33-CD
[docs] DAM_ANCILLARY_SERVICE_PLAN_RTID = 12316
# DAM Settlement Point Prices # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-190-CD
[docs] DAM_SETTLEMENT_POINT_PRICES_RTID = 12331
# GIS Report # https://www.ercot.com/mp/data-products/data-product-details?id=PG7-200-ER
[docs] GIS_REPORT_RTID = 15933
# Historical RTM Load Zone and Hub Prices # https://www.ercot.com/mp/data-products/data-product-details?id=NP6-785-ER
[docs] HISTORICAL_RTM_LOAD_ZONE_AND_HUB_PRICES_RTID = 13061
# Historical DAM Load Zone and Hub Prices # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-180-ER
[docs] HISTORICAL_DAM_LOAD_ZONE_AND_HUB_PRICES_RTID = 13060
# Settlement Points List and Electrical Buses Mapping # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-160-SG
[docs] SETTLEMENT_POINTS_LIST_AND_ELECTRICAL_BUSES_MAPPING_RTID = 10008
# Settlement Point Prices at Resource Nodes, Hubs and Load Zones # https://www.ercot.com/mp/data-products/data-product-details?id=NP6-905-CD
[docs] SETTLEMENT_POINT_PRICES_AT_RESOURCE_NODES_HUBS_AND_LOAD_ZONES_RTID = 12301
# RTM Price Corrections # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-197-M
[docs] RTM_PRICE_CORRECTIONS_RTID = 13045
# DAM Price Corrections # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-196-M
[docs] DAM_PRICE_CORRECTIONS_RTID = 13044
# DAM Shadow Prices # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-191-CD
[docs] DAM_SHADOW_PRICES_RTID = 12332
# DAM LMPs by Electrical Bus # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-183-CD
[docs] DAM_LMPS_BY_ELECTRICAL_BUS_RTID = 12328
# LMPs by Electrical Bus # https://www.ercot.com/mp/data-products/data-product-details?id=NP6-787-CD
[docs] LMPS_BY_ELECTRICAL_BUS_RTID = 11485
# LMPs by Resource Nodes, Load Zones and Trading Hubs # https://www.ercot.com/mp/data-products/data-product-details?id=NP6-788-CD
[docs] LMPS_BY_SETTLEMENT_POINT_RTID = 12300
# System-wide actuals # https://www.ercot.com/mp/data-products/data-product-details?id=NP6-235-CD
[docs] SYSTEM_WIDE_ACTUALS_RTID = 12340
# Short term system adequacy report # https://www.ercot.com/mp/data-products/data-product-details?id=NP3-763-CD
[docs] SHORT_TERM_SYSTEM_ADEQUACY_REPORT_RTID = 12315
# Real-Time ORDC and Reliability Deployment Price Adders and Reserves by SCED Interval # (ORDC = Operating Reserve Demand Curve) # Adders and reserves stopped being published on December 5 # https://www.ercot.com/mp/data-products/data-product-details?id=NP6-323-CD
[docs] REAL_TIME_ADDERS_AND_RESERVES_RTID = 13221
# Real-Time ORDC and Reliability Deployment Price Adders # Adders only started being published on December 5
[docs] REAL_TIME_ADDERS_RTID = 13221
# https://www.ercot.com/mp/data-products/data-product-details?id=NP4-722-CD
[docs] TEMPERATURE_FORECAST_BY_WEATHER_ZONE_RTID = 12325
# https://www.ercot.com/mp/data-products/data-product-details?id=NP6-970-CD # https://data.ercot.com/data-product-archive/NP6-970-CD - for historical data
[docs] ERCOT_INDICATIVE_LMP_BY_SETTLEMENT_POINT_RTID = 13073
# https://www.ercot.com/mp/data-products/data-product-details?id=np4-192-cd
[docs] DAM_TOTAL_ENERGY_PURCHASED_RTID = 12333
# https://www.ercot.com/mp/data-products/data-product-details?id=np4-193-cd
[docs] DAM_TOTAL_ENERGY_SOLD_RTID = 12334
# https://www.ercot.com/mp/data-products/data-product-details?id=np1-301
[docs] COP_ADJUSTMENT_PERIOD_SNAPSHOT_RTID = 10038
# https://www.ercot.com/mp/data-products/data-product-details?id=NP6-332-CD
[docs] REAL_TIME_CLEARING_PRICES_FOR_CAPACITY_BY_SCED_INTERVAL_RTID = 24891
# https://www.ercot.com/mp/data-products/data-product-details?id=np6-788-rtcmt
[docs] REAL_TIME_CLEARING_LMPS_BY_RESOURCE_NODES_LOAD_ZONES_AND_TRADING_HUBS_RTD = 4104
# https://www.ercot.com/mp/data-products/data-product-details?id=NP6-331-CD
[docs] REAL_TIME_CLEARING_PRICES_FOR_CAPACITY_15_MIN_RTID = 24898
# https://www.ercot.com/mp/data-products/data-product-details?id=np4-212-cd
[docs] DAM_AND_SCED_ANCILLARY_SERVICE_DEMAND_CURVES_RTID = 24893
# https://www.ercot.com/mp/data-products/data-product-details?id=NP5-526-CD
[docs] PROJECTED_ANCILLARY_SERVICE_DEPLOYMENTS_FACTORS_RTID = 24886
# Weekly RUC Ancillary Service Deployment Factors # https://www.ercot.com/mp/data-products/data-product-details?id=np5-525-cd
[docs] WEEKLY_RUC_AS_DEPLOYMENT_FACTORS_RTID = 24897
# Daily RUC Ancillary Service Deployment Factors # https://www.ercot.com/mp/data-products/data-product-details?id=NP5-527-CD
[docs] DAILY_RUC_AS_DEPLOYMENT_FACTORS_RTID = 24895
# Hourly RUC Ancillary Service Deployment Factors # https://www.ercot.com/mp/data-products/data-product-details?id=NP5-528-CD
[docs] HOURLY_RUC_AS_DEPLOYMENT_FACTORS_RTID = 24896
# Hourly RUC Ancillary Service Demand Curves # https://www.ercot.com/mp/data-products/data-product-details?id=np4-213-cd
[docs] HOURLY_RUC_AS_DEMAND_CURVES_RTID = 26382
# Daily RUC Ancillary Service Demand Curves # https://www.ercot.com/mp/data-products/data-product-details?id=np4-214-cd
[docs] DAILY_RUC_AS_DEMAND_CURVES_RTID = 26383
# Weekly RUC Ancillary Service Demand Curves # https://www.ercot.com/mp/data-products/data-product-details?id=np4-215-cd
[docs] WEEKLY_RUC_AS_DEMAND_CURVES_RTID = 26384
# DAM Total Ancillary Services Sold # https://www.ercot.com/mp/data-products/data-product-details?id=np4-532-cd
[docs] DAM_TOTAL_AS_SOLD_RTID = 24888
# RTD Indicative Real-Time MCPC # https://www.ercot.com/mp/data-products/data-product-details?id=NP6-329-CD
[docs] RTD_INDICATIVE_REAL_TIME_MCPC_RTID = 24889
# Total Capability of Resources Available to Provide Ancillary Service # https://www.ercot.com/mp/data-products/data-product-details?id=NP6-328-CD
[docs] TOTAL_CAPABILITY_OF_RESOURCES_AS_RTID = 24887
# DAM Aggregated Ancillary Service Offer Curve # https://www.ercot.com/mp/data-products/data-product-details?id=np4-19-cd
[docs] DAM_AGGREGATED_AS_OFFER_CURVE_RTID = 12330
# 3-Day Highest Price Bids Selected or Dispatched in SCED # https://www.ercot.com/mp/data-products/data-product-details?id=np3-257-ex
[docs] THREE_DAY_HIGHEST_PRICE_BIDS_SCED_RTID = 13230
[docs] class ERCOTSevenDayLoadForecastReport(Enum): """ Enum class for the Medium Term (Seven Day) Load Forecasts. The values are the report IDs. """ # Seven-Day Load Forecast by Forecast Zone # https://www.ercot.com/mp/data-products/data-product-details?id=NP3-560-CD BY_FORECAST_ZONE = 12311 # Seven-Day Load Forecast by Weather Zone # https://www.ercot.com/mp/data-products/data-product-details?id=NP3-561-CD BY_WEATHER_ZONE = 12312 # Seven-Day Load Forecast by Model and Weather Zone # https://www.ercot.com/mp/data-products/data-product-details?id=NP3-565-CD BY_MODEL_AND_WEATHER_ZONE = 14837 # Seven-Day Load Forecast by Model and Study Area # https://www.ercot.com/mp/data-products/data-product-details?id=NP3-566-CD BY_MODEL_AND_STUDY_AREA = 15953
# intrahour https://www.ercot.com/mp/data-products/data-product-details?id=NP3-562-CD # there are a few days of historical data for the forecast # Actual System Load by Weather Zone # https://www.ercot.com/mp/data-products/data-product-details?id=NP6-345-CD
[docs] ACTUAL_SYSTEM_LOAD_BY_WEATHER_ZONE = 13101
# Actual System Load by Forecast Zone # https://www.ercot.com/mp/data-products/data-product-details?id=NP6-346-CD
[docs] ACTUAL_SYSTEM_LOAD_BY_FORECAST_ZONE = 14836
# 60-Day DAM Disclosure Reports # https://www.ercot.com/mp/data-products/data-product-details?id=NP3-966-ER
[docs] SIXTY_DAY_DAM_DISCLOSURE_REPORTS_RTID = 13051
# 60-Day SCED Disclosure Reports # https://www.ercot.com/mp/data-products/data-product-details?id=NP3-965-ER
[docs] SIXTY_DAY_SCED_DISCLOSURE_REPORTS_RTID = 13052
# Unplanned Resource Outages Report # https://www.ercot.com/mp/data-products/data-product-details?id=NP1-346-ER
[docs] UNPLANNED_RESOURCE_OUTAGES_REPORT_RTID = 22912
# 3-Day Highest Price AS Offer Selected # https://www.ercot.com/mp/data-products/data-product-details?id=NP3-915-EX
[docs] THREE_DAY_DAM_HIGHEST_PRICE_AS_OFFER_SELECTED_RTID = 13018
# 3-Day SCED Highest Price AS Offer Selected # https://www.ercot.com/mp/data-products/data-product-details?id=np3-914-ex
[docs] THREE_DAY_SCED_HIGHEST_PRICE_AS_OFFER_SELECTED_RTID = 25812
# 2-Day Ancillary Services Reports (DAM) # https://www.ercot.com/mp/data-products/data-product-details?id=NP3-911-ER
[docs] TWO_DAY_ANCILLARY_SERVICES_REPORTS_RTID = 13057
# 2-Day SCED Ancillary Service Disclosure # https://www.ercot.com/mp/data-products/data-product-details?id=np3-906-ex
[docs] TWO_DAY_SCED_ANCILLARY_SERVICES_REPORTS_RTID = 25814
# Ancillary Services products - used across multiple AS report methods
[docs] AS_PRODUCTS = [ "RRSPFR", "RRSUFR", "RRSFFR", "ECRSM", "ECRSS", "REGUP", "REGDN", "NSPIN", "NSPNM", ]
# Products that are not present in earlier data (before certain product launches)
[docs] AS_EXCLUDE_PRODUCTS = [ "ECRSM", "ECRSS", "NSPNM", "RRSFFR", "RRSUFR", "RRSPFR", ]
# Hourly Resource Outage Capacity # https://www.ercot.com/mp/data-products/data-product-details?id=NP3-233-CD
[docs] HOURLY_RESOURCE_OUTAGE_CAPACITY_RTID = 13103
# Wind Power Production - Hourly Averaged Actual and Forecasted Values # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-732-CD
[docs] WIND_POWER_PRODUCTION_HOURLY_AVERAGED_ACTUAL_AND_FORECASTED_VALUES_RTID = 13028
# Wind Power Production - Hourly Averaged Actual and Forecasted Values by Geographical Region # noqa # https://www.ercot.com/mp/data-products/data-product-details?id=np4-742-cd
[docs] WIND_POWER_PRODUCTION_HOURLY_AVERAGED_ACTUAL_AND_FORECASTED_VALUES_BY_GEOGRAPHICAL_REGION_RTID = ( # noqa 14787 )
# Solar Power Production - Hourly Averaged Actual and Forecasted Values by Geographical Region # noqa # https://www.ercot.com/mp/data-products/data-product-details?id=NP4-745-CD
[docs] SOLAR_POWER_PRODUCTION_HOURLY_AVERAGED_ACTUAL_AND_FORECASTED_VALUES_BY_GEOGRAPHICAL_REGION_RTID = ( # noqa 21809 )
# Solar Power Production - Hourly Averaged Actual and Forecasted Values # https://www.ercot.com/mp/data-products/data-product-details?id=np4-737-cd
[docs] SOLAR_POWER_PRODUCTION_HOURLY_AVERAGED_ACTUAL_AND_FORECASTED_VALUES_RTID = 13483
""" Settlement Point Type Description ========== ========== =========== Resource Node RN Resource Node for normal resource Resource Node PCCRN Physical Resource Node for combined cycle units Resource Node LCCRN Logical Resource Node for combined cycle plant Resource Node PUN Private Area Network Resource Node Load Zone LZ Congestion Load Zone Load Zone LZ_DC DCTIE Load Zone Hub HU Hub Hub SH ERCOT_345KV_HUBBUSES_AVG Hub AH ERCOT_HUB_AVG ============================================================ Source: https://www.ercot.com/files/docs/2009/10/26/07_tests_for_rsnable_lmps_overview_of_price_valid_tool_09102.ppt """ # noqa
[docs] RESOURCE_NODE_SETTLEMENT_TYPES = ["RN", "PCCRN", "LCCRN", "PUN"]
[docs] LOAD_ZONE_SETTLEMENT_TYPES = ["LZ", "LZ_DC"]
[docs] HUB_SETTLEMENT_TYPES = ["HU", "SH", "AH"]
@dataclass
[docs] class Document: url: str publish_date: pd.Timestamp constructed_name: str friendly_name: str friendly_name_timestamp: pd.Timestamp
[docs] def parse_timestamp_from_friendly_name(friendly_name: str) -> pd.Timestamp: parts = friendly_name.replace("_retry", "").split("_") date_str = parts[1] time_str = parts[2] # Add a colon between hours, minutes, and seconds for pandas to parse if len(time_str) > 4: second_str = time_str[4:6] else: second_str = "00" time_str_formatted = time_str[:2] + ":" + time_str[2:4] + ":" + second_str # Combine date and time strings datetime_str = date_str + " " + time_str_formatted # Convert to pandas timestampp try: timestamp = pd.to_datetime(datetime_str, format="%Y%m%d %H:%M:%S").tz_localize( Ercot.default_timezone, ambiguous=False, ) except: # noqa raise return timestamp
[docs] class Ercot(ISOBase): """Electric Reliability Council of Texas (ERCOT)""" name = "Electric Reliability Council of Texas" iso_id = "ercot" default_timezone = "US/Central" status_homepage = "https://www.ercot.com/gridmktinfo/dashboards/gridconditions" interconnection_homepage = ( "http://mis.ercot.com/misapp/GetReports.do?reportTypeId=15933" ) markets = [ Markets.REAL_TIME_15_MIN, Markets.DAY_AHEAD_HOURLY, ] location_types = [ LOCATION_TYPE_HUB, LOCATION_TYPE_ZONE, LOCATION_TYPE_RESOURCE_NODE, ] BASE = "https://www.ercot.com/api/1/services/read/dashboards" ACTUAL_LOADS_FORECAST_ZONES_URL_FORMAT = "https://www.ercot.com/content/cdr/html/{timestamp}_actual_loads_of_forecast_zones.html" # noqa ACTUAL_LOADS_WEATHER_ZONES_URL_FORMAT = "https://www.ercot.com/content/cdr/html/{timestamp}_actual_loads_of_weather_zones.html" # noqa LOAD_HISTORICAL_MAX_DAYS = 14
[docs] def get_status( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], verbose: bool = False, ) -> pd.DataFrame: """Returns status of grid""" if date != "latest": raise NotSupported() r = self._get_json(self.BASE + "/daily-prc.json", verbose=verbose) time = ( pd.to_datetime(r["current_condition"]["datetime"], unit="s") .tz_localize("UTC") .tz_convert(self.default_timezone) ) status = r["current_condition"]["state"] reserves = float(r["current_condition"]["prc_value"].replace(",", "")) if status == "normal": status = "Normal" notes = [r["current_condition"]["condition_note"]] return GridStatus( time=time, status=status, reserves=reserves, iso=self, notes=notes, )
[docs] def get_energy_storage_resources( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = "latest", verbose: bool = False, ) -> pd.DataFrame: """Get energy storage resources. Always returns data from previous and current day""" url = self.BASE + "/energy-storage-resources.json" data = self._get_json(url, verbose=verbose) df = pd.DataFrame(data["previousDay"]["data"] + data["currentDay"]["data"]) # TODO(kanter): fix this for future DST dates for timestamp in ["2024-11-03 02:00:00-0600", "2025-11-02 02:00:00-0600"]: if timestamp in df["timestamp"].values: # ERCOT publishes two intervals with 2am timestamp # during CDT to CST transition # but skips the repeated 1am timestamp # let's manually fix this before further timestamp parsing df.loc[ (df["timestamp"] == timestamp) & (df["dstFlag"] == "N"), "timestamp", ] = timestamp.replace("02:00:00", "01:00:00") df = df[["timestamp", "totalCharging", "totalDischarging", "netOutput"]] # Parse in UTC to avoid issues with DST transition df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True).dt.tz_convert( self.default_timezone, ) df = df.rename( columns={ "timestamp": "Time", "totalCharging": "Total Charging", "totalDischarging": "Total Discharging", "netOutput": "Net Output", }, ) df = df.sort_values("Time").reset_index(drop=True) return df
[docs] def get_fuel_mix( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], verbose: bool = False, ) -> pd.DataFrame: """Get fuel mix 5 minute intervals Arguments: date (datetime.date, str): "latest", "today", and yesterday's date are supported. verbose(bool): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with columns; Time and columns for each fuel \ type """ data = self._get_fuel_mix(date, verbose=verbose) dfs = [] for day in data["data"].keys(): df = pd.DataFrame(data["data"][day]) # Only care about the gen for this method df_transformed = df.apply( lambda col: col.apply( lambda x: x.get("gen") if isinstance(x, dict) else pd.NA, ), ).T dfs.append(df_transformed) mix = pd.concat(dfs) return self._handle_fuel_mix( date, mix, [ "Time", "Coal and Lignite", "Hydro", "Nuclear", "Power Storage", "Solar", "Wind", "Natural Gas", "Other", ], )
def _get_fuel_mix( self, date: str | datetime.datetime | pd.Timestamp, verbose: bool, ): if date != "latest": if not ( utils.is_today(date, tz=self.default_timezone) or utils.is_yesterday(date, tz=self.default_timezone) ): raise NotSupported() url = self.BASE + "/fuel-mix.json" data = self._get_json(url, verbose=verbose) return data def _handle_fuel_mix( self, date: str | datetime.datetime | pd.Timestamp, data: pd.DataFrame, columns: List[str], ): data.index.name = "Time" data = data.reset_index() # need to use apply since there can be mixed # fixed offsets during dst transition # that result in object dtypes in pandas data["Time"] = data["Time"].apply(lambda x: pd.to_datetime(x).tz_convert("UTC")) # most timestamps are a few seconds off round 5 minute ticks # round to nearest minute. must do in utc to avoid dst issues data["Time"] = data["Time"].round("min").dt.tz_convert(self.default_timezone) data = data[columns] if date == "latest": return data parsed_date = utils._handle_date(date, self.default_timezone) return data[data["Time"].dt.date == parsed_date.date()].reset_index(drop=True)
[docs] def get_fuel_mix_detailed( self, date: str | datetime.datetime | pd.Timestamp, verbose: bool = False, ) -> pd.DataFrame: """The fuel mix with gen, hsl, and seasonal capacity for each fuel type.""" data = self._get_fuel_mix(date, verbose=verbose) capacity = data["monthlyCapacity"] dfs = [] for day in data["data"].keys(): df = pd.DataFrame(data["data"][day]) df_transformed = df.T dfs.append(df_transformed) mix = pd.concat(dfs) # Each col is a tuple of (gen, hsl, seasonalCapacity). We want to split this # to separate columns cols_to_drop = [] for col in mix.columns: mix[col + " Gen"] = mix[col].apply(lambda x: x.get("gen")) mix[col + " HSL"] = mix[col].apply(lambda x: x.get("hsl")) mix[col + " Seasonal Capacity"] = capacity[col] cols_to_drop.append(col) mix = mix.drop(columns=cols_to_drop) return self._handle_fuel_mix( date, mix, [ "Time", "Coal and Lignite Gen", "Coal and Lignite HSL", "Coal and Lignite Seasonal Capacity", "Hydro Gen", "Hydro HSL", "Hydro Seasonal Capacity", "Nuclear Gen", "Nuclear HSL", "Nuclear Seasonal Capacity", "Power Storage Gen", "Power Storage HSL", "Power Storage Seasonal Capacity", "Solar Gen", "Solar HSL", "Solar Seasonal Capacity", "Wind Gen", "Wind HSL", "Wind Seasonal Capacity", "Natural Gas Gen", "Natural Gas HSL", "Natural Gas Seasonal Capacity", "Other Gen", "Other HSL", "Other Seasonal Capacity", ], )
@support_date_range("DAY_START")
[docs] def get_load( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get load for a date Arguments: date (datetime.date, str): "latest", "today", or a date string are supported. """ if date == "latest": return self.get_load("today", verbose=verbose) elif utils.is_today(date, tz=self.default_timezone): df = self._get_todays_outlook_non_forecast(date, verbose=verbose) df = df.rename(columns={"demand": "Load"}) return df[["Time", "Interval Start", "Interval End", "Load"]] elif utils.is_within_last_days( date, self.LOAD_HISTORICAL_MAX_DAYS, tz=self.default_timezone, ): df = self._get_forecast_zone_load_html(date, verbose).rename( columns={"TOTAL": "Load"}, ) return df[["Time", "Interval Start", "Interval End", "Load"]] else: raise NotSupported()
@support_date_range("DAY_START")
[docs] def get_load_by_weather_zone( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], verbose: bool = False, ) -> pd.DataFrame: """Get hourly load for ERCOT weather zones Arguments: date (datetime.date, str): "today", or a date string are supported. verbose(bool): print verbose output. Defaults to False. Returns: pandas.DataFrame """ # Use the html page for both today and yesterday to ensure all the # data is retrieved. The html page is updated every hour at 20 mins # past the hour but the report is only published once per dat at 0550 UTC. if utils.is_today(date, tz=self.default_timezone) or utils.is_yesterday( date, tz=self.default_timezone, ): df = self._get_weather_zone_load_html(date, verbose=verbose) else: doc_info = self._get_document( report_type_id=ACTUAL_SYSTEM_LOAD_BY_WEATHER_ZONE, date=date + pd.DateOffset(days=1), # published day after constructed_name_contains="csv.zip", verbose=verbose, ) df = self.read_doc(doc_info, verbose=verbose) # Clean up columns to match load_forecast_by_weather_zone df.columns = df.columns.map(lambda x: x.replace("_", " ").title()) df = df.rename( columns=self._weather_zone_column_name_mapping(), ).sort_values("Interval Start") df = utils.move_cols_to_front( df, [ "Time", "Interval Start", "Interval End", ] + self._weather_zone_column_name_order() + ["System Total"], ) return df
@support_date_range("DAY_START")
[docs] def get_load_by_forecast_zone( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], verbose: bool = False, ) -> pd.DataFrame: """Get hourly load for ERCOT forecast zones Arguments: date (datetime.date, str): "today", or a date string are supported. verbose(bool): print verbose output. Defaults to False. Returns: pandas.DataFrame """ # Use the html page for both today and yesterday to ensure all the # data is retrieved. if utils.is_today(date, tz=self.default_timezone) or utils.is_yesterday( date, tz=self.default_timezone, ): df = self._get_forecast_zone_load_html(date, verbose=verbose) else: doc_info = self._get_document( report_type_id=ACTUAL_SYSTEM_LOAD_BY_FORECAST_ZONE, date=date + pd.DateOffset(days=1), # published day after constructed_name_contains="csv.zip", verbose=verbose, ) df = self.read_doc(doc_info, verbose=verbose) return df
def _get_forecast_zone_load_html( self, when: pd.Timestamp, verbose: bool = False, ) -> pd.DataFrame: """Returns load for currentDay or previousDay""" url = self.ACTUAL_LOADS_FORECAST_ZONES_URL_FORMAT.format( timestamp=when.strftime("%Y%m%d"), ) df = self._read_html_display(url=url, verbose=verbose) return df def _get_weather_zone_load_html( self, when: pd.Timestamp, verbose: bool = False, ) -> pd.DataFrame: """Returns load for currentDay or previousDay""" url = self.ACTUAL_LOADS_WEATHER_ZONES_URL_FORMAT.format( timestamp=when.strftime("%Y%m%d"), ) df = self._read_html_display( url=url, verbose=verbose, ) return df def _read_html_display(self, url: str, verbose: bool = False) -> pd.DataFrame: logger.info(f"Fetching {url}") dfs = pd.read_html(url, header=0) df = dfs[0] if df["Hour Ending"].dtype == "object": df["RepeatedHourFlag"] = df["Hour Ending"].str.contains("*", regex=False) df["Hour Ending"] = ( df["Hour Ending"].str.replace("*", "", regex=False).str.strip() ).astype(int) else: # non dst transition day # so no repeated hours df["RepeatedHourFlag"] = False df["Interval Start"] = pd.to_datetime(df["Oper Day"]) + ( df["Hour Ending"] / 100 - 1 ).astype("timedelta64[h]") df["Interval Start"] = df["Interval Start"].dt.tz_localize( self.default_timezone, # Prevent linting to is False ambiguous=df["RepeatedHourFlag"] == False, # noqa ) df["Interval End"] = df["Interval Start"] + pd.Timedelta(hours=1) df["Time"] = df["Interval Start"] df = utils.move_cols_to_front( df, [ "Time", "Interval Start", "Interval End", ], ) to_drop = ["Oper Day", "Hour Ending", "RepeatedHourFlag"] df = df.drop(to_drop, axis=1) return df @support_date_range(frequency="YEAR_START")
[docs] def get_hourly_load_post_settlements( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get historical hourly load data from ERCOT's load archives. Downloads zip files from https://www.ercot.com/gridinfo/load/load_hist and parses the historical load data by weather zones. Arguments: date (str, datetime): Year to download data for, or "latest" for most recent data end (str, datetime): End date for range, or None for single date verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame """ if date == "latest": # NB: Gets the most recent year available, since they are published as annual files current_year = pd.Timestamp.now().year date = pd.Timestamp(f"{current_year}-01-01") end = pd.Timestamp(f"{current_year + 1}-01-01") date = utils._handle_date(date, self.default_timezone) end = utils._handle_date(end, self.default_timezone) logger.info( f"Fetching historical load data for year {date.year}", ) return self._download_post_settlements_load_file(date.year)
def _download_post_settlements_load_file( self, year: int, ) -> pd.DataFrame: """Download and parse ERCOT historical load data for a specific year.""" page_url = "https://www.ercot.com/gridinfo/load/load_hist" response = requests.get(page_url) response.raise_for_status() soup = BeautifulSoup(response.content, "html.parser") year_link = None for link in soup.find_all("a"): href = link.get("href", "") text = link.get_text(strip=True) if str(year) in text and ("zip" in href.lower() or "xls" in href.lower()): year_link = href break if year_link.endswith(".zip"): zip_file = utils.get_zip_folder(year_link) filename = zip_file.namelist()[0] df = pd.read_excel(zip_file.open(filename)) elif year_link.endswith(".xls") or year_link.endswith(".xlsx"): response = requests.get(year_link) response.raise_for_status() df = pd.read_excel(io.BytesIO(response.content)) df = self._process_post_settlements_load_data(df) return df def _process_post_settlements_load_data( self, df: pd.DataFrame, ) -> pd.DataFrame: df.columns = df.columns.str.strip() # Not all of these columns are in all of the files, # but this process ignores the ones that aren't so we can # just maintain one list and not have a bunch of if/else # to parse the columns that are there. column_mapping = { "HOUR_ENDING": "Interval End", "Hour Ending": "Interval End", "Hour_End": "Interval End", "HourEnding": "Interval End", "COAST": "Coast", "EAST": "East", "FWEST": "Far West", "FAR_WEST": "Far West", "NORTH": "North", "NCENT": "North Central", "NORTH_C": "North Central", "SOUTH": "South", "SOUTHERN": "South", "SCENT": "South Central", "SOUTH_C": "South Central", "WEST": "West", "ERCOT": "ERCOT", "TOTAL": "ERCOT", } existing_columns = [col for col in column_mapping.keys() if col in df.columns] rename_dict = {col: column_mapping[col] for col in existing_columns} df = df.rename(columns=rename_dict) if pd.api.types.is_datetime64_any_dtype(df["Interval End"]): if df["Interval End"].dt.tz is not None: df["Interval End"] = df["Interval End"].dt.tz_convert( self.default_timezone, ambiguous=True, nonexistent="shift_forward", ) else: df["Interval End"] = df["Interval End"].dt.round("h") df["Interval End"] = df["Interval End"].dt.tz_localize( self.default_timezone, ambiguous=True, nonexistent="shift_forward", ) else: df["Interval End"] = df["Interval End"].astype(str) # Convert 24:00 to next day's 00:00 directly in string format # Doing so avoids the DST transition issue of converting 24:00 to 00:00 # and then adding a day to the interval end. def convert_24_hour(date_str: str) -> str: if " 24:00" in date_str: # Parse the date part and add one day date_part = date_str.split(" ")[0] parsed_date = pd.to_datetime(date_part) + pd.Timedelta(days=1) return parsed_date.strftime("%m/%d/%Y") + " 00:00" return date_str df["Interval End"] = df["Interval End"].apply(convert_24_hour) df["Interval End"] = df["Interval End"].str.replace(" DST", "") df["Interval End"] = pd.to_datetime(df["Interval End"], errors="coerce") df["Interval End"] = df["Interval End"].dt.round("h") df["Interval End"] = df["Interval End"].dt.tz_localize( self.default_timezone, ambiguous=True, nonexistent="shift_forward", ) df["Interval Start"] = df["Interval End"] - pd.Timedelta(hours=1) # Fix DST fall-back duplicates - find any October/November date with duplicate 1 AM hours fall_dst_mask = (df["Interval Start"].dt.month.isin([10, 11])) & ( df["Interval Start"].dt.hour == 1 ) if fall_dst_mask.any(): # Group by date to find dates with exactly 2 occurrences of 1 AM fall_dates = df[fall_dst_mask]["Interval Start"].dt.date.value_counts() duplicate_dates = fall_dates[fall_dates == 2].index for dup_date in duplicate_dates: date_mask = (df["Interval Start"].dt.date == dup_date) & ( df["Interval Start"].dt.hour == 1 ) duplicate_indices = df[date_mask].index second_idx = duplicate_indices[1] logger.debug( f"Changing timezone for DST duplicate at {df.loc[second_idx, 'Interval Start']}", ) # Change from -06:00 to -05:00 by subtracting 1 hour df.loc[second_idx, "Interval Start"] = df.loc[ second_idx, "Interval Start", ] - pd.Timedelta(hours=1) df.loc[second_idx, "Interval End"] = df.loc[ second_idx, "Interval End", ] - pd.Timedelta(hours=1) numeric_columns = [ "Coast", "East", "Far West", "North", "North Central", "South", "South Central", "West", "ERCOT", ] for col in numeric_columns: if col in df.columns: df[col] = pd.to_numeric( df[col].astype(str).str.replace(",", ""), errors="coerce", ) expected_columns = [ "Interval Start", "Interval End", "Coast", "East", "Far West", "North", "North Central", "South", "South Central", "West", "ERCOT", ] df = df.dropna(subset=["Interval Start", "Interval End"]) return df[expected_columns].sort_values("Interval Start").reset_index(drop=True) def _get_supply_demand_json(self) -> dict: url = self.BASE + "/supply-demand.json" logger.info(f"Fetching {url}") return self._get_json(url) def _get_update_timestamp_from_supply_demand_json( self, supply_demand_json: dict, ) -> pd.Timestamp: return pd.to_datetime(supply_demand_json["lastUpdated"]) def _get_todays_outlook_non_forecast( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], verbose: bool = False, ) -> pd.DataFrame: """Returns most recent data point for supply in MW Updates every 5 minutes """ assert date == "latest" or utils.is_today( date, self.default_timezone, ), "Only today's data is supported" supply_demand_json = self._get_supply_demand_json() data = pd.DataFrame(supply_demand_json["data"]) # Parse in UTC to then convert to local to avoid DST transition issues because # of mixed timezones data["Interval End"] = pd.to_datetime( data["epoch"], unit="ms", utc=True, ).dt.tz_convert(self.default_timezone) data["Interval Start"] = data["Interval End"] - pd.Timedelta(minutes=5) data["Time"] = data["Interval Start"] data = data[ [ "Time", "Interval Start", "Interval End", "demand", "forecast", "capacity", ] ] data = data[data["forecast"] == 0] # only keep non forecast rows return data.reset_index(drop=True) @support_date_range(frequency=None)
[docs] def get_load_forecast( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, forecast_type: ERCOTSevenDayLoadForecastReport = ERCOTSevenDayLoadForecastReport.BY_FORECAST_ZONE, verbose: bool = False, ) -> pd.DataFrame: """Returns load forecast of specified forecast type. If date range provided, returns all hourly reports published within. Note: only limited historical data is available Arguments: date (str, datetime): datetime to download. If `end` not provided, returns last hourly report published before. if "latest", returns most recent hourly report. if `end` provided, returns all hourly reports published after this date and before `end`. end (str, datetime,): if provided, returns all hourly reports published after `date` and before `end` forecast_type (ERCOTSevenDayLoadForecastReport): The load forecast type. Enum of possible values. verbose (bool, optional): print verbose output. Defaults to False. """ # TODO: migrate to _get_hourly_report if end is None: doc = self._get_document( report_type_id=forecast_type.value, published_before=date, constructed_name_contains="csv.zip", verbose=verbose, ) docs = [doc] else: docs = self._get_documents( report_type_id=forecast_type.value, published_after=date, published_before=end, constructed_name_contains="csv.zip", verbose=verbose, ) all_df = [] for doc in docs: df = self._handle_load_forecast( doc, forecast_type=forecast_type, verbose=verbose, ) all_df.append(df) df = pd.concat(all_df) df = df.sort_values("Publish Time") return df
def _handle_load_forecast( self, doc: Document, forecast_type: ERCOTSevenDayLoadForecastReport, verbose: bool = False, ) -> pd.DataFrame: """ Function to handle the different types of load forecast parsing. """ df = self.read_doc(doc, verbose=verbose) df["Publish Time"] = doc.publish_date df = df.rename(columns={"SystemTotal": "System Total"}) cols_to_move = [ "Time", "Interval Start", "Interval End", "Publish Time", ] if forecast_type.value == ERCOTSevenDayLoadForecastReport.BY_WEATHER_ZONE.value: df = df.rename( columns=self._weather_zone_column_name_mapping(), ) cols_to_move += self._weather_zone_column_name_order() + ["System Total"] df = utils.move_cols_to_front(df, cols_to_move) return df @support_date_range(frequency=None)
[docs] def get_load_forecast_by_model( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Seven-Day Load Forecast by Model and Weather Zone. Forecasted hourly demand by Model and Weather Zone as reported by ERCOT. Released every hour for the current day and the next 7. Arguments: date (str, datetime): date to get report for. Supports "latest" or a date string. end (str, datetime, optional): end date for date range. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with load forecast by model data Source: https://www.ercot.com/mp/data-products/data-product-details?id=NP3-565-CD """ def handle_doc(doc: Document, verbose: bool = False) -> pd.DataFrame: df = self.read_doc(doc, verbose=verbose) df["Publish Time"] = doc.publish_date return self._handle_load_forecast_by_model(df) df = self._get_hourly_report( start=date, end=end, report_type_id=ERCOTSevenDayLoadForecastReport.BY_MODEL_AND_WEATHER_ZONE.value, extension="csv", handle_doc=handle_doc, verbose=verbose, ) return df[LOAD_FORECAST_BY_MODEL_COLUMNS].sort_values( ["Interval Start", "Publish Time", "Model"], )
def _handle_load_forecast_by_model( self, df: pd.DataFrame, publish_time: pd.Timestamp | None = None, ) -> pd.DataFrame: """Handle parsing of load forecast by model data. Arguments: df: DataFrame with raw load forecast by model data. Returns: DataFrame with renamed columns and optional Publish Time. """ df = df.rename( columns={ **self._weather_zone_column_name_mapping(), "SystemTotal": "System Total", "InUseFlag": "In Use Flag", }, ) # Convert In Use Flag to boolean df["In Use Flag"] = df["In Use Flag"] == "Y" return df
[docs] def get_capacity_committed( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = "latest", verbose: bool = False, ) -> pd.DataFrame: """ Retrieves the actual committed capacity (the amount of power available from generating units that were on-line or providing operating reserves). Data is ephemeral and does not support past days. """ data = self._get_capacity_dataset(verbose=verbose) return ( data.loc[ # Actual values data["forecast"] == 0, ["Interval Start", "Interval End", "capacity"], ] .rename(columns={"capacity": "Capacity"}) .reset_index(drop=True) )
[docs] def get_capacity_forecast( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = "latest", verbose: bool = False, ) -> pd.DataFrame: """ Retrieves the forecasted committed capacity (Committed Capacity) and the forecasted available capacity (Available Capacity) for the current day. Data is ephemeral and does not support past days. """ data = self._get_capacity_dataset(verbose=verbose) # Forecast values return ( data.loc[ data["forecast"] == 1, [ "Interval Start", "Interval End", "Publish Time", "capacity", "available", ], ] .rename( columns={ "capacity": "Committed Capacity", "available": "Available Capacity", }, ) .reset_index(drop=True) )
def _get_capacity_dataset(self, verbose: bool = False) -> pd.DataFrame: supply_demand_json = self._get_supply_demand_json() data = pd.DataFrame(supply_demand_json["data"]) data.loc[ :, "Publish Time", ] = self._get_update_timestamp_from_supply_demand_json(supply_demand_json) data["Interval Start"] = pd.to_datetime( data["epoch"], unit="ms", utc=True, ).dt.tz_convert(self.default_timezone) data["Interval End"] = data["Interval Start"] + pd.Timedelta(minutes=5) return data[ [ "Interval Start", "Interval End", "Publish Time", "capacity", "forecast", "available", ] ].sort_values("Interval Start")
[docs] def get_available_seasonal_capacity_forecast( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = "latest", verbose: bool = False, ) -> pd.DataFrame: """ Retrieves the forecasted demand (Load Forecast) and the forecasted available seasonal capacity (Available Capacity) for the next 6 days. Data is ephemeral and does not support past days. """ supply_demand_json = self._get_supply_demand_json() data = pd.DataFrame(supply_demand_json["forecast"]) # Use epoch to get the UTC timestamps then convert to local to avoid issues # around DST transitions data["Interval End"] = pd.to_datetime( data["epoch"], unit="ms", utc=True, ).dt.tz_convert(self.default_timezone) data["Interval Start"] = data["Interval End"] - pd.Timedelta(hours=1) # For DST start in March 2026, ERCOT published a row with the same start and # end time which is incorrect (during DST start, there should be a gap of one # hour). We drop this interval data = data[ ~( (data["deliveryDateHrBegin"] == "2026-03-08 03:00:00") & (data["deliveryDateHrEnd"] == "2026-03-08 03:00:00") ) ] data.loc[ :, "Publish Time", ] = self._get_update_timestamp_from_supply_demand_json(supply_demand_json) return ( data[ [ "Interval Start", "Interval End", "Publish Time", "availCapGen", "forecastedDemand", ] ] .rename( columns={ "availCapGen": "Available Capacity", "forecastedDemand": "Load Forecast", }, ) .sort_values("Interval Start") .reset_index(drop=True) )
[docs] def get_rtm_spp(self, year: int, verbose: bool = False) -> pd.DataFrame: """Get Historical RTM Settlement Point Prices(SPPs) for each of the Hubs and Load Zones Arguments: year(int): year to get data for Starting 2011, returns data for the entire year Source: https://www.ercot.com/mp/data-products/data-product-details?id=NP6-785-ER """ # noqa doc_info = self._get_document( report_type_id=HISTORICAL_RTM_LOAD_ZONE_AND_HUB_PRICES_RTID, constructed_name_contains=f"{year}.zip", verbose=verbose, ) x = utils.get_zip_file(doc_info.url, verbose=verbose) all_sheets = pd.read_excel(x, sheet_name=None) df = pd.concat(all_sheets.values()) # fix parsing error where no data is present # should only be 1 row per year count = df[["Delivery Hour", "Delivery Interval"]].isnull().all(axis=1).sum() if count == 1: df = df.dropna( subset=["Delivery Hour", "Delivery Interval"], how="all", ) elif count > 1: raise ValueError( "Parsing error, more than expected null rows found", ) df["Delivery Interval"] = df["Delivery Interval"].astype("Int64") df = self.parse_doc(df, verbose=verbose) return self._finalize_spp_df( df, market=Markets.REAL_TIME_15_MIN, verbose=verbose, )
[docs] def get_dam_spp(self, year: int, verbose: bool = False) -> pd.DataFrame: """Get Historical DAM Settlement Point Prices(SPPs) for each of the Hubs and Load Zones Arguments: year(int): year to get data for. Starting 2011, returns data for the entire year Source: https://www.ercot.com/mp/data-products/data-product-details?id=NP4-180-ER """ doc_info = self._get_document( report_type_id=HISTORICAL_DAM_LOAD_ZONE_AND_HUB_PRICES_RTID, constructed_name_contains=f"{year}.zip", verbose=verbose, ) x = utils.get_zip_file(doc_info.url, verbose=verbose) all_sheets = pd.read_excel(x, sheet_name=None) df = pd.concat(all_sheets.values()) # filter where DSTFlag == 10 df = self.parse_doc(df, verbose=verbose) return self._finalize_spp_df( df, market=Markets.DAY_AHEAD_HOURLY, verbose=verbose, )
[docs] def get_raw_interconnection_queue(self, verbose: bool = False) -> BinaryIO: doc_info = self._get_document( report_type_id=GIS_REPORT_RTID, constructed_name_contains="GIS_Report", verbose=verbose, ) logger.info(f"Downloading interconnection queue from: {doc_info.url} ") response = requests.get(doc_info.url) return utils.get_response_blob(response)
[docs] def get_interconnection_queue(self, verbose: bool = False) -> pd.DataFrame: """ Get interconnection queue for ERCOT Monthly historical data available here: http://mis.ercot.com/misapp/GetReports.do?reportTypeId=15933&reportTitle=GIS%20Report&showHTMLView=&mimicKey """ # noqa raw_data = self.get_raw_interconnection_queue(verbose) # TODO other sheets for small projects, inactive, and cancelled project # TODO see if this data matches up with summaries in excel file # TODO historical data available as well # skip rows and handle header queue = pd.read_excel( raw_data, sheet_name="Project Details - Large Gen", skiprows=30, ).iloc[4:] queue["State"] = "Texas" queue["Queue Date"] = queue["Screening Study Started"] fuel_type_map = { "BIO": "Biomass", "COA": "Coal", "GAS": "Gas", "GEO": "Geothermal", "HYD": "Hydrogen", "NUC": "Nuclear", "OIL": "Fuel Oil", "OTH": "Other", "PET": "Petcoke", "SOL": "Solar", "WAT": "Water", "WIN": "Wind", } technology_type_map = { "BA": "Battery Energy Storage", "CC": "Combined-Cycle", "CE": "Compressed Air Energy Storage", "CP": "Concentrated Solar Power", "EN": "Energy Storage", "FC": "Fuel Cell", "GT": "Combustion (gas) Turbine, but not part of a Combined-Cycle", "HY": "Hydroelectric Turbine", "IC": "Internal Combustion Engine, eg. Reciprocating", "OT": "Other", "PV": "Photovoltaic Solar", "ST": "Steam Turbine other than Combined-Cycle", "WT": "Wind Turbine", } queue["Fuel"] = queue["Fuel"].map(fuel_type_map) queue["Technology"] = queue["Technology"].map(technology_type_map) queue["Generation Type"] = queue["Fuel"] + " - " + queue["Technology"] queue["Status"] = ( queue["IA Signed"] .isna() .map( { True: InterconnectionQueueStatus.ACTIVE.value, False: InterconnectionQueueStatus.COMPLETED.value, }, ) ) queue["Actual Completion Date"] = queue["Approved for Synchronization"] rename = { "INR": "Queue ID", "Project Name": "Project Name", "Interconnecting Entity": "Interconnecting Entity", "Projected COD": "Proposed Completion Date", "POI Location": "Interconnection Location", "County": "County", "State": "State", "Capacity (MW)": "Capacity (MW)", "Queue Date": "Queue Date", "Generation Type": "Generation Type", "Actual Completion Date": "Actual Completion Date", "Status": "Status", } # todo: there are a few columns being parsed # as "unamed" that aren't being included but should extra_columns = [ "Fuel", "Technology", "GIM Study Phase", "Screening Study Started", "Screening Study Complete", "FIS Requested", "FIS Approved", "Economic Study Required", "IA Signed", "Air Permit", "GHG Permit", "Water Availability", "Meets Planning", "Meets All Planning", "CDR Reporting Zone", # "Construction Start", # all null # "Construction End", # all null "Approved for Energization", "Approved for Synchronization", "Comment", ] missing = [ # todo the actual complettion date can be calculated by # looking at status and other date columns "Withdrawal Comment", "Transmission Owner", "Summer Capacity (MW)", "Winter Capacity (MW)", "Withdrawn Date", ] queue = utils.format_interconnection_df( queue=queue, rename=rename, extra=extra_columns, missing=missing, ) return queue
@support_date_range(frequency=None)
[docs] def get_lmp( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, location_type: str = SETTLEMENT_POINT_LOCATION_TYPE, # TODO: support 'ALL' verbose: bool = False, ) -> pd.DataFrame: """Get LMP data for ERCOT normally produced by SCED every five minutes Can specify the location type to return "electrical bus" or "settlement point" data. Defaults to "settlement point" """ if location_type.lower() == ELECTRICAL_BUS_LOCATION_TYPE.lower(): report = LMPS_BY_ELECTRICAL_BUS_RTID elif location_type.lower() == SETTLEMENT_POINT_LOCATION_TYPE.lower(): report = LMPS_BY_SETTLEMENT_POINT_RTID else: raise ValueError( f"Invalid location type: {location_type}. Must be 'settlement point' or 'electrical bus'", # noqa ) # if end is None, assume requesting one day if end is None: start = None end = None date = date else: start = date end = end date = None docs = self._get_documents( report_type_id=report, date=date, friendly_name_timestamp_after=start, friendly_name_timestamp_before=end, extension="csv", verbose=verbose, ) return self._handle_lmp(docs=docs, verbose=verbose)
def _handle_lmp( self, docs: list[Document], verbose: bool = False, sced: bool = True, ) -> pd.DataFrame: df = self.read_docs( docs, parse=False, # need to return a DF that works with the # logic in rest of function empty_df=pd.DataFrame( columns=[ "SCEDTimestamp", "RepeatedHourFlag", "Location", "Location Type", "LMP", ], ), verbose=verbose, ) return self._handle_lmp_df(df, verbose=verbose, sced=sced) def _handle_lmp_df( self, df: pd.DataFrame, verbose: bool = False, sced: bool = True, ) -> pd.DataFrame: df = self._handle_sced_timestamp(df=df, verbose=verbose) if "SettlementPoint" in df.columns: df = self._handle_settlement_point_name_and_type(df, verbose=verbose) elif "ElectricalBus" in df.columns: # do same thing as settlement point but for electrical bus df = df.rename( columns={ "ElectricalBus": "Location", }, ) df["Location Type"] = ELECTRICAL_BUS_LOCATION_TYPE # make Location string and location type category df["Location"] = df["Location"].astype("string") df["Location Type"] = df["Location Type"].astype("category") df["Market"] = ( Markets.REAL_TIME_SCED.value if sced else Markets.DAY_AHEAD_HOURLY.value ) df = df[ [ "Interval Start", "Interval End", "SCED Timestamp", "Market", "Location", "Location Type", "LMP", ] ] # sort by SCED Timestamp and Location df = df.sort_values( [ "SCED Timestamp", "Location", ], ).reset_index(drop=True) return df @support_date_range(frequency=None)
[docs] def get_lmp_by_bus_dam( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Day-Ahead Market (DAM) LMPs by Electrical Bus Returns hourly Locational Marginal Prices per electrical bus from the Day-Ahead Market. https://www.ercot.com/mp/data-products/data-product-details?id=NP4-183-CD Arguments: date (str, datetime): date to get data for. Supports "latest", "today", or a specific date. end (str, datetime, optional): end date for a date range query. If None, returns 1 day of data. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with day-ahead LMPs by electrical bus """ if date == "latest": return self.get_lmp_by_bus_dam("today", verbose=verbose) # DAM data is published the day before delivery publish_date = date.normalize() - pd.DateOffset(days=1) if end is not None: published_before = end - pd.DateOffset(days=1) published_after = publish_date publish_date = None else: published_before = None published_after = None docs = self._get_documents( report_type_id=DAM_LMPS_BY_ELECTRICAL_BUS_RTID, date=publish_date, published_before=published_before, published_after=published_after, constructed_name_contains="csv.zip", verbose=verbose, ) df = self.read_docs( docs, empty_df=pd.DataFrame(columns=self._lmp_by_bus_dam_cols), verbose=verbose, ) return self._handle_lmp_by_bus_dam_df(df)
_lmp_by_bus_dam_cols = [ "Interval Start", "Interval End", "Market", "Location", "Location Type", "LMP", ] def _handle_lmp_by_bus_dam_df(self, df: pd.DataFrame) -> pd.DataFrame: if df.empty: raise NoDataFoundException("No DAM LMP by bus data found") df = df.rename(columns={"BusName": "Location"}) df = df.drop(columns=["Time"], errors="ignore") df["Location Type"] = ELECTRICAL_BUS_LOCATION_TYPE df["Market"] = Markets.DAY_AHEAD_HOURLY.value df = df[ [ "Interval Start", "Interval End", "Market", "Location", "Location Type", "LMP", ] ] return df.sort_values("Interval Start").reset_index(drop=True) @lmp_config( supports={ Markets.REAL_TIME_15_MIN: ["latest", "today", "historical"], Markets.DAY_AHEAD_HOURLY: ["latest", "today", "historical"], }, ) @support_date_range(frequency=None)
[docs] def get_spp( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, market: str = None, locations: list = "ALL", location_type: str = "ALL", verbose: bool = False, ) -> pd.DataFrame: """Get SPP data for ERCOT Supported Markets: - ``REAL_TIME_15_MIN`` - ``DAY_AHEAD_HOURLY`` Supported Location Types: - ``Load Zone`` - ``Trading Hub`` - ``Resource Node`` """ publish_date = None published_before = None published_after = None friendly_name_timestamp_before = None friendly_name_timestamp_after = None if market == Markets.REAL_TIME_15_MIN: if date == "latest": publish_date = "latest" elif end is None: # no end, so assume requesting one day # use the timestamp from the friendly name friendly_name_timestamp_after = date.normalize() friendly_name_timestamp_before = ( friendly_name_timestamp_after + pd.DateOffset(days=1) ) else: friendly_name_timestamp_after = date friendly_name_timestamp_before = end report = SETTLEMENT_POINT_PRICES_AT_RESOURCE_NODES_HUBS_AND_LOAD_ZONES_RTID elif market == Markets.DAY_AHEAD_HOURLY: if date == "latest": publish_date = "latest" elif end is None: # no end, so assume requesting one day # data is publish one day prior publish_date = date.normalize() - pd.DateOffset(days=1) else: published_before = end published_after = date report = DAM_SETTLEMENT_POINT_PRICES_RTID docs = self._get_documents( report_type_id=report, date=publish_date, published_before=published_before, published_after=published_after, friendly_name_timestamp_before=friendly_name_timestamp_before, friendly_name_timestamp_after=friendly_name_timestamp_after, constructed_name_contains="csv.zip", verbose=verbose, ) df = self.read_docs( docs, empty_df=pd.DataFrame( columns=[ "Time", "Interval Start", "Interval End", "Location", "Location Type", "Market", "SPP", ], ), verbose=verbose, ) return self._finalize_spp_df( df, locations=locations, location_type=location_type, market=market, verbose=verbose, )
def _handle_settlement_point_name_and_type( self, df: pd.DataFrame, verbose: bool = False, ) -> pd.DataFrame: df = df.rename( columns={ "SettlementPoint": "Location", "Settlement Point": "Location", "SettlementPointName": "Location", "Settlement Point Name": "Location", }, ) # todo is this needed if we are defaulting to resource node? mapping_df = self._get_settlement_point_mapping(verbose=verbose) resource_node = mapping_df["RESOURCE_NODE"].dropna().unique() # Create boolean masks for each location type is_hub = df["Location"].str.startswith("HB_") is_load_zone = df["Location"].str.startswith("LZ_") is_load_zone_dc_tie = df["Location"].str.startswith("DC_") is_resource_node = df["Location"].isin(resource_node) # Assign location types based on the boolean masks df.loc[is_hub, "Location Type"] = LOCATION_TYPE_HUB df.loc[is_load_zone, "Location Type"] = LOCATION_TYPE_ZONE df.loc[is_load_zone_dc_tie, "Location Type"] = LOCATION_TYPE_ZONE_DC df.loc[is_resource_node, "Location Type"] = LOCATION_TYPE_RESOURCE_NODE # If a location type is not found, default to LOCATION_TYPE_RESOURCE_NODE df["Location Type"] = df["Location Type"].fillna(LOCATION_TYPE_RESOURCE_NODE) # energy weighted only exists in real time data # since depends on energy usage if "SettlementPointType" in df.columns: is_load_zone_energy_weighted = df["SettlementPointType"] == "LZEW" is_load_zone_dc_tie_energy_weighted = df["SettlementPointType"] == "LZ_DCEW" df.loc[ is_load_zone_energy_weighted, "Location Type", ] = LOCATION_TYPE_ZONE_EW df.loc[ is_load_zone_dc_tie_energy_weighted, "Location Type", ] = LOCATION_TYPE_ZONE_DC_EW # append "_EW" to the end of the location name if it is energy weighted df.loc[is_load_zone_energy_weighted, "Location"] = ( df.loc[is_load_zone_energy_weighted, "Location"] + "_EW" ) df.loc[is_load_zone_dc_tie_energy_weighted, "Location"] = ( df.loc[is_load_zone_dc_tie_energy_weighted, "Location"] + "_EW" ) df["Location"] = df["Location"].astype("string") df["Location Type"] = df["Location Type"].astype("category") return df def _finalize_spp_df( self, df: pd.DataFrame, market: str, locations: list = None, location_type: str = None, verbose: bool = False, ) -> pd.DataFrame: df = self._handle_settlement_point_name_and_type(df, verbose=verbose) df["Market"] = market.value df = df.rename( columns={ "SettlementPointPrice": "SPP", "Settlement Point Price": "SPP", }, ) df = df[ [ "Time", "Interval Start", "Interval End", "Location", "Location Type", "Market", "SPP", ] ] df = utils.filter_lmp_locations( df=df, locations=locations, location_type=location_type, ) df = df.sort_values(by="Interval Start") df = df.reset_index(drop=True) return df @support_date_range(frequency="DAY_START")
[docs] def get_as_prices( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get ancillary service clearing prices in hourly intervals in Day Ahead Market Arguments: date (datetime.date, str): date of delivery for AS services end (datetime.date, str, optional): if declared, function will return data as a range, from "date" to "end" verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with prices for "Non-Spinning Reserves", \ "Regulation Up", "Regulation Down", "Responsive Reserves", \ "ERCOT Contingency Reserve Service" """ # subtract one day since it's the day ahead market happens on the day # before for the delivery day date = date - pd.DateOffset(days=1) doc_info = self._get_document( report_type_id=DAM_CLEARING_PRICES_FOR_CAPACITY_RTID, date=date, constructed_name_contains="csv.zip", verbose=verbose, ) logger.info(f"Downloading {doc_info.url}") doc = self.read_doc(doc_info, verbose=verbose) df = self._finalize_as_price_df( doc, pivot=True, ) return df
@support_date_range(frequency=None)
[docs] def get_mcpc_dam( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Market Clearing Prices for Capacity (MCPC) from the Day-Ahead Market Returns hourly MCPC per ancillary service type in long format. https://www.ercot.com/mp/data-products/data-product-details?id=NP4-188-CD Arguments: date (str, datetime): date to get data for. Supports "latest", "today", or a specific date. end (str, datetime, optional): end date for a date range query. If None, returns 1 day of data. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with columns: Interval Start, Interval End, AS Type, MCPC """ if date == "latest": return self.get_mcpc_dam("today", verbose=verbose) # DAM data is published the day before delivery publish_date = date.normalize() - pd.DateOffset(days=1) if end is not None: published_before = end - pd.DateOffset(days=1) published_after = publish_date publish_date = None else: published_before = None published_after = None docs = self._get_documents( report_type_id=DAM_CLEARING_PRICES_FOR_CAPACITY_RTID, date=publish_date, published_before=published_before, published_after=published_after, constructed_name_contains="csv.zip", verbose=verbose, ) df = self.read_docs( docs, empty_df=pd.DataFrame(columns=self._mcpc_dam_cols), verbose=verbose, ) return self._handle_mcpc_dam_df(df)
_mcpc_dam_cols = [ "Interval Start", "Interval End", "AS Type", "MCPC", ] def _handle_mcpc_dam_df(self, df: pd.DataFrame) -> pd.DataFrame: if df.empty: raise NoDataFoundException("No DAM MCPC data found") df = df.rename(columns={"AncillaryType": "AS Type"}) df = df.drop(columns=["Time"], errors="ignore") df["MCPC"] = pd.to_numeric(df["MCPC"], errors="coerce") return ( df[["Interval Start", "Interval End", "AS Type", "MCPC"]] .sort_values(["Interval Start", "AS Type"]) .reset_index(drop=True) ) @support_date_range(frequency=None)
[docs] def get_shadow_prices_dam( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Day-Ahead Market Shadow Prices Returns shadow prices for binding transmission constraints from the Day-Ahead Market. https://www.ercot.com/mp/data-products/data-product-details?id=NP4-191-CD Arguments: date (str, datetime): date to get data for. Supports "latest", "today", or a specific date. end (str, datetime, optional): end date for a date range query. If None, returns 1 day of data. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with day-ahead market shadow prices """ if date == "latest": return self.get_shadow_prices_dam("today", verbose=verbose) # DAM data is published the day before delivery publish_date = date.normalize() - pd.DateOffset(days=1) if end is not None: published_before = end - pd.DateOffset(days=1) published_after = publish_date publish_date = None else: published_before = None published_after = None docs = self._get_documents( report_type_id=DAM_SHADOW_PRICES_RTID, date=publish_date, published_before=published_before, published_after=published_after, constructed_name_contains="csv.zip", verbose=verbose, ) df = self.read_docs( docs, empty_df=pd.DataFrame(columns=self._shadow_prices_dam_cols), verbose=verbose, ) return self._handle_shadow_prices_dam_df(df)
def _shadow_prices_column_name_mapper(self): return { "CCTStatus": "CCT Status", "ConstraintId": "Constraint ID", "ConstraintID": "Constraint ID", "ConstraintLimit": "Constraint Limit", "ConstraintName": "Constraint Name", "ConstraintValue": "Constraint Value", "ContingencyName": "Contingency Name", "DeliveryTime": "Delivery Time", "FromStation": "From Station", "FromStationkV": "From Station kV", "MaxShadowPrice": "Max Shadow Price", "ShadowPrice": "Shadow Price", "SystemLambda": "System Lambda", "ToStation": "To Station", "ToStationkV": "To Station kV", "ViolatedMW": "Violated MW", "ViolationAmount": "Violation Amount", } def _construct_limiting_facility_column(self, data): data["Limiting Facility"] = np.where( data["Contingency Name"] != "BASE CASE", data["From Station"].astype(str) + "_" + data["From Station kV"].astype(str) + "_" + data["To Station"].astype(str) + "_" + data["To Station kV"].astype(str), pd.NA, ) return data _shadow_prices_dam_cols = [ "Interval Start", "Interval End", "Constraint ID", "Constraint Name", "Contingency Name", "Limiting Facility", "Constraint Limit", "Constraint Value", "Violation Amount", "Shadow Price", "From Station", "To Station", "From Station kV", "To Station kV", ] def _handle_shadow_prices_dam_df(self, df: pd.DataFrame) -> pd.DataFrame: if df.empty: raise NoDataFoundException("No DAM shadow prices data found") df = df.rename(columns=self._shadow_prices_column_name_mapper()) # Strip whitespace from Contingency Name before constructing # Limiting Facility (raw data may have leading whitespace in # ContingencyName which would break the BASE CASE check) if "Contingency Name" in df.columns: df["Contingency Name"] = df["Contingency Name"].str.strip() df = self._construct_limiting_facility_column(df) # Replace empty strings with NA and strip whitespace after # constructing Limiting Facility to match original behavior df = df.replace("", pd.NA) for col in [ "Constraint Name", "Contingency Name", "Limiting Facility", "To Station", "From Station", ]: if col in df.columns: df[col] = df[col].str.strip() output_cols = [ "Interval Start", "Interval End", "Constraint ID", "Constraint Name", "Contingency Name", "Limiting Facility", "Constraint Limit", "Constraint Value", "Violation Amount", "Shadow Price", "From Station", "To Station", "From Station kV", "To Station kV", ] df = df.drop(columns=["Delivery Time", "Time"], errors="ignore") df = df[output_cols] return df.sort_values( ["Interval Start", "Constraint ID"], ).reset_index(drop=True) @support_date_range(frequency="DAY_START")
[docs] def get_as_plan( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Ancillary Service requirements by type and quantity for each hour of the current day plus the next 6 days Arguments: date (datetime.date, str): date of delivery for AS services end (datetime.date, str, optional): if declared, function will return data as a range, from "date" to "end" verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with prices for ECRS, NSPIN, REGDN, REGUP, RRS """ if date == "latest": return self.get_as_plan("today", verbose=verbose) doc_info = self._get_document( report_type_id=DAM_ANCILLARY_SERVICE_PLAN_RTID, date=date, constructed_name_contains="csv.zip", verbose=verbose, ) logger.info(f"Downloading {doc_info.url}") doc = self.read_doc(doc_info, verbose=verbose).drop(columns=["Time"]) doc["Publish Time"] = doc_info.publish_date return self._handle_as_plan(doc)
def _handle_as_plan(self, doc: Document) -> pd.DataFrame: df = doc.pivot( index=["Interval Start", "Interval End", "Publish Time"], columns="AncillaryType", values="Quantity", ).reset_index() # For some hours where there are no values, the data has "Not Applicable" # which becomes a column in the pivot. We want to drop this column if "Not Applicable" in df.columns: df = df.drop(columns=["Not Applicable"]) # ECRS went live 2023-06-10 and isn't present in the data before then if "ECRS" not in df.columns: df["ECRS"] = pd.NA # Put ECRS at the end to match as_prices df = utils.move_cols_to_front( df, [ "Interval Start", "Interval End", "Publish Time", "NSPIN", "REGDN", "REGUP", "RRS", "ECRS", ], ).sort_values(["Interval Start", "Publish Time"]) df.columns.name = None return df # Dates for which ESR data from the normal disclosure file is incorrect # and should be replaced with data from the ESR supplemental correction file. # The correction file was published on Feb 5, 2026. # These are stored as date objects (not timestamps) for timezone-agnostic comparison. ESR_CORRECTION_DATES = [ pd.Timestamp("2025-12-05").date(), pd.Timestamp("2025-12-06").date(), ] # Data dates for which ESR, Gen Resource, and Load Resource data from the # normal disclosure file is incorrect and should be replaced with data from # the supplemental correction files. Published on Feb 23, 2026. # Data dates Dec 5, 2025 through Dec 20, 2025 correspond to report dates # Feb 3, 2026 through Feb 18, 2026. SCED_SUPPLEMENTAL_CORRECTION_START = pd.Timestamp("2025-12-05").date() SCED_SUPPLEMENTAL_CORRECTION_END = pd.Timestamp("2025-12-20").date() @support_date_range("DAY_START")
[docs] def get_60_day_sced_disclosure( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, process: bool = False, verbose: bool = False, output_format: CurveOutputFormat | str = CurveOutputFormat.LIST, ) -> dict: """Get 60 day SCED Disclosure data Arguments: date (datetime.date, str): date to return end (datetime.date, str, optional): if declared, function will return data as a range, from "date" to "end" process (bool, optional): if True, will process the data into standardized format. if False, will return raw data verbose (bool, optional): print verbose output. Defaults to False. output_format: CurveOutputFormat.LIST (default) returns Python list-of-lists per curve cell. CurveOutputFormat.PG_ARRAY_AS_STRING returns PG array strings, using ~3x less peak memory. Returns: dict: dictionary with keys "sced_load_resource", "sced_gen_resource", "sced_smne", and (when available) "sced_esr", "sced_eoc_updates", "sced_resource_as_offers", mapping to pandas.DataFrame objects """ report_date = date + pd.DateOffset(days=60) doc_info = self._get_document( report_type_id=SIXTY_DAY_SCED_DISCLOSURE_REPORTS_RTID, date=report_date, constructed_name_contains="60_Day_SCED_Disclosure", verbose=verbose, ) z = utils.get_zip_folder(doc_info.url, verbose=verbose) # Check if this date needs correction data # Compare using date() for timezone-agnostic comparison date_as_date = pd.Timestamp(date).date() use_esr_correction = date_as_date in self.ESR_CORRECTION_DATES use_sced_supplemental = ( self.SCED_SUPPLEMENTAL_CORRECTION_START <= date_as_date <= self.SCED_SUPPLEMENTAL_CORRECTION_END ) data = self._handle_60_day_sced_disclosure( z, process=process, verbose=verbose, skip_esr=use_esr_correction or use_sced_supplemental, output_format=output_format, ) if use_sced_supplemental: # Fetch ESR, Gen Resource, and Load Resource from supplemental files supplemental = self._get_sced_supplemental_data( date, process=process, verbose=verbose, output_format=output_format, ) data.update(supplemental) elif use_esr_correction: # Fetch ESR from the supplemental correction file for affected dates esr = self._get_esr_correction_data( date, process=process, verbose=verbose, output_format=output_format, ) if esr is not None: data[SCED_ESR_KEY] = esr return data
def _handle_60_day_sced_disclosure( self, z: ZipFile, process: bool = False, verbose: bool = False, skip_esr: bool = False, output_format: CurveOutputFormat | str = CurveOutputFormat.LIST, ) -> dict: """Parse a 60-day SCED disclosure zip file into DataFrames. Args: z: Opened ZipFile containing SCED disclosure CSVs. process: If True, apply processing functions to standardize data. verbose: If True, print verbose output. skip_esr: If True, skip ESR data extraction. output_format: Curve output format passed to process functions. """ # TODO: there are other files in the zip folder load_resource_file = None gen_resource_file = None smne_file = None esr_file = None as_offer_updates_file = None resource_as_offers_file = None for file in z.namelist(): cleaned_file = file.replace(" ", "_") if "60d_Load_Resource_Data_in_SCED" in cleaned_file: load_resource_file = file elif "60d_ESR_Data_in_SCED" in cleaned_file: esr_file = file elif "60d_SCED_Gen_Resource_Data" in cleaned_file: gen_resource_file = file elif "60d_SCED_SMNE_GEN_RES" in cleaned_file: smne_file = file elif "60d_SCED_AS_Offer_Updates_in_OpPeriod" in cleaned_file: as_offer_updates_file = file elif "60d_SCED_Resource_AS_OFFERS" in cleaned_file: resource_as_offers_file = file assert load_resource_file, "Could not find load resource file" assert gen_resource_file, "Could not find gen resource file" assert smne_file, "Could not find smne file" load_resource = pd.read_csv(z.open(load_resource_file)) gen_resource = pd.read_csv(z.open(gen_resource_file)) smne = pd.read_csv(z.open(smne_file)) # Skip ESR from the main disclosure file if we need correction data esr = None if not skip_esr and esr_file: esr = pd.read_csv(z.open(esr_file)) as_offer_updates = ( pd.read_csv(z.open(as_offer_updates_file)) if as_offer_updates_file else None ) resource_as_offers = ( pd.read_csv(z.open(resource_as_offers_file)) if resource_as_offers_file else None ) def handle_time( df: pd.DataFrame, time_col: str, is_interval_end: bool = False, ) -> pd.DataFrame: df[time_col] = pd.to_datetime(df[time_col]) if "Repeated Hour Flag" in df.columns: # Repeated Hour Flag is Y during the repeated hour # So, it's N during DST And Y during Standard Time # Pandas wants True for DST and False for Standard Time # during ambiguous times df[time_col] = df[time_col].dt.tz_localize( self.default_timezone, ambiguous=df["Repeated Hour Flag"] == "N", ) interval_start = df[time_col].dt.round( "15min", ambiguous=df["Repeated Hour Flag"] == "N", ) else: # for SMNE data df[time_col] = ( df.sort_values("Interval Number", ascending=True) .groupby("Resource Code")[time_col] .transform( lambda x: x.dt.tz_localize( self.default_timezone, ambiguous="infer", ), ) ) # convert to utc # bc round doesn't work with dst changes # without Repeated Hour Flag interval_start = ( df[time_col] .dt.tz_convert("utc") .dt.round("15min") .dt.tz_convert(self.default_timezone) ) interval_length = pd.Timedelta(minutes=15) if is_interval_end: interval_end = interval_start interval_start = interval_start - interval_length else: interval_end = interval_start + interval_length df.insert(0, "Interval Start", interval_start) df.insert( 1, "Interval End", interval_end, ) return df def localize_sced_timestamp(df: pd.DataFrame) -> pd.DataFrame: """Localize SCED Timestamp without adding Interval Start/End.""" df = df.rename(columns={"SCED Time Stamp": "SCED Timestamp"}) df["SCED Timestamp"] = pd.to_datetime(df["SCED Timestamp"]) df["SCED Timestamp"] = df["SCED Timestamp"].dt.tz_localize( self.default_timezone, ambiguous=df["Repeated Hour Flag"] == "N", ) return df load_resource = localize_sced_timestamp(load_resource) gen_resource = localize_sced_timestamp(gen_resource) # no repeated hour flag like other ERCOT data # likely will error on DST change smne = handle_time(smne, time_col="Interval Time", is_interval_end=True) if esr is not None: esr = localize_sced_timestamp(esr) # Process Resource AS Offers - has SCED Timestamp like other SCED data if resource_as_offers is not None: resource_as_offers = localize_sced_timestamp(resource_as_offers) if process: logger.info("Processing 60 day SCED disclosure data") load_resource = process_sced_load( load_resource, output_format=output_format, ) gen_resource = process_sced_gen(gen_resource, output_format=output_format) smne = smne.rename( columns={ "Resource Code": "Resource Name", }, ) if esr is not None: esr = process_sced_esr(esr, output_format=output_format) if as_offer_updates is not None: as_offer_updates = self.parse_doc(as_offer_updates) as_offer_updates = process_sced_as_offer_updates_in_op_hour( as_offer_updates, ) if resource_as_offers is not None: resource_as_offers = process_sced_resource_as_offers( resource_as_offers, output_format=output_format, ) result = { SCED_LOAD_RESOURCE_KEY: load_resource, SCED_GEN_RESOURCE_KEY: gen_resource, SCED_SMNE_KEY: smne, } if esr is not None: result[SCED_ESR_KEY] = esr if as_offer_updates is not None: result[SCED_AS_OFFER_UPDATES_IN_OP_HOUR_KEY] = as_offer_updates if resource_as_offers is not None: result[SCED_RESOURCE_AS_OFFERS_KEY] = resource_as_offers return result def _get_esr_correction_data( self, date: pd.Timestamp, process: bool = False, verbose: bool = False, output_format: CurveOutputFormat | str = CurveOutputFormat.LIST, ) -> pd.DataFrame | None: """Fetch ESR data from the supplemental correction file for specific dates. The ESR supplemental correction file was published on Feb 5, 2026 and contains corrected ESR data for Dec 5, 2025 and Dec 6, 2025 (which correspond to report dates Feb 3 and Feb 4, 2026 respectively). Arguments: date: The data date (not report date) to fetch ESR correction data for process: If True, process the data into standardized format verbose: If True, print verbose output output_format: Curve output format passed to process functions. Returns: DataFrame with ESR correction data, or None if not found """ # Fetch the ESR supplemental correction file docs = self._get_documents( report_type_id=SIXTY_DAY_SCED_DISCLOSURE_REPORTS_RTID, constructed_name_contains="60_Day_SCED_ESR_SUPPLEMENTAL_CORRECTION", verbose=verbose, ) if not docs: logger.warning("ESR supplemental correction file not found") return None # Use the most recent correction file doc = docs[0] z = utils.get_zip_folder(doc.url, verbose=verbose) # Find the correct file for the requested date # Files are named like: 60d_ESR_Data_in_SCED-03-FEB-26_SUPPLEMENTAL_CORRECTION.csv # where 03-FEB-26 is the report date (data date + 60 days) report_date = date + pd.DateOffset(days=60) date_str = report_date.strftime("%d-%b-%y").upper() expected_filename = ( f"60d_ESR_Data_in_SCED-{date_str}_SUPPLEMENTAL_CORRECTION.csv" ) target_file = None for file in z.namelist(): if file == expected_filename: target_file = file break if target_file is None: logger.warning( f"Could not find ESR correction file for date {date} " f"(report date {report_date})", ) return None if verbose: logger.info(f"Reading ESR correction data from {target_file}") esr = pd.read_csv(z.open(target_file)) # Localize the SCED Timestamp esr = esr.rename(columns={"SCED Time Stamp": "SCED Timestamp"}) esr["SCED Timestamp"] = pd.to_datetime(esr["SCED Timestamp"]) esr["SCED Timestamp"] = esr["SCED Timestamp"].dt.tz_localize( self.default_timezone, ambiguous=esr["Repeated Hour Flag"] == "N", ) if process: esr = process_sced_esr(esr, output_format=output_format) return esr # Mapping of dataset key to (document name filter, file name prefix) for # the Feb 3-18, 2026 supplemental correction files. _SCED_SUPPLEMENTAL_DATASETS = [ ( SCED_ESR_KEY, "60d_ESR_Data_in_SCED_02032026_thru_02182026_SUPPLEMENTAL", "60d_ESR_Data_in_SCED", process_sced_esr, ), ( SCED_GEN_RESOURCE_KEY, "60d_Gen_Resource_Data_in_SCED_02032026_thru_02182026_SUPPLEMENTAL", "60d_SCED_Gen_Resource_Data", process_sced_gen, ), ( SCED_LOAD_RESOURCE_KEY, "60d_Load_Resource_Data_in_SCED_02032026_thru_02182026_SUPPLEMENTAL", "60d_Load_Resource_Data_in_SCED", process_sced_load, ), ] def _get_sced_supplemental_data( self, date: pd.Timestamp, process: bool = False, verbose: bool = False, output_format: CurveOutputFormat | str = CurveOutputFormat.LIST, ) -> dict: """Fetch ESR, Gen Resource, and Load Resource data from supplemental correction files for data dates Dec 5-20, 2025. ERCOT published incorrect data for these three datasets for data dates Dec 5, 2025 through Dec 20, 2025 (report dates Feb 3-18, 2026). The supplemental correction files were published on Feb 23, 2026. Arguments: date: The data date (not report date) to fetch supplemental data for process: If True, process the data into standardized format verbose: If True, print verbose output output_format: Curve output format passed to process functions. Returns: dict with keys for the corrected datasets """ # Files in the supplemental zips are named with the report date report_date = date + pd.DateOffset(days=60) date_str = report_date.strftime("%d-%b-%y").upper() result = {} for key, doc_name, file_prefix, process_fn in self._SCED_SUPPLEMENTAL_DATASETS: docs = self._get_documents( report_type_id=SIXTY_DAY_SCED_DISCLOSURE_REPORTS_RTID, constructed_name_contains=doc_name, verbose=verbose, ) if not docs: logger.warning( f"Supplemental correction file not found for {key}", ) continue doc = docs[0] z = utils.get_zip_folder(doc.url, verbose=verbose) # Files are named like: {file_prefix}-{DD-MMM-YY}.csv expected_filename = f"{file_prefix}-{date_str}.csv" target_file = None for file in z.namelist(): cleaned = file.replace(" ", "_") if expected_filename in cleaned: target_file = file break if target_file is None: logger.warning( f"Could not find supplemental file {expected_filename} for {key}", ) continue if verbose: logger.info(f"Reading supplemental data from {target_file}") df = pd.read_csv(z.open(target_file)) # Localize SCED Timestamp df = df.rename(columns={"SCED Time Stamp": "SCED Timestamp"}) df["SCED Timestamp"] = pd.to_datetime(df["SCED Timestamp"]) df["SCED Timestamp"] = df["SCED Timestamp"].dt.tz_localize( self.default_timezone, ambiguous=df["Repeated Hour Flag"] == "N", ) if process: df = process_fn(df, output_format=output_format) result[key] = df return result @support_date_range("DAY_START")
[docs] def get_60_day_dam_disclosure( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, process: bool = False, verbose: bool = False, output_format: CurveOutputFormat | str = CurveOutputFormat.LIST, ) -> dict: """Get 60 day DAM Disclosure data. Returns a dict with keys - "dam_gen_resource" - "dam_gen_resource_as_offers" - "dam_load_resource" - "dam_load_resource_as_offers" - "dam_energy_only_offer_awards" - "dam_energy_only_offers" - "dam_ptp_obligation_bid_awards" - "dam_ptp_obligation_bids" - "dam_energy_bid_awards" - "dam_energy_bids" - "dam_ptp_obligation_option" - "dam_ptp_obligation_option_awards" - "dam_esr" (when available, starting 2025-12-06) - "dam_esr_as_offers" (when available, starting 2025-12-06) and values as pandas.DataFrame objects The date passed in should be the report date. Since reports are delayed by 60 days, the passed date should not be fewer than 60 days in the past. Args: output_format: CurveOutputFormat.LIST (default) returns Python list-of-lists per curve cell. CurveOutputFormat.PG_ARRAY_AS_STRING returns PG array strings, using ~3x less peak memory. """ report_date = date + pd.DateOffset(days=60) doc_info = self._get_document( report_type_id=SIXTY_DAY_DAM_DISCLOSURE_REPORTS_RTID, date=report_date, constructed_name_contains="60_Day_DAM_Disclosure.zip", verbose=verbose, ) z = utils.get_zip_folder(doc_info.url, verbose=verbose) data = self._handle_60_day_dam_disclosure( z, process=process, verbose=verbose, output_format=output_format, ) return data
def _handle_60_day_dam_disclosure( self, z: ZipFile, process: bool = False, verbose: bool = False, files_prefix: dict = None, output_format: CurveOutputFormat | str = CurveOutputFormat.LIST, ) -> dict: """Parse a 60-day DAM disclosure zip file into DataFrames. Args: z: Opened ZipFile containing DAM disclosure CSVs. process: If True, apply processing functions to standardize data. verbose: If True, print verbose output. files_prefix: Override dict mapping data keys to file name prefixes. output_format: Curve output format passed to process functions. """ if not files_prefix: files_prefix = { DAM_GEN_RESOURCE_KEY: "60d_DAM_Gen_Resource_Data-", DAM_GEN_RESOURCE_AS_OFFERS_KEY: "60d_DAM_Generation_Resource_ASOffers-", DAM_LOAD_RESOURCE_KEY: "60d_DAM_Load_Resource_Data-", DAM_LOAD_RESOURCE_AS_OFFERS_KEY: "60d_DAM_Load_Resource_ASOffers-", DAM_ENERGY_ONLY_OFFER_AWARDS_KEY: "60d_DAM_EnergyOnlyOfferAwards-", DAM_ENERGY_ONLY_OFFERS_KEY: "60d_DAM_EnergyOnlyOffers-", DAM_PTP_OBLIGATION_BID_AWARDS_KEY: "60d_DAM_PTPObligationBidAwards-", DAM_PTP_OBLIGATION_BIDS_KEY: "60d_DAM_PTPObligationBids-", DAM_ENERGY_BID_AWARDS_KEY: "60d_DAM_EnergyBidAwards-", DAM_ENERGY_BIDS_KEY: "60d_DAM_EnergyBids-", DAM_PTP_OBLIGATION_OPTION_KEY: "60d_DAM_PTP_Obligation_Option-", DAM_PTP_OBLIGATION_OPTION_AWARDS_KEY: "60d_DAM_PTP_Obligation_OptionAwards-", # noqa } # ESR files are optional (only available starting 2025-12-06) optional_files_prefix = { DAM_ESR_KEY: "60d_DAM_ESR_Data-", DAM_ESR_AS_OFFERS_KEY: "60d_DAM_ESR_ASOffers-", } files = {} # find required files in zip folder for key, file in files_prefix.items(): for f in z.namelist(): if file in f: files[key] = f assert len(files) == len(files_prefix), "Missing files" # find optional files in zip folder for key, file in optional_files_prefix.items(): for f in z.namelist(): if file in f: files[key] = f data = {} for key, file in files.items(): doc = pd.read_csv(z.open(file)) # weird that these files dont have this column like all other ERCOT files # add so we can parse doc["DSTFlag"] = "N" data[key] = self.parse_doc(doc, verbose=verbose) if process: file_to_function = { DAM_GEN_RESOURCE_KEY: process_dam_gen, DAM_LOAD_RESOURCE_KEY: process_dam_load, DAM_GEN_RESOURCE_AS_OFFERS_KEY: process_dam_or_gen_load_as_offers, DAM_LOAD_RESOURCE_AS_OFFERS_KEY: process_dam_or_gen_load_as_offers, DAM_ENERGY_ONLY_OFFER_AWARDS_KEY: process_dam_energy_only_offer_awards, DAM_ENERGY_ONLY_OFFERS_KEY: process_dam_energy_only_offers, DAM_PTP_OBLIGATION_BID_AWARDS_KEY: process_dam_ptp_obligation_bid_awards, # noqa DAM_PTP_OBLIGATION_BIDS_KEY: process_dam_ptp_obligation_bids, DAM_ENERGY_BID_AWARDS_KEY: process_dam_energy_bid_awards, DAM_ENERGY_BIDS_KEY: process_dam_energy_bids, DAM_PTP_OBLIGATION_OPTION_KEY: process_dam_ptp_obligation_option, DAM_PTP_OBLIGATION_OPTION_AWARDS_KEY: process_dam_ptp_obligation_option_awards, # noqa DAM_ESR_KEY: process_dam_esr, DAM_ESR_AS_OFFERS_KEY: process_dam_esr_as_offers, } # These process functions accept output_format for curve extraction supports_output_format = { DAM_GEN_RESOURCE_KEY, DAM_ESR_KEY, DAM_ENERGY_ONLY_OFFERS_KEY, DAM_ENERGY_BIDS_KEY, DAM_GEN_RESOURCE_AS_OFFERS_KEY, DAM_LOAD_RESOURCE_AS_OFFERS_KEY, DAM_ESR_AS_OFFERS_KEY, } for file_name, process_func in file_to_function.items(): if file_name in data: if file_name in supports_output_format: data[file_name] = process_func( data[file_name], output_format=output_format, ) else: data[file_name] = process_func(data[file_name]) return data
[docs] def get_sara( self, url: str = "https://www.ercot.com/files/docs/2023/05/05/SARA_Summer2023_Revised.xlsx", verbose: bool = False, ) -> pd.DataFrame: """Parse SARA data from url. Seasonal Assessment of Resource Adequacy for the ERCOT Region (SARA) Arguments: url (str, optional): url to download SARA data from. Defaults to Summer 2023 SARA data. """ # only reading SummerCapacities right now # TODO: parse more sheets logger.info(f"Getting SARA data from {url}") df = pd.read_excel(url, sheet_name="SummerCapacities", header=1) # drop cols Unnamed: 0 df = df.drop("Unnamed: 0", axis=1) df = df.rename( columns={ "UNIT NAME": "Unit Name", "GENERATION INTERCONNECTION PROJECT CODE": "Generation Interconnection Project Code", # noqa: E501 "UNIT CODE": "Unit Code", "COUNTY": "County", "FUEL": "Fuel", "ZONE": "Zone", "IN SERVICE YEAR": "In Service Year", "INSTALLED CAPACITY RATING": "Installed Capacity Rating", "SUMMER\nCAPACITY\n(MW)": "Summer Capacity (MW)", "NEW PLANNED PROJECT ADDITIONS TO REPORT": "New Planned Project Additions to Report", # noqa: E501 }, ) # every unit should have this defined df = df.dropna(subset=["Fuel"]) df["In Service Year"] = df["In Service Year"].astype("Int64") category_cols = ["County", "Fuel", "Zone"] for col in category_cols: df[col] = df[col].astype("category") return df
def _finalize_as_price_df( self, doc: pd.DataFrame, pivot: bool = False, ) -> pd.DataFrame: doc["Market"] = "DAM" # recent daily files need to be pivoted if pivot: doc = doc.pivot_table( index=["Time", "Interval Start", "Interval End", "Market"], columns="AncillaryType", values="MCPC", ).reset_index() doc.columns.name = None # some columns from workbook contain trailing/leading whitespace doc.columns = [x.strip() for x in doc.columns] # NSPIN REGDN REGUP RRS ECRS rename = { "NSPIN": "Non-Spinning Reserves", "REGDN": "Regulation Down", "REGUP": "Regulation Up", "RRS": "Responsive Reserves", "ECRS": "ERCOT Contingency Reserve Service", } col_order = [ "Time", "Interval Start", "Interval End", "Market", "Non-Spinning Reserves", "Regulation Down", "Regulation Up", "Responsive Reserves", "ERCOT Contingency Reserve Service", ] if "ECRS" not in doc.columns: doc["ECRS"] = None doc.rename(columns=rename, inplace=True) return doc[col_order]
[docs] def get_as_monitor( self, date: str = "latest", verbose: bool = False, ) -> pd.DataFrame: """Get Ancillary Service Capacity Monitor. Parses table from https://www.ercot.com/content/cdr/html/as_capacity_monitor.html Arguments: date (str): only supports "latest" verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with ancillary service capacity monitor data """ url = "https://www.ercot.com/content/cdr/html/as_capacity_monitor.html" logger.info(f"Getting Ancillary Service Capacity Monitor from {url}") html_content = requests.get(url).content df = self._parse_html_table(html_content) return df
[docs] def get_system_as_capacity_monitor( self, date: str | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get System Ancillary Service Capacity Monitor. Fetches real-time ancillary service capacity data from https://www.ercot.com/api/1/services/read/dashboards/ancillary-service-capacity-monitor.json Arguments: date (str): only supports "latest" verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with system AS capacity monitor data """ if date is not None and date != "latest": logger.warning( "date argument to get_system_as_capacity_monitor is ignored; only None or 'latest' are supported", ) url = self.BASE + "/ancillary-service-capacity-monitor.json" logger.info(f"Getting System Capacity AS Monitor from {url}...") json_data = self._get_json(url, verbose=verbose) return self._parse_system_as_capacity_monitor(json_data)
def _parse_system_as_capacity_monitor(self, json_data: dict) -> pd.DataFrame: """Parse JSON response from System Ancillary Service Capacity Monitor API. Arguments: json_data: Raw JSON response from the API Returns: pandas.DataFrame: Parsed data with standardized column names """ key_to_column = { "rrcCapPfrGenEsr": "RRS Capability PFR Gen and ESR", "rrcCapLrWoClr": "RRS Capability Load Ex Controllable Load", "rrcCapLr": "RRS Capability PFR Controllable Load", "rrcCapFfr": "RRS Capability FFR Capable Ex ESR", "rrcCapFfrEsr": "RRS Capability FFR ESR", "regUpCap": "Reg Capability Reg Up", "regDownCap": "Reg Capability Reg Down", "regUpUndeployed": "Reg Capability Undeployed Reg Up", "regDownUndeployed": "Reg Capability Undeployed Reg Down", "regUpDeployed": "Reg Capability Deployed Reg Up", "regDownDeployed": "Reg Capability Deployed Reg Down", "rrAwdGen": "RRS Awards PFR Gen and ESR", "rrAwdNonClr": "RRS Awards UFR Load Ex Controllable Load", "rrAwdClr": "RRS Awards PFR Controllable Load", "rrAwdFfr": "RRS Awards FFR Capable", "regUpAwd": "Reg Awards Reg Up", "regDownAwd": "Reg Awards Reg Down", "ecrsCapGen": "ECRS Capability Gen", "ecrsCapNclr": "ECRS Capability Load Ex Controllable Load", "ecrsCapClr": "ECRS Capability Controllable Load", "ecrsCapQs": "ECRS Capability Quick Start Gen", "ecrsCapEsr": "ECRS Capability ESR", "ecrsCapDeployedGenLr": "ECRS Capability Manually Deployed ONSC Status", "capClrDecreaseBp": "Capacity From CLRS Available To Decrease Base Points In SCED", "capClrIncreaseBp": "Capacity From CLRS Available To Increase Base Points In SCED", "capWEoIncreaseBp": "Capacity With Energy Offer Curves To Increase Genres BP In SCED", "capWEoDecreaseBp": "Capacity With Energy Offer Curves To Decrease Genres BP In SCED", "capWoEoIncreaseBp": "Capacity Without Energy Offers To Increase Genres BP In SCED", "capWoEoDecreaseBp": "Capacity Without Energy Offers To Decrease Genres BP In SCED", "esrCapWEoIncreaseBp": "Capacity with energy offers to increase ESR BP in SCED", "esrCapWEoDecreaseBp": "Capacity with energy offers to decrease ESR BP in SCED", "esrCapWoEoIncreaseBp": "Capacity without energy offers to increase ESR BP in SCED", "esrCapWoEoDecreaseBp": "Capacity without energy offers to decrease ESR BP in SCED", "capIncreaseGenBp": "Capacity To Increase Genres BP In Next Five Minutes In SCED HDL", "capDecreaseGenBp": "Capacity To Decrease Genres BP In Next Five Minutes In SCED LDL", "sumCapResRegUpRrs": "Capacity to provide Reg Up RRS or Both", "sumCapResRegUpRrsEcrs": "Capacity to provide Reg Up RRS ECRS or any combo", "sumCapResRegUpRrsEcrsNsr": "Capacity to provide Reg Up RRS ECRS NSpin any combination", "ecrsAwdGen": "ECRS Awards Gen", "ecrsAwdNonClr": "ECRS Awards Load Ex Controllable Load", "ecrsAwdClr": "ECRS Awards Controllable Load", "ecrsAwdQs": "ECRS Awards Quick Start Gen", "ecrsAwdEsr": "ECRS Awards ESR", "prc": "PRC", "nsrCapOnGenWoEo": "Nspin Capability On Line Gen with Energy Offers", "nsrCapOffResWOs": "Nspin Capability Resources with Output Schedules", "nsrCapUndeployedLr": "Nspin Capability Undeployed Load", "nsrCapOffGen": "Nspin Capability Offline Gen Ex QSGR Online Gen with power aug", "nsrCapEsr": "Nspin Capability ESR", "rtReserveOnline": "ORDC Online", "rtReserveOnOffline": "ORDC Online and Offline", "nsrAwdGenWEo": "NSpin Awards On Line Gen with Energy Offer Curves", "nsrAwdGenWOs": "NSpin Awards On Line Gen with Output Schedules", "nsrAwdLr": "NSPin Awards Load", "nsrAwdOffGen": "NSpin Awards Offline Gen Ex QSGR Including power aug", "nsrAwdQs": "NSpin Awards Quick Start Gen", "nsrAwdAs": "NSpin Awards ESR", "telemHslEmr": "Telemetered HSL Capacity Resource Status EMR", "telemHslOut": "Telemetered HSL Capacity Resource Status OUT", "telemHslOutl": "Telemetered Net Consumption Resource status OUTL", } row_data = {} for group_name, group_data in json_data.get("data", {}).items(): if not isinstance(group_data, list) or len(group_data) < 2: continue for item in group_data[1:]: if len(item) >= 2: key, value = item[0], item[1] if key in key_to_column: row_data[key_to_column[key]] = value last_updated = json_data.get("lastUpdated") if last_updated: time = pd.to_datetime(last_updated).tz_convert(self.default_timezone) else: time = pd.Timestamp.now(tz=self.default_timezone) row_data["Time"] = time df = pd.DataFrame([row_data]) df = utils.move_cols_to_front(df, ["Time"]) return df OPERATIONS_MESSAGES_URL = ( "https://www.ercot.com/services/comm/mkt_notices/opsmessages" ) WAYBACK_CDX_URL = "https://web.archive.org/cdx/search/cdx" WAYBACK_SNAP_URL = "https://web.archive.org/web/{timestamp}/{url}"
[docs] def get_operations_messages( self, date: str | pd.Timestamp | None = None, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get operations messages from the ERCOT control room. When called without date arguments, scrapes the live page at https://www.ercot.com/services/comm/mkt_notices/opsmessages which shows a rolling window of recent messages (~one month). When called with a date (and optional end), fetches historical snapshots from the Wayback Machine covering the requested range, deduplicates, and filters to the requested window. Arguments: date: Start date for historical query, or None for latest. end: End date for historical query. Defaults to date + 1 month. verbose: Print verbose output. Returns: pandas.DataFrame with columns Time, Notice, Type, Status. """ if date is None: return self._fetch_operations_messages_from_url( self.OPERATIONS_MESSAGES_URL, verbose=verbose, ) date = utils._handle_date(date, tz=self.default_timezone) if end is None: end = date + pd.DateOffset(months=1) else: end = utils._handle_date(end, tz=self.default_timezone) snapshots = self._get_wayback_snapshots( start=date - pd.DateOffset(months=1), end=end + pd.DateOffset(months=1), verbose=verbose, ) if not snapshots: raise NoDataFoundException( f"No Wayback Machine snapshots found for {date} to {end}", ) frames: list[pd.DataFrame] = [] for i, ts in enumerate(snapshots): if i > 0: time.sleep(2) snap_url = self.WAYBACK_SNAP_URL.format( timestamp=ts, url=self.OPERATIONS_MESSAGES_URL, ) try: df = self._fetch_operations_messages_from_url( snap_url, verbose=verbose, ) frames.append(df) except Exception as e: logger.warning(f"Skipping snapshot {ts}: {e}") if not frames: raise NoDataFoundException( f"Could not parse any snapshots for {date} to {end}", ) combined = pd.concat(frames, ignore_index=True) combined = combined.drop_duplicates( subset=["Time", "Notice"], ) combined = combined[(combined["Time"] >= date) & (combined["Time"] < end)] return ( combined[["Time", "Notice", "Type", "Status"]] .sort_values("Time") .reset_index(drop=True) )
def _fetch_operations_messages_from_url( self, url: str, verbose: bool = False, ) -> pd.DataFrame: """Parse the operations messages HTML table from a URL (live or Wayback).""" logger.info(f"Getting operations messages from {url}") try: dfs = pd.read_html(url, match="Date & Time") except ValueError: dfs = pd.read_html(url, match="Date") df = dfs[0] df = df.rename( columns={ "Date & Time": "Time", "Date": "Time", "Message": "Notice", }, ) df["Time"] = pd.to_datetime(df["Time"], format="mixed") now = pd.Timestamp.now(tz=self.default_timezone) ambiguous = (now.utcoffset().total_seconds() / 3600) == -5.0 df["Time"] = df["Time"].dt.tz_localize( self.default_timezone, ambiguous=ambiguous, nonexistent="shift_forward", ) expected_cols = ["Time", "Notice", "Type", "Status"] for col in expected_cols: if col not in df.columns: df[col] = pd.NA return df[expected_cols].sort_values("Time").reset_index(drop=True) def _get_wayback_snapshots( self, start: pd.Timestamp, end: pd.Timestamp, verbose: bool = False, ) -> list[str]: """Query the Wayback Machine CDX API for snapshot timestamps.""" from_ts = start.strftime("%Y%m%d") to_ts = end.strftime("%Y%m%d") params = { "url": self.OPERATIONS_MESSAGES_URL, "output": "json", "fl": "timestamp,statuscode", "filter": "statuscode:200", "collapse": "timestamp:8", "from": from_ts, "to": to_ts, } logger.info( f"Querying Wayback Machine CDX for snapshots from {from_ts} to {to_ts}", ) max_retries = 3 for attempt in range(max_retries): try: resp = requests.get( self.WAYBACK_CDX_URL, params=params, timeout=60, ) resp.raise_for_status() break except (requests.ConnectionError, requests.Timeout) as e: if attempt == max_retries - 1: raise wait = 2 ** (attempt + 1) logger.warning( f"Wayback CDX request failed (attempt {attempt + 1}/" f"{max_retries}), retrying in {wait}s: {e}", ) time.sleep(wait) data = resp.json() if len(data) <= 1: return [] timestamps = [row[0] for row in data[1:]] logger.info(f"Found {len(timestamps)} Wayback snapshots") return timestamps
[docs] def get_real_time_system_conditions( self, date: str = "latest", verbose: bool = False, ) -> pd.DataFrame: """Get Real-Time System Conditions. Parses table from https://www.ercot.com/content/cdr/html/real_time_system_conditions.html Arguments: date (str): only supports "latest" verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with real-time system conditions """ url = "https://www.ercot.com/content/cdr/html/real_time_system_conditions.html" logger.info(f"Getting Real-Time System Conditions from {url}") html_content = requests.get(url).content df = self._parse_html_table(html_content) df = df.rename( columns={ "Frequency - Current Frequency": "Current Frequency", "Real-Time Data - Actual System Demand": "Actual System Demand", "Real-Time Data - Average Net Load": "Average Net Load", "Real-Time Data - Total System Capacity (not including Ancillary Services)": "Total System Capacity excluding Ancillary Services", # noqa: E501 "Real-Time Data - Total Wind Output": "Total Wind Output", "Real-Time Data - Total PVGR Output": "Total PVGR Output", "Real-Time Data - Current System Inertia": "Current System Inertia", }, ) return df
def _parse_html_table(self, html_content: bytes) -> pd.DataFrame: logger.info("Parsing HTML table") soup = BeautifulSoup(html_content, "html.parser") table = soup.find("table", attrs={"class": "tableStyle"}) data = {} header = None for row in table.find_all("tr"): cells = row.find_all("td") if cells[0].get("class") == ["headerValueClass"]: header = cells[0].text.strip() # new header for new dataframe else: category = cells[0].text.strip() value = cells[1].text.strip() header_prepend = header if " (MW)" in header: header_prepend = header_prepend.replace(" (MW)", "") category = f"{category} (MW)" parsed_value = value.replace(",", "") try: parsed_value = int(parsed_value) except ValueError: parsed_value = float(parsed_value) data[f"{header_prepend} - {category}"] = parsed_value df = pd.DataFrame([data]) time_div = soup.find("div", attrs={"class": "schedTime rightAlign"}) time_text = time_div.text.split(": ")[ 1 ] # Split the string on ': ' to get just the time part now = pd.Timestamp.now(tz=self.default_timezone) # Determine if during the repeated DST hour. Pandas wants ambiguous=True if the # time is DST during the repeated hour. US/Central is UTC-6 during standard time # and UTC-5 during DST. Outside the repeated hour, Pandas doesn't care about # ambiguous=True or ambiguous=False. ambiguous = (now.utcoffset().total_seconds() / 3600) == -5.0 df.insert( 0, "Time", pd.to_datetime(time_text).tz_localize( self.default_timezone, ambiguous=ambiguous, ), ) return df @support_date_range(frequency=None)
[docs] def get_wind_actual_and_forecast_hourly( self, date: str | datetime.date, end: str | datetime.date = None, verbose: bool = False, ): """Get Hourly Wind Report. This report is posted every hour and includes System-wide and Regional actual hourly averaged wind power production, STWPF, WGRPP and COP HSLs for On-Line WGRs for a rolling historical 48-hour period as well as the System-wide and Regional STWPF, WGRPP and COP HSLs for On-Line WGRs for the rolling future 168-hour period. Our forecasts attempt to predict HSL, which is uncurtailed power generation potential. Arguments: date (str): date to get report for. Supports "latest" verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with hourly wind report data """ df = self._get_hourly_report( start=date, end=end, report_type_id=WIND_POWER_PRODUCTION_HOURLY_AVERAGED_ACTUAL_AND_FORECASTED_VALUES_RTID, # noqa: E501 extension="csv", handle_doc=self._handle_hourly_wind_or_solar_report, verbose=True, ) return df[WIND_ACTUAL_AND_FORECAST_COLUMNS].sort_values( ["Interval Start", "Publish Time"], )
@support_date_range(frequency=None)
[docs] def get_wind_actual_and_forecast_by_geographical_region_hourly( self, date: str | datetime.date, end: str | datetime.date = None, verbose: bool = False, ): """Get Hourly Wind Report by geographical region Arguments: date (str): date to get report for. Supports "latest" verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with hourly wind report data """ df = self._get_hourly_report( start=date, end=end, report_type_id=WIND_POWER_PRODUCTION_HOURLY_AVERAGED_ACTUAL_AND_FORECASTED_VALUES_BY_GEOGRAPHICAL_REGION_RTID, # noqa: E501 extension="csv", handle_doc=self._handle_hourly_wind_or_solar_report, verbose=True, ) return df[WIND_ACTUAL_AND_FORECAST_BY_GEOGRAPHICAL_REGION_COLUMNS].sort_values( ["Interval Start", "Publish Time"], )
@support_date_range(frequency=None)
[docs] def get_solar_actual_and_forecast_hourly( self, date: str | datetime.date, end: str | datetime.date = None, verbose: bool = False, ): """Get Hourly Solar Report. Arguments: date (str): date to get report for. Supports "latest" or a date string end (str, optional): end date for date range. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with hourly solar report data """ df = self._get_hourly_report( start=date, end=end, report_type_id=SOLAR_POWER_PRODUCTION_HOURLY_AVERAGED_ACTUAL_AND_FORECASTED_VALUES_RTID, # noqa: E501 extension="csv", handle_doc=self._handle_hourly_wind_or_solar_report, verbose=True, ) return df[SOLAR_ACTUAL_AND_FORECAST_COLUMNS].sort_values( ["Interval Start", "Publish Time"], )
@support_date_range(frequency=None)
[docs] def get_solar_actual_and_forecast_by_geographical_region_hourly( self, date: str | datetime.date, end: str | datetime.date = None, verbose: bool = False, ): """Get Hourly Solar Report by geographical region Posted every hour and includes System-wide and geographic regional hourly averaged solar power production, STPPF, PVGRPP, and COP HSL for On-Line PVGRs for a rolling historical 48-hour period as well as the system-wide and regional STPPF, PVGRPP, and COP HSL for On-Line PVGRs for the rolling future 168-hour period. Arguments: date (str): date to get report for. Supports "latest" or a date string end (str, optional): end date for date range. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with hourly solar report data """ df = self._get_hourly_report( start=date, end=end, report_type_id=SOLAR_POWER_PRODUCTION_HOURLY_AVERAGED_ACTUAL_AND_FORECASTED_VALUES_BY_GEOGRAPHICAL_REGION_RTID, # noqa: E501 extension="csv", handle_doc=self._handle_hourly_wind_or_solar_report, verbose=True, ) return df[SOLAR_ACTUAL_AND_FORECAST_BY_GEOGRAPHICAL_REGION_COLUMNS].sort_values( ["Interval Start", "Publish Time"], )
def _handle_hourly_wind_or_solar_report( self, doc: Document, verbose: bool = False, ) -> pd.DataFrame: df = self.read_doc(doc, verbose=verbose) df.insert( 0, "Publish Time", pd.to_datetime(doc.publish_date).tz_convert(self.default_timezone), ) # replace _ in column names with spaces df.columns = df.columns.str.replace("_", " ") return self._rename_hourly_wind_or_solar_report(df) def _rename_hourly_wind_or_solar_report(self, df: pd.DataFrame) -> pd.DataFrame: df = df.rename( columns={ # on Sept 26, 2024 ercot added this column "SYSTEM WIDE HSL": "HSL SYSTEM WIDE", # on Sept 26, 2024 ercot renamed these columns # let's rename new to the old # since it's more consistent with the rest of the data "SYSTEM WIDE GEN": "GEN SYSTEM WIDE", # on Sept 26, 2024 ercot renamed these columns in wind report # let's rename old names to new names # since it's more consistent with solar report "ACTUAL SYSTEM WIDE": "GEN SYSTEM WIDE", "ACTUAL LZ SOUTH HOUSTON": "GEN LZ SOUTH HOUSTON", "ACTUAL LZ WEST": "GEN LZ WEST", "ACTUAL LZ NORTH": "GEN LZ NORTH", # Older versions of wind power production by geographical region use # "ACTUAL" instead of "GEN" # https://data.ercot.com/data-product-archive/NP4-742-CD "ACTUAL PANHANDLE": "GEN PANHANDLE", "ACTUAL COASTAL": "GEN COASTAL", "ACTUAL SOUTH": "GEN SOUTH", "ACTUAL WEST": "GEN WEST", "ACTUAL NORTH": "GEN NORTH", }, ) # Add HSL SYSTEM WIDE if it is not in the data (older data may not have it) if "HSL SYSTEM WIDE" not in df: df["HSL SYSTEM WIDE"] = pd.NA return df
[docs] def get_reported_outages( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """ Retrieves the 5-minute data behind this dashboard: https://www.ercot.com/gridmktinfo/dashboards/generationoutages Data available at https://www.ercot.com/api/1/services/read/dashboards/generation-outages.json This data is ephemeral in that there is only one file available that is constantly updated. There is no historical data. """ logger.info("Downloading ERCOT reported outages data") json = requests.get( "https://www.ercot.com/api/1/services/read/dashboards/generation-outages.json", # noqa: E501 ).json() current = json["current"] previous = json["previous"] def flatten_dict(data: dict, prefix: str = "") -> dict: """ Recursive function to flatten nested dictionaries with prefix handling. Returns a new dictionary with the flattened data. """ flat_data = {} for key, value in data.items(): if isinstance(value, dict): flat_data.update(flatten_dict(value, f"{prefix}{key} ")) else: flat_data[(prefix + key).title()] = value return flat_data # Flatten each dictionary in the list previous_data = [flatten_dict(data) for data in previous.values()] current_data = [flatten_dict(data) for data in current.values()] df = pd.DataFrame.from_dict(current_data + previous_data) # need to use apply since there can be mixed # fixed offsets during dst transition # that result in object dtypes in pandas df["Time"] = df["Deliverytime"].apply( lambda x: pd.to_datetime(x).tz_convert("UTC"), ) df["Time"] = df["Time"].dt.tz_convert(self.default_timezone) df = utils.move_cols_to_front(df, ["Time"]).drop( columns=["Deliverytime", "Dstflag"], ) return df.sort_values("Time").reset_index(drop=True)
@support_date_range(frequency=None)
[docs] def get_hourly_resource_outage_capacity( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Hourly Resource Outage Capacity report sourced from the Outage Scheduler (OS). Returns outage data for for next 7 days. Total Resource MW doesn't include IRR, New Equipment outages, retirement of old equipment, seasonal mothballed (during the outage season), and mothballed. As such, it is a proxy for thermal outages. Arguments: date (str, pd.Timestamp): time to download. Returns last hourly report before this time. Supports "latest" end (str, pd.Timestamp, optional): end time to download. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with hourly resource outage capacity data """ df = self._get_hourly_report( start=date, end=end, report_type_id=HOURLY_RESOURCE_OUTAGE_CAPACITY_RTID, extension="csv", handle_doc=self._handle_hourly_resource_outage_capacity, ) return df
def _handle_hourly_resource_outage_capacity( self, doc: Document, verbose: bool = False, ) -> pd.DataFrame: df = self.read_doc(doc, parse=False, verbose=verbose) # there is no DST flag column # and the data set ignores DST # so, we will default to assuming it is DST. We will also # set nonexistent times to NaT and drop them df = self.parse_doc( df, dst_ambiguous_default=True, nonexistent="NaT", verbose=verbose, ) df = df.dropna(subset=["Interval Start"]) df.insert( 0, "Publish Time", pd.to_datetime(doc.publish_date).tz_convert(self.default_timezone), ) return self._handle_hourly_resource_outage_capacity_df(df) def _handle_hourly_resource_outage_capacity_df( self, df: pd.DataFrame, ) -> pd.DataFrame: outage_types = ["Total Resource", "Total IRR", "Total New Equip Resource"] # Earlier data doesn't have these columns if all( col in df.columns for col in [ "TotalResourceMWZoneSouth", "TotalResourceMWZoneNorth", "TotalResourceMWZoneWest", "TotalResourceMWZoneHouston", ] ): for t in outage_types: t_no_space = t.replace(" ", "") df = df.rename( columns={ f"{t_no_space}MWZoneSouth": f"{t} MW Zone South", f"{t_no_space}MWZoneNorth": f"{t} MW Zone North", f"{t_no_space}MWZoneWest": f"{t} MW Zone West", f"{t_no_space}MWZoneHouston": f"{t} MW Zone Houston", }, ) df.insert( df.columns.tolist().index(f"{t} MW Zone Houston") + 1, f"{t} MW", ( df[f"{t} MW Zone South"] + df[f"{t} MW Zone North"] + df[f"{t} MW Zone West"] + df[f"{t} MW Zone Houston"] ), ) else: df = df.rename( columns={ "TotalResourceMW": "Total Resource MW", "TotalIRRMW": "Total IRR MW", "TotalNewEquipResourceMW": "Total New Equip Resource MW", }, ) return df @support_date_range(frequency=None)
[docs] def get_unplanned_resource_outages( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Unplanned Resource Outages. Data published at ~5am central on the 3rd day after the day of interest. Since the date argument is the publish date, if you want to get data for a specific date, pass in the date of interest - 3 days. Arguments: date (str, datetime): publish date of the report end (str, datetime, optional): end date to download. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with unplanned resource outages """ docs = self._get_documents( report_type_id=UNPLANNED_RESOURCE_OUTAGES_REPORT_RTID, published_after=date, # If no end provided, use the date plus 1 day published_before=end or date + pd.DateOffset(days=1), verbose=verbose, ) dfs = [] for doc in docs: xls = utils.get_zip_file(doc.url, verbose=verbose) df = self._handle_unplanned_resource_outages_file(doc, xls) dfs.append(df) complete_df = pd.concat(dfs, ignore_index=True) return complete_df
def _handle_unplanned_resource_outages_file( self, doc: Document, xls: ZipFile, ) -> pd.DataFrame: as_of = pd.to_datetime( pd.read_excel( xls, sheet_name="Unplanned Resource Outages", skiprows=2, nrows=1, ) .values[0][0] .split(": ")[1], ).tz_localize(self.default_timezone) df = pd.read_excel( xls, sheet_name="Unplanned Resource Outages", skiprows=4, skipfooter=1, ) # Already has local timezone df["Publish Time"] = pd.to_datetime(doc.publish_date) df.insert(0, "Current As Of", as_of) time_cols = ["Actual Outage Start", "Planned End Date", "Actual End Date"] for col in time_cols: # data doesn't have DST info. So just assume it is DST # when ambiguous \_(-_-)_/ df[col] = pd.to_datetime(df[col]).dt.tz_localize( self.default_timezone, ambiguous=True, ) df = utils.move_cols_to_front( df, [ "Current As Of", "Publish Time", "Actual Outage Start", "Planned End Date", "Actual End Date", "Resource Name", "Resource Unit Code", "Fuel Type", "Outage Type", "Nature Of Work", "Available MW Maximum", "Available MW During Outage", "Effective MW Reduction Due to Outage", ], ) return df @support_date_range("DAY_START")
[docs] def get_as_reports( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], verbose: bool = False, ) -> pd.DataFrame: """Get Ancillary Services Reports. Published with a 2 day delay around 3am central """ # This method is not supported starting with the file published on 2025-12-08 # (with data for 2025-12-06) if date == "latest" or date >= pd.Timestamp( "2025-12-06", tz=self.default_timezone, ): raise ValueError( "This method is not supported starting with the file published on 2025-12-08 (with data for 2025-12-06) because the data significantly changed with the launch of ERCOT RTC+B. Please use get_reports_as_dam on or after this date.", ) report_date = date.normalize() + pd.DateOffset(days=2) doc = self._get_document( report_type_id=TWO_DAY_ANCILLARY_SERVICES_REPORTS_RTID, date=report_date, verbose=verbose, ) return self._handle_as_reports_file(doc.url, verbose=verbose)
def _handle_as_reports_file( self, file_path: str, verbose: bool = False, **kwargs, ) -> pd.DataFrame: z = utils.get_zip_folder(file_path, verbose=verbose, **kwargs) # extract the date from the file name date_str = z.namelist()[0][-13:-4] # Legacy method uses slightly different product lists self_arranged_products = AS_PRODUCTS cleared_products = [p for p in AS_PRODUCTS if p != "NSPNM"] offers_products = [ "RRSPFR", "RRSUFR", "RRSFFR", "ECRSM", "ECRSS", "REGUP", "REGDN", "ONNS", "OFFNS", ] prefix = self._get_as_report_prefix(z) all_dfs = [] for as_name in cleared_products: suffix = f"{as_name}-{date_str}.csv" cleared = f"{prefix}_Cleared_DAM_AS_{suffix}" if as_name in AS_EXCLUDE_PRODUCTS and cleared not in z.namelist(): continue df_cleared = pd.read_csv(z.open(cleared)) all_dfs.append(df_cleared) for as_name in self_arranged_products: suffix = f"{as_name}-{date_str}.csv" self_arranged = f"{prefix}_Self_Arranged_AS_{suffix}" if as_name in AS_EXCLUDE_PRODUCTS and self_arranged not in z.namelist(): continue df_self_arranged = pd.read_csv(z.open(self_arranged)) all_dfs.append(df_self_arranged) def _make_bid_curve(df: pd.DataFrame) -> list[list[float]]: return [ list(x) for x in df[["MW Offered", f"{as_name} Offer Price"]].values.tolist() ] for as_name in offers_products: suffix = f"{as_name}-{date_str}.csv" # Starting 2025-12-08, files have DAM in the name offers = f"{prefix}_Agg_AS_Offers_{suffix}" if as_name in AS_EXCLUDE_PRODUCTS and offers not in z.namelist(): continue df_offers = pd.read_csv(z.open(offers)) name = f"Bid Curve - {as_name}" if df_offers.empty: # use last df to get the index # and set values to None df_offers_hourly = all_dfs[0].rename( columns={ all_dfs[0].columns[-1]: name, }, ) df_offers_hourly[name] = None else: df_offers_hourly = ( df_offers.groupby(["Delivery Date", "Hour Ending"]) .apply(_make_bid_curve, include_groups=False) .reset_index(name=name) ) all_dfs.append(df_offers_hourly) df = pd.concat( [df.set_index(["Delivery Date", "Hour Ending"]) for df in all_dfs], axis=1, ).reset_index() return self.parse_doc(df, verbose=verbose) def _get_as_report_document( self, date: str | pd.Timestamp, report_type_id: str, verbose: bool = False, ) -> Document: """Get the AS report document for a given date. Handles "latest" date and applies the 2-day delay offset. Arguments: date: date to fetch reports for (or "latest") report_type_id: RTID of the report type to fetch verbose: print verbose output Returns: Document: The document for the requested date """ if date == "latest": date = self.local_now().normalize() - pd.DateOffset(days=2) report_date = date.normalize() + pd.DateOffset(days=2) return self._get_document( report_type_id=report_type_id, date=report_date, verbose=verbose, ) def _get_as_report_prefix(self, z: ZipFile) -> str: """Determine the file prefix for AS report files. Earlier files use '48h' prefix, newer files use '2d'. Arguments: z: ZipFile containing the AS report files Returns: str: The prefix ('2d' or '48h') """ if z.namelist()[0].split("_")[0] == "48h": return "48h" return "2d" @staticmethod def _make_offer_curve( group_df: pd.DataFrame, mw_col: str = "MW Offered", price_col: str = "Offer Price", ) -> list[list[float]]: """Create an offer curve as a list of [MW, Price] pairs. Arguments: group_df: DataFrame containing MW and price columns mw_col: Name of the MW column price_col: Name of the price column Returns: list[list[float]]: List of [MW, Price] pairs """ return [[mw, price] for mw, price in zip(group_df[mw_col], group_df[price_col])] @support_date_range("DAY_START")
[docs] def get_as_reports_dam( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], verbose: bool = False, ) -> pd.DataFrame: """Get Day-Ahead Market Ancillary Services Reports. Published with a 2 day delay around 3am central. Contains cleared, self-arranged, and bid curve data for each AS product. Arguments: date: date to fetch reports for verbose: print verbose output Returns: pandas.DataFrame: A DataFrame with DAM ancillary services reports """ doc = self._get_as_report_document( date=date, report_type_id=TWO_DAY_ANCILLARY_SERVICES_REPORTS_RTID, verbose=verbose, ) return self._handle_as_reports_dam_file(doc.url, verbose=verbose)
def _handle_as_reports_dam_file( self, file_path: str, verbose: bool = False, **kwargs, ) -> pd.DataFrame: """Parse DAM AS reports into long format with columns: interval_start, interval_end, as_type, cleared, self_arranged, offer_curve """ z = utils.get_zip_folder(file_path, verbose=verbose, **kwargs) # extract the date from the file name date_str = z.namelist()[0][-13:-4] prefix = self._get_as_report_prefix(z) # Process each AS product product_dfs = [] for as_name in AS_PRODUCTS: suffix = f"{as_name}-{date_str}.csv" cleared_file = f"{prefix}_Cleared_DAM_AS_{suffix}" self_arranged_file = f"{prefix}_Self_Arranged_AS_{suffix}" # Files before 2025-12-08 (before RTC+B) do not have DAM in the name. offers_file = ( f"{prefix}_Agg_DAM_AS_Offers_{suffix}" if pd.Timestamp(date_str, tz=self.default_timezone) >= pd.Timestamp("2025-12-08", tz=self.default_timezone) else f"{prefix}_Agg_AS_Offers_{suffix}" ) # Skip if product not in this file if as_name in AS_EXCLUDE_PRODUCTS and cleared_file not in z.namelist(): continue # Read cleared data df_cleared = None if cleared_file in z.namelist(): df_cleared = pd.read_csv(z.open(cleared_file)) df_cleared = df_cleared.rename( columns={df_cleared.columns[-1]: "Cleared"}, ) # Read self-arranged data df_self = None if self_arranged_file in z.namelist(): df_self = pd.read_csv(z.open(self_arranged_file)) df_self = df_self.rename( columns={df_self.columns[-1]: "Self Arranged"}, ) # Read offers data and create bid curves df_offers = None if offers_file in z.namelist(): df_offers_raw = pd.read_csv(z.open(offers_file)) if not df_offers_raw.empty: # Create offer curve as list of [MW, Price] pairs def _make_offer_curve(group_df: pd.DataFrame) -> list[list[float]]: return [ [mw, price] for mw, price in zip( group_df["MW Offered"], group_df[f"{as_name} Offer Price"], ) ] df_offers = ( df_offers_raw.groupby(["Delivery Date", "Hour Ending"]) .apply(_make_offer_curve, include_groups=False) .reset_index(name="Offer Curve") ) # Merge cleared, self-arranged, and offers data df_product = None if df_cleared is not None: df_product = df_cleared.copy() if df_self is not None: if df_product is None: df_product = df_self.copy() else: df_product = df_product.merge( df_self[["Delivery Date", "Hour Ending", "Self Arranged"]], on=["Delivery Date", "Hour Ending"], how="outer", ) if df_offers is not None: if df_product is None: df_product = df_offers.copy() else: df_product = df_product.merge( df_offers, on=["Delivery Date", "Hour Ending"], how="outer", ) if df_product is not None: # Add AS Type column df_product["AS Type"] = as_name product_dfs.append(df_product) if not product_dfs: raise NoDataFoundException("No DAM AS reports found in zip file") # Combine all products into long format df = pd.concat(product_dfs, ignore_index=True) # Select and order columns df = df[ [ "Delivery Date", "Hour Ending", "AS Type", "Cleared", "Self Arranged", "Offer Curve", ] ] return self.parse_doc(df, verbose=verbose) @support_date_range("DAY_START")
[docs] def get_as_reports_sced( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], verbose: bool = False, ) -> pd.DataFrame: """Get 2-Day SCED Ancillary Service Disclosure Reports. Published with a 2 day delay around 3am central. Contains offer curves (MW offered and price) for each AS product at each SCED timestamp. Output columns: SCED Timestamp, AS Type, Offer Curve Arguments: date: date to fetch reports for verbose: print verbose output Returns: pandas.DataFrame: A DataFrame with SCED ancillary services offers """ doc = self._get_as_report_document( date=date, report_type_id=TWO_DAY_SCED_ANCILLARY_SERVICES_REPORTS_RTID, verbose=verbose, ) return self._handle_as_reports_sced_file(doc.url, verbose=verbose)
def _handle_as_reports_sced_file( self, file_path: str, verbose: bool = False, **kwargs, ) -> pd.DataFrame: """Parse SCED AS reports with columns: sced_timestamp, as_type, offer_curve (list of [MW, Price] pairs) Based on primary key: sced_timestamp, as_type, mw_offered But output format uses offer_curve column containing list of pairs """ z = utils.get_zip_folder(file_path, verbose=verbose, **kwargs) all_dfs = [] for file_name in z.namelist(): # Skip non-CSV files if not file_name.endswith(".csv"): continue # Determine AS type from file name as_type = None for product in AS_PRODUCTS: if product in file_name.upper(): as_type = product break if as_type is None: continue df = pd.read_csv(z.open(file_name)) if df.empty: continue # Add AS Type column df["AS Type"] = as_type # Find the price column - it should contain "Offer Price" price_col = None for col in df.columns: if "Offer Price" in col: price_col = col break if price_col is None: continue # Rename columns to standardized names df = df.rename(columns={price_col: "Offer Price"}) # Create offer curve as list of [MW, Price] pairs for each SCED timestamp def _make_offer_curve(group_df: pd.DataFrame) -> list[list[float]]: return [ [mw, price] for mw, price in zip( group_df["MW Offered"], group_df["Offer Price"], ) ] df_grouped = ( df.groupby(["SCED Timestamp", "AS Type"]) .apply(_make_offer_curve, include_groups=False) .reset_index(name="Offer Curve") ) all_dfs.append(df_grouped) if not all_dfs: raise NoDataFoundException("No SCED AS reports found in zip file") df = pd.concat(all_dfs, ignore_index=True) # Parse SCED Timestamp directly (it's already a timestamp, not date + hour) df["SCED Timestamp"] = pd.to_datetime(df["SCED Timestamp"]) # Convert to local timezone if df["SCED Timestamp"].dt.tz is None: df["SCED Timestamp"] = df["SCED Timestamp"].dt.tz_localize( self.default_timezone, ) else: df["SCED Timestamp"] = df["SCED Timestamp"].dt.tz_convert( self.default_timezone, ) df = df[["SCED Timestamp", "AS Type", "Offer Curve"]] df = df.sort_values("SCED Timestamp").reset_index(drop=True) return df @support_date_range("DAY_START")
[docs] def get_dam_system_lambda( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Day-Ahead Market System Lambda File is typically published around 12:30 pm for the day ahead https://www.ercot.com/mp/data-products/data-product-details?id=NP4-523-CD Arguments: date (str, datetime): date to get data for end (str, datetime, optional): end time to get data for. If None, return 1 day of data. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with day-ahead market system lambda data """ # Subtract one day since this is the day ahead market date = date if date == "latest" else date - pd.DateOffset(days=1) doc = self._get_document( report_type_id=DAM_SYSTEM_LAMBDA_RTID, date=date, verbose=verbose, ) return self._handle_dam_system_lambda_file(doc, verbose=verbose)
def _handle_dam_system_lambda_file( self, doc: Document, verbose: bool = False, ) -> pd.DataFrame: df = self.read_doc(doc, parse=True, verbose=verbose) # Set the publish time from the document metadata df["Publish Time"] = pd.to_datetime(doc.publish_date) df["Market"] = "DAM" df = utils.move_cols_to_front( df.drop(columns=["Time"]).rename(columns={"SystemLambda": "System Lambda"}), [ "Interval Start", "Interval End", "Publish Time", "Market", "System Lambda", ], ) return df @support_date_range(frequency=None)
[docs] def get_sced_system_lambda( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get System lambda of each successful SCED Normally published every 5 minutes Arguments: date (str, datetime, pd.Timestamp): date or start time to get data for end (str, datetime, optional): end time to get data for. If None, return 1 day of data. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame """ # no end, so assume requesting one day # use the timestamp from the friendly name if date == "latest": date = date friendly_name_timestamp_after = None friendly_name_timestamp_before = None elif end is None: friendly_name_timestamp_after = date.normalize() friendly_name_timestamp_before = ( friendly_name_timestamp_after + pd.DateOffset(days=1) ) date = None else: friendly_name_timestamp_after = date friendly_name_timestamp_before = end date = None docs = self._get_documents( report_type_id=SCED_SYSTEM_LAMBDA_RTID, date=date, friendly_name_timestamp_after=friendly_name_timestamp_after, friendly_name_timestamp_before=friendly_name_timestamp_before, verbose=verbose, constructed_name_contains="csv.zip", ) df = self._handle_sced_system_lambda(docs, verbose=verbose) return df
def _handle_sced_timestamp( self, df: pd.DataFrame, verbose: bool = False, ) -> pd.DataFrame: df = df.rename( columns={ "RepeatHourFlag": "RepeatedHourFlag", "SCEDTimeStamp": "SCED Timestamp", "SCEDTimestamp": "SCED Timestamp", }, ) # Some files have errors with the timestamp df["SCED Timestamp"] = pd.to_datetime( df["SCED Timestamp"], errors="coerce", ).dt.tz_localize( self.default_timezone, ambiguous=df["RepeatedHourFlag"] == "N", ) df = df.dropna(subset=["SCED Timestamp"]) # SCED runs at least every 5 minutes. These values are only approximations, # not exact. # Round to nearest 5 minutes df["Interval Start"] = df["SCED Timestamp"].dt.floor( "5min", ambiguous=df["RepeatedHourFlag"] == "N", ) df["Interval End"] = df["Interval Start"] + pd.Timedelta(minutes=5) df = df.drop("RepeatedHourFlag", axis=1) return df def _handle_sced_system_lambda( self, docs: list[Document], verbose: bool = False, ) -> pd.DataFrame: all_dfs = [] for doc in tqdm.tqdm( docs, desc="Reading SCED System Lambda files", disable=not verbose, ): logger.info(f"Reading {doc.url}") df = pd.read_csv(doc.url, compression="zip") all_dfs.append(df) if len(all_dfs) == 0: df = pd.DataFrame( columns=["SCEDTimeStamp", "RepeatedHourFlag", "SystemLambda"], ) else: df = pd.concat(all_dfs) df = self._handle_sced_timestamp(df, verbose=verbose) df["SystemLambda"] = df["SystemLambda"].astype("float64") df = df.rename( columns={ "SystemLambda": "System Lambda", }, ) df.sort_values("SCED Timestamp", inplace=True) return df[["Interval Start", "Interval End", "SCED Timestamp", "System Lambda"]] @support_date_range("DAY_START")
[docs] def get_highest_price_as_offer_selected( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get the offer price and the name of the Entity submitting the offer for the highest-priced Ancillary Service (AS) Offer. Published with 3 day delay Arguments: date (str, datetime): date to get data for verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrameq """ # This report ends on 2025-12-05 if date == "latest" or date >= pd.Timestamp( "2025-12-06", tz=self.default_timezone, ): raise ValueError( "This method is not supported starting with the file published on 2025-12-06 (with data for 2025-12-06) because the data changed with the launch of ERCOT RTC+B. Please use get_highest_price_as_offer_selected_dam on or after this date.", ) report_date = date.normalize() + pd.DateOffset(days=3) doc = self._get_document( report_type_id=THREE_DAY_DAM_HIGHEST_PRICE_AS_OFFER_SELECTED_RTID, date=report_date, verbose=verbose, ) df = self._handle_highest_price_as_offer_selected_file( doc, market="dam", verbose=verbose, ) # Add Time and Market columns for backwards compatibility df["Time"] = df["Interval Start"] df["Market"] = "dam" # Reorder columns to match old format return df[ [ "Time", "Interval Start", "Interval End", "Market", "QSE", "DME", "Resource Name", "AS Type", "Block Indicator", "Offered Price", "Total Offered Quantity", "Offered Quantities", ] ]
@support_date_range("DAY_START")
[docs] def get_highest_price_as_offer_selected_dam( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get the offer price and the name of the Entity submitting the offer for the highest-priced Ancillary Service (AS) Offer selected in the Day-Ahead Market (DAM). Published with 3 day delay Arguments: date (str, datetime): date to get data for end (str, datetime, optional): end date for date range verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with columns: - Interval Start - Interval End - QSE - DME - Resource Name - AS Type - Block Indicator - Offered Price - Total Offered Quantity - Offered Quantities """ report_date = date.normalize() + pd.DateOffset(days=3) doc = self._get_document( report_type_id=THREE_DAY_DAM_HIGHEST_PRICE_AS_OFFER_SELECTED_RTID, date=report_date, verbose=verbose, ) df = self._handle_highest_price_as_offer_selected_file( doc, market="dam", verbose=verbose, ) return ( df[ [ "Interval Start", "Interval End", "QSE", "DME", "Resource Name", "AS Type", "Block Indicator", "Offered Price", "Total Offered Quantity", "Offered Quantities", ] ] .sort_values(["Interval Start", "AS Type", "Resource Name"]) .reset_index(drop=True) )
@support_date_range("DAY_START")
[docs] def get_highest_price_as_offer_selected_sced( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get the offer price and the name of the Entity submitting the offer for the highest-priced Ancillary Service (AS) Offer selected in the Real-Time Market (SCED). Published with 3 day delay Arguments: date (str, datetime): date to get data for end (str, datetime, optional): end date for date range verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with columns: - SCED Timestamp - QSE - DME - Resource Name - AS Type - Offered Price - Total Offered Quantity - Offered Quantities """ report_date = date.normalize() + pd.DateOffset(days=3) doc = self._get_document( report_type_id=THREE_DAY_SCED_HIGHEST_PRICE_AS_OFFER_SELECTED_RTID, date=report_date, verbose=verbose, ) df = self._handle_highest_price_as_offer_selected_file( doc, market="sced", verbose=verbose, ) return ( df[ [ "SCED Timestamp", "QSE", "DME", "Resource Name", "AS Type", "Offered Price", "Total Offered Quantity", "Offered Quantities", ] ] .sort_values(["SCED Timestamp", "AS Type", "Resource Name"]) .reset_index(drop=True) )
@support_date_range("DAY_START")
[docs] def get_3_day_highest_price_sced( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get the bid price and name of the Load Resource submitting the highest-priced bid selected or dispatched by SCED for the given operating day. Published daily, three days after the applicable operating day. https://www.ercot.com/mp/data-products/data-product-details?id=np3-257-ex Arguments: date: operating day to fetch data for. end: optional end operating day for date range queries. verbose: print verbose output. Returns: pandas.DataFrame with columns: Interval Start, Interval End, SCED Timestamp, QSE, DME, Load Resource, Highest Price Dispatched by SCED, Proxy Extension. """ report_date = date.normalize() + pd.DateOffset(days=3) doc = self._get_document( report_type_id=THREE_DAY_HIGHEST_PRICE_BIDS_SCED_RTID, date=report_date, verbose=verbose, ) df = self.read_doc(doc, parse=False, verbose=verbose) return self._handle_3_day_highest_price_sced(df)
def _handle_3_day_highest_price_sced(self, df: pd.DataFrame) -> pd.DataFrame: df = df.rename( columns={ "SCED Time Stamp": "SCED Timestamp", "Repeated Hour Flag": "RepeatedHourFlag", }, ) df = self._handle_sced_timestamp(df) df["Highest Price Dispatched by SCED"] = pd.to_numeric( df["Highest Price Dispatched by SCED"], errors="coerce", ) return ( df[ [ "Interval Start", "Interval End", "SCED Timestamp", "QSE", "DME", "Load Resource", "Highest Price Dispatched by SCED", "Proxy Extension", ] ] .sort_values(["SCED Timestamp", "QSE", "DME", "Load Resource"]) .reset_index(drop=True) ) def _handle_highest_price_as_offer_selected_file( self, doc: Document, market: str, verbose: bool = False, ) -> pd.DataFrame: """Handle the new 3-day highest price AS offer selected files for both DAM and SCED markets. The new files (post RTC+B) don't have a Market column, so we add it manually based on the market parameter. """ df = self.read_doc(doc, verbose=verbose, parse=False) # Determine format based on market parameter is_sced_format = market.lower() == "sced" df = df.rename( columns={ # New data (post RTC+B) "Resource Name with Highest-Priced AS Offer Selected in DAM": "Resource Name", # noqa: E501 "Resource Name with Highest-Priced AS Offer Selected in SCED": "Resource Name", # noqa: E501 # SCED has spaces around the dash "Resource Name with Highest - Priced AS Offer Selected in SCED": "Resource Name", # noqa: E501 # Older data "Resource Name with Highest-Priced Offer Selected in DAM and SASMs": "Resource Name", # noqa: E501 "Resource Name with Highest-Priced Offer Selected in DAM": "Resource Name", # noqa: E501 }, ) def _handle_offers(df: pd.DataFrame) -> pd.Series: return pd.Series( { "Offered Price": df["Offered Price"].iloc[0], "Total Offered Quantity": df["Offered Quantity"].sum(), "Offered Quantities": df["Offered Quantity"].tolist(), }, ) if is_sced_format: # SCED format: Parse SCED Timestamp directly # SCED Timestamp format is like "12/07/2025 00:00:17" # TODO: this dataset doesn't have a DSTFlag so we may have issues with # DST transitions df["SCED Timestamp"] = pd.to_datetime(df["SCED Timestamp"]).dt.tz_localize( self.default_timezone, ) # Group by SCED Timestamp and resource identifiers only df = ( df.groupby( ["SCED Timestamp", "QSE", "DME", "Resource Name", "AS Type"], dropna=False, ) .apply(_handle_offers, include_groups=False) .reset_index() ) else: # DAM format: Parse Hour Ending and Delivery Date is_dst_end = 25 in df["Hour Ending"].values # TODO: we haven't been able to test this with an actual DST transition # so we may have to update this logic if not is_dst_end: df = self.parse_doc(df) else: # Hours go up to 25. Assume hour 2 is CDT and hour 3 is CST df["Interval Start"] = ( pd.to_datetime(df["Delivery Date"]) + pd.to_timedelta(df["Hour Ending"] - 1, unit="h") ).dt.tz_localize( self.default_timezone, ambiguous=df["Hour Ending"] == 2, ) df.loc[df["Hour Ending"] >= 3, "Interval Start"] = df.loc[ df["Hour Ending"] >= 3, "Interval Start", ] - pd.Timedelta(hours=1) df["Interval End"] = df["Interval Start"] + pd.Timedelta(hours=1) # Group by intervals and resource identifiers df = ( df.groupby( [ "Interval Start", "Interval End", "QSE", "DME", "Resource Name", "AS Type", "Block Indicator", ], dropna=False, # Have to include missing because older data has missing # values in some columns ) .apply(_handle_offers, include_groups=False) .reset_index() ) return df
[docs] def get_dam_price_corrections( self, dam_type: str, verbose: bool = False, ) -> pd.DataFrame: """ Get DAM Price Corrections Arguments: dam_type (str): 'DAM_SPP', 'DAM_EBLMP' """ docs = self._get_documents( report_type_id=DAM_PRICE_CORRECTIONS_RTID, constructed_name_contains=dam_type, extension="csv", verbose=verbose, ) df = self._handle_price_corrections(docs, verbose=verbose) return df
[docs] def get_rtm_price_corrections( self, rtm_type: str, verbose: bool = False, ) -> pd.DataFrame: """ Get RTM Price Corrections Arguments: rtm_type (str): 'RTM_SPP', 'RTM_SPLMP', 'RTM_EBLMP', 'RTM_ShadowPrice', 'RTM_SOGLMP', 'RTM_SOGPRICE' """ docs = self._get_documents( report_type_id=RTM_PRICE_CORRECTIONS_RTID, constructed_name_contains=rtm_type, extension="csv", verbose=verbose, ) df = self._handle_price_corrections(docs, verbose=verbose) return df
[docs] def get_mcpc_dam_price_corrections( self, verbose: bool = False, ) -> pd.DataFrame: """ Get Market Clearing Price for Capacity (MCPC) corrections for DAM. MCPC (Market Clearing Price for Capacity) corrections contain ancillary service prices at the system level. Returns: pd.DataFrame: DataFrame with columns: - Price Correction Time - Interval Start - Interval End - AS Type - MCPC Original - MCPC Corrected """ docs = self._get_documents( report_type_id=DAM_PRICE_CORRECTIONS_RTID, constructed_name_contains="DAM_MCPC", extension="csv", verbose=verbose, ) df = self._handle_mcpc_price_corrections(docs, verbose=verbose) return df
def _handle_price_corrections( self, docs: list[Document], verbose: bool = False, ) -> pd.DataFrame: df = self.read_docs(docs, verbose=verbose) df = self._handle_settlement_point_name_and_type(df) df = df.rename( columns={ "SettlementPointName": "Settlement Point Name", "SettlementPoint": "Settlement Point Name", "SettlementPointType": "Settlement Point Type", "SPPOriginal": "SPP Original", "SPPCorrected": "SPP Corrected", "PriceCorrectionTime": "Price Correction Time", }, ) df["Price Correction Time"] = pd.to_datetime( df["Price Correction Time"], ).dt.tz_localize(self.default_timezone) df = df[ [ "Price Correction Time", "Interval Start", "Interval End", "Location", "Location Type", "SPP Original", "SPP Corrected", ] ] return df def _handle_mcpc_price_corrections( self, docs: list[Document], verbose: bool = False, ) -> pd.DataFrame: """Handle MCPC (Market Clearing Price for Capacity) price corrections. MCPC corrections have a different structure than SPP corrections: - They are for ancillary services at the system level (not settlement points) - read_docs() already creates Interval Start/End columns """ df = self.read_docs(docs, verbose=verbose) # Rename columns to match gridstatus conventions df = df.rename( columns={ "ASType": "AS Type", "MCPCOriginal": "MCPC Original", "MCPCCorrected": "MCPC Corrected", "PriceCorrectionTime": "Price Correction Time", }, ) # Parse Price Correction Time df["Price Correction Time"] = pd.to_datetime( df["Price Correction Time"], ).dt.tz_localize(self.default_timezone) # Select and order final columns df = df[ [ "Price Correction Time", "Interval Start", "Interval End", "AS Type", "MCPC Original", "MCPC Corrected", ] ] return df @support_date_range(frequency=None)
[docs] def get_system_wide_actual_load( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get 15-minute system-wide actual load. This report is posted every hour five minutes after the hour. Args: date (str, datetime): date to get data for end (str, datetime, optional): end time to get data for. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with system actuals data """ report_type_id = SYSTEM_WIDE_ACTUALS_RTID if date == "latest": # Go back one hour to ensure we have data date = pd.Timestamp.now(tz=self.default_timezone).floor("h") - pd.Timedelta( hours=1, ) if end is None: doc = self._get_document( report_type_id=report_type_id, published_after=date + pd.Timedelta(hours=1), published_before=date + pd.Timedelta(hours=2), extension="csv", verbose=verbose, ) docs = [doc] else: docs = self._get_documents( report_type_id=report_type_id, published_after=date + pd.Timedelta(hours=1), published_before=end + pd.Timedelta(hours=1), extension="csv", verbose=verbose, ) all_df = [ self._handle_system_wide_actual_load(doc, verbose=verbose) for doc in docs ] return pd.concat(all_df).sort_values("Interval Start")
def _handle_system_wide_actual_load(self, doc, verbose=False): return self.read_doc(doc, verbose=verbose) @support_date_range("HOUR_START")
[docs] def get_short_term_system_adequacy( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Short Term System Adequacy published between date and end. Arguments: date (str, datetime): date to get data for verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with system adequacy data """ return self._get_hourly_report( start=date, end=end, report_type_id=SHORT_TERM_SYSTEM_ADEQUACY_REPORT_RTID, handle_doc=self._handle_short_term_system_adequacy_file, verbose=verbose, extension="csv", ).sort_values(["Interval Start", "Publish Time"])
def _handle_short_term_system_adequacy_file( self, doc: Document, verbose: bool = False, ) -> pd.DataFrame: df = self.read_doc(doc, verbose=verbose) df["Publish Time"] = doc.publish_date df = utils.move_cols_to_front( df, ["Interval Start", "Interval End", "Publish Time"], ).drop(columns=["Time"]) df = df.rename( columns={ "CapGenResSouth": "Capacity Generation Resource South", "CapGenResNorth": "Capacity Generation Resource North", "CapGenResWest": "Capacity Generation Resource West", "CapGenResHouston": "Capacity Generation Resource Houston", "CapLoadResSouth": "Capacity Load Resource South", "CapLoadResNorth": "Capacity Load Resource North", "CapLoadResWest": "Capacity Load Resource West", "CapLoadResHouston": "Capacity Load Resource Houston", "OfflineAvailableMWSouth": "Offline Available MW South", "OfflineAvailableMWNorth": "Offline Available MW North", "OfflineAvailableMWWest": "Offline Available MW West", "OfflineAvailableMWHouston": "Offline Available MW Houston", "AvailCapGen": "Available Capacity Generation", "AvailCapReserve": "Available Capacity Reserve", "CapGenResTotal": "Capacity Generation Resource Total", "CapLoadResTotal": "Capacity Load Resource Total", "OfflineAvailableMWTotal": "Offline Available MW Total", "CapREGUPTotal": "Capacity Reg Up Total", "CapREGDNTotal": "Capacity Reg Down Total", "CapRRSTotal": "Capacity RRS Total", "CapECRSTotal": "Capacity ECRS Total", "CapNSPINTotal": "Capacity NSPIN Total", "CapREGUP_RRSTotal": "Capacity Reg Up RRS Total", "CapREGUP_RRS_ECRSTotal": "Capacity Reg Up RRS ECRS Total", "CapREGUP_RRS_ECRS_NSPINTotal": "Capacity Reg Up RRS ECRS NSPIN Total", }, ) return df @support_date_range(frequency=None)
[docs] def get_real_time_adders_and_reserves( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Real-Time ORDC and Reliability Deployment Price Adders and Reserves by SCED Interval At: https://www.ercot.com/mp/data-products/data-product-details?id=NP6-323-CD Arguments: date (str, datetime): date to get data for end (str, datetime): end date to get data for verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with ORDC data NOTE: data only goes back 5 days """ if date == "latest": docs = [ self._get_document( report_type_id=REAL_TIME_ADDERS_AND_RESERVES_RTID, published_before=date, verbose=verbose, ), ] else: # Set date to get a full day of published data if not end: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=REAL_TIME_ADDERS_AND_RESERVES_RTID, published_after=date, published_before=end, extension="csv", verbose=verbose, ) return self._handle_real_time_adders_and_reserves_docs(docs, verbose=verbose)
def _handle_real_time_adders_and_reserves_docs( self, docs: list[Document], verbose: bool = False, ) -> pd.DataFrame: df = self.read_docs(docs, parse=False, verbose=verbose) df = self._handle_sced_timestamp(df) df = utils.move_cols_to_front( df, ["SCED Timestamp", "Interval Start", "Interval End", "BatchID"], ) df = df.rename(columns={"SystemLambda": "System Lambda"}) return df.sort_values("SCED Timestamp") @support_date_range(frequency=None)
[docs] def get_temperature_forecast_by_weather_zone( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get temperature forecast by weather zone in hourly intervals. Published once a day at 5 am central. Arguments: date (str, datetime): date to get data for end (str, datetime, optional): end time to get data for. Defaults to None. verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with temperature forecast data """ if date == "latest": return self.get_temperature_forecast_by_weather_zone( "today", verbose=verbose, ) else: # Set end to get a full day of published data if not end: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=TEMPERATURE_FORECAST_BY_WEATHER_ZONE_RTID, extension="csv", published_after=date, published_before=end, ) return self._handle_temperature_forecast_by_weather_zone_docs(docs, verbose)
def _handle_temperature_forecast_by_weather_zone_docs( self, docs: list[Document], verbose: bool = False, ) -> pd.DataFrame: # Process files in a loop to add the publish time for each doc df = pd.concat( [ self.read_doc(doc, verbose=verbose, parse=False).assign( **{"Publish Time": doc.publish_date}, ) for doc in docs ], ) # For the 2025 DST end transition, the raw data looks like # DeliveryDate,HourEnding,Coast,East,FarWest,North,NorthCentral,SouthCentral,Southern,West,DSTFlag # 11/02/2025,01:00,63.3, 58, 60, 57, 60, 63.5, 71.4, 59,N # 11/02/2025,02:00,124.3, 113, 114, 111, 116.25, 120.5, 140.6, 112.8,N # 11/02/2025,03:00,60.9, 55, 54, 54, 56.25, 58, 68.8, 54.2,Y # 11/02/2025,03:00,60.9, 55, 54, 54, 56.25, 58, 68.8, 54.2,N # The 3:00 ending hour is duplicated when it should be the 2:00 ending hour # (We will not correct the obviously wrong temperature values) dst_transition_date_2025 = "11/02/2025" if dst_transition_date_2025 in df["DeliveryDate"].unique(): mask = ( (df["DeliveryDate"] == "11/02/2025") & (df["HourEnding"] == "03:00") & (df["DSTFlag"] == "Y") ) df.loc[mask, "HourEnding"] = "2:00" df = self.parse_doc(df) df = df.drop(columns=["Time"]).rename( columns=self._weather_zone_column_name_mapping(), ) df = utils.move_cols_to_front( df, ["Interval Start", "Interval End", "Publish Time"] + list(self._weather_zone_column_name_order()), ) # NOTE(kladar): ERCOT is currently publishing a duplicate for the Fall 2024 DST transition # we will remove the duplicates here and adjust the times to be our best guess at what is correct dst_transition_date = pd.Timestamp("2024-11-03") if dst_transition_date.date() in df["Interval Start"].dt.date.values: logger.info("Problematic DST transition detected, fixing duplicate hour") # take half the duplicate rows and adjust them to 1:00 to fix missing interval duplicate_mask = df["Interval Start"] == pd.Timestamp( "2024-11-03 02:00:00-0600", ) duplicate_indices = df[duplicate_mask].index first_half_indices = duplicate_indices[: len(duplicate_indices) // 2] df.loc[first_half_indices, "Interval Start"] = pd.Timestamp( "2024-11-03 01:00:00-0600", ) df["Interval End"] = df["Interval Start"] + pd.Timedelta(hours=1) # after the correction, the straight duplicate intervals remain, so we remove them df = df.drop_duplicates(subset=["Interval Start", "Publish Time"]) return df.sort_values("Interval Start") def _get_document( self, report_type_id: int, date: Literal["latest"] | datetime.datetime | None = None, published_after: str | None = None, published_before: str | None = None, constructed_name_contains: str | None = None, extension: str | None = None, verbose: bool = False, base_url: str = "www.ercot.com", request_kwargs: dict | None = None, ) -> Document: """Searches by Report Type ID, filtering for date and/or constructed name Raises a ValueError if no document matches Returns: Latest Document by publish_date """ # no need to pass this on # since this only returns the latest # document anyways if date == "latest": date = None documents = self._get_documents( report_type_id=report_type_id, date=date, published_after=published_after, published_before=published_before, constructed_name_contains=constructed_name_contains, extension=extension, verbose=verbose, base_url=base_url, request_kwargs=request_kwargs, ) return max(documents, key=lambda x: x.publish_date) def _get_documents( self, report_type_id: int, date: Literal["latest"] | datetime.datetime | None = None, published_after: str | None = None, published_before: str | None = None, friendly_name_timestamp_after: str | None = None, friendly_name_timestamp_before: str | None = None, constructed_name_contains: str | None = None, extension: str | None = None, verbose: bool = False, base_url: str = "www.ercot.com", request_kwargs: dict | None = None, ) -> list: """Searches by Report Type ID, filtering for date and/or constructed name Returns: list of Document with URL and Publish Date """ # Include a cache buster to ensure we get the latest data url = f"https://{base_url}/misapp/servlets/IceDocListJsonWS?reportTypeId={report_type_id}&_{int(time.time())}" # noqa logger.info(f"Fetching document {url}") # if latest, we dont need to filter # so we can set to None if published_before == "latest": published_before = None docs = self._get_json(url, verbose=verbose, **(request_kwargs or {}))[ "ListDocsByRptTypeRes" ]["DocumentList"] matches = [] for doc in docs: match = True doc_url = f"https://{base_url}/misdownload/servlets/mirDownload?doclookupId={doc['Document']['DocID']}" # noqa # make sure to handle retry files # e.g SPPHLZNP6905_retry_20230608_1545_csv try: friendly_name_timestamp = parse_timestamp_from_friendly_name( doc["Document"]["FriendlyName"], ) except Exception: friendly_name_timestamp = None friendly_name = doc["Document"]["FriendlyName"] # ERCOT adds xhr to the second set of file names during the repeated hour # for DST end. However, ERCOT may get the timezone offset wrong in the # PublishDate. Therefore, we remove the ERCOT provided timezone offset then # re-add the offset accounting for the repeated hour. # https://lists.ercot.com/cgi-bin/wa?A3=1111&L=NOTICE_TRAINING&E=quoted-printable&P=4519&B=--_000_B117FDA9B7BC68479362C1197F77D8790950ADCPW0005ercotcom_&T=text%2Fhtml;%20charset=us-ascii&XSS=3&header=1 publish_date = ( pd.Timestamp(doc["Document"]["PublishDate"]) .tz_localize(None) .tz_localize( self.default_timezone, # Pandas wants ambiguous to be True when DST is True (Pandas only # uses ambiguous during the repeated hour) The "xhr" file occurs # after the clock has been set back an hour so is not in DST. ambiguous="xhr" not in friendly_name, ) ) doc_obj = Document( url=doc_url, publish_date=publish_date, constructed_name=doc["Document"]["ConstructedName"], friendly_name=friendly_name, friendly_name_timestamp=friendly_name_timestamp, ) if published_after: match = match and doc_obj.publish_date > published_after if published_before: match = match and doc_obj.publish_date <= published_before if doc_obj.friendly_name_timestamp: if friendly_name_timestamp_after: match = ( match and doc_obj.friendly_name_timestamp > friendly_name_timestamp_after ) if friendly_name_timestamp_before: match = ( match and doc_obj.friendly_name_timestamp <= friendly_name_timestamp_before ) if date and date != "latest": match = match and doc_obj.publish_date.date() == date.date() if extension: match = match and doc_obj.friendly_name.endswith(extension) if constructed_name_contains: match = match and constructed_name_contains in doc_obj.constructed_name if match: matches.append(doc_obj) if date == "latest": return [max(matches, key=lambda x: x.publish_date)] if not matches: params = { k: v for k, v in locals().items() if k not in ["self", "msg", "url", "docs"] } raise NoDataFoundException( f"No documents found with the given parameters: {params}", # noqa ) return matches def _get_hourly_report( self, start: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None, report_type_id: int, handle_doc: Callable[[Document, bool], pd.DataFrame], extension: str | None = None, verbose: bool = False, ) -> pd.DataFrame: if start == "latest": # _get_document can handle "latest" doc = self._get_document( report_type_id=report_type_id, extension=extension, published_before=start, verbose=verbose, ) docs = [doc] else: # Set end to get a full day of published data if end is None: end = start + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=report_type_id, extension=extension, published_before=end, published_after=start, verbose=verbose, ) all_df = [] for doc in docs: df = handle_doc(doc, verbose=verbose) all_df.append(df) df = pd.concat(all_df) df = df.sort_values("Publish Time") return df def _handle_json_data(self, df: pd.DataFrame, columns: dict) -> pd.DataFrame: df["Time"] = ( pd.to_datetime(df["epoch"], unit="ms") .dt.tz_localize("UTC") .dt.tz_convert(self.default_timezone) ) cols_to_keep = ["Time"] + list(columns.keys()) return df[cols_to_keep].rename(columns=columns) def _get_settlement_points_mapping_documents( self, date: str | pd.Timestamp | None = None, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> list[Document]: if date == "latest": date = None if date is None and end is None: return [ self._get_document( report_type_id=SETTLEMENT_POINTS_LIST_AND_ELECTRICAL_BUSES_MAPPING_RTID, extension=None, verbose=verbose, ), ] all_docs = self._get_documents( report_type_id=SETTLEMENT_POINTS_LIST_AND_ELECTRICAL_BUSES_MAPPING_RTID, extension=None, verbose=verbose, ) if not all_docs: raise NoDataFoundException( "No settlement points mapping documents found", ) if date is not None and end is None: closest = min( all_docs, key=lambda d: abs(d.publish_date.normalize() - date.normalize()), ) return [closest] filtered = [] for doc in all_docs: pub = doc.publish_date.normalize() if date is not None and pub < date.normalize(): continue if end is not None and pub >= end.normalize(): continue filtered.append(doc) if not filtered: raise NoDataFoundException( f"No settlement points mapping documents found between {date} and {end}", ) return sorted(filtered, key=lambda d: d.publish_date) def _download_and_extract_csv( self, doc: Document, filename_contains: str, ) -> tuple[pd.DataFrame, pd.Timestamp]: logger.info(f"Fetching {doc.url}") r = requests.get(doc.url) z = ZipFile(io.BytesIO(r.content)) names = z.namelist() matching = [name for name in names if filename_contains in name] if not matching: raise ValueError( f"No file matching '{filename_contains}' found in zip. " f"Available files: {names}", ) df = pd.read_csv(z.open(matching[0])) return df, doc.publish_date def _get_settlement_points_csv( self, filename_contains: str, column_mapping: dict[str, str], date: pd.Timestamp | None = None, end: pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: docs = self._get_settlement_points_mapping_documents(date, end, verbose) dfs = [] for doc in docs: df, publish_date = self._download_and_extract_csv( doc, filename_contains, ) df = df.rename(columns=column_mapping) df["Publish Date"] = publish_date.date() cols = ["Publish Date"] + list(column_mapping.values()) dfs.append(df[cols]) return pd.concat(dfs).reset_index(drop=True) def _get_settlement_point_mapping(self, verbose: bool = False) -> pd.DataFrame: """Get DataFrame whose columns can help us filter out values""" docs = self._get_settlement_points_mapping_documents(verbose=verbose) df, _ = self._download_and_extract_csv(docs[0], "Settlement_Points") return df @support_date_range(frequency=None)
[docs] def get_settlement_points_electrical_bus_mapping( self, date: str | pd.Timestamp | None = None, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: return self._get_settlement_points_csv( filename_contains="Settlement_Points", column_mapping={ "ELECTRICAL_BUS": "Electrical Bus", "NODE_NAME": "Node Name", "PSSE_BUS_NAME": "PSSE Bus Name", "VOLTAGE_LEVEL": "Voltage Level", "SUBSTATION": "Substation", "SETTLEMENT_LOAD_ZONE": "Settlement Load Zone", "RESOURCE_NODE": "Resource Node", "HUB_BUS_NAME": "Hub Bus Name", "HUB": "Hub", "PSSE_BUS_NUMBER": "PSSE Bus Number", }, date=date, end=end, verbose=verbose, )
@support_date_range(frequency=None)
[docs] def get_ccp_resource_names( self, date: str | pd.Timestamp | None = None, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: return self._get_settlement_points_csv( filename_contains="CCP_Resource_Names", column_mapping={ "CCP_NAME": "CCP Name", "LOGICALREOURCENODENAME": "Logical Resource Node Name", }, date=date, end=end, verbose=verbose, )
@support_date_range(frequency=None)
[docs] def get_noie_mapping( self, date: str | pd.Timestamp | None = None, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: return self._get_settlement_points_csv( filename_contains="NOIE_Mapping", column_mapping={ "PHYSICAL_LOAD": "Physical Load", "NOIE": "NOIE", "VOLTAGE_NAME": "Voltage", "SUBSTATION": "Substation", "ELECTRICAL_BUS": "Electrical Bus", }, date=date, end=end, verbose=verbose, )
@support_date_range(frequency=None)
[docs] def get_resource_node_to_unit( self, date: str | pd.Timestamp | None = None, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: return self._get_settlement_points_csv( filename_contains="Resource_Node_to_Unit", column_mapping={ "RESOURCE_NODE": "Resource Node", "UNIT_SUBSTATION": "Unit Substation", "UNIT_NAME": "Unit Name", }, date=date, end=end, verbose=verbose, )
@support_date_range(frequency=None)
[docs] def get_hub_name_dc_ties( self, date: str | pd.Timestamp | None = None, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: return self._get_settlement_points_csv( filename_contains="Hub_Name_AND_DC_Ties", column_mapping={ "NAME": "Name", }, date=date, end=end, verbose=verbose, )
[docs] def read_doc( self, doc: Document, parse: bool = True, verbose: bool = False, request_kwargs: dict | None = None, read_csv_kwargs: dict | None = None, ) -> pd.DataFrame: logger.debug(f"Reading {doc.url}") if request_kwargs: response = requests.get(doc.url, **(request_kwargs or {})).content df = pd.read_csv( io.BytesIO(response), compression="zip", **(read_csv_kwargs or {}), ) else: df = pd.read_csv(doc.url, compression="zip", **(read_csv_kwargs or {})) if parse: df = self.parse_doc(df, verbose=verbose) return df
[docs] def read_docs( self, docs: list[Document], parse: bool = True, empty_df: pd.DataFrame | None = None, verbose: bool = False, request_kwargs: dict | None = None, ) -> pd.DataFrame: if len(docs) == 0: return empty_df dfs = [] for doc in tqdm.tqdm(docs, desc="Reading files", disable=not verbose): dfs.append( self.read_doc( doc, parse=parse, verbose=verbose, request_kwargs=request_kwargs, ), ) return pd.concat(dfs).reset_index(drop=True)
[docs] def ambiguous_based_on_dstflag(self, df: pd.DataFrame) -> pd.Series: # DSTFlag is Y during the repeated hour (after the clock has been set back) # so it's False/N during DST And True/Y during Standard Time. # For ambiguous, Pandas wants True for DST and False for Standard Time # during repeated hours. Therefore, ambgiuous should be True when # DSTFlag is False/N # Some ERCOT datasets use a boolean, some use a string if df["DSTFlag"].dtype == bool: return ~df["DSTFlag"] # Assume that if the DSTFlag column is a string, it's "Y" or "N" else: assert set(df["DSTFlag"].unique()).issubset({"Y", "N"}) return df["DSTFlag"] == "N"
[docs] def parse_doc( self, doc: pd.DataFrame, dst_ambiguous_default: str = "infer", verbose: bool = False, nonexistent: str = "raise", ) -> pd.DataFrame: # files sometimes have different naming conventions # a more elegant solution would be nice doc.rename( columns={ "deliveryDate": "DeliveryDate", "Delivery Date": "DeliveryDate", "DELIVERY_DATE": "DeliveryDate", "OperDay": "DeliveryDate", "hourEnding": "HourEnding", "Hour Ending": "HourEnding", "HOUR_ENDING": "HourEnding", "Repeated Hour Flag": "DSTFlag", "RepeatedHourFlag": "DSTFlag", "Date": "DeliveryDate", "DeliveryHour": "HourEnding", "Delivery Hour": "HourEnding", "Delivery Interval": "DeliveryInterval", # fix whitespace in column name "DSTFlag ": "DSTFlag", }, inplace=True, ) original_cols = doc.columns.tolist() ending_time_col_name = "HourEnding" ambiguous = dst_ambiguous_default if "DSTFlag" in doc.columns: ambiguous = self.ambiguous_based_on_dstflag(doc) # i think DeliveryInterval only shows up # in 15 minute data along with DeliveryHour if "DeliveryInterval" in original_cols: interval_length = pd.Timedelta(minutes=15) doc["HourBeginning"] = doc[ending_time_col_name] - 1 doc["Interval Start"] = ( pd.to_datetime(doc["DeliveryDate"]) + doc["HourBeginning"].astype("timedelta64[h]") + ((doc["DeliveryInterval"] - 1) * interval_length) ) # 15-minute system wide actuals elif "TimeEnding" in original_cols: ending_time_col_name = "TimeEnding" interval_length = pd.Timedelta(minutes=15) doc["Interval End"] = pd.to_datetime( doc["DeliveryDate"] + " " + doc["TimeEnding"] + ":00", ) doc["Interval End"] = doc["Interval End"].dt.tz_localize( self.default_timezone, ambiguous=ambiguous, ) doc["Interval Start"] = doc["Interval End"] - interval_length else: interval_length = pd.Timedelta(hours=1) doc["HourBeginning"] = ( doc[ending_time_col_name] .astype(str) .str.split( ":", ) .str[0] .astype(int) - 1 ) doc["Interval Start"] = pd.to_datetime(doc["DeliveryDate"]) + doc[ "HourBeginning" ].astype("timedelta64[h]") if "TimeEnding" not in original_cols: try: doc["Interval Start"] = doc["Interval Start"].dt.tz_localize( self.default_timezone, ambiguous=ambiguous, nonexistent=nonexistent, ) except NonExistentTimeError: # this handles how ercot does labels the instant # of the DST transition differently than # pandas does doc["Interval Start"] = doc["Interval Start"] + pd.Timedelta(hours=1) doc["Interval Start"] = doc["Interval Start"].dt.tz_localize( self.default_timezone, ambiguous=ambiguous, ) - pd.Timedelta(hours=1) except pytz.AmbiguousTimeError as e: # Handle datasets where HourEnding is like 01:00 if doc["HourEnding"].min() == "01:00": doc["HourEnding"] = ( doc["HourEnding"] .astype(str) .str.split( ":", ) .str[0] .astype(int) ) # Sometimes ERCOT handles DST end by putting 25 hours in HourEnding # which makes IntervalStart where HourEnding >= 3 an hour later than # they should be. We correct this by subtracting an hour. assert doc["HourEnding"].max() == 25, ( f"Time parsing error. Did not find HourEnding = 25. {e}" ) doc.loc[doc["HourEnding"] >= 3, "Interval Start"] = doc.loc[ doc["HourEnding"] >= 3, "Interval Start", ] - pd.Timedelta(hours=1) # Now there will be a repeated hour and Pandas can infer # the ambiguous value doc["Interval Start"] = doc["Interval Start"].dt.tz_localize( self.default_timezone, ambiguous="infer", ) doc["Interval End"] = doc["Interval Start"] + interval_length doc["Time"] = doc["Interval Start"] doc = doc.sort_values("Time", ascending=True) cols_to_keep = [ "Time", "Interval Start", "Interval End", ] + original_cols # TODO: try to clean up this logic doc = doc[cols_to_keep] doc = doc.drop( columns=["DeliveryDate", ending_time_col_name], ) optional_drop = ["DSTFlag", "DeliveryInterval"] for col in optional_drop: if col in doc.columns: doc = doc.drop(columns=[col]) return doc
def _weather_zone_column_name_mapping(self) -> dict[str, str]: return { "Coast": "Coast", "East": "East", "FarWest": "Far West", "North": "North", "NorthCentral": "North Central", "North C": "North Central", "SouthCentral": "South Central", "South C": "South Central", "Southern": "Southern", "West": "West", "Total": "System Total", } def _weather_zone_column_name_order(self) -> list[str]: return [ "Coast", "East", "Far West", "North", "North Central", "South Central", "Southern", "West", ] @support_date_range(frequency=None)
[docs] def get_indicative_lmp_by_settlement_point( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: if date == "latest": self.get_indicative_lmp_by_settlement_point(date="today") if not end: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=ERCOT_INDICATIVE_LMP_BY_SETTLEMENT_POINT_RTID, extension="csv", published_before=end, published_after=date, verbose=verbose, ) df = self.read_docs(docs, parse=False, verbose=verbose) return self._handle_indicative_lmp_by_settlement_point(df)
def _handle_indicative_lmp_by_settlement_point( self, df: pd.DataFrame, ) -> pd.DataFrame: columns_to_rename = { "RTDTimestamp": "RTD Timestamp", "IntervalEnding": "Interval End", "SettlementPoint": "Location", "SettlementPointType": "Location Type", "LMP": "LMP", } df.rename(columns=columns_to_rename, inplace=True) assert set(df["RepeatedHourFlag"].unique()).issubset({"Y", "N"}) assert set(df["IntervalRepeatedHourFlag"].unique()).issubset({"Y", "N"}) df["RTD Timestamp"] = pd.to_datetime(df["RTD Timestamp"]).dt.tz_localize( self.default_timezone, ambiguous=df["RepeatedHourFlag"] == "N", nonexistent="shift_forward", ) df["Interval End"] = pd.to_datetime(df["Interval End"]).dt.tz_localize( self.default_timezone, ambiguous=df["IntervalRepeatedHourFlag"] == "N", nonexistent="shift_forward", ) df["Interval Start"] = df["Interval End"] - pd.Timedelta(minutes=5) df = df.sort_values("Interval Start").reset_index(drop=True) return df[ [ "RTD Timestamp", "Interval Start", "Interval End", "Location", "Location Type", "LMP", ] ] @support_date_range(frequency="DAY_START")
[docs] def get_dam_total_energy_purchased( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get DAM Total Energy Purchased Arguments: date (str, datetime): date to get data for end (str, datetime): end time to get data for verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with DAM total energy purchased data """ if date == "latest": return self.get_dam_total_energy_purchased( date="today", verbose=verbose, ) # DAM data so subtract one from the date doc = self._get_document( report_type_id=DAM_TOTAL_ENERGY_PURCHASED_RTID, date=date - pd.DateOffset(days=1), extension="csv", ) return self._process_dam_total_energy( doc, verbose=verbose, )
def _process_dam_total_energy( self, doc: Document, verbose: bool = False, ) -> pd.DataFrame: return ( self.read_doc(doc, verbose=verbose) .rename( columns={ "Settlement_Point": "Location", "TotalDAMEnergySold": "Total", "Total_DAM_Energy_Bought": "Total", }, ) .drop( columns=["Time"], ) .sort_values(["Interval Start", "Location"]) .reset_index(drop=True) ) @support_date_range(frequency="DAY_START")
[docs] def get_dam_total_energy_sold( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get DAM Total Energy Sold Arguments: date (str, datetime): date to get data for end (str, datetime): end time to get data for verbose (bool, optional): print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with DAM total energy sold data """ if date == "latest": return self.get_dam_total_energy_sold( date="today", verbose=verbose, ) # DAM data so subtract one from the date doc = self._get_document( report_type_id=DAM_TOTAL_ENERGY_SOLD_RTID, date=date - pd.DateOffset(days=1), extension="csv", ) return self._process_dam_total_energy( doc, verbose=verbose, )
@support_date_range(frequency="DAY_START")
[docs] def get_cop_adjustment_period_snapshot_60_day( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: if date == "latest" or date > pd.Timestamp.now( tz=self.default_timezone, ) - pd.DateOffset(days=60): raise ValueError( "Cannot get COP Adjustment Period Snapshot for date < 60 days in the past", ) # Data is delayed by 60 days. To get the report for a given day, we have to # look 60 days in the future relative to the target date report_date = date + pd.DateOffset(days=60) # Delayed by 60 days doc = self._get_document( report_type_id=COP_ADJUSTMENT_PERIOD_SNAPSHOT_RTID, date=report_date, ) data = self.read_doc(doc, verbose=verbose) return self._process_cop_adjustment_period_snapshot_60_day_data( data, verbose=verbose, )
def _process_cop_adjustment_period_snapshot_60_day_data( self, data: pd.DataFrame, verbose: bool = False, ) -> pd.DataFrame: data = ( data.rename(columns={"QSE Name": "QSE"}) .drop( columns=["Time"], ) .sort_values(["Interval Start", "Resource Name"]) .reset_index(drop=True) ) # Columns not in older data files or not in newer data files. for col in [ # Present in old data, but not new "RRS", # These four columns are present only in new data "RRSPFR", "RRSFFR", "RRSUFR", "ECRS", # These three columns first have data on 2024-06-28 "Minimum SOC", "Maximum SOC", "Hour Beginning Planned SOC", ]: if col not in data.columns: data[col] = pd.NA data = data[ [ "Interval Start", "Interval End", "Resource Name", "QSE", "Status", "High Sustained Limit", "Low Sustained Limit", "High Emergency Limit", "Low Emergency Limit", "Reg Up", "Reg Down", "RRS", "RRSPFR", "RRSFFR", "RRSUFR", "NSPIN", "ECRS", "Minimum SOC", "Maximum SOC", "Hour Beginning Planned SOC", ] ] return data # Published every SCED interval @support_date_range(frequency=None)
[docs] def get_mcpc_sced( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Market Clearing Prices for Capacity by SCED interval""" if date == "latest": docs = self._get_documents( report_type_id=REAL_TIME_CLEARING_PRICES_FOR_CAPACITY_BY_SCED_INTERVAL_RTID, extension="csv", date=date, verbose=verbose, ) else: if end is None: # Assume getting data for one day end = date + pd.DateOffset(days=1) published_before = end published_after = date docs = self._get_documents( report_type_id=REAL_TIME_CLEARING_PRICES_FOR_CAPACITY_BY_SCED_INTERVAL_RTID, extension="csv", published_before=published_before, published_after=published_after, verbose=verbose, ) df = self.read_docs(docs, parse=False, verbose=verbose) return self._handle_mcpc_sced(df)
def _handle_mcpc_sced(self, df: pd.DataFrame) -> pd.DataFrame: df = df.rename(columns={"ASType": "AS Type"}) df = self._handle_sced_timestamp(df) df["MCPC"] = pd.to_numeric(df["MCPC"], errors="coerce") return ( # Only need the SCED Timestamps df[["SCED Timestamp", "AS Type", "MCPC"]] .sort_values(["SCED Timestamp", "AS Type"]) .reset_index(drop=True) ) # Published every 15 minutes for the past 15 minutes. @support_date_range(frequency=None)
[docs] def get_mcpc_real_time_15_min( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Market Clearing Prices for Capacity by 15-minute interval""" if date == "latest": docs = self._get_documents( report_type_id=REAL_TIME_CLEARING_PRICES_FOR_CAPACITY_15_MIN_RTID, extension="csv", date=date, verbose=verbose, ) else: # Assume getting data for one day if not end: end = date + pd.DateOffset(days=1) published_before = end + pd.Timedelta(minutes=15) published_after = date + pd.Timedelta(minutes=15) docs = self._get_documents( report_type_id=REAL_TIME_CLEARING_PRICES_FOR_CAPACITY_15_MIN_RTID, extension="csv", published_before=published_before, published_after=published_after, verbose=verbose, ) df = self.read_docs(docs, parse=False, verbose=verbose) return self._handle_mcpc_real_time_15_min(df)
def _handle_mcpc_real_time_15_min( self, df: pd.DataFrame, ) -> pd.DataFrame: df = df.rename( columns={"ASType": "AS Type", "RepeatedHourFlag": "DSTFlag"}, ) df = self.parse_doc(df) df["MCPC"] = pd.to_numeric(df["MCPC"], errors="coerce") return ( df[["Interval Start", "Interval End", "AS Type", "MCPC"]] .sort_values(["Interval Start", "AS Type"]) .reset_index(drop=True) ) # Should be published once per day for today and tomorrow in the same file # NOTE: occassionally ERCOT will publish multiple copies of the same data # on the same day. @support_date_range(frequency=None)
[docs] def get_as_demand_curves_dam_and_sced( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Ancillary Service Demand Curves""" if date == "latest": docs = self._get_documents( report_type_id=DAM_AND_SCED_ANCILLARY_SERVICE_DEMAND_CURVES_RTID, extension="csv", date=date, verbose=verbose, ) else: if end is None: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=DAM_AND_SCED_ANCILLARY_SERVICE_DEMAND_CURVES_RTID, extension="csv", published_before=end, published_after=date, verbose=verbose, ) df = pd.concat( [ self.read_doc(doc, parse=False, verbose=verbose).assign( **{"Publish Time": doc.publish_date}, ) for doc in docs ], ) return self._handle_as_demand_curves_dam_and_sced(df)
def _handle_as_demand_curves_dam_and_sced(self, df: pd.DataFrame) -> pd.DataFrame: df = df.rename( columns={ "ASType": "AS Type", "DemandCurvePoint": "Demand Curve Point", "RepeatedHourFlag": "DSTFlag", }, ) df = self.parse_doc(df) for col in ["Quantity", "Price", "Demand Curve Point"]: df[col] = pd.to_numeric(df[col], errors="coerce") return ( df[ [ "Interval Start", "Interval End", "Publish Time", "AS Type", "Demand Curve Point", "Quantity", "Price", ] ] .sort_values( ["Interval Start", "Publish Time", "AS Type", "Demand Curve Point"], ) .reset_index(drop=True) ) # Published once per DAM run (daily, ~12-2 PM Central) for the next day's # delivery date. Covers the AS types REGDN, REGUP, RRSPF, RRSFF, RRSUF, # ECRSS, and ECRSM. @support_date_range(frequency=None)
[docs] def get_dam_asdc_aggregated( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get DAM Aggregated Ancillary Service Offer Curve (NP4-19-CD). The DAM Aggregated Ancillary Service Demand/Offer Curve contains the aggregated offer curve (price/quantity pairs) per ancillary service type for each hour of the next day's delivery, published once per DAM run. ``date`` and ``end`` are interpreted as delivery dates; the DAM runs the day before, so the underlying file is located by shifting the posted-date filter back by one day. https://www.ercot.com/mp/data-products/data-product-details?id=np4-19-cd """ if date == "latest": docs = self._get_documents( report_type_id=DAM_AGGREGATED_AS_OFFER_CURVE_RTID, extension="csv", date=date, verbose=verbose, ) else: if end is None: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=DAM_AGGREGATED_AS_OFFER_CURVE_RTID, extension="csv", published_after=date - pd.Timedelta(days=1), published_before=end - pd.Timedelta(days=1), verbose=verbose, ) df = self.read_docs(docs, parse=False, verbose=verbose) return self._handle_dam_asdc_aggregated(df)
def _handle_dam_asdc_aggregated(self, df: pd.DataFrame) -> pd.DataFrame: df = df.rename( columns={ "AncillaryType": "AS Type", "RepeatedHourFlag": "DSTFlag", }, ) df = self.parse_doc(df) for col in ["Price", "Quantity"]: df[col] = pd.to_numeric(df[col], errors="coerce") return ( df[["Interval Start", "Interval End", "AS Type", "Price", "Quantity"]] .sort_values(["Interval Start", "AS Type", "Price"]) .reset_index(drop=True) ) # Published once per day for tomorrow @support_date_range(frequency=None)
[docs] def get_as_deployment_factors_projected( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Projected Ancillary Service Deployment Factors""" if date == "latest": docs = self._get_documents( report_type_id=PROJECTED_ANCILLARY_SERVICE_DEPLOYMENTS_FACTORS_RTID, extension="csv", date=date, verbose=verbose, ) else: if end is None: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=PROJECTED_ANCILLARY_SERVICE_DEPLOYMENTS_FACTORS_RTID, extension="csv", published_before=end, published_after=date, verbose=verbose, ) return self._handle_as_deployment_factors_projected(docs, verbose=verbose)
def _handle_as_deployment_factors_projected( self, docs: list[Document], verbose: bool = False, ) -> pd.DataFrame: df = self.read_docs(docs, parse=False, verbose=verbose) df = df.rename( columns={ "ASType": "AS Type", "ASDeploymentFactors": "AS Deployment Factors", "RepeatedHourFlag": "DSTFlag", }, ) df = self.parse_doc(df) df["AS Deployment Factors"] = pd.to_numeric( df["AS Deployment Factors"], errors="coerce", ) return ( df[["Interval Start", "Interval End", "AS Type", "AS Deployment Factors"]] .sort_values("Interval Start") .reset_index(drop=True) ) # Published per WRUC run (once per day) for the next 5 days @support_date_range(frequency=None)
[docs] def get_as_deployment_factors_weekly_ruc( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Weekly RUC Ancillary Service Deployment Factors Retrieves ancillary service deployment factors used by the Weekly Reliability Unit Commitment (WRUC) process for each hour in the RUC Study Period. Args: date: Date to retrieve data for. Can be a string or pandas Timestamp. end: Optional end date for date range queries. verbose: If True, print verbose output. Returns: DataFrame with columns: Interval Start, Interval End, RUC Timestamp, AS Type, and AS Deployment Factors. """ if date == "latest": docs = self._get_documents( report_type_id=WEEKLY_RUC_AS_DEPLOYMENT_FACTORS_RTID, date=date, constructed_name_contains="csv", verbose=verbose, ) else: if not end: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=WEEKLY_RUC_AS_DEPLOYMENT_FACTORS_RTID, constructed_name_contains="csv", published_after=date, published_before=end, verbose=verbose, ) return self._handle_as_deployment_factors_ruc(docs, verbose=verbose)
# Published per DRUC run (once per day) for the next day @support_date_range(frequency=None)
[docs] def get_as_deployment_factors_daily_ruc( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Daily RUC Ancillary Service Deployment Factors""" if date == "latest": docs = self._get_documents( report_type_id=DAILY_RUC_AS_DEPLOYMENT_FACTORS_RTID, date=date, constructed_name_contains="csv", verbose=verbose, ) else: if not end: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=DAILY_RUC_AS_DEPLOYMENT_FACTORS_RTID, constructed_name_contains="csv", published_after=date, published_before=end, verbose=verbose, ) return self._handle_as_deployment_factors_ruc(docs, verbose=verbose)
# Published per HRUC run (once per hour) for the rest of the current day (so each # file can have a differing number of intervals) @support_date_range(frequency=None)
[docs] def get_as_deployment_factors_hourly_ruc( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Hourly RUC Ancillary Service Deployment Factors""" if date == "latest": docs = self._get_documents( report_type_id=HOURLY_RUC_AS_DEPLOYMENT_FACTORS_RTID, date=date, constructed_name_contains="csv", verbose=verbose, ) else: if not end: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=HOURLY_RUC_AS_DEPLOYMENT_FACTORS_RTID, constructed_name_contains="csv", published_after=date, published_before=end, verbose=verbose, ) return self._handle_as_deployment_factors_ruc(docs, verbose=verbose)
def _handle_as_deployment_factors_ruc( self, docs: list[Document], verbose: bool = False, ) -> pd.DataFrame: df = self.read_docs(docs, parse=False, verbose=verbose) df = df.rename( columns={ "RUCTimestamp": "RUC Timestamp", "ASType": "AS Type", "ASDeploymentFactors": "AS Deployment Factors", "RepeatedHourFlag": "DSTFlag", }, ) df = self.parse_doc(df) # Parse RUC Timestamp df["RUC Timestamp"] = pd.to_datetime( df["RUC Timestamp"], ).dt.tz_localize(self.default_timezone) df["AS Deployment Factors"] = pd.to_numeric( df["AS Deployment Factors"], errors="coerce", ) return ( df[ [ "Interval Start", "Interval End", "RUC Timestamp", "AS Type", "AS Deployment Factors", ] ] .sort_values(["Interval Start", "RUC Timestamp", "AS Type"]) .reset_index(drop=True) ) # Published per HRUC run (every hour) for the rest of the day @support_date_range(frequency=None)
[docs] def get_as_demand_curves_hourly_ruc( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Hourly RUC Ancillary Service Demand Curves""" if date == "latest": docs = self._get_documents( report_type_id=HOURLY_RUC_AS_DEMAND_CURVES_RTID, extension="csv", date=date, verbose=verbose, ) else: if not end: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=HOURLY_RUC_AS_DEMAND_CURVES_RTID, published_before=end, published_after=date, extension="csv", verbose=verbose, ) df = self.read_docs(docs, parse=False, verbose=verbose) return self._handle_ruc_as_demand_curves(df)
# Published per DRUC run (once per day) for the next day @support_date_range(frequency=None)
[docs] def get_as_demand_curves_daily_ruc( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Daily RUC Ancillary Service Demand Curves""" if date == "latest": docs = self._get_documents( report_type_id=DAILY_RUC_AS_DEMAND_CURVES_RTID, extension="csv", date=date, verbose=verbose, ) else: if not end: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=DAILY_RUC_AS_DEMAND_CURVES_RTID, published_before=end, published_after=date, extension="csv", verbose=verbose, ) df = self.read_docs(docs, parse=False, verbose=verbose) return self._handle_ruc_as_demand_curves(df)
# Published per WRUC run (once per day) for the next five days @support_date_range(frequency=None)
[docs] def get_as_demand_curves_weekly_ruc( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Weekly RUC Ancillary Service Demand Curves""" if date == "latest": docs = self._get_documents( report_type_id=WEEKLY_RUC_AS_DEMAND_CURVES_RTID, extension="csv", date=date, verbose=verbose, ) else: if not end: end = date + pd.DateOffset(days=1) docs = self._get_documents( report_type_id=WEEKLY_RUC_AS_DEMAND_CURVES_RTID, published_before=end, published_after=date, extension="csv", verbose=verbose, ) df = self.read_docs(docs, parse=False, verbose=verbose) return self._handle_ruc_as_demand_curves(df)
def _handle_ruc_as_demand_curves( self, df: pd.DataFrame, ) -> pd.DataFrame: df = df.rename( columns={ "RUCTimeStamp": "RUC Timestamp", "ASType": "AS Type", "DemandCurvePoint": "Demand Curve Point", "RepeatedHourFlag": "DSTFlag", }, ) df = self.parse_doc(df) # Parse RUC Timestamp df["RUC Timestamp"] = pd.to_datetime( df["RUC Timestamp"], ).dt.tz_localize(self.default_timezone) for col in ["Quantity", "Price", "Demand Curve Point"]: df[col] = pd.to_numeric(df[col], errors="coerce") return ( df[ [ "Interval Start", "Interval End", "RUC Timestamp", "AS Type", "Demand Curve Point", "Quantity", "Price", ] ] .sort_values( ["Interval Start", "RUC Timestamp", "AS Type", "Demand Curve Point"], ) .reset_index(drop=True) ) # Published per DAM run for the next day @support_date_range(frequency="DAY_START")
[docs] def get_dam_total_as_sold( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get DAM Total Ancillary Services Sold""" if date != "latest": date -= pd.DateOffset(days=1) docs = self._get_documents( report_type_id=DAM_TOTAL_AS_SOLD_RTID, date=date, constructed_name_contains="csv", verbose=verbose, ) df = self.read_docs(docs, parse=False, verbose=verbose) return self._handle_dam_total_as_sold(df)
def _handle_dam_total_as_sold(self, df: pd.DataFrame) -> pd.DataFrame: """Handle DAM Total Ancillary Services Sold data.""" df = df.rename( columns={ "ASType": "AS Type", "Quantity": "Quantity", "RepeatedHourFlag": "DSTFlag", }, ) df = self.parse_doc(df) df["Quantity"] = pd.to_numeric(df["Quantity"], errors="coerce") return ( df[ [ "Interval Start", "Interval End", "AS Type", "Quantity", ] ] .sort_values(["Interval Start", "AS Type"]) .reset_index(drop=True) ) # Published per RTD run for the next 55 minutes (11 intervals per file) @support_date_range(frequency=None)
[docs] def get_indicative_mcpc_rtd( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get RTD Indicative Real-Time Market Clearing Prices for Capacity""" if date == "latest": docs = self._get_documents( report_type_id=RTD_INDICATIVE_REAL_TIME_MCPC_RTID, extension="csv", date=date, verbose=verbose, ) else: if not end: end = date + pd.DateOffset(days=1) published_before = end published_after = date docs = self._get_documents( report_type_id=RTD_INDICATIVE_REAL_TIME_MCPC_RTID, extension="csv", published_before=published_before, published_after=published_after, verbose=verbose, ) df = self.read_docs(docs, parse=False, verbose=verbose) return self._handle_indicative_mcpc_rtd(df)
def _handle_indicative_mcpc_rtd(self, df: pd.DataFrame) -> pd.DataFrame: # Parse timestamps with DST handling. nonexistent= only applies to # spring-forward (gap when 02:00 does not exist); fall-back repeated # hour is handled by ambiguous= from RepeatedHourFlag. df["Interval End"] = pd.to_datetime(df["IntervalEnding"]).dt.tz_localize( self.default_timezone, ambiguous=self.ambiguous_based_on_dstflag( df.rename(columns={"IntervalEndingRepeatedHourFlag": "DSTFlag"}), ), nonexistent="shift_forward", ) df["Interval Start"] = df["Interval End"] - pd.Timedelta(minutes=5) df["RTD Timestamp"] = pd.to_datetime(df["RTDTimestamp"]).dt.tz_localize( self.default_timezone, ambiguous=self.ambiguous_based_on_dstflag( df.rename(columns={"RepeatedHourFlag": "DSTFlag"}), ), nonexistent="shift_forward", ) # Convert price columns to numeric (float64) price_cols = ["REGUP", "REGDN", "RRS", "ECRS", "NSPIN"] for col in price_cols: df[col] = df[col].astype("float64") return ( df[["Interval Start", "Interval End", "RTD Timestamp"] + price_cols] .sort_values(["Interval Start", "RTD Timestamp"]) .reset_index(drop=True) ) # Published every SCED interval @support_date_range(frequency=None)
[docs] def get_as_total_capability( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Total Capability of Resources Available to Provide Ancillary Service""" if date == "latest": docs = self._get_documents( report_type_id=TOTAL_CAPABILITY_OF_RESOURCES_AS_RTID, extension="csv", date=date, verbose=verbose, ) else: if not end: end = date + pd.DateOffset(days=1) published_before = end published_after = date docs = self._get_documents( report_type_id=TOTAL_CAPABILITY_OF_RESOURCES_AS_RTID, extension="csv", published_before=published_before, published_after=published_after, verbose=verbose, ) # Add the publish time to deal with duplicates downstream df = pd.concat( [ self.read_doc(doc, parse=False, verbose=verbose).assign( **{"Publish Time": doc.publish_date}, ) for doc in docs ], ) return self._handle_as_total_capability(df)
def _handle_as_total_capability(self, df: pd.DataFrame) -> pd.DataFrame: df = df.rename( columns={ "SCEDTimestamp": "SCED Timestamp", "CapREGUPTotal": "Cap RegUp Total", "CapREGDNTotal": "Cap RegDn Total", "CapRRSTotal": "Cap RRS Total", "CapECRSTotal": "Cap ECRS Total", "CapNSPINTotal": "Cap NonSpin Total", "CapREGUP_RRSTotal": "Cap RegUp RRS Total", "CapREGUP_RRS_ECRSTotal": "Cap RegUp RRS ECRS Total", "CapREGUP_RRS_ECRS_NSPINTotal": "Cap RegUp RRS ECRS NonSpin Total", "DSTFlag": "RepeatedHourFlag", }, ) df = self._handle_sced_timestamp(df) # Convert capability columns to numeric cap_cols = [ "Cap RegUp Total", "Cap RegDn Total", "Cap RRS Total", "Cap ECRS Total", "Cap NonSpin Total", "Cap RegUp RRS Total", "Cap RegUp RRS ECRS Total", "Cap RegUp RRS ECRS NonSpin Total", ] for col in cap_cols: df[col] = df[col].astype(float) return ( df[["SCED Timestamp", "Publish Time"] + cap_cols] .sort_values(["SCED Timestamp", "Publish Time"]) .reset_index(drop=True) ) # Published every SCED interval @support_date_range(frequency=None)
[docs] def get_real_time_adders( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Real-Time ORDC and Reliability Deployment Price Adders and Reserves by SCED Interval produced by SCED every five minutes. Arguments: date: date to get data for end: end date to get data for. If None, defaults to date + 1 day verbose: print verbose output. Defaults to False. Returns: pandas.DataFrame: A DataFrame with ORDC price adders data """ if date == "latest": docs = self._get_documents( report_type_id=REAL_TIME_ADDERS_RTID, extension="csv", date=date, verbose=verbose, ) else: if not end: # Assume getting data for one day end = date + pd.DateOffset(days=1) published_before = end published_after = date docs = self._get_documents( report_type_id=REAL_TIME_ADDERS_RTID, published_after=published_after, published_before=published_before, extension="csv", verbose=verbose, ) return self._handle_real_time_adders(docs, verbose=verbose)
def _handle_real_time_adders( self, docs: list[Document], verbose: bool = False, ) -> pd.DataFrame: df = self.read_docs(docs, parse=False, verbose=verbose) df = self._handle_sced_timestamp(df) df = utils.move_cols_to_front( df, ["SCED Timestamp", "Interval Start", "Interval End"], ) df = df.rename(columns={"SystemLambda": "System Lambda"}) return df.sort_values("SCED Timestamp").reset_index(drop=True)