Source code for gridstatus.decorators

import functools
import os
import pprint
from typing import Any, Callable, Dict, List, ParamSpec, TypeVar, cast

import pandas as pd
import tqdm

from gridstatus import utils
from gridstatus.base import Markets

# Parameter specification of a decorated function; paired with ``T`` in the
# ``Callable[P, T]`` annotations of ``support_date_range.__call__``.
P = ParamSpec("P")
# Return type of a decorated function.
T = TypeVar("T")
def _get_args_dict( fn: Callable[..., Any], args: tuple[Any, ...], kwargs: Dict[str, Any], ) -> Dict[str, Any]: args_names = fn.__code__.co_varnames[: fn.__code__.co_argcount] return {**dict(zip(args_names, args)), **kwargs} # make custom function rather than using pd.date_range # due to handling of localized timezones
[docs] def date_range_maker( start: pd.Timestamp, end: pd.Timestamp, freq: str | pd.DateOffset, inclusive: str = "neither", ) -> List[pd.Timestamp]: """Generate a date range based on start and end dates and a frequency.""" # implement other behavior # if/when needed assert inclusive == "neither" if isinstance(freq, str): freq = pd.tseries.frequencies.to_offset(freq) # Generate the date range current_date = start + freq dates = [] while current_date < end: dates.append(current_date) current_date += freq return dates
# TODO(kladar): Add support for date or start to be in args OR kwargs dict as well, since some APIs have # current or latest endpoints that are automatically handled. Currently cannot refactor this confidently # without improved testing since it touches many methods
class support_date_range:
    """Decorator that enables automatic date range splitting and iteration.

    This decorator automatically splits large date ranges into smaller chunks
    based on the specified frequency, calls the decorated function for each
    chunk, and combines the results.

    Args:
        frequency: Maximum frequency for splitting date ranges. Common values
            are "DAY_START" (split by day), "HOUR_START" (split by hour),
            "5_MIN" (split by 5 minute intervals), "MONTH_START" (split by
            month), "YEAR_START" (split by year), or None (no splitting,
            pass date range as-is). May also be a callable that receives the
            argument dict of the call and returns the frequency.
        update_dates: Optional callback to customize date range splitting
            logic.
        return_raw: If True, return list of results instead of concatenating.

    The decorated functions also accept these keyword arguments:

    - **error** (str): Error handling mode. Default is "ignore" which prints
      errors and continues. Use "raise" to raise errors immediately.
    - **save_to** (str): Directory path to save results as CSV files.
      Creates directory if needed. ``None`` disables saving.
    - **start**: Alternative parameter name for 'date' (automatically
      converted).

    Example::

        @support_date_range(frequency="DAY_START")
        def get_data(self, date, end=None, verbose=False):
            # Function is called once per day in the range
            return fetch_data_for_date(date)

        # Usage:
        df = iso.get_data(date="2024-01-01", end="2024-01-10", error="raise")
        # Calls get_data for each day and concatenates results
    """

    def __init__(
        self,
        frequency: str | Callable[[Dict[str, Any]], str] | None,
        update_dates: (
            Callable[
                [List[pd.Timestamp | None], Dict[str, Any]],
                List[pd.Timestamp | None],
            ]
            | None
        ) = None,
        return_raw: bool = False,
    ) -> None:
        """Store the splitting configuration; see the class docstring."""
        self.frequency = frequency
        self.update_dates = update_dates
        self.return_raw = return_raw

    def __call__(self, f: Callable[P, T]) -> Callable[P, T]:
        """Wrap ``f`` so that it is called once per chunk of the date range.

        Args:
            f: Method to decorate. The wrapper reads ``default_timezone``
                from the ``self`` argument of each call.

        Returns:
            The wrapping function, typed like ``f``.
        """
        # Use a loosely-typed reference for internal dynamic argument manipulation
        inner_f: Callable[..., Any] = f

        @functools.wraps(f)
        def wrapped_f(*args: Any, **kwargs: Any) -> Any:
            args_dict = _get_args_dict(inner_f, args, kwargs)

            # delete end if None to avoid attribute error
            if "end" in args_dict and not args_dict["end"]:
                del args_dict["end"]

            # An explicit save_to=None means "do not save"; only create the
            # directory when a path was actually given (os.makedirs(None)
            # raises TypeError).
            save_to = args_dict.pop("save_to", None)
            if save_to is not None:
                os.makedirs(save_to, exist_ok=True)

            error = args_dict.pop("error", "ignore")
            errors = []

            # if date is a tuple, then change to start and end
            if "date" in args_dict and isinstance(args_dict["date"], tuple):
                args_dict["start"] = args_dict["date"][0]
                args_dict["end"] = args_dict["date"][1]
                del args_dict["date"]

            if "date" in args_dict and "start" in args_dict:
                raise ValueError(
                    "Cannot supply both 'date' and 'start' to function {}".format(
                        f,
                    ),
                )

            if "date" not in args_dict and "start" not in args_dict:
                raise ValueError(
                    "Must supply either 'date' or 'start' to function {}".format(
                        f,
                    ),
                )

            if "start" in args_dict:
                args_dict["date"] = args_dict["start"]
                del args_dict["start"]

            # "latest" bypasses all date handling; the call is forwarded with
            # its original positional and keyword arguments untouched.
            if args_dict["date"] == "latest":
                return inner_f(*args, **kwargs)

            default_timezone = args_dict["self"].default_timezone

            # For today with sub daily data, create a range that spans the day
            if (
                self.frequency in ["HOUR_START", "5_MIN"]
                and args_dict.get("date") == "today"
            ):  # noqa
                args_dict["date"] = pd.Timestamp.now(tz=default_timezone).floor("D")
                args_dict["end"] = args_dict["date"] + pd.Timedelta(days=1)

            args_dict["date"] = utils._handle_date(
                args_dict["date"],
                default_timezone,
            )

            # no date range handling required
            if "end" not in args_dict:
                df = inner_f(**args_dict)
                _handle_save_to(df, save_to, args_dict, f)
                return df

            if (
                isinstance(args_dict["end"], str)
                and args_dict["end"].lower() == "today"
            ):
                # add one day since end is exclusive
                args_dict["end"] = pd.Timestamp.now(
                    tz=default_timezone,
                ).date() + pd.DateOffset(days=1)

            args_dict["end"] = utils._handle_date(
                args_dict["end"],
                default_timezone,
            )

            assert args_dict["end"] > args_dict["date"], (
                "End date {} must be after start date {}".format(
                    args_dict["end"],
                    args_dict["date"],
                )
            )

            # if frequency is callable, then use it to get the frequency
            frequency: Any = self.frequency
            if callable(frequency):
                frequency = frequency(args_dict)

            if frequency is None:
                dates = [args_dict["date"], args_dict["end"]]
            else:
                # Note: this may create a split that will end up
                # being unnecessary after running update dates below.
                # that is because after adding new dates, it's possible that two
                # ranges could be added.
                # Unnecessary optimization right now to include
                # logic to handle this

                # if certain frequency, we need to handle first interval
                # specially so pd.date_range works
                if frequency == "DAY_START":
                    frequency = DayBeginOffset()
                elif frequency == "MONTH_START":
                    frequency = MonthBeginOffset()
                elif frequency == "HOUR_START":
                    frequency = HourBeginOffset()
                elif frequency == "5_MIN":
                    frequency = FiveMinOffset()
                elif frequency == "YEAR_START":
                    frequency = YearBeginOffset()

                dates = date_range_maker(
                    args_dict["date"],
                    args_dict["end"],
                    freq=frequency,
                    inclusive="neither",
                )

                dates = [args_dict["date"]] + dates + [args_dict["end"]]

            # make sure everything is in default timezone
            # of the ISO
            dates = [utils._handle_date(d, default_timezone) for d in dates]

            # sometime api have restrictions/optimizations based on date ranges
            # update_dates allows for the caller to insert this logic
            if self.update_dates is not None:
                dates = self.update_dates(dates, args_dict)

            start_date = dates[0]

            # remove end date and add back later if needed
            del args_dict["end"]

            all_df = []

            # every None removes two possible queries
            total = len(dates) - dates.count(None) * 2 - 1

            with tqdm.tqdm(disable=total <= 1, total=total) as pbar:
                for end_date in dates[1:]:
                    # if we come across None, it means we should reset
                    if end_date is None:
                        start_date = None
                        continue

                    # if start_date is None, we just reset and end is
                    # actually the start
                    if start_date is None:
                        start_date = end_date
                        continue

                    args_dict["date"] = start_date
                    # no need for end if we are querying for just 1 day
                    if frequency != "1D" and not isinstance(
                        frequency,
                        DayBeginOffset,
                    ):
                        args_dict["end"] = end_date

                    try:
                        df = inner_f(**args_dict)
                    except Exception as e:
                        if error == "raise":
                            raise
                        elif error == "ignore":
                            # best effort: remember the failing arguments and
                            # carry on with the next chunk
                            df = None
                            errors += [args_dict.copy()]
                            print("Error: {}".format(e))
                            print("Args: {}\n".format(args_dict))
                        else:
                            raise ValueError(
                                "Invalid value for error: {}".format(
                                    error,
                                ),
                            )

                    _handle_save_to(df, save_to, args_dict, f)
                    pbar.update(1)

                    if df is not None:
                        all_df.append(df)

                    start_date = end_date

            if errors:
                print("Errors that occurred while getting data:")
                pprint.pprint(errors)

            if self.return_raw:
                return all_df

            # if first item is a dict, then we need to concat by key
            if all_df and isinstance(all_df[0], dict):
                df = {}
                for d in all_df:
                    for k, v in d.items():
                        if k not in df:
                            df[k] = []
                        df[k].append(v)

                for k, v in df.items():
                    df[k] = pd.concat(v).reset_index(drop=True)
            else:
                # NOTE: pd.concat raises ValueError when all_df is empty,
                # i.e. when every chunk failed under error="ignore".
                df = pd.concat(all_df).reset_index(drop=True)

            return df

        return cast(Callable[P, T], wrapped_f)
def _handle_save_to(
    df: pd.DataFrame | None,
    save_to: str | None,
    args_dict: Dict[str, Any],
    f: Callable[..., Any],
) -> None:
    """Write ``df`` as a CSV file into the ``save_to`` directory.

    Nothing happens when ``df`` or ``save_to`` is ``None``. The file is named
    ``<ClassName>_<function name>_<date>[_<end>].csv``, where the class name
    comes from ``args_dict["self"]`` and the dates are formatted as
    ``YYYYMMDD``; the end date is included only when ``args_dict`` has an
    ``"end"`` entry.

    Args:
        df: Result to save, or ``None`` when there is nothing to save.
        save_to: Target directory, or ``None`` to skip saving.
        args_dict: Arguments of the call; must hold ``"self"`` and ``"date"``.
        f: Function that produced ``df``.
    """
    if df is None or save_to is None:
        return

    name_parts = [
        args_dict["self"].__class__.__name__,
        f.__name__,
        args_dict["date"].strftime("%Y%m%d"),
    ]
    if "end" in args_dict:
        name_parts.append(args_dict["end"].strftime("%Y%m%d"))

    path = os.path.join(save_to, "_".join(name_parts) + ".csv")
    # `index` is documented as a bool; False omits the row index.
    df.to_csv(path, index=False)


def _get_pjm_archive_date(market: str | Markets) -> pd.Timestamp:
    """Return the midnight that starts PJM's archive window for ``market``.

    The result is "now" in the PJM default timezone minus a market-specific
    number of days, with the time of day reset to 00:00:00.

    Args:
        market: Market, as a ``Markets`` member or a value accepted by
            ``Markets(...)``.

    Returns:
        The archive boundary timestamp.

    Raises:
        ValueError: If no archive window is defined for ``market``.
    """
    import gridstatus

    market = Markets(market)
    tz = gridstatus.PJM.default_timezone  # type: ignore[has-type]

    if market == Markets.REAL_TIME_5_MIN:
        lookback_days = 186
    elif market == Markets.REAL_TIME_HOURLY:
        lookback_days = 731
    # todo implement location type filter
    elif market == Markets.DAY_AHEAD_HOURLY:
        lookback_days = 731
    else:
        # Without this branch the function failed further down with an
        # UnboundLocalError that hid the real problem.
        raise ValueError(
            "No PJM archive date defined for market {}".format(market),
        )

    archive_date = pd.Timestamp.now(tz=tz) - pd.Timedelta(days=lookback_days)
    return archive_date.replace(hour=0, minute=0, second=0, microsecond=0)


# todo convert to custom PJMDateOffset class
[docs] def pjm_update_dates( dates: List[pd.Timestamp | None], args_dict: Dict[str, Any], ) -> List[pd.Timestamp | None]: """PJM has a weird API. This method updates the date range list to account for the following restrictions: - date ranges cannot span year boundaries - date ranges cannot span archive / standard boundaries - date range is inclusive of start and end dates """ archive_date = _get_pjm_archive_date(args_dict["market"]) new_dates: List[pd.Timestamp | None] = [] for i, date in enumerate(dates): # stop if last date if i + 1 == len(dates): # add last date if new range has started if new_dates[-1] is not None: new_dates.append(date) break new_dates.append(date) # restriction 1: year boundary next_date = dates[i + 1] if date is not None and next_date is not None: for year in range(date.year, next_date.year): current_year_end = pd.Timestamp( year=year, month=12, day=31, hour=23, minute=59, tz=args_dict["self"].default_timezone, ) new_dates.append(current_year_end) next_year_start = pd.Timestamp( year=year + 1, month=1, day=1, hour=0, minute=0, tz=args_dict["self"].default_timezone, ) new_dates.append(None) # signal to skip to next date # dont need another range if the range ends at the start of the next year if next_year_start != next_date: new_dates.append(next_year_start) # remove trailing None if new_dates[-1] is None: new_dates = new_dates[:-1] # restriction 2: archive / standard boundary for i, date in enumerate(new_dates[:-1]): next_date = new_dates[i + 1] # check if archive date is between date and next_date if None not in [date, next_date] and date < archive_date < next_date: day_before_archive = archive_date - pd.Timedelta(days=1) add_before = pd.Timestamp( year=day_before_archive.year, month=day_before_archive.month, day=day_before_archive.day, hour=23, minute=59, tz=args_dict["self"].default_timezone, ) new_dates = ( new_dates[: i + 1] + [ add_before, None, archive_date, ] + new_dates[i + 1 :] ) return new_dates
# Custom offsets that, as far as I know, do not exist in pandas.
class DayBeginOffset:
    """Offset that moves a timestamp to the start of the following day."""

    def __ladd__(self, other: pd.Timestamp) -> pd.Timestamp:
        """Return midnight of the day after ``other``."""
        midnight = other.normalize()
        one_day = pd.DateOffset(days=1)
        return midnight + one_day

    def __radd__(self, other: pd.Timestamp) -> pd.Timestamp:
        """Evaluate ``other + self``; same result as ``__ladd__``."""
        return self.__ladd__(other)
class MonthBeginOffset:
    """Offset that moves a timestamp to the start of the following month."""

    def __ladd__(self, other: pd.Timestamp) -> pd.Timestamp:
        """Return midnight on the first day of the month after ``other``."""
        midnight = other.normalize()
        next_month_start = pd.offsets.MonthBegin(1)
        return midnight + next_month_start

    def __radd__(self, other: pd.Timestamp) -> pd.Timestamp:
        """Evaluate ``other + self``; same result as ``__ladd__``."""
        return self.__ladd__(other)
class FiveMinOffset:
    """Offset that moves a timestamp to the next 5 minute boundary.

    A timestamp that already sits on a boundary moves to the following one.
    """

    def __ladd__(self, other: pd.Timestamp) -> pd.Timestamp:
        """Return the first 5 minute boundary strictly after ``other``.

        Timezone-aware timestamps are rounded in UTC and converted back, so
        the result keeps the timezone of ``other``; timezone-naive timestamps
        are rounded as they are.
        """
        # Add 1 microsecond to ensure we make it to the
        # next interval when already on a 5 min interval
        nudge = pd.Timedelta(microseconds=1)

        # tz_convert raises on naive timestamps, and without a timezone there
        # is no DST to work around, so round directly.
        if other.tz is None:
            return (other + nudge).ceil("5min")

        # Store the original timezone
        original_tz = other.tz

        # Convert to UTC to avoid DST issues
        other_utc = other.tz_convert("UTC")

        # Round up to the next 5 min interval
        rounded_utc = (other_utc + nudge).ceil("5min")

        # Convert back to the original timezone
        return rounded_utc.tz_convert(original_tz)

    def __radd__(self, other: pd.Timestamp) -> pd.Timestamp:
        """Evaluate ``other + self``; same result as ``__ladd__``."""
        return self.__ladd__(other)
class HourBeginOffset:
    """Offset that moves a timestamp to the start of the next hour.

    A timestamp that is already exactly on the hour moves to the following
    hour.
    """

    def __ladd__(self, other: pd.Timestamp) -> pd.Timestamp:
        """Return the first full hour strictly after ``other``.

        Timezone-aware timestamps are rounded in UTC and converted back, so
        the result keeps the timezone of ``other``; timezone-naive timestamps
        are rounded as they are.
        """
        # Add 1 microsecond to ensure we make it to the
        # next hour when already exactly on the hour
        nudge = pd.Timedelta(microseconds=1)

        # tz_convert raises on naive timestamps, and without a timezone there
        # is no DST to work around, so round directly.
        if other.tz is None:
            return (other + nudge).ceil("h")

        # Store the original timezone
        original_tz = other.tz

        # Convert to UTC to avoid DST issues
        other_utc = other.tz_convert("UTC")

        # Round up to the next hour
        rounded_utc = (other_utc + nudge).ceil("h")

        # Convert back to the original timezone
        return rounded_utc.tz_convert(original_tz)

    def __radd__(self, other: pd.Timestamp) -> pd.Timestamp:
        """Evaluate ``other + self``; same result as ``__ladd__``."""
        return self.__ladd__(other)
class YearBeginOffset:
    """Offset that moves a timestamp to the start of the following year."""

    def __ladd__(self, other: pd.Timestamp) -> pd.Timestamp:
        """Return midnight on January 1 of the year after ``other``."""
        midnight = other.normalize()
        next_year_start = pd.offsets.YearBegin(1)
        return midnight + next_year_start

    def __radd__(self, other: pd.Timestamp) -> pd.Timestamp:
        """Evaluate ``other + self``; same result as ``__ladd__``."""
        return self.__ladd__(other)