-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess.py
159 lines (118 loc) · 4.57 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import numpy as np
from PIL import Image
import tensorflow as tf
from keras.preprocessing import image
import os
from os.path import join
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from keras.applications.vgg16 import VGG16
from sklearn.metrics.pairwise import cosine_similarity
import shutil
import Config
embLayer = VGG16(weights='imagenet', include_top=False,
pooling='max', input_shape=(224, 224, 3))
for model_layer in embLayer.layers:
model_layer.trainable = False
def load_image(image_path):
"""
-----------------------------------------------------
Process the image provided.
- Resize the image
-----------------------------------------------------
return resized image
"""
input_image = Image.open(image_path)
resized_image = input_image.resize((224, 224))
return resized_image
def get_image_embeddings(object_image : image):
"""
-----------------------------------------------------
convert image into 3d array and add additional dimension for model input
-----------------------------------------------------
return embeddings of the given image
"""
image_array = np.expand_dims(image.img_to_array(object_image), axis = 0)
image_embedding = embLayer.predict(image_array)
return image_embedding
def get_similarity_score(first_image : str, second_image : str):
"""
-----------------------------------------------------
Takes image array and computes its embedding using VGG16 embedding model.
-----------------------------------------------------
return embedding of the image
"""
first_image = load_image(first_image)
second_image = load_image(second_image)
first_image_vector = get_image_embeddings(first_image)
second_image_vector = get_image_embeddings(second_image)
similarity_score = cosine_similarity(first_image_vector, second_image_vector).reshape(1,)
return similarity_score
def show_image(image_path):
image = mpimg.imread(image_path)
imgplot = plt.imshow(image)
plt.show()
# define the path of the images
before0 = './Before/0.jpg'
before1 = './Before/1.jpg'
before2 = './Before/2.jpg'
after0 = './After/0.jpg'
after1 = './After/1.jpg'
after2 = './After/2.jpg'
THRESHOLD = 0.8
def trigger(before, after):
"""
-----------------------------------------------------
First step to recognize the images and return True/False based on the similarity score compared to the threshold constant.
-----------------------------------------------------
return similarity score
"""
similarity_score = get_similarity_score(before, after)
print(f"Similarity score: {similarity_score}")
if similarity_score < THRESHOLD:
return True
else:
return False
def building_segmentation(image):
"""
-----------------------------------------------------
Segment the image to identify the buildings and return the segmented image with bounded boxes!~
-----------------------------------------------------
return segmented image
"""
print("Machine Learning model to be implemented here")
img = load_image(image)
# # Not sure if using the pre-embedded weights is better or not, but we can try both later
# img_emb = get_image_embeddings(img)
# show_image(image)
pass
def moveDirectory(origin, destination):
shutil.move(origin, destination)
def process():
"""
-----------------------------------------------------
Process the images and return the generated map if flood is detected, otherwise does nothing in nominal mode.
-----------------------------------------------------
return similarity score
"""
imgs = os.listdir(Config.directory_to_write)
imgs.sort()
for img in imgs[:3]:
imgPath = join(Config.directory_to_write, img)
destination = join("./Before", img)
moveDirectory(imgPath, destination)
for img in imgs[3:]:
imgPath = join(Config.directory_to_write, img)
destination = join("./After", img)
moveDirectory(imgPath, destination)
befores = os.listdir("./Before")
afters = os.listdir("./After")
for i in range(3):
before = join("./Before", befores[i])
after = join("./After", afters[i])
triggered = trigger(before, after)
if triggered:
print(f"Flood Detected in the {i+1}/3 of Times Square")
else:
print(f"No flood detected in the {i+1}/3 of Times Square")
process()