Source code for gridstatus.eia

import concurrent.futures
import datetime
import json
import os
import re
from pathlib import Path

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm

import gridstatus
from gridstatus import utils
from gridstatus.gs_logging import log


[docs]class EIA: BASE_URL = "https://api.eia.gov/v2/" def __init__(self, api_key=None): """Initialize EIA API object Args: api_key (str, optional): EIA API key. If not provided, will look for EIA_API_KEY environment variable. """ if api_key is None: api_key = os.environ.get("EIA_API_KEY") self.api_key = api_key if api_key is None: raise ValueError( "API key not provided and EIA_API_KEY \ not found in environment variables.", ) self.api_key = api_key self.session = requests.Session()
[docs] def list_routes(self, route="/"): """List all available routes""" url = f"{self.BASE_URL}{route}" params = { "api_key": self.api_key, } data = self.session.get(url, params=params) response = data.json()["response"] return response
def _fetch_page(self, url, headers): data = self.session.get(url, headers=headers) response = data.json()["response"] df = pd.DataFrame(response["data"]) return df, int(response["total"])
[docs] def get_dataset(self, dataset, start, end, facets=None, n_workers=1, verbose=False): """Get data from a dataset Only supports "electricity/rto/interchange-data" dataset for now. Args: dataset (str): Dataset path start (str or pd.Timestamp): Start date end (str or pd.Timestamp): End date facets (dict, optional): Facets to add to the request header. Defaults to None. n_workers (int, optional): Number of workers to use for fetching data. Defaults to 1. verbose (bool, optional): Whether to print progress. Defaults to False. Returns: pd.DataFrame: Dataframe with data from the dataset """ start = gridstatus.utils._handle_date(start, "UTC") start_str = start.strftime("%Y-%m-%dT%H") end_str = None if end: end = gridstatus.utils._handle_date(end, "UTC") end_str = end.strftime("%Y-%m-%dT%H") url = f"{self.BASE_URL}{dataset}/data/" if facets is None: facets = {} params = { "start": start_str, "end": end_str, "frequency": "hourly", "data": [ "value", ], "facets": facets, "offset": 0, "length": 5000, # pagination breaks if not sorted because # api doesn't return in stable order across requests "sort": [ {"column": col, "direction": "asc"} for col in DATASET_CONFIG[dataset]["index"] ], } headers = { "X-Api-Key": self.api_key, "X-Params": json.dumps(params), } log(f"Fetching data from {url}", verbose=verbose) log(f"Params: {params}", verbose=verbose) log( f"Concurrent workers: {n_workers}", verbose=verbose, ) raw_df, total_records = self._fetch_page(url, headers) # Calculate the number of pages page_size = 5000 total_pages = (total_records + page_size - 1) // page_size if verbose: print(f"Total records: {total_records}") print(f"Total pages: {total_pages}") print("Fetching data:") # Fetch the remaining pages if necessary def fetch_page_wrapper(url, headers, page, page_size): params = json.loads(headers["X-Params"]) params["offset"] = page * page_size headers["X-Params"] = json.dumps(params) page_df, _ = self._fetch_page(url, headers) return page_df if total_pages > 1: pages = range(1, total_pages) with concurrent.futures.ThreadPoolExecutor( max_workers=n_workers, ) as executor: # noqa args = ((url, headers.copy(), page, page_size) for page in pages) futures = [executor.submit(fetch_page_wrapper, *arg) for arg in args] if verbose: with tqdm(total=total_pages, ncols=80) as progress_bar: # for first page done at beginning progress_bar.update(1) for future, page in zip( concurrent.futures.as_completed(futures), pages, ): # noqa progress_bar.update(1) page_dfs = [future.result() for future in futures] raw_df = pd.concat([raw_df, *page_dfs], ignore_index=True) df = raw_df.copy() if dataset in DATASET_CONFIG: df = DATASET_CONFIG[dataset]["handler"](df) return df
[docs] def get_grid_monitor( self, area_id=None, area_type=None, n_workers=4, verbose=False, ): """ Retrieves grid monitor data including generation and emissions. This function cannot filter by time and fetches all available data. It may be slow if fetching data for all areas. Args: area_id (str, optional): ID of area to fetch data for. If provided, fetches data for this area only, ignoring area_type. If both are not provided, fetches data for all areas. Defaults to None. area_type (str, optional): Type of areas ('Region' or 'BA') to fetch data for. Used only if area_id is not provided. If provided, fetches data for all areas of given type. If both are not provided, fetches data for all areas. Defaults to None. n_workers (int, optional): Number of workers to use for fetching data. Only used if multiple areas are being fetched. Defaults to 4. verbose (bool, optional): If True, prints progress. Defaults to False. Returns: dict: Grid monitor data for specified area(s). """ config_path = Path(__file__).parent / "eia_data" / "grid_monitor_files.json" with open(config_path, "r") as f: GRID_MONITOR_FILES = json.load(f) areas_to_fetch = GRID_MONITOR_FILES.keys() if area_id: areas_to_fetch = [area_id] elif area_type: areas_to_fetch = [ area_id for area_id in areas_to_fetch if GRID_MONITOR_FILES[area_id]["Type"].lower() == area_type.lower() ] def fetch_grid_monitor(grid_monitor): url = grid_monitor["URL"] log(f"Fetching data from {url}", verbose=verbose) df = pd.read_excel(url, sheet_name="Published Hourly Data") rename = { "NG": "Net Generation", "D": "Demand", "TI": "Total Interchange", "DF": "Demand Forecast", } df = df.rename(columns=rename) df["Area Id"] = grid_monitor["ID"] df["Area Type"] = grid_monitor["Type"] df["Area Name"] = grid_monitor["Name"] df.insert(0, "Interval End", pd.to_datetime(df["UTC time"], utc=True)) df.insert(0, "Interval Start", df["Interval End"] - pd.Timedelta("1h")) cols = [ "Interval Start", "Interval End", "Area Id", "Area Name", "Area Type", "Demand", "Demand Forecast", "Net Generation", "Total Interchange", "NG: COL", "NG: NG", "NG: NUC", "NG: OIL", "NG: WAT", "NG: SUN", "NG: WND", "NG: UNK", "NG: OTH", "Positive Generation", "Consumed Electricity", "CO2 Factor: COL", "CO2 Factor: NG", "CO2 Factor: OIL", "CO2 Emissions: COL", "CO2 Emissions: NG", "CO2 Emissions: OIL", "CO2 Emissions: Other", "CO2 Emissions Generated", "CO2 Emissions Imported", "CO2 Emissions Exported", "CO2 Emissions Consumed", "CO2 Emissions Intensity for Generated Electricity", "CO2 Emissions Intensity for Consumed Electricity", ] df = df[cols] return df # Set the number of workers you want futures = [] with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as executor: for area in areas_to_fetch: future = executor.submit(fetch_grid_monitor, GRID_MONITOR_FILES[area]) futures.append(future) if verbose: with tqdm(total=len(areas_to_fetch), ncols=80) as progress_bar: for future in futures: future.result() # Wait for each future to complete progress_bar.update(1) # Combine all the dataframes (assuming you want to do this) all_dfs = [future.result() for future in futures] df = pd.concat(all_dfs, ignore_index=True) return df
[docs] def get_daily_spots_and_futures(self, verbose=False): """ Retrieves daily spots and futures for select energy products. Includes Wholesale Spot and Retail Petroleum, Natural Gas. Prompt-Month Futures, broken on EIA side, for Crude, Gasoline, Heating Oil, Natural Gas, Coal, Ethanol. They are published daily and not persisted, so this should be run once daily. Returns: d: dictionary of DataFrames for each table of values.""" url = "https://www.eia.gov/todayinenergy/prices.php" df_petrol = pd.DataFrame(columns=["product", "area", "price", "percent_change"]) df_ng = pd.DataFrame( columns=[ "region", "natural_gas_price", "natural_gas_percent_change", "electricity_price", "electricity_percent_change", "spark_spread", ], ) def contains_wholesale_petroleum(text): return text and "Wholesale Spot Petroleum Prices" in text log(f"Downloading {url}", verbose) with requests.get(url) as response: content = response.content soup = BeautifulSoup(content, "html.parser") close_date = soup.find("b", string=contains_wholesale_petroleum).text pattern = r"\b\d{1,2}/\d{1,2}/\d{2}\b" close_date = re.findall(pattern=pattern, string=close_date)[0] wholesale_petroleum = soup.select_one( "table[summary='Spot Petroleum Prices']", ) rowspan_sum = 0 directions = ["up", "dn", "nc"] for s1 in wholesale_petroleum.select("td.s1"): text = s1.text parent = s1.find_parent("tr").find_parent("table") if text == "Commodity Price Index": break try: rowspan = int(s1.get("rowspan")) if s1.select("a", class_="lbox"): rowspan -= 1 # down index by one (crack spread) s2 = s1.find_next_sibling("td", class_="s2").text d1 = s1.find_next_sibling("td", class_="d1").text direction = float( s1.find_next_sibling("td", class_=directions).text, ) df_petrol.loc[len(df_petrol)] = ( text, s2, float(d1) if d1 != "NA" else np.nan, float(direction) if direction != "NA" else np.nan, ) else: for i in range(rowspan_sum, rowspan + rowspan_sum): s2_elements = parent.select("td.s2") d1_elements = parent.select("td.d1") direction_elements = parent.find_all(class_=directions) df_petrol.loc[len(df_petrol)] = ( text, s2_elements[i].text, float(d1_elements[i].text) if d1_elements[i].text != "NA" else np.nan, float(direction_elements[i].text) if direction_elements[i].text != "NA" else np.nan, ) rowspan_sum += rowspan except TypeError: s2 = s1.find_next_sibling("td", class_="s2").text d1 = s1.find_next_sibling("td", class_="d1").text direction = float( s1.find_next_sibling("td", class_=directions).text, ) df_petrol.loc[len(df_petrol)] = ( text, s2, float(d1) if d1 != "NA" else np.nan, float(direction) if direction != "NA" else np.nan, ) natural_gas_spots = soup.select_one( "table[summary='Spot Natural Gas and Electric Power Prices']", ) for s1 in natural_gas_spots.select("td.s1"): price_siblings = s1.find_next_siblings("td", class_="d1") direction_siblings = s1.find_next_siblings("td", class_=directions) df_ng.loc[len(df_ng)] = ( s1.text, float(price_siblings[0].text) if price_siblings[0].text != "NA" else np.nan, float(direction_siblings[0].text) if direction_siblings[0].text != "NA" else np.nan, float(price_siblings[1].text) if price_siblings[1].text != "NA" else np.nan, float(direction_siblings[1].text) if direction_siblings[1].text != "NA" else np.nan, float(price_siblings[2].text) if price_siblings[2].text != "NA" else np.nan, ) df_ng["date"] = pd.to_datetime(close_date) df_petrol["date"] = pd.to_datetime(close_date) df_ng = utils.move_cols_to_front(df_ng, cols_to_move=["date"]) df_petrol = utils.move_cols_to_front(df_petrol, cols_to_move=["date"]) d = { "petroleum": df_petrol, "natural_gas": df_ng, } return d
[docs] def get_coal_spots(self, verbose=False): """ Retrieve weekly coal commodity spot prices. TODO: add functionality to grab historicals from https://www.eia.gov/coal/markets/coal_markets_archive_json.php """ url = "https://www.eia.gov/coal/markets/coal_markets_json.php" spot_price_keys = [ "week_ending_date", "central_appalachia_price", "northern_appalachia_price", "illinois_basin_price", "powder_river_basin_price", "uinta_basin_price", ] coal_export_keys = [ "delivery_month", "coal_min", "coal_max", "coal_exports", ] coke_export_keys = [ "delivery_month", "coke_min", "coke_max", "coke_exports", ] spot_prices = {key: [] for key in spot_price_keys} coal_exports = {key: [] for key in coal_export_keys} coke_exports = {key: [] for key in coke_export_keys} log(f"Downloading {url}", verbose) with requests.get(url) as r: json = r.json() for key, value in json["data"][0].items(): if key in ["snl_dpst", "snl_mmbtu"]: for item in value: spot_prices["week_ending_date"].append(item["WEEK_ENDING_DATE"]) spot_prices["central_appalachia_price"].append(item["CENTRAL_APP"]) spot_prices["northern_appalachia_price"].append( item["NORTHERN_APP"], ) spot_prices["illinois_basin_price"].append(item["ILLIOIS_BASIN"]) spot_prices["powder_river_basin_price"].append( item["POWDER_RIVER_BASIN"], ) spot_prices["uinta_basin_price"].append(item["UINTA_BASIN"]) elif key == "coal_exports": for item in value: coal_exports["delivery_month"].append(item["ID"]) coal_exports["coal_min"].append(item["COAL_MIN"]) coal_exports["coal_max"].append(item["COAL_MAX"]) coal_exports["coal_exports"].append(item["COAL_EXPORTS"]) elif key == "coke_exports": for item in value: coke_exports["delivery_month"].append(item["ID"]) coke_exports["coke_min"].append(item["COKE_MIN"]) coke_exports["coke_max"].append(item["COKE_MAX"]) coke_exports["coke_exports"].append(item["COAL_COKE_EXPORTS"]) else: pass weekly_spots = pd.DataFrame(spot_prices) weekly_spots = weekly_spots.loc[weekly_spots["week_ending_date"] != "change"] weekly_spots["week_ending_date"] = weekly_spots["week_ending_date"].map( pd.to_datetime, ) weekly_spots = pd.merge( weekly_spots.drop_duplicates("week_ending_date", keep="first"), weekly_spots.drop_duplicates("week_ending_date", keep="last"), on="week_ending_date", suffixes=("_short_ton", "_mmbtu"), ) coal_exports = pd.DataFrame(coal_exports) coal_exports["delivery_month"] = coal_exports["delivery_month"].map( lambda x: datetime.datetime.strptime(str(x), "%Y%m"), ) coke_exports = pd.DataFrame(coke_exports) coke_exports["delivery_month"] = coke_exports["delivery_month"].map( lambda x: datetime.datetime.strptime(str(x), "%Y%m"), ) return { "weekly_spots": weekly_spots, "coal_exports": coal_exports, "coke_exports": coke_exports, }
def _handle_time(df, frequency="1h"): df.insert(0, "Interval End", pd.to_datetime(df["period"], utc=True)) df.insert(0, "Interval Start", df["Interval End"] - pd.Timedelta(frequency)) df = df.drop("period", axis=1) return df def _handle_region_data(df): df = _handle_time(df, frequency="1h") df = df.rename( { "value": "MW", "respondent": "Respondent", "respondent-name": "Respondent Name", "type": "Type", }, axis=1, ) # ['TI', 'NG', 'DF', 'D'] df["Type"] = df["Type"].map( { "D": "Load", "TI": "Total Interchange", "NG": "Net Generation", "DF": "Load Forecast", }, ) df["MW"] = df["MW"].astype(float) # pivot on type df = df.pivot_table( index=["Interval Start", "Interval End", "Respondent", "Respondent Name"], columns="Type", values="MW", ).reset_index() df.columns.name = None # fix after pivot for col in ["Load", "Net Generation", "Load Forecast", "Total Interchange"]: df[col] = df[col].astype(float) return df def _handle_rto_interchange(df): """electricity/rto/interchange-data""" df = _handle_time(df, frequency="1h") df = df.rename( { "value": "MW", "fromba": "From BA", "toba": "To BA", "fromba-name": "From BA Name", "toba-name": "To BA Name", }, axis=1, ) df = df[ [ "Interval Start", "Interval End", "From BA", "From BA Name", "To BA", "To BA Name", "MW", ] ] df = df.sort_values(["Interval Start", "From BA"]) return df def _handle_fuel_type_data(df): """electricity/rto/fuel-type-data""" df = _handle_time(df, frequency="1h") df = df.rename( { "value": "MW", "respondent": "Respondent", "respondent-name": "Respondent Name", }, axis=1, ) df["MW"] = df["MW"].astype(float) # pivot on type df = df.pivot_table( index=["Interval Start", "Interval End", "Respondent", "Respondent Name"], columns="type-name", values="MW", ).reset_index() fuel_mix_cols = df.columns[4:] # nans after pivot because not # all respondents have all fuel types df[fuel_mix_cols] = df[fuel_mix_cols].astype(float).fillna(0) df.columns.name = None df = df.sort_values(["Interval Start", "Respondent"]) return df
[docs]DATASET_CONFIG = { "electricity/rto/interchange-data": { "index": [ "period", "fromba", "toba", ], "handler": _handle_rto_interchange, }, "electricity/rto/region-data": { "index": ["period", "respondent", "type"], "handler": _handle_region_data, }, "electricity/rto/fuel-type-data": { "index": ["period", "respondent", "fueltype"], "handler": _handle_fuel_type_data, }, }