diff --git a/.messageReceiver.py.swp b/.messageReceiver.py.swp new file mode 100644 index 0000000..286b25a Binary files /dev/null and b/.messageReceiver.py.swp differ diff --git a/config.yaml b/config.yaml index 07ea648..b7d526f 100644 --- a/config.yaml +++ b/config.yaml @@ -1,7 +1,15 @@ -device_info: +device-info: deviceId: 1 deviceType: 'RaspberryPi' -message_server: + mac-address: '00:0c:29:59:59:7b' +rabbit-mq: username: 'speedtest' password: '1nfield' - uri: 'http://192.168.56.222/speedtest/postResult.json' + send-queue: 'sendQueue' + syslog-queue: 'syslogQueue' +message-receiver: + uri: 'http://scottf.ddns.net/speedtest/postResult.json' +configuration-server: + uri: 'http://scottf.ddns.net/speedtest/postResult.json' +general: + ping-timeout: 180 #seconds diff --git a/messageReceiver.py b/messageReceiver.py index 94a07f9..430643d 100755 --- a/messageReceiver.py +++ b/messageReceiver.py @@ -1,4 +1,5 @@ #!/usr/bin/env python + import pika import subprocess import sys @@ -13,13 +14,28 @@ import json import requests +#Load configuration file with open('/opt/speedtest/config.yaml', 'r') as f: config = yaml.load(f) #Global variables -deviceId = config["device_info"]["deviceId"] -deviceType = config["device_info"]["deviceType"] -uri = config["message_server"]["uri"] +##Device configuration +deviceId = config["device-info"]["deviceId"] +deviceType = config["device-info"]["deviceType"] +macAddress = config["device-info"]["mac-address"] + +##Message receiver (Remote REST API) +uri = config["message-receiver"]["uri"] + +##Rabbit MQ related configuration +username = config["rabbit-mq"]["username"] +password = config["rabbit-mq"]["password"] +sendQueue = config["rabbit-mq"]["send-queue"] +syslogQueue = config["rabbit-mq"]["syslog-queue"] + +##General configuration +pingTimeOut = config["general"]["ping-timeout"] + sessionId = "" #Initialise syslogger @@ -29,12 +45,13 @@ syslog.addHandler(handler) #Initialise rabbitmq connection -credentials = pika.PlainCredentials('speedtest', '1nfield') +credentials = pika.PlainCredentials(username, password) connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost', 5672, '/', credentials)) channel = connection.channel() -channel.queue_declare(queue='sendQueue') +channel.queue_declare(queue=sendQueue) +#Log to syslog def syslogger(message, severity): logtime = str(datetime.now()) if severity == "info": @@ -43,86 +60,118 @@ def syslogger(message, severity): syslog.critical(logtime + ': ' + message) elif severity == "debug": syslog.debug(logtime + ': ' + message) + +def rabbitmqLog (message, queue, **kwargs): + if kwargs.has_key('exchange'): + exchange = kwargs.get('exchange', None) + else: + exchange = '' + + channel.basic_publish(exchange=exchange,routing_key=queue,body=message) + +#Method for sending and logging messages +def processMessage(message, **kwargs): +#def processMessage(message, severity='info', queue=None, exchange='', remotelog=True, syslog=True, rabbitlog=True): + severity = 'info' + exchange = '' + remotelog = True + syslog = True + rabbitlog = True + + if kwargs.has_key('severity'): + severity = kwargs.get('severity', None) + + if not kwargs.has_key('queue'): + queue = sendQueue #default queue + else: + queue = kwargs.get('queue', None) + + if kwargs.has_key('exchange'): + exchange = kwargs.get('exchange', None) + + if kwargs.has_key('syslog'): + syslog = kwargs.get('syslog', None) + + if kwargs.has_key('rabbitlog'): + rabbitlog = kwargs.get('rabbitlog') + + #Log to syslog? + if syslog: + syslogger(message, severity) + + #Log to rabbitMQ queue? + if rabbitlog: + rabbitmqLog(message, queue=queue) + + #Send log to remote log server + if remotelog: + remoteLogger(message) +#Make sure that the speed test device can reach Internet def waitForPing( ip ): waiting =True counter =0 logMsg = 'Testing Internet Connectivity.' - syslogger(logMsg, 'info') - remoteLogger(logMsg) - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) + processMessage(logMsg) while waiting: t = os.system('ping -c 1 -W 1 {}'.format(ip)+'> /dev/null 2>&1') if not t: waiting=False logMsg = 'Ping reply from '+ip - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) - syslogger(logMsg, 'info') - remoteLogger(logMsg) + processMessage(logMsg) return True else: counter +=1 if (counter%30 == 0): logMsg = 'Trying to connect to Internet for '+str(counter)+' seconds' - syslogger(logMsg, 'info') - remoteLogger(logMsg) - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) - if counter == 300: # this will prevent an never ending loop, set to the number of tries you think it will require + processMessage(logMsg) + if counter == pingTimeOut: # this will prevent an never ending loop, set to the number of tries you think it will require waiting = False logMsg = 'Ping timeout after trying for '+str(counter)+' seconds!\nRestart the speed test by reconnecting the ethernet cable to the speed test device.' - syslogger(logMsg, 'info') - remoteLogger(logMsg) - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) + processMessage(logMsg) return False def speedtest(): cmd = ['/usr/bin/python', '-u', '/opt/speedtest/speedtest_cli.py'] logMsg = 'Initiating Speed Test' - syslogger(logMsg, 'info') - remoteLogger(logMsg) + processMessage(logMsg) p = subprocess.Popen(cmd, stdout=subprocess.PIPE) for line in iter(p.stdout.readline,''): sys.stdout.flush() - remoteLogger(line) - channel.basic_publish(exchange='', - routing_key="sendQueue", - body=line) + processMessage(line) p.wait() return not p.returncode +#Implement function to validate that dns can be resolved + def remoteLogger(msg): + #TODO: error handling headers = {'Content-type': 'application/x-www-form-urlencoded ', 'Accept': 'text/plain'} json_data = json.dumps({'deviceId':deviceId, 'sessionId':str(sessionId), 'timeCreated':str(datetime.now()), 'msg':msg}) payload = {'speedtest':json_data} r = requests.post(uri, data=payload, headers=headers) + #If http error log this to syslog and rabbitmq retry? + print r def callback(ch, method, properties, body): global sessionId sessionId = uuid.uuid1() - logMsg = 'Speed device has been plugged in. Initiating speed test process...' - syslogger(logMsg, 'info') - remoteLogger(logMsg) - - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) - #print " [x] Received %r" % (body,) + logMsg = 'Speed device has been plugged in. Initiating speed test process...' #Start tag to identify session start + processMessage(logMsg) #Retry if unsuccessful if waitForPing("8.8.8.8") and speedtest(): logMsg = 'Speed Test Completed!' - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) - syslogger(logMsg, 'info') - remoteLogger(logMsg) + processMessage(logMsg) else: logMsg = 'Speed Test Unsuccessful!' - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) - syslogger(logMsg, 'critical') - remoteLogger(logMsg) + processMessage(logMsg, severity='critical') -channel.queue_declare(queue='linkup') +channel.queue_declare(queue=syslogQueue) channel.basic_consume(callback, - queue='linkup', + queue=syslogQueue, no_ack=True) syslogger('Started speed test message receiver', 'info') diff --git a/pushMessage.py b/pushMessage.py index d5028fa..241b3ee 100644 --- a/pushMessage.py +++ b/pushMessage.py @@ -6,6 +6,16 @@ from datetime import datetime import logging import logging.handlers +import yaml + +with open('/opt/speedtest/config.yaml', 'r') as f: + config = yaml.load(f) + +#Global variables +##Rabbit MQ configuration +username = config["rabbit-mq"]["username"] +password = config["rabbit-mq"]["password"] +syslogQueue = config["rabbit-mq"]["syslog-queue"] syslog = logging.getLogger('Syslog') syslog.setLevel(logging.DEBUG) @@ -30,15 +40,15 @@ def syslogger(message, severity): message = sys.stdin.readline().rstrip() if len(message) > 1: - credentials = pika.PlainCredentials('speedtest', '1nfield') + credentials = pika.PlainCredentials(username, password) connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost', 5672, '/', credentials)) channel = connection.channel() - channel.queue_declare(queue='linkup') + channel.queue_declare(queue=syslogQueue) channel.basic_publish(exchange='', - routing_key='linkup', + routing_key=syslogQueue, body=message) connection.close() diff --git a/sendQueue.py b/sendQueue.py index 590de0a..e9d9224 100644 --- a/sendQueue.py +++ b/sendQueue.py @@ -1,12 +1,21 @@ #!/usr/bin/env python import pika +import yaml -credentials = pika.PlainCredentials('speedtest', '1nfield') +with open('/opt/speedtest/config.yaml', 'r') as f: + config = yaml.load(f) + +##Rabbit MQ configuration +username = config["rabbit-mq"]["username"] +password = config["rabbit-mq"]["password"] +sendQueue = config["rabbit-mq"]["send-queue"] + +credentials = pika.PlainCredentials(username, password) connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost', 5672, '/', credentials)) channel = connection.channel() -channel.queue_declare(queue='sendQueue') +channel.queue_declare(queue=sendQueue) print ' [*] Waiting for messages. To exit press CTRL+C' @@ -14,8 +23,7 @@ def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, - queue='sendQueue', + queue=sendQueue, no_ack=True) channel.start_consuming() -