diff --git a/modules/CameraCapture/app/AnnotationParser.py b/modules/CameraCapture/app/AnnotationParser.py
index 61adc93..4e697e9 100644
--- a/modules/CameraCapture/app/AnnotationParser.py
+++ b/modules/CameraCapture/app/AnnotationParser.py
@@ -1,7 +1,9 @@
-#To make python 2 and python 3 compatible code
+# To make python 2 and python 3 compatible code
 from __future__ import absolute_import

-#Returns rectangle boundaries in the CV2 format (topLeftX, topLeftY, bottomRightX, bottomRightY) given by a processing service
+# Returns rectangle boundaries in the CV2 format (topLeftX, topLeftY, bottomRightX, bottomRightY) given by a processing service
+
+
 class AnnotationParser:
     def getCV2RectanglesFromProcessingService1(self, response):
         try:
@@ -10,53 +12,60 @@ def getCV2RectanglesFromProcessingService1(self, response):
                 for decoration in item:
                     if "box" in decoration.lower():
                         rectList = item[decoration].split(",")
-                        top = int(rectList[0])
-                        left = int(rectList[1])
-                        width = int(rectList[2])
-                        height = int(rectList[3])
+                        top = int(rectList[0])
+                        left = int(rectList[1])
+                        width = int(rectList[2])
+                        height = int(rectList[3])
                         for decorationProperty in item[decoration]:
                             if "top" in decorationProperty.lower():
                                 top = int(item[decoration][decorationProperty])
                             if "left" in decorationProperty.lower():
-                                left = int(item[decoration][decorationProperty])
+                                left = int(item[decoration]
+                                           [decorationProperty])
                             if "width" in decorationProperty.lower():
-                                width = int(item[decoration][decorationProperty])
+                                width = int(item[decoration]
+                                            [decorationProperty])
                             if "height" in decorationProperty.lower():
-                                height = int(item[decoration][decorationProperty])
+                                height = int(item[decoration]
+                                             [decorationProperty])
                         if top is not None and left is not None and width is not None and height is not None:
                             topLeftX = left
                             topLeftY = top
                             bottomRightX = left + width
                             bottomRightY = top + height
-                            listOfCV2Rectangles.append([topLeftX, topLeftY, bottomRightX, bottomRightY])
+                            listOfCV2Rectangles.append(
+                                [topLeftX, topLeftY, bottomRightX, bottomRightY])
             return listOfCV2Rectangles
         except:
-            #Ignoring exceptions for now so that video can be read and analyzed without post-processing in case of errors
+            # Ignoring exceptions for now so that video can be read and analyzed without post-processing in case of errors
             pass

     def getCV2RectanglesFromProcessingService2(self, response):
-        try:
-            listOfCV2Rectangles = []
-            for item in response:
-                for decoration in item:
-                    if "rect" in decoration.lower():
-                        for decorationProperty in item[decoration]:
-                            if "top" in decorationProperty.lower():
-                                top = int(item[decoration][decorationProperty])
-                            if "left" in decorationProperty.lower():
-                                left = int(item[decoration][decorationProperty])
-                            if "width" in decorationProperty.lower():
-                                width = int(item[decoration][decorationProperty])
-                            if "height" in decorationProperty.lower():
-                                height = int(item[decoration][decorationProperty])
-                        if top is not None and left is not None and width is not None and height is not None:
-                            topLeftX = left
-                            topLeftY = top
-                            bottomRightX = left + width
-                            bottomRightY = top + height
-                            listOfCV2Rectangles.append([topLeftX, topLeftY, bottomRightX, bottomRightY])
-            return listOfCV2Rectangles
-        except:
-            #Ignoring exceptions for now so that video can be read and analyzed without post-processing in case of errors
-            pass
-            
\ No newline at end of file
+        try:
+            listOfCV2Rectangles = []
+            for item in response:
+                for decoration in item:
+                    if "rect" in decoration.lower():
+                        for decorationProperty in item[decoration]:
+                            if "top" in decorationProperty.lower():
+                                top = int(item[decoration][decorationProperty])
+                            if "left" in decorationProperty.lower():
+                                left = int(item[decoration]
+                                           [decorationProperty])
+                            if "width" in decorationProperty.lower():
+                                width = int(item[decoration]
+                                            [decorationProperty])
+                            if "height" in decorationProperty.lower():
+                                height = int(item[decoration]
+                                             [decorationProperty])
+                        if top is not None and left is not None and width is not None and height is not None:
+                            topLeftX = left
+                            topLeftY = top
+                            bottomRightX = left + width
+                            bottomRightY = top + height
+                            listOfCV2Rectangles.append(
+                                [topLeftX, topLeftY, bottomRightX, bottomRightY])
+            return listOfCV2Rectangles
+        except:
+            # Ignoring exceptions for now so that video can be read and analyzed without post-processing in case of errors
+            pass
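Editor's note, not part of the patch: the parser above is pinned down by the CameraCapture unit tests later in this diff. A minimal standalone sketch against the service-2 payload shape those tests use:

```python
# Usage sketch for AnnotationParser; payload shape taken from test/UnitTests.py.
import json

from AnnotationParser import AnnotationParser

payload = json.loads(
    '[{"Id": "c5c24a82-6845-4031-9d5d-978df9175426",'
    ' "rectangle": {"top": 54, "left": 394, "width": 78, "height": 78}}]')

parser = AnnotationParser()
# (top, left, width, height) -> [topLeftX, topLeftY, bottomRightX, bottomRightY]
print(parser.getCV2RectanglesFromProcessingService2(payload))
# [[394, 54, 472, 132]]
```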
diff --git a/modules/CameraCapture/app/CameraCapture.py b/modules/CameraCapture/app/CameraCapture.py
index 6e5089f..9715a40 100644
--- a/modules/CameraCapture/app/CameraCapture.py
+++ b/modules/CameraCapture/app/CameraCapture.py
@@ -1,33 +1,33 @@
-#To make python 2 and python 3 compatible code
+# To make python 2 and python 3 compatible code
 from __future__ import division
 from __future__ import absolute_import
-#Imports
+# Imports
+from ImageServer import ImageServer
+import ImageServer
+from AnnotationParser import AnnotationParser
+import AnnotationParser
+from VideoStream import VideoStream
+import VideoStream
+import time
+import json
+import requests
+import numpy
 import sys
-if sys.version_info[0] < 3:#e.g python version <3
+if sys.version_info[0] < 3:  # e.g. Python version < 3
     import cv2
 else:
     import cv2
     from cv2 import cv2
 # pylint: disable=E1101
 # pylint: disable=E0401
-# Disabling linting that is not supported by Pylint for C extensions such as OpenCV. See issue https://github.com/PyCQA/pylint/issues/1955
-import numpy
-import requests
-import json
-import time
+# Disabling linting that is not supported by Pylint for C extensions such as OpenCV. See issue https://github.com/PyCQA/pylint/issues/1955

-import VideoStream
-from VideoStream import VideoStream
-import AnnotationParser
-from AnnotationParser import AnnotationParser
-import ImageServer
-from ImageServer import ImageServer

 class CameraCapture(object):

-    def __IsInt(self,string):
-        try:
+    def __IsInt(self, string):
+        try:
             int(string)
             return True
         except ValueError:
@@ -36,26 +36,26 @@ def __IsInt(self,string):
     def __init__(
             self,
             videoPath,
-            imageProcessingEndpoint = "",
-            imageProcessingParams = "",
-            showVideo = False,
-            verbose = False,
-            loopVideo = True,
-            convertToGray = False,
-            resizeWidth = 0,
-            resizeHeight = 0,
-            annotate = False,
-            sendToHubCallback = None):
+            imageProcessingEndpoint="",
+            imageProcessingParams="",
+            showVideo=False,
+            verbose=False,
+            loopVideo=True,
+            convertToGray=False,
+            resizeWidth=0,
+            resizeHeight=0,
+            annotate=False,
+            sendToHubCallback=None):
         self.videoPath = videoPath
         if self.__IsInt(videoPath):
-            #case of a usb camera (usually mounted at /dev/video* where * is an int)
+            # case of a USB camera (usually mounted at /dev/video* where * is an int)
            self.isWebcam = True
         else:
-            #case of a video file
+            # case of a video file
            self.isWebcam = False
         self.imageProcessingEndpoint = imageProcessingEndpoint
         if imageProcessingParams == "":
-            self.imageProcessingParams = "" 
+            self.imageProcessingParams = ""
         else:
             self.imageProcessingParams = json.loads(imageProcessingParams)
         self.showVideo = showVideo
@@ -64,30 +64,34 @@ def __init__(
         self.convertToGray = convertToGray
         self.resizeWidth = resizeWidth
         self.resizeHeight = resizeHeight
-        self.annotate = (self.imageProcessingEndpoint != "") and self.showVideo & annotate
+        self.annotate = (self.imageProcessingEndpoint !=
+                         "") and self.showVideo & annotate
         self.nbOfPreprocessingSteps = 0
         self.autoRotate = False
         self.sendToHubCallback = sendToHubCallback
         self.vs = None

         if self.convertToGray:
-            self.nbOfPreprocessingSteps +=1
+            self.nbOfPreprocessingSteps += 1
         if self.resizeWidth != 0 or self.resizeHeight != 0:
-            self.nbOfPreprocessingSteps +=1
+            self.nbOfPreprocessingSteps += 1
         if self.verbose:
             print("Initialising the camera capture with the following parameters: ")
             print(" - Video path: " + self.videoPath)
-            print(" - Image processing endpoint: " + self.imageProcessingEndpoint)
-            print(" - Image processing params: " + json.dumps(self.imageProcessingParams))
+            print(" - Image processing endpoint: " +
+                  self.imageProcessingEndpoint)
+            print(" - Image processing params: " +
+                  json.dumps(self.imageProcessingParams))
             print(" - Show video: " + str(self.showVideo))
             print(" - Loop video: " + str(self.loopVideo))
             print(" - Convert to gray: " + str(self.convertToGray))
             print(" - Resize width: " + str(self.resizeWidth))
             print(" - Resize height: " + str(self.resizeHeight))
             print(" - Annotate: " + str(self.annotate))
-            print(" - Send processing results to hub: " + str(self.sendToHubCallback is not None))
+            print(" - Send processing results to hub: " +
+                  str(self.sendToHubCallback is not None))
             print()
-        
+
         self.displayFrame = None
         if self.showVideo:
             self.imageServer = ImageServer(5012, self)
@@ -95,25 +99,30 @@ def __init__(
     def __annotate(self, frame, response):
         AnnotationParserInstance = AnnotationParser()
-        #TODO: Make the choice of the service configurable
-        listOfRectanglesToDisplay = AnnotationParserInstance.getCV2RectanglesFromProcessingService1(response)
+        # TODO: Make the choice of the service configurable
+        listOfRectanglesToDisplay = AnnotationParserInstance.getCV2RectanglesFromProcessingService1(
+            response)
         for rectangle in listOfRectanglesToDisplay:
-            cv2.rectangle(frame, (rectangle(0), rectangle(1)), (rectangle(2), rectangle(3)), (0,0,255),4)
+            cv2.rectangle(frame, (rectangle[0], rectangle[1]),
+                          (rectangle[2], rectangle[3]), (0, 0, 255), 4)
         return

     def __sendFrameForProcessing(self, frame):
         headers = {'Content-Type': 'application/octet-stream'}
         try:
-            response = requests.post(self.imageProcessingEndpoint, headers = headers, params = self.imageProcessingParams, data = frame)
+            response = requests.post(
+                self.imageProcessingEndpoint, headers=headers, params=self.imageProcessingParams, data=frame)
         except Exception as e:
             print('__sendFrameForProcessing Excpetion -' + str(e))
             return "[]"
         if self.verbose:
             try:
-                print("Response from external processing service: (" + str(response.status_code) + ") " + json.dumps(response.json()))
+                print("Response from external processing service: (" +
+                      str(response.status_code) + ") " + json.dumps(response.json()))
             except Exception:
-                print("Response from external processing service (status code): " + str(response.status_code))
+                print("Response from external processing service (status code): " +
+                      str(response.status_code))
         return json.dumps(response.json())

     def __displayTimeDifferenceInMs(self, endTime, startTime):
@@ -121,12 +130,13 @@ def __displayTimeDifferenceInMs(self, endTime, startTime):

     def __enter__(self):
         if self.isWebcam:
-            #The VideoStream class always gives us the latest frame from the webcam. It uses another thread to read the frames.
+            # The VideoStream class always gives us the latest frame from the webcam. It uses another thread to read the frames.
             self.vs = VideoStream(int(self.videoPath)).start()
-            time.sleep(1.0)#needed to load at least one frame into the VideoStream class
+            # needed to load at least one frame into the VideoStream class
+            time.sleep(1.0)
             #self.capture = cv2.VideoCapture(int(self.videoPath))
         else:
-            #In the case of a video file, we want to analyze all the frames of the video thus are not using VideoStream class
+            # In the case of a video file, we want to analyze all the frames of the video, thus we are not using the VideoStream class
             self.capture = cv2.VideoCapture(self.videoPath)
         return self
@@ -142,7 +152,7 @@ def start(self):
             if self.verbose:
                 startCapture = time.time()

-            frameCounter +=1
+            frameCounter += 1
             if self.isWebcam:
                 frame = self.vs.read()
             else:
@@ -151,107 +161,128 @@ def start(self):
                 if self.capture.get(cv2.CAP_PROP_FRAME_WIDTH) < self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT):
                     self.autoRotate = True
                 if self.autoRotate:
-                    frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE) #The counterclockwise is random...It coudl well be clockwise. Is there a way to auto detect it?
+                    # The counterclockwise rotation is arbitrary... It could well be clockwise. Is there a way to auto-detect it?
+                    frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
             if self.verbose:
                 if frameCounter == 1:
                     if not self.isWebcam:
-                        print("Original frame size: " + str(int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH))) + "x" + str(int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT))))
-                        print("Frame rate (FPS): " + str(int(self.capture.get(cv2.CAP_PROP_FPS))))
+                        print("Original frame size: " + str(int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH))
+                                                            ) + "x" + str(int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT))))
+                        print("Frame rate (FPS): " +
+                              str(int(self.capture.get(cv2.CAP_PROP_FPS))))
                 print("Frame number: " + str(frameCounter))
-                print("Time to capture (+ straighten up) a frame: " + self.__displayTimeDifferenceInMs(time.time(), startCapture))
+                print("Time to capture (+ straighten up) a frame: " +
+                      self.__displayTimeDifferenceInMs(time.time(), startCapture))
                 startPreProcessing = time.time()
-            
-            #Loop video
-            if not self.isWebcam:             
+
+            # Loop video
+            if not self.isWebcam:
                 if frameCounter == self.capture.get(cv2.CAP_PROP_FRAME_COUNT):
-                    if self.loopVideo: 
+                    if self.loopVideo:
                         frameCounter = 0
                        self.capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
                     else:
                         break

-            #Pre-process locally
+            # Pre-process locally
             if self.nbOfPreprocessingSteps == 1 and self.convertToGray:
                 preprocessedFrame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
-            
+
             if self.nbOfPreprocessingSteps == 1 and (self.resizeWidth != 0 or self.resizeHeight != 0):
-                preprocessedFrame = cv2.resize(frame, (self.resizeWidth, self.resizeHeight))
+                preprocessedFrame = cv2.resize(
+                    frame, (self.resizeWidth, self.resizeHeight))

             if self.nbOfPreprocessingSteps > 1:
                 preprocessedFrame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
-                preprocessedFrame = cv2.resize(preprocessedFrame, (self.resizeWidth,self.resizeHeight))
-            
+                preprocessedFrame = cv2.resize(
+                    preprocessedFrame, (self.resizeWidth, self.resizeHeight))
+
             if self.verbose:
-                print("Time to pre-process a frame: " + self.__displayTimeDifferenceInMs(time.time(), startPreProcessing))
+                print("Time to pre-process a frame: " +
+                      self.__displayTimeDifferenceInMs(time.time(), startPreProcessing))
                 startEncodingForProcessing = time.time()

-            #Process externally
+            # Process externally
             if self.imageProcessingEndpoint != "":
-                #Encode frame to send over HTTP
+                # Encode frame to send over HTTP
                 if self.nbOfPreprocessingSteps == 0:
                     encodedFrame = cv2.imencode(".jpg", frame)[1].tostring()
                 else:
-                    encodedFrame = cv2.imencode(".jpg", preprocessedFrame)[1].tostring()
+                    encodedFrame = cv2.imencode(".jpg", preprocessedFrame)[
+                        1].tostring()

                 if self.verbose:
-                    print("Time to encode a frame for processing: " + self.__displayTimeDifferenceInMs(time.time(), startEncodingForProcessing))
+                    print("Time to encode a frame for processing: " +
+                          self.__displayTimeDifferenceInMs(time.time(), startEncodingForProcessing))
                     startProcessingExternally = time.time()

-                #Send over HTTP for processing
+                # Send over HTTP for processing
                 response = self.__sendFrameForProcessing(encodedFrame)
                 if self.verbose:
-                    print("Time to process frame externally: " + self.__displayTimeDifferenceInMs(time.time(), startProcessingExternally))
+                    print("Time to process frame externally: " +
+                          self.__displayTimeDifferenceInMs(time.time(), startProcessingExternally))
                     startSendingToEdgeHub = time.time()

-                #forwarding outcome of external processing to the EdgeHub
+                # forwarding outcome of external processing to the EdgeHub
                 if response != "[]" and self.sendToHubCallback is not None:
                     self.sendToHubCallback(response)
                     if self.verbose:
-                        print("Time to message from processing service to edgeHub: " + self.__displayTimeDifferenceInMs(time.time(), startSendingToEdgeHub))
+                        print("Time to message from processing service to edgeHub: " +
+                              self.__displayTimeDifferenceInMs(time.time(), startSendingToEdgeHub))
                         startDisplaying = time.time()

-            #Display frames
+            # Display frames
             if self.showVideo:
                 try:
                     if self.nbOfPreprocessingSteps == 0:
                         if self.verbose and (perfForOneFrameInMs is not None):
-                            cv2.putText(frame, "FPS " + str(round(1000/perfForOneFrameInMs, 2)),(10, 35),cv2.FONT_HERSHEY_SIMPLEX,1.0,(0,0,255), 2)
+                            cv2.putText(frame, "FPS " + str(round(1000/perfForOneFrameInMs, 2)),
+                                        (10, 35), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
                         if self.annotate:
-                            #TODO: fix bug with annotate function
+                            # TODO: fix bug with annotate function
                             self.__annotate(frame, response)
-                        self.displayFrame = cv2.imencode('.jpg', frame)[1].tobytes()
+                        self.displayFrame = cv2.imencode(
+                            '.jpg', frame)[1].tobytes()
                     else:
                         if self.verbose and (perfForOneFrameInMs is not None):
-                            cv2.putText(preprocessedFrame, "FPS " + str(round(1000/perfForOneFrameInMs, 2)),(10, 35),cv2.FONT_HERSHEY_SIMPLEX,1.0,(0,0,255), 2)
+                            cv2.putText(preprocessedFrame, "FPS " + str(round(1000/perfForOneFrameInMs, 2)),
+                                        (10, 35), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
                         if self.annotate:
-                            #TODO: fix bug with annotate function
+                            # TODO: fix bug with annotate function
                             self.__annotate(preprocessedFrame, response)
-                        self.displayFrame = cv2.imencode('.jpg', preprocessedFrame)[1].tobytes()
+                        self.displayFrame = cv2.imencode(
+                            '.jpg', preprocessedFrame)[1].tobytes()
                 except Exception as e:
-                    print("Could not display the video to a web browser.") 
+                    print("Could not display the video to a web browser.")
                     print('Excpetion -' + str(e))
                 if self.verbose:
                     if 'startDisplaying' in locals():
-                        print("Time to display frame: " + self.__displayTimeDifferenceInMs(time.time(), startDisplaying))
+                        print("Time to display frame: " +
+                              self.__displayTimeDifferenceInMs(time.time(), startDisplaying))
                     elif 'startSendingToEdgeHub' in locals():
-                        print("Time to display frame: " + self.__displayTimeDifferenceInMs(time.time(), startSendingToEdgeHub))
+                        print("Time to display frame: " +
+                              self.__displayTimeDifferenceInMs(time.time(), startSendingToEdgeHub))
                     else:
-                        print("Time to display frame: " + self.__displayTimeDifferenceInMs(time.time(), startEncodingForProcessing))
+                        print("Time to display frame: " + self.__displayTimeDifferenceInMs(
+                            time.time(), startEncodingForProcessing))
                 perfForOneFrameInMs = int((time.time()-startOverall) * 1000)
                 if not self.isWebcam:
-                    waitTimeBetweenFrames = max(int(1000 / self.capture.get(cv2.CAP_PROP_FPS))-perfForOneFrameInMs, 1)
-                    print("Wait time between frames :" + str(waitTimeBetweenFrames))
+                    waitTimeBetweenFrames = max(
+                        int(1000 / self.capture.get(cv2.CAP_PROP_FPS))-perfForOneFrameInMs, 1)
+                    print("Wait time between frames :" +
+                          str(waitTimeBetweenFrames))
                     if cv2.waitKey(waitTimeBetweenFrames) & 0xFF == ord('q'):
                         break

             if self.verbose:
                 perfForOneFrameInMs = int((time.time()-startOverall) * 1000)
-                print("Total time for one frame: " + self.__displayTimeDifferenceInMs(time.time(), startOverall))
+                print("Total time for one frame: " +
+                      self.__displayTimeDifferenceInMs(time.time(), startOverall))

     def __exit__(self, exception_type, exception_value, traceback):
         if not self.isWebcam:
             self.capture.release()
         if self.showVideo:
             self.imageServer.close()
-            cv2.destroyAllWindows()
\ No newline at end of file
+            cv2.destroyAllWindows()
diff --git a/modules/CameraCapture/app/ImageServer.py b/modules/CameraCapture/app/ImageServer.py
index ddccad3..f950e37 100644
--- a/modules/CameraCapture/app/ImageServer.py
+++ b/modules/CameraCapture/app/ImageServer.py
@@ -7,6 +7,7 @@
 import base64
 import os

+
 class ImageStreamHandler(tornado.websocket.WebSocketHandler):
     def initialize(self, camera):
         self.clients = []
@@ -30,6 +31,7 @@ def on_close(self):
         self.clients.remove(self)
         print("Image Server Connection::closed")

+
 class ImageServer(threading.Thread):

     def __init__(self, port, cameraObj):
@@ -42,16 +44,18 @@ def run(self):
         try:
             asyncio.set_event_loop(asyncio.new_event_loop())

-            indexPath = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'templates')
+            indexPath = os.path.join(os.path.dirname(
+                os.path.realpath(__file__)), 'templates')
             app = tornado.web.Application([
                 (r"/stream", ImageStreamHandler, {'camera': self.camera}),
-                (r"/(.*)", tornado.web.StaticFileHandler, {'path': indexPath, 'default_filename': 'index.html'})
+                (r"/(.*)", tornado.web.StaticFileHandler,
+                 {'path': indexPath, 'default_filename': 'index.html'})
             ])
             app.listen(self.port)
-            print ('ImageServer::Started.')
+            print('ImageServer::Started.')
             tornado.ioloop.IOLoop.current().start()
         except Exception as e:
-            print('ImageServer::exited run loop. Exception - '+ str(e))
+            print('ImageServer::exited run loop. Exception - ' + str(e))

     def close(self):
-        print ('ImageServer::Closed.')
\ No newline at end of file
+        print('ImageServer::Closed.')
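For context, `ImageServer` subclasses `threading.Thread`, so wiring it up takes one call to `start()`. A minimal sketch under stated assumptions: the websocket handler is assumed to read the camera's `displayFrame` attribute, which this diff does not show.

```python
# Sketch only: a hypothetical stand-in for CameraCapture. ImageServer just needs
# an object exposing a displayFrame attribute holding the latest JPEG bytes
# (assumption, based on how CameraCapture populates self.displayFrame).
import time

from ImageServer import ImageServer


class StubCamera(object):
    displayFrame = None  # CameraCapture fills this with cv2.imencode(...)[1].tobytes()


server = ImageServer(5012, StubCamera())
server.start()   # threading.Thread.start() runs the tornado loop in run()
time.sleep(60)   # meanwhile, browse to http://localhost:5012
server.close()   # note: close() only logs in this sample
```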
diff --git a/modules/CameraCapture/app/VideoStream.py b/modules/CameraCapture/app/VideoStream.py
index 3e7a1b6..bd4c54c 100644
--- a/modules/CameraCapture/app/VideoStream.py
+++ b/modules/CameraCapture/app/VideoStream.py
@@ -1,70 +1,72 @@
-#To make python 2 and python 3 compatible code
+# To make python 2 and python 3 compatible code
 from __future__ import absolute_import

 from threading import Thread
 import sys
-if sys.version_info[0] < 3:#e.g python version <3
+if sys.version_info[0] < 3:  # e.g. Python version < 3
     import cv2
 else:
     import cv2
     from cv2 import cv2
 # pylint: disable=E1101
 # pylint: disable=E0401
-# Disabling linting that is not supported by Pylint for C extensions such as OpenCV. See issue https://github.com/PyCQA/pylint/issues/1955
+# Disabling linting that is not supported by Pylint for C extensions such as OpenCV. See issue https://github.com/PyCQA/pylint/issues/1955

 # import the Queue class from Python 3
 if sys.version_info >= (3, 0):
-	from queue import Queue
+    from queue import Queue
 # otherwise, import the Queue class for Python 2.7
 else:
-	from Queue import Queue
+    from Queue import Queue
+
+# This class reads all the video frames in a separate thread and keeps only the latest frame in its queue, to be grabbed by another thread
+

-#This class reads all the video frames in a separate thread and always has the keeps only the latest frame in its queue to be grabbed by another thread
 class VideoStream(object):
-	def __init__(self, path, queueSize=3):
-		self.stream = cv2.VideoCapture(path)
-		self.stopped = False
-		self.Q = Queue(maxsize=queueSize)
-
-	def start(self):
-		# start a thread to read frames from the video stream
-		t = Thread(target=self.update, args=())
-		t.daemon = True
-		t.start()
-		return self
-
-	def update(self):
-		try:
-			while True:
-				if self.stopped:
-					return
-
-				if not self.Q.full():
-					(grabbed, frame) = self.stream.read()
-
-					# if the `grabbed` boolean is `False`, then we have
-					# reached the end of the video file
-					if not grabbed:
-						self.stop()
-						return
-
-					self.Q.put(frame)
-
-					#Clean the queue to keep only the latest frame
-					while self.Q.qsize() > 1:
-						self.Q.get()
-		except Exception as e:
-			print("got error: "+str(e))
-
-	def read(self):
-		return self.Q.get()
-
-	def more(self):
-		return self.Q.qsize() > 0
-
-	def stop(self):
-		self.stopped = True
-
-	def __exit__(self, exception_type, exception_value, traceback):
-		self.stream.release()
\ No newline at end of file
+    def __init__(self, path, queueSize=3):
+        self.stream = cv2.VideoCapture(path)
+        self.stopped = False
+        self.Q = Queue(maxsize=queueSize)
+
+    def start(self):
+        # start a thread to read frames from the video stream
+        t = Thread(target=self.update, args=())
+        t.daemon = True
+        t.start()
+        return self
+
+    def update(self):
+        try:
+            while True:
+                if self.stopped:
+                    return
+
+                if not self.Q.full():
+                    (grabbed, frame) = self.stream.read()
+
+                    # if the `grabbed` boolean is `False`, then we have
+                    # reached the end of the video file
+                    if not grabbed:
+                        self.stop()
+                        return
+
+                    self.Q.put(frame)
+
+                    # Clean the queue to keep only the latest frame
+                    while self.Q.qsize() > 1:
+                        self.Q.get()
+        except Exception as e:
+            print("got error: "+str(e))
+
+    def read(self):
+        return self.Q.get()
+
+    def more(self):
+        return self.Q.qsize() > 0
+
+    def stop(self):
+        self.stopped = True
+
+    def __exit__(self, exception_type, exception_value, traceback):
+        self.stream.release()
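The `VideoStream` contract is easiest to see in isolation: `start()` spawns the reader thread, `read()` hands back the freshest frame, and `stop()` ends the loop. A small usage sketch against webcam 0:

```python
# Grab a handful of latest-only frames from webcam 0 via VideoStream.
import time

from VideoStream import VideoStream

vs = VideoStream(0).start()  # webcam 0; start() spawns the reader thread
time.sleep(1.0)              # same warm-up CameraCapture uses before reading
for _ in range(5):
    frame = vs.read()        # blocks for the freshest frame; older ones were drained
    print(frame.shape)       # e.g. (480, 640, 3)
vs.stop()
```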
diff --git a/modules/CameraCapture/app/main.py b/modules/CameraCapture/app/main.py
index 4e04a04..1706135 100644
--- a/modules/CameraCapture/app/main.py
+++ b/modules/CameraCapture/app/main.py
@@ -9,7 +9,7 @@
 import iothub_client
 # pylint: disable=E0611
-# Disabling linting that is not supported by Pylint for C extensions such as iothub_client. See issue https://github.com/PyCQA/pylint/issues/1955
+# Disabling linting that is not supported by Pylint for C extensions such as iothub_client. See issue https://github.com/PyCQA/pylint/issues/1955
 from iothub_client import (IoTHubModuleClient, IoTHubClientError, IoTHubError,
                            IoTHubMessage, IoTHubMessageDispositionResult, IoTHubTransportProvider)
@@ -21,15 +21,19 @@
 # global counters
 SEND_CALLBACKS = 0

+
 def send_to_Hub_callback(strMessage):
     message = IoTHubMessage(bytearray(strMessage, 'utf8'))
     hubManager.send_event_to_output("output1", message, 0)

 # Callback received when the message that we're forwarding is processed.
+
+
 def send_confirmation_callback(message, result, user_context):
     global SEND_CALLBACKS
     SEND_CALLBACKS += 1

+
 class HubManager(object):

     def __init__(
@@ -49,26 +53,27 @@ def __init__(
         self.client = IoTHubModuleClient()
         self.client.create_from_environment(protocol)
         self.client.set_option("messageTimeout", self.messageTimeout)
-        self.client.set_option("product_info","edge-camera-capture")
+        self.client.set_option("product_info", "edge-camera-capture")
         if verbose:
-            self.client.set_option("logtrace", 1)#enables MQTT logging
+            self.client.set_option("logtrace", 1)  # enables MQTT logging

     def send_event_to_output(self, outputQueueName, event, send_context):
-        self.client.send_event_async(outputQueueName, event, send_confirmation_callback, send_context)
+        self.client.send_event_async(
+            outputQueueName, event, send_confirmation_callback, send_context)


 def main(
         videoPath,
-        imageProcessingEndpoint = "",
-        imageProcessingParams = "",
-        showVideo = False,
-        verbose = False,
-        loopVideo = True,
-        convertToGray = False,
-        resizeWidth = 0,
-        resizeHeight = 0,
-        annotate = False
-        ):
+        imageProcessingEndpoint="",
+        imageProcessingParams="",
+        showVideo=False,
+        verbose=False,
+        loopVideo=True,
+        convertToGray=False,
+        resizeWidth=0,
+        resizeHeight=0,
+        annotate=False
+):
     '''
     Capture a camera feed, send it to processing and forward outputs to EdgeHub

@@ -84,18 +89,19 @@ def main(
     :param bool annotate: when showing the video in a window, it will annotate the frames with rectangles given by the image processing service. False by default. Optional. Rectangles should be passed in a json blob with a key containing the string rectangle, and a top left corner + bottom right corner or top left corner with width and height.
     '''
     try:
-        print ( "\nPython %s\n" % sys.version )
-        print ( "Camera Capture Azure IoT Edge Module. Press Ctrl-C to exit." )
+        print("\nPython %s\n" % sys.version)
+        print("Camera Capture Azure IoT Edge Module. Press Ctrl-C to exit.")
         try:
             global hubManager
-            hubManager = HubManager(10000, IoTHubTransportProvider.MQTT, verbose)
+            hubManager = HubManager(
+                10000, IoTHubTransportProvider.MQTT, verbose)
         except IoTHubError as iothub_error:
-            print ( "Unexpected error %s from IoTHub" % iothub_error )
+            print("Unexpected error %s from IoTHub" % iothub_error)
             return
         with CameraCapture(videoPath, imageProcessingEndpoint, imageProcessingParams, showVideo, verbose, loopVideo, convertToGray, resizeWidth, resizeHeight, annotate, send_to_Hub_callback) as cameraCapture:
             cameraCapture.start()
     except KeyboardInterrupt:
-        print ( "Camera capture module stopped" )
+        print("Camera capture module stopped")


 def __convertStringToBool(env):
@@ -115,14 +121,15 @@ def __convertStringToBool(env):
         SHOW_VIDEO = __convertStringToBool(os.getenv('SHOW_VIDEO', 'False'))
         VERBOSE = __convertStringToBool(os.getenv('VERBOSE', 'False'))
         LOOP_VIDEO = __convertStringToBool(os.getenv('LOOP_VIDEO', 'True'))
-        CONVERT_TO_GRAY = __convertStringToBool(os.getenv('CONVERT_TO_GRAY', 'False'))
+        CONVERT_TO_GRAY = __convertStringToBool(
+            os.getenv('CONVERT_TO_GRAY', 'False'))
         RESIZE_WIDTH = int(os.getenv('RESIZE_WIDTH', 0))
-        RESIZE_HEIGHT = int(os.getenv('RESIZE_HEIGHT',0))
+        RESIZE_HEIGHT = int(os.getenv('RESIZE_HEIGHT', 0))
         ANNOTATE = __convertStringToBool(os.getenv('ANNOTATE', 'False'))
     except ValueError as error:
-        print ( error )
+        print(error)
         sys.exit(1)

-    main(VIDEO_PATH, IMAGE_PROCESSING_ENDPOINT, IMAGE_PROCESSING_PARAMS, SHOW_VIDEO, VERBOSE, LOOP_VIDEO, CONVERT_TO_GRAY, RESIZE_WIDTH, RESIZE_HEIGHT, ANNOTATE)
-    
+    main(VIDEO_PATH, IMAGE_PROCESSING_ENDPOINT, IMAGE_PROCESSING_PARAMS, SHOW_VIDEO,
+         VERBOSE, LOOP_VIDEO, CONVERT_TO_GRAY, RESIZE_WIDTH, RESIZE_HEIGHT, ANNOTATE)
response=json.loads("{\"language\":\"en\",\"textAngle\":0,\"orientation\":\"Up\",\"regions\":[{\"boundingBox\":\"400,560,3272,288\",\"lines\":[{\"boundingBox\":\"400,560,3272,288\",\"words\":[{\"boundingBox\":\"400,560,672,280\",\"text\":\"word1\"},{\"boundingBox\":\"1200,568,216,272\",\"text\":\"word2\"}]}]}]}") - self.assertEqual(AnnotationParser.getCV2RectanglesFromProcessingService1(response), [[560,400,3832,688]]) - + response = json.loads( + "{\"language\":\"en\",\"textAngle\":0,\"orientation\":\"Up\",\"regions\":[{\"boundingBox\":\"400,560,3272,288\",\"lines\":[{\"boundingBox\":\"400,560,3272,288\",\"words\":[{\"boundingBox\":\"400,560,672,280\",\"text\":\"word1\"},{\"boundingBox\":\"1200,568,216,272\",\"text\":\"word2\"}]}]}]}") + self.assertEqual(AnnotationParser.getCV2RectanglesFromProcessingService1( + response), [[560, 400, 3832, 688]]) + def test_getCV2RectanglesFromProcessingService2(self): AnnotationParser = app.AnnotationParser.AnnotationParser() - response=json.loads("[{\"Id\": \"c5c24a82-6845-4031-9d5d-978df9175426\",\"rectangle\": {\"top\": 54, \"left\": 394,\"width\": 78,\"height\": 78}}]") - self.assertEqual(AnnotationParser.getCV2RectanglesFromProcessingService2(response), [[394,54,472,132]]) + response = json.loads( + "[{\"Id\": \"c5c24a82-6845-4031-9d5d-978df9175426\",\"rectangle\": {\"top\": 54, \"left\": 394,\"width\": 78,\"height\": 78}}]") + self.assertEqual(AnnotationParser.getCV2RectanglesFromProcessingService2( + response), [[394, 54, 472, 132]]) + if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() diff --git a/modules/ImageClassifierService/app/app.py b/modules/ImageClassifierService/app/app.py index b34184e..423453e 100644 --- a/modules/ImageClassifierService/app/app.py +++ b/modules/ImageClassifierService/app/app.py @@ -17,7 +17,7 @@ app = Flask(__name__) # 4MB Max image size limit -app.config['MAX_CONTENT_LENGTH'] = 4 * 1024 * 1024 +app.config['MAX_CONTENT_LENGTH'] = 4 * 1024 * 1024 # Default route just shows simple text @app.route('/') @@ -25,7 +25,7 @@ def index(): return 'CustomVision.ai model host harness' # Like the CustomVision.ai Prediction service /image route handles either -# - octet-stream image file +# - octet-stream image file # - a multipart/form-data with files in the imageData parameter @app.route('/image', methods=['POST']) def predict_image_handler(): @@ -47,7 +47,7 @@ def predict_image_handler(): # Like the CustomVision.ai Prediction service /url route handles url's # in the body of hte request of the form: -# { 'Url': ''} +# { 'Url': ''} @app.route('/url', methods=['POST']) def predict_url_handler(): try: @@ -58,10 +58,10 @@ def predict_url_handler(): print('EXCEPTION:', str(e)) return 'Error processing image' + if __name__ == '__main__': # Load and intialize the model initialize() # Run the server app.run(host='0.0.0.0', port=80) - diff --git a/modules/ImageClassifierService/app/predict.py b/modules/ImageClassifierService/app/predict.py index 795ba7e..70b6dff 100644 --- a/modules/ImageClassifierService/app/predict.py +++ b/modules/ImageClassifierService/app/predict.py @@ -8,12 +8,12 @@ # import scipy # from scipy import misc import sys -import os +import os filename = 'model.pb' labels_filename = 'labels.txt' -mean_values_b_g_r = (0,0,0) +mean_values_b_g_r = (0, 0, 0) size = (256, 256) output_layer = 'loss:0' @@ -22,8 +22,9 @@ graph_def = tf.GraphDef() labels = [] + def initialize(): - print('Loading model...',end=''), + print('Loading model...', end=''), with tf.gfile.FastGFile(filename, 'rb') 
diff --git a/modules/ImageClassifierService/app/predict.py b/modules/ImageClassifierService/app/predict.py
index 795ba7e..70b6dff 100644
--- a/modules/ImageClassifierService/app/predict.py
+++ b/modules/ImageClassifierService/app/predict.py
@@ -8,12 +8,12 @@
 # import scipy
 # from scipy import misc
 import sys
-import os 
+import os

 filename = 'model.pb'
 labels_filename = 'labels.txt'

-mean_values_b_g_r = (0,0,0)
+mean_values_b_g_r = (0, 0, 0)

 size = (256, 256)
 output_layer = 'loss:0'
@@ -22,8 +22,9 @@
 graph_def = tf.GraphDef()
 labels = []

+
 def initialize():
-    print('Loading model...',end=''),
+    print('Loading model...', end=''),
     with tf.gfile.FastGFile(filename, 'rb') as f:
         graph_def.ParseFromString(f.read())
         tf.import_graph_def(graph_def, name='')
@@ -34,36 +35,40 @@ def initialize():
             l = l[:-1]
             labels.append(l)
     print(len(labels), 'found. Success!')
-    
-def crop_center(img,cropx,cropy):
-    y,x,z = img.shape
+
+
+def crop_center(img, cropx, cropy):
+    y, x, z = img.shape
     startx = x//2-(cropx//2)
     starty = y//2-(cropy//2)
     print('crop_center: ', x, 'x', y, 'to', cropx, 'x', cropy)
-    return img[starty:starty+cropy,startx:startx+cropx]
+    return img[starty:starty+cropy, startx:startx+cropx]
+

 def predict_url(imageUrl):
-    print('Predicting from url: ',imageUrl)
+    print('Predicting from url: ', imageUrl)
     with urlopen(imageUrl) as testImage:
         # image = scipy.misc.imread(testImage)
         image = Image.open(testImage)
         return predict_image(image)
+

 def predict_image(image):
     print('Predicting image')
     tf.reset_default_graph()
     tf.import_graph_def(graph_def, name='')
-    
+
     with tf.Session() as sess:
         prob_tensor = sess.graph.get_tensor_by_name(output_layer)

-        input_tensor_shape = sess.graph.get_tensor_by_name('Placeholder:0').shape.as_list()
+        input_tensor_shape = sess.graph.get_tensor_by_name(
+            'Placeholder:0').shape.as_list()
         network_input_size = input_tensor_shape[1]

         # w = image.shape[0]
         # h = image.shape[1]
         w, h = image.size
-        print('Image size',w,'x',h)
+        print('Image size', w, 'x', h)

         # scaling
         if w > h:
@@ -72,23 +77,26 @@ def predict_image(image):
             new_size = (size[0], int((float(size[0]) / w) * h), 3)

         # resize
-        if not (new_size[0] == w and new_size[0] == h):
-            print('Resizing to', new_size[0],'x',new_size[1])
+        if not (new_size[0] == w and new_size[0] == h):
+            print('Resizing to', new_size[0], 'x', new_size[1])
             #augmented_image = scipy.misc.imresize(image, new_size)
-            augmented_image = np.asarray(image.resize((new_size[0], new_size[1])))
+            augmented_image = np.asarray(
+                image.resize((new_size[0], new_size[1])))
         else:
             augmented_image = np.asarray(image)

         # crop center
         try:
-            augmented_image = crop_center(augmented_image, network_input_size, network_input_size)
+            augmented_image = crop_center(
+                augmented_image, network_input_size, network_input_size)
         except:
             return 'error: crop_center'

         augmented_image = augmented_image.astype(float)

         # RGB -> BGR
-        red, green, blue = tf.split(axis=2, num_or_size_splits=3, value=augmented_image)
+        red, green, blue = tf.split(
+            axis=2, num_or_size_splits=3, value=augmented_image)

         image_normalized = tf.concat(axis=2, values=[
             blue - mean_values_b_g_r[0],
@@ -104,9 +112,10 @@ def predict_image(image):
         result = []
         idx = 0
         for p in predictions:
-            truncated_probablity = np.float64(round(p,8))
+            truncated_probablity = np.float64(round(p, 8))
             if (truncated_probablity > 1e-8):
-                result.append({'Tag': labels[idx], 'Probability': truncated_probablity })
+                result.append(
+                    {'Tag': labels[idx], 'Probability': truncated_probablity})
             idx += 1
         print('Results: ', str(result))
         return result
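`predict_image` scales the frame so its shorter side matches `size` (256) and then center-crops to the network's input resolution. `crop_center` is pure NumPy, so its arithmetic is easy to sanity-check in isolation; 227 below is just an example input size:

```python
# Sanity check of crop_center's arithmetic on a dummy image.
import numpy as np


def crop_center(img, cropx, cropy):  # same arithmetic as predict.py above
    y, x, z = img.shape
    startx = x // 2 - (cropx // 2)
    starty = y // 2 - (cropy // 2)
    return img[starty:starty + cropy, startx:startx + cropx]


img = np.zeros((256, 341, 3))            # a landscape frame after scaling
print(crop_center(img, 227, 227).shape)  # (227, 227, 3)
```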
diff --git a/modules/SenseHatDisplay/app/DisplayManager.py b/modules/SenseHatDisplay/app/DisplayManager.py
index 93eafb6..c09d30a 100644
--- a/modules/SenseHatDisplay/app/DisplayManager.py
+++ b/modules/SenseHatDisplay/app/DisplayManager.py
@@ -3,15 +3,17 @@
 import time
 from enum import Enum

+
 class Colors(Enum):
     Green = (0, 255, 0)
     Yellow = (255, 255, 0)
     Blue = (0, 0, 255)
     Red = (255, 0, 0)
-    White = (255,255,255)
-    Nothing = (0,0,0)
-    Pink = (255,105, 180)
-    Orange = (255,165, 0)
+    White = (255, 255, 255)
+    Nothing = (0, 0, 0)
+    Pink = (255, 105, 180)
+    Orange = (255, 165, 0)
+

 class DisplayManager(object):

     def __apple(self):
@@ -19,14 +21,14 @@ def __apple(self):
         N = Colors.Nothing.value
         Y = Colors.Yellow.value
         logo = [
-            N, N, N, N, Y, Y, N, N, 
-            N, N, N, Y, Y, N, N, N, 
-            N, N, G, G, G, G, N, N, 
-            N, G, G, G, G, G, G, N, 
-            N, G, G, G, G, G, G, N, 
-            N, G, G, G, G, G, G, N, 
-            N, G, G, G, G, G, G, N, 
-            N, N, G, G, G, G, N, N, 
+            N, N, N, N, Y, Y, N, N,
+            N, N, N, Y, Y, N, N, N,
+            N, N, G, G, G, G, N, N,
+            N, G, G, G, G, G, G, N,
+            N, G, G, G, G, G, G, N,
+            N, G, G, G, G, G, G, N,
+            N, G, G, G, G, G, G, N,
+            N, N, G, G, G, G, N, N,
         ]
         return logo
@@ -35,14 +37,14 @@ def __raspberry(self):
         N = Colors.Nothing.value
         R = Colors.Red.value
         logo = [
-            N, G, G, N, N, G, G, N, 
-            N, N, G, G, G, G, N, N, 
-            N, N, R, R, R, R, N, N, 
-            N, R, R, R, R, R, R, N, 
-            R, R, R, R, R, R, R, R, 
-            R, R, R, R, R, R, R, R, 
-            N, R, R, R, R, R, R, N, 
-            N, N, R, R, R, R, N, N, 
+            N, G, G, N, N, G, G, N,
+            N, N, G, G, G, G, N, N,
+            N, N, R, R, R, R, N, N,
+            N, R, R, R, R, R, R, N,
+            R, R, R, R, R, R, R, R,
+            R, R, R, R, R, R, R, R,
+            N, R, R, R, R, R, R, N,
+            N, N, R, R, R, R, N, N,
         ]
         return logo
@@ -50,14 +52,14 @@ def __banana(self):
         N = Colors.Nothing.value
         Y = Colors.Yellow.value
         logo = [
-            N, N, Y, Y, N, N, N, N, 
-            N, Y, Y, Y, N, N, N, N, 
-            Y, Y, Y, N, N, N, N, N, 
-            Y, Y, Y, N, N, N, N, N, 
-            N, Y, Y, Y, N, N, N, N, 
-            N, N, Y, Y, Y, N, N, N, 
-            N, N, N, Y, Y, Y, Y, N, 
-            N, N, N, N, N, Y, Y, Y, 
+            N, N, Y, Y, N, N, N, N,
+            N, Y, Y, Y, N, N, N, N,
+            Y, Y, Y, N, N, N, N, N,
+            Y, Y, Y, N, N, N, N, N,
+            N, Y, Y, Y, N, N, N, N,
+            N, N, Y, Y, Y, N, N, N,
+            N, N, N, Y, Y, Y, Y, N,
+            N, N, N, N, N, Y, Y, Y,
         ]
         return logo
@@ -65,14 +67,14 @@ def __orange(self):
         N = Colors.Nothing.value
         O = Colors.Orange.value
         logo = [
-            N, N, N, O, O, N, N, N, 
-            N, O, O, O, O, O, O, N, 
-            N, O, O, O, O, O, O, N, 
-            O, O, O, O, O, O, O, O, 
-            O, O, O, O, O, O, O, O, 
-            N, O, O, O, O, O, O, N, 
-            N, O, O, O, O, O, O, N, 
-            N, N, N, O, O, N, N, N, 
+            N, N, N, O, O, N, N, N,
+            N, O, O, O, O, O, O, N,
+            N, O, O, O, O, O, O, N,
+            O, O, O, O, O, O, O, O,
+            O, O, O, O, O, O, O, O,
+            N, O, O, O, O, O, O, N,
+            N, O, O, O, O, O, O, N,
+            N, N, N, O, O, N, N, N,
         ]
         return logo
@@ -80,14 +82,14 @@ def __lemon(self):
         N = Colors.Nothing.value
         Y = Colors.Yellow.value
         logo = [
-            N, N, N, N, N, N, N, N, 
-            N, N, N, Y, Y, N, N, N, 
-            N, N, Y, Y, Y, Y, N, N, 
-            Y, Y, Y, Y, Y, Y, Y, Y, 
-            Y, Y, Y, Y, Y, Y, Y, Y, 
-            N, Y, Y, Y, Y, Y, Y, N, 
-            N, N, Y, Y, Y, Y, N, N, 
-            N, N, N, Y, Y, N, N, N, 
+            N, N, N, N, N, N, N, N,
+            N, N, N, Y, Y, N, N, N,
+            N, N, Y, Y, Y, Y, N, N,
+            Y, Y, Y, Y, Y, Y, Y, Y,
+            Y, Y, Y, Y, Y, Y, Y, Y,
+            N, Y, Y, Y, Y, Y, Y, N,
+            N, N, Y, Y, Y, Y, N, N,
+            N, N, N, Y, Y, N, N, N,
         ]
         return logo
@@ -95,21 +97,22 @@ def __unknown(self):
         N = Colors.Nothing.value
         R = Colors.Red.value
         logo = [
-            N, N, N, R, R, N, N, N, 
-            N, N, R, N, N, R, N, N, 
-            N, R, N, N, N, N, R, N, 
-            N, R, N, N, N, N, R, N, 
-            N, N, R, N, N, R, N, N, 
-            N, N, N, N, R, N, N, N, 
-            N, N, N, N, N, N, N, N, 
-            N, N, N, N, R, N, N, N, 
+            N, N, N, R, R, N, N, N,
+            N, N, R, N, N, R, N, N,
+            N, R, N, N, N, N, R, N,
+            N, R, N, N, N, N, R, N,
+            N, N, R, N, N, R, N, N,
+            N, N, N, N, R, N, N, N,
+            N, N, N, N, N, N, N, N,
+            N, N, N, N, R, N, N, N,
         ]
         return logo

     def __init__(self):
         self.s = SenseHat()
         self.s.low_light = True
-        self.__displayImage(self.__raspberry())#Flash the raspberry pi logo at initialization
+        # Flash the Raspberry Pi logo at initialization
+        self.__displayImage(self.__raspberry())
         time.sleep(1)
         self.s.clear()
@@ -133,4 +136,3 @@ def displayImage(self, strImage):
         else:
             self.__displayImage(self.__unknown())
         self.s.clear()
-
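Each logo above is a flat list of 64 RGB tuples, which matches what the Sense HAT `set_pixels` API expects for its 8x8 LED matrix. A minimal sketch that pushes a pattern directly, assuming `__displayImage` ultimately wraps `set_pixels` (the diff does not show its body):

```python
# Push an 8x8 pattern to the Sense HAT LED matrix directly.
from sense_hat import SenseHat

N = (0, 0, 0)    # Colors.Nothing
R = (255, 0, 0)  # Colors.Red
logo = [N] * 28 + [R, R] + [N] * 34  # trivial 64-pixel test pattern
assert len(logo) == 64

s = SenseHat()
s.low_light = True
s.set_pixels(logo)  # set_pixels expects exactly 64 (R, G, B) values
```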
diff --git a/modules/SenseHatDisplay/app/MessageParser.py b/modules/SenseHatDisplay/app/MessageParser.py
index dc8c484..eca2672 100644
--- a/modules/SenseHatDisplay/app/MessageParser.py
+++ b/modules/SenseHatDisplay/app/MessageParser.py
@@ -1,5 +1,5 @@
 class MessageParser:
-    #Returns the highest probablity tag in the json object (takes the output as json.loads as input)
+    # Returns the highest probability tag in the json object (takes the output of json.loads as input)
     def highestProbabilityTagMeetingThreshold(self, allTagsAndProbability, threshold):
         highestProbabilityTag = 'none'
         highestProbability = 0
@@ -7,4 +7,4 @@ def highestProbabilityTagMeetingThreshold(self, allTagsAndProbability, threshold
             if item['Probability'] > highestProbability and item['Probability'] > threshold:
                 highestProbability = item['Probability']
                 highestProbabilityTag = item['Tag']
-        return highestProbabilityTag
\ No newline at end of file
+        return highestProbabilityTag
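For reference, the contract the unit tests later in this diff pin down: given the classifier output already parsed by `json.loads`, return the highest-probability tag strictly above the threshold, or `'none'`. A usage sketch, run from the `app` directory:

```python
# MessageParser picks the best tag over a threshold, else 'none'.
import json

from MessageParser import MessageParser

tags = json.loads('[{"Tag": "banana", "Probability": 0.4},'
                  ' {"Tag": "apple", "Probability": 0.3}]')
parser = MessageParser()
print(parser.highestProbabilityTagMeetingThreshold(tags, 0.5))  # 'none'
print(parser.highestProbabilityTagMeetingThreshold(tags, 0.3))  # 'banana'
```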
- self.client.set_message_callback("input1", receive_message_callback, self) - print ( "Module is now waiting for messages in the input1 queue.") - - - + self.client.set_message_callback( + "input1", receive_message_callback, self) + print("Module is now waiting for messages in the input1 queue.") def main(): try: - print ( "Starting the SenseHat module...") + print("Starting the SenseHat module...") global DISPLAY_MANAGER global MESSAGE_PARSER @@ -69,10 +71,11 @@ def main(): time.sleep(1000) except IoTHubError as iothub_error: - print ( "Unexpected error %s from IoTHub" % iothub_error ) + print("Unexpected error %s from IoTHub" % iothub_error) return except KeyboardInterrupt: - print ( "IoTHubClient sample stopped" ) + print("IoTHubClient sample stopped") + if __name__ == '__main__': try: @@ -80,7 +83,7 @@ def main(): THRESHOLD = float(os.getenv('THRESHOLD', 0)) except Exception as error: - print ( error ) + print(error) sys.exit(1) - main() \ No newline at end of file + main() diff --git a/modules/SenseHatDisplay/test/IntegrationTests.py b/modules/SenseHatDisplay/test/IntegrationTests.py index 26965ad..28cf247 100644 --- a/modules/SenseHatDisplay/test/IntegrationTests.py +++ b/modules/SenseHatDisplay/test/IntegrationTests.py @@ -1,8 +1,8 @@ +from DisplayManager import DisplayManager +import DisplayManager import time import sys sys.path.insert(0, '../app/') -import DisplayManager -from DisplayManager import DisplayManager try: @@ -27,4 +27,4 @@ displayManager.displayImage('none') time.sleep(1) except Exception as exception: - print ( "Error while executing Display Manager tests: (%s)" % exception) \ No newline at end of file + print("Error while executing Display Manager tests: (%s)" % exception) diff --git a/modules/SenseHatDisplay/test/UnitTests.py b/modules/SenseHatDisplay/test/UnitTests.py index 811cbe0..2708391 100644 --- a/modules/SenseHatDisplay/test/UnitTests.py +++ b/modules/SenseHatDisplay/test/UnitTests.py @@ -1,18 +1,26 @@ +import app.MessageParser import unittest import json import sys sys.path.insert(0, '../') -import app.MessageParser + class UnitTests(unittest.TestCase): def test_HighestProbabilityTagMeetingThreshold(self): MessageParser = app.MessageParser.MessageParser() - message1=json.loads("[{\"Tag\": \"banana\",\"Probability\": 0.4}, {\"Tag\": \"apple\",\"Probability\": 0.3}]") - self.assertEqual(MessageParser.highestProbabilityTagMeetingThreshold(message1, 0.5), 'none') - message2=json.loads("[{\"Tag\": \"banana\",\"Probability\": 0.4}, {\"Tag\": \"apple\",\"Probability\": 0.5}]") - self.assertEqual(MessageParser.highestProbabilityTagMeetingThreshold(message2, 0.3), 'apple') - message3=json.loads("[{\"Probability\": 0.038001421838998795, \"Tag\": \"apple\"}, {\"Probability\": 0.38567957282066345, \"Tag\": \"banana\"}]") - self.assertEqual(MessageParser.highestProbabilityTagMeetingThreshold(message3, 0.3), 'banana') + message1 = json.loads( + "[{\"Tag\": \"banana\",\"Probability\": 0.4}, {\"Tag\": \"apple\",\"Probability\": 0.3}]") + self.assertEqual( + MessageParser.highestProbabilityTagMeetingThreshold(message1, 0.5), 'none') + message2 = json.loads( + "[{\"Tag\": \"banana\",\"Probability\": 0.4}, {\"Tag\": \"apple\",\"Probability\": 0.5}]") + self.assertEqual(MessageParser.highestProbabilityTagMeetingThreshold( + message2, 0.3), 'apple') + message3 = json.loads( + "[{\"Probability\": 0.038001421838998795, \"Tag\": \"apple\"}, {\"Probability\": 0.38567957282066345, \"Tag\": \"banana\"}]") + self.assertEqual(MessageParser.highestProbabilityTagMeetingThreshold( + 
message3, 0.3), 'banana') + if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main()