Source code for gridstatus.spp

from typing import BinaryIO

import pandas as pd
import requests
import tqdm

from gridstatus import utils
from gridstatus.base import (
    InterconnectionQueueStatus,
    ISOBase,
    Markets,
    NoDataFoundException,
    NotSupported,
)
from gridstatus.decorators import FiveMinOffset, support_date_range
from gridstatus.gs_logging import log

[docs]RTBM_LMP_BY_BUS = "rtbm-lmp-by-bus"
[docs]FS_RTBM_LMP_BY_LOCATION = "rtbm-lmp-by-location"
[docs]FS_DAM_LMP_BY_LOCATION = "da-lmp-by-location"
[docs]LMP_BY_SETTLEMENT_LOCATION_WEIS = "lmp-by-settlement-location-weis"
[docs]OPERATING_RESERVES = "operating-reserves"
[docs]MARKETPLACE_BASE_URL = "https://portal.spp.org"
[docs]FILE_BROWSER_API_URL = "https://portal.spp.org/file-browser-api/"
[docs]FILE_BROWSER_DOWNLOAD_URL = "https://portal.spp.org/file-browser-api/download"
[docs]BASE_SOLAR_AND_WIND_SHORT_TERM_URL = ( f"{FILE_BROWSER_DOWNLOAD_URL}/shortterm-resource-forecast?path=" )
[docs]BASE_SOLAR_AND_WIND_MID_TERM_URL = ( f"{FILE_BROWSER_DOWNLOAD_URL}/midterm-resource-forecast?path=" )
[docs]BASE_LOAD_FORECAST_SHORT_TERM_URL = f"{FILE_BROWSER_DOWNLOAD_URL}/stlf-vs-actual?path="
[docs]BASE_LOAD_FORECAST_MID_TERM_URL = f"{FILE_BROWSER_DOWNLOAD_URL}/mtlf-vs-actual?path="
[docs]LOCATION_TYPE_ALL = "ALL"
[docs]LOCATION_TYPE_BUS = "Bus"
[docs]LOCATION_TYPE_HUB = "Hub"
[docs]LOCATION_TYPE_INTERFACE = "Interface"
[docs]LOCATION_TYPE_SETTLEMENT_LOCATION = "Settlement Location"
[docs]QUERY_RTM5_HUBS_URL = "https://pricecontourmap.spp.org/arcgis/rest/services/MarketMaps/RTBM_FeatureData/MapServer/1/query" # noqa
[docs]QUERY_RTM5_INTERFACES_URL = "https://pricecontourmap.spp.org/arcgis/rest/services/MarketMaps/RTBM_FeatureData/MapServer/2/query" # noqa
[docs]RELIABILITY_LEVELS = [ "Normal Operations", "Weather Advisory", "Resource Advisory", "Conservative Operations Advisory", "Energy Emergency Alert Level 1", "Energy Emergency Alert Level 2", "Energy Emergency Alert Level 3", "Restoration Event", ]
[docs]LAST_UPDATED_KEYWORDS = [ "last updated", "as of", ]
[docs]RELIABILITY_LEVELS_ALIASES = { "Normal Operations": "Normal", }
[docs]STATUS_STOP_WORDS = [ "as", "at", "ct", # central time "eea", # energy emergency alert "of", "on", ]
[docs]class SPP(ISOBase): """Southwest Power Pool (SPP)""" name = "Southwest Power Pool" iso_id = "spp" default_timezone = "US/Central" status_homepage = "https://www.spp.org/markets-operations/current-grid-conditions/" interconnection_homepage = ( "https://www.spp.org/engineering/generator-interconnection/" ) markets = [ Markets.REAL_TIME_5_MIN, Markets.DAY_AHEAD_HOURLY, ] location_types = [ LOCATION_TYPE_ALL, LOCATION_TYPE_BUS, LOCATION_TYPE_HUB, LOCATION_TYPE_INTERFACE, LOCATION_TYPE_SETTLEMENT_LOCATION, ] @staticmethod
[docs] def now(): return pd.Timestamp.now(tz=SPP.default_timezone)
[docs] def get_fuel_mix(self, date, detailed=False, verbose=False): """Get fuel mix Args: date: supports today and latest detailed: if True, breaks out self scheduled and market scheduled Note: if today, returns last 2 hours of data. maybe include previous day Returns: pd.DataFrame: fuel mix """ if date == "latest": return self.get_fuel_mix( "today", detailed=detailed, verbose=verbose, ).reset_index(drop=True) if not utils.is_today(date, self.default_timezone): # https://marketplace.spp.org/pages/generation-mix-historical # many years of historical 5 minute data raise NotSupported url = f"{FILE_BROWSER_DOWNLOAD_URL}/generation-mix-historical?path=/GenMix2Hour.csv" # noqa df_raw = pd.read_csv(url) historical_mix = process_gen_mix(df_raw, detailed=detailed) historical_mix = historical_mix.drop( columns=["Short Term Load Forecast", "Average Actual Load"], errors="ignore", ) return historical_mix
[docs] def get_load(self, date, verbose=False): """Returns load for last 24hrs in 5 minute intervals""" original_date = date if date == "latest": date = "today" date = utils._handle_date(date, self.default_timezone) df = self._get_load_and_forecast(verbose=verbose) df = df.dropna(subset=["Actual Load"]) df = df.rename(columns={"Actual Load": "Load"}) df = df[["Time", "Load"]] df = df.reset_index(drop=True) df = add_interval(df, interval_min=5) if original_date == "latest": return df elif utils.is_today(original_date, tz=self.default_timezone): # returns two days, so make sure to only return current day's load df = df[df["Time"].dt.date == date.date()].reset_index(drop=True) return df else: # hourly historical zonal loads # https://marketplace.spp.org/pages/hourly-load # five minute actual load available here: https://portal.spp.org/pages/stlf-vs-actual# raise NotSupported()
[docs] def get_load_forecast(self, date, forecast_type="MID_TERM", verbose=False): """Returns load forecast for next 7 days in hourly intervals Arguments: forecast_type (str): MID_TERM is hourly for next 7 days or SHORT_TERM is every five minutes for a few hours Returns: pd.DataFrame: forecast for current day """ df = self._get_load_and_forecast(verbose=verbose) # gives forecast from before current day # only include forecasts starting at current day last_actual = df.dropna(subset=["Actual Load"])["Time"].max() current_day = last_actual.replace(hour=0, minute=0) current_day_forecast = df[df["Time"] >= current_day].copy() # assume forecast is made at last actual current_day_forecast["Forecast Time"] = last_actual if forecast_type == "MID_TERM": forecast_col = "Mid-Term Forecast" elif forecast_type == "SHORT_TERM": forecast_col = "Short-Term Forecast" else: raise RuntimeError("Invalid forecast type") # there will be empty rows regardless of forecast type since they dont align current_day_forecast = current_day_forecast.dropna( subset=[forecast_col], ) current_day_forecast = current_day_forecast[ ["Forecast Time", "Time", forecast_col] ].rename({forecast_col: "Load Forecast"}, axis=1) current_day_forecast = add_interval( current_day_forecast, interval_min=60, ) return current_day_forecast
@support_date_range("5_MIN")
[docs] def get_load_forecast_short_term(self, date, end=None, verbose=False): """ 5-minute load forecast data for the SPP footprint (system-wide) for +/- 10 minutes. Also includes actual load. Data from https://portal.spp.org/pages/stlf-vs-actual Arguments: date (pd.Timestamp|str): date to get data for. Supports "latest" and "today" verbose (bool): print info Returns: pd.DataFrame: forecast as dataframe. """ # The short_term forecast is delayed up to 2 minutes. buffer_minutes = 2 if date == "latest": date = self.now() - pd.Timedelta(minutes=buffer_minutes) # Files do not exist in the future if date > self.now(): return url = self._short_term_load_forecast_url(date.floor("5min")) log(f"Downloading {url}", verbose=verbose) df = pd.read_csv(url) # According to the docs, the end time col should be GMTIntervalEnd, but it's # only GMTInterval in the data df = self._post_process_load_forecast( df, url, forecast_type="SHORT_TERM", forecast_col="STLF", end_time_col="GMTInterval", interval_duration=pd.Timedelta(minutes=5), ) return df
@support_date_range("HOUR_START")
[docs] def get_load_forecast_mid_term(self, date, end=None, verbose=False): """ Returns load forecast for +7 days in hourly intervals. Includes actual load for the past 24 hours. Data from https://portal.spp.org/pages/mtlf-vs-actual Arguments: date (pd.Timestamp|str): date to get data for. Supports "latest" and "today" verbose (bool): print info Returns: pd.DataFrame: forecast as dataframe. """ # The MID_TERM forecast is delayed up to 10 minutes. buffer_minutes = 10 if date == "latest": date = self.now() - pd.Timedelta(minutes=buffer_minutes) if date > self.now(): return url = self._mid_term_load_forecast_url(date.floor("h")) log(f"Downloading {url}", verbose=verbose) df = pd.read_csv(url) df = self._post_process_load_forecast( df, url, forecast_type="MID_TERM", forecast_col="MTLF", end_time_col="GMTIntervalEnd", interval_duration=pd.Timedelta(hours=1), ) return df
def _post_process_load_forecast( self, df, url, forecast_type, forecast_col, end_time_col, interval_duration, ): df = self._handle_market_end_to_interval(df, end_time_col, interval_duration) # Assume the publish time is in the name of the file. There are different # times on the webpage, but these could be the posting time. df["Publish Time"] = pd.Timestamp( url.split("-")[-1].split(".")[0], tz=self.default_timezone, ) df.columns = [col.strip() for col in df.columns] df["Forecast Type"] = forecast_type df = ( utils.move_cols_to_front( df, ["Interval Start", "Interval End", "Publish Time", "Forecast Type"], ) .drop(columns=["Time", "Interval"]) .sort_values(["Interval Start", "Publish Time"]) ) return df.dropna(subset=[forecast_col]).reset_index(drop=True) @support_date_range("5_MIN")
[docs] def get_solar_and_wind_forecast_short_term(self, date, end=None, verbose=False): """ Returns solar and wind generation forecast for +4 hours in 5 minute intervals. Include actuals for past day in 5 minute intervals. Data from https://portal.spp.org/pages/shortterm-resource-forecast Arguments: date (pd.Timestamp|str): date to get data for. Supports "latest" and "today" verbose (bool): print info Returns: pd.DataFrame: forecast as dataframe. """ # The short_term forecast is delayed up to 2 minutes. buffer_minutes = 2 if date == "latest": date = self.now() - pd.Timedelta(minutes=buffer_minutes) # Files do not exist in the future if date > self.now(): return url = self._short_term_solar_and_wind_url(date.floor("5min")) log(f"Downloading {url}", verbose=verbose) df = pd.read_csv(url) # According to the docs, the end time col should be GMTIntervalEnd, but it's # only GMTInterval in the data df = self._post_process_solar_and_wind_forecast( df, url, forecast_type="SHORT_TERM", end_time_col="GMTInterval", interval_duration=pd.Timedelta(minutes=5), ) return df
@support_date_range("HOUR_START")
[docs] def get_solar_and_wind_forecast_mid_term(self, date, end=None, verbose=False): """ Returns solar and wind generation forecast for +7 days in hourly intervals. Data from https://portal.spp.org/pages/midterm-resource-forecast. Arguments: date (pd.Timestamp|str): date to get data for. Supports "latest" and "today" verbose (bool): print info Returns: pd.DataFrame: forecast as dataframe. """ # The MID_TERM forecast is delayed up to 10 minutes. buffer_minutes = 10 if date == "latest": date = self.now() - pd.Timedelta(minutes=buffer_minutes) if date > self.now(): return url = self._mid_term_solar_and_wind_url(date.floor("h")) log(f"Downloading {url}", verbose=verbose) df = pd.read_csv(url) df = self._post_process_solar_and_wind_forecast( df, url, forecast_type="MID_TERM", end_time_col="GMTIntervalEnd", interval_duration=pd.Timedelta(hours=1), ) return df
def _post_process_solar_and_wind_forecast( self, df, url, forecast_type, end_time_col, interval_duration, ): df = self._handle_market_end_to_interval(df, end_time_col, interval_duration) # Assume the publish time is in the name of the file. There are different # times on the webpage, but these could be the posting time. df["Publish Time"] = pd.Timestamp( url.split("-")[-1].split(".")[0], tz=self.default_timezone, ) df.columns = [col.strip() for col in df.columns] df["Forecast Type"] = forecast_type df = ( utils.move_cols_to_front( df, ["Interval Start", "Interval End", "Publish Time", "Forecast Type"], ) .drop(columns=["Time", "Interval"]) .sort_values(["Interval Start", "Publish Time"]) ) return df.dropna(subset=["Wind Forecast MW", "Solar Forecast MW"]).reset_index( drop=True, ) def _short_term_solar_and_wind_url(self, date): hour = date.hour padded_hour = str(hour).zfill(2) padded_hour_plus_one = str((hour + 1) % 24).zfill(2) # The first hour in the URL is 1 after the hour in the filename. # Example 2024/01/01/02 has data for 01/01/2024 01:00:00 - 01/01/2024 01:55:00 return BASE_SOLAR_AND_WIND_SHORT_TERM_URL + date.strftime( f"/%Y/%m/%d/{padded_hour_plus_one}/OP-STRF-%Y%m%d{padded_hour}%M.csv", ) def _mid_term_solar_and_wind_url(self, date): # Explicitly set the minutes to 00. return BASE_SOLAR_AND_WIND_MID_TERM_URL + date.strftime( "/%Y/%m/%d/OP-MTRF-%Y%m%d%H00.csv", ) def _short_term_load_forecast_url(self, date): hour = date.hour padded_hour = str(hour).zfill(2) padded_hour_plus_one = str((hour + 1) % 24).zfill(2) # The first hour in the URL is 1 after the hour in the filename. # Example 2024/01/01/02 has data for 01/01/2024 01:00:00 - 01/01/2024 01:55:00 return BASE_LOAD_FORECAST_SHORT_TERM_URL + date.strftime( f"/%Y/%m/%d/{padded_hour_plus_one}/OP-STLF-%Y%m%d{padded_hour}%M.csv", ) def _mid_term_load_forecast_url(self, date): # Explicitly set the minutes to 00. return BASE_LOAD_FORECAST_MID_TERM_URL + date.strftime( "/%Y/%m/%d/OP-MTLF-%Y%m%d%H00.csv", ) def _handle_market_end_to_interval( self, df, column, interval_duration, format=None, ): """Converts market end time to interval end time""" df = df.rename( columns={ column: "Interval End", }, ) df["Interval End"] = pd.to_datetime( df["Interval End"], utc=True, format=format, ).dt.tz_convert(self.default_timezone) df["Interval Start"] = df["Interval End"] - interval_duration df["Time"] = df["Interval Start"] df = utils.move_cols_to_front(df, ["Time", "Interval Start", "Interval End"]) return df def _process_ver_curtailments(self, df): df = df.rename( columns={ "WindRedispatchCurtailments": "Wind Redispatch Curtailments", "WindManualCurtailments": "Wind Manual Curtailments", "WindCurtailedForEnergy": "Wind Curtailed For Energy", "SolarRedispatchCurtailments": "Solar Redispatch Curtailments", "SolarManualCurtailments": "Solar Manual Curtailments", "SolarCurtailedForEnergy": "Solar Curtailed For Energy", }, ) df = self._handle_market_end_to_interval( df, column="GMTIntervalEnding", interval_duration=pd.Timedelta(minutes=5), ) cols = [ "Time", "Interval Start", "Interval End", "Wind Redispatch Curtailments", "Wind Manual Curtailments", "Wind Curtailed For Energy", "Solar Redispatch Curtailments", "Solar Manual Curtailments", "Solar Curtailed For Energy", ] # historical data doesnt have all columns for c in cols: if c not in df.columns: df[c] = pd.NA df = df[cols] return df @support_date_range("DAY_START")
[docs] def get_capacity_of_generation_on_outage(self, date, end=None, verbose=False): """Get Capacity of Generation on Outage. Published daily at 8am CT for next 7 days Args: date: start date end: end date """ url = f"{FILE_BROWSER_DOWNLOAD_URL}/capacity-of-generation-on-outage?path=/{date.strftime('%Y')}/{date.strftime('%m')}/Capacity-Gen-Outage-{date.strftime('%Y%m%d')}.csv" # noqa msg = f"Downloading {url}" log(msg, verbose) df = pd.read_csv(url) return self._process_capacity_of_generation_on_outage(df, publish_time=date)
[docs] def get_capacity_of_generation_on_outage_annual(self, year, verbose=True): """Get VER Curtailments for a year. Starting 2014. Recent data use get_capacity_of_generation_on_outage Args: year: year to get data for verbose: print url Returns: pd.DataFrame: VER Curtailments """ url = f"{FILE_BROWSER_DOWNLOAD_URL}/capacity-of-generation-on-outage?path=/{year}/{year}.zip" # noqa def process_csv(df, file_name): # infer date from '2020/01/Capacity-Gen-Outage-20200101.csv' publish_time_str = file_name.split(".")[0].split("-")[-1] publish_time = pd.to_datetime(publish_time_str).tz_localize( self.default_timezone, ) df = self._process_capacity_of_generation_on_outage(df, publish_time) return df df = utils.download_csvs_from_zip_url( url, process_csv=process_csv, verbose=verbose, ) df = df.sort_values("Interval Start") return df
def _process_capacity_of_generation_on_outage(self, df, publish_time): # strip whitespace from column names df = df.rename(columns=lambda x: x.strip()) df = self._handle_market_end_to_interval( df, column="Market Hour", interval_duration=pd.Timedelta(minutes=60), ) df = df.rename( columns={ "Outaged MW": "Total Outaged MW", }, ) publish_time = pd.to_datetime(publish_time.normalize()) df.insert(0, "Publish Time", publish_time) # drop Time column df = df.drop(columns=["Time"]) return df @support_date_range("DAY_START")
[docs] def get_ver_curtailments(self, date, end=None, verbose=False): """Get VER Curtailments Supports recent data. For historical annual data use get_ver_curtailments_annual Args: date: start date end: end date """ url = f"{FILE_BROWSER_DOWNLOAD_URL}/ver-curtailments?path=/{date.strftime('%Y')}/{date.strftime('%m')}/VER-Curtailments-{date.strftime('%Y%m%d')}.csv" # noqa msg = f"Downloading {url}" log(msg, verbose) df = pd.read_csv(url) return self._process_ver_curtailments(df)
[docs] def get_ver_curtailments_annual(self, year, verbose=True): """Get VER Curtailments for a year. Starting 2014. Recent data use get_ver_curtailments Args: year: year to get data for verbose: print url Returns: pd.DataFrame: VER Curtailments """ url = f"{FILE_BROWSER_DOWNLOAD_URL}/ver-curtailments?path=/{year}/{year}.zip" # noqa df = utils.download_csvs_from_zip_url(url, verbose=verbose) df = self._process_ver_curtailments(df) df = df[~df["Interval Start"].isnull()] df = df.sort_values("Time") return df
def _get_load_and_forecast(self, verbose=False): url = f"{MARKETPLACE_BASE_URL}/chart-api/load-forecast/asChart" msg = f"Getting load and forecast from {url}" log(msg, verbose) r = self._get_json(url)["response"] data = {"Time": r["labels"]} for d in r["datasets"][:3]: if d["label"] == "Actual Load": data["Actual Load"] = d["data"] elif d["label"] == "Mid-Term Load Forecast": data["Mid-Term Forecast"] = d["data"] elif d["label"] == "Short-Term Load Forecast": data["Short-Term Forecast"] = d["data"] df = pd.DataFrame(data) df["Time"] = pd.to_datetime( df["Time"], ).dt.tz_convert(self.default_timezone) return df # todo where does date got in argument order # def get_historical_lmp(self, date, market: str, nodes: list): # 5 minute interal data # {FILE_BROWSER_API_URL}/rtbm-lmp-by-location?path=/2022/08/By_Interval/08 # /RTBM-LMP-SL-202208082125.csv # historical generation mix # https://marketplace.spp.org/pages/generation-mix-rolling-365 # https://marketplace.spp.org/chart-api/gen-mix-365/asFile # 15mb file with five minute resolution
[docs] def get_raw_interconnection_queue(self, verbose=False) -> BinaryIO: url = "https://opsportal.spp.org/Studies/GenerateSummaryCSV" msg = f"Getting interconnection queue from {url}" log(msg, verbose) response = requests.get(url) return utils.get_response_blob(response)
[docs] def get_interconnection_queue(self, verbose=False): """Get interconnection queue Returns: pandas.DataFrame: Interconnection queue """ raw_data = self.get_raw_interconnection_queue(verbose) queue = pd.read_csv(raw_data, skiprows=1) queue["Status (Original)"] = queue["Status"] completed_val = InterconnectionQueueStatus.COMPLETED.value active_val = InterconnectionQueueStatus.ACTIVE.value withdrawn_val = InterconnectionQueueStatus.WITHDRAWN.value queue["Status"] = queue["Status"].map( { "IA FULLY EXECUTED/COMMERCIAL OPERATION": completed_val, "IA FULLY EXECUTED/ON SCHEDULE": completed_val, "IA FULLY EXECUTED/ON SUSPENSION": completed_val, "IA PENDING": active_val, "DISIS STAGE": active_val, "None": active_val, "WITHDRAWN": withdrawn_val, }, ) queue["Generation Type"] = queue[["Generation Type", "Fuel Type"]].apply( lambda x: " - ".join(x.dropna()), axis=1, ) queue["Proposed Completion Date"] = queue["Commercial Operation Date"] rename = { "Generation Interconnection Number": "Queue ID", " Nearest Town or County": "County", "State": "State", "TO at POI": "Transmission Owner", "Capacity": "Capacity (MW)", "MAX Summer MW": "Summer Capacity (MW)", "MAX Winter MW": "Winter Capacity (MW)", "Generation Type": "Generation Type", "Request Received": "Queue Date", "Substation or Line": "Interconnection Location", "Date Withdrawn": "Withdrawn Date", } # todo: there are a few columns being parsed # as "unamed" that aren't being included but should extra_columns = [ "In-Service Date", "Commercial Operation Date", "Cessation Date", "Current Cluster", "Cluster Group", "Replacement Generator Commercial Op Date", "Service Type", "Status (Original)", ] missing = [ "Project Name", "Interconnecting Entity", "Withdrawal Comment", "Actual Completion Date", ] queue = utils.format_interconnection_df( queue=queue, rename=rename, extra=extra_columns, missing=missing, ) return queue
[docs] def get_lmp_real_time_5_min_by_location( self, date, end=None, location_type=LOCATION_TYPE_ALL, verbose=False, ): """Get LMP data by location for the Real-Time 5 Minute Market Args: date: date to get data for end: end date location_type: location type to get data for. Options are: - ``ALL`` (LOCATION_TYPE_ALL) - ``Hub`` (LOCATION_TYPE_HUB) - ``Interface`` (LOCATION_TYPE_INTERFACE) - ``Settlement Location`` (LOCATION_TYPE_SETTLEMENT_LOCATION) verbose: print url """ return self._finalize_spp_df( self._get_real_time_5_min_data( date, end=end, location_type=location_type, verbose=verbose, ), market=Markets.REAL_TIME_5_MIN, location_type=location_type, verbose=verbose, )
[docs] def get_lmp_real_time_5_min_by_bus(self, date, end=None, verbose=False): """Get LMP data by bus for the Real-Time 5 Minute Market Args: date: date to get data for end: end date verbose: print url NOTE: does not take a location_type argument because it always returns LOCATION_TYPE_BUS. """ return self._finalize_spp_df( self._get_real_time_5_min_data( date, end=end, location_type=LOCATION_TYPE_BUS, verbose=verbose, ), market=Markets.REAL_TIME_5_MIN, location_type=LOCATION_TYPE_BUS, verbose=verbose, )
@support_date_range(frequency="5_MIN") def _get_real_time_5_min_data( self, date, end=None, location_type=LOCATION_TYPE_ALL, verbose=False, ): """ Internal function that consolidates logic for getting LMP data for the Real-Time 5 Minute Market """ if location_type not in self.location_types: raise NotSupported(f"Location type {location_type} not supported") if location_type == LOCATION_TYPE_BUS: endpoint = RTBM_LMP_BY_BUS file_prefix = "RTBM-LMP-B" else: endpoint = FS_RTBM_LMP_BY_LOCATION file_prefix = "RTBM-LMP-SL" if date == "latest": url = f"https://portal.spp.org/file-browser-api/download/{endpoint}?path=%2F{file_prefix}-latestInterval.csv" else: url = self._format_5_min_url(date, end, endpoint, file_prefix) log(f"Getting data for {date} from {url}", verbose=verbose) df = pd.read_csv(url) return df @support_date_range(frequency="DAY_START")
[docs] def get_lmp_day_ahead_hourly( self, date, end=None, location_type: str = LOCATION_TYPE_ALL, verbose=False, ): """Get day ahead hourly LMP data Supported Location Types: - ``Hub`` - ``Interface`` - ``ALL`` """ if location_type not in self.location_types: raise NotSupported(f"Location type {location_type} not supported") if date == "latest": raise NotSupported("Latest not supported for day ahead hourly") df = self._get_dam_lmp(date, verbose) return self._finalize_spp_df( df, market=Markets.DAY_AHEAD_HOURLY, location_type=location_type, verbose=verbose, )
def _get_feature_data(self, base_url, verbose=False): """Fetches data from ArcGIS Map Service with Feature Data Returns: pd.DataFrame of features """ args = { "f": "json", "where": "OBJECTID IS NOT NULL", "returnGeometry": "false", "outFields": "*", } doc = self._get_json(base_url, verbose=verbose, params=args) df = pd.DataFrame([feature["attributes"] for feature in doc["features"]]) return df def _get_dam_lmp( self, date, verbose=False, ): url = f"{FILE_BROWSER_DOWNLOAD_URL}/{FS_DAM_LMP_BY_LOCATION}?path=/{date.strftime('%Y')}/{date.strftime('%m')}/By_Day/DA-LMP-SL-{date.strftime('%Y%m%d')}0100.csv" # noqa log(f"Downloading {url}", verbose=verbose) df = pd.read_csv(url) return df def _finalize_spp_df(self, df, market, location_type, verbose=False): """ Finalizes DataFrame: - Sets Market - Filters by location type if needed - Sets location type - Renames and ordering columns - Filters by Location - Resets the index Arguments: pandas.DataFrame: DataFrame with SPP data market (str): Market location_type (str): Location type verbose (bool, optional): Verbose output """ if market == Markets.REAL_TIME_5_MIN: interval_duration = pd.Timedelta(minutes=5) elif market == Markets.DAY_AHEAD_HOURLY: interval_duration = pd.Timedelta(hours=1) else: raise ValueError(f"Market {market} not supported") df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=interval_duration, ) df["Market"] = market.value if location_type != LOCATION_TYPE_BUS: df["Location"] = df["Settlement Location"] df["Location Type"] = LOCATION_TYPE_SETTLEMENT_LOCATION # Create boolean masks for each location type hubs = self._get_location_list(LOCATION_TYPE_HUB, verbose=verbose) interfaces = self._get_location_list( LOCATION_TYPE_INTERFACE, verbose=verbose, ) is_hub = df["Location"].isin(hubs) is_interface = df["Location"].isin(interfaces) df.loc[is_hub, "Location Type"] = LOCATION_TYPE_HUB df.loc[is_interface, "Location Type"] = LOCATION_TYPE_INTERFACE df = utils.filter_lmp_locations(df, location_type=location_type) else: df["Location"] = df["Pnode"] df["Location Type"] = LOCATION_TYPE_BUS df = df.rename( columns={ "Pnode": "PNode", "LMP": "LMP", # for posterity "MLC": "Loss", "MCC": "Congestion", "MEC": "Energy", }, ) df = df[ [ "Time", "Interval Start", "Interval End", "Market", "Location", "Location Type", "PNode", "LMP", "Energy", "Congestion", "Loss", ] ] df = df.reset_index(drop=True) # Since Location = PNode for bus, we can drop PNode if location_type == LOCATION_TYPE_BUS: df = df.drop(columns=["PNode"]) return df.sort_values(["Time", "Location"]) @support_date_range("5_MIN")
[docs] def get_operating_reserves(self, date, end=None, verbose=False): if date == "latest": url = f"{FILE_BROWSER_DOWNLOAD_URL}/operating-reserves?path=/RTBM-OR-latestInterval.csv" # noqa else: url = self._format_5_min_url( date, end, OPERATING_RESERVES, "RTBM-OR", include_interval=False, ) msg = f"Downloading {url}" log(msg, verbose) df = pd.read_csv(url) return self._process_operating_reserves(df)
def _process_operating_reserves(self, df): df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=pd.Timedelta(minutes=5), ) # don't need this column df = df.drop(columns=["Interval"]) df = df.rename( columns={ "RegUP_Clr": "Reg_Up_Cleared", "RegDN_Clr": "Reg_Dn_Cleared", "RampUP_Clr": "Ramp_Up_Cleared", "RampDN_Clr": "Ramp_Dn_Cleared", "UncUP_Clr": "Unc_Up_Cleared", "STSUncUP_Clr": "STS_Unc_Up_Cleared", "Spin_Clr": "Spin_Cleared", "Supp_Clr": "Supp_Cleared", }, ) return df @support_date_range("DAY_START")
[docs] def get_day_ahead_operating_reserve_prices(self, date, end=None, verbose=False): """Provides Marginal Clearing Price information by Reserve Zone for each Day-Ahead Market solution for each Operating Day. Posting is updated each day after the DA Market results are posted. Available at https://portal.spp.org/pages/da-mcp# Args: date: date to get data for end: end date verbose: print url Returns: pd.DataFrame: Day Ahead Marginal Clearing Prices """ if date == "latest": raise ValueError( "Latest not supported for Day Ahead Marginal Clearing Prices", ) url = f"{FILE_BROWSER_DOWNLOAD_URL}/da-mcp?path=/{date.strftime('%Y')}/{date.strftime('%m')}/DA-MCP-{date.strftime('%Y%m%d')}0100.csv" # noqa msg = f"Downloading {url}" log(msg, verbose) df = pd.read_csv(url) return self._process_day_ahead_operating_reserve_prices(df)
def _process_day_ahead_operating_reserve_prices(self, df): df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=pd.Timedelta(hours=1), ).assign(Market="DAM") column_mapping = { "RegUP": "Reg_Up", "RegDN": "Reg_Dn", "RampUP": "Ramp_Up", "RampDN": "Ramp_Dn", "Spin": "Spin", "Supp": "Supp", "UncUP": "Unc_Up", } df = df.rename(columns=column_mapping) cols_to_keep = [ "Interval Start", "Interval End", "Market", "Reserve Zone", ] + list( column_mapping.values(), ) # Older datasets might not have all the reserve types return df[[c for c in cols_to_keep if c in df]] @support_date_range("5_MIN")
[docs] def get_lmp_real_time_weis(self, date, end=None, verbose=False): """Get LMP data for real time WEIS Args: date: date to get data for. if end is not provided, will get data for 5 minute interval that date is in. end: end date verbose: print url """ endpoint = "lmp-by-settlement-location-weis" # if no end, find nearest 5 minute interval end # to use if date == "latest": url = f"{FILE_BROWSER_DOWNLOAD_URL}/{endpoint}?path=/WEIS-RTBM-LMP-SL-latestInterval.csv" # noqa else: url = self._format_5_min_url(date, end, endpoint, "WEIS-RTBM-LMP-SL") # todo before 2022 only annual files are available # TODO: sometimes there are missing interval files (example: https://portal.spp.org/pages/lmp-by-settlement-location-weis#%2F2024%2F01%2FBy_Interval%2F21) # noqa # We can't do anything in these cases but log a message msg = f"Downloading {url}" log(msg, verbose) try: df = pd.read_csv(url) except ConnectionResetError as e: log(f"Error downloading {url}: {e}", verbose) return pd.DataFrame() return self._process_lmp_real_time_weis(df)
def _process_lmp_real_time_weis(self, df): # strip whitespace from column names df = df.rename(columns=lambda x: x.strip()) df = self._handle_market_end_to_interval( df, column="GMTIntervalEnd", interval_duration=pd.Timedelta(minutes=5), ) df["Location Type"] = LOCATION_TYPE_SETTLEMENT_LOCATION df["Market"] = "REAL_TIME_WEIS" df = df.rename( columns={ "Settlement Location": "Location", "Pnode": "PNode", "LMP": "LMP", # for posterity "MLC": "Loss", "MCC": "Congestion", "MEC": "Energy", }, ) df = df[ [ "Interval Start", "Interval End", "Market", "Location", "Location Type", "PNode", "LMP", "Energy", "Congestion", "Loss", ] ] return df def _format_5_min_url( self, start, end, endpoint, file_prefix, include_interval=True, ): # Folder path is based on start date. File name is based on end date. # As an example, the file with the name 202407010000 representing the interval # 2024-06-30 23:55:00 to 2024-07-01 00:00:00 is in the folder # 2024/06/By_Interval/30/ if end is None: end = start + FiveMinOffset() else: # To deal with DST, convert to UTC before ceil end = end.tz_convert("UTC").ceil("5min").tz_convert(self.default_timezone) folder_year = start.strftime("%Y") folder_month = start.strftime("%m") folder_day = start.strftime("%d") interval_str = "/By_Interval" if include_interval else "" url = f"{FILE_BROWSER_DOWNLOAD_URL}/{endpoint}?path=/{folder_year}/{folder_month}{interval_str}/{folder_day}/{file_prefix}-{end.strftime('%Y%m%d%H%M')}.csv" # noqa # Intervals that occur after DST end during the repeated hour have a "d" suffix # Identify these intervals by the offset of the end time. Since CDT is UTC-5 and # CST is UTC-6, if the UTC offset is larger than the offset of the hour before, # it's a repeated hour in CST. if abs(end.utcoffset()) > abs((end - pd.Timedelta(hours=1)).utcoffset()): url = url.split(".csv")[0] + "d.csv" status_code = requests.head(url).status_code if status_code == 200: return url else: raise NoDataFoundException(f"No data found for {url}") def _get_location_list(self, location_type, verbose=False): if location_type == LOCATION_TYPE_HUB: df = self._get_feature_data(QUERY_RTM5_HUBS_URL, verbose=verbose) elif location_type == LOCATION_TYPE_INTERFACE: df = self._get_feature_data( QUERY_RTM5_INTERFACES_URL, verbose=verbose, ) else: raise ValueError(f"Invalid location_type: {location_type}") return df["SETTLEMENT_LOCATION"].unique().tolist() def _fetch_and_concat_csvs(self, urls: list, verbose: bool = False): all_dfs = [] for url in tqdm.tqdm(urls): msg = f"Fetching {url}" log(msg, verbose) df = pd.read_csv(url) all_dfs.append(df) return pd.concat(all_dfs) def _get_marketplace_session(self) -> dict: """ Returns a session object for the Marketplace API """ html = requests.get(FILE_BROWSER_API_URL) jsessionid = html.cookies.get("JSESSIONID") xsrf_token = html.cookies.get("XSRF-TOKEN") return { "cookies": {"JSESSIONID": jsessionid, "XSRF-TOKEN": xsrf_token}, "headers": { "X-XSRF-TOKEN": xsrf_token, }, } @staticmethod def _match( needles, haystacks, needle_norm_fn=lambda x: x.lower(), haystack_norm_fn=lambda x: x.lower(), ): """Returns items from haystacks if any needles are in them""" return [ haystack for haystack in haystacks if any( needle_norm_fn(needle) in haystack_norm_fn(haystack) for needle in needles ) ] @support_date_range("DAY_START")
[docs] def get_hourly_load(self, date, end=None, verbose=False): """Get Hourly Load Supports recent data. For historical annual data use get_hourly_load_annual Args: date: start date end: end date Returns: pd.DataFrame: Hourly Load """ if date in ["today", "latest"] or utils.is_today( date, tz=self.default_timezone, ): raise NotSupported("Only historical data is available for hourly load data") url = f"{FILE_BROWSER_DOWNLOAD_URL}/hourly-load?path=/{date.strftime('%Y')}/DAILY_HOURLY_LOAD-{date.strftime('%Y%m%d')}.csv" # noqa msg = f"Downloading {url}" log(msg, verbose) df = pd.read_csv(url) return self._process_hourly_load(df)
[docs] def get_hourly_load_annual(self, year, verbose=True): """Get Hourly Load for a year. Starting 2011. For recent data use `get_hourly_load` Args: year: year to get data for verbose: print url Returns: pd.DataFrame: Hourly Load """ url = f"{FILE_BROWSER_DOWNLOAD_URL}/hourly-load?path=/{year}/{year}.zip" # noqa df = utils.download_csvs_from_zip_url( url=url, verbose=verbose, strip_whitespace_from_cols=True, ) return self._process_hourly_load(df) return df
def _process_hourly_load(self, df): # Some column names contain leading whitespace in some files - remove it df = df.rename(columns=lambda x: x.strip()) # Some files contain null rows. Drop them. df = df.dropna(how="all") # Some files don't have time 00:00 for the first interval in a day # for example 12/2/2016 instead of 12/2/2016 00:00. This causes datetime # conversion problems. This fixes it. def clean_market_hour(val): if val.endswith(":00"): return val return val + " 00:00" df["MarketHour"] = df["MarketHour"].apply(clean_market_hour) df = self._handle_market_end_to_interval( df, column="MarketHour", interval_duration=pd.Timedelta(minutes=60), format="mixed", ) time_cols = [ "Time", "Interval Start", "Interval End", ] load_cols = [ "CSWS", "EDE", "GRDA", "INDN", "KACY", "KCPL", "LES", "MPS", "NPPD", "OKGE", "OPPD", "SECI", "SPRM", "SPS", "WAUE", "WFEC", "WR", ] all_cols = time_cols + load_cols # historical data doesn't have all columns for c in all_cols: if c not in df.columns: df[c] = pd.NA df = df[all_cols] df = df[~df["Interval Start"].isnull()].drop_duplicates() df = df.sort_values("Time") df["System Total"] = df[load_cols].sum(axis=1, skipna=True) return df
[docs]def process_gen_mix(df, detailed=False): """Parse SPP generation mix data from https://marketplace.spp.org/pages/generation-mix-historical Args: df (pd.DataFrame): raw data detailed (bool): whether to combine market and self columns Returns: pd.DataFrame: processed data """ new_df = df.copy() # remove whitespace from column names new_df.columns = new_df.columns.str.strip() # rename columns to standardize new_df = new_df.rename( columns={ "GMTTime": "Time", "GMT MKT Interval": "Time", "Gas Self": "Natural Gas Self", # rename below is based on documenation "Load": "Short Term Load Forecast", }, ) # parse time new_df["Time"] = pd.to_datetime(new_df["Time"], utc=True).dt.tz_convert( SPP.default_timezone, ) # combine market and self columns columns_to_combine = [ "Coal", "Diesel Fuel Oil", "Hydro", "Natural Gas", "Nuclear", "Solar", "Waste Disposal Services", "Wind", "Waste Heat", "Other", ] if not detailed: for col in columns_to_combine: market_col = f"{col} Market" self_col = f"{col} Self" if market_col not in new_df.columns or self_col not in new_df.columns: continue new_df[col] = new_df[market_col] + new_df[self_col] new_df = new_df.drop([market_col, self_col], axis=1) new_df = add_interval(new_df, 5) return new_df
[docs]def add_interval(df, interval_min): """Adds Interval Start and Interval End columns to df""" df["Interval Start"] = df["Time"] df["Interval End"] = df["Interval Start"] + pd.Timedelta(minutes=interval_min) df = utils.move_cols_to_front( df, ["Time", "Interval Start", "Interval End"], ) return df