-
Notifications
You must be signed in to change notification settings - Fork 65
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Code in scratch for hosting capacity
- Loading branch information
Showing
6 changed files
with
253 additions
and
124 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
import networkx as nx | ||
from pathlib import Path | ||
import pandas as pd | ||
import math | ||
|
||
import omf | ||
from omf.solvers import opendss | ||
import os | ||
|
||
''' | ||
Simply adds up all load further away from the substation than the given bus to estimate rough hosting capacity. | ||
with open(Path(modelDir, 'treefile.txt'), "w") as file: | ||
for item in tree: | ||
file.write(f"{item}\n") | ||
''' | ||
|
||
def myGetCoords(dssFilePath): | ||
'''Takes in an OpenDSS circuit definition file and outputs the bus coordinates as a dataframe.''' | ||
dssFileLoc = os.path.dirname(dssFilePath) | ||
opendss.runDSS(dssFilePath) | ||
opendss.runDssCommand(f'export buscoords "{dssFileLoc}/coords.csv"') | ||
coords = pd.read_csv(dssFileLoc + '/coords.csv', header=None) | ||
#JENNY | ||
coords.columns = ['Element', 'X', 'Y'] | ||
return coords | ||
|
||
def my_networkPlot(filePath, figsize=(20,20), output_name='networkPlot.png', show_labels=True, node_size=300, font_size=8): | ||
''' Plot the physical topology of the circuit. | ||
Returns a networkx graph of the circuit as a bonus. ''' | ||
dssFileLoc = os.path.dirname(os.path.abspath(filePath)) | ||
opendss.runDSS(filePath) | ||
coords = myGetCoords(filePath) | ||
opendss.runDssCommand(f'export voltages "{dssFileLoc}/volts.csv"') | ||
volts = pd.read_csv(dssFileLoc + '/volts.csv') | ||
#JENNY | ||
coords.columns = ['Bus', 'X', 'Y'] | ||
|
||
buses = opendss.get_meter_buses(filePath) | ||
G = nx.Graph() | ||
# Get the coordinates. | ||
pos = {} | ||
for index, row in coords.iterrows(): | ||
try: | ||
bus_name = str(int(row['Bus'])) | ||
except: | ||
bus_name = row['Bus'] | ||
# Get the connecting edges using Pandas. | ||
|
||
lines = opendss.dss.utils.lines_to_dataframe() | ||
edges = [] | ||
for index, row in lines.iterrows(): | ||
#HACK: dss upercases everything in the coordinate output. | ||
bus1 = row['Bus1'].split('.')[0].upper() | ||
bus2 = row['Bus2'].split('.')[0].upper() | ||
edges.append((bus1, bus2)) | ||
G.add_edges_from(edges) | ||
# Remove buses withouts coords | ||
no_pos_nodes = set(G.nodes()) - set(pos) | ||
G.remove_nodes_from(list(no_pos_nodes)) | ||
return G | ||
|
||
def downline_hosting_capacity( FNAME ): | ||
fullpath = os.path.abspath(FNAME) | ||
tree = opendss.dssConvert.omdToTree(fullpath) | ||
|
||
|
||
if __name__ == '__main__': | ||
modelDir = Path(omf.omfDir, 'scratch', 'hostingcapacity') | ||
circuit_file = 'circuit.dss' | ||
beginning_test_file = Path( omf.omfDir, 'static', 'publicFeeders', 'iowa240.clean.dss.omd') | ||
|
||
tree = opendss.dssConvert.omdToTree(beginning_test_file.resolve()) # this tree is a list | ||
|
||
graph = my_networkPlot( os.path.join( modelDir, circuit_file) ) | ||
print( graph.nodes() ) | ||
#print( nx.descendants(graph, "EQ_SOURCE_BUS") ) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import omf | ||
from pathlib import Path | ||
from omf.models import __neoMetaModel__ | ||
from omf.models.__neoMetaModel__ import * | ||
from omf.solvers import opendss | ||
from omf.solvers import mohca_cl | ||
import time | ||
|
||
meter_file_name = 'mohcaInputCustom.csv' | ||
meter_file_path = Path(omf.omfDir,'static','testFiles', 'hostingCapacity', meter_file_name) | ||
modeldir = Path(omf.omfDir, 'scratch', 'hostingCapacity') | ||
|
||
# 2541.865383386612 - iowa state | ||
def run_ami_algorithm(modelDir, meterfileinput, outData): | ||
# mohca data-driven hosting capacity | ||
inputPath = Path(modelDir, meterfileinput) | ||
outputPath = Path(modelDir, 'AMI_output.csv') | ||
AMI_start_time = time.time() | ||
AMI_output = mohca_cl.sandia1( inputPath, outputPath ) | ||
AMI_end_time = time.time() | ||
runtime = AMI_end_time - AMI_start_time | ||
return runtime | ||
|
||
|
||
def convert_seconds_to_hms_ms(seconds): | ||
# Convert seconds to milliseconds | ||
milliseconds = seconds * 1000 | ||
|
||
# Calculate hours, minutes, seconds, and milliseconds | ||
hours, remainder = divmod(milliseconds, 3600000) # 3600000 milliseconds in an hour | ||
minutes, remainder = divmod(remainder, 60000) # 60000 milliseconds in a minute | ||
seconds, milliseconds = divmod(remainder, 1000) # 1000 milliseconds in a second | ||
|
||
# Format the output | ||
return "{:02d}:{:02d}:{:02d}.{:03d}".format(int(hours), int(minutes), int(seconds), int(milliseconds)) | ||
|
||
runtime = run_ami_algorithm(modeldir, meter_file_path, 'output.csv') | ||
print( 'AMI_runtime: ', runtime) | ||
print( 'formatted time: ', convert_seconds_to_hms_ms( runtime )) |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
133 changes: 133 additions & 0 deletions
133
omf/scratch/hostingcapacity/selenium_test_hostingCapacity.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import time | ||
import sys | ||
|
||
from pathlib import Path | ||
|
||
import omf | ||
|
||
# Selenium Imports | ||
|
||
def login_to_OMF( driver, username, password ): | ||
from selenium.webdriver.common.keys import Keys | ||
from selenium.webdriver.common.by import By | ||
|
||
|
||
username_field = driver.find_element(by=By.ID, value="username") | ||
password_field = driver.find_element(by=By.ID, value='password') | ||
username_field.send_keys(username) | ||
password_field.send_keys(password) | ||
password_field.send_keys(Keys.RETURN) | ||
|
||
|
||
|
||
def firefox_test( url ): | ||
firefox_geckodriver_path = "/snap/bin/firefox" | ||
firefox_options = webdriver.FirefoxOptions() | ||
driver_service = webdriver.FirefoxService(executable_path=firefox_geckodriver_path) | ||
|
||
driver = webdriver.Firefox(service=driver_service, options=firefox_options) | ||
driver.get( url ) | ||
|
||
time.sleep(2) | ||
driver.quit() | ||
|
||
def chrome_test_iowa_state_algo( url, username, password ): | ||
from selenium import webdriver | ||
from selenium.webdriver.common.by import By | ||
from selenium.webdriver.common.keys import Keys | ||
from selenium.webdriver.support.select import Select | ||
from selenium.webdriver.support.ui import WebDriverWait | ||
from selenium.webdriver.support import expected_conditions | ||
|
||
retVal = True | ||
|
||
test_name = "Selenium Test for Hosting Capacity Iowa State Algorithm" | ||
modelDir = Path( omf.omfDir, "data", "Model", username, test_name ) | ||
|
||
iowa_state_input = 'isu_testInputData.csv' | ||
default_name = 'mohcaInputCustom.csv' | ||
|
||
# TODO: Do I want to check if the automated test exists already and if so, delete it? | ||
|
||
driver = webdriver.Chrome() | ||
driver.get( url ) | ||
|
||
login_to_OMF( driver, username, password ) | ||
|
||
# Create hostingCapacity Model | ||
driver.find_element(by=By.ID, value="newModelButton").click() | ||
driver.find_element(by=By.XPATH, value="//a[contains(@href, 'hostingCapacity')]" ).click() | ||
|
||
# Not sure if waiting one second for the pop-up for the name is needed | ||
wait = WebDriverWait(driver, 1) | ||
alert = wait.until(expected_conditions.alert_is_present()) | ||
|
||
alert.send_keys(test_name) | ||
alert.accept() | ||
|
||
# Wait for model to load up | ||
# wait = WebDriverWait(driver, 3) # Adjust the timeout as needed | ||
wait.until(expected_conditions.staleness_of(driver.find_element(By.TAG_NAME, "html"))) | ||
|
||
# Upload Iowa State Input File | ||
ami_data_input = driver.find_element(by=By.ID, value="AMIDataFile") | ||
# Input must be of type string and be the absolute path to the file | ||
ami_data_input.send_keys( str( Path( omf.omfDir, 'static', 'testFiles', 'hostingCapacity', iowa_state_input).resolve() ) ) | ||
|
||
# Set Algorithm | ||
algo_type_dropdown = driver.find_element(By.ID, 'algorithm') | ||
select = Select(algo_type_dropdown) | ||
select.select_by_visible_text('iastate') | ||
|
||
# Turn Traditional Hosting Cap Off | ||
trad_algo_dropdown = driver.find_element(By.ID, 'optionalCircuitFile') | ||
select = Select(trad_algo_dropdown) | ||
select.select_by_visible_text('Off') | ||
|
||
time.sleep(3) | ||
|
||
driver.find_element(by=By.ID, value="runButton").click() | ||
|
||
#Can check for raw output and see if that is a good source to determine completion? | ||
|
||
userAMIDisplayFileName = driver.find_element(by=By.ID, value="userAMIDisplayFileName") | ||
newFileUploadedName = userAMIDisplayFileName.get_attribute('value') | ||
|
||
# Check if the HTML updates with the new file name that the user uploaded | ||
try: | ||
assert newFileUploadedName == iowa_state_input | ||
# DELETE print( newFileUploadedName + " " + voltageTestFile1) | ||
except AssertionError: | ||
print( "FAILED: Assertion that the HTML display name of the AMI Data input file correctly changed to the users file name") | ||
print( "Actual: " + newFileUploadedName + " Expected: " + iowa_state_input ) | ||
retVal = False | ||
|
||
#Check if the file is present in the directory with the correct standard naming convention name | ||
if Path( modelDir, default_name).exists() == False: | ||
print( default_name + " does not exist in the directory") | ||
retVal = False | ||
|
||
#Check to make sure that a file by the name the user uploaded is NOT there | ||
if Path( modelDir, iowa_state_input ).exists(): | ||
print( "File created with users file input name: " + iowa_state_input ) | ||
retVal = False | ||
|
||
# TODO: Check outputs - p vague idea | ||
# One idea - check if the raw input/output files exists. I think that's present for every algorithm after completion. | ||
WebDriverWait(driver, 600).until(expected_conditions.presence_of_element_located((By.ID, 'rawOutput'))) | ||
|
||
driver.quit() | ||
return retVal | ||
|
||
if __name__ == '__main__': | ||
url = 'http://localhost:5000/' | ||
if len( sys.argv ) != 3: | ||
print("Usage: python script.py <username> <password>") | ||
sys.exit(1) | ||
|
||
username = sys.argv[1] | ||
password = sys.argv[2] | ||
|
||
if chrome_test_iowa_state_algo( url, username, password ): | ||
print( "PASSED: Hosting Capacity Iowa State Algorithm") | ||
#firefox_test |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.