Source code for test_isone_api

import numpy as np
import pandas as pd
import pytest

from gridstatus.isone_api.isone_api import ISONEAPI, ZONE_LOCATIONID_MAP
from gridstatus.tests.vcr_utils import RECORD_MODE, setup_vcr

[docs]api_vcr = setup_vcr( source="isone_api", record_mode=RECORD_MODE, )
[docs]TEST_DATES = [ ("2024-11-02", "2024-11-04"), ("2024-03-09", "2024-03-11"), ]
[docs]TEST_MULTIPLE_LOCATIONS = [ (".Z.MAINE", ".Z.NEWHAMPSHIRE"), ]
[docs]TEST_SINGLE_LOCATIONS = [loc for pair in TEST_MULTIPLE_LOCATIONS for loc in pair]
[docs]class TestISONEAPI:
[docs] def setup_class(cls): cls.iso = ISONEAPI(sleep_seconds=0.1, max_retries=2)
[docs] def test_class_init(self): assert self.iso.sleep_seconds == 0.1 assert self.iso.max_retries == 2 assert self.iso.username is not None assert self.iso.password is not None
[docs] def test_zone_locationid_map(self): for zone, location_id in ZONE_LOCATIONID_MAP.items(): assert ZONE_LOCATIONID_MAP[zone] == location_id
@api_vcr.use_cassette("test_get_locations.yaml")
[docs] def test_get_locations(self): result = self.iso.get_locations() assert isinstance(result, pd.DataFrame) assert len(result) == 20 assert list(result.columns) == [ "LocationID", "LocationType", "LocationName", "AreaType", ]
@pytest.mark.parametrize( "location", TEST_SINGLE_LOCATIONS, )
[docs] def test_get_realtime_hourly_demand_latest(self, location: str): with api_vcr.use_cassette( f"test_get_realtime_hourly_demand_latest_{location}.yaml", ): result = self.iso.get_realtime_hourly_demand( date="latest", locations=[location], ) assert isinstance(result, pd.DataFrame) assert len(result) == 1 assert list(result.columns) == [ "Interval Start", "Interval End", "Location", "Location Id", "Load", ] assert result["Location"].iloc[0] == location assert result["Location Id"].iloc[0] == ZONE_LOCATIONID_MAP[location] assert isinstance(result["Load"].iloc[0], (int, float))
@api_vcr.use_cassette("test_get_dayahead_hourly_demand_latest.yaml")
[docs] def test_get_dayahead_hourly_demand_latest(self): result = self.iso.get_dayahead_hourly_demand( date="latest", locations=["NEPOOL AREA"], ) assert isinstance(result, pd.DataFrame) assert len(result) == 1 assert list(result.columns) == [ "Interval Start", "Interval End", "Location", "Location Id", "Load", ] assert result["Location"].iloc[0] == "NEPOOL AREA" assert result["Location Id"].iloc[0] == 32 assert isinstance(result["Load"].iloc[0], np.number)
# NOTE(kladar): These two are not super useful as tests go, but starting to think about API failure modes and # how to catch them.
[docs] def test_get_dayahead_hourly_demand_invalid_location(self): with pytest.raises(ValueError): self.iso.get_dayahead_hourly_demand(locations=["INVALID_LOCATION"])
@pytest.mark.parametrize( "locations", TEST_MULTIPLE_LOCATIONS, )
[docs] def test_get_realtime_hourly_demand_multiple_locations( self, locations: tuple[str, str], ): with api_vcr.use_cassette( f"test_get_realtime_hourly_demand_multiple_locations_{locations}.yaml", ): result = self.iso.get_realtime_hourly_demand( date="latest", locations=list(locations), ) assert isinstance(result, pd.DataFrame) assert len(result) == len(locations) assert list(result.columns) == [ "Interval Start", "Interval End", "Location", "Location Id", "Load", ] assert set(result["Location"]) == set(locations) assert set(result["Location Id"]) == { ZONE_LOCATIONID_MAP[loc] for loc in locations } assert all(isinstance(load, (int, float)) for load in result["Load"])
@pytest.mark.parametrize( "date,end,locations", [(date, end, TEST_SINGLE_LOCATIONS) for date, end in TEST_DATES], )
[docs] def test_get_realtime_hourly_demand_date_range( self, date: str, end: str, locations: list[str], ): with api_vcr.use_cassette( f"test_get_realtime_hourly_demand_date_range_{date}_{end}_{locations}.yaml", ): result = self.iso.get_realtime_hourly_demand( date=date, end=end, locations=locations, ) assert isinstance(result, pd.DataFrame) assert list(result.columns) == [ "Interval Start", "Interval End", "Location", "Location Id", "Load", ] assert set(result["Location"]) == set(locations) assert set(result["Location Id"]) == { ZONE_LOCATIONID_MAP[loc] for loc in locations } assert all(isinstance(load, (int, float)) for load in result["Load"]) assert min(result["Interval Start"]).date() == pd.Timestamp(date).date() assert max(result["Interval End"]).date() == pd.Timestamp(end).date()
@pytest.mark.parametrize( "date,end,locations", [(date, end, TEST_SINGLE_LOCATIONS) for date, end in TEST_DATES], )
[docs] def test_get_dayahead_hourly_demand_date_range( self, date: str, end: str, locations: list[str], ): with api_vcr.use_cassette( f"test_get_dayahead_hourly_demand_date_range_{date}_{end}_{locations}.yaml", ): result = self.iso.get_dayahead_hourly_demand( date=date, end=end, locations=locations, ) assert isinstance(result, pd.DataFrame) assert list(result.columns) == [ "Interval Start", "Interval End", "Location", "Location Id", "Load", ] assert set(result["Location"]) == set(locations) assert set(result["Location Id"]) == { ZONE_LOCATIONID_MAP[loc] for loc in locations } assert all(isinstance(load, (int, float)) for load in result["Load"]) assert min(result["Interval Start"]).date() == pd.Timestamp(date).date() assert max(result["Interval End"]).date() == pd.Timestamp(end).date()
@pytest.mark.parametrize( "date,end", TEST_DATES, )
[docs] def test_get_hourly_load_forecast(self, date, end): cassette_name = f"test_get_hourly_load_forecast_{date}_{end}.yaml" with api_vcr.use_cassette(cassette_name): result = self.iso.get_hourly_load_forecast(date=date, end=end) assert isinstance(result, pd.DataFrame) assert list(result.columns) == [ "Interval Start", "Interval End", "Publish Time", "Load Forecast", "Net Load Forecast", ] assert ( min(result["Interval Start"]).date() == pd.Timestamp(date).tz_localize(self.iso.default_timezone).date() ) assert max(result["Interval End"]) == pd.Timestamp(end).tz_localize( self.iso.default_timezone, ) assert result["Load Forecast"].dtype in [np.int64, np.float64] assert result["Net Load Forecast"].dtype in [np.int64, np.float64] assert (result["Load Forecast"] > 0).all() assert ( (result["Interval End"] - result["Interval Start"]) == pd.Timedelta(hours=1) ).all()
@pytest.mark.parametrize( "date,end", TEST_DATES, )
[docs] def test_get_reliability_region_load_forecast(self, date, end): cassette_name = f"test_get_reliability_region_load_forecast_{date}_{end}.yaml" with api_vcr.use_cassette(cassette_name): result = self.iso.get_reliability_region_load_forecast(date=date, end=end) assert isinstance(result, pd.DataFrame) assert list(result.columns) == [ "Interval Start", "Interval End", "Publish Time", "Location", "Load Forecast", "Regional Percentage", ] assert ( min(result["Interval Start"]).date() == pd.Timestamp(date).tz_localize(self.iso.default_timezone).date() ) assert max(result["Interval End"]) == pd.Timestamp(end).tz_localize( self.iso.default_timezone, ) assert ( (result["Interval End"] - result["Interval Start"]) == pd.Timedelta(hours=1) ).all() assert set(result["Location"].unique()) == { ".Z.CONNECTICUT", ".Z.MAINE", ".Z.NEWHAMPSHIRE", ".Z.RHODEISLAND", ".Z.VERMONT", ".Z.SEMASS", ".Z.WCMASS", ".Z.NEMASSBOST", } assert result["Load Forecast"].dtype in [np.int64, np.float64] assert (result["Load Forecast"] > 0).all() assert result["Regional Percentage"].dtype == np.float64 assert ( (result["Regional Percentage"] >= 0) & (result["Regional Percentage"] <= 100) ).all() grouped = result.groupby(["Interval Start", "Publish Time"]) assert (grouped["Regional Percentage"].sum().between(99.9, 100.1)).all()
@api_vcr.use_cassette("test_get_fuel_mix_latest.yaml")
[docs] def test_get_fuel_mix_latest(self): result = self.iso.get_fuel_mix(date="latest") assert isinstance(result, pd.DataFrame) assert len(result) == 1 assert "Time" in result.columns assert isinstance(result["Time"].iloc[0], pd.Timestamp) numeric_cols = [col for col in result.columns if col != "Time"] for col in numeric_cols: assert result[col].dtype in [np.int64, np.float64] assert (result[col] >= 0).all()
@pytest.mark.parametrize( "date,end", TEST_DATES, )
[docs] def test_get_fuel_mix_date_range(self, date, end): cassette_name = f"test_get_fuel_mix_{date}_{end}.yaml" with api_vcr.use_cassette(cassette_name): result = self.iso.get_fuel_mix(date=date, end=end) assert isinstance(result, pd.DataFrame) assert "Time" in result.columns assert min(result["Time"]).date() == pd.Timestamp(date).date() assert max(result["Time"]).date() == pd.Timestamp( end, ).date() - pd.Timedelta( days=1, ) assert all(isinstance(t, pd.Timestamp) for t in result["Time"]) numeric_cols = [col for col in result.columns if col != "Time"] for col in numeric_cols: assert result[col].dtype in [np.int64, np.float64] assert (result[numeric_cols].sum(axis=1) > 0).all()
[docs] def test_get_load_hourly_latest(self): with api_vcr.use_cassette("test_get_load_hourly_latest.yaml"): result = self.iso.get_load_hourly(date="latest") assert isinstance(result, pd.DataFrame) assert len(result) == 1 assert list(result.columns) == [ "Interval Start", "Interval End", "Location", "Location Id", "Load", "Native Load", "ARD Demand", ] assert result["Location"].iloc[0] == "NEPOOL AREA" assert result["Location Id"].iloc[0] == 32 assert isinstance(result["Load"].iloc[0], (int, float)) assert isinstance(result["Native Load"].iloc[0], (int, float)) assert isinstance(result["ARD Demand"].iloc[0], (int, float))
@pytest.mark.parametrize( "date,end", TEST_DATES, )
[docs] def test_get_load_hourly_date_range( self, date, end, ): cassette_name = f"test_get_load_hourly_{date}_{end}.yaml" with api_vcr.use_cassette(cassette_name): result = self.iso.get_load_hourly(date=date, end=end) assert isinstance(result, pd.DataFrame) assert list(result.columns) == [ "Interval Start", "Interval End", "Location", "Location Id", "Load", "Native Load", "ARD Demand", ] assert all(result["Location"] == "NEPOOL AREA") assert all(result["Location Id"] == 32) assert all(isinstance(load, (int, float)) for load in result["Load"]) assert all(isinstance(load, (int, float)) for load in result["Native Load"]) assert all(isinstance(load, (int, float)) for load in result["ARD Demand"]) assert min(result["Interval Start"]).date() == pd.Timestamp(date).date() assert max(result["Interval End"]).date() == pd.Timestamp(end).date() assert ( (result["Interval End"] - result["Interval Start"]) == pd.Timedelta(hours=1) ).all()