Source code for gridstatus.caiso

import io
import time
from contextlib import redirect_stderr
from zipfile import ZipFile

import pandas as pd
import requests
import tabula

from gridstatus import utils
from gridstatus.base import GridStatus, ISOBase, Markets, NotSupported
from gridstatus.decorators import support_date_range
from gridstatus.lmp_config import lmp_config
from gridstatus.logging import log

_BASE = "https://www.caiso.com/outlook/SP"
_HISTORY_BASE = "https://www.caiso.com/outlook/SP/History"


class CAISO(ISOBase):
    """California Independent System Operator (CAISO)"""

    name = "California ISO"
    iso_id = "caiso"
    default_timezone = "US/Pacific"

    status_homepage = "https://www.caiso.com/TodaysOutlook/Pages/default.aspx"
    interconnection_homepage = "https://rimspub.caiso.com/rimsui/logon.do"

    # Markets PRC_INTVL_LMP, PRC_RTPD_LMP, PRC_LMP
    markets = [
        Markets.REAL_TIME_5_MIN,
        Markets.REAL_TIME_15_MIN,
        Markets.DAY_AHEAD_HOURLY,
    ]

    trading_hub_locations = [
        "TH_NP15_GEN-APND",
        "TH_SP15_GEN-APND",
        "TH_ZP26_GEN-APND",
    ]

    # OASIS ancillary service type codes -> readable names. Shared by
    # get_as_prices and get_as_procurement so the two stay in sync.
    _AS_TYPE_MAP = {
        "NR": "Non-Spinning Reserves",
        "RD": "Regulation Down",
        "RMD": "Regulation Mileage Down",
        "RMU": "Regulation Mileage Up",
        "RU": "Regulation Up",
        "SR": "Spinning Reserves",
    }

    def _current_day(self):
        """Return the current date as reported by the stats api."""
        return self.get_status(date="latest").time.date()

    def get_stats(self, verbose=False):
        """Fetch and return the parsed JSON of the ``stats.txt`` outlook file."""
        stats_url = _BASE + "/stats.txt"
        return self._get_json(stats_url, verbose=verbose)

    def get_status(self, date="latest", verbose=False) -> GridStatus:
        """Get Current Status of the Grid. Only date="latest" is supported

        Known possible values: Normal, Restricted Maintenance Operations,
        Flex Alert

        Arguments:
            date (str): must be "latest".

            verbose (bool, optional): print verbose output. Defaults to False.

        Returns:
            GridStatus: time, status and reserves reported by the stats api.

        Raises:
            NotSupported: if ``date`` is anything other than "latest".
        """
        if date != "latest":
            raise NotSupported()

        # todo is it possible for this to return more than one element?
        r = self.get_stats(verbose=verbose)

        # named status_time so the module-level ``time`` import is not shadowed
        status_time = pd.to_datetime(r["slotDate"]).tz_localize("US/Pacific")
        # can only store one value for status so we concat them together
        status = ", ".join(r["gridstatus"])
        reserves = r["Current_reserve"]

        return GridStatus(
            time=status_time,
            status=status,
            reserves=reserves,
            iso=self,
        )

    @support_date_range(frequency="1D")
    def get_fuel_mix(self, date, start=None, end=None, verbose=False):
        """Get fuel mix in 5 minute intervals for a provided day

        Arguments:
            date (datetime.date, str): "latest", "today", or an object
                that can be parsed as a datetime for the day to return data.

            start (datetime.date, str): start of date range to return.
                alias for `date` parameter.
                Only specify one of `date` or `start`.

            end (datetime.date, str): "today" or an object that can be parsed
                as a datetime for the day to return data.
                Only used if requesting a range of dates.

            verbose (bool, optional): print verbose output. Defaults to False.

        Returns:
            pandas.DataFrame: A DataFrame with columns - 'Time' and columns
            for each fuel type.
        """
        if date == "latest":
            # "latest" is the last row of today's data
            mix = self.get_fuel_mix("today", verbose=verbose)
            return mix.tail(1).reset_index(drop=True)

        return self._get_historical_fuel_mix(date, verbose=verbose)

    def _get_historical_fuel_mix(self, date, verbose=False):
        """Fetch the ``fuelsource`` file for ``date`` with standardized names."""
        df = _get_historical("fuelsource", date, verbose=verbose)

        # rename some inconsistent columns names to standardize across dates
        df = df.rename(
            columns={
                "Small hydro": "Small Hydro",
                "Natural gas": "Natural Gas",
                "Large hydro": "Large Hydro",
            },
        )

        # when day light savings time switches, there are na rows
        df = df.dropna()

        return df

    @support_date_range(frequency="1D")
    def get_load(self, date, end=None, verbose=False):
        """Return load at a previous date in 5 minute intervals

        Arguments:
            date (datetime.date, str): "latest" or the day to return data for.

            end (datetime.date, str): not used in this method body;
                presumably consumed by ``support_date_range``.

            verbose (bool, optional): print verbose output. Defaults to False.

        Returns:
            dict: with keys "time" and "load" when ``date`` is "latest".
            pandas.DataFrame: with columns Time, Interval Start,
            Interval End and Load otherwise.
        """
        if date == "latest":
            # todo call today
            load_url = _BASE + "/demand.csv"
            df = pd.read_csv(load_url)

            # get last non null row
            data = df[~df["Current demand"].isnull()].iloc[-1]

            return {
                "time": _make_timestamp(data["Time"], self._current_day()),
                "load": data["Current demand"],
            }

        return self._get_historical_load(date, verbose=verbose)

    def _get_historical_load(self, date, verbose=False):
        """Fetch the ``demand`` file for ``date`` and keep only actual load."""
        df = _get_historical("demand", date, verbose=verbose)
        df = df[["Time", "Interval Start", "Interval End", "Current demand"]]
        df = df.rename(columns={"Current demand": "Load"})
        df = df.dropna(subset=["Load"])
        return df

    @support_date_range(frequency="31D")
    def get_load_forecast(self, date, end=None, sleep=4, verbose=False):
        """Returns load forecast for a previous date in 1 hour intervals

        Arguments:
            date (datetime.date, pd.Timestamp, str): day to return.
                If string, format should be YYYYMMDD e.g 20200623

            end (datetime.date, str): last date of range to return data.
                If None, returns only date. Defaults to None.

            sleep (int): number of seconds to sleep before returning to
                avoid hitting rate limit in regular usage.
                Defaults to 4 seconds.

            verbose (bool, optional): print verbose output. Defaults to False.

        Returns:
            pandas.DataFrame: with columns Forecast Time, Time,
            Interval Start, Interval End and Load Forecast.
        """
        start, end = _caiso_handle_start_end(date, end)

        url = (
            "http://oasis.caiso.com/oasisapi/SingleZip?"
            + "resultformat=6&queryname=SLD_FCST&version=1&market_run_id=DAM"
            + f"&startdatetime={start}&enddatetime={end}"
        )

        df = _get_oasis(url, verbose=verbose, sleep=sleep).rename(
            columns={"MW": "Load Forecast"},
        )

        # returns many areas, we only want one overall iso
        df = df[df["TAC_AREA_NAME"] == "CA ISO-TAC"]

        df = df.sort_values("Time")

        # todo - what is the actual time of the forecast?
        df["Forecast Time"] = df["Time"].iloc[0]

        df = df[
            [
                "Forecast Time",
                "Time",
                "Interval Start",
                "Interval End",
                "Load Forecast",
            ]
        ]
        return df

    def get_pnodes(self):
        """Return the OASIS ``ATL_PNODE_MAP`` report as a DataFrame.

        The query window in the url is fixed (2022-08-01 to 2022-08-02).
        """
        url = "http://oasis.caiso.com/oasisapi/SingleZip?resultformat=6&queryname=ATL_PNODE_MAP&version=1&startdatetime=20220801T07:00-0000&enddatetime=20220802T07:00-0000&pnode_id=ALL"  # noqa
        df = pd.read_csv(url, compression="zip").rename(
            columns={
                "APNODE_ID": "Aggregate PNode ID",
                "PNODE_ID": "PNode ID",
            },
        )
        return df

    @lmp_config(
        supports={
            Markets.DAY_AHEAD_HOURLY: ["latest", "today", "historical"],
            Markets.REAL_TIME_15_MIN: ["latest", "today", "historical"],
            Markets.REAL_TIME_5_MIN: ["latest", "today", "historical"],
        },
    )
    @support_date_range(frequency="31D")
    def get_lmp(
        self,
        date,
        market: str,
        locations: list = None,
        sleep: int = 5,
        end=None,
        verbose=False,
    ):
        """Get LMP pricing starting at supplied date for a list of locations.

        Arguments:
            date (datetime.date, str): date to return data

            market: market to return from. supports:
                DAY_AHEAD_HOURLY, REAL_TIME_15_MIN, REAL_TIME_5_MIN

            locations (list): list of locations to get data from.
                If no locations are provided, defaults to NP15,
                SP15, and ZP26, which are the trading hub locations.
                For a list of locations, call ``CAISO.get_pnodes()``

            sleep (int): number of seconds to sleep before returning to
                avoid hitting rate limit in regular usage.
                Defaults to 5 seconds.

            end (datetime.date, str): last date of range to return data.
                If None, returns only date. Defaults to None.

            verbose (bool, optional): print verbose output. Defaults to False.

        Returns:
            pandas.DataFrame: A DataFrame of pricing data

        Raises:
            RuntimeError: if ``market`` is not one of the supported markets.
        """
        if date == "latest":
            return self._latest_lmp_from_today(market=market, locations=locations)

        if locations is None:
            locations = self.trading_hub_locations

        assert isinstance(locations, list), "locations must be a list"

        # todo make sure defaults to local timezone
        start, end = _caiso_handle_start_end(date, end)

        # each market has its own OASIS report, version and price column name
        if market == Markets.DAY_AHEAD_HOURLY:
            query_name = "PRC_LMP"
            market_run_id = "DAM"
            version = 12
            PRICE_COL = "MW"
        elif market == Markets.REAL_TIME_15_MIN:
            query_name = "PRC_RTPD_LMP"
            market_run_id = "RTPD"
            version = 3
            PRICE_COL = "PRC"
        elif market == Markets.REAL_TIME_5_MIN:
            query_name = "PRC_INTVL_LMP"
            market_run_id = "RTM"
            version = 3
            PRICE_COL = "VALUE"
        else:
            raise RuntimeError("LMP Market is not supported")

        nodes_str = ",".join(locations)
        url = f"http://oasis.caiso.com/oasisapi/SingleZip?resultformat=6&queryname={query_name}&version={version}&startdatetime={start}&enddatetime={end}&market_run_id={market_run_id}&node={nodes_str}"  # noqa

        df = _get_oasis(
            url,
            verbose=verbose,
            sleep=sleep,
        )

        # one row per (interval, node); one column per LMP component
        df = df.pivot_table(
            index=["Time", "Interval Start", "Interval End", "NODE"],
            columns="LMP_TYPE",
            values=PRICE_COL,
            aggfunc="first",
        )

        df = df.reset_index().rename(
            columns={
                "NODE": "Location",
                "LMP": "LMP",
                "MCE": "Energy",
                "MCC": "Congestion",
                "MCL": "Loss",
            },
        )

        df["Market"] = market.value
        df["Location Type"] = None

        df.loc[
            df["Location"].isin(self.trading_hub_locations),
            "Location Type",
        ] = "Trading Hub"

        df = df[
            [
                "Time",
                "Interval Start",
                "Interval End",
                "Market",
                "Location",
                "Location Type",
                "LMP",
                "Energy",
                "Congestion",
                "Loss",
            ]
        ]

        data = utils.filter_lmp_locations(df, locations)

        # clean up pivot name in header
        data.columns.name = None

        return data

    @support_date_range(frequency="1D")
    def get_storage(self, date, verbose=False):
        """Return storage charging or discharging for today in 5 minute intervals

        Negative means charging, positive means discharging

        Arguments:
            date (datetime.date, str): date to return data

            verbose (bool, optional): print verbose output. Defaults to False.

        Returns:
            pandas.DataFrame: with columns Time, Interval Start,
            Interval End, Supply, Stand-alone Batteries and Hybrid Batteries.
        """
        if date == "latest":
            return self._latest_from_today(self.get_storage)

        df = _get_historical("storage", date, verbose=verbose)

        df = df.rename(
            columns={
                "Total batteries": "Supply",
                "Stand-alone batteries": "Stand-alone Batteries",
                "Hybrid batteries": "Hybrid Batteries",
            },
        )

        df = df[
            [
                "Time",
                "Interval Start",
                "Interval End",
                "Supply",
                "Stand-alone Batteries",
                "Hybrid Batteries",
            ]
        ]
        return df

    @support_date_range(frequency="31D")
    def get_gas_prices(
        self,
        date,
        end=None,
        fuel_region_id="ALL",
        sleep=4,
        verbose=False,
    ):
        """Return gas prices at a previous date

        Arguments:
            date (datetime.date, str): date to return data

            end (datetime.date, str): last date of range to return data.
                If None, returns only date. Defaults to None.

            fuel_region_id(str, or list): single fuel region id or list of
                fuel region ids to return data for. Defaults to ALL, which
                returns all fuel regions.

            sleep (int): number of seconds to sleep before returning to
                avoid hitting rate limit in regular usage.
                Defaults to 4 seconds.

            verbose (bool, optional): print verbose output. Defaults to False.

        Returns:
            pandas.DataFrame: A DataFrame of gas prices
        """
        start, end = _caiso_handle_start_end(date, end)

        if isinstance(fuel_region_id, list):
            fuel_region_id = ",".join(fuel_region_id)

        url = f"http://oasis.caiso.com/oasisapi/SingleZip?resultformat=6&queryname=PRC_FUEL&version=1&FUEL_REGION_ID={fuel_region_id}&startdatetime={start}&enddatetime={end}"  # noqa

        df = _get_oasis(url, verbose=verbose, sleep=sleep).rename(
            columns={
                "FUEL_REGION_ID": "Fuel Region Id",
                "PRC": "Price",
            },
        )

        # a single sort on (region, time) is enough; a preliminary sort on
        # "Time" alone would be overridden by this one
        df = df.sort_values(["Fuel Region Id", "Time"]).reset_index(drop=True)

        df = df[
            [
                "Time",
                "Interval Start",
                "Interval End",
                "Fuel Region Id",
                "Price",
            ]
        ]
        return df

    @support_date_range(frequency="31D")
    def get_ghg_allowance(
        self,
        date,
        end=None,
        sleep=4,
        verbose=False,
    ):
        """Return ghg allowance at a previous date

        Arguments:
            date (datetime.date, str): date to return data

            end (datetime.date, str): last date of range to return data.
                If None, returns only date. Defaults to None.

            sleep (int): number of seconds to sleep before returning to
                avoid hitting rate limit in regular usage.
                Defaults to 4 seconds.

            verbose (bool, optional): print verbose output. Defaults to False.

        Returns:
            pandas.DataFrame: with columns Time, Interval Start,
            Interval End and GHG Allowance Price.
        """
        start, end = _caiso_handle_start_end(date, end)

        url = f"http://oasis.caiso.com/oasisapi/SingleZip?resultformat=6&queryname=PRC_GHG_ALLOWANCE&version=1&startdatetime={start}&enddatetime={end}"  # noqa

        df = _get_oasis(url, verbose=verbose, sleep=sleep).rename(
            columns={
                "GHG_PRC_IDX": "GHG Allowance Price",
            },
        )

        df = df[
            [
                "Time",
                "Interval Start",
                "Interval End",
                "GHG Allowance Price",
            ]
        ]
        return df

    def get_interconnection_queue(self, verbose=False):
        """Download the public queue report and return it in standard format.

        The queued, completed and withdrawn project sheets are concatenated
        and then passed through ``utils.format_interconnection_df``.

        Arguments:
            verbose (bool, optional): print verbose output. Defaults to False.

        Returns:
            pandas.DataFrame: the formatted interconnection queue.
        """
        url = "http://www.caiso.com/PublishedDocuments/PublicQueueReport.xlsx"

        msg = f"Downloading interconnection queue from {url}"
        log(msg, verbose)

        sheets = pd.read_excel(url, skiprows=3, sheet_name=None)

        # remove legend at the bottom
        queued_projects = sheets["Grid GenerationQueue"][:-8]
        completed_projects = sheets["Completed Generation Projects"][:-2]
        withdrawn_projects = sheets["Withdrawn Generation Projects"][:-2].rename(
            columns={"Project Name - Confidential": "Project Name"},
        )

        queue = pd.concat(
            [queued_projects, completed_projects, withdrawn_projects],
        )

        # spreadsheet headers contain embedded newlines; flatten them
        queue = queue.rename(
            columns={
                "Interconnection Request\nReceive Date": (
                    "Interconnection Request Receive Date"
                ),
                "Actual\nOn-line Date": "Actual On-line Date",
                "Current\nOn-line Date": "Current On-line Date",
                "Interconnection Agreement \nStatus": (
                    "Interconnection Agreement Status"
                ),
                "Study\nProcess": "Study Process",
                "Proposed\nOn-line Date\n(as filed with IR)": (
                    "Proposed On-line Date (as filed with IR)"
                ),
                "System Impact Study or \nPhase I Cluster Study": (
                    "System Impact Study or Phase I Cluster Study"
                ),
                "Facilities Study (FAS) or \nPhase II Cluster Study": (
                    "Facilities Study (FAS) or Phase II Cluster Study"
                ),
                "Optional Study\n(OS)": "Optional Study (OS)",
            },
        )

        # combine the non-null Type-N columns, e.g. "Type-1 + Type-2"
        type_columns = ["Type-1", "Type-2", "Type-3"]
        queue["Generation Type"] = queue[type_columns].apply(
            lambda x: " + ".join(x.dropna()),
            axis=1,
        )

        rename = {
            "Queue Position": "Queue ID",
            "Project Name": "Project Name",
            "Generation Type": "Generation Type",
            "Queue Date": "Queue Date",
            "County": "County",
            "State": "State",
            "Application Status": "Status",
            "Current On-line Date": "Proposed Completion Date",
            "Actual On-line Date": "Actual Completion Date",
            "Reason for Withdrawal": "Withdrawal Comment",
            "Withdrawn Date": "Withdrawn Date",
            "Utility": "Transmission Owner",
            "Station or Transmission Line": "Interconnection Location",
            "Net MWs to Grid": "Capacity (MW)",
        }

        extra_columns = [
            "Type-1",
            "Type-2",
            "Type-3",
            "Fuel-1",
            "Fuel-2",
            "Fuel-3",
            "MW-1",
            "MW-2",
            "MW-3",
            "Interconnection Request Receive Date",
            "Interconnection Agreement Status",
            "Study Process",
            "Proposed On-line Date (as filed with IR)",
            "System Impact Study or Phase I Cluster Study",
            "Facilities Study (FAS) or Phase II Cluster Study",
            "Optional Study (OS)",
            "Full Capacity, Partial or Energy Only (FC/P/EO)",
            "Off-Peak Deliverability and Economic Only",
            "Feasibility Study or Supplemental Review",
        ]

        missing = [
            "Interconnecting Entity",
            "Summer Capacity (MW)",
            "Winter Capacity (MW)",
        ]

        queue = utils.format_interconnection_df(
            queue=queue,
            rename=rename,
            extra=extra_columns,
            missing=missing,
        )

        return queue

    @support_date_range(frequency="1D")
    def get_curtailment(self, date, verbose=False):
        """Return curtailment data for a given date

        Notes:
            * requires java to be installed in order to run
            * Data available from June 30, 2016 to present

        Arguments:
            date (datetime.date, str): date to return data

            end (datetime.date, str): last date of range to return data.
                If None, returns only date. Defaults to None.
                Not a parameter of this method itself; presumably handled
                by ``support_date_range``.

            verbose: print out url being fetched. Defaults to False.

        Returns:
            pandas.DataFrame: A DataFrame of curtailment data

        Raises:
            ValueError: if no PDF can be found for ``date`` or the PDF
                contains no curtailment table.
            RuntimeError: if the PDF cannot be parsed.
        """
        # http://www.caiso.com/Documents/Wind_SolarReal-TimeDispatchCurtailmentReport02dec_2020.pdf

        # the report file name format is not consistent, so try each variant
        date_strs = [
            date.strftime("%b%d_%Y"),
            date.strftime(
                "%d%b_%Y",
            ).lower(),
            date.strftime("-%b%d_%Y"),
        ]

        # handle specfic case where dec 02, 2021 has wrong year in file name
        if date_strs[0] == "Dec02_2021":
            date_strs = ["02dec_2020"]
        if date_strs[0] == "Dec02_2020":
            # this correct, so make sure we don't try the
            # other file since 2021 is published wrong
            date_strs = ["Dec02_2020"]

        # todo handle not always just 4th pge

        pdf = None
        for date_str in date_strs:
            url = f"http://www.caiso.com/Documents/Wind_SolarReal-TimeDispatchCurtailmentReport{date_str}.pdf"  # noqa: E501

            msg = f"Fetching URL: {url}"
            log(msg, verbose)

            r = requests.get(url)
            # a missing report is detected by the body text of the response
            if b"404 - Page Not Found" in r.content:
                continue

            pdf = io.BytesIO(r.content)
            break

        if pdf is None:
            raise ValueError(
                "Could not find curtailment PDF for {}".format(date),
            )

        # capture stderr during parsing; only show it if parsing fails
        with io.StringIO() as buf, redirect_stderr(buf):
            try:
                tables = tabula.read_pdf(pdf, pages="all")
            except Exception as err:
                print(buf.getvalue())
                raise RuntimeError("Problem Reading PDF") from err

        # the curtailment table is the first one with a "FUEL TYPE" column;
        # everything after it is treated as a continuation of that table
        index_curtailment_table = next(
            (i for i, table in enumerate(tables) if "FUEL TYPE" in table.columns),
            None,
        )
        if index_curtailment_table is None:
            raise ValueError("No tables found")

        tables = tables[index_curtailment_table:]

        if len(tables) == 1:
            df = tables[0]
        else:
            # this is case where there was a continuation of the
            # curtailment table on a second page. there is no header,
            # make parsed header of extra table the first row

            def _handle_extra_table(extra_table):
                extra_table = pd.concat(
                    [
                        extra_table.columns.to_frame().T.replace("Unnamed: 0", None),
                        extra_table,
                    ],
                )
                extra_table.columns = tables[0].columns

                return extra_table

            extra_tables = [tables[0]] + [_handle_extra_table(t) for t in tables[1:]]

            df = pd.concat(extra_tables).reset_index()

        # headers sometimes contain a carriage return from the PDF layout
        rename = {
            "DATE": "Date",
            "HOU\rR": "Hour",
            "HOUR": "Hour",
            "CURT TYPE": "Curtailment Type",
            "REASON": "Curtailment Reason",
            "FUEL TYPE": "Fuel Type",
            "CURTAILED MWH": "Curtailment (MWh)",
            "CURTAILED\rMWH": "Curtailment (MWh)",
            "CURTAILED MW": "Curtailment (MW)",
            "CURTAILED\rMW": "Curtailment (MW)",
        }

        df = df.rename(columns=rename)

        # convert from hour ending to hour beginning
        df["Hour"] = df["Hour"].astype(int) - 1
        df["Time"] = df["Hour"].apply(
            lambda x, date=date: date + pd.Timedelta(hours=x),
        )

        df["Interval Start"] = df["Time"]
        df["Interval End"] = df["Time"] + pd.Timedelta(hours=1)

        df = df.drop(columns=["Date", "Hour"])

        df["Fuel Type"] = df["Fuel Type"].map(
            {
                "SOLR": "Solar",
                "WIND": "Wind",
            },
        )

        df = df[
            [
                "Time",
                "Interval Start",
                "Interval End",
                "Curtailment Type",
                "Curtailment Reason",
                "Fuel Type",
                "Curtailment (MWh)",
                "Curtailment (MW)",
            ]
        ]

        return df

    @support_date_range(frequency="1D")
    def get_as_prices(self, date, end=None, sleep=4, verbose=False):
        """Return AS prices for a given date for each region

        Arguments:
            date (datetime.date, str): date to return data

            end (datetime.date, str): last date of range to return data.
                If None, returns only date. Defaults to None.

            sleep (int): number of seconds to sleep before returning to
                avoid hitting rate limit in regular usage.
                Defaults to 4 seconds.

            verbose (bool, optional): print out url being fetched.
                Defaults to False.

        Returns:
            pandas.DataFrame: A DataFrame of AS prices
        """
        start, end = _caiso_handle_start_end(date, end)

        url = f"http://oasis.caiso.com/oasisapi/SingleZip?resultformat=6&queryname=PRC_AS&version=12&startdatetime={start}&enddatetime={end}&market_run_id=DAM&anc_type=ALL&anc_region=ALL"  # noqa

        df = _get_oasis(url=url, verbose=verbose, sleep=sleep).rename(
            columns={
                "ANC_REGION": "Region",
                "MARKET_RUN_ID": "Market",
            },
        )

        df["ANC_TYPE"] = df["ANC_TYPE"].map(self._AS_TYPE_MAP)

        # one column per ancillary service type
        df = df.pivot_table(
            index=[
                "Time",
                "Interval Start",
                "Interval End",
                "Region",
                "Market",
            ],
            columns="ANC_TYPE",
            values="MW",
        ).reset_index()

        df = df.fillna(0)

        df.columns.name = None

        return df

    @support_date_range(frequency="31D")
    def get_as_procurement(
        self,
        date,
        end=None,
        market="DAM",
        sleep=4,
        verbose=False,
    ):
        """Get ancillary services procurement data from CAISO.

        Arguments:
            date (datetime.date, str): date to return data

            end (datetime.date, str): last date of range to return data.
                If None, returns only date. Defaults to None.

            market: DAM or RTM. Defaults to DAM.

            sleep (int): number of seconds to sleep before returning to
                avoid hitting rate limit in regular usage.
                Defaults to 4 seconds.

            verbose (bool, optional): print out url being fetched.
                Defaults to False.

        Returns:
            pandas.DataFrame: A DataFrame of ancillary services data
        """
        assert market in ["DAM", "RTM"], "market must be DAM or RTM"

        start, end = _caiso_handle_start_end(date, end)

        url = f"http://oasis.caiso.com/oasisapi/SingleZip?resultformat=6&queryname=AS_RESULTS&version=1&startdatetime={start}&enddatetime={end}&market_run_id={market}&anc_type=ALL&anc_region=ALL"  # noqa

        df = _get_oasis(url=url, verbose=verbose, sleep=sleep).rename(
            columns={
                "ANC_REGION": "Region",
                "MARKET_RUN_ID": "Market",
            },
        )

        df["ANC_TYPE"] = df["ANC_TYPE"].map(self._AS_TYPE_MAP)

        result_type_map = {
            "AS_BUY_MW": "Procured (MW)",
            "AS_SELF_MW": "Self-Provided (MW)",
            "AS_MW": "Total (MW)",
            "AS_COST": "Total Cost",
        }
        df["RESULT_TYPE"] = df["RESULT_TYPE"].map(result_type_map)

        # one column per (service type, result type) pair
        df["column"] = df["ANC_TYPE"] + " " + df["RESULT_TYPE"]

        df = df.pivot_table(
            index=[
                "Time",
                "Interval Start",
                "Interval End",
                "Region",
                "Market",
            ],
            columns="column",
            values="MW",
        ).reset_index()

        df.columns.name = None

        return df
def _make_timestamp(time_str, today, timezone="US/Pacific"):
    """Combine an "HH:MM" string with a date into a tz-aware Timestamp.

    Arguments:
        time_str (str): time of day formatted as "HH:MM".

        today: object with ``year``, ``month`` and ``day`` attributes.

        timezone (str): timezone of the result. Defaults to "US/Pacific".

    Returns:
        pandas.Timestamp: ``today`` at ``time_str`` in ``timezone``.
    """
    hour, minute = map(int, time_str.split(":"))
    return pd.Timestamp(
        year=today.year,
        month=today.month,
        day=today.day,
        hour=hour,
        minute=minute,
        tz=timezone,
    )


def _get_historical(file, date, verbose=False):
    """Download one of the outlook CSV files for ``date``.

    The dated history file is tried first. If that fails and ``date`` is
    today, the live (undated) file is used instead.

    Arguments:
        file (str): base name of the csv file, e.g. "demand".

        date: day to fetch; must support ``strftime`` and expose
            ``year``/``month``/``day``.

        verbose (bool, optional): print url being fetched. Defaults to False.

    Returns:
        pandas.DataFrame: file contents with a tz-aware "Time" column and
        "Interval Start" / "Interval End" columns inserted after it.
    """
    try:
        date_str = date.strftime("%Y%m%d")
        url = f"{_HISTORY_BASE}/{date_str}/{file}.csv"
        log(f"Fetching URL: {url}", verbose)
        df = pd.read_csv(url)
    except Exception:
        # fallback if today and no historical file yet; any other failure
        # is a real error and must propagate
        if not utils.is_today(date, CAISO.default_timezone):
            raise
        url = f"{_BASE}/{file}.csv"
        log(f"Fetching URL: {url}", verbose)
        df = pd.read_csv(url)

    # sometimes there are extra rows at the end, so this lets us ignore them
    df = df.dropna(subset=["Time"])

    df["Time"] = df["Time"].apply(
        _make_timestamp,
        today=date,
        timezone="US/Pacific",
    )

    # sometimes returns midnight, which is technically the next day
    # to be careful, let's check if that is the case before dropping
    if df.iloc[-1]["Time"].hour == 0:
        df = df.iloc[:-1]

    # insert interval start/end columns
    df.insert(1, "Interval Start", df["Time"])
    # be careful if this is ever not 5 minutes
    df.insert(2, "Interval End", df["Time"] + pd.Timedelta(minutes=5))

    return df


def _get_oasis(url, verbose=False, sleep=4):
    """Download a zipped CSV report from OASIS and return it as a DataFrame.

    Arguments:
        url (str): full OASIS ``SingleZip`` url.

        verbose (bool, optional): print url being fetched. Defaults to False.

        sleep (int): seconds to sleep after a successful download to
            avoid rate limiting. Defaults to 4.

    Returns:
        pandas.DataFrame: first file of the zip archive. If the report has
        ``INTERVALSTARTTIME_GMT``, the interval columns are converted to
        ``CAISO.default_timezone``, renamed to "Interval Start" /
        "Interval End", and a leading "Time" column is inserted.

    Raises:
        RuntimeError: if the request does not return HTTP 200 after 3 tries.
    """
    log(f"Fetching URL: {url}", verbose)

    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        r = requests.get(url)
        if r.status_code == 200:
            break

        print(f"Failed to get data from CAISO. Error: {r.status_code}")
        print(f"Retrying {attempt}...")
        time.sleep(5)
    else:
        # never hand an error page to ZipFile; fail with a clear message
        raise RuntimeError(
            f"Failed to get data from CAISO after {max_attempts} attempts. "
            f"Error: {r.status_code}",
        )

    # context managers make sure the archive and its member are closed
    with ZipFile(io.BytesIO(r.content)) as z, z.open(z.namelist()[0]) as f:
        df = pd.read_csv(f)

    if "INTERVALSTARTTIME_GMT" in df.columns:
        df["INTERVALSTARTTIME_GMT"] = pd.to_datetime(
            df["INTERVALSTARTTIME_GMT"],
            utc=True,
        ).dt.tz_convert(CAISO.default_timezone)
        df["INTERVALENDTIME_GMT"] = pd.to_datetime(
            df["INTERVALENDTIME_GMT"],
            utc=True,
        ).dt.tz_convert(CAISO.default_timezone)

        df.rename(
            columns={
                "INTERVALSTARTTIME_GMT": "Interval Start",
                "INTERVALENDTIME_GMT": "Interval End",
            },
            inplace=True,
        )

        df.insert(0, "Time", df["Interval Start"])

    # avoid rate limiting
    time.sleep(sleep)

    return df


def _caiso_handle_start_end(date, end):
    """Format a query window as OASIS ``startdatetime``/``enddatetime`` strings.

    Arguments:
        date (pandas.Timestamp): tz-aware start of the window.

        end (pandas.Timestamp): tz-aware end of the window. If falsy, the
            window ends one day after ``date``.

    Returns:
        tuple: ``(start, end)`` strings in UTC, e.g. "20220801T07:00-0000".
    """
    start = date.tz_convert("UTC")

    if end:
        end = end.tz_convert("UTC")
    else:
        end = start + pd.DateOffset(1)

    start = start.strftime("%Y%m%dT%H:%M-0000")
    end = end.strftime("%Y%m%dT%H:%M-0000")

    return start, end


if __name__ == "__main__":
    import glob

    import gridstatus

    print("asd")

    iso = gridstatus.CAISO()

    df = iso.get_curtailment(
        start="2016-06-30",
        end="today",
        save_to="caiso_curtailment_2/",
        verbose=True,
    )

    # check if any files are missing
    # NOTE(review): files are saved to "caiso_curtailment_2/" above but
    # globbed from "caiso_curtailment/" here - confirm this is intended
    files = glob.glob("caiso_curtailment/*.csv")
    dates = (
        pd.Series(
            [pd.to_datetime(f[-12:-4]) for f in files],
        )
        .sort_values()
        .to_frame()
        .set_index(0, drop=False)
    )
    # gaps of more than one day between consecutive files
    diffs = dates.diff()[0].dt.days
    miss = diffs[diffs > 1]