diff --git a/flow/core/kernel/vehicle/base.py b/flow/core/kernel/vehicle/base.py index a433b8924..d1804c275 100644 --- a/flow/core/kernel/vehicle/base.py +++ b/flow/core/kernel/vehicle/base.py @@ -350,6 +350,22 @@ def get_fuel_consumption(self, veh_id, error=-1001): """ pass + @abstractmethod + def get_energy_model(self, veh_id, error=""): + """Return the energy model class object of the specified vehicle. + + Parameters + ---------- + veh_id : str or list of str + vehicle id, or list of vehicle ids + error : str + value that is returned if the vehicle is not found + Returns + ------- + subclass of BaseEnergyModel + """ + pass + @abstractmethod def get_speed(self, veh_id, error=-1001): """Return the speed of the specified vehicle. diff --git a/flow/core/kernel/vehicle/traci.py b/flow/core/kernel/vehicle/traci.py index 39bfb35da..6c2b5b48f 100644 --- a/flow/core/kernel/vehicle/traci.py +++ b/flow/core/kernel/vehicle/traci.py @@ -290,6 +290,10 @@ def _add_departed(self, veh_id, veh_type): # specify the type self.__vehicles[veh_id]["type"] = veh_type + # specify energy model + self.__vehicles[veh_id]["energy_model"] = self.type_parameters[ + veh_type]["energy_model"]() + car_following_params = \ self.type_parameters[veh_type]["car_following_params"] @@ -547,6 +551,16 @@ def get_fuel_consumption(self, veh_id, error=-1001): return [self.get_fuel_consumption(vehID, error) for vehID in veh_id] return self.__sumo_obs.get(veh_id, {}).get(tc.VAR_FUELCONSUMPTION, error) * ml_to_gallons + def get_energy_model(self, veh_id, error=""): + """See parent class.""" + if isinstance(veh_id, (list, np.ndarray)): + return [self.get_energy_model(vehID) for vehID in veh_id] + try: + return self.__vehicles.get(veh_id, {'energy_model': error})['energy_model'] + except KeyError: + print("Energy model not specified for vehicle {}".format(veh_id)) + raise + def get_previous_speed(self, veh_id, error=-1001): """See parent class.""" if isinstance(veh_id, (list, np.ndarray)): diff --git a/flow/core/params.py b/flow/core/params.py index 79ad8d689..862b34b4c 100755 --- a/flow/core/params.py +++ b/flow/core/params.py @@ -7,6 +7,8 @@ from flow.controllers.car_following_models import SimCarFollowingController from flow.controllers.rlcontroller import RLController from flow.controllers.lane_change_controllers import SimLaneChangeController +from flow.energy_models.power_demand import PDMCombustionEngine +from flow.energy_models.power_demand import PDMElectric SPEED_MODES = { @@ -39,6 +41,9 @@ "only_right_drive_safe": 576 } +ENERGY_MODELS = set([PDMCombustionEngine, PDMElectric]) +DEFAULT_ENERGY_MODEL = PDMCombustionEngine + # Traffic light defaults PROGRAM_ID = 1 MAX_GAP = 3.0 @@ -262,6 +267,7 @@ def add(self, num_vehicles=0, car_following_params=None, lane_change_params=None, + energy_model=DEFAULT_ENERGY_MODEL, color=None): """Add a sequence of vehicles to the list of vehicles in the network. @@ -298,6 +304,12 @@ def add(self, # FIXME: depends on simulator lane_change_params = SumoLaneChangeParams() + if energy_model not in ENERGY_MODELS: + print('{} for vehicle {} is not a valid energy model. Defaulting to {}\n'.format(energy_model, + veh_id, + DEFAULT_ENERGY_MODEL)) + energy_model = DEFAULT_ENERGY_MODEL + type_params = {} type_params.update(car_following_params.controller_params) type_params.update(lane_change_params.controller_params) @@ -311,7 +323,8 @@ def add(self, "routing_controller": routing_controller, "initial_speed": initial_speed, "car_following_params": car_following_params, - "lane_change_params": lane_change_params} + "lane_change_params": lane_change_params, + "energy_model": energy_model} if color: type_params['color'] = color @@ -334,7 +347,9 @@ def add(self, "car_following_params": car_following_params, "lane_change_params": - lane_change_params + lane_change_params, + "energy_model": + energy_model }) # This is used to return the actual headways from the vehicles class. diff --git a/flow/core/rewards.py b/flow/core/rewards.py index 3cca916f5..78892bbb5 100755 --- a/flow/core/rewards.py +++ b/flow/core/rewards.py @@ -307,58 +307,26 @@ def punish_rl_lane_changes(env, penalty=1): def energy_consumption(env, gain=.001): - """Calculate power consumption of a vehicle. + """Calculate power consumption for all vehicle. Assumes vehicle is an average sized vehicle. The power calculated here is the lower bound of the actual power consumed by a vehicle. - """ - power = 0 - - M = 1200 # mass of average sized vehicle (kg) - g = 9.81 # gravitational acceleration (m/s^2) - Cr = 0.005 # rolling resistance coefficient - Ca = 0.3 # aerodynamic drag coefficient - rho = 1.225 # air density (kg/m^3) - A = 2.6 # vehicle cross sectional area (m^2) - for veh_id in env.k.vehicle.get_ids(): - speed = env.k.vehicle.get_speed(veh_id) - prev_speed = env.k.vehicle.get_previous_speed(veh_id) - - accel = abs(speed - prev_speed) / env.sim_step - power += M * speed * accel + M * g * Cr * speed + 0.5 * rho * A * Ca * speed ** 3 - - return -gain * power - - -def veh_energy_consumption(env, veh_id, gain=.001): - """Calculate power consumption of a vehicle. - - Assumes vehicle is an average sized vehicle. - The power calculated here is the lower bound of the actual power consumed - by a vehicle. + Parameters + ---------- + env : flow.envs.Env + the environment variable, which contains information on the current + state of the system. + gain : float + scaling factor for the reward """ - power = 0 - - M = 1200 # mass of average sized vehicle (kg) - g = 9.81 # gravitational acceleration (m/s^2) - Cr = 0.005 # rolling resistance coefficient - Ca = 0.3 # aerodynamic drag coefficient - rho = 1.225 # air density (kg/m^3) - A = 2.6 # vehicle cross sectional area (m^2) - speed = env.k.vehicle.get_speed(veh_id) - prev_speed = env.k.vehicle.get_previous_speed(veh_id) - - accel = abs(speed - prev_speed) / env.sim_step - - power += M * speed * accel + M * g * Cr * speed + 0.5 * rho * A * Ca * speed ** 3 - - return -gain * power + veh_ids = env.k.vehicle.get_ids() + return veh_energy_consumption(env, veh_ids, gain) -def miles_per_megajoule(env, veh_ids=None, gain=.001): - """Calculate miles per mega-joule of either a particular vehicle or the total average of all the vehicles. +def veh_energy_consumption(env, veh_ids=None, gain=.001): + """Calculate power consumption of a vehicle. Assumes vehicle is an average sized vehicle. The power calculated here is the lower bound of the actual power consumed @@ -369,70 +337,64 @@ def miles_per_megajoule(env, veh_ids=None, gain=.001): env : flow.envs.Env the environment variable, which contains information on the current state of the system. - veh_ids : [list] - list of veh_ids to compute the reward over + veh_ids : [list] or str + list of veh_ids or single veh_id to compute the reward over gain : float scaling factor for the reward """ - mpj = 0 - counter = 0 if veh_ids is None: veh_ids = env.k.vehicle.get_ids() elif not isinstance(veh_ids, list): veh_ids = [veh_ids] - for veh_id in veh_ids: - speed = env.k.vehicle.get_speed(veh_id) - # convert to be positive since the function called is a penalty - power = -veh_energy_consumption(env, veh_id, gain=1.0) - if power > 0 and speed >= 0.0: - counter += 1 - # meters / joule is (v * \delta t) / (power * \delta t) - mpj += speed / power - if counter > 0: - mpj /= counter - # convert from meters per joule to miles per joule - mpj /= 1609.0 - # convert from miles per joule to miles per megajoule - mpj *= 10**6 + power = 0 + for veh_id in veh_ids: + if veh_id not in env.k.vehicle.previous_speeds: + continue + energy_model = env.k.vehicle.get_energy_model(veh_id) + if energy_model != "": + speed = env.k.vehicle.get_speed(veh_id) + accel = env.k.vehicle.get_accel(veh_id, noise=False, failsafe=True) + grade = env.k.vehicle.get_road_grade(veh_id) + power += energy_model.get_instantaneous_power(accel, speed, grade) - return mpj * gain + return -gain * power -def miles_per_gallon(env, veh_ids=None, gain=.001): - """Calculate mpg of either a particular vehicle or the total average of all the vehicles. - - Assumes vehicle is an average sized vehicle. - The power calculated here is the lower bound of the actual power consumed - by a vehicle. +def instantaneous_mpg(env, veh_ids=None, gain=.001): + """Calculate the instantaneous mpg for every simulation step specific to the vehicle type. Parameters ---------- env : flow.envs.Env the environment variable, which contains information on the current state of the system. - veh_ids : [list] - list of veh_ids to compute the reward over + veh_ids : [list] or str + list of veh_ids or single veh_id to compute the reward over gain : float scaling factor for the reward """ - mpg = 0 - counter = 0 if veh_ids is None: veh_ids = env.k.vehicle.get_ids() elif not isinstance(veh_ids, list): veh_ids = [veh_ids] + + cumulative_gallons = 0 + cumulative_distance = 0 for veh_id in veh_ids: - speed = env.k.vehicle.get_speed(veh_id) - gallons_per_s = env.k.vehicle.get_fuel_consumption(veh_id) - if gallons_per_s > 0 and speed >= 0.0: - counter += 1 - # meters / gallon is (v * \delta t) / (gallons_per_s * \delta t) - mpg += speed / gallons_per_s - if counter > 0: - mpg /= counter - - # convert from meters per gallon to miles per gallon - mpg /= 1609.0 + energy_model = env.k.vehicle.get_energy_model(veh_id) + if energy_model != "": + speed = env.k.vehicle.get_speed(veh_id) + accel = env.k.vehicle.get_accel_no_noise_with_failsafe(veh_id) + grade = env.k.vehicle.get_road_grade(veh_id) + gallons_per_hr = energy_model.get_instantaneous_fuel_consumption(accel, speed, grade) + if gallons_per_hr > 0 and speed >= 0.0: + cumulative_gallons += gallons_per_hr + cumulative_distance += speed + + cumulative_gallons /= 3600.0 + cumulative_distance /= 1609.34 + # miles / gallon is (distance_dot * \delta t) / (gallons_dot * \delta t) + mpg = cumulative_distance / cumulative_gallons return mpg * gain diff --git a/flow/energy_models/base_energy.py b/flow/energy_models/base_energy.py new file mode 100644 index 000000000..910ab8c88 --- /dev/null +++ b/flow/energy_models/base_energy.py @@ -0,0 +1,57 @@ +"""Script containing the base vehicle energy class.""" +from abc import ABCMeta, abstractmethod + + +class BaseEnergyModel(metaclass=ABCMeta): + """Base energy model class. + + Calculate the instantaneous power consumption of a vehicle in + the network. It returns the power in Watts regardless of the + vehicle type: whether EV or Combustion Engine, Toyota Prius or Tacoma + or non-Toyota vehicles. Non-Toyota vehicles are set by default + to be an averaged-size vehicle. + """ + + def __init__(self): + # 15 kilowatts = 1 gallon/hour conversion factor + self.conversion = 15e3 + + @abstractmethod + def get_instantaneous_power(self, accel, speed, grade): + """Calculate the instantaneous power consumption of a vehicle. + + Must be implemented by child classes. + + Parameters + ---------- + accel : float + Instantaneous acceleration of the vehicle + speed : float + Instantaneous speed of the vehicle + grade : float + Instantaneous road grade of the vehicle + Returns + ------- + float + """ + pass + + def get_instantaneous_fuel_consumption(self, accel, speed, grade): + """Calculate the instantaneous fuel consumption of a vehicle. + + Fuel consumption is reported in gallons per hour, with the conversion + rate of 15kW = 1 gallon/hour. + + Parameters + ---------- + accel : float + Instantaneous acceleration of the vehicle + speed : float + Instantaneous speed of the vehicle + grade : float + Instantaneous road grade of the vehicle + Returns + ------- + float + """ + return self.get_instantaneous_power(accel, speed, grade) / self.conversion diff --git a/flow/energy_models/power_demand.py b/flow/energy_models/power_demand.py new file mode 100644 index 000000000..2e8593ecd --- /dev/null +++ b/flow/energy_models/power_demand.py @@ -0,0 +1,176 @@ +"""Script containing the vehicle power demand model energy classes.""" +import math +import numpy as np +from flow.energy_models.base_energy import BaseEnergyModel +from abc import ABCMeta, abstractmethod + + +class PowerDemandModel(BaseEnergyModel, metaclass=ABCMeta): + """Vehicle Power Demand base energy model class. + + Calculate power consumption of a vehicle based on physics + derivation. Assumes some vehicle characteristics. The + power calculated here is the lower bound of the actual + power consumed by the vehicle plus a bilinear polynomial + function used as a correction factor. + """ + + def __init__(self, + mass=2041, + area=3.2, + rolling_res_coeff=0.0027, + aerodynamic_drag_coeff=0.4, + p1_correction=4598.7155, + p3_correction=975.12719): + self.g = 9.807 + self.rho_air = 1.225 + self.gamma = 1 + self.mass = mass + self.cross_area = area + self.rolling_res_coeff = rolling_res_coeff + self.aerodynamic_drag_coeff = aerodynamic_drag_coeff + self.power_correction_coeffs = np.array([p1_correction, p3_correction]) + + def calculate_power_at_the_wheels(self, accel, speed, grade): + """Calculate the instantaneous power required. + + Parameters + ---------- + accel : float + Instantaneous acceleration of the vehicle + speed : float + Instantaneous speed of the vehicle + grade : float + Instantaneous road grade of the vehicle + Returns + ------- + float + """ + accel_slope_forces = self.mass * speed * ((np.heaviside(accel, 0.5) * (1 - self.gamma) + self.gamma)) * accel + accel_slope_forces += self.g * math.sin(grade) + rolling_friction = self.mass * self.g * self.rolling_res_coeff * speed + air_drag = 0.5 * self.rho_air * self.cross_area * self.aerodynamic_drag_coeff * speed**3 + power = accel_slope_forces + rolling_friction + air_drag + return power + + @abstractmethod + def get_regen_cap(self, accel, speed, grade): + """Set the maximum power retainable from regenerative braking. + + A negative regen cap is interpretted as a positive regenerative power. + + Parameters + ---------- + accel : float + Instantaneous acceleration of the vehicle + speed : float + Instantaneous speed of the vehicle + grade : float + Instantaneous road grade of the vehicle + Returns + ------- + float + """ + pass + + def get_power_correction_factor(self, accel, speed, grade): + """Calculate the instantaneous power correction of a vehicle. + + Parameters + ---------- + accel : float + Instantaneous acceleration of the vehicle + speed : float + Instantaneous speed of the vehicle + grade : float + Instantaneous road grade of the vehicle + Returns + ------- + float + """ + state_variables = np.array([accel, accel * speed]) + return max(0, np.dot(self.power_correction_coeffs, state_variables)) + + def get_instantaneous_power(self, accel, speed, grade): + """Apply the regenerative braking cap to the modelled power demand. + + Parameters + ---------- + accel : float + Instantaneous acceleration of the vehicle + speed : float + Instantaneous speed of the vehicle + grade : float + Instantaneous road grade of the vehicle + Returns + ------- + float + """ + regen_cap = self.get_regen_cap(accel, speed, grade) + power_at_the_wheels = max(regen_cap, self.calculate_power_at_the_wheels(accel, speed, grade)) + correction_factor = self.get_power_correction_factor(accel, speed, grade) + return power_at_the_wheels + correction_factor + + +class PDMCombustionEngine(PowerDemandModel): + """Power Demand Model for a combustion engine vehicle.""" + + def __init__(self, + idle_coeff=3405.5481762, + linear_friction_coeff=83.123929917, + quadratic_friction_coeff=6.7650718327, + drag_coeff=0.7041355229, + p1_correction=4598.7155, + p3_correction=975.12719): + super(PDMCombustionEngine, self).__init__() + self.fuel_consumption_power_coeffs = np.array([idle_coeff, + linear_friction_coeff, + quadratic_friction_coeff, + drag_coeff]) + + def get_regen_cap(self, accel, speed, grade): + """See parent class.""" + return 0 + + def calculate_fuel_consumption_power(self, accel, speed, grade): + """Calculate the instantaneous power from a fitted function to Toyota Tacoma fuel consumption. + + Parameters + ---------- + accel : float + Instantaneous acceleration of the vehicle + speed : float + Instantaneous speed of the vehicle + grade : float + Instantaneous road grade of the vehicle + Returns + ------- + float + """ + state_variables = np.array([1, speed, speed**2, speed**3]) + power_0 = np.dot(self.fuel_consumption_power_coeffs, state_variables) + return max(self.mass * accel * speed + power_0, 0) + + def get_instantaneous_power(self, accel, speed, grade): + """See parent class.""" + fuel_consumption_power = self.calculate_fuel_consumption_power(accel, speed, grade) + power_correction_factor = self.get_power_correction_factor(accel, speed, grade) + return fuel_consumption_power + power_correction_factor + + +class PDMElectric(PowerDemandModel): + """Power Demand Model for an electric vehicle.""" + + def __init__(self, + mass=1663, + area=2.4, + rolling_res_coeff=0.007, + aerodynamic_drag_coeff=0.24): + super(PDMElectric, self).__init__(mass=mass, + area=area, + rolling_res_coeff=rolling_res_coeff, + aerodynamic_drag_coeff=aerodynamic_drag_coeff) + + def get_regen_cap(self, accel, speed, grade): + """See parent class.""" + return -2.8 * speed