import io
import re
from urllib.parse import urlencode
import pandas as pd
import requests
import tqdm
from bs4 import BeautifulSoup, Tag
from gridstatus import utils
from gridstatus.base import (
GridStatus,
InterconnectionQueueStatus,
ISOBase,
Markets,
NotSupported,
)
from gridstatus.decorators import support_date_range
from gridstatus.lmp_config import lmp_config
from gridstatus.logging import log
[docs]FS_RTBM_LMP_BY_LOCATION = "rtbm-lmp-by-location"
[docs]FS_DAM_LMP_BY_LOCATION = "da-lmp-by-location"
[docs]MARKETPLACE_BASE_URL = "https://marketplace.spp.org"
[docs]FILE_BROWSER_API_URL = "https://marketplace.spp.org/file-browser-api/"
[docs]LOCATION_TYPE_HUB = "HUB"
[docs]LOCATION_TYPE_INTERFACE = "INTERFACE"
[docs]LOCATION_TYPE_SETTLEMENT_LOCATION = "SETTLEMENT_LOCATION"
[docs]QUERY_RTM5_HUBS_URL = "https://pricecontourmap.spp.org/arcgis/rest/services/MarketMaps/RTBM_FeatureData/MapServer/1/query" # noqa
[docs]QUERY_RTM5_INTERFACES_URL = "https://pricecontourmap.spp.org/arcgis/rest/services/MarketMaps/RTBM_FeatureData/MapServer/2/query" # noqa
[docs]RELIABILITY_LEVELS = [
"Normal Operations",
"Weather Advisory",
"Resource Advisory",
"Conservative Operations Advisory",
"Energy Emergency Alert Level 1",
"Energy Emergency Alert Level 2",
"Energy Emergency Alert Level 3",
"Restoration Event",
]
[docs]LAST_UPDATED_KEYWORDS = [
"last updated",
"as of",
]
[docs]RELIABILITY_LEVELS_ALIASES = {
"Normal Operations": "Normal",
}
[docs]STATUS_STOP_WORDS = [
"as",
"at",
"ct", # central time
"eea", # energy emergency alert
"of",
"on",
]
[docs]class SPP(ISOBase):
"""Southwest Power Pool (SPP)"""
name = "Southwest Power Pool"
iso_id = "spp"
default_timezone = "US/Central"
status_homepage = "https://www.spp.org/markets-operations/current-grid-conditions/"
interconnection_homepage = (
"https://www.spp.org/engineering/generator-interconnection/"
)
markets = [
Markets.REAL_TIME_5_MIN,
Markets.DAY_AHEAD_HOURLY,
]
location_types = [
LOCATION_TYPE_HUB,
LOCATION_TYPE_INTERFACE,
LOCATION_TYPE_SETTLEMENT_LOCATION,
]
[docs] def get_status(self, date=None, verbose=False):
if date != "latest":
raise NotSupported()
url = "https://www.spp.org/markets-operations/current-grid-conditions/"
html_text = requests.get(url).content.decode("UTF-8")
return self._get_status_from_html(html_text)
[docs] def get_fuel_mix(self, date, verbose=False):
"""Get fuel mix
Args:
date: supports today and latest
Note:
if today, returns last 2 hours of data. maybe include previous day
Returns:
pd.DataFrame: fuel mix
"""
if date == "latest":
return (
self.get_fuel_mix("today", verbose=verbose)
.tail(1)
.reset_index(drop=True)
)
if not utils.is_today(date, self.default_timezone):
# https://marketplace.spp.org/pages/generation-mix-historical
# many years of historical 5 minute data
raise NotSupported
url = "https://marketplace.spp.org/chart-api/gen-mix/asChart"
r = self._get_json(url, verbose=verbose)["response"]
data = {"Timestamp": r["labels"]}
data.update((d["label"], d["data"]) for d in r["datasets"])
historical_mix = pd.DataFrame(data)
historical_mix["Timestamp"] = pd.to_datetime(
historical_mix["Timestamp"],
).dt.tz_convert(
self.default_timezone,
)
historical_mix.rename(
columns={"Timestamp": "Time"},
inplace=True,
)
historical_mix = add_interval(historical_mix, interval_min=5)
return historical_mix
[docs] def get_load(self, date, verbose=False):
"""Returns load for last 24hrs in 5 minute intervals"""
if date == "latest":
return self._latest_from_today(self.get_load)
elif utils.is_today(date, tz=self.default_timezone):
date = utils._handle_date(date, self.default_timezone)
df = self._get_load_and_forecast(verbose=verbose)
df = df.dropna(subset=["Actual Load"])
df = df.rename(columns={"Actual Load": "Load"})
df = df[["Time", "Load"]]
# returns two days, so make sure to only return current day's load
df = df[df["Time"].dt.date == date.date()]
df = df.reset_index(drop=True)
df = add_interval(df, interval_min=5)
return df
else:
# hourly historical zonal loads
# https://marketplace.spp.org/pages/hourly-load
raise NotSupported()
[docs] def get_load_forecast(self, date, forecast_type="MID_TERM", verbose=False):
"""Returns load forecast for next 7 days in hourly intervals
Arguments:
forecast_type (str): MID_TERM is hourly for next 7 days or SHORT_TERM is
every five minutes for a few hours
Returns:
pd.DataFrame: forecast for current day
"""
df = self._get_load_and_forecast(verbose=verbose)
# gives forecast from before current day
# only include forecasts starting at current day
last_actual = df.dropna(subset=["Actual Load"])["Time"].max()
current_day = last_actual.replace(hour=0, minute=0)
current_day_forecast = df[df["Time"] >= current_day].copy()
# assume forecast is made at last actual
current_day_forecast["Forecast Time"] = last_actual
if forecast_type == "MID_TERM":
forecast_col = "Mid-Term Forecast"
elif forecast_type == "SHORT_TERM":
forecast_col = "Short-Term Forecast"
else:
raise RuntimeError("Invalid forecast type")
# there will be empty rows regardless of forecast type since they dont align
current_day_forecast = current_day_forecast.dropna(
subset=[forecast_col],
)
current_day_forecast = current_day_forecast[
["Forecast Time", "Time", forecast_col]
].rename({forecast_col: "Load Forecast"}, axis=1)
current_day_forecast = add_interval(
current_day_forecast,
interval_min=60,
)
return current_day_forecast
def _get_load_and_forecast(self, verbose=False):
url = "https://marketplace.spp.org/chart-api/load-forecast/asChart"
msg = f"Getting load and forecast from {url}"
log(msg, verbose)
r = self._get_json(url)["response"]
data = {"Time": r["labels"]}
for d in r["datasets"][:3]:
if d["label"] == "Actual Load":
data["Actual Load"] = d["data"]
elif d["label"] == "Mid-Term Load Forecast":
data["Mid-Term Forecast"] = d["data"]
elif d["label"] == "Short-Term Load Forecast":
data["Short-Term Forecast"] = d["data"]
df = pd.DataFrame(data)
df["Time"] = pd.to_datetime(
df["Time"],
).dt.tz_convert(self.default_timezone)
return df
# todo where does date got in argument order
# def get_historical_lmp(self, date, market: str, nodes: list):
# 5 minute interal data
# https://marketplace.spp.org/file-browser-api/download/rtbm-lmp-by-location?path=/2022/08/By_Interval/08/RTBM-LMP-SL-202208082125.csv
# hub and interface prices
# https://marketplace.spp.org/pages/hub-and-interface-prices
# historical generation mix
# https://marketplace.spp.org/pages/generation-mix-rolling-365
# https://marketplace.spp.org/chart-api/gen-mix-365/asFile
# 15mb file with five minute resolution
[docs] def get_interconnection_queue(self, verbose=False):
"""Get interconnection queue
Returns:
pandas.DataFrame: Interconnection queue
"""
url = "https://opsportal.spp.org/Studies/GenerateActiveCSV"
msg = f"Getting interconnection queue from {url}"
log(msg, verbose)
queue = pd.read_csv(url, skiprows=1)
queue["Status (Original)"] = queue["Status"]
completed_val = InterconnectionQueueStatus.COMPLETED.value
active_val = InterconnectionQueueStatus.ACTIVE.value
queue["Status"] = queue["Status"].map(
{
"IA FULLY EXECUTED/COMMERCIAL OPERATION": completed_val,
"IA FULLY EXECUTED/ON SCHEDULE": completed_val,
"IA FULLY EXECUTED/ON SUSPENSION": completed_val,
"IA PENDING": active_val,
"DISIS STAGE": active_val,
"None": active_val,
},
)
queue["Generation Type"] = queue[["Generation Type", "Fuel Type"]].apply(
lambda x: " - ".join(x.dropna()),
axis=1,
)
queue["Proposed Completion Date"] = queue["Commercial Operation Date"]
rename = {
"Generation Interconnection Number": "Queue ID",
" Nearest Town or County": "County",
"State": "State",
"TO at POI": "Transmission Owner",
"Capacity": "Capacity (MW)",
"MAX Summer MW": "Summer Capacity (MW)",
"MAX Winter MW": "Winter Capacity (MW)",
"Generation Type": "Generation Type",
"Request Received": "Queue Date",
"Substation or Line": "Interconnection Location",
}
# todo: there are a few columns being parsed
# as "unamed" that aren't being included but should
extra_columns = [
"In-Service Date",
"Commercial Operation Date",
"Cessation Date",
"Current Cluster",
"Cluster Group",
"Replacement Generator Commercial Op Date",
"Service Type",
]
missing = [
"Project Name",
"Interconnecting Entity",
"Withdrawn Date",
"Withdrawal Comment",
"Actual Completion Date",
]
queue = utils.format_interconnection_df(
queue=queue,
rename=rename,
extra=extra_columns,
missing=missing,
)
return queue
@lmp_config(
supports={
Markets.REAL_TIME_5_MIN: ["latest", "today", "historical"],
Markets.DAY_AHEAD_HOURLY: ["latest", "today", "historical"],
},
)
@support_date_range(frequency="1D")
[docs] def get_lmp(
self,
date,
end=None,
market: str = None,
locations: list = "ALL",
location_type: str = LOCATION_TYPE_HUB,
verbose=False,
):
"""Get LMP data
Supported Markets:
- ``REAL_TIME_5_MIN``
- ``DAY_AHEAD_HOURLY``
Supported Location Types:
- ``hub``
- ``interface``
- ``settlement_location``
"""
if market not in self.markets:
raise NotSupported(f"Market {market} not supported")
location_type = self._normalize_location_type(location_type)
if market == Markets.REAL_TIME_5_MIN:
df = self._get_rtm5_lmp(
date,
end,
market,
locations,
location_type,
verbose,
)
elif market == Markets.DAY_AHEAD_HOURLY:
df = self._get_dam_lmp(
date,
end,
market,
locations,
location_type,
verbose,
)
else:
raise NotSupported(
f"Market {market} is not supported",
)
return self._finalize_spp_df(
df,
market=market,
locations=locations,
location_type=location_type,
verbose=verbose,
)
def _get_feature_data(self, base_url, verbose=False):
"""Fetches data from ArcGIS Map Service with Feature Data
Returns:
pd.DataFrame of features
"""
args = {
"f": "json",
"where": "OBJECTID IS NOT NULL",
"returnGeometry": "false",
"outFields": "*",
}
doc = self._get_json(base_url, params=args, verbose=verbose)
df = pd.DataFrame([feature["attributes"] for feature in doc["features"]])
return df
def _get_rtm5_lmp(
self,
date,
end=None,
market: str = None,
locations: list = "ALL",
location_type: str = LOCATION_TYPE_HUB,
verbose=False,
):
df = self._fetch_and_concat_csvs(
self._fs_get_rtbm_lmp_by_location_paths(date, verbose=verbose),
fs_name=FS_RTBM_LMP_BY_LOCATION,
verbose=verbose,
)
return df
def _get_dam_lmp(
self,
date,
end=None,
market: str = None,
locations: list = "ALL",
location_type: str = LOCATION_TYPE_HUB,
verbose=False,
):
df = self._fetch_and_concat_csvs(
self._fs_get_dam_lmp_by_location_paths(date, verbose=verbose),
fs_name=FS_DAM_LMP_BY_LOCATION,
verbose=verbose,
)
return df
def _finalize_spp_df(self, df, market, locations, location_type, verbose=False):
"""
Finalizes DataFrame:
- Sets Market
- Filters by location type if needed
- Sets location type
- Renames and ordering columns
- Filters by Location
- Resets the index
Arguments:
pandas.DataFrame: DataFrame with SPP data
market (str): Market
locations (list): List of locations to filter by
location_type (str): Location type
verbose (bool, optional): Verbose output
"""
df["Interval End"] = pd.to_datetime(
df["GMTIntervalEnd"],
utc=True,
).dt.tz_convert(self.default_timezone)
if market == Markets.REAL_TIME_5_MIN:
interval_duration = pd.Timedelta(minutes=5)
elif market == Markets.DAY_AHEAD_HOURLY:
interval_duration = pd.Timedelta(hours=1)
df["Interval Start"] = df["Interval End"] - interval_duration
df["Time"] = df["Interval Start"]
df["Location"] = df["Settlement Location"]
df["Market"] = market.value
if location_type == LOCATION_TYPE_SETTLEMENT_LOCATION:
# annotate instead of filter
hubs = self._get_location_list(LOCATION_TYPE_HUB, verbose=verbose)
hub_name = SPP._get_location_type_name(LOCATION_TYPE_HUB)
interfaces = self._get_location_list(
LOCATION_TYPE_INTERFACE,
verbose=verbose,
)
interface_name = SPP._get_location_type_name(
LOCATION_TYPE_INTERFACE,
)
# Determine Location Type by matching to a hub or interface.
# Otherwise, fall back to a settlement location
df["Location Type"] = df["Location"].apply(
lambda location: SPP._lookup_match(
location,
{
hub_name: hubs,
interface_name: interfaces,
},
default_value=SPP._get_location_type_name(
LOCATION_TYPE_SETTLEMENT_LOCATION,
),
),
)
else:
# filter
location_list = self._get_location_list(
location_type,
verbose=verbose,
)
df["Location Type"] = SPP._get_location_type_name(location_type)
df = df[df["Location"].isin(location_list)]
df = df.rename(
columns={
"LMP": "LMP", # for posterity
"MLC": "Loss",
"MCC": "Congestion",
"MEC": "Energy",
},
)
df = utils.filter_lmp_locations(df, locations)
df = df[
[
"Time",
"Interval Start",
"Interval End",
"Market",
"Location",
"Location Type",
"LMP",
"Energy",
"Congestion",
"Loss",
]
]
df = df.reset_index(drop=True)
return df
@staticmethod
def _lookup_match(item, lookup, default_value):
"""Use a dictionary to find the first key-value pair
where the value is a list containing the item
"""
for key, values_list in lookup.items():
if item in values_list:
return key
return default_value
@staticmethod
def _parse_day_ahead_hour_end(df, timezone):
# 'DA_HOUREND': '12/26/2022 9:00:00 AM',
return df["DA_HOUREND"].apply(
lambda x: (pd.Timestamp(x, tz=timezone) - pd.Timedelta(hours=1)),
)
def _normalize_location_type(self, location_type):
norm_location_type = location_type.upper()
if norm_location_type in self.location_types:
return norm_location_type
else:
raise NotSupported(f"Invalid location_type {location_type}")
@staticmethod
def _get_location_type_name(location_type):
if location_type == LOCATION_TYPE_HUB:
return "Hub"
elif location_type == LOCATION_TYPE_INTERFACE:
return "Interface"
elif location_type == LOCATION_TYPE_SETTLEMENT_LOCATION:
return "Settlement Location"
else:
raise ValueError(f"Invalid location_type: {location_type}")
def _get_location_list(self, location_type, verbose=False):
if location_type == LOCATION_TYPE_HUB:
df = self._get_feature_data(QUERY_RTM5_HUBS_URL, verbose=verbose)
elif location_type == LOCATION_TYPE_INTERFACE:
df = self._get_feature_data(
QUERY_RTM5_INTERFACES_URL,
verbose=verbose,
)
else:
raise ValueError(f"Invalid location_type: {location_type}")
return df["SETTLEMENT_LOCATION"].unique().tolist()
def _fs_get_rtbm_lmp_by_location_paths(self, date, verbose=False):
"""
Lists files for Real-Time Balancing Market (RTBM),
Locational Marginal Price (LMP) by Settlement Location (SL)
"""
if date == "latest":
paths = ["/RTBM-LMP-SL-latestInterval.csv"]
else:
files_df = self._file_browser_list(
name=FS_RTBM_LMP_BY_LOCATION,
fs_name=FS_RTBM_LMP_BY_LOCATION,
type="folder",
path=date.strftime("/%Y/%m/By_Interval/%d"),
)
paths = files_df["path"].tolist()
msg = f"Found {len(paths)} files for {date}"
log(msg, verbose)
return paths
def _fetch_and_concat_csvs(self, paths: list, fs_name: str, verbose: bool = False):
all_dfs = []
for path in tqdm.tqdm(paths):
url = self._file_browser_download_url(
fs_name,
params={"path": path},
)
msg = f"Fetching {url}"
log(msg, verbose)
csv = requests.get(url)
df = pd.read_csv(io.StringIO(csv.content.decode("UTF-8")))
all_dfs.append(df)
return pd.concat(all_dfs)
def _fs_get_dam_lmp_by_location_paths(self, date, verbose=False):
"""
Lists files for Day-ahead Market (DAM),
Locational Marginal Price (LMP) by Settlement Location (SL)
"""
paths = []
if date == "latest":
raise ValueError(
"DAM is released daily, so use date='today' instead",
)
date = date.normalize()
# list files for this month
files_df = self._file_browser_list(
name=FS_DAM_LMP_BY_LOCATION,
fs_name=FS_DAM_LMP_BY_LOCATION,
type="folder",
path=date.strftime("/%Y/%m/By_Day"),
)
files_df["date"] = files_df.name.apply(
lambda x: pd.to_datetime(
x.strip(".csv").split("-")[-1],
format="%Y%m%d%H%M",
)
.normalize()
.tz_localize(self.default_timezone),
)
matched_file = files_df[files_df["date"] == date]
# get latest file
paths = matched_file["path"].tolist()
msg = f"Found {len(paths)} files for {date}"
log(msg, verbose)
return paths
def _get_marketplace_session(self) -> dict:
"""
Returns a session object for the Marketplace API
"""
html = requests.get(MARKETPLACE_BASE_URL)
jsessionid = html.cookies.get("JSESSIONID")
soup = BeautifulSoup(html.content, "html.parser")
csrf_token = soup.find("meta", {"id": "_csrf"}).attrs["content"]
csrf_token_header = soup.find(
"meta",
{"id": "_csrf_header"},
).attrs["content"]
return {
"cookies": {"JSESSIONID": jsessionid},
"headers": {
csrf_token_header: csrf_token,
},
}
def _file_browser_list(self, name: str, fs_name: str, type: str, path: str):
"""Lists folders in a browser
Returns: pd.DataFrame of files, or empty pd.DataFrame on error"""
session = self._get_marketplace_session()
json_payload = {
"name": name,
"fsName": fs_name,
"type": type,
"path": path,
}
list_results = requests.post(
FILE_BROWSER_API_URL,
json=json_payload,
headers=session["headers"],
cookies=session["cookies"],
)
if list_results.status_code == 200:
df = pd.DataFrame(list_results.json())
return df
else:
return pd.DataFrame()
def _file_browser_download_url(self, fs_name, params=None):
qs = "?" + urlencode(params) if params else ""
return f"{FILE_BROWSER_API_URL}download/{fs_name}{qs}"
@staticmethod
def _clean_status_text(text):
text = text.lower()
# remove punctuation
text = re.sub(r"[,\.\(\)]", "", text)
# remove non-time colons
text = re.sub(r":$", "", text)
# drop time zone information
text = re.sub(r"central time", "", text)
# truncate starting with last updated
text = re.sub(r".*last updated", "", text)
# drop stop words
tokens = text.split(" ")
filtered_words = [
token for token in tokens if token.lower() not in STATUS_STOP_WORDS
]
text = " ".join(filtered_words)
return text
@staticmethod
def _extract_timestamp(text, year_hint=None, tz=None):
if year_hint is None:
year_hint = pd.Timestamp.now(tz=tz).year
text = SPP._clean_status_text(text)
year_search = re.search(r"[0-9]{4}", text)
if year_search is None:
# append year hint
text = f"{text} {year_hint}"
timestamp = None
try:
# throw the remaining bits at pd.Timestamp
timestamp = pd.Timestamp(text, tz=tz)
except ValueError:
pass
if timestamp is pd.NaT:
timestamp = None
return timestamp
@staticmethod
def _extract_timestamps(texts, year_hint=None, tz=None):
timestamps = [
SPP._extract_timestamp(t, year_hint=year_hint, tz=tz) for t in texts
]
return [t for t in timestamps if t is not None]
@staticmethod
def _match(
needles,
haystacks,
needle_norm_fn=lambda x: x.lower(),
haystack_norm_fn=lambda x: x.lower(),
):
"""Returns items from haystacks if any needles are in them"""
return [
haystack
for haystack in haystacks
if any(
needle_norm_fn(needle) in haystack_norm_fn(haystack)
for needle in needles
)
]
@staticmethod
def _get_leaf_elements(elems):
"""Returns leaf elements, i.e. elements without children"""
accum = []
for elem in elems:
parent = False
if isinstance(elem, Tag):
children = list(elem.children)
if len(children) > 0:
for child in children:
accum += SPP._get_leaf_elements([child])
parent = True
if not parent:
accum.append(elem)
return accum
def _get_status_candidate_texts(self, html):
"""Returns a list of text candidates for status and timestamp extraction"""
# generic pre-Soup cleanup
html = re.sub(r"<[/]?span>", "", html)
html = re.sub(r"<br/>", "", html)
html = re.sub(r"\xa0", "", html)
soup = BeautifulSoup(html, "html.parser")
# use <h1> as the north star
conditions_element = soup.find("h1")
# find all sibling paragraphs, and then their descendant leaves
sibling_paragraphs = self._get_leaf_elements(
conditions_element.parent.find_all("p"),
)
# just the text, please
return [p.text for p in sibling_paragraphs]
def _get_status_from_html(self, html_text, year_hint=None):
"""Extracts timestamp, status, and status notes from HTML"""
candidate_texts = self._get_status_candidate_texts(html_text)
timestamp = self._get_status_timestamp(
candidate_texts,
year_hint=year_hint,
)
status, notes = self._get_status_status_and_notes(candidate_texts)
if timestamp is None:
raise RuntimeError("Cannot parse time of status")
return GridStatus(
time=timestamp,
status=status,
notes=notes,
reserves=None,
iso=self,
)
def _get_status_timestamp(self, candidate_texts, year_hint=None):
"""Get timestamp from candidate texts
Returns
pd.Timestamp or None
"""
timestamp_texts = self._match(
LAST_UPDATED_KEYWORDS,
candidate_texts,
)
new_list = []
for text in timestamp_texts:
"""Truncate to immediately after reliability level,
e.g. "blah blah Normal Operations 12:00 PM Central Time"
-> "12:00 PM Central Time"
"""
for keyword in RELIABILITY_LEVELS:
pos = text.lower().find(keyword.lower())
if pos > -1:
pos += len(keyword)
new_list.append(text[pos:])
new_list.append(text)
timestamp_texts = new_list
last_updated_timestamps = self._extract_timestamps(
timestamp_texts,
year_hint=year_hint,
tz=self.default_timezone,
)
return next(iter(last_updated_timestamps), None)
def _get_status_status_and_notes(self, candidate_texts):
"""Extracts (status, notes,) tuple from candidates texts"""
status_texts = self._match(
RELIABILITY_LEVELS,
candidate_texts,
haystack_norm_fn=lambda x: self._clean_status_text(x),
)
status_text = None
if len(status_texts) > 0:
status_text = status_texts[0]
status = status_text # default
notes = None
norm_status_text = self._clean_status_text(status_text)
for level in RELIABILITY_LEVELS:
if level.lower() in norm_status_text:
status = RELIABILITY_LEVELS_ALIASES.get(level, level)
notes = [status_text]
return (
status,
notes,
)
[docs]def add_interval(df, interval_min):
"""Adds Interval Start and Interval End columns to df"""
df["Interval Start"] = df["Time"]
df["Interval End"] = df["Interval Start"] + pd.Timedelta(minutes=interval_min)
df = utils.move_cols_to_front(
df,
["Time", "Interval Start", "Interval End"],
)
return df