Source code for gridstatus.spp

import re
import urllib
from enum import StrEnum
from typing import BinaryIO, Callable

import pandas as pd
import pytz
import requests
import tqdm

from gridstatus import utils
from gridstatus.base import (
    InterconnectionQueueStatus,
    ISOBase,
    Markets,
    NoDataFoundException,
    NotSupported,
)
from gridstatus.decorators import FiveMinOffset, support_date_range
from gridstatus.gs_logging import logger

# Endpoints
[docs] RTBM_LMP_BY_BUS = "rtbm-lmp-by-bus"
[docs] FS_RTBM_LMP_BY_LOCATION = "rtbm-lmp-by-location"
[docs] FS_DAM_LMP_BY_LOCATION = "da-lmp-by-settlement-location"
[docs] FS_DAM_LMP_BY_BUS = "da-lmp-by-bus"
[docs] LMP_BY_SETTLEMENT_LOCATION_WEIS = "lmp-by-settlement-location-weis"
[docs] OPERATING_RESERVES = "operating-reserves"
[docs] RTBM_MCP = "rtbm-mcp"
[docs] DA_BINDING_CONSTRAINTS = "da-binding-constraints"
[docs] RTBM_BINDING_CONSTRAINTS = "rtbm-binding-constraints"
[docs] HOURLY_LOAD_WIDE_FORMAT_END_DATE = pd.Timestamp("2026-03-24", tz="US/Central")
# NOTE: Typically SWPW is ~2000-3000MW and SPP is ~20000-30000MW, so we can tell if there # is a load value with null BAA value, we can tell which BAA it is.
[docs] BAA_LOAD_THRESHOLD_MW = 5000
[docs] MARKETPLACE_BASE_URL = "https://portal.spp.org"
[docs] FILE_BROWSER_API_URL = "https://portal.spp.org/file-browser-api/"
[docs] FILE_BROWSER_DOWNLOAD_URL = "https://portal.spp.org/file-browser-api/download"
[docs] BASE_SOLAR_AND_WIND_SHORT_TERM_URL = ( f"{FILE_BROWSER_DOWNLOAD_URL}/shortterm-resource-forecast?path=" )
[docs] BASE_SOLAR_AND_WIND_MID_TERM_URL = ( f"{FILE_BROWSER_DOWNLOAD_URL}/midterm-resource-forecast?path=" )
[docs] BASE_LOAD_FORECAST_SHORT_TERM_URL = f"{FILE_BROWSER_DOWNLOAD_URL}/stlf-vs-actual?path="
[docs] BASE_LOAD_FORECAST_MID_TERM_URL = f"{FILE_BROWSER_DOWNLOAD_URL}/mtlf-vs-actual?path="
[docs] LOCATION_TYPE_ALL = "ALL"
[docs] LOCATION_TYPE_BUS = "Bus"
[docs] LOCATION_TYPE_HUB = "Hub"
[docs] LOCATION_TYPE_INTERFACE = "Interface"
[docs] LOCATION_TYPE_SETTLEMENT_LOCATION = "Settlement Location"
[docs] QUERY_RTM5_HUBS_URL = "https://pricecontourmap.spp.org/arcgis/rest/services/MarketMaps/RTBM_FeatureData/MapServer/1/query" # noqa
[docs] QUERY_RTM5_INTERFACES_URL = "https://pricecontourmap.spp.org/arcgis/rest/services/MarketMaps/RTBM_FeatureData/MapServer/2/query" # noqa
[docs] RELIABILITY_LEVELS = [ "Normal Operations", "Weather Advisory", "Resource Advisory", "Conservative Operations Advisory", "Energy Emergency Alert Level 1", "Energy Emergency Alert Level 2", "Energy Emergency Alert Level 3", "Restoration Event", ]
[docs] LAST_UPDATED_KEYWORDS = [ "last updated", "as of", ]
[docs] RELIABILITY_LEVELS_ALIASES = { "Normal Operations": "Normal", }
[docs] STATUS_STOP_WORDS = [ "as", "at", "ct", # central time "eea", # energy emergency alert "of", "on", ]
[docs] LMP_HUBS_AND_INTERFACES = { "AECI": "Interface", "ALTW": "Interface", "AMRN": "Interface", "BLKW": "Interface", "CLEC": "Interface", "DPC": "Interface", "EDDY": "Interface", "EES": "Interface", "ERCOTE": "Interface", "ERCOTN": "Interface", "GRE": "Interface", "LAM345": "Interface", "MCWEST": "Interface", "MDU": "Interface", "MEC": "Interface", "MISO": "Interface", "NSP": "Interface", "OTP": "Interface", "RCEAST": "Interface", "SCSE": "Interface", "SGE": "Interface", "SPA": "Interface", "SPC": "Interface", "SPPNORTH_HUB": "Hub", "SPPSOUTH_HUB": "Hub", }
[docs] class BAAEnum(StrEnum): SPP = "SPP" SWPW = "SWPW"
[docs] def fill_baa_column(df, load_col): """Fill missing BAA values based on load magnitude. If the BAA column doesn't exist, creates it. If it exists but has NaN values, fills only the missing entries. Uses BAA_LOAD_THRESHOLD_MW to distinguish between SWPW (small loads) and SPP (large loads). Args: df: DataFrame with a load column to use for BAA inference. load_col: Name of the column containing load values. Returns: The DataFrame with BAA column filled in-place. """ if "BAA" not in df.columns: df["BAA"] = df[load_col].apply( lambda x: ( BAAEnum.SWPW.value if pd.notna(x) and x < BAA_LOAD_THRESHOLD_MW else BAAEnum.SPP.value ), ) else: mask = df["BAA"].isna() df.loc[mask, "BAA"] = df.loc[mask, load_col].apply( lambda x: ( BAAEnum.SWPW.value if pd.notna(x) and x < BAA_LOAD_THRESHOLD_MW else BAAEnum.SPP.value ), ) return df
def _binding_constraints_real_time_5_min_frequency(args_dict: dict) -> str: """support_date_range frequency for SPP real-time binding constraints. Splits at 5-min boundaries only for same-day recent windows so the caller's explicit end is preserved through decorator chunking. Otherwise splits per day, matching the source's daily-file granularity. """ iso = args_dict["self"] end = args_dict.get("end") if end is None: return "DAY_START" start = utils._handle_date(args_dict["date"], iso.default_timezone) end = utils._handle_date(end, iso.default_timezone) if start.normalize() != end.normalize(): return "DAY_START" if not utils.is_within_last_days(start, 1, iso.default_timezone): return "DAY_START" return "5_MIN"
[docs] class SPP(ISOBase): """Southwest Power Pool (SPP)""" name = "Southwest Power Pool" iso_id = "spp" default_timezone = "US/Central" status_homepage = "https://www.spp.org/markets-operations/current-grid-conditions/" interconnection_homepage = ( "https://www.spp.org/engineering/generator-interconnection/" ) markets = [ Markets.REAL_TIME_5_MIN, Markets.DAY_AHEAD_HOURLY, ] location_types = [ LOCATION_TYPE_ALL, LOCATION_TYPE_BUS, LOCATION_TYPE_HUB, LOCATION_TYPE_INTERFACE, LOCATION_TYPE_SETTLEMENT_LOCATION, ] @staticmethod
[docs] def now(): return pd.Timestamp.now(tz=SPP.default_timezone)
@support_date_range(frequency=None)
[docs] def get_fuel_mix( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get combined fuel mix summed across SPP and SWPW BAAs Args: date: "latest", "today", a timestamp, or a date range tuple end: optional end date for range queries Returns: pd.DataFrame: fuel mix summed across both BAAs """ return self._get_combined_fuel_mix( date=date, end=end, detailed=False, verbose=verbose, by_baa=False, )
@support_date_range(frequency=None)
[docs] def get_fuel_mix_detailed( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get combined detailed fuel mix summed across SPP and SWPW BAAs Breaks out self scheduled and market scheduled generation. Args: date: "latest", "today", a timestamp, or a date range tuple end: optional end date for range queries Returns: pd.DataFrame: detailed fuel mix summed across both BAAs """ return self._get_combined_fuel_mix( date=date, end=end, detailed=True, verbose=verbose, by_baa=False, )
@support_date_range(frequency=None)
[docs] def get_fuel_mix_by_baa( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get fuel mix for both SPP and SWPW BAAs with a BAA column Args: date: "latest", "today", a timestamp, or a date range tuple end: optional end date for range queries Returns: pd.DataFrame: fuel mix with BAA column differentiating SPP and SWPW """ return self._get_combined_fuel_mix( date=date, end=end, detailed=False, verbose=verbose, by_baa=True, )
@support_date_range(frequency=None)
[docs] def get_fuel_mix_by_baa_detailed( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get detailed fuel mix for both SPP and SWPW BAAs with a BAA column Breaks out self scheduled and market scheduled generation. Args: date: "latest", "today", a timestamp, or a date range tuple end: optional end date for range queries Returns: pd.DataFrame: detailed fuel mix with BAA column """ return self._get_combined_fuel_mix( date=date, end=end, detailed=True, verbose=verbose, by_baa=True, )
def _get_combined_fuel_mix( self, date: str | pd.Timestamp, end: pd.Timestamp | None = None, detailed: bool = False, verbose: bool = False, by_baa: bool = False, ) -> pd.DataFrame: """Fetch fuel mix for both SPP and SWPW BAAs and combine them. Args: date: date to fetch end: optional end date detailed: if True, breaks out self scheduled and market scheduled verbose: if True, log debug info by_baa: if True, keep BAA column; if False, sum across BAAs Returns: pd.DataFrame: combined fuel mix """ spp_df = self._get_fuel_mix( date=date, end=end, detailed=detailed, verbose=verbose, baa=BAAEnum.SPP, ) swpw_df = self._get_fuel_mix( date=date, end=end, detailed=detailed, verbose=verbose, baa=BAAEnum.SWPW, ) if by_baa: df = pd.concat([spp_df, swpw_df], ignore_index=True) df = df.sort_values( ["Interval Start", "BAA"], ignore_index=True, ) return df # Sum fuel columns across BAAs for each interval time_cols = ["Interval Start", "Interval End"] fuel_cols = [c for c in spp_df.columns if c not in time_cols + ["BAA"]] spp_df = spp_df.drop(columns=["BAA"], errors="ignore") swpw_df = swpw_df.drop(columns=["BAA"], errors="ignore") df = pd.merge( spp_df, swpw_df, on=time_cols, suffixes=("_spp", "_swpw"), how="outer", ) for col in fuel_cols: spp_col = f"{col}_spp" swpw_col = f"{col}_swpw" df[col] = df[spp_col].fillna(0) + df[swpw_col].fillna(0) df = df.drop(columns=[spp_col, swpw_col]) df = df.sort_values("Interval Start", ignore_index=True) return df def _get_fuel_mix( self, date: str | pd.Timestamp, end: pd.Timestamp | None = None, detailed: bool = False, verbose: bool = False, baa: BAAEnum = BAAEnum.SPP, ) -> pd.DataFrame: now = pd.Timestamp.now(tz=self.default_timezone) two_hours_ago = now - pd.Timedelta(hours=2) one_year_ago = now - pd.Timedelta(days=365) if date == "latest": file_type = "GenMix2Hour" elif isinstance(date, pd.Timestamp): start = date if start < one_year_ago: raise NotSupported( f"{baa} fuel mix data is only available for the last 365 days", ) if start >= two_hours_ago: file_type = "GenMix2Hour" else: file_type = "GenMix365" else: raise ValueError(f"Unexpected date type: {type(date)}") url = f"{FILE_BROWSER_DOWNLOAD_URL}/generation-mix-historical?path=/{baa}/{file_type}_{baa}.csv" # noqa if verbose: logger.info(f"Downloading fuel mix from {url}") df_raw = pd.read_csv(url) df = process_gen_mix(df_raw, detailed=detailed) df = df.drop( columns=["Short Term Load Forecast", "Average Actual Load", "Time"], errors="ignore", ) if date != "latest" and isinstance(date, pd.Timestamp): df = df[df["Interval Start"] >= date] if end is None: end = date.normalize() + pd.Timedelta(days=1) df = df[df["Interval Start"] < end] df = df.reset_index(drop=True) return df
[docs] def get_load( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Returns total RTO load in 5 minute intervals from STLF data.""" baa_df = self.get_load_by_baa(date=date, end=end, verbose=verbose) if baa_df.empty: raise NoDataFoundException(f"No load data found for date {date}") return ( baa_df.groupby( ["Interval Start", "Interval End"], as_index=False, )["Load"] .sum() .sort_values("Interval Start") .reset_index(drop=True) )
[docs] def get_load_forecast( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Returns total RTO load forecast in hourly intervals from MTLF data.""" baa_df = self.get_load_forecast_by_baa(date=date, end=end, verbose=verbose) if baa_df.empty: raise NoDataFoundException( f"No load forecast by BAA data found for date {date}", ) summed = baa_df.groupby( ["Interval Start", "Interval End", "Publish Time"], as_index=False, )["Load Forecast"].sum() return summed.sort_values(["Interval Start", "Publish Time"]).reset_index( drop=True, )
@support_date_range("5_MIN")
[docs] def get_load_forecast_short_term( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, drop_null_forecast_rows: bool = True, ) -> pd.DataFrame | None: """ 5-minute load forecast data for the SPP footprint (system-wide) for +/- 10 minutes. Also includes actual load. Data from https://portal.spp.org/pages/stlf-vs-actual Arguments: date (pd.Timestamp|str): date to get data for. Supports "latest" and "today" verbose (bool): print info end (pd.Timestamp|str): end date drop_null_forecast_rows (bool): if True, drop rows with null forecast values Returns: pd.DataFrame: forecast as dataframe. """ result = self._get_short_term_forecast_data( date, base_url=BASE_LOAD_FORECAST_SHORT_TERM_URL, file_prefix="OP-STLF", buffer_minutes=2, ) if result is None: return None df, url = result # According to the docs, the end time col should be GMTIntervalEnd, but it's # only GMTInterval in the data df = self._post_process_load_forecast( df, url, forecast_type="SHORT_TERM", forecast_col="STLF", end_time_col="GMTInterval", interval_duration=pd.Timedelta(minutes=5), drop_null_forecast_rows=drop_null_forecast_rows, ) fill_baa_column(df, "STLF") return df
@support_date_range("HOUR_START")
[docs] def get_load_forecast_mid_term( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame | None: """ Returns load forecast for +7 days in hourly intervals. Includes actual load for the past 24 hours. Data from https://portal.spp.org/pages/mtlf-vs-actual Arguments: date (pd.Timestamp|str): date to get data for. Supports "latest" and "today" verbose (bool): print info Returns: pd.DataFrame: forecast as dataframe. """ result = self._get_mid_term_forecast_data( date, base_url=BASE_LOAD_FORECAST_MID_TERM_URL, file_prefix="OP-MTLF", buffer_minutes=10, ) if result is None: return None df, url = result df = self._post_process_load_forecast( df, url, forecast_type="MID_TERM", forecast_col="MTLF", end_time_col="GMTIntervalEnd", interval_duration=pd.Timedelta(hours=1), ) return df
[docs] def get_load_forecast_by_baa( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Returns hourly load forecast by BAA from MTLF data.""" df = self._get_load_forecast_by_baa_raw(date=date, end=end, verbose=verbose) if df is None or df.empty: raise NoDataFoundException( f"No load forecast by BAA data found for {date}", ) return ( df.dropna(subset=["Load Forecast"]) .drop_duplicates( subset=["Interval Start", "Interval End", "Publish Time", "BAA"], keep="last", ) .sort_values(["Interval Start", "Publish Time"]) .reset_index(drop=True) )
@support_date_range("HOUR_START") def _get_load_forecast_by_baa_raw( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame | None: result = self._get_mid_term_forecast_data( date=date, base_url=BASE_LOAD_FORECAST_MID_TERM_URL, file_prefix="OP-MTLF", buffer_minutes=10, ) if result is None: return None df, url = result df = self._post_process_load_forecast( df, url, forecast_type="MID_TERM", forecast_col="MTLF", end_time_col="GMTIntervalEnd", interval_duration=pd.Timedelta(hours=1), ) fill_baa_column(df, "MTLF") return df[ ["Interval Start", "Interval End", "Publish Time", "BAA", "MTLF"] ].rename(columns={"MTLF": "Load Forecast"}) def _handle_dst_floor_date( self, date: pd.Timestamp, freq: str = "5min", ) -> pd.Timestamp: """Handle DST transition when flooring a date. Args: date: The date to floor freq: The frequency to floor to (e.g., "5min", "h") Returns: Timestamp floored to the specified frequency """ try: floored_date = date.floor(freq) except pytz.AmbiguousTimeError: floored_date = self.safe_for_dst_transition_floor(date, freq) return floored_date def _get_short_term_forecast_data( self, date: str | pd.Timestamp, base_url: str, file_prefix: str, buffer_minutes: int = 2, ) -> tuple[pd.DataFrame, str] | None: """Get short-term forecast data with common DST handling logic. Args: date: Date to get data for. Supports "latest" and "today" base_url: Base URL for downloads file_prefix: Prefix for the file name (e.g., "OP-STLF", "OP-STRF") buffer_minutes: Buffer minutes for "latest" date Returns: tuple: (dataframe, url) or None if date is in the future """ if date == "latest": date = self.now() - pd.Timedelta(minutes=buffer_minutes) # Files do not exist in the future if date > self.now(): return None floored_date = self._handle_dst_floor_date(date, "5min") hour = floored_date.hour padded_hour = str(hour).zfill(2) padded_hour_plus_one = str((hour + 1) % 24).zfill(2) # NOTE: this needs to be updated for DST every year # 0105d through 0155d have a "d" on 2025-11-02 add_d = ( floored_date.year == 2025 and floored_date.month == 11 and floored_date.day == 2 and floored_date.hour == 1 and floored_date.minute != 0 ) # The first hour in the URL is 1 after the hour in the filename. url = base_url + floored_date.strftime( f"/%Y/%m/%d/{padded_hour_plus_one}/{file_prefix}-%Y%m%d{padded_hour}%M{'d' if add_d else ''}.csv", ) logger.info(f"Downloading {url}") df = pd.read_csv(url) return df, url def _get_mid_term_forecast_data( self, date: str | pd.Timestamp, base_url: str, file_prefix: str, buffer_minutes: int = 10, ) -> tuple[pd.DataFrame, str] | None: """Get mid-term forecast data with common DST handling logic. Args: date: Date to get data for. Supports "latest" and "today" base_url: Base URL for downloads file_prefix: Prefix for the file name (e.g., "OP-MTLF", "OP-MTRF") buffer_minutes: Buffer minutes for "latest" date Returns: tuple: (dataframe, url) or None if date is in the future """ if date == "latest": date = self.now() - pd.Timedelta(minutes=buffer_minutes) # Files do not exist in the future if date > self.now(): return None floored_date = self._handle_dst_floor_date(date, "h") # For mid-term hourly forecasts during 2025 DST end there is a 0200d file. # Special case for DST end on 2025-11-02 add_d = ( floored_date.year == 2025 and floored_date.month == 11 and floored_date.day == 2 and floored_date.hour == 2 ) # Explicitly set the minutes to 00 in the URL url = base_url + floored_date.strftime( f"/%Y/%m/%d/{file_prefix}-%Y%m%d{str(floored_date.hour).zfill(2)}00{'d' if add_d else ''}.csv", ) logger.info(f"Downloading {url}") df = pd.read_csv(url) return df, url def _post_process_load_forecast( self, df: pd.DataFrame, url: str, forecast_type: str, forecast_col: str, end_time_col: str, interval_duration: pd.Timedelta, drop_null_forecast_rows: bool = True, ) -> pd.DataFrame: df = self._handle_market_end_to_interval(df, end_time_col, interval_duration) # Assume the publish time is in the name of the file. There are different # times on the webpage, but these could be the posting time. df["Publish Time"] = pd.Timestamp( re.search(r"[0-9]{12}", url).group(0), ).tz_localize( tz=self.default_timezone, # Assume the "d" file occurs during CST and Pandas wants ambiguous=True # during DST. ambiguous=not url.endswith("d.csv"), ) df.columns = [col.strip() for col in df.columns] df["Forecast Type"] = forecast_type df = ( utils.move_cols_to_front( df, ["Interval Start", "Interval End", "Publish Time", "Forecast Type"], ) .drop(columns=["Time", "Interval"]) .sort_values(["Interval Start", "Publish Time"]) ) if drop_null_forecast_rows: df = df.dropna(subset=[forecast_col]) return df.reset_index(drop=True) @support_date_range("5_MIN")
[docs] def get_solar_and_wind_forecast_short_term( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame | None: """ Returns solar and wind generation forecast for +4 hours in 5 minute intervals. Include actuals for past day in 5 minute intervals. Data from https://portal.spp.org/pages/shortterm-resource-forecast Arguments: date (pd.Timestamp|str): date to get data for. Supports "latest" and "today" verbose (bool): print info Returns: pd.DataFrame: forecast as dataframe. """ result = self._get_short_term_forecast_data( date, base_url=BASE_SOLAR_AND_WIND_SHORT_TERM_URL, file_prefix="OP-STRF", buffer_minutes=2, ) if result is None: return None df, url = result # According to the docs, the end time col should be GMTIntervalEnd, but it's # only GMTInterval in the data df = self._post_process_solar_and_wind_forecast( df, url, forecast_type="SHORT_TERM", end_time_col="GMTInterval", interval_duration=pd.Timedelta(minutes=5), ) return df
@support_date_range("HOUR_START")
[docs] def get_solar_and_wind_forecast_mid_term( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame | None: """ Returns solar and wind generation forecast for +7 days in hourly intervals. Data from https://portal.spp.org/pages/midterm-resource-forecast. Arguments: date (pd.Timestamp|str): date to get data for. Supports "latest" and "today" verbose (bool): print info Returns: pd.DataFrame: forecast as dataframe. """ result = self._get_mid_term_forecast_data( date, base_url=BASE_SOLAR_AND_WIND_MID_TERM_URL, file_prefix="OP-MTRF", buffer_minutes=10, ) if result is None: return None df, url = result df = self._post_process_solar_and_wind_forecast( df, url, forecast_type="MID_TERM", end_time_col="GMTIntervalEnd", interval_duration=pd.Timedelta(hours=1), ) return df
[docs] def get_load_by_baa( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Returns actual load by BAA from short-term load forecast data.""" df = self._get_load_by_baa_raw(date=date, end=end, verbose=verbose) if df is None or df.empty: return pd.DataFrame( columns=["Interval Start", "Interval End", "BAA", "Load"], ) return ( df.dropna(subset=["Load"]) .drop_duplicates( subset=["Interval Start", "Interval End", "BAA"], keep="last", ) .sort_values("Interval Start") .reset_index(drop=True) )
@support_date_range(frequency="5_MIN") def _get_load_by_baa_raw( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame | None: result = self._get_short_term_forecast_data( date=date, base_url=BASE_LOAD_FORECAST_SHORT_TERM_URL, file_prefix="OP-STLF", buffer_minutes=2, ) if result is None: return None df, url = result df = self._post_process_load_forecast( df, url, forecast_type="SHORT_TERM", end_time_col="GMTInterval", interval_duration=pd.Timedelta(minutes=5), forecast_col="STLF", drop_null_forecast_rows=False, ) fill_baa_column(df, "Actual") return ( df[["Interval Start", "Interval End", "BAA", "Actual"]] .rename(columns={"Actual": "Load"}) .copy() ) @support_date_range("DAY_START")
[docs] def get_load_by_baa_hourly( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame | None: """Returns hourly actual load by BAA from mid-term load forecast data.""" if date == "latest": fetch_date: str | pd.Timestamp = date else: date_ts = utils._handle_date(date, self.default_timezone) now = self.now() if date_ts.normalize() == now.normalize(): fetch_date = now - pd.Timedelta(minutes=10) else: fetch_date = date_ts.normalize() + pd.Timedelta(hours=23) result = self._get_mid_term_forecast_data( date=fetch_date, base_url=BASE_LOAD_FORECAST_MID_TERM_URL, file_prefix="OP-MTLF", buffer_minutes=10, ) if result is None: return pd.DataFrame( columns=["Interval Start", "Interval End", "BAA", "Load"], ) df, url = result df = self._post_process_load_forecast( df, url, forecast_type="MID_TERM", end_time_col="GMTIntervalEnd", interval_duration=pd.Timedelta(hours=1), forecast_col="MTLF", ) fill_baa_column(df, "Averaged Actual") result_df = ( df[["Interval Start", "Interval End", "BAA", "Averaged Actual"]] .rename(columns={"Averaged Actual": "Load"}) .copy() ) if date != "latest": date_ts = utils._handle_date(date, self.default_timezone) day_start = date_ts.normalize() day_end = day_start + pd.Timedelta(days=1) result_df = result_df[ (result_df["Interval Start"] >= day_start) & (result_df["Interval Start"] < day_end) ] return ( result_df.dropna(subset=["Load"]) .sort_values("Interval Start") .reset_index(drop=True) )
def _post_process_solar_and_wind_forecast( self, df: pd.DataFrame, url: str, forecast_type: str, end_time_col: str, interval_duration: pd.Timedelta, ) -> pd.DataFrame: df = self._handle_market_end_to_interval(df, end_time_col, interval_duration) # Assume the publish time is in the name of the file. There are different # times on the webpage, but these could be the posting time. df["Publish Time"] = pd.Timestamp( re.search(r"[0-9]{12}", url).group(0), ).tz_localize( tz=self.default_timezone, # Assume the "d" file occurs during CST and Pandas wants ambiguous=True # during DST. ambiguous=not url.endswith("d.csv"), ) df.columns = [col.strip() for col in df.columns] df["Forecast Type"] = forecast_type df = ( utils.move_cols_to_front( df, ["Interval Start", "Interval End", "Publish Time", "Forecast Type"], ) .drop(columns=["Time", "Interval"]) .sort_values(["Interval Start", "Publish Time"]) ) return df.dropna(subset=["Wind Forecast MW", "Solar Forecast MW"]).reset_index( drop=True, ) def _mid_term_solar_and_wind_url(self, date: pd.Timestamp) -> str: # Explicitly set the minutes to 00. return BASE_SOLAR_AND_WIND_MID_TERM_URL + date.strftime( "/%Y/%m/%d/OP-MTRF-%Y%m%d%H00.csv", ) def _mid_term_load_forecast_url(self, date: pd.Timestamp) -> str: # Explicitly set the minutes to 00. return BASE_LOAD_FORECAST_MID_TERM_URL + date.strftime( "/%Y/%m/%d/OP-MTLF-%Y%m%d%H00.csv", ) def _handle_market_end_to_interval( self, df: pd.DataFrame, column: str, interval_duration: pd.Timedelta, format: str | None = None, ) -> pd.DataFrame: """Converts market end time to interval end time""" df = df.rename( columns={ column: "Interval End", }, ) df["Interval End"] = pd.to_datetime( df["Interval End"], utc=True, format=format, ).dt.tz_convert(self.default_timezone) df["Interval Start"] = df["Interval End"] - interval_duration df["Time"] = df["Interval Start"] df = utils.move_cols_to_front(df, ["Time", "Interval Start", "Interval End"]) return df _ver_curtailment_numerical_cols = [ "Wind Redispatch Curtailments", "Wind Manual Curtailments", "Wind Curtailed For Energy", "Solar Redispatch Curtailments", "Solar Manual Curtailments", "Solar Curtailed For Energy", ] def _process_ver_curtailments(self, df: pd.DataFrame) -> pd.DataFrame: df = df.rename( columns={ "WindRedispatchCurtailments": "Wind Redispatch Curtailments", "WindManualCurtailments": "Wind Manual Curtailments", "WindCurtailedForEnergy": "Wind Curtailed For Energy", "SolarRedispatchCurtailments": "Solar Redispatch Curtailments", "SolarManualCurtailments": "Solar Manual Curtailments", "SolarCurtailedForEnergy": "Solar Curtailed For Energy", }, ) df = self._handle_market_end_to_interval( df, column="GMTIntervalEnding", interval_duration=pd.Timedelta(minutes=5), ) cols = [ "Interval Start", "Interval End", "Wind Redispatch Curtailments", "Wind Manual Curtailments", "Wind Curtailed For Energy", "Solar Redispatch Curtailments", "Solar Manual Curtailments", "Solar Curtailed For Energy", "BAA", ] # historical data doesnt have all columns for col in cols: if col not in df.columns: # Default BAA to SPP for historical data df[col] = pd.NA if col != "BAA" else BAAEnum.SPP df = df[cols] # Drop rows where all numerical curtailment columns are NaN df = df.dropna(subset=self._ver_curtailment_numerical_cols, how="all") df = df[~df["Interval Start"].isnull()] df = df.sort_values("Interval Start").reset_index(drop=True) return df def _aggregate_ver_curtailments(self, df: pd.DataFrame) -> pd.DataFrame: """Sum VER curtailment numerical columns across BAAs.""" df = ( df.groupby(["Interval Start", "Interval End"], as_index=False)[ self._ver_curtailment_numerical_cols ] .sum() .sort_values("Interval Start") .reset_index(drop=True) ) return df @support_date_range("DAY_START")
[docs] def get_capacity_of_generation_on_outage( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Capacity of Generation on Outage. Published daily at 8am CT for next 7 days Args: date: start date end: end date """ url = f"{FILE_BROWSER_DOWNLOAD_URL}/capacity-of-generation-on-outage?path=/{date.strftime('%Y')}/{date.strftime('%m')}/Capacity-Gen-Outage-{date.strftime('%Y%m%d')}.csv" # noqa logger.info(f"Downloading {url}") df = pd.read_csv(url) return self._process_capacity_of_generation_on_outage(df, publish_time=date)
[docs] def get_capacity_of_generation_on_outage_annual( self, year: int, verbose: bool = True, ) -> pd.DataFrame: """Get VER Curtailments for a year. Starting 2014. Recent data use get_capacity_of_generation_on_outage Args: year: year to get data for verbose: print url Returns: pd.DataFrame: VER Curtailments """ url = f"{FILE_BROWSER_DOWNLOAD_URL}/capacity-of-generation-on-outage?path=/{year}/{year}.zip" # noqa def process_csv(df, file_name): # infer date from '2020/01/Capacity-Gen-Outage-20200101.csv' publish_time_str = file_name.split(".")[0].split("-")[-1] publish_time = pd.to_datetime(publish_time_str).tz_localize( self.default_timezone, ) df = self._process_capacity_of_generation_on_outage(df, publish_time) return df df = utils.download_csvs_from_zip_url( url, process_csv=process_csv, verbose=verbose, ) df = df.sort_values("Interval Start") return df
def _process_capacity_of_generation_on_outage( self, df: pd.DataFrame, publish_time: pd.Timestamp, ) -> pd.DataFrame: # strip whitespace from column names df = df.rename(columns=lambda x: x.strip()) df = self._handle_market_end_to_interval( df, column="Market Hour", interval_duration=pd.Timedelta(minutes=60), ) df = df.rename( columns={ "Outaged MW": "Total Outaged MW", }, ) publish_time = pd.to_datetime(publish_time.normalize()) df.insert(0, "Publish Time", publish_time) # drop Time column df = df.drop(columns=["Time"]) return df def _fetch_ver_curtailments_daily(self, date: pd.Timestamp) -> pd.DataFrame: """Fetch and process a single day's VER curtailments CSV.""" url = f"{FILE_BROWSER_DOWNLOAD_URL}/ver-curtailments?path=/{date.strftime('%Y')}/{date.strftime('%m')}/VER-Curtailments-{date.strftime('%Y%m%d')}.csv" # noqa logger.info(f"Downloading {url}") df = pd.read_csv(url) return self._process_ver_curtailments(df) def _fetch_ver_curtailments_annual( self, year: int, verbose: bool = True, ) -> pd.DataFrame: """Fetch and process a full year's VER curtailments zip.""" url = f"{FILE_BROWSER_DOWNLOAD_URL}/ver-curtailments?path=/{year}/{year}.zip" # noqa df = utils.download_csvs_from_zip_url(url, verbose=verbose) return self._process_ver_curtailments(df) @support_date_range("DAY_START")
[docs] def get_ver_curtailments( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get VER Curtailments summed across BAAs. Supports recent data. For historical annual data use get_ver_curtailments_annual. For data broken down by BAA use get_ver_curtailments_by_baa. Args: date: start date end: end date """ df = self._fetch_ver_curtailments_daily(date) return self._aggregate_ver_curtailments(df)
[docs] def get_ver_curtailments_annual( self, year: int, verbose: bool = True, ) -> pd.DataFrame: """Get VER Curtailments summed across BAAs for a year. Starting 2014. Recent data use get_ver_curtailments. For data broken down by BAA use get_ver_curtailments_by_baa_annual. Args: year: year to get data for verbose: print url Returns: pd.DataFrame: VER Curtailments """ df = self._fetch_ver_curtailments_annual(year, verbose=verbose) return self._aggregate_ver_curtailments(df)
@support_date_range("DAY_START")
[docs] def get_ver_curtailments_by_baa( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get VER Curtailments broken down by BAA. Supports recent data. For historical annual data use get_ver_curtailments_by_baa_annual. Args: date: start date end: end date """ return self._fetch_ver_curtailments_daily(date)
[docs] def get_ver_curtailments_by_baa_annual( self, year: int, verbose: bool = True, ) -> pd.DataFrame: """Get VER Curtailments broken down by BAA for a year. Starting 2014. Recent data use get_ver_curtailments_by_baa. Args: year: year to get data for verbose: print url Returns: pd.DataFrame: VER Curtailments """ return self._fetch_ver_curtailments_annual(year, verbose=verbose)
def _get_load_and_forecast(self, verbose: bool = False) -> pd.DataFrame: url = f"{MARKETPLACE_BASE_URL}/chart-api/load-forecast/asChart" logger.info(f"Getting load and forecast from {url}") r = self._get_json(url)["response"] data = {"Time": r["labels"]} for d in r["datasets"][:3]: if d["label"] == "Actual Load": data["Actual Load"] = d["data"] elif d["label"] == "Mid-Term Load Forecast": data["Mid-Term Forecast"] = d["data"] elif d["label"] == "Short-Term Load Forecast": data["Short-Term Forecast"] = d["data"] df = pd.DataFrame(data) df["Time"] = pd.to_datetime( df["Time"], ).dt.tz_convert(self.default_timezone) return df # todo where does date got in argument order # def get_historical_lmp(self, date, market: str, nodes: list): # 5 minute interal data # {FILE_BROWSER_API_URL}/rtbm-lmp-by-location?path=/2022/08/By_Interval/08 # /RTBM-LMP-SL-202208082125.csv # historical generation mix # https://marketplace.spp.org/pages/generation-mix-rolling-365 # https://marketplace.spp.org/chart-api/gen-mix-365/asFile # 15mb file with five minute resolution
[docs] def get_raw_interconnection_queue(self, verbose: bool = False) -> BinaryIO: url = "https://opsportal.spp.org/Studies/GenerateSummaryCSV" logger.info(f"Getting interconnection queue from {url}") response = requests.get(url) return utils.get_response_blob(response)
[docs] def get_interconnection_queue(self, verbose: bool = False) -> pd.DataFrame: """Get interconnection queue Returns: pandas.DataFrame: Interconnection queue """ raw_data = self.get_raw_interconnection_queue(verbose) queue = pd.read_csv(raw_data, skiprows=1) queue["Status (Original)"] = queue["Status"] completed_val = InterconnectionQueueStatus.COMPLETED.value active_val = InterconnectionQueueStatus.ACTIVE.value withdrawn_val = InterconnectionQueueStatus.WITHDRAWN.value queue["Status"] = queue["Status"].map( { "IA FULLY EXECUTED/COMMERCIAL OPERATION": completed_val, "IA FULLY EXECUTED/ON SCHEDULE": completed_val, "IA FULLY EXECUTED/ON SUSPENSION": completed_val, "IA PENDING": active_val, "DISIS STAGE": active_val, "None": active_val, "WITHDRAWN": withdrawn_val, }, ) queue["Generation Type"] = queue[["Generation Type", "Fuel Type"]].apply( lambda x: " - ".join(x.dropna()), axis=1, ) queue["Proposed Completion Date"] = queue["Commercial Operation Date"] rename = { "Generation Interconnection Number": "Queue ID", " Nearest Town or County": "County", "State": "State", "TO at POI": "Transmission Owner", "Capacity": "Capacity (MW)", "MAX Summer MW": "Summer Capacity (MW)", "MAX Winter MW": "Winter Capacity (MW)", "Generation Type": "Generation Type", "Request Received": "Queue Date", "Substation or Line": "Interconnection Location", "Date Withdrawn": "Withdrawn Date", } # todo: there are a few columns being parsed # as "unamed" that aren't being included but should extra_columns = [ "In-Service Date", "Commercial Operation Date", "Cessation Date", "Current Cluster", "Cluster Group", "Service Type", "Status (Original)", ] missing = [ "Project Name", "Interconnecting Entity", "Withdrawal Comment", "Actual Completion Date", ] queue = utils.format_interconnection_df( queue=queue, rename=rename, extra=extra_columns, missing=missing, ) return queue
[docs] def get_lmp_real_time_5_min_by_location( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, location_type: str = LOCATION_TYPE_ALL, verbose: bool = False, use_daily_files: bool = False, ) -> pd.DataFrame: """Get LMP data by location for the Real-Time 5 Minute Market Args: date: date to get data for end: end date location_type: location type to get data for. Options are: - ``ALL`` (LOCATION_TYPE_ALL) - ``Hub`` (LOCATION_TYPE_HUB) - ``Interface`` (LOCATION_TYPE_INTERFACE) - ``Settlement Location`` (LOCATION_TYPE_SETTLEMENT_LOCATION) verbose: print url use_daily_files: if True, use daily files instead of 5 minute files. """ if use_daily_files: df = self._get_real_time_5_min_data_from_daily_files( date, end=end, location_type=location_type, verbose=verbose, ) df.columns = df.columns.str.strip() df = df.rename( columns={ "GMT Interval": "GMTIntervalEnd", "Settlement Location Name": "Settlement Location", "PNODE Name": "PNode", }, ) else: df = self._get_real_time_5_min_data( date, end=end, location_type=location_type, verbose=verbose, ) return self._finalize_spp_df( df, market=Markets.REAL_TIME_5_MIN, location_type=location_type, verbose=verbose, )
[docs] def get_lmp_real_time_5_min_by_bus( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get LMP data by bus for the Real-Time 5 Minute Market Args: date: date to get data for end: end date verbose: print url NOTE: does not take a location_type argument because it always returns LOCATION_TYPE_BUS. """ return self._finalize_spp_df( self._get_real_time_5_min_data( date, end=end, location_type=LOCATION_TYPE_BUS, verbose=verbose, ), market=Markets.REAL_TIME_5_MIN, location_type=LOCATION_TYPE_BUS, verbose=verbose, )
@support_date_range(frequency="5_MIN") def _get_real_time_5_min_data( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, location_type: str = LOCATION_TYPE_ALL, verbose: bool = False, ) -> pd.DataFrame: """ Internal function that consolidates logic for getting LMP data for the Real-Time 5 Minute Market """ if location_type not in self.location_types: raise NotSupported(f"Location type {location_type} not supported") if location_type == LOCATION_TYPE_BUS: endpoint = RTBM_LMP_BY_BUS file_prefix = "RTBM-LMP-B" else: endpoint = FS_RTBM_LMP_BY_LOCATION file_prefix = "RTBM-LMP-SL" if date == "latest": url = f"https://portal.spp.org/file-browser-api/download/{endpoint}?path=%2F{file_prefix}-latestInterval.csv" else: url = self._format_5_min_url(date, end, endpoint, file_prefix) logger.info(f"Getting data for {date} from {url}") df = pd.read_csv(url) return df @support_date_range(frequency="DAY_START") def _get_real_time_5_min_data_from_daily_files( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, location_type: str = LOCATION_TYPE_ALL, verbose: bool = False, ) -> pd.DataFrame: """ Internal function that consolidates logic for getting LMP data for the Real-Time 5 Minute Market """ if location_type not in self.location_types: raise NotSupported(f"Location type {location_type} not supported") if location_type == LOCATION_TYPE_BUS: endpoint = RTBM_LMP_BY_BUS file_prefix = "RTBM-LMP-DAILY-B" else: endpoint = FS_RTBM_LMP_BY_LOCATION file_prefix = "RTBM-LMP-DAILY-SL" url = self._format_daily_url(date, end, endpoint, file_prefix) logger.info(f"Getting data for {date} from {url} (daily file)") df = pd.read_csv(url) return df @support_date_range(frequency="DAY_START")
[docs] def get_lmp_day_ahead_hourly( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, location_type: str = LOCATION_TYPE_ALL, verbose: bool = False, ) -> pd.DataFrame: """Get day ahead hourly LMP data Supported Location Types: - ``Hub`` - ``Interface`` - ``ALL`` """ if location_type not in self.location_types: raise NotSupported(f"Location type {location_type} not supported") if date == "latest": raise NotSupported("Latest not supported for day ahead hourly") df = self._get_dam_lmp(date, location_type=location_type, verbose=verbose) return self._finalize_spp_df( df, market=Markets.DAY_AHEAD_HOURLY, location_type=location_type, verbose=verbose, include_baa=True, )
@support_date_range(frequency="DAY_START")
[docs] def get_lmp_day_ahead_hourly_by_bus( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get day ahead hourly LMP data by bus (pnode-level). Args: date: date to get data for end: end date verbose: print url NOTE: does not take a location_type argument because it always returns LOCATION_TYPE_BUS. """ if date == "latest": raise NotSupported("Latest not supported for day ahead hourly by bus") df = self._get_dam_lmp(date, location_type=LOCATION_TYPE_BUS, verbose=verbose) return self._finalize_spp_df( df, market=Markets.DAY_AHEAD_HOURLY, location_type=LOCATION_TYPE_BUS, verbose=verbose, include_baa=True, )
def _get_feature_data(self, base_url: str, verbose: bool = False) -> pd.DataFrame: """Fetches data from ArcGIS Map Service with Feature Data Returns: pd.DataFrame of features """ args = { "f": "json", "where": "OBJECTID IS NOT NULL", "returnGeometry": "false", "outFields": "*", } doc = self._get_json(base_url, verbose=verbose, params=args) df = pd.DataFrame([feature["attributes"] for feature in doc["features"]]) return df def _get_dam_lmp( self, date: pd.Timestamp, location_type: str = LOCATION_TYPE_ALL, verbose: bool = False, ) -> pd.DataFrame: if location_type == LOCATION_TYPE_BUS: endpoint = FS_DAM_LMP_BY_BUS file_prefix = "DA-LMP-B" else: endpoint = FS_DAM_LMP_BY_LOCATION file_prefix = "DA-LMP-SL" url = f"{FILE_BROWSER_DOWNLOAD_URL}/{endpoint}?path=/{date.strftime('%Y')}/{date.strftime('%m')}/By_Day/{file_prefix}-{date.strftime('%Y%m%d')}0100.csv" # noqa logger.info(f"Downloading {url}") df = pd.read_csv(url) return df def _finalize_spp_df( self, df: pd.DataFrame, market: Markets, location_type: str, verbose: bool = False, include_baa: bool = False, ) -> pd.DataFrame: """ Finalizes DataFrame: - Sets Market - Filters by location type if needed - Sets location type - Renames and ordering columns - Filters by Location - Resets the index Arguments: pandas.DataFrame: DataFrame with SPP data market (str): Market location_type (str): Location type verbose (bool, optional): Verbose output include_baa (bool, optional): Include BAA column. If BAA is not present and this is True, it will be added with the default value of "SPP" """ if market == Markets.REAL_TIME_5_MIN: interval_duration = pd.Timedelta(minutes=5) elif market == Markets.DAY_AHEAD_HOURLY: interval_duration = pd.Timedelta(hours=1) else: raise ValueError(f"Market {market} not supported") df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=interval_duration, ) df["Market"] = market.value if location_type != LOCATION_TYPE_BUS: df["Location"] = df["Settlement Location"] df["Location Type"] = ( df["Location"] .map( LMP_HUBS_AND_INTERFACES, ) .fillna(LOCATION_TYPE_SETTLEMENT_LOCATION) ) df = utils.filter_lmp_locations(df, location_type=location_type) else: df["Location"] = df["Pnode"] df["Location Type"] = LOCATION_TYPE_BUS df = df.rename( columns={ "Pnode": "PNode", "LMP": "LMP", # for posterity "MLC": "Loss", "MCC": "Congestion", "MEC": "Energy", }, ) if include_baa and "BAA" not in df.columns: df["BAA"] = "SPP" # Insert BAA before location if it exists cols = [ "Time", "Interval Start", "Interval End", "Market", "Location", "Location Type", "PNode", "LMP", "Energy", "Congestion", "Loss", ] if "BAA" in df.columns: cols.insert(cols.index("Location"), "BAA") df = df[cols] df = df.reset_index(drop=True) # Since Location = PNode for bus, we can drop PNode if location_type == LOCATION_TYPE_BUS: df = df.drop(columns=["PNode"]) return df.sort_values(["Time", "Location"]) @support_date_range("5_MIN")
[docs] def get_operating_reserves( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: if date == "latest": url = f"{FILE_BROWSER_DOWNLOAD_URL}/operating-reserves?path=/RTBM-OR-latestInterval.csv" # noqa else: url = self._format_5_min_url( date, end, OPERATING_RESERVES, "RTBM-OR", include_interval=False, ) logger.info(f"Downloading {url}") df = pd.read_csv(url) return self._process_operating_reserves(df)
def _process_operating_reserves(self, df: pd.DataFrame) -> pd.DataFrame: df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=pd.Timedelta(minutes=5), ) # don't need this column df = df.drop(columns=["Interval"]) df = df.rename( columns={ "RegUP_Clr": "Reg_Up_Cleared", "RegDN_Clr": "Reg_Dn_Cleared", "RampUP_Clr": "Ramp_Up_Cleared", "RampDN_Clr": "Ramp_Dn_Cleared", "UncUP_Clr": "Unc_Up_Cleared", "STSUncUP_Clr": "STS_Unc_Up_Cleared", "Spin_Clr": "Spin_Cleared", "Supp_Clr": "Supp_Cleared", }, ) return df
[docs] def get_as_prices_real_time_5_min( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, use_daily_files: bool = False, ) -> pd.DataFrame: """ Provides Marginal Clearing Price information by Reserve Zone for each Real-Time 5-minute Market solution. Args: date: date to get data for. Supports "latest" for most recent interval. end: end date verbose: print url use_daily_files: if True, use daily files instead of 5 minute files. Returns: pd.DataFrame: Real-Time 5-minute Marginal Clearing Prices """ if use_daily_files: return self._get_as_prices_real_time_5_min_from_daily_files( date, end=end, verbose=verbose, ) else: return self._get_as_prices_real_time_5_min_from_intervals( date, end=end, verbose=verbose, )
@support_date_range("5_MIN") def _get_as_prices_real_time_5_min_from_intervals( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get AS prices from 5-minute interval files.""" if date == "latest": url = f"{FILE_BROWSER_DOWNLOAD_URL}/rtbm-mcp?path=/RTBM-MCP-latestInterval.csv" else: url = self._format_5_min_url( date, end, endpoint=RTBM_MCP, file_prefix="RTBM-MCP", include_interval=True, ) logger.info(f"Downloading {url}") df = pd.read_csv(url) return self._process_as_prices_real_time(df) @support_date_range("DAY_START") def _get_as_prices_real_time_5_min_from_daily_files( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get AS prices from daily files.""" if date == "latest": raise ValueError("Latest not supported with daily files") url = self._format_daily_mcp_url(date, end) logger.info(f"Downloading {url}") df = pd.read_csv(url) # Strip whitespace from column names for daily files df.columns = df.columns.str.strip() return self._process_as_prices_real_time(df) def _process_as_prices_real_time(self, df: pd.DataFrame) -> pd.DataFrame: df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=pd.Timedelta(minutes=5), ) df.columns = df.columns.str.strip() # Column mapping to match day-ahead format column_mapping = { "RegUPService": "Reg Up Service", "RegDNService": "Reg DN Service", "RegUpMile": "Reg Up Mile", "RegDNMile": "Reg DN Mile", "RampUP": "Ramp Up", "RampDN": "Ramp DN", "Spin": "Spin", "Supp": "Supp", "UncUP": "Unc Up", } df = df.rename(columns=column_mapping) cols_to_keep = [ "Interval Start", "Interval End", "Reserve Zone", ] + list(column_mapping.values()) return df.sort_values(["Interval Start", "Reserve Zone"]).reset_index( drop=True, )[cols_to_keep] def _format_daily_mcp_url( self, start: pd.Timestamp, end: pd.Timestamp | None, ) -> str: """ Formats the URL for daily MCP data files. Args: start (pd.Timestamp): Start date of the data. end (pd.Timestamp): End date of the data. Returns: str: The formatted URL. """ folder_year = start.strftime("%Y") folder_month = start.strftime("%m") url = f"{FILE_BROWSER_DOWNLOAD_URL}/{RTBM_MCP}?path=/{folder_year}/{folder_month}/By_Day/RTBM-MCP-DAILY-{start.strftime('%Y%m%d')}.csv" return url @support_date_range("DAY_START")
[docs] def get_day_ahead_operating_reserve_prices( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Provides Marginal Clearing Price information by Reserve Zone for each Day-Ahead Market solution for each Operating Day. Posting is updated each day after the DA Market results are posted. Available at https://portal.spp.org/pages/da-mcp# Args: date: date to get data for end: end date verbose: print url Returns: pd.DataFrame: Day Ahead Marginal Clearing Prices """ if date == "latest": raise ValueError( "Latest not supported for Day Ahead Marginal Clearing Prices", ) url = f"{FILE_BROWSER_DOWNLOAD_URL}/da-mcp?path=/{date.strftime('%Y')}/{date.strftime('%m')}/DA-MCP-{date.strftime('%Y%m%d')}0100.csv" # noqa logger.info(f"Downloading {url}") df = pd.read_csv(url) return self._process_day_ahead_operating_reserve_prices(df)
def _process_day_ahead_operating_reserve_prices( self, df: pd.DataFrame, ) -> pd.DataFrame: df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=pd.Timedelta(hours=1), ).assign(Market="DAM") column_mapping = { "RegUP": "Reg_Up", "RegDN": "Reg_Dn", "RampUP": "Ramp_Up", "RampDN": "Ramp_Dn", "Spin": "Spin", "Supp": "Supp", "UncUP": "Unc_Up", } df = df.rename(columns=column_mapping) cols_to_keep = [ "Interval Start", "Interval End", "Market", "Reserve Zone", ] + list( column_mapping.values(), ) # Older datasets might not have all the reserve types return df[[c for c in cols_to_keep if c in df]] @support_date_range("5_MIN")
[docs] def get_lmp_real_time_weis( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get LMP data for real time WEIS Args: date: date to get data for. if end is not provided, will get data for 5 minute interval that date is in. end: end date verbose: print url """ endpoint = "lmp-by-settlement-location-weis" # if no end, find nearest 5 minute interval end # to use if date == "latest": url = f"{FILE_BROWSER_DOWNLOAD_URL}/{endpoint}?path=/WEIS-RTBM-LMP-SL-latestInterval.csv" # noqa else: url = self._format_5_min_url(date, end, endpoint, "WEIS-RTBM-LMP-SL") # todo before 2022 only annual files are available # TODO: sometimes there are missing interval files (example: https://portal.spp.org/pages/lmp-by-settlement-location-weis#%2F2024%2F01%2FBy_Interval%2F21) # noqa # We can't do anything in these cases but log a message logger.info(f"Downloading {url}") try: df = pd.read_csv(url) except ConnectionResetError as e: logger.error(f"Error downloading {url}: {e}") return pd.DataFrame() return self._process_lmp_real_time_weis(df)
def _process_lmp_real_time_weis(self, df: pd.DataFrame) -> pd.DataFrame: # strip whitespace from column names df = df.rename(columns=lambda x: x.strip()) df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=pd.Timedelta(minutes=5), ) df["Location Type"] = LOCATION_TYPE_SETTLEMENT_LOCATION df["Market"] = "REAL_TIME_WEIS" df = df.rename( columns={ "Settlement Location": "Location", "Pnode": "PNode", "LMP": "LMP", # for posterity "MLC": "Loss", "MCC": "Congestion", "MEC": "Energy", }, ) df = df[ [ "Interval Start", "Interval End", "Market", "Location", "Location Type", "PNode", "LMP", "Energy", "Congestion", "Loss", ] ] return df def _format_5_min_url( self, start: pd.Timestamp, end: pd.Timestamp | None, endpoint: str, file_prefix: str, include_interval: bool = True, ) -> str: # Folder path is based on start date. File name is based on end date. # As an example, the file with the name 202407010000 representing the interval # 2024-06-30 23:55:00 to 2024-07-01 00:00:00 is in the folder # 2024/06/By_Interval/30/ if end is None: end = start + FiveMinOffset() else: # To deal with DST, convert to UTC before ceil end = end.tz_convert("UTC").ceil("5min").tz_convert(self.default_timezone) folder_year = start.strftime("%Y") folder_month = start.strftime("%m") folder_day = start.strftime("%d") interval_str = "/By_Interval" if include_interval else "" url = f"{FILE_BROWSER_DOWNLOAD_URL}/{endpoint}?path=/{folder_year}/{folder_month}{interval_str}/{folder_day}/{file_prefix}-{end.strftime('%Y%m%d%H%M')}.csv" # noqa # Intervals that occur after DST end during the repeated hour have a "d" suffix # Identify these intervals by the offset of the end time. Since CDT is UTC-5 and # CST is UTC-6, if the UTC offset is larger than the offset of the hour before, # it's a repeated hour in CST. if abs(end.utcoffset()) > abs((end - pd.Timedelta(hours=1)).utcoffset()): url = url.split(".csv")[0] + "d.csv" status_code = requests.head(url).status_code if status_code == 200: return url else: raise NoDataFoundException(f"No data found for {url}") def _format_daily_url( self, start: pd.Timestamp, end: pd.Timestamp | None, endpoint: str, file_prefix: str, ) -> str: """ Formats the URL for daily data files. Args: start (pd.Timestamp): Start date of the data. end (pd.Timestamp): End date of the data. endpoint (str): The API endpoint to use. file_prefix (str): The prefix for the file name. Returns: str: The formatted URL. """ folder_year = start.strftime("%Y") folder_month = start.strftime("%m") url = f"{FILE_BROWSER_DOWNLOAD_URL}/{endpoint}?path=/{folder_year}/{folder_month}/By_Day/{file_prefix}-{start.strftime('%Y%m%d')}.csv" # noqa: E501 return url def _get_location_list( self, location_type: str, verbose: bool = False, ) -> list[str]: if location_type == LOCATION_TYPE_HUB: df = self._get_feature_data(QUERY_RTM5_HUBS_URL, verbose=verbose) elif location_type == LOCATION_TYPE_INTERFACE: df = self._get_feature_data( QUERY_RTM5_INTERFACES_URL, verbose=verbose, ) else: raise ValueError(f"Invalid location_type: {location_type}") return df["SETTLEMENT_LOCATION"].unique().tolist() def _fetch_and_concat_csvs( self, urls: list[str], verbose: bool = False, ) -> pd.DataFrame: all_dfs = [] for url in tqdm.tqdm(urls): logger.info(f"Fetching {url}") df = pd.read_csv(url) all_dfs.append(df) return pd.concat(all_dfs) def _get_marketplace_session(self) -> dict: """ Returns a session object for the Marketplace API """ html = requests.get(FILE_BROWSER_API_URL) jsessionid = html.cookies.get("JSESSIONID") xsrf_token = html.cookies.get("XSRF-TOKEN") return { "cookies": {"JSESSIONID": jsessionid, "XSRF-TOKEN": xsrf_token}, "headers": { "X-XSRF-TOKEN": xsrf_token, }, } @staticmethod def _match( needles: list[str], haystacks: list[str], needle_norm_fn: Callable[[str], str] = lambda x: x.lower(), haystack_norm_fn: Callable[[str], str] = lambda x: x.lower(), ) -> list[str]: """Returns items from haystacks if any needles are in them""" return [ haystack for haystack in haystacks if any( needle_norm_fn(needle) in haystack_norm_fn(haystack) for needle in needles ) ] @support_date_range("DAY_START")
[docs] def get_hourly_load_historical( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Hourly Load in the legacy wide format (before 2026-03-24). Deprecated: SPP changed the hourly load data format on 2026-03-24. Use get_hourly_load for data on or after 2026-03-24. Args: date: start date (must be before 2026-03-24) end: end date Returns: pd.DataFrame: Hourly Load in wide format """ if date >= HOURLY_LOAD_WIDE_FORMAT_END_DATE: raise NotSupported( "SPP changed the hourly load data format on 2026-03-24. " "Use get_hourly_load for data on or after 2026-03-24.", ) url = f"{FILE_BROWSER_DOWNLOAD_URL}/hourly-load?path=/{date.strftime('%Y')}/DAILY_HOURLY_LOAD-{date.strftime('%Y%m%d')}.csv" # noqa logger.info(f"Downloading {url}") df = pd.read_csv(url) return self._process_hourly_load(df)
@support_date_range("DAY_START")
[docs] def get_hourly_load( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Hourly Load in the long format (on or after 2026-03-24). Args: date: start date (must be on or after 2026-03-24) end: end date Returns: pd.DataFrame: Hourly Load with columns Time, Interval Start, Interval End, Balancing Area Name, Control Zone Name, Forecast Area Type, Load """ if date in ["today", "latest"] or utils.is_today( date, tz=self.default_timezone, ): raise NoDataFoundException("Data is on at least a one day delay") if date < HOURLY_LOAD_WIDE_FORMAT_END_DATE: raise NoDataFoundException( "Data before 2026-03-24 uses the legacy wide format. " "Use get_hourly_load_historical for data before 2026-03-24.", ) url = f"{FILE_BROWSER_DOWNLOAD_URL}/hourly-load?path=/{date.strftime('%Y')}/DAILY_HOURLY_LOAD-{date.strftime('%Y%m%d')}.csv" # noqa logger.info(f"Downloading {url}") df = pd.read_csv(url) return self._process_hourly_load_long(df)
[docs] def get_hourly_load_annual(self, year: int, verbose: bool = True) -> pd.DataFrame: """Get Hourly Load for a year. Starting 2011. For recent data use `get_hourly_load` Args: year: year to get data for verbose: print url Returns: pd.DataFrame: Hourly Load """ url = f"{FILE_BROWSER_DOWNLOAD_URL}/hourly-load?path=/{year}/{year}.zip" # noqa df = utils.download_csvs_from_zip_url( url=url, verbose=verbose, strip_whitespace_from_cols=True, ) return self._process_hourly_load(df)
def _process_hourly_load(self, df: pd.DataFrame) -> pd.DataFrame: """Process hourly load data in the legacy wide format. Deprecated: This method handles the wide format used before 2026-03-24. For data on or after 2026-03-24, use _process_hourly_load_long instead. """ if "Market Hour" in df.columns: raise NotSupported( "SPP changed the hourly load data format on 2026-03-24 from wide " "to long. This method only supports the wide format used before " "2026-03-24. Use get_hourly_load with dates on or after 2026-03-24 " "for the new long format.", ) # Some column names contain leading whitespace in some files - remove it df = df.rename(columns=lambda x: x.strip()) # Some files contain null rows. Drop them. df = df.dropna(how="all") # Some files don't have time 00:00 for the first interval in a day # for example 12/2/2016 instead of 12/2/2016 00:00. This causes datetime # conversion problems. This fixes it. def clean_market_hour(val): if val.endswith(":00"): return val return val + " 00:00" df["MarketHour"] = df["MarketHour"].apply(clean_market_hour) df = self._handle_market_end_to_interval( df, column="MarketHour", interval_duration=pd.Timedelta(minutes=60), format="mixed", ) time_cols = [ "Time", "Interval Start", "Interval End", ] load_cols = [ "CSWS", "EDE", "GRDA", "INDN", "KACY", "KCPL", "LES", "MPS", "NPPD", "OKGE", "OPPD", "SECI", "SPRM", "SPS", "WAUE", "WFEC", "WR", ] all_cols = time_cols + load_cols # historical data doesn't have all columns for c in all_cols: if c not in df.columns: df[c] = pd.NA df = df[all_cols] df = df[~df["Interval Start"].isnull()].drop_duplicates() df = df.sort_values("Time") df["System Total"] = df[load_cols].sum(axis=1, skipna=True) return df def _process_hourly_load_long(self, df: pd.DataFrame) -> pd.DataFrame: """Process hourly load data in the new long format (starting 2026-03-24). The new format has columns: Market Hour, Balancing Area Name, Control Zone Name, Forecast Area Type, Load MW. """ df = df.rename(columns=lambda x: x.strip()) df = df.dropna(how="all") df = self._handle_market_end_to_interval( df, column="Market Hour", interval_duration=pd.Timedelta(minutes=60), format="mixed", ) df = df[~df["Interval Start"].isnull()].drop_duplicates() df = df.rename( columns={ "Load MW": "Load", }, ) col_order = [ "Interval Start", "Interval End", "Balancing Area Name", "Control Zone Name", "Forecast Area Type", "Load", ] df = df[col_order] df = df.sort_values( [ "Interval Start", "Balancing Area Name", "Control Zone Name", "Forecast Area Type", ], ).reset_index(drop=True) return df @support_date_range("DAY_START")
[docs] def get_market_clearing_real_time( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Market Clearing Real Time Args: date: start date end: end date Returns: pd.DataFrame: Market Clearing Real Time """ if date == "latest": try: return self.get_market_clearing_real_time("today") except urllib.error.HTTPError: logger.info("Data not available for today, trying yesterday") return self.get_market_clearing_real_time( self.local_now().normalize() - pd.DateOffset(days=1), verbose=verbose, ) url = f"{FILE_BROWSER_DOWNLOAD_URL}/market-clearing-rtbm?path=/{date.strftime('%Y')}/{date.strftime('%m')}/RTBM-MC-{date.strftime('%Y%m%d')}.csv" # noqa logger.info(f"Downloading {url}") df = pd.read_csv(url) return self._process_market_clearing(df, 5)
@support_date_range("DAY_START")
[docs] def get_market_clearing_day_ahead( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Market Clearing Day Ahead Args: date: start date end: end date Returns: pd.DataFrame: Market Clearing Day Ahead """ if date == "latest": date = self.local_now().normalize() + pd.DateOffset(days=1) try: return self.get_market_clearing_day_ahead(date, verbose=verbose) except urllib.error.HTTPError: logger.info( f"Data not available for {date.strftime('%Y-%m-%d')}, trying today", ) return self.get_market_clearing_day_ahead("today", verbose=verbose) url = f"{FILE_BROWSER_DOWNLOAD_URL}/market-clearing?path=/{date.strftime('%Y')}/{date.strftime('%m')}/DA-MC-{date.strftime('%Y%m%d')}0100.csv" # noqa logger.info(f"Downloading {url}") df = pd.read_csv(url) return self._process_market_clearing(df, 60)
def _process_market_clearing(self, df: pd.DataFrame, interval_minutes: int): df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=pd.Timedelta(minutes=interval_minutes), ) df.columns = df.columns.str.strip() df = df.rename( columns={ "RegUP": "Reg Up", "RegDN": "Reg Dn", "RampUP": "Ramp Up", "RampDN": "Ramp Dn", "UncUP": "Unc Up", }, ).drop(columns=["Time", "Interval"]) return df.sort_values("Interval Start") @support_date_range("DAY_START")
[docs] def get_binding_constraints_day_ahead_hourly( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Day-Ahead Binding Constraints Args: date: date to get data for. Supports "latest" for most recently available data. end: end date verbose: print url Returns: pd.DataFrame: Day-Ahead Binding Constraints """ if date == "latest": tomorrow = pd.Timestamp.now( tz=self.default_timezone, ).normalize() + pd.Timedelta(days=1) try: return self.get_binding_constraints_day_ahead_hourly( date=tomorrow, end=end, verbose=verbose, ) except NoDataFoundException: return self.get_binding_constraints_day_ahead_hourly(date="today") url = f"{FILE_BROWSER_DOWNLOAD_URL}/{DA_BINDING_CONSTRAINTS}?path=/{date.strftime('%Y')}/{date.strftime('%m')}/By_Day/DA-BC-{date.strftime('%Y%m%d')}0100.csv" # noqa return self._process_binding_constraints_day_ahead_hourly(url)
def _process_binding_constraints_day_ahead_hourly(self, url: str) -> pd.DataFrame: logger.info(f"Downloading {url}...") try: df = pd.read_csv(url) except urllib.error.HTTPError as e: if e.code == 404: raise NoDataFoundException(f"No data found for {url}") raise df.columns = df.columns.str.strip() df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=pd.Timedelta(hours=1), ) df = df.rename(columns={"NERCID": "NERC ID"}) df["NERC ID"] = pd.to_numeric(df["NERC ID"], errors="coerce").astype( pd.Int64Dtype(), ) cols_to_keep = [ "Interval Start", "Interval End", "Constraint Name", "Constraint Type", "NERC ID", "State", "Shadow Price", "Monitored Facility", "Contingent Facility", "Contingency Name", ] return df[cols_to_keep].sort_values(["Interval Start", "Constraint Name"]) # NB: SPP RTBM publishes per-interval CSVs and daily aggregates. Both are # aligned to local midnight. Interval files live under # By_Interval/<DD>/RTBM-BC-<YYYYMMDDHHMM>.csv where DD is the interval's # start-day and HHMM is the interval's end-time; the midnight-crossing # interval (start = prev day 23:55, end = next day 00:00) sits in the # previous day's folder with the next day's filename. Daily files are # faster (one request per day) but lag publication, so we use interval # files for today/yesterday and daily files for older history. Daily files # always return the whole local day; this method slices the result to the # caller's [date, end) window.
[docs] def get_binding_constraints_real_time_5_min( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Real-Time Binding Constraints Args: date: date to get data for. Supports "latest" for most recent interval. end: end date verbose: print url Returns: pd.DataFrame: Real-Time Binding Constraints """ if date == "latest": return self._get_binding_constraints_real_time_5_min_from_intervals( date, end=end, verbose=verbose, ) start_ts = utils._handle_date(date, self.default_timezone) end_ts = ( utils._handle_date(end, self.default_timezone) if end is not None else None ) df = self._fetch_binding_constraints_real_time_5_min( date=start_ts, end=end_ts, verbose=verbose, ) if end_ts is None: return df return df[ (df["Interval Start"] >= start_ts) & (df["Interval Start"] < end_ts) ].reset_index(drop=True)
@support_date_range(_binding_constraints_real_time_5_min_frequency) def _fetch_binding_constraints_real_time_5_min( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Per-chunk fetch dispatching to interval or daily-file source.""" start = utils._handle_date(date, self.default_timezone) if not utils.is_within_last_days(start, 1, self.default_timezone): return self._get_binding_constraints_real_time_5_min_from_daily_files( start, end=end, verbose=verbose, ) interval_start, interval_end = self._resolve_recent_interval_window( start, end, ) return self._get_binding_constraints_real_time_5_min_from_intervals( interval_start, end=interval_end, verbose=verbose, ) def _resolve_recent_interval_window( self, start: pd.Timestamp, end: pd.Timestamp | None, ) -> tuple[pd.Timestamp, pd.Timestamp]: """Window for today/yesterday interval fetches. When end is omitted (DAY_START chunk), fetch through end-of-local-day, capping today at the most recent 5-min boundary so we don't request future intervals. """ if end is None: end = start.normalize() + pd.Timedelta(days=1) if utils.is_today(start, self.default_timezone): end = min( end, pd.Timestamp.now(tz=self.default_timezone).ceil("5min"), ) if end <= start: end = start + pd.Timedelta(minutes=5) return start, end def _get_binding_constraints_real_time_5_min_from_daily_files( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Real-Time Binding Constraints from daily files.""" folder_year = date.strftime("%Y") folder_month = date.strftime("%m") url = f"{FILE_BROWSER_DOWNLOAD_URL}/{RTBM_BINDING_CONSTRAINTS}?path=/{folder_year}/{folder_month}/By_Day/RTBM-DAILY-BC-{date.strftime('%Y%m%d')}.csv" logger.info(f"Downloading {url} (daily file)...") df = pd.read_csv(url) df.columns = df.columns.str.strip() return self._process_binding_constraints_real_time(df) @support_date_range("5_MIN") def _get_binding_constraints_real_time_5_min_from_intervals( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get Real-Time Binding Constraints from 5-minute interval files.""" if date == "latest": url = f"{FILE_BROWSER_DOWNLOAD_URL}/{RTBM_BINDING_CONSTRAINTS}?path=/RTBM-BC-latestInterval.csv" # noqa else: url = self._format_5_min_url( date, end, RTBM_BINDING_CONSTRAINTS, "RTBM-BC", ) logger.info(f"Downloading {url}...") df = pd.read_csv(url) return self._process_binding_constraints_real_time(df) def _process_binding_constraints_real_time(self, df: pd.DataFrame) -> pd.DataFrame: df.columns = df.columns.str.strip() # Convert to title case to handle nonstandard input df.columns = df.columns.str.title().str.replace("_", " ") df = df.rename( columns={ "Gmtintervalend": "GMTIntervalEnd", "Nercid": "NERC ID", "Tlr Level": "TLR Level", }, ) df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=pd.Timedelta(minutes=5), ) df["NERC ID"] = pd.to_numeric(df["NERC ID"], errors="coerce").astype("Int64") cols_to_keep = [ "Interval Start", "Interval End", "Constraint Name", "Constraint Type", "NERC ID", "TLR Level", "State", "Shadow Price", "Monitored Facility", "Contingent Facility", ] return df[cols_to_keep].sort_values(["Interval Start", "Constraint Name"]) @support_date_range("MONTH_START")
[docs] def get_interchange_real_time( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get real-time interchange (tie flow) data. For "latest" and "today", returns ~2 days of 1-minute interchange data from the real-time endpoint. For historical dates, downloads monthly CSV files from the historical tie flow archive. Data from: - Real-time: https://portal.spp.org/pages/integrated-marketplace-interchange-trend - Historical: https://portal.spp.org/pages/historical-tie-flow Args: date: supports "latest", "today", or a historical date/date range end: end date for historical range queries verbose: print info Returns: pd.DataFrame: interchange data """ if date == "latest": return self.get_interchange_real_time( "today", verbose=verbose, ).reset_index(drop=True) # Handle tuple date ranges by checking if the start is recent if isinstance(date, tuple): start = date[0] else: start = utils._handle_date(date, tz=self.default_timezone) if utils.is_within_last_days(start, days=2, tz=self.default_timezone): url = f"{MARKETPLACE_BASE_URL}/chart-api/interchange-trend/asFile" logger.info(f"Downloading {url}") df = pd.read_csv(url) return self._process_interchange_real_time(df) # Historical data: download monthly CSV month_str = start.strftime("%b%Y") # e.g., "Apr2015" # Starting March 2026, files use "TieFlows_SPP_" prefix if start >= pd.Timestamp("2026-03-01"): filename = f"TieFlows_SPP_{month_str}.csv" else: filename = f"TieFlows_{month_str}.csv" url = f"{FILE_BROWSER_DOWNLOAD_URL}/historical-tie-flow?path=/{filename}" logger.info(f"Downloading {url}") try: df = pd.read_csv(url) except urllib.error.HTTPError as e: if e.code == 404: raise NoDataFoundException( f"No historical tie flow data found for {month_str}. " f"Historical data is available starting Mar2014.", ) raise # Normalize historical column names to match real-time format df = df.rename( columns={ "GMTTIME": "GMTTime", "SPP_NSI": "SPP NSI", "SPP_NAI": "SPP NAI", }, ) return self._process_interchange_real_time(df)
@support_date_range("MONTH_START")
[docs] def get_west_interchange_real_time( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get real-time interchange (tie flow) data for SPP West (SWPW). For "latest" and "today", returns ~2 days of 1-minute interchange data from the real-time endpoint. For historical dates, downloads monthly CSV files from the historical tie flow archive. Data from: - Real-time: https://portal.spp.org/pages/integrated-marketplace-interchange-trend - Historical: https://portal.spp.org/pages/historical-tie-flow Args: date: supports "latest", "today", or a historical date/date range end: end date for historical range queries verbose: print info Returns: pd.DataFrame: interchange data """ if date == "latest": return self.get_west_interchange_real_time( "today", verbose=verbose, ).reset_index(drop=True) # Handle tuple date ranges by checking if the start is recent if isinstance(date, tuple): start = date[0] else: start = utils._handle_date(date, tz=self.default_timezone) if utils.is_within_last_days(start, days=2, tz=self.default_timezone): url = f"{MARKETPLACE_BASE_URL}/chart-api/interchange-trend-swpw/asFile" logger.info(f"Downloading {url}") df = pd.read_csv(url) return self._process_interchange_real_time(df) # Historical data: download monthly CSV month_str = start.strftime("%b%Y") # e.g., "Apr2026" url = ( f"{FILE_BROWSER_DOWNLOAD_URL}/historical-tie-flow" f"?path=/TieFlows_SWPW_{month_str}.csv" ) logger.info(f"Downloading {url}") try: df = pd.read_csv(url) except urllib.error.HTTPError as e: if e.code == 404: raise NoDataFoundException( f"No historical SWPW tie flow data found for {month_str}. " f"Historical data is available starting Mar2026.", ) raise # Normalize historical column names to match real-time format df = df.rename( columns={ "GMTTIME": "GMTTime", "SPP_NSI": "SWPW NSI", "SPP_NAI": "SWPW NAI", }, ) return self._process_interchange_real_time(df)
def _process_interchange_real_time(self, df: pd.DataFrame) -> pd.DataFrame: df["Time"] = pd.to_datetime( df["GMTTime"], utc=True, format="ISO8601", ).dt.tz_convert(self.default_timezone) df = df.drop(columns=["GMTTime"]) # Drop rows with null timestamps (bad data in some historical files) df = df.dropna(subset=["Time"]) # Drop rows where all data columns are null (future forecast rows) data_cols = [c for c in df.columns if c != "Time"] df = df.dropna(subset=data_cols, how="all") # Melt from wide to long format so schema is stable across time periods id_cols = ["Time"] value_cols = [c for c in df.columns if c not in id_cols] df = df.melt( id_vars=id_cols, value_vars=value_cols, var_name="Region", value_name="Interchange", ) # Drop rows where interchange is null (region didn't exist in this period) df = df.dropna(subset=["Interchange"]) return df.sort_values(["Time", "Region"]).reset_index(drop=True)
[docs] def process_gen_mix(df: pd.DataFrame, detailed: bool = False) -> pd.DataFrame: """Parse SPP generation mix data from https://marketplace.spp.org/pages/generation-mix-historical Args: df (pd.DataFrame): raw data detailed (bool): whether to combine market and self columns Returns: pd.DataFrame: processed data """ new_df = df.copy() # remove whitespace from column names new_df.columns = new_df.columns.str.strip() # rename columns to standardize new_df = new_df.rename( columns={ "GMTTime": "Time", "GMT MKT Interval": "Time", "Gas Self": "Natural Gas Self", # rename below is based on documenation "Load": "Short Term Load Forecast", }, ) # parse time new_df["Time"] = pd.to_datetime(new_df["Time"], utc=True).dt.tz_convert( SPP.default_timezone, ) # combine market and self columns columns_to_combine = [ "Coal", "Diesel Fuel Oil", "Hydro", "Natural Gas", "Nuclear", "Solar", "Waste Disposal Services", "Wind", "Waste Heat", "Other", ] if not detailed: for col in columns_to_combine: market_col = f"{col} Market" self_col = f"{col} Self" if market_col not in new_df.columns or self_col not in new_df.columns: continue new_df[col] = new_df[market_col] + new_df[self_col] new_df = new_df.drop([market_col, self_col], axis=1) new_df = add_interval(new_df, 5) return new_df
[docs] def add_interval(df: pd.DataFrame, interval_min: int) -> pd.DataFrame: """Adds Interval Start and Interval End columns to df""" df["Interval Start"] = df["Time"] df["Interval End"] = df["Interval Start"] + pd.Timedelta(minutes=interval_min) df = utils.move_cols_to_front( df, ["Time", "Interval Start", "Interval End"], ) return df