import functools
import os
import pprint
import pandas as pd
import tqdm
import gridstatus
from gridstatus.base import Markets
def _get_args_dict(fn, args, kwargs):
args_names = fn.__code__.co_varnames[: fn.__code__.co_argcount]
return {**dict(zip(args_names, args)), **kwargs}
class support_date_range:
    """Decorator that lets a single-date query method accept a date range.

    The decorated callable is invoked with keyword arguments only and must
    accept ``self`` (an object exposing ``default_timezone``) and ``date``,
    plus ``end`` when ranges longer than one day are split.

    The wrapper understands these caller-supplied arguments:

    - ``date`` or ``start`` (exactly one of them); ``date`` may also be a
      ``(start, end)`` tuple.
    - ``end``: exclusive end of the range; the string ``"today"`` is
      accepted. A falsy ``end`` is treated as "no end".
    - ``save_to``: directory where each returned frame is written as CSV.
    - ``error``: ``"ignore"`` (default) records and prints a failing query
      and continues; ``"raise"`` re-raises it.

    ``save_to`` and ``error`` are consumed by the wrapper and not forwarded,
    except when the date is ``"latest"``: then the call is forwarded with the
    original arguments untouched.
    """

    def __init__(self, frequency, update_dates=None):
        """Configure how a date range is split.

        Args:
            frequency: maximum size of each queried range. Either a pandas
                frequency string, one of the special values ``"DAY_START"``
                / ``"MONTH_START"``, or a callable that receives the bound
                arguments dict and returns one of those.
            update_dates: optional callable ``(dates, args_dict) -> dates``
                used to post-process the list of range boundaries. A ``None``
                entry in the returned list separates two ranges.
        """
        self.frequency = frequency
        self.update_dates = update_dates

    def __call__(self, f):
        """Wrap ``f`` so it can be called with a date range."""

        @functools.wraps(f)
        def wrapped_f(*args, **kwargs):
            args_dict = _get_args_dict(f, args, kwargs)

            # delete end if None to avoid attribute error
            if "end" in args_dict and not args_dict["end"]:
                del args_dict["end"]

            # An explicit save_to=None means "do not save"; only create the
            # directory when a real path was given.
            save_to = args_dict.pop("save_to", None)
            if save_to is not None:
                os.makedirs(save_to, exist_ok=True)

            error = args_dict.pop("error", "ignore")
            errors = []

            # if date is a tuple, then change to start and end
            if "date" in args_dict and isinstance(args_dict["date"], tuple):
                args_dict["start"] = args_dict["date"][0]
                args_dict["end"] = args_dict["date"][1]
                del args_dict["date"]

            if "date" in args_dict and "start" in args_dict:
                raise ValueError(
                    "Cannot supply both 'date' and 'start' to function {}".format(
                        f,
                    ),
                )

            if "date" not in args_dict and "start" not in args_dict:
                raise ValueError(
                    "Must supply either 'date' or 'start' to function {}".format(
                        f,
                    ),
                )

            # from here on the start of the range always lives under "date"
            if "start" in args_dict:
                args_dict["date"] = args_dict["start"]
                del args_dict["start"]

            if args_dict["date"] == "latest":
                return f(*args, **kwargs)

            args_dict["date"] = gridstatus.utils._handle_date(
                args_dict["date"],
                args_dict["self"].default_timezone,
            )

            # no date range handling required
            if "end" not in args_dict:
                df = f(**args_dict)
                _handle_save_to(df, save_to, args_dict, f)
                return df

            if (
                isinstance(args_dict["end"], str)
                and args_dict["end"].lower() == "today"
            ):
                # add one day since end is exclusive
                args_dict["end"] = pd.Timestamp.now(
                    tz=args_dict["self"].default_timezone,
                ).date() + pd.DateOffset(days=1)

            args_dict["end"] = gridstatus.utils._handle_date(
                args_dict["end"],
                args_dict["self"].default_timezone,
            )

            # Raised explicitly (rather than via ``assert``) so the check is
            # not stripped when Python runs with -O; the exception type is
            # kept for callers that already catch AssertionError.
            if not args_dict["end"] > args_dict["date"]:
                raise AssertionError(
                    "End date {} must be after start date {}".format(
                        args_dict["end"],
                        args_dict["date"],
                    ),
                )

            # if frequency is callable, then use it to get the frequency
            frequency = self.frequency
            if callable(frequency):
                frequency = self.frequency(args_dict)

            # Note: this may create a split that will end up
            # being unnecessary after running update dates below.
            # that is because after adding new dates, it's possible that two
            # ranges could be added.
            # Unnecessary optimization right now to include
            # logic to handle this

            # if certain frequency, we need to handle first interval
            # specially so pd.date_range works
            prepend = []
            if frequency == "DAY_START":
                frequency = "1D"
                next_day_start = args_dict["date"].ceil("1D")
                if (
                    next_day_start < args_dict["end"]
                    and next_day_start != args_dict["date"]
                ):
                    prepend = [args_dict["date"]]
                    args_dict["date"] = next_day_start
            elif frequency == "MONTH_START":
                # "MS" is pandas' month-start alias; "M" would split the
                # range on month *ends* instead.
                frequency = "1MS"
                next_month_start = (
                    args_dict["date"] + pd.offsets.MonthBegin(1)
                ).normalize()
                if (
                    next_month_start < args_dict["end"]
                    and next_month_start != args_dict["date"]
                ):
                    prepend = [args_dict["date"]]
                    args_dict["date"] = next_month_start

            # interior boundaries only; start and end are added explicitly
            dates = pd.date_range(
                args_dict["date"],
                args_dict["end"],
                freq=frequency,
                inclusive="neither",
            )
            dates = (
                prepend
                + [args_dict["date"]]
                + dates.tolist()
                + [args_dict["end"]]
            )
            dates = [
                gridstatus.utils._handle_date(
                    d,
                    args_dict["self"].default_timezone,
                )
                for d in dates
            ]

            # sometime api have restrictions/optimizations based on date ranges
            # update_dates allows for the caller to insert this logic
            if self.update_dates is not None:
                dates = self.update_dates(dates, args_dict)

            start_date = dates[0]

            # remove end date and add back later if needed
            del args_dict["end"]

            all_df = []

            # every None removes two possible queries
            total = len(dates) - dates.count(None) * 2 - 1

            with tqdm.tqdm(disable=total <= 1, total=total) as pbar:
                for end_date in dates[1:]:
                    # if we come across None, it means we should reset
                    if end_date is None:
                        start_date = None
                        continue

                    # if start_date is None, we just reset and end is
                    # actually the start
                    if start_date is None:
                        start_date = end_date
                        continue

                    args_dict["date"] = start_date
                    # no need for end if we are querying for just 1 day
                    if frequency != "1D":
                        args_dict["end"] = end_date

                    try:
                        df = f(**args_dict)
                    except Exception as e:
                        if error == "raise":
                            # bare raise keeps the original traceback
                            raise
                        elif error == "ignore":
                            df = None
                            errors.append(args_dict.copy())
                            print("Error: {}".format(e))
                            print("Args: {}\n".format(args_dict))
                        else:
                            raise ValueError(
                                "Invalid value for error: {}".format(
                                    error,
                                ),
                            )

                    _handle_save_to(df, save_to, args_dict, f)
                    pbar.update(1)

                    if df is not None:
                        all_df.append(df)

                    start_date = end_date

            if errors:
                print("Errors that occurred while getting data:")
                pprint.pprint(errors)

            # pd.concat([]) would raise an opaque "No objects to concatenate";
            # raise the same exception type with an actionable message.
            if not all_df:
                raise ValueError(
                    "No data was returned for any date range queried by "
                    "function {}".format(f),
                )

            df = pd.concat(all_df).reset_index(drop=True)
            return df

        return wrapped_f
def _handle_save_to(df, save_to, args_dict, f):
if df is not None and save_to is not None:
if "end" in args_dict:
filename = "{}_{}_{}_{}.csv".format(
args_dict["self"].__class__.__name__,
f.__name__,
args_dict["date"].strftime("%Y%m%d"),
args_dict["end"].strftime("%Y%m%d"),
)
else:
filename = "{}_{}_{}.csv".format(
args_dict["self"].__class__.__name__,
f.__name__,
args_dict["date"].strftime("%Y%m%d"),
)
path = os.path.join(save_to, filename)
df.to_csv(path, index=None)
def _get_pjm_archive_date(market):
    """Return the archive cutoff timestamp used for a PJM market.

    The cutoff is "now" in ``gridstatus.PJM.default_timezone`` minus a
    market-specific number of days, with the time of day reset to 00:00:00.

    Args:
        market: a ``Markets`` member or any value accepted by ``Markets(...)``.

    Returns:
        A timezone-aware ``pd.Timestamp`` at midnight.

    Raises:
        ValueError: if ``market`` is not a valid ``Markets`` value (raised by
            the enum) or is a market this function has no cutoff for.
    """
    # imported here as in the original; NOTE(review): presumably to avoid a
    # circular import at module load time - confirm before hoisting.
    import gridstatus

    market = Markets(market)
    tz = gridstatus.PJM.default_timezone

    if market == Markets.REAL_TIME_5_MIN:
        lookback_days = 186
    # TODO: implement location type filter
    elif market in (Markets.REAL_TIME_HOURLY, Markets.DAY_AHEAD_HOURLY):
        lookback_days = 731
    else:
        # Previously fell through and failed with UnboundLocalError.
        raise ValueError(
            "No PJM archive date defined for market {}".format(market),
        )

    archive_date = pd.Timestamp.now(tz=tz) - pd.Timedelta(days=lookback_days)
    return archive_date.replace(hour=0, minute=0, second=0, microsecond=0)
def pjm_update_dates(dates, args_dict):
    """Rewrite a list of range boundaries to satisfy PJM API restrictions.

    PJM has a weird API. The returned list accounts for the following
    restrictions:

    - date ranges cannot span year boundaries
    - date ranges cannot span archive / standard boundaries
    - date range is inclusive of start and end dates

    A ``None`` entry in the returned list separates two ranges: the value
    before it ends one range and the value after it starts the next.

    Args:
        dates: ordered list of range boundaries.
        args_dict: bound arguments of the query; ``"market"`` selects the
            archive cutoff and ``"self"`` supplies ``default_timezone`` for
            any boundary timestamps that have to be created.

    Returns:
        The updated list of boundaries.
    """
    archive_date = _get_pjm_archive_date(args_dict["market"])

    def _local_timestamp(year, month, day, hour, minute):
        # The timezone is looked up only when a boundary is actually needed.
        return pd.Timestamp(
            year=year,
            month=month,
            day=day,
            hour=hour,
            minute=minute,
            tz=args_dict["self"].default_timezone,
        )

    boundaries = []
    for position, current in enumerate(dates):
        is_last = position + 1 == len(dates)
        if is_last:
            # the final date only closes a range if one is currently open
            if boundaries[-1] is not None:
                boundaries.append(current)
            break

        boundaries.append(current)

        # restriction 1: year boundary
        upcoming = dates[position + 1]
        for year in range(current.year, upcoming.year):
            # close the range at the last minute of the year ...
            boundaries.append(_local_timestamp(year, 12, 31, 23, 59))
            new_year_start = _local_timestamp(year + 1, 1, 1, 0, 0)
            # ... and signal that the next entry starts a fresh range
            boundaries.append(None)

            # no extra range is needed when the next boundary is exactly
            # the start of the following year
            if new_year_start != upcoming:
                boundaries.append(new_year_start)

    # remove trailing None
    if boundaries[-1] is None:
        boundaries = boundaries[:-1]

    # restriction 2: archive / standard boundary
    # The loop walks a snapshot of the list while ``boundaries`` itself may
    # be rebound below; lookups of the right-hand neighbour intentionally
    # use the current (possibly extended) list.
    for position, left in enumerate(boundaries[:-1]):
        right = boundaries[position + 1]

        if left is None or right is None:
            continue

        # check if archive date is strictly between the two boundaries
        if left < archive_date < right:
            previous_day = archive_date - pd.Timedelta(days=1)
            end_of_previous_day = _local_timestamp(
                previous_day.year,
                previous_day.month,
                previous_day.day,
                23,
                59,
            )
            head = boundaries[: position + 1]
            tail = boundaries[position + 1 :]
            boundaries = head + [end_of_previous_day, None, archive_date] + tail

    return boundaries
def ercot_update_dates(dates, args_dict):
    """Split a multi-year request into one date range per calendar year.

    Args:
        dates: list of range boundaries built by the caller. It is returned
            unchanged when the request stays within one calendar year and is
            otherwise ignored.
        args_dict: bound arguments of the query; ``"date"`` (start) and
            ``"end"`` must both expose ``.year``.

    Returns:
        ``dates`` itself for a single-year request. Otherwise a new list of
        the form ``[start, Dec 31, None, Jan 1, Dec 31, None, ..., Jan 1,
        end]`` with one range per year in ascending order, where each
        ``None`` separates two ranges.
    """
    start = args_dict["date"]
    end = args_dict["end"]

    if start.year == end.year:
        return dates

    fixed_dates = []

    # Iterate the range directly: collecting the years in a set first does
    # not guarantee ascending order (e.g. {2023, 2024} iterates 2024 first),
    # which scrambled the first/last-year handling.
    # NOTE(review): the Jan 1 / Dec 31 boundaries are created without a
    # timezone, as before - confirm whether callers expect them localized.
    for year in range(start.year, end.year + 1):
        if year == start.year:
            fixed_dates.append(start)
            fixed_dates.append(pd.Timestamp(year, 12, 31))
            fixed_dates.append(None)
        elif year == end.year:
            fixed_dates.append(pd.Timestamp(year, 1, 1))
            fixed_dates.append(end)
        else:
            fixed_dates.append(pd.Timestamp(year, 1, 1))
            fixed_dates.append(pd.Timestamp(year, 12, 31))
            fixed_dates.append(None)

    return fixed_dates