# The datafev framework
# Copyright (C) 2022,
# Institute for Automation of Complex Power Systems (ACS),
# E.ON Energy Research Center (E.ON ERC),
# RWTH Aachen University
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
from pyomo.core import *
import pyomo.kernel as pmo
[docs]def reschedule(
solver,
opt_step,
opt_horizon,
upperlimit,
lowerlimit,
tolerance,
bcap,
inisoc,
tarsoc,
minsoc,
maxsoc,
ch_eff,
ds_eff,
pmax_pos,
pmax_neg,
deptime,
rho_y,
rho_eps,
):
"""
This function reschedules the charging operations of a cluster by considering:
- upper-lower limits of aggregate power consumption of the cluster,
- and pre-defined reference schedules of the individual EVs in the system.
This is run typically when some events require deviations from previously
determined schedules.
Parameters
----------
solver : pyomo SolverFactory object
Optimization solver.
opt_step : int
Size of one time step in the optimization (seconds).
opt_horizon : list of integers
Time step identifiers in the optimization horizon.
upperlimit : dict of float
Soft upper limit of cluster power consumption (kW).
lowerlimit : dict of float
Soft lower limit of cluster power consumption (kW).
tolerance : float
Maximum allowed violation of upper-lower limits (kW).
bcap : dict of float
Battery capactiy of EVs (kWs).
inisoc : dict of float
Initial SOCs of EV batteries (0<inisoc[key]<1).
tarsoc : dict of float
Target SOCs of EVs (0<inisoc[key]<1).
minsoc : dict of float
Minimum allowed SOCs.
maxsoc : dict of float
Maximum allowed SOCs.
ch_eff : dict of float
Charging efficiency of chargers.
ds_eff : dict of float
Discharging efficiency of chargers.
pmax_pos : dict of float
Maximum charge power that EV battery can withdraw (kW).
pmax_neg : dict of float
Maximum discharge power that EV battery can supply (kW).
deptime : dict of int
Number of time steps until departures of EVs.
rho_y : float
Penalty factor for deviation of reference schedules (unitless).
rho_eps : float
Penalty factor for violation of upper-lower soft limits (unitless).
Returns
-------
p_schedule : dict
Power schedule.
It contains a dictionary for each EV. Each item in the EV dictionary
indicates the power to be supplied to the EV(kW) during a particular
time step.
s_schedule : dict
SOC schedule.
It contains a dictionary for each EV. Each item in the EV dictionary
indicates the SOC to be achieved by the EV by a particular time step.
"""
###########################################################################
####################Constructing the optimization model####################
model = ConcreteModel()
model.V = Set(initialize=list(bcap.keys())) # Index set for the EVs
# Time parameters
model.deltaSec = opt_step # Time discretization (Size of one time step in seconds)
model.T = Set(
initialize=opt_horizon[:-1], ordered=True
) # Index set for the time steps in opt horizon
model.Tp = Set(
initialize=opt_horizon, ordered=True
) # Index set for the time steps in opt horizon for SoC
# Power capability parameters
model.P_EV_pos = pmax_pos # Maximum charging power to EV battery
model.P_EV_neg = pmax_neg # Maximum discharging power from EV battery
model.P_CC_up = (
upperlimit # Upper limit of the power that can be consumed by a cluster
)
model.P_CC_low = lowerlimit # Lower limit of the power that can be consumed by a cluster (negative values indicating export limit)
model.P_CC_vio = tolerance # Cluster upper-lower limit violation tolerance
# Battery and charger parameters
model.eff_ch = ch_eff # Charging efficiency
model.eff_ds = ds_eff # Discharging efficiency
model.E = bcap # Battery capacities
# Demand parameters
model.s_ini = inisoc # SoC when the optimization starts
model.s_tar = tarsoc # Target SOC
model.s_min = minsoc # Minimum SOC
model.s_max = maxsoc # Maximum SOC
model.t_dep = deptime # Estimated departure of EVs
# Penalty parameters
model.rho_y = rho_y
model.rho_eps = rho_eps
# EV Variables
model.p_ev = Var(
model.V, model.T, within=Reals
) # Net charging power of EV indexed by
model.p_ev_pos = Var(
model.V, model.T, within=NonNegativeReals
) # Charging power of EV
model.p_ev_neg = Var(
model.V, model.T, within=NonNegativeReals
) # Disharging power of EV
model.x_ev = Var(model.V, model.T, within=pmo.Binary) # Whether EV is charging
model.s = Var(model.V, model.Tp, within=NonNegativeReals) # EV SOC variable
# System variables
model.p_cc = Var(model.T, within=Reals) # Power flows into the cluster c
# Deviation
model.eps = Var(
within=NonNegativeReals
) # Deviation from aggregate conspumtion limit
model.y = Var(
model.V, within=NonNegativeReals
) # Deviation from individual schedules
# CONSTRAINTS
def initialsoc(model, v):
return model.s[v, 0] == model.s_ini[v]
model.inisoc = Constraint(model.V, rule=initialsoc)
def minimumsoc(model, v, t):
return model.s_min[v] <= model.s[v, t]
model.minsoc_con = Constraint(model.V, model.T, rule=minimumsoc)
def maximumsoc(model, v, t):
return model.s_max[v] >= model.s[v, t]
model.maxsoc_con = Constraint(model.V, model.T, rule=maximumsoc)
def storageConservation(
model, v, t
): # SOC of EV batteries will change with respect to the charged power and battery energy capacity
return model.s[v, t + 1] == (
model.s[v, t]
+ (model.p_ev_pos[v, t] - model.p_ev_neg[v, t])
/ model.E[v]
* model.deltaSec
)
model.socconst = Constraint(model.V, model.T, rule=storageConservation)
def chargepowerlimit(
model, v, t
): # Net power into EV decoupled into positive and negative parts
return model.p_ev[v, t] == model.p_ev_pos[v, t] - model.p_ev_neg[v, t]
model.chrpowconst = Constraint(model.V, model.T, rule=chargepowerlimit)
def combinatorics_ch(
model, v, t
): # EV indexed by v can charge only when x[v,t]==1 at t
if t >= model.t_dep[v]:
return model.p_ev_pos[v, t] == 0
else:
return model.p_ev_pos[v, t] <= model.x_ev[v, t] * model.P_EV_pos[v]
model.combconst1 = Constraint(model.V, model.T, rule=combinatorics_ch)
def combinatorics_ds(
model, v, t
): # EV indexed by v can discharge only when x[v,t]==0 at t
if t >= model.t_dep[v]:
return model.p_ev_neg[v, t] == 0
else:
return model.p_ev_neg[v, t] <= (1 - model.x_ev[v, t]) * model.P_EV_neg[v]
model.combconst2 = Constraint(model.V, model.T, rule=combinatorics_ds)
def ccpower(model, t): # Mapping EV powers to CC power
return model.p_cc[t] == sum(
model.p_ev_pos[v, t] / model.eff_ch[v]
- model.p_ev_neg[v, t] * model.eff_ds[v]
for v in model.V
)
model.ccpowtotal = Constraint(model.T, rule=ccpower)
def cluster_limit_violation(model):
return model.eps <= model.P_CC_vio
model.viol_clust = Constraint(rule=cluster_limit_violation)
def cluster_upper_limit(model, t): # Upper limit of aggregate power consumption
return model.p_cc[t] <= model.eps + model.P_CC_up[t]
model.ccpowcap_pos = Constraint(model.T, rule=cluster_upper_limit)
def cluster_lower_limit(model, t): # Lower limit of aggregate power consumption
return -model.eps + model.P_CC_low[t] <= model.p_cc[t]
model.ccpowcap_neg = Constraint(model.T, rule=cluster_lower_limit)
def individual_pos_deviation(model, v):
return model.s_tar[v] - model.s[v, max(opt_horizon)] <= model.y[v]
model.indev_pos = Constraint(model.V, rule=individual_pos_deviation)
def individual_neg_deviation(model, v):
return -model.y[v] <= model.s_tar[v] - model.s[v, max(opt_horizon)]
model.indev_neg = Constraint(model.V, rule=individual_neg_deviation)
# OBJECTIVE FUNCTION
def obj_rule(model):
return (
model.rho_y * (sum(model.y[v] * model.E[v] / 3600 for v in model.V))
+ model.rho_eps * model.eps
)
model.obj = Objective(rule=obj_rule, sense=minimize)
###########################################################################
###########################################################################
######################Solving the optimization model ######################
result = solver.solve(model)
###########################################################################
###########################################################################
################################Saving the results#########################
p_schedule = {}
s_schedule = {}
for v in model.V:
p_schedule[v] = {}
s_schedule[v] = {}
for t in opt_horizon:
if t < max(opt_horizon):
p_schedule[v][t] = model.p_ev[v, t]()
s_schedule[v][t] = model.s[v, t]()
###########################################################################
return p_schedule, s_schedule
if __name__ == "__main__":
import pandas as pd
import numpy as np
from pyomo.environ import SolverFactory
###########################################################################
# Input parameters
PCU = 11
eff = 1.0
N = 4
PPL = 0.5
CAP = 55 * 3600
PC = PPL * N * PCU
nb_of_ts = 12
solver = SolverFactory("gurobi")
opt_step = 300
opt_horizon = list(range(nb_of_ts + 1))
upperlimit = dict(
enumerate(np.ones(nb_of_ts) * PC)
) # The cluster is allowed to import half as the installed capacity
lowerlimit = dict(
enumerate(np.zeros(nb_of_ts))
) # The cluster is not allowed to export
tolerance = 0
np.random.seed(0)
evdata = {}
pmax_pos = {}
pmax_neg = {}
ch_eff = {}
ds_eff = {}
bcap = {}
tarsoc = {}
deptime = {}
inisoc = {}
minsoc = {}
maxsoc = {}
for n in range(1, N + 1):
evid = "EV" + str(n)
pmax_pos[evid] = PCU
pmax_neg[evid] = PCU
ch_eff[evid] = eff
ds_eff[evid] = eff
bcap[evid] = CAP
inisoc[evid] = np.random.uniform(low=0.4, high=0.8)
tarsoc[evid] = inisoc[evid] + PCU * opt_step * int(nb_of_ts / 2) / CAP
minsoc[evid] = 0.2
maxsoc[evid] = 1.0
deptime[evid] = np.random.randint(
low=int(nb_of_ts / 2), high=int(nb_of_ts * 1.5)
)
rho_y = 1
rho_eps = 1
###########################################################################
# To show only two decimals in the table
pd.options.display.float_format = "{:,.2f}".format
print("The cluster with total installed capacity of:", N * PCU)
print()
print("...has power limits of:")
limit_data = pd.DataFrame(
columns=["Upper Limit", "Lower Limit", "Violation Tolerance"]
)
limit_data["Upper Limit"] = pd.Series(upperlimit)
limit_data["Lower Limit"] = pd.Series(lowerlimit)
limit_data.loc[:, "Violation Tolerance"] = tolerance
print(limit_data)
print()
print("...must control the charging profiles of the EVs with the demands:")
demand_data = pd.DataFrame(
columns=["Battery Capacity", "Initial SOC", "Target SOC", "Estimated Departure"]
)
demand_data["Battery Capacity"] = pd.Series(bcap) / 3600
demand_data["Initial SOC"] = pd.Series(inisoc)
demand_data["Target SOC"] = pd.Series(tarsoc)
demand_data["Estimated Departure"] = pd.Series(deptime)
print(demand_data)
print()
print("Solving the optimization problem...")
p_ref, s_ref = reschedule(
solver,
opt_step,
opt_horizon,
upperlimit,
lowerlimit,
tolerance,
bcap,
inisoc,
tarsoc,
minsoc,
maxsoc,
ch_eff,
ds_eff,
pmax_pos,
pmax_neg,
deptime,
rho_y,
rho_eps,
)
print()
print("Printing optimal schedules:")
results = {}
for v in demand_data.index:
results[v] = pd.DataFrame(
columns=["P (kW)", "SOC (%)"], index=sorted(s_ref[v].keys())
)
results[v]["P (kW)"] = pd.Series(p_ref[v])
results[v]["SOC (%)"] = pd.Series(s_ref[v]) * 100
print(pd.concat(results, axis=1))