Source code for gridstatus.utils

import glob
import io
import os
from zipfile import ZipFile

import pandas as pd
import requests
import tqdm

import gridstatus
from gridstatus.base import ISOBase, Markets, NotSupported, _interconnection_columns
from gridstatus.caiso import CAISO
from gridstatus.ercot import Ercot
from gridstatus.isone import ISONE
from gridstatus.miso import MISO
from gridstatus.nyiso import NYISO
from gridstatus.pjm import PJM
from gridstatus.spp import SPP

# ISO client classes known to this module; the helpers in this file iterate
# over the list in exactly this order.
all_isos = [
    MISO,
    CAISO,
    PJM,
    Ercot,
    SPP,
    NYISO,
    ISONE,
]
def list_isos():
    """List available ISOs

    Returns:
        pd.DataFrame -- one row per entry of ``all_isos`` with the columns
        "Name" (the class's ``name``), "Id" (its ``iso_id``) and "Class"
        (the Python class name)
    """
    rows = []
    for iso_cls in all_isos:
        rows.append([iso_cls.name, iso_cls.iso_id, iso_cls.__name__])
    return pd.DataFrame(rows, columns=["Name", "Id", "Class"])
def get_iso(iso_id):
    """Get an ISO by its id

    Arguments:
        iso_id -- value compared against the ``iso_id`` of each class in
            ``all_isos``

    Returns:
        the first class in ``all_isos`` whose ``iso_id`` equals ``iso_id``

    Raises:
        KeyError -- if no class in ``all_isos`` has that id; the unknown id
            is carried as the exception argument
    """
    for iso_cls in all_isos:
        if iso_cls.iso_id == iso_id:
            return iso_cls
    # include the requested id so the failure can be diagnosed from the
    # traceback alone
    raise KeyError(iso_id)
def make_availability_df():
    """Probe every ISO to find which data methods work for which dates

    Every method in a fixed list is called on a freshly constructed instance
    of each class in ``gridstatus.all_isos`` with three arguments: "latest",
    "today" and, for "historical", the date three days before the current
    date in the class's ``default_timezone``.

    Returns:
        dict -- maps each ISO class name to a pd.DataFrame with one column
        per method and the rows "latest", "today" and "historical". A cell is
        "✅" when the call returned and "❌" when it raised NotSupported or
        NotImplementedError, or when the combination is not probed at all

    Any other exception raised by a probed method propagates to the caller.
    """
    methods = [
        "get_status",
        "get_fuel_mix",
        "get_load",
        "get_load_forecast",
        "get_storage",
    ]
    date_kinds = ["latest", "today", "historical"]

    availability_dfs = {}
    # Build each ISO's table in the same pass that probes it, so the result
    # is keyed by exactly the classes that were probed.
    for iso_cls in tqdm.tqdm(gridstatus.all_isos):
        availability = {}
        for method in methods:
            availability[method] = {}
            for date in date_kinds:
                test = date
                if date == "historical":
                    test = pd.Timestamp.now(
                        tz=iso_cls.default_timezone,
                    ).date() - pd.Timedelta(days=3)

                if method == "get_load_forecast" and date == "latest":
                    # this combination is never called; it is recorded as
                    # unavailable directly
                    is_defined = "❌"  # red x
                else:
                    try:
                        getattr(iso_cls(), method)(test)
                        is_defined = "✅"  # green checkmark
                    except (NotSupported, NotImplementedError):
                        is_defined = "❌"  # red x

                availability[method][date] = is_defined

        availability_dfs[iso_cls.__name__] = pd.DataFrame(availability)

    return availability_dfs
def make_availability_table():
    """Render the availability data as markdown

    Returns:
        str -- for every entry of make_availability_df(), in sorted order, a
        "## <key>" heading line followed by that DataFrame's markdown table
        and a newline
    """
    parts = []
    for name, table in sorted(make_availability_df().items()):
        parts.append("## " + name + "\n")
        parts.append(table.to_markdown() + "\n")
    return "".join(parts)
def _handle_date(date, tz=None):
    """Coerce ``date`` with pandas and localize it when it has no timezone

    Arguments:
        date -- a pd.Timestamp, or any value accepted by pd.to_datetime
        tz -- timezone applied with tz_localize only when the value has no
            tzinfo; values that already carry a timezone are returned
            without conversion

    Returns:
        the pd.Timestamp passed in, or the result of pd.to_datetime(date),
        localized to ``tz`` when applicable
    """
    timestamp = date if isinstance(date, pd.Timestamp) else pd.to_datetime(date)

    if not tz:
        return timestamp
    if timestamp.tzinfo is None:
        return timestamp.tz_localize(tz)
    return timestamp
def make_lmp_availability():
    """Collect the markets of every ISO

    Returns:
        dict -- maps the ``name`` of each class in ``all_isos`` to that
        class's ``markets`` attribute
    """
    return {iso_cls.name: iso_cls.markets for iso_cls in all_isos}
def make_lmp_availability_table():
    """Render the markets of every ISO as a markdown table

    Returns:
        str -- markdown of a pd.Series named "Markets", indexed by the keys
        of make_lmp_availability(); each value is that entry's market
        ``value`` strings wrapped in backticks and joined with ", "
    """
    formatted = {}
    for iso_name, markets in make_lmp_availability().items():
        quoted = ["`" + market.value + "`" for market in markets]
        formatted[iso_name] = ", ".join(quoted)

    series = pd.Series(formatted, name="Markets")
    return series.to_markdown()
def filter_lmp_locations(data, locations: list):
    """Keep only the rows whose "Location" is one of ``locations``

    Arguments:
        data {pd.DataFrame} -- frame with a "Location" column
        locations {list} -- values to keep; the string "ALL" or None
            disables filtering

    Returns:
        pd.DataFrame -- ``data`` itself when filtering is disabled, otherwise
        the rows selected by ``data["Location"].isin(locations)``
    """
    if locations == "ALL" or locations is None:
        # no filter requested: hand back the same object
        return data

    selected = data["Location"].isin(locations)
    return data[selected]
def get_zip_file(url):
    """Download a zip archive and open its first member

    Arguments:
        url -- address of the archive, fetched with requests.get

    Returns:
        the file object returned by ZipFile.open for the first name in the
        archive's namelist()
    """
    # todo add retry logic
    # todo does this need to be a with statement?
    response = requests.get(url)
    archive = ZipFile(io.BytesIO(response.content))
    first_member = archive.namelist()[0]
    return archive.open(first_member)
def is_today(date, tz=None):
    """Returns whether date falls on the current calendar day

    Arguments:
        date -- value passed through _handle_date together with ``tz``
        tz -- timezone used both for handling ``date`` and for
            pd.Timestamp.now

    Returns:
        bool -- True when both calendar dates are equal
    """
    given_day = _handle_date(date, tz=tz).date()
    current_day = pd.Timestamp.now(tz=tz).date()
    return given_day == current_day
def is_within_last_days(date, days, tz=None):
    """Returns whether date is within N days

    Arguments:
        date -- value passed through _handle_date together with ``tz``
        days -- size of the window, counted back from the current date
        tz -- timezone used both for handling ``date`` and for
            pd.Timestamp.now

    Returns:
        bool -- True when the calendar date of ``date`` is not after the
        current date and not before the current date minus ``days`` days
    """
    today = pd.Timestamp.now(tz=tz).date()
    target_day = _handle_date(date, tz=tz).date()
    earliest_day = (today - pd.DateOffset(days=days)).date()
    return earliest_day <= target_day <= today
def format_interconnection_df(queue, rename, extra=None, missing=None):
    """Format interconnection queue data

    Arguments:
        queue {pd.DataFrame} -- raw queue data
        rename {dict} -- maps existing column names to new names; every key
            must be a column of ``queue``
        extra {list} -- additional columns to keep after the shared
            ``_interconnection_columns``
        missing {list} -- columns to create filled with None; none of them
            may already exist after renaming

    Returns:
        pd.DataFrame -- renamed data restricted to the shared columns
        followed by ``extra``, with a fresh index

    Raises:
        AssertionError -- if a key of ``rename`` is not a column of
            ``queue`` or a ``missing`` column already exists
    """
    unknown_keys = set(rename.keys()) - set(queue.columns)
    assert not unknown_keys, unknown_keys

    formatted = queue.rename(columns=rename)

    # copy so the shared column list is never modified
    selected = _interconnection_columns.copy()
    if extra:
        selected.extend(extra)

    for column in missing or []:
        assert column not in formatted.columns, "Missing column already exists"
        formatted[column] = None

    return formatted[selected].reset_index(drop=True)
def get_interconnection_queues():
    """Get interconnection queue data for all ISOs

    Returns:
        pd.DataFrame -- the interconnection queue of every class in
        ``all_isos``, restricted to ``_interconnection_columns``, with an
        "ISO" column holding the ISO's name inserted first; the frames are
        stacked in ``all_isos`` order with a fresh index
    """
    all_queues = []
    for iso_cls in tqdm.tqdm(all_isos):
        iso = iso_cls()
        # only shared columns
        queue = iso.get_interconnection_queue()[_interconnection_columns]
        queue.insert(0, "ISO", iso.name)
        all_queues.append(queue)

    # concatenate exactly once
    return pd.concat(all_queues).reset_index(drop=True)
def is_dst_end(date):
    """Returns whether the DST offset drops by one hour after ``date``

    Arguments:
        date -- timestamp whose ``dst()`` is compared with the ``dst()`` of
            ``date + pd.DateOffset(1)``; presumably timezone-aware, since the
            two ``dst()`` results are subtracted

    Returns:
        bool -- True when the difference between the two offsets has a
        ``seconds`` component of exactly 3600
    """
    dst_before = date.dst()
    dst_after = (date + pd.DateOffset(1)).dst()
    change = dst_before - dst_after
    return change.seconds == 3600
def load_folder(path, time_zone=None, verbose=True):
    """Load a single dataframe for same schema csv files in a folder

    Arguments:
        path {str} -- path to folder
        time_zone {str} -- time zone to convert the "Time" column to.
            By default returns as UTC
        verbose {bool} -- show a progress bar while the files are read

    Returns:
        pd.DataFrame -- dataframe of all files, concatenated in sorted file
        name order with a fresh index

    Raises:
        ValueError -- if the folder contains no csv files
    """
    all_files = sorted(glob.glob(os.path.join(path, "*.csv")))
    if not all_files:
        # fail with a message naming the folder instead of letting pd.concat
        # raise its generic "No objects to concatenate" ValueError
        raise ValueError("No csv files found in folder: {}".format(path))

    dfs = []
    for f in tqdm.tqdm(all_files, disable=not verbose):
        df = pd.read_csv(f, parse_dates=True)
        dfs.append(df)

    data = pd.concat(dfs).reset_index(drop=True)

    if "Time" in data.columns:
        data["Time"] = pd.to_datetime(data["Time"], utc=True)
        if time_zone:
            data["Time"] = data["Time"].dt.tz_convert(time_zone)

    # todo make sure dates get parsed
    # todo make sure rows are sorted by time
    return data