Source code for src.datafev.routines.scenario_generation.utils

# The datafev framework

# Copyright (C) 2022,
# Institute for Automation of Complex Power Systems (ACS),
# E.ON Energy Research Center (E.ON ERC),
# RWTH Aachen University

# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


import numpy as np
import pandas as pd
from datetime import timedelta
import datetime as dt
import decimal
import matplotlib.pyplot as plt
import matplotlib.ticker as tck
import os
import matplotlib.dates as mdates


[docs]def excel_to_sceneration_input_simple_pdfs(file_path): """ This method converts the excel inputs into inputs suitable for the generate_fleet_from_simple_pdfs function under sceneration.py. Excel file structure, especially Sheet and Column names should be as it's in the 'tutorials/scenario_generation/input_generator_simple_pdfs.xlsx'. Parameters ---------- file_path : str File path of the Excel input file. Returns ------- arr_times_dict : dict Arrival times nested dictionary. keys: weekend or weekday, values: {keys: time identifier, values: time lower bound, time upper bounds and arrival probabilities}. dep_times_dict : dict Departure times nested dictionary. keys: weekend or weekday, values: {keys: time identifier, values: time lower bound, time upper bounds and departure probabilities}. arr_soc_dict : dict SoC nested dictionaries for arrival. keys: SoC Identifier, values: SoC Lower Bounds, SOC Upper Bounds and their probabilities. dep_soc_dict : dict SoC nested dictionaries for departure. keys: SoC Identifier, values: SoC Lower Bounds, SOC Upper Bounds and their probabilities. ev_dict : dict EV nested dictionary. keys: EV models, values: their data and probability. """ # Read excel file dep_times_df = pd.read_excel(file_path, "DepartureTime") arr_times_df = pd.read_excel(file_path, "ArrivalTime") arr_soc_df = pd.read_excel(file_path, "ArrivalSoC") dep_soc_df = pd.read_excel(file_path, "DepartureSoC") ev_df = pd.read_excel(file_path, "EVData") # Convert percent probabilities to probabilities between 0 and 1 arr_times_df["WeekdayArrivalPercentage"] = arr_times_df[ "WeekdayArrivalPercentage" ].div(100) arr_times_df["WeekendArrivalPercentage"] = arr_times_df[ "WeekendArrivalPercentage" ].div(100) dep_times_df["WeekdayDeparturePercentage"] = dep_times_df[ "WeekdayDeparturePercentage" ].div(100) dep_times_df["WeekendDeparturePercentage"] = dep_times_df[ "WeekendDeparturePercentage" ].div(100) # Separate weekday and weekend arrival/departure times dataframes, rename WeekdayArrivalPercentage to probability weekday_arr_times_df = arr_times_df.filter( ["TimeID", "TimeLowerBound", "TimeUpperBound", "WeekdayArrivalPercentage"], axis=1, ) weekday_arr_times_df.columns = weekday_arr_times_df.columns.str.replace( "WeekdayArrivalPercentage", "Probability" ) weekend_arr_times_df = arr_times_df.filter( ["TimeID", "TimeLowerBound", "TimeUpperBound", "WeekendArrivalPercentage"], axis=1, ) weekend_arr_times_df.columns = weekend_arr_times_df.columns.str.replace( "WeekendArrivalPercentage", "Probability" ) weekday_dep_times_df = dep_times_df.filter( ["TimeID", "TimeLowerBound", "TimeUpperBound", "WeekdayDeparturePercentage"], axis=1, ) weekday_dep_times_df.columns = weekday_dep_times_df.columns.str.replace( "WeekdayDeparturePercentage", "Probability" ) weekend_dep_times_df = dep_times_df.filter( ["TimeID", "TimeLowerBound", "TimeUpperBound", "WeekendDeparturePercentage"], axis=1, ) weekend_dep_times_df.columns = weekend_dep_times_df.columns.str.replace( "WeekendDeparturePercentage", "Probability" ) # Arrival/departure times nested dictionaries # keys: weekend or weekday # values: {keys: Time Identifier, values: Time Lower Bound, Time Upper Bounds and arrival/departure probabilities} arr_times_dict = {} weekday_arr_times_df = weekday_arr_times_df.set_index("TimeID") arr_times_dict["Weekday"] = weekday_arr_times_df.to_dict(orient="index") weekend_arr_times_df = weekend_arr_times_df.set_index("TimeID") arr_times_dict["Weekend"] = weekend_arr_times_df.to_dict(orient="index") dep_times_dict = {} weekday_dep_times_df = weekday_dep_times_df.set_index("TimeID") dep_times_dict["Weekday"] = weekday_dep_times_df.to_dict(orient="index") weekend_dep_times_df = weekend_dep_times_df.set_index("TimeID") dep_times_dict["Weekend"] = weekend_dep_times_df.to_dict(orient="index") # Convert percent SoCs to values between 0 and 1 arr_soc_df["SoCLowerBound(%)"] = arr_soc_df["SoCLowerBound(%)"].div(100) arr_soc_df["SoCUpperBound(%)"] = arr_soc_df["SoCUpperBound(%)"].div(100) dep_soc_df["SoCLowerBound(%)"] = dep_soc_df["SoCLowerBound(%)"].div(100) dep_soc_df["SoCUpperBound(%)"] = dep_soc_df["SoCUpperBound(%)"].div(100) # SoC nested dictionaries for both arrival and departure # keys: SoC Identifier, values: SoC Lower Bounds, SOC Upper Bounds and their probabilities arr_soc_df = arr_soc_df.set_index("SoCID") arr_soc_dict = arr_soc_df.to_dict(orient="index") dep_soc_df = dep_soc_df.set_index("SoCID") dep_soc_dict = dep_soc_df.to_dict(orient="index") # EV nested dictionary # keys: EV models, values: their data and probability ev_df = ev_df.set_index("Model") ev_dict = ev_df.to_dict(orient="index") return arr_times_dict, dep_times_dict, arr_soc_dict, dep_soc_dict, ev_dict
[docs]def excel_to_sceneration_input_conditional_pdfs(file_path): """ This method converts the excel inputs into inputs suitable for the generate_fleet_from_conditional_pdfs function under sceneration.py. Excel file structure, especially Sheet and Column names should be as it's in the 'tutorials/scenario_generation/input_generator_conditional_pdfs.xlsx'. Parameters ---------- file_path : str File path of the Excel input file. Returns ------- times_dict : dict Arrival-departure time combinations nested dictionary. keys: Arrival-departure time combination identifier, values: time upper and lower bounds. times_prob_dict : dict Arrival-departure time combinations' probabilities nested dictionary. keys: Arrival-departure time combination identifier, values: their probabilities. soc_dict : dict Arrival-departure SoC combinations nested dictionary. keys: SoC Identifier, values: SoC upper and lower bounds. soc_prob_dict : dict Arrival-departure SoC combinations' probabilities nested dictionary. keys: Arrival-departure SoC combination identifier, values: their probabilities. ev_dict : dict EV nested dictionary. keys: EV models, values: their data and probability. """ # Read excel file times_df = pd.read_excel(file_path, "TimeID") times_prob_df = pd.read_excel(file_path, "TimeProbabilityDistribution") soc_df = pd.read_excel(file_path, "SoCID") soc_prob_df = pd.read_excel(file_path, "SoCProbabilityDistribution") ev_df = pd.read_excel(file_path, "EVData") times_df = times_df.set_index("TimeID") times_df["TimeLowerBound"] = times_df["TimeLowerBound"].round("S") times_df["TimeUpperBound"] = times_df["TimeUpperBound"].round("S") times_dict = times_df.T.to_dict("list") end_time = list(times_dict.values())[-1][-1] times_prob_df = times_prob_df.set_index("TimeID") times_prob_dict = {} for arr_time_id, row in times_prob_df.iterrows(): id_list = [] for dep_time_id, probability in row.items(): id_list.append(arr_time_id) id_list.append(dep_time_id) id_tuple = tuple(id_list) times_prob_dict[id_tuple] = probability id_list.clear() soc_df = soc_df.set_index("SoCID") # Convert percent SoCs to values between 0 and 1 soc_df["SoCLowerBound(%)"] = soc_df["SoCLowerBound(%)"].div(100) soc_df["SoCUpperBound(%)"] = soc_df["SoCUpperBound(%)"].div(100) soc_dict = soc_df.T.to_dict("list") soc_prob_df = soc_prob_df.set_index("SoCID") soc_prob_dict = {} for arr_soc_id, row in soc_prob_df.iterrows(): id_list = [] for dep_soc_id, probability in row.items(): id_list.append(arr_soc_id) id_list.append(dep_soc_id) id_tuple = tuple(id_list) soc_prob_dict[id_tuple] = probability id_list.clear() # EV nested dictionary # keys: EV models, values: their data and probability ev_df = ev_df.set_index("Model") ev_dict = ev_df.to_dict(orient="index") return end_time, times_dict, times_prob_dict, soc_dict, soc_prob_dict, ev_dict
[docs]def generate_time_list(time_lowerb, time_upperb, timedelta_in_min, date): """ Generates a datetime list with the given resolution and given date. Parameters ---------- time_lowerb : datetime.datetime Start datetime. time_upperb : datetime.datetime End datetime. timedelta_in_min : int Resolution in minutes. date : datetime.datetime Date of the datetimes to be returned. Returns ------- times : list Datetime list. """ times = [] times_str_list = [ (time_lowerb + timedelta(hours=timedelta_in_min * i / 60)).strftime("%H:%M:%S") for i in range( int((time_upperb - time_lowerb).total_seconds() / 60.0 / timedelta_in_min) ) ] for time_str in times_str_list: temp_time = dt.datetime.strptime(time_str, "%H:%M:%S") time = dt.datetime.combine(date, temp_time.time()) times.append(time) return times
[docs]def generate_datetime_list(sdate, edate, timedelta_in_min): """ Generates a datetime list with the given resolution. Parameters ---------- sdate : TYPE Start datetime. edate : TYPE End datetime. timedelta_in_min : int Resolution in minutes. Returns ------- datetime_lst : TYPE Datetime list. """ diff_delta = edate - sdate # as timedelta number_of_ts = int(diff_delta / dt.timedelta(minutes=timedelta_in_min)) datetime_lst = [] new_datetime = sdate for n in range(0, number_of_ts): datetime_lst.append(new_datetime) new_datetime = new_datetime + dt.timedelta(minutes=timedelta_in_min) return datetime_lst
[docs]def drange(x, y, jump): """ Generate a range from x to y with jump spaces. Parameters ---------- x : numpy.float64 Start point. y : float End point. jump : str Space between generated jumps. Yields ------ decimal.Decimal Parts in the equal range of the jump between x and y. """ while x < y: yield float(x) x = decimal.Decimal(x) + decimal.Decimal(jump)
[docs]def visualize_statistical_generation(file_path, gen_ev_df, timedelta_in_min=15): """ This method visualizes generated distribution of arrival and departure times and SoCs of the generated fleet behavior. Parameters ---------- file_path : str The file path for image files to be saved. gen_ev_df : pandas.core.frame.DataFrame Output data frame from generate_fleet_data function. timedelta_in_min : int Resolution of the simulation in minutes. The default is 15. Note: The resolution must be equal to the resolution of the scenario generator! Returns ------- None. """ # Times # Create times dicts for arrival and departure Keys: All possible time assignments, Values: number of assigned EVs current = dt.datetime(2022, 1, 1) # arbitrary day datetime_lst = [ current + timedelta(minutes=m) for m in range(0, 24 * 60, timedelta_in_min) ] arr_times_dict = {} dep_times_dict = {} # Initialize with 0 for item in datetime_lst: arr_times_dict[item.strftime("%H:%M")] = 0 dep_times_dict[item.strftime("%H:%M")] = 0 for ev_id, row in gen_ev_df.iterrows(): for time, value in arr_times_dict.items(): if time == gen_ev_df.at[ev_id, "ArrivalTime"].strftime("%H:%M"): arr_times_dict[time] += 1 for time, value in dep_times_dict.items(): if time == gen_ev_df.at[ev_id, "DepartureTime"].strftime("%H:%M"): dep_times_dict[time] += 1 times_df = pd.DataFrame.from_dict([arr_times_dict, dep_times_dict]).transpose() times_df = times_df.rename(columns={0: "Arrival Times", 1: "Departure Times"}) # Plotting times_df.plot(kind="bar", alpha=0.5, width=1) plt.xticks(np.arange(0, len(times_df), 6)) plt.ylabel("Number of EVs", size=12) plot_name = "generated_time_distribution" plot_path = os.path.join(file_path, plot_name) plt.savefig(plot_path) # Clear memory plt.clf() # SoCs soc_df = gen_ev_df.filter(["ArrivalSoC", "DepartureSoC"]) plot_name = "generated_soc_distribution" soc_df.plot(kind="hist", alpha=0.5) plt.ylabel("Number of EVs", size=12) plot_path = os.path.join(file_path, plot_name) plt.savefig(plot_path) # Clear memory plt.clf()
[docs]def output_to_sim_input(sce_output_df, xlfile, dc_power=False): """ This function converts the fleet behavior (generated from statistical data) to the format that could be simulated in this simulation framework. Parameters ---------- sce_output_df : pandas.core.frame.DataFrame Output data frame from generate_fleet_data function. xlfile : str Desired name of the output excel file. dc_power : bool, optional This parameter indicates whether dc or ac will be used as charging power in the simulation. The default is False. Returns ------- None. """ sim_input_df = pd.DataFrame( columns=[ "Battery Capacity (kWh)", "p_max_ch (kW)", "p_max_ds (kW)", "Real Arrival Time", "Real Arrival SOC", "Estimated Departure Time", "Target SOC @ Estimated Departure Time", ] ) sim_input_df["Battery Capacity (kWh)"] = sce_output_df[ "BatteryCapacity(kWh)" ].values sim_input_df["Real Arrival Time"] = sce_output_df["ArrivalTime"].values sim_input_df["Real Arrival SOC"] = sce_output_df["ArrivalSoC"].values sim_input_df["Estimated Departure Time"] = sce_output_df["DepartureTime"].values sim_input_df["Target SOC @ Estimated Departure Time"] = sce_output_df[ "DepartureSoC" ].values if dc_power is False: # use AC-charging-powers sim_input_df["p_max_ch (kW)"] = sce_output_df["MaxChargingPower(kW)"].values sim_input_df["p_max_ds (kW)"] = sce_output_df["MaxChargingPower(kW)"].values else: # use DC-fast-charging-powers sim_input_df["p_max_ch (kW)"] = sce_output_df["MaxFastChargingPower(kW)"].values sim_input_df["p_max_ds (kW)"] = sce_output_df["MaxFastChargingPower(kW)"].values # Simulation input dataframe to excel file sim_input_df.to_excel(xlfile)