import io
from dataclasses import dataclass
from zipfile import ZipFile
import pandas as pd
import requests
import tqdm
from gridstatus import utils
from gridstatus.base import (
GridStatus,
InterconnectionQueueStatus,
ISOBase,
Markets,
NotSupported,
)
from gridstatus.decorators import ercot_update_dates, support_date_range
from gridstatus.gs_logging import log
from gridstatus.lmp_config import lmp_config
[docs]LOCATION_TYPE_HUB = "Trading Hub"
[docs]LOCATION_TYPE_RESOURCE_NODE = "Resource Node"
[docs]LOCATION_TYPE_ZONE = "Load Zone"
"""
Report Type IDs
"""
# DAM Clearing Prices for Capacity
# https://www.ercot.com/mp/data-products/data-product-details?id=NP4-188-CD
[docs]DAM_CLEARING_PRICES_FOR_CAPACITY_RTID = 12329
# DAM Settlement Point Prices
# https://www.ercot.com/mp/data-products/data-product-details?id=NP4-190-CD
[docs]DAM_SETTLEMENT_POINT_PRICES_RTID = 12331
# GIS Report
# https://www.ercot.com/mp/data-products/data-product-details?id=PG7-200-ER
# Historical RTM Load Zone and Hub Prices
# https://www.ercot.com/mp/data-products/data-product-details?id=NP6-785-ER
[docs]HISTORICAL_RTM_LOAD_ZONE_AND_HUB_PRICES_RTID = 13061
# Historical DAM Load Zone and Hub Prices
# https://www.ercot.com/mp/data-products/data-product-details?id=NP4-180-ER
[docs]HISTORICAL_DAM_LOAD_ZONE_AND_HUB_PRICES_RTID = 13060
# Settlement Points List and Electrical Buses Mapping
# https://www.ercot.com/mp/data-products/data-product-details?id=NP4-160-SG
[docs]SETTLEMENT_POINTS_LIST_AND_ELECTRICAL_BUSES_MAPPING_RTID = 10008
# Settlement Point Prices at Resource Nodes, Hubs and Load Zones
# https://www.ercot.com/mp/data-products/data-product-details?id=NP6-905-CD
[docs]SETTLEMENT_POINT_PRICES_AT_RESOURCE_NODES_HUBS_AND_LOAD_ZONES_RTID = 12301
# Seven-Day Load Forecast by Forecast Zone
# https://www.ercot.com/mp/data-products/data-product-details?id=NP3-560-CD
[docs]SEVEN_DAY_LOAD_FORECAST_BY_FORECAST_ZONE_RTID = 12311
# Historical DAM Clearing Prices for Capacity
# https://www.ercot.com/mp/data-products/data-product-details?id=NP4-181-ER
[docs]HISTORICAL_DAM_CLEARING_PRICES_FOR_CAPACITY_RTID = 13091
"""
Settlement Point Type Description
========== ========== ===========
Resource Node RN Resource Node for normal resource
Resource Node PCCRN Physical Resource Node for combined cycle units
Resource Node LCCRN Logical Resource Node for combined cycle plant
Resource Node PUN Private Area Network Resource Node
Load Zone LZ Congestion Load Zone
Load Zone LZ_DC DCTIE Load Zone
Hub HU Hub
Hub SH ERCOT_345KV_HUBBUSES_AVG
Hub AH ERCOT_HUB_AVG
============================================================
Source: https://www.ercot.com/files/docs/2009/10/26/07_tests_for_rsnable_lmps_overview_of_price_valid_tool_09102.ppt
""" # noqa
[docs]RESOURCE_NODE_SETTLEMENT_TYPES = ["RN", "PCCRN", "LCCRN", "PUN"]
[docs]LOAD_ZONE_SETTLEMENT_TYPES = ["LZ", "LZ_DC"]
[docs]HUB_SETTLEMENT_TYPES = ["HU", "SH", "AH"]
[docs]class Ercot(ISOBase):
"""Electric Reliability Council of Texas (ERCOT)"""
name = "Electric Reliability Council of Texas"
iso_id = "ercot"
default_timezone = "US/Central"
status_homepage = "https://www.ercot.com/gridmktinfo/dashboards/gridconditions"
interconnection_homepage = (
"http://mis.ercot.com/misapp/GetReports.do?reportTypeId=15933"
)
markets = [
Markets.REAL_TIME_15_MIN,
Markets.DAY_AHEAD_HOURLY,
]
location_types = [
LOCATION_TYPE_HUB,
LOCATION_TYPE_ZONE,
LOCATION_TYPE_RESOURCE_NODE,
]
BASE = "https://www.ercot.com/api/1/services/read/dashboards"
ACTUAL_LOADS_URL_FORMAT = "https://www.ercot.com/content/cdr/html/{timestamp}_actual_loads_of_forecast_zones.html" # noqa
LOAD_HISTORICAL_MAX_DAYS = 14
AS_PRICES_HISTORICAL_MAX_DAYS = 30
@dataclass
[docs] class Document:
url: str
publish_date: pd.Timestamp
constructed_name: str
friendly_name: str
[docs] def get_status(self, date, verbose=False):
"""Returns status of grid"""
if date != "latest":
raise NotSupported()
r = self._get_json(self.BASE + "/daily-prc.json", verbose=verbose)
time = (
pd.to_datetime(r["current_condition"]["datetime"], unit="s")
.tz_localize("UTC")
.tz_convert(self.default_timezone)
)
status = r["current_condition"]["state"]
reserves = float(r["current_condition"]["prc_value"].replace(",", ""))
if status == "normal":
status = "Normal"
notes = [r["current_condition"]["condition_note"]]
return GridStatus(
time=time,
status=status,
reserves=reserves,
iso=self,
notes=notes,
)
[docs] def get_fuel_mix(self, date, verbose=False):
"""Get fuel mix 5 minute intervals
Arguments:
date (datetime.date, str): "latest", "today",
and yesterday's date are supported.
verbose(bool): print verbose output. Defaults to False.
Returns:
pandas.DataFrame: A DataFrame with columns; Time and columns for each fuel \
type
"""
if date != "latest":
date_parsed = utils._handle_date(date, tz=self.default_timezone)
check_yesterday = date_parsed + pd.DateOffset(days=1)
if not (
utils.is_today(date, tz=self.default_timezone)
or utils.is_today(check_yesterday, tz=self.default_timezone)
):
raise NotSupported()
url = self.BASE + "/fuel-mix.json"
data = self._get_json(url, verbose=verbose)
dfs = []
for day in data["data"].keys():
df = (
pd.DataFrame(data["data"][day])
.applymap(
lambda x: x["gen"],
na_action="ignore",
)
.T
)
dfs.append(df)
mix = pd.concat(dfs)
mix.index.name = "Time"
mix = mix.reset_index()
mix["Time"] = pd.to_datetime(mix["Time"]).dt.tz_localize(
self.default_timezone,
ambiguous="infer",
)
# most timestamps are a few seconds off round 5 minute ticks
# round to nearest minute
mix["Time"] = mix["Time"].round("min")
mix = mix[
[
"Time",
"Coal and Lignite",
"Hydro",
"Nuclear",
"Power Storage",
"Solar",
"Wind",
"Natural Gas",
"Other",
]
]
if date == "latest":
return mix
# return where date_parsed matches mix["Time"]
return mix[mix["Time"].dt.date == date_parsed.date()].reset_index(drop=True)
@support_date_range("DAY_START")
[docs] def get_load(self, date, end=None, verbose=False):
"""Get load for a date"""
if date == "latest":
today_load = self.get_load("today", verbose=verbose)
latest = today_load.iloc[-1]
return {"load": latest["Load"], "time": latest["Time"]}
elif utils.is_today(date, tz=self.default_timezone):
df = self._get_todays_outlook_non_forecast(date, verbose=verbose)
df = df.rename(columns={"demand": "Load"})
return df[["Time", "Interval Start", "Interval End", "Load"]]
elif utils.is_within_last_days(
date,
self.LOAD_HISTORICAL_MAX_DAYS,
tz=self.default_timezone,
):
df = self._get_load_html(date, verbose)
return df[["Time", "Interval Start", "Interval End", "Load"]]
else:
raise NotSupported()
def _get_load_html(self, when, verbose=False):
"""Returns load for currentDay or previousDay"""
url = self.ACTUAL_LOADS_URL_FORMAT.format(
timestamp=when.strftime("%Y%m%d"),
)
msg = f"Fetching {url}"
log(msg, verbose)
dfs = pd.read_html(url, header=0)
df = dfs[0]
df = self._handle_html_data(df, {"TOTAL": "Load"})
return df
def _get_todays_outlook_non_forecast(self, date, verbose=False):
"""Returns most recent data point for supply in MW
Updates every 5 minutes
"""
assert date == "latest" or utils.is_today(
date,
self.default_timezone,
), "Only today's data is supported"
url = self.BASE + "/supply-demand.json"
msg = f"Fetching {url}"
log(msg, verbose)
r = self._get_json(url)
date = pd.to_datetime(r["lastUpdated"][:10], format="%Y-%m-%d")
data = pd.DataFrame(r["data"])
data["Interval End"] = (
date
+ data["hourEnding"].astype("timedelta64[h]")
+ data["interval"].astype("timedelta64[m]")
)
data["Interval End"] = data["Interval End"].dt.tz_localize(
self.default_timezone,
ambiguous="infer",
)
data["Interval Start"] = data["Interval End"] - pd.Timedelta(minutes=5)
data["Time"] = data["Interval Start"]
data = data[
[
"Time",
"Interval Start",
"Interval End",
"demand",
"forecast",
"capacity",
]
]
# keep today's data only
data = data[
data["Interval Start"].dt.normalize()
== pd.Timestamp(
date,
)
.tz_localize(self.default_timezone)
.normalize()
]
data = data[data["forecast"] == 0] # only keep non forecast rows
return data.reset_index(drop=True)
[docs] def get_load_forecast(self, date, verbose=False):
"""Returns load forecast
Currently only supports today's forecast
"""
if not utils.is_today(date, self.default_timezone):
raise NotSupported()
# intrahour https://www.ercot.com/mp/data-products/data-product-details?id=NP3-562-CD
# there are a few days of historical date for the forecast
today = pd.Timestamp.now(tz=self.default_timezone).normalize()
doc_info = self._get_document(
report_type_id=SEVEN_DAY_LOAD_FORECAST_BY_FORECAST_ZONE_RTID,
date=today,
constructed_name_contains="csv.zip",
verbose=verbose,
)
doc = self.read_doc(doc_info, verbose=verbose)
doc = doc.rename(columns={"SystemTotal": "Load Forecast"})
doc["Forecast Time"] = doc_info.publish_date
doc = doc[
[
"Time",
"Interval Start",
"Interval End",
"Forecast Time",
"Load Forecast",
]
]
return doc
[docs] def get_rtm_spp(self, year, verbose=False):
"""Get Historical RTM Settlement Point Prices(SPPs)
for each of the Hubs and Load Zones
Arguments:
year(int): year to get data for
Starting 2011, returns data for the entire year
Source:
https://www.ercot.com/mp/data-products/data-product-details?id=NP6-785-ER
""" # noqa
doc_info = self._get_document(
report_type_id=HISTORICAL_RTM_LOAD_ZONE_AND_HUB_PRICES_RTID,
constructed_name_contains=f"{year}.zip",
verbose=verbose,
)
x = utils.get_zip_file(doc_info.url, verbose=verbose)
all_sheets = pd.read_excel(x, sheet_name=None)
df = pd.concat(all_sheets.values())
# fix parsing error where no data is present
# should only be 1 row per year
count = df[["Delivery Hour", "Delivery Interval"]].isnull().all(axis=1).sum()
if count == 1:
df = df.dropna(
subset=["Delivery Hour", "Delivery Interval"],
how="all",
)
elif count > 1:
raise ValueError(
"Parsing error, more than expected null rows found",
)
df["Delivery Interval"] = df["Delivery Interval"].astype("Int64")
df = self.parse_doc(df, verbose=verbose)
df["Market"] = Markets.REAL_TIME_15_MIN.value
return self._finalize_spp_df(df, verbose=verbose)
[docs] def get_dam_spp(self, year, verbose=False):
"""Get Historical DAM Settlement Point Prices(SPPs)
for each of the Hubs and Load Zones
Arguments:
year(int): year to get data for.
Starting 2011, returns data for the entire year
Source:
https://www.ercot.com/mp/data-products/data-product-details?id=NP4-180-ER
"""
doc_info = self._get_document(
report_type_id=HISTORICAL_DAM_LOAD_ZONE_AND_HUB_PRICES_RTID,
constructed_name_contains=f"{year}.zip",
verbose=verbose,
)
x = utils.get_zip_file(doc_info.url, verbose=verbose)
all_sheets = pd.read_excel(x, sheet_name=None)
df = pd.concat(all_sheets.values())
# filter where DSTFlag == 10
df = self.parse_doc(df, verbose=verbose)
df["Market"] = Markets.DAY_AHEAD_HOURLY.value
return self._finalize_spp_df(df, verbose=verbose)
[docs] def get_interconnection_queue(self, verbose=False):
"""
Get interconnection queue for ERCOT
Monthly historical data available here:
http://mis.ercot.com/misapp/GetReports.do?reportTypeId=15933&reportTitle=GIS%20Report&showHTMLView=&mimicKey
""" # noqa
doc_info = self._get_document(
report_type_id=GIS_REPORT_RTID,
constructed_name_contains="GIS_Report",
verbose=verbose,
)
# TODO other sheets for small projects, inactive, and cancelled project
# TODO see if this data matches up with summaries in excel file
# TODO historical data available as well
msg = f"Downloading interconnection queue from: {doc_info.url} "
log(msg, verbose)
# skip rows and handle header
queue = pd.read_excel(
doc_info.url,
sheet_name="Project Details - Large Gen",
skiprows=30,
).iloc[4:]
queue["State"] = "Texas"
queue["Queue Date"] = queue["Screening Study Started"]
fuel_type_map = {
"BIO": "Biomass",
"COA": "Coal",
"GAS": "Gas",
"GEO": "Geothermal",
"HYD": "Hydrogen",
"NUC": "Nuclear",
"OIL": "Fuel Oil",
"OTH": "Other",
"PET": "Petcoke",
"SOL": "Solar",
"WAT": "Water",
"WIN": "Wind",
}
technology_type_map = {
"BA": "Battery Energy Storage",
"CC": "Combined-Cycle",
"CE": "Compressed Air Energy Storage",
"CP": "Concentrated Solar Power",
"EN": "Energy Storage",
"FC": "Fuel Cell",
"GT": "Combustion (gas) Turbine, but not part of a Combined-Cycle",
"HY": "Hydroelectric Turbine",
"IC": "Internal Combustion Engine, eg. Reciprocating",
"OT": "Other",
"PV": "Photovoltaic Solar",
"ST": "Steam Turbine other than Combined-Cycle",
"WT": "Wind Turbine",
}
queue["Fuel"] = queue["Fuel"].map(fuel_type_map)
queue["Technology"] = queue["Technology"].map(technology_type_map)
queue["Generation Type"] = queue["Fuel"] + " - " + queue["Technology"]
queue["Status"] = (
queue["IA Signed"]
.isna()
.map(
{
True: InterconnectionQueueStatus.ACTIVE.value,
False: InterconnectionQueueStatus.COMPLETED.value,
},
)
)
queue["Actual Completion Date"] = queue["Approved for Synchronization"]
rename = {
"INR": "Queue ID",
"Project Name": "Project Name",
"Interconnecting Entity": "Interconnecting Entity",
"Projected COD": "Proposed Completion Date",
"POI Location": "Interconnection Location",
"County": "County",
"State": "State",
"Capacity (MW)": "Capacity (MW)",
"Queue Date": "Queue Date",
"Generation Type": "Generation Type",
"Actual Completion Date": "Actual Completion Date",
"Status": "Status",
}
# todo: there are a few columns being parsed
# as "unamed" that aren't being included but should
extra_columns = [
"Fuel",
"Technology",
"GIM Study Phase",
"Screening Study Started",
"Screening Study Complete",
"FIS Requested",
"FIS Approved",
"Economic Study Required",
"IA Signed",
"Air Permit",
"GHG Permit",
"Water Availability",
"Meets Planning",
"Meets All Planning",
"CDR Reporting Zone",
# "Construction Start", # all null
# "Construction End", # all null
"Approved for Energization",
"Approved for Synchronization",
"Comment",
]
missing = [
# todo the actual complettion date can be calculated by
# looking at status and other date columns
"Withdrawal Comment",
"Transmission Owner",
"Summer Capacity (MW)",
"Winter Capacity (MW)",
"Withdrawn Date",
]
queue = utils.format_interconnection_df(
queue=queue,
rename=rename,
extra=extra_columns,
missing=missing,
)
return queue
@lmp_config(
supports={
Markets.REAL_TIME_15_MIN: ["latest", "today", "historical"],
Markets.DAY_AHEAD_HOURLY: ["latest", "today", "historical"],
},
)
@support_date_range(frequency="DAY_START")
[docs] def get_spp(
self,
date,
end=None,
market: str = None,
locations: list = "ALL",
location_type: str = "ALL",
verbose=False,
):
"""Get SPP data for ERCOT
Supported Markets:
- ``REAL_TIME_15_MIN``
- ``DAY_AHEAD_HOURLY``
Supported Location Types:
- ``zone``
- ``hub``
- ``node``
"""
if market == Markets.REAL_TIME_15_MIN:
df = self._get_spp_rtm15(
date,
verbose,
)
elif market == Markets.DAY_AHEAD_HOURLY:
df = self._get_spp_dam(date, verbose)
return self._finalize_spp_df(
df,
locations=locations,
location_type=location_type,
verbose=verbose,
)
def _finalize_spp_df(self, df, locations=None, location_type=None, verbose=False):
df = df.rename(
columns={
"SettlementPoint": "Location",
"Settlement Point": "Location",
"SettlementPointName": "Location",
"Settlement Point Name": "Location",
},
)
mapping_df = self._get_settlement_point_mapping(verbose=verbose)
resource_node = mapping_df["RESOURCE_NODE"].dropna().unique()
# if df[df.duplicated()].shape[0] > 0:
# import pdb
# pdb.set_trace()
# Create boolean masks for each location type
is_hub = df["Location"].str.startswith("HB_")
is_load_zone = df["Location"].str.startswith("LZ_")
is_resource_node = df["Location"].isin(resource_node)
# Assign location types based on the boolean masks
df.loc[is_hub, "Location Type"] = LOCATION_TYPE_HUB
df.loc[is_load_zone, "Location Type"] = LOCATION_TYPE_ZONE
df.loc[is_resource_node, "Location Type"] = LOCATION_TYPE_RESOURCE_NODE
# If a location type is not found, default to LOCATION_TYPE_RESOURCE_NODE
df["Location Type"].fillna(LOCATION_TYPE_RESOURCE_NODE, inplace=True)
df = df.rename(
columns={
"SettlementPointPrice": "SPP",
"Settlement Point Price": "SPP",
},
)
df = df[
[
"Time",
"Interval Start",
"Interval End",
"Location",
"Location Type",
"Market",
"SPP",
]
]
# todo figure out why
# when you get rid of SettlementPointType some
# rows are duplicated
# For example, SettlementPointType LZ and LZEW
df = df.drop_duplicates(
subset=[
"Time",
"Interval Start",
"Interval End",
"Location",
],
)
df = utils.filter_lmp_locations(
df=df,
locations=locations,
location_type=location_type,
)
df = df.sort_values(by="Interval Start")
df = df.reset_index(drop=True)
return df
def _get_spp_dam(
self,
date,
verbose=False,
):
"""Get day-ahead hourly Market SPP data for ERCOT"""
if date == "latest":
raise ValueError(
"DAM is released daily, so use date='today' instead",
)
publish_date = utils._handle_date(date, self.default_timezone)
# adjust for DAM since it's published a day ahead
publish_date = publish_date.normalize() - pd.DateOffset(days=1)
doc_info = self._get_document(
report_type_id=DAM_SETTLEMENT_POINT_PRICES_RTID,
date=publish_date,
constructed_name_contains="csv.zip",
verbose=verbose,
)
msg = f"Fetching {doc_info.url}"
log(msg, verbose)
df = self.read_doc(doc_info, verbose=verbose)
# fetch mapping
df["Market"] = Markets.DAY_AHEAD_HOURLY.value
return df
def _get_spp_rtm15(
self,
date,
verbose=False,
):
"""Get Real-time 15-minute Market SPP data for ERCOT
https://www.ercot.com/mp/data-products/data-product-details?id=NP6-905-CD
"""
query_date = date
if date == "latest":
query_date = utils._handle_date("today", self.default_timezone)
# returns list of Document(url=,publish_date=)
query_date_str = f"SPPHLZNP6905_{query_date.strftime('%Y%m%d')}"
all_docs = self._get_documents(
report_type_id=SETTLEMENT_POINT_PRICES_AT_RESOURCE_NODES_HUBS_AND_LOAD_ZONES_RTID,
extension="csv",
verbose=verbose,
)
# look at file name to determine the
# date/interval the file represents
docs = []
for doc in all_docs:
if query_date_str + "_0000" in doc.constructed_name:
continue
if (
query_date_str in doc.constructed_name
or f"SPPHLZNP6905_{(query_date + pd.Timedelta(days=1)).strftime('%Y%m%d')}_0000" # noqa: E501
in doc.constructed_name
):
docs.append(doc)
if date == "latest":
# just pluck out the latest document based on publish_date
docs = [max(docs, key=lambda x: x.publish_date)]
if len(docs) == 0:
raise ValueError(f"Could not fetch SPP data for {date}")
all_dfs = []
for doc_info in tqdm.tqdm(docs, disable=not verbose):
doc_url = doc_info.url
msg = f"Fetching {doc_url}"
log(msg, verbose)
df = self.read_doc(doc_info, verbose=verbose)
all_dfs.append(df)
df = pd.concat(all_dfs).reset_index(drop=True)
df["Market"] = Markets.REAL_TIME_15_MIN.value
return df
@support_date_range(frequency="1Y", update_dates=ercot_update_dates)
[docs] def get_as_prices(
self,
date,
end=None,
verbose=False,
):
"""Get ancillary service clearing prices in hourly intervals in Day Ahead Market
Arguments:
date (datetime.date, str): date of delivery for AS services
end (datetime.date, str, optional): if declared, function will return
data as a range, from "date" to "end"
verbose (bool, optional): print verbose output. Defaults to False.
Returns:
pandas.DataFrame: A DataFrame with prices for "Non-Spinning Reserves", \
"Regulation Up", "Regulation Down", "Responsive Reserves".
Source:
https://www.ercot.com/mp/data-products/data-product-details?id=NP4-181-ER
"""
# use to check if we need to pull daily files
if (
date.date()
>= (
pd.Timestamp.now(tz=self.default_timezone)
- pd.DateOffset(days=self.AS_PRICES_HISTORICAL_MAX_DAYS)
).date()
):
return self._get_as_prices_recent(date, end=end)
elif not end:
end = date
doc_info = self._get_document(
report_type_id=HISTORICAL_DAM_CLEARING_PRICES_FOR_CAPACITY_RTID,
constructed_name_contains=f"{date.year}.zip",
verbose=verbose,
)
doc = self.read_doc(doc_info, verbose=verbose)
doc = self._finalize_as_price_df(doc)
max_date = doc.Time.max().date()
df_list = [doc]
# if last df date is less than our specified end
# date, pull the remaining days. Will only be applicable
# if end date is within today - 3days
if max_date < end.date():
df_list.append(
self._get_as_prices_recent(
start=max_date,
end=end,
),
)
# join, sort, filter and reset data index
data = pd.concat(df_list).sort_values(by="Interval Start")
data = (
data.loc[
(data.Time.dt.date >= date.date()) & (data.Time.dt.date <= end.date())
]
.drop_duplicates(subset=["Interval Start"])
.reset_index(drop=True)
)
print(data)
return data
@support_date_range("DAY_START")
def _get_as_prices_recent(self, date, verbose=False):
"""Get ancillary service clearing prices in hourly intervals in Day
Ahead Market. This function is can return the last 31 days
of ancillary pricing.
Arguments:
date (datetime.date, str): date of delivery for AS services
verbose (bool, optional): print verbose output. Defaults to False.
Returns:
pandas.DataFrame: A DataFrame with prices for "Non-Spinning Reserves", \
"Regulation Up", "Regulation Down", "Responsive Reserves".
"""
# subtract one day since it's the day ahead market happens on the day
# before for the delivery day
date = date - pd.DateOffset(days=1)
doc_info = self._get_document(
report_type_id=DAM_CLEARING_PRICES_FOR_CAPACITY_RTID,
date=date,
constructed_name_contains="csv.zip",
verbose=verbose,
)
msg = f"Downloading {doc_info.url}"
log(msg, verbose)
doc = self.read_doc(doc_info, verbose=verbose)
data = self._finalize_as_price_df(
doc,
pivot=True,
)
return data
def _finalize_as_price_df(self, doc, pivot=False):
doc["Market"] = "DAM"
# recent daily files need to be pivoted
if pivot:
doc = doc.pivot_table(
index=["Time", "Interval Start", "Interval End", "Market"],
columns="AncillaryType",
values="MCPC",
).reset_index()
doc.columns.name = None
# some columns from workbook contain trailing/leading whitespace
doc.columns = [x.strip() for x in doc.columns]
# NSPIN REGDN REGUP RRS
rename = {
"NSPIN": "Non-Spinning Reserves",
"REGDN": "Regulation Down",
"REGUP": "Regulation Up",
"RRS": "Responsive Reserves",
}
col_order = [
"Time",
"Interval Start",
"Interval End",
"Market",
"Non-Spinning Reserves",
"Regulation Down",
"Regulation Up",
"Responsive Reserves",
]
doc.rename(columns=rename, inplace=True)
return doc[col_order]
def _get_document(
self,
report_type_id,
date=None,
constructed_name_contains=None,
verbose=False,
) -> Document:
"""Searches by Report Type ID, filtering for date and/or constructed name
Raises a ValueError if no document matches
Returns:
Latest Document by publish_date
"""
documents = self._get_documents(
report_type_id=report_type_id,
date=date,
constructed_name_contains=constructed_name_contains,
verbose=verbose,
)
if len(documents) == 0:
raise ValueError(
f"No document found for {report_type_id} on {date}",
)
return max(documents, key=lambda x: x.publish_date)
def _get_documents(
self,
report_type_id,
date=None,
constructed_name_contains=None,
extension=None,
verbose=False,
) -> list:
"""Searches by Report Type ID, filtering for date and/or constructed name
Returns:
list of Document with URL and Publish Date
"""
url = f"https://www.ercot.com/misapp/servlets/IceDocListJsonWS?reportTypeId={report_type_id}" # noqa
msg = f"Fetching document {url}"
log(msg, verbose)
docs = self._get_json(url)["ListDocsByRptTypeRes"]["DocumentList"]
matches = []
for doc in docs:
match = True
publish_date = pd.Timestamp(doc["Document"]["PublishDate"]).tz_convert(
self.default_timezone,
)
if date:
match = match and publish_date.date() == date.date()
if extension:
match = match and doc["Document"]["FriendlyName"].endswith(extension)
if constructed_name_contains:
match = (
match
and constructed_name_contains in doc["Document"]["ConstructedName"]
)
if match:
doc_id = doc["Document"]["DocID"]
url = f"https://www.ercot.com/misdownload/servlets/mirDownload?doclookupId={doc_id}" # noqa
matches.append(
self.Document(
url=url,
publish_date=publish_date,
constructed_name=doc["Document"]["ConstructedName"],
friendly_name=doc["Document"]["FriendlyName"],
),
)
return matches
def _handle_json_data(self, df, columns):
df["Time"] = (
pd.to_datetime(df["epoch"], unit="ms")
.dt.tz_localize("UTC")
.dt.tz_convert(self.default_timezone)
)
cols_to_keep = ["Time"] + list(columns.keys())
return df[cols_to_keep].rename(columns=columns)
def _handle_html_data(self, df, columns):
df["Interval End"] = pd.to_datetime(df["Oper Day"]) + (
df["Hour Ending"] / 100
).astype("timedelta64[h]")
df["Interval End"] = df["Interval End"].dt.tz_localize(
self.default_timezone,
)
df["Interval Start"] = df["Interval End"] - pd.DateOffset(hours=1)
df["Time"] = df["Interval Start"]
cols_to_keep = [
"Time",
"Interval Start",
"Interval End",
] + list(columns.keys())
return df[cols_to_keep].rename(columns=columns)
def _get_settlement_point_mapping(self, verbose=False):
"""Get DataFrame whose columns can help us filter out values"""
doc_info = self._get_document(
report_type_id=SETTLEMENT_POINTS_LIST_AND_ELECTRICAL_BUSES_MAPPING_RTID,
verbose=verbose,
)
doc_url = doc_info.url
msg = f"Fetching {doc_url}"
log(msg, verbose)
r = requests.get(doc_url)
z = ZipFile(io.BytesIO(r.content))
names = z.namelist()
settlement_points_file = [
name for name in names if "Settlement_Points" in name
][0]
df = pd.read_csv(z.open(settlement_points_file))
return df
[docs] def read_doc(self, doc, verbose=False):
doc = pd.read_csv(doc.url, compression="zip")
return self.parse_doc(doc, verbose=verbose)
[docs] def parse_doc(self, doc, verbose=False):
# files sometimes have different naming conventions
# a more elegant solution would be nice
doc.rename(
columns={
"Delivery Date": "DeliveryDate",
"Hour Ending": "HourEnding",
"Repeated Hour Flag": "DSTFlag",
"DeliveryHour": "HourEnding",
"Delivery Hour": "HourEnding",
"Delivery Interval": "DeliveryInterval",
# fix whitespace in column name
"DSTFlag ": "DSTFlag",
},
inplace=True,
)
original_cols = doc.columns.tolist()
# i think DeliveryInterval only shows up
# in 15 minute data along with DeliveryHour
if "DeliveryInterval" in original_cols:
interval_length = pd.Timedelta(minutes=15)
doc["HourBeginning"] = doc["HourEnding"] - 1
doc["Interval Start"] = (
pd.to_datetime(doc["DeliveryDate"])
+ doc["HourBeginning"].astype("timedelta64[h]")
+ ((doc["DeliveryInterval"] - 1) * interval_length)
)
else:
interval_length = pd.Timedelta(hours=1)
doc["HourBeginning"] = (
doc["HourEnding"]
.astype(str)
.str.split(
":",
)
.str[0]
.astype(int)
- 1
)
doc["Interval Start"] = pd.to_datetime(doc["DeliveryDate"]) + doc[
"HourBeginning"
].astype("timedelta64[h]")
doc["Interval Start"] = doc["Interval Start"].dt.tz_localize(
self.default_timezone,
ambiguous=doc["DSTFlag"] == "Y",
)
doc["Interval End"] = doc["Interval Start"] + interval_length
doc["Time"] = doc["Interval Start"]
cols_to_keep = [
"Time",
"Interval Start",
"Interval End",
] + original_cols
# todo try to clean up this logic
doc = doc[cols_to_keep]
doc = doc.drop(
columns=[
"DeliveryDate",
"HourEnding",
"DSTFlag",
],
)
if "DeliveryInterval" in doc.columns:
doc = doc.drop(columns=["DeliveryInterval"])
return doc
if __name__ == "__main__":
df = iso.get_rtm_spp(2011)