Source code for gridstatus.nyiso

from enum import StrEnum
from typing import BinaryIO, Dict, Literal, NamedTuple

import pandas as pd
import requests

import gridstatus
from gridstatus import utils
from gridstatus.base import (
    InterconnectionQueueStatus,
    ISOBase,
    Markets,
)
from gridstatus.decorators import support_date_range
from gridstatus.gs_logging import logger
from gridstatus.lmp_config import lmp_config


# NYISO offers LMP data at two locational granularities
# load zone and point of generator interconnection
[docs] class NYISOLocationType(StrEnum): ZONE = "zone" GENERATOR = "generator"
[docs] REFERENCE_BUS_LOCATION = "NYISO_LBMP_REFERENCE"
[docs] LOAD_DATASET = "pal"
[docs] FUEL_MIX_DATASET = "rtfuelmix"
[docs] LOAD_FORECAST_DATASET = "isolf"
[docs] DAM_LMP_DATASET = "damlbmp"
[docs] REAL_TIME_LMP_DATASET = "realtime"
[docs] REAL_TIME_HOURLY_LMP_DATASET = "rtlbmp"
[docs] REAL_TIME_EVENTS_DATASET = "RealTimeEvents"
[docs] BTM_SOLAR_ACTUAL_DATASET = "btmactualforecast"
[docs] BTM_SOLAR_FORECAST_DATASET = "btmdaforecast"
[docs] INTERFACE_LIMITS_AND_FLOWS_DATASET = "ExternalLimitsFlows"
[docs] LAKE_ERIE_CIRCULATION_REAL_TIME_DATASET = "eriecirculationrt"
[docs] LAKE_ERIE_CIRCULATION_DAY_AHEAD_DATASET = "eriecirculationda"
[docs] AS_PRICES_DAY_AHEAD_HOURLY_DATASET = "damasp"
[docs] AS_PRICES_REAL_TIME_5_MIN_DATASET = "rtasp"
[docs] LIMITING_CONSTRAINTS_REAL_TIME_DATASET = "LimitingConstraints"
[docs] LIMITING_CONSTRAINTS_DAY_AHEAD_DATASET = "DAMLimitingConstraints"
""" Pricing data: https://www.nyiso.com/en/energy-market-operational-data """
[docs] class DatasetInterval(NamedTuple): time_type: Literal["start", "end", "instantaneous"] interval_duration_minutes: int | None
[docs] DATASET_INTERVAL_MAP: Dict[str, DatasetInterval] = { LOAD_DATASET: DatasetInterval("instantaneous", None), FUEL_MIX_DATASET: DatasetInterval("instantaneous", None), LOAD_FORECAST_DATASET: DatasetInterval("start", 60), DAM_LMP_DATASET: DatasetInterval("start", 60), REAL_TIME_LMP_DATASET: DatasetInterval("end", 5), REAL_TIME_HOURLY_LMP_DATASET: DatasetInterval("start", 60), REAL_TIME_EVENTS_DATASET: DatasetInterval("instantaneous", None), BTM_SOLAR_ACTUAL_DATASET: DatasetInterval("start", 60), BTM_SOLAR_FORECAST_DATASET: DatasetInterval("start", 60), INTERFACE_LIMITS_AND_FLOWS_DATASET: DatasetInterval("start", 5), LAKE_ERIE_CIRCULATION_REAL_TIME_DATASET: DatasetInterval("instantaneous", None), LAKE_ERIE_CIRCULATION_DAY_AHEAD_DATASET: DatasetInterval("instantaneous", None), AS_PRICES_DAY_AHEAD_HOURLY_DATASET: DatasetInterval("start", 60), AS_PRICES_REAL_TIME_5_MIN_DATASET: DatasetInterval("start", 5), LIMITING_CONSTRAINTS_REAL_TIME_DATASET: DatasetInterval("start", 5), LIMITING_CONSTRAINTS_DAY_AHEAD_DATASET: DatasetInterval("start", 60), }
[docs] class NYISO(ISOBase): """New York Independent System Operator (NYISO)""" name = "New York ISO" iso_id = "nyiso" default_timezone = "US/Eastern" markets = [ Markets.REAL_TIME_5_MIN, Markets.REAL_TIME_HOURLY, Markets.DAY_AHEAD_HOURLY, ] status_homepage = "https://www.nyiso.com/system-conditions" interconnection_homepage = "https://www.nyiso.com/interconnections" def _handle_time( self, df: pd.DataFrame, dataset_name: str, groupby: str | None = None, ) -> pd.DataFrame: time_type, interval_duration_minutes = DATASET_INTERVAL_MAP[dataset_name] if "Time Stamp" in df.columns: time_stamp_col = "Time Stamp" elif "Timestamp" in df.columns: time_stamp_col = "Timestamp" def time_to_datetime(s: pd.Series, dst: str = "infer") -> pd.Series: return pd.to_datetime(s).dt.tz_localize( self.default_timezone, ambiguous=dst, ) if "Time Zone" in df.columns: dst = df["Time Zone"] == "EDT" df[time_stamp_col] = time_to_datetime( df[time_stamp_col], dst, ) elif "Name" in df.columns or groupby: groupby = groupby or "Name" # once we group by name, the time series for each group is no longer ambiguous df[time_stamp_col] = df.groupby(groupby, group_keys=False)[ time_stamp_col ].apply( time_to_datetime, "infer", ) else: df[time_stamp_col] = time_to_datetime( df[time_stamp_col], "infer", ) df = df.rename(columns={time_stamp_col: "Time"}) if time_type != "instantaneous": interval_duration = pd.Timedelta(minutes=interval_duration_minutes) if time_type == "start": df["Interval Start"] = df["Time"] df["Interval End"] = df["Interval Start"] + interval_duration elif time_type == "end": df["Interval Start"] = df["Time"] - interval_duration df["Interval End"] = df["Time"] df["Time"] = df["Interval Start"] utils.move_cols_to_front( df, ["Time", "Interval Start", "Interval End"], ) return df @support_date_range(frequency="MONTH_START")
[docs] def get_status( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: status_df = self._download_nyiso_archive( date=date, end=end, dataset_name=REAL_TIME_EVENTS_DATASET, verbose=verbose, ) status_df = status_df.rename( columns={"Message": "Status"}, ) def _parse_status(row: pd.Series) -> pd.Series: STATE_CHANGE = "**State Change. System now operating in " row["Notes"] = None if row["Status"] == "Start of day system state is NORMAL": row["Notes"] = [row["Status"]] row["Status"] = "Normal" elif STATE_CHANGE in row["Status"]: row["Notes"] = [row["Status"]] row["Status"] = row["Status"][ row["Status"].index(STATE_CHANGE) + len(STATE_CHANGE) : -len( " state.**", ) ].capitalize() return row status_df = status_df.apply(_parse_status, axis=1) status_df = status_df[["Time", "Status", "Notes"]] return status_df
@support_date_range(frequency="MONTH_START")
[docs] def get_fuel_mix( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: # note: this is simlar datastructure to pjm mix_df = self._download_nyiso_archive( date=date, end=end, dataset_name=FUEL_MIX_DATASET, verbose=verbose, ) mix_df = mix_df.pivot_table( index=["Time"], columns="Fuel Category", values="Gen MW", aggfunc="first", ).reset_index() mix_df.columns.name = None return mix_df
@support_date_range(frequency="MONTH_START")
[docs] def get_load( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Returns load at a previous date in 5 minute intervals for each zone and total load Parameters: date (str): Date to get load for. Can be "today", or a date in the format YYYY-MM-DD end (str): End date for date range. Optional. verbose (bool): Whether to print verbose output. Optional. Returns: pandas.DataFrame: Load data for NYISO and each zone """ data = self._download_nyiso_archive( date=date, end=end, dataset_name=LOAD_DATASET, verbose=verbose, ) # pivot table df = data.pivot_table( index=["Time"], columns="Name", values="Load", aggfunc="first", ) df.insert(0, "Load", df.sum(axis=1)) df.reset_index(inplace=True) # drop NA loads # data = data.dropna(subset=["Load"]) return df
@support_date_range(frequency="MONTH_START")
[docs] def get_btm_solar( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Returns estimated BTM solar generation at a previous date in hourly intervals for system and each zone. Available ~8 hours after the end of the operating day. Parameters: date (str, pd.Timestamp, datetime.datetime): Date to get load for. Can be "today", or a date end (str, pd.Timestamp, datetime.datetime): End date for date range. Optional. verbose (bool): Whether to print verbose output. Optional. Returns: pandas.DataFrame: BTM solar data for NYISO system and each zone """ data = self._download_nyiso_archive( date=date, end=end, dataset_name=BTM_SOLAR_ACTUAL_DATASET, filename="BTMEstimatedActual", verbose=verbose, ) df = data.pivot_table( index=["Time", "Interval Start", "Interval End"], columns="Zone Name", values="MW Value", aggfunc="first", ) # move system to first column df.insert(0, "SYSTEM", df.pop("SYSTEM")) df = df.reset_index() df.columns.name = None return df
@support_date_range(frequency="MONTH_START")
[docs] def get_btm_solar_forecast( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: data = self._download_nyiso_archive( date=date, end=end, dataset_name=BTM_SOLAR_FORECAST_DATASET, verbose=verbose, ) df = data.pivot_table( index=["Time", "Interval Start", "Interval End"], columns="Zone Name", values="MW Value", aggfunc="first", ) # move system to first column df.insert(0, "SYSTEM", df.pop("SYSTEM")) df = df.reset_index() # Report is published day before the forecast at 7:55 AM in NYISO time df.insert( 3, "Publish Time", df["Interval Start"].dt.floor("D") - pd.Timedelta(days=1, hours=-7, minutes=-55), ) df.columns.name = None return df
@support_date_range(frequency="MONTH_START")
[docs] def get_load_forecast( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get load forecast for a date in 1 hour intervals""" # todo optimize this to accept a date range data = self._download_nyiso_archive( date, end=end, dataset_name=LOAD_FORECAST_DATASET, verbose=verbose, ) data = data[ ["Time", "Interval Start", "Interval End", "File Date", "NYISO"] ].rename( columns={ "File Date": "Forecast Time", "NYISO": "Load Forecast", "Time": "Time", }, ) return data
@support_date_range(frequency="MONTH_START")
[docs] def get_zonal_load_forecast( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get zonal load forecast for a date in 1 hour intervals""" data = self._download_nyiso_archive( date, end=end, dataset_name=LOAD_FORECAST_DATASET, verbose=verbose, ) data = data[ [ "Interval Start", "Interval End", "File Date", "NYISO", "Capitl", "Centrl", "Dunwod", "Genese", "Hud Vl", "Longil", "Mhk Vl", "Millwd", "N.Y.C.", "North", "West", ] ].rename( columns={ "File Date": "Publish Time", }, ) return data
@support_date_range(frequency="MONTH_START")
[docs] def get_interface_limits_and_flows_5_min( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Get interface limits and flows for a date""" if date == "latest": data = pd.read_csv( "https://mis.nyiso.com/public/csv/ExternalLimitsFlows/currentExternalLimitsFlows.csv", # noqa ) data = self._handle_time( data, INTERFACE_LIMITS_AND_FLOWS_DATASET, groupby="Interface Name", ) else: data = self._download_nyiso_archive( date, end=end, dataset_name=INTERFACE_LIMITS_AND_FLOWS_DATASET, groupby="Interface Name", verbose=verbose, ) # The source has these values as MWH but they are actually MW data = data.rename( columns={ "Flow (MWH)": "Flow MW", "Positive Limit (MWH)": "Positive Limit MW", "Negative Limit (MWH)": "Negative Limit MW", }, ) data = data[ [ "Interval Start", "Interval End", "Interface Name", "Point ID", "Flow MW", "Positive Limit MW", "Negative Limit MW", ] ].sort_values(["Interval Start", "Interface Name"]) return data
@support_date_range(frequency="MONTH_START")
[docs] def get_lake_erie_circulation_real_time( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: data = self._download_nyiso_archive( date, end=end, dataset_name=LAKE_ERIE_CIRCULATION_REAL_TIME_DATASET, filename="ErieCirculationRT", verbose=verbose, ) # The source has MWH in the column name but it's actually MW data = data.rename(columns={"Lake Erie Circulation (MWH)": "MW"}) data = data[["Time", "MW"]].sort_values("Time") return data
@support_date_range(frequency="MONTH_START")
[docs] def get_lake_erie_circulation_day_ahead( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: data = self._download_nyiso_archive( date, end=end, dataset_name=LAKE_ERIE_CIRCULATION_DAY_AHEAD_DATASET, filename="ErieCirculationDA", verbose=verbose, ) data = data.rename(columns={"Lake Erie Circulation (MWH)": "MW"}) data = data[["Time", "MW"]].sort_values("Time") return data
@lmp_config( supports={ Markets.REAL_TIME_5_MIN: ["latest", "today", "historical"], # TODO: add historical RTC data. # https://www.nyiso.com/custom-reports?report=ham_lbmp_gen Markets.REAL_TIME_15_MIN: ["latest", "today"], Markets.REAL_TIME_HOURLY: ["latest", "today", "historical"], Markets.DAY_AHEAD_HOURLY: ["latest", "today", "historical"], }, ) @support_date_range(frequency="MONTH_START")
[docs] def get_lmp( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, market: Markets | None = None, locations: list | None = None, location_type: NYISOLocationType | None = None, verbose: bool = False, ) -> pd.DataFrame: """ Supported Markets: - ``REAL_TIME_5_MIN`` (RTC) - ``REAL_TIME_15_MIN`` (RTD) - ``REAL_TIME_HOURLY`` (Real-time hourly LMP) - ``DAY_AHEAD_HOURLY`` Supported Location Types: - ``zone`` - ``generator`` NOTE: the generator data contains the single Reference Bus location type. REAL_TIME_5_MIN is the Real Time Dispatch (RTD) market. REAL_TIME_15_MIN is the Real Time Commitment (RTC) market. REAL_TIME_HOURLY is the real-time hourly LMP market. For documentation on real time dispatch and real time commitment, see: https://www.nyiso.com/documents/20142/1404816/RTC-RTD%20Convergence%20Study.pdf/f3843982-dd30-4c66-6c21-e101c3cb85af """ if location_type is None: location_type = NYISOLocationType.ZONE marketname = self._set_marketname(market) file_location_type = self._set_location_type_for_filename(location_type) if locations is None: locations = "ALL" if date == "latest": # The 5 minute data has a dedicated latest interval file if market != Markets.REAL_TIME_5_MIN: return self._latest_lmp_from_today( market=market, locations=locations, location_type=location_type, verbose=verbose, ) else: url = f"https://mis.nyiso.com/public/realtime/realtime_{file_location_type}_lbmp.csv" df = pd.read_csv(url) df = self._handle_time(df, dataset_name=marketname) df["Market"] = market.value else: filename = marketname + f"_{file_location_type}" df = self._download_nyiso_archive( date=date, end=end, dataset_name=marketname, filename=filename, verbose=verbose, ) return self._process_lmp_data(df, date, market, location_type, locations)
def _process_lmp_data( self, df: pd.DataFrame, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None, market: Markets, location_type: NYISOLocationType, locations: list | str, ) -> pd.DataFrame: columns = { "Name": "Location", "LBMP ($/MWHr)": "LMP", "Marginal Cost Losses ($/MWHr)": "Loss", "Marginal Cost Congestion ($/MWHr)": "Congestion", "Marginal Cost Congestion ($/MWH": "Congestion", # Deal with older data } df = df.rename(columns=columns) # In NYISO raw data, a negative congestion number means a higher LMP. We # flip the sign to make it consistent with other ISOs where a negative # congestion number means a lower LMP. Thus, LMP = Energy + Loss + Congestion # for NYISO, as in other ISOs. df["Congestion"] *= -1 df["Energy"] = (df["LMP"] - df["Loss"] - df["Congestion"]).round(2) df["Market"] = market.value df["Location Type"] = NYISOLocationType(location_type).value.capitalize() # We manually update the location type for the reference bus location because # it's included in the generator data. if REFERENCE_BUS_LOCATION in df["Location"].unique(): df.loc[ df["Location"] == REFERENCE_BUS_LOCATION, "Location Type", ] = "Reference Bus" # NYISO includes both 5 min (RTD - Real Time Dispatch) and # 15 min (RTC - Real Time Commitment) data in the daily file if market == Markets.REAL_TIME_15_MIN or ( # In this case, we've only used the latest 5 minute interval file so we # know the data is all REAL_TIME_5_MIN market == Markets.REAL_TIME_5_MIN and date != "latest" ): # The most recent 5 min file is sometimes published before the updated # daily file, so to label the 5 and 15 min daily data, we need to get # the most recent 5 min data and compare it to the data at the same # timestamp in the daily dataframe. If the data matches, then we know all # daily data after that timestamp is 15 minute. If the data doesn't match, # we assume all daily data on or after that timestamp is 15 min data. most_recent_5_min_data = self.get_lmp( date="latest", market=Markets.REAL_TIME_5_MIN, location_type=location_type, ) most_recent_5_min_timestamp = most_recent_5_min_data["Interval Start"].max() daily_subset = ( df[df["Interval Start"] == most_recent_5_min_timestamp] .sort_values(["Interval Start", "Location"]) .reset_index(drop=True) ) if daily_subset[["LMP", "Loss", "Congestion"]].equals( most_recent_5_min_data[["LMP", "Loss", "Congestion"]], ): # When equal, the daily data has the most recent 5 min data mask_15_min = df["Interval Start"] > most_recent_5_min_timestamp else: # When not equal, the data at this timestamp is 15 min data mask_15_min = df["Interval Start"] >= most_recent_5_min_timestamp df.loc[~mask_15_min, "Market"] = Markets.REAL_TIME_5_MIN.value df.loc[mask_15_min, "Market"] = Markets.REAL_TIME_15_MIN.value # For 15-min data, the original "Interval End" column contains the correct # end time (since the raw data has timestamps as interval END). However, # "Interval Start" was calculated assuming a 5-minute interval, so we need # to recalculate it for 15-minute intervals. # Interval Start = Interval End - 15 minutes df.loc[mask_15_min, "Interval Start"] = df.loc[ mask_15_min, "Interval End", ] - pd.Timedelta( minutes=15, ) df = df[ [ "Time", "Interval Start", "Interval End", "Market", "Location", "Location Type", "LMP", "Energy", "Congestion", "Loss", ] ] df = utils.filter_lmp_locations(df, locations) return ( df[df["Market"] == market.value] .sort_values(["Interval Start", "Location"]) .reset_index(drop=True) )
[docs] def get_raw_interconnection_queue(self) -> BinaryIO: url = "https://www.nyiso.com/documents/20142/1407078/NYISO-Interconnection-Queue.xlsx" # noqa logger.info(f"Downloading interconnection queue from {url}") response = requests.get(url) return utils.get_response_blob(response)
[docs] def get_interconnection_queue(self) -> pd.DataFrame: """Return NYISO interconnection queue Additional Non-NYISO queue info: https://www3.dps.ny.gov/W/PSCWeb.nsf/All/286D2C179E9A5A8385257FBF003F1F7E?OpenDocument Returns: pandas.DataFrame: Interconnection queue containing, active, withdrawn, \ and completed project """ # noqa # 5 sheets - ['Interconnection Queue', 'Cluster Projects', 'Withdrawn', 'Cluster Projects-Withdrawn', 'In Service'] # harded coded for now. perhaps this url can be parsed from the html here: raw_data = self.get_raw_interconnection_queue() # Create ExcelFile so we only need to download file once excel_file = pd.ExcelFile(raw_data) # Drop extra rows at bottom active = ( pd.read_excel(excel_file, sheet_name="Interconnection Queue") .dropna( subset=["Queue Pos.", "Project Name"], ) .copy() # Active projects can have multiple values for "Points of Interconnection" ).rename(columns={"Points of Interconnection": "Interconnection Point"}) cluster_active = ( pd.read_excel(excel_file, sheet_name=" Cluster Projects") .dropna( subset=["Queue Pos.", "Project Name"], ) .copy() # Active projects can have multiple values for "Points of Interconnection" ).rename(columns={"Points of Interconnection": "Interconnection Point"}) active = pd.concat([active, cluster_active]) active["Status"] = InterconnectionQueueStatus.ACTIVE.value withdrawn = pd.read_excel(excel_file, sheet_name="Withdrawn") cluster_withdrawn = pd.read_excel( excel_file, sheet_name="Cluster Projects-Withdrawn", ) withdrawn = pd.concat([withdrawn, cluster_withdrawn]) withdrawn["Status"] = InterconnectionQueueStatus.WITHDRAWN.value # assume it was withdrawn when last updated withdrawn["Withdrawn Date"] = withdrawn["Last Update"] withdrawn["Withdrawal Comment"] = None withdrawn = withdrawn.rename(columns={"Utility ": "Utility"}) withdrawn = withdrawn.rename(columns={"Owner/Developer": "Developer Name"}) # make completed look like the other two sheets completed = pd.read_excel(excel_file, sheet_name="In Service", header=[0, 1]) completed.insert(15, "SGIA Tender Date", None) completed.insert(16, "CY Complete Date", None) completed.insert(17, "Proposed Initial-Sync Date", None) completed["Status"] = InterconnectionQueueStatus.COMPLETED.value if ( "SGIA Tender Date" in active.columns and "SGIA Tender Date" not in completed.columns ): active = active.drop(columns=["SGIA Tender Date"]) completed_colnames_map = { ("Queue", "Pos."): "Queue Pos.", ("Queue", "Owner/Developer"): "Developer Name", ("Queue", "Project Name"): "Project Name", ("Date", "of IR"): "Date of IR", ("SP", "(MW)"): "SP (MW)", ("WP", "(MW)"): "WP (MW)", ("Type/", "Fuel"): "Type/ Fuel", ("Location", "County"): "County", ("Location", "State"): "State", ("Z", "Unnamed: 9_level_1"): "Z", ("Interconnection", "Point"): "Interconnection Point", ("Interconnection", "Utility "): "Utility", ("Interconnection", "S"): "S", ("Last Update", "Unnamed: 13_level_1"): "Last Updated Date", ("Availability", "of Studies"): "Availability of Studies", ("SGIA Tender Date", ""): "SGIA Tender Date", ("CY Complete Date", ""): "CY Complete Date", ("Proposed Initial-Sync Date", ""): "Proposed Initial-Sync Date", ("Proposed", " In-Service"): "Proposed In-Service Date", ("Proposed", "COD"): "Proposed COD", ("Proposed", "COD.1"): "Proposed COD.1", ("Proposed", "COD.2"): "Proposed COD.2", ("Proposed", "COD.3"): "Proposed COD.3", ("Status", ""): "Status", } completed.columns = completed.columns.to_flat_index().map( lambda c: completed_colnames_map[c], ) # assume it was finished when last updated completed["Actual Completion Date"] = completed["Last Updated Date"] dfs = [ df for df in [active, withdrawn, completed] if not df.empty and not df.isna().all().all() ] queue = pd.concat(dfs) # fix extra space in column name queue["Type/ Fuel"] = queue["Type/ Fuel"].map( { "S": "Solar", "ES": "Energy Storage", "W": "Wind", "AC": "AC Transmission", "DC": "DC Transmission", "CT": "Combustion Turbine", "CC": "Combined Cycle", "M": "Methane", # "CR": "", "H": "Hydro", "L": "Load", "ST": "Steam Turbine", "CC-NG": "Natural Gas", "FC": "Fuel Cell", "PS": "Pumped Storage", "NU": "Nuclear", "D": "Dual Fuel", # "C": "", "NG": "Natural Gas", "Wo": "Wood", "F": "Flywheel", # "CW": "", "CC-D": "Combined Cycle - Dual Fuel", "SW": "=Solid Waste", # "CR=CSR - ES + Solar": "", "CT-NG": "Combustion Turbine - Natural Gas", "DC/AC": "DC/AC Transmission", "CT-D": "Combustion Turbine - Dual Fuel", "CS-NG": "Steam Turbine & Combustion Turbine- Natural Gas", "ST-NG": "Steam Turbine - Natural Gas", }, ) queue["Capacity (MW)"] = ( queue[["SP (MW)", "WP (MW)"]] .replace( "TBD", 0, ) .replace(" ", 0) .fillna(0) .astype(float) .max(axis=1) ) queue["Date of IR"] = pd.to_datetime(queue["Date of IR"]) queue["Proposed COD"] = pd.to_datetime( queue["Proposed COD"], errors="coerce", ) queue["Proposed In-Service Date"] = pd.to_datetime( queue["Proposed In-Service Date"], errors="coerce", ) queue["Proposed Initial-Sync Date"] = pd.to_datetime( queue["Proposed Initial-Sync Date"], errors="coerce", ) # TODO handle other 2 sheets # TODO they publish past queues, # but not sure what data is in them that is relevant rename = { "Queue Pos.": "Queue ID", "Project Name": "Project Name", "County": "County", "State": "State", "Developer Name": "Interconnecting Entity", "Utility": "Transmission Owner", "Interconnection Point": "Interconnection Location", "Status": "Status", "Date of IR": "Queue Date", "Proposed COD": "Proposed Completion Date", "Type/ Fuel": "Generation Type", "Capacity (MW)": "Capacity (MW)", "SP (MW)": "Summer Capacity (MW)", "WP (MW)": "Winter Capacity (MW)", } extra_columns = [ "Proposed In-Service Date", "Proposed Initial-Sync Date", "Last Updated Date", "Z", "S", "Availability of Studies", "SGIA Tender Date", ] queue = utils.format_interconnection_df(queue, rename, extra_columns) return queue
[docs] def get_generators(self, verbose: bool = False) -> pd.DataFrame: """Get a list of generators in NYISO. When possible return capacity and fuel type information Returns: pandas.DataFrame: a DataFrame of generators and locations **Possible Columns** * Generator Name * PTID * Subzone * Zone * Latitude * Longitude * Owner, Operator, and / or Billing Organization * Station Unit * Town * County * State * In-Service Date * Name Plate Rating (V) MW * 2024 CRIS MW Summer * 2024 CRIS MW Winter * 2024 Capability MW Summer * 2024 Capability MW Winter * Is Dual Fuel * Unit Type * Fuel Type 1 * Fuel Type 2 * 2023 Net Energy GWh * Notes * Generator Type """ generator_url = "http://mis.nyiso.com/public/csv/generator/generator.csv" logger.info(f"Requesting {generator_url}") df = pd.read_csv(generator_url) # need to be updated once a year. approximately around end of april # find it here: https://www.nyiso.com/gold-book-resources capacity_url_2024 = "https://www.nyiso.com/documents/20142/44474211/2024-NYCA-Generators.xlsx/41a5cba2-523a-9fe0-9830-a523839a2831" # noqa logger.info(f"Requesting {capacity_url_2024}") generators = pd.read_excel( capacity_url_2024, sheet_name=[ "Table III-2a", "Table III-2b", ], skiprows=3, header=[0, 1, 2, 3, 4], ) generators["Table III-2a"]["Generator Type"] = "Market Generator" generators["Table III-2b"]["Generator Type"] = "Non-Market Generator" # manually transcribed column names (inspect spreadsheet for confirmation) mapped_columns = [ "LINE REF. NO.", "Owner, Operator, and / or Billing Organization", "Station Unit", "Zone", "PTID", "Town", "County", "State", "In-Service Date", "Name Plate Rating (V) MW", "2024 CRIS MW Summer", "2024 CRIS MW Winter", "2024 Capability MW Summer", "2024 Capability MW Winter", "Is Dual Fuel", "Unit Type", "Fuel Type 1", "Fuel Type 2", "2023 Net Energy GWh", "Notes", "Generator Type", ] # Rename the columns separately, so they match on the concat generators["Table III-2a"].columns = mapped_columns generators["Table III-2b"].columns = mapped_columns # combine both sheets generators = pd.concat(generators.values()) generators = generators.dropna(subset=["PTID"]) generators["PTID"] = generators["PTID"].astype(int) # in other data generators = generators.drop(columns=["Zone", "LINE REF. NO."]) # TODO: df has both Generator PTID and Aggregation PTID combined = pd.merge( df, generators, left_on="Generator PTID", right_on="PTID", how="left", ) unit_type_map = { "CC": "Combined Cycle", "CG": "Cogeneration", "CT": "Combustion Turbine Portion (CC)", "CW": "Waste Heat Only (CC)", "ES": "Energy Storage", "FC": "Fuel Cell", "GT": "Combustion Turbine", "HY": "Conventional Hydro", "IC": "Internal Combustion", "JE": "Jet Engine", "NB": "Steam (BWR Nuclear)", "NP": "Steam (PWR Nuclear)", "PS": "Pumped Storage Hydro", "PV": "Photovoltaic", "ST": "Steam Turbine (Fossil)", "WT": "Wind Turbine", } combined["Unit Type"] = combined["Unit Type"].map(unit_type_map) fuel_type_map = { "BAT": "Battery", "BUT": "Butane", "FO2": "No. 2 Fuel Oil", "FO4": "No. 4 Fuel Oil", "FO6": "No. 6 Fuel Oil", "FW": "Fly Wheel", "JF": "Jet Fuel", "KER": "Kerosene", "MTE": "Methane (Bio Gas)", "NG": "Natural Gas", "OT": "Other (Describe In Footnote)", "REF": "Refuse (Solid Waste)", "SUN": "Sunlight", "UR": "Uranium", "WAT": "Water", "WD": "Wood and/or Wood Waste", "WND": "Wind", } combined["Fuel Type 1"] = combined["Fuel Type 1"].map( fuel_type_map, ) combined["Fuel Type 2"] = combined["Fuel Type 2"].map( fuel_type_map, ) combined["Is Dual Fuel"] = combined["Is Dual Fuel"] == "YES" state_code_map = { 36: "New York", 42: "Pennsylvania", 25: "Massachusetts", 34: "New Jersey", } combined["State"] = combined["State"].map(state_code_map) # todo map county codes to names. info on first sheet of excel return combined
[docs] def get_loads(self) -> pd.DataFrame: """Get a list of loads in NYISO Returns: pandas.DataFrame: a DataFrame of loads and locations """ url = "http://mis.nyiso.com/public/csv/load/load.csv" logger.info(f"Requesting {url}") df = pd.read_csv(url) return df
def _set_marketname(self, market: Markets) -> str: if market in [Markets.REAL_TIME_5_MIN, Markets.REAL_TIME_15_MIN]: marketname = REAL_TIME_LMP_DATASET elif market == Markets.REAL_TIME_HOURLY: marketname = REAL_TIME_HOURLY_LMP_DATASET elif market == Markets.DAY_AHEAD_HOURLY: marketname = DAM_LMP_DATASET else: raise RuntimeError(f"LMP Market {market} is not supported") return marketname def _set_location_type_for_filename(self, location_type: NYISOLocationType) -> str: location_types = [NYISOLocationType.ZONE, NYISOLocationType.GENERATOR] if location_type == NYISOLocationType.ZONE: return NYISOLocationType.ZONE elif location_type == NYISOLocationType.GENERATOR: return "gen" else: raise ValueError( f"Invalid location type. Expected one of: {location_types}", ) def _download_nyiso_archive( self, date: str | pd.Timestamp, end: str | pd.Timestamp | None = None, dataset_name: str | None = None, filename: str | None = None, groupby: str | None = None, verbose: bool = False, ) -> pd.DataFrame: """Download a dataset from NYISO's archive Arguments: date (str or datetime): the date to download. if end is provided, this is the start date end (str or datetime): the end date to download. if provided, date is the start date dataset_name (str): the name of the dataset to download filename (str): the name of the file to download. if not provided, dataset_name is used groupby (str): the column to group by when converting datetimes. Used to avoid ambiguous datetimes when dst ends verbose (bool): print out requested url Returns: pandas.DataFrame: the downloaded data """ if filename is None: filename = dataset_name # NB: need to add the file date to the load forecast dataset to get the # forecast publish time. add_file_date = LOAD_FORECAST_DATASET == dataset_name date = gridstatus.utils._handle_date(date, self.default_timezone) month = date.strftime("%Y%m01") day = date.strftime("%Y%m%d") # NB: if requesting the same day then just download the single file if end is not None and date.normalize() == end.normalize(): end = None date = date.normalize() # NB: the last 7 days of file are hosted directly as csv # todo this can probably be optimized to a single csv in # a range and all files are in the last 7 days if end is None and date > pd.Timestamp.now( tz=self.default_timezone, ).normalize() - pd.DateOffset(days=7): csv_filename = f"{day}{filename}.csv" csv_url = f"http://mis.nyiso.com/public/csv/{dataset_name}/{csv_filename}" logger.info(f"Requesting {csv_url}") df = pd.read_csv(csv_url) df = self._handle_time(df, dataset_name, groupby=groupby) if add_file_date: df["File Date"] = self._get_load_forecast_file_date(date, verbose) else: zip_url = f"http://mis.nyiso.com/public/csv/{dataset_name}/{month}{filename}_csv.zip" # noqa: E501 z = utils.get_zip_folder(zip_url, verbose=verbose) all_dfs = [] if end is None: date_range = [date] else: date_range = pd.date_range( date.date(), end.date(), freq="1D", inclusive="left", ).tolist() # NB: this handles case where end is the first of the next month # this pops up from the support_date_range decorator # and that date will be handled in the next month's zip file if end.month == date.month: date_range += [end] for d in date_range: d = gridstatus.utils._handle_date(d, tz=self.default_timezone) month = d.strftime("%Y%m01") day = d.strftime("%Y%m%d") csv_filename = f"{day}{filename}.csv" if csv_filename not in z.namelist(): logger.info(f"{csv_filename} not found in {zip_url}") continue df = pd.read_csv(z.open(csv_filename)) if add_file_date: # NB: The File Date is the last modified time of the individual csv file df["File Date"] = pd.Timestamp( *z.getinfo(csv_filename).date_time, tz=self.default_timezone, ) df = self._handle_time(df, dataset_name, groupby=groupby) all_dfs.append(df) df = pd.concat(all_dfs) return df.sort_values("Time").reset_index(drop=True) def _get_load_forecast_file_date( self, date: pd.Timestamp, verbose: bool = False, ) -> pd.Timestamp: """Retrieves the last updated time for load forecast file from the archive""" data = pd.read_html( "http://mis.nyiso.com/public/P-7list.htm", skiprows=2, header=0, )[0] last_updated_date = data.loc[ data["CSV Files"] == date.strftime("%m-%d-%Y"), "Last Updated", ].iloc[0] return pd.Timestamp(last_updated_date, tz=self.default_timezone)
[docs] def get_capacity_prices( self, date: pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Pull the most recent capacity market report's market clearing prices Arguments: date (pandas.Timestamp): date that will be used to pull latest capacity report (will refer to month and year) Returns: a DataFrame of monthly capacity prices (all three auctions) for \ each of the four capacity localities within NYISO """ if date is None: date = pd.Timestamp.now(tz=self.default_timezone) else: date = utils._handle_date(date, tz=self.default_timezone) if date.year == 2014: year_code = 1410927 elif date.year == 2015: year_code = 1410895 elif date.year == 2016: year_code = 1410901 if date.year == 2017: year_code = 1410883 if date.year == 2018: year_code = 1410889 if date.year == 2019: year_code = 4266869 elif date.year == 2020: year_code = 10106066 elif date.year == 2021: year_code = 18170164 elif date.year == 2022: year_code = 27447313 elif date.year == 2023: year_code = 35397361 elif date.year == 2024: year_code = 42146126 elif date.year == 2025: year_code = 48997190 elif date.year == 2026: year_code = 56195933 else: raise ValueError( "Year not currently supported. Please file an issue.", ) # todo: it looks like the "27447313" component of the base URL changes # every year but I'm not sure what the link between that and the year # is... capacity_market_base_url = f"https://www.nyiso.com/documents/20142/{year_code}" url = f"{capacity_market_base_url}/ICAP-Market-Report-{date.month_name()}-{date.year}.xlsx" # Special case if date.month_name() == "December" and date.year == 2023: url = f"{capacity_market_base_url}/ICAP%20Market%20Report%20-%20{date.month_name()}%20{date.year}.xlsx" logger.info(f"Requesting {url}") df = pd.read_excel(url, sheet_name="MCP Table", header=[0, 1]) df.rename(columns={"Unnamed: 0_level_0": "", "Date": ""}, inplace=True) df.set_index("", inplace=True) return df.dropna(how="any", axis="columns")
@support_date_range(frequency="DAY_START")
[docs] def get_as_prices_day_ahead_hourly( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: """Pull the most recent ancillary service market report's market clearing prices Arguments: date (pandas.Timestamp): date that will be used to pull latest capacity report (will refer to month and year) """ df = self._download_nyiso_archive( date=date, verbose=verbose, dataset_name="damasp", ) df = self._handle_as_prices(df, rt_or_dam="dam") return df
@support_date_range(frequency="DAY_START")
[docs] def get_as_prices_real_time_5_min( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, end: str | pd.Timestamp | None = None, verbose: bool = False, ) -> pd.DataFrame: """Pull the most recent ancillary service market report's market clearing prices Arguments: date (pandas.Timestamp): date that will be used to pull latest capacity report (will refer to month and year) """ df = self._download_nyiso_archive( date=date, verbose=verbose, dataset_name="rtasp", ) df = self._handle_as_prices(df, rt_or_dam="rt") return df
def _handle_as_prices( self, df: pd.DataFrame, rt_or_dam: Literal["rt", "dam"], ) -> pd.DataFrame: df = df.rename( columns={ "Name": "Zone", "10 Min Spinning Reserve ($/MWHr)": "10 Min Spin Reserves", "10 Min Non-Synchronous Reserve ($/MWHr)": "10 Min Non-Spin Reserves", "30 Min Operating Reserve ($/MWHr)": "30 Min Reserves", "NYCA Regulation Capacity ($/MWHr)": "Regulation Capacity", }, ) if rt_or_dam == "rt": df["Interval End"] = df["Interval Start"] df["Interval Start"] = df["Interval Start"] - pd.Timedelta(minutes=5) else: df["Interval End"] = df["Interval Start"] + pd.Timedelta( minutes=60, ) return df[ [ "Interval Start", "Interval End", "Zone", "10 Min Spin Reserves", "10 Min Non-Spin Reserves", "30 Min Reserves", "Regulation Capacity", ] ] @support_date_range(frequency="DAY_START")
[docs] def get_limiting_constraints_real_time( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: if date == "latest": data = pd.read_csv( "https://mis.nyiso.com/public/csv/LimitingConstraints/currentLimitingConstraints.csv", ) data = self._handle_time( data, LIMITING_CONSTRAINTS_REAL_TIME_DATASET, groupby="Limiting Facility", ) else: data = self._download_nyiso_archive( date, end=end, dataset_name=LIMITING_CONSTRAINTS_REAL_TIME_DATASET, groupby="Limiting Facility", verbose=verbose, ) data = data.rename(columns={"Constraint Cost($)": "Constraint Cost"}) data = ( data[ [ "Interval Start", "Interval End", "Limiting Facility", "Facility PTID", "Contingency", "Constraint Cost", ] ] .sort_values(["Interval Start", "Limiting Facility", "Contingency"]) .reset_index( drop=True, ) ) return data
@support_date_range(frequency="DAY_START")
[docs] def get_limiting_constraints_day_ahead( self, date: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp], end: str | pd.Timestamp | tuple[pd.Timestamp, pd.Timestamp] | None = None, verbose: bool = False, ) -> pd.DataFrame: df = self._download_nyiso_archive( date, end=end, dataset_name=LIMITING_CONSTRAINTS_DAY_AHEAD_DATASET, groupby="Limiting Facility", verbose=verbose, ) df = df.rename(columns={"Constraint Cost($)": "Constraint Cost"}) df = ( df[ [ "Interval Start", "Interval End", "Limiting Facility", "Facility PTID", "Contingency", "Constraint Cost", ] ] .sort_values(["Interval Start", "Limiting Facility", "Contingency"]) .reset_index( drop=True, ) ) return df