Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .messageReceiver.py.swp
Binary file not shown.
14 changes: 11 additions & 3 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
device_info:
device-info:
deviceId: 1
deviceType: 'RaspberryPi'
message_server:
mac-address: '00:0c:29:59:59:7b'
rabbit-mq:
username: 'speedtest'
password: '1nfield'
uri: 'http://192.168.56.222/speedtest/postResult.json'
send-queue: 'sendQueue'
syslog-queue: 'syslogQueue'
message-receiver:
uri: 'http://scottf.ddns.net/speedtest/postResult.json'
configuration-server:
uri: 'http://scottf.ddns.net/speedtest/postResult.json'
general:
ping-timeout: 180 #seconds
125 changes: 87 additions & 38 deletions messageReceiver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python

import pika
import subprocess
import sys
Expand All @@ -13,13 +14,28 @@
import json
import requests

#Load configuration file
with open('/opt/speedtest/config.yaml', 'r') as f:
config = yaml.load(f)

#Global variables
deviceId = config["device_info"]["deviceId"]
deviceType = config["device_info"]["deviceType"]
uri = config["message_server"]["uri"]
##Device configuration
deviceId = config["device-info"]["deviceId"]
deviceType = config["device-info"]["deviceType"]
macAddress = config["device-info"]["mac-address"]

##Message receiver (Remote REST API)
uri = config["message-receiver"]["uri"]

##Rabbit MQ related configuration
username = config["rabbit-mq"]["username"]
password = config["rabbit-mq"]["password"]
sendQueue = config["rabbit-mq"]["send-queue"]
syslogQueue = config["rabbit-mq"]["syslog-queue"]

##General configuration
pingTimeOut = config["general"]["ping-timeout"]

sessionId = ""

#Initialise syslogger
Expand All @@ -29,12 +45,13 @@
syslog.addHandler(handler)

#Initialise rabbitmq connection
credentials = pika.PlainCredentials('speedtest', '1nfield')
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='sendQueue')
channel.queue_declare(queue=sendQueue)

#Log to syslog
def syslogger(message, severity):
logtime = str(datetime.now())
if severity == "info":
Expand All @@ -43,86 +60,118 @@ def syslogger(message, severity):
syslog.critical(logtime + ': ' + message)
elif severity == "debug":
syslog.debug(logtime + ': ' + message)

def rabbitmqLog (message, queue, **kwargs):
if kwargs.has_key('exchange'):
exchange = kwargs.get('exchange', None)
else:
exchange = ''

channel.basic_publish(exchange=exchange,routing_key=queue,body=message)

#Method for sending and logging messages
def processMessage(message, **kwargs):
#def processMessage(message, severity='info', queue=None, exchange='', remotelog=True, syslog=True, rabbitlog=True):
severity = 'info'
exchange = ''
remotelog = True
syslog = True
rabbitlog = True

if kwargs.has_key('severity'):
severity = kwargs.get('severity', None)

if not kwargs.has_key('queue'):
queue = sendQueue #default queue
else:
queue = kwargs.get('queue', None)

if kwargs.has_key('exchange'):
exchange = kwargs.get('exchange', None)

if kwargs.has_key('syslog'):
syslog = kwargs.get('syslog', None)

if kwargs.has_key('rabbitlog'):
rabbitlog = kwargs.get('rabbitlog')

#Log to syslog?
if syslog:
syslogger(message, severity)

#Log to rabbitMQ queue?
if rabbitlog:
rabbitmqLog(message, queue=queue)

#Send log to remote log server
if remotelog:
remoteLogger(message)

#Make sure that the speed test device can reach Internet
def waitForPing( ip ):
waiting =True
counter =0
logMsg = 'Testing Internet Connectivity.'
syslogger(logMsg, 'info')
remoteLogger(logMsg)
channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg)
processMessage(logMsg)
while waiting:
t = os.system('ping -c 1 -W 1 {}'.format(ip)+'> /dev/null 2>&1')
if not t:
waiting=False
logMsg = 'Ping reply from '+ip
channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg)
syslogger(logMsg, 'info')
remoteLogger(logMsg)
processMessage(logMsg)
return True
else:
counter +=1
if (counter%30 == 0):
logMsg = 'Trying to connect to Internet for '+str(counter)+' seconds'
syslogger(logMsg, 'info')
remoteLogger(logMsg)
channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg)
if counter == 300: # this will prevent an never ending loop, set to the number of tries you think it will require
processMessage(logMsg)
if counter == pingTimeOut: # this will prevent an never ending loop, set to the number of tries you think it will require
waiting = False
logMsg = 'Ping timeout after trying for '+str(counter)+' seconds!\nRestart the speed test by reconnecting the ethernet cable to the speed test device.'
syslogger(logMsg, 'info')
remoteLogger(logMsg)
channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg)
processMessage(logMsg)
return False

def speedtest():
cmd = ['/usr/bin/python', '-u', '/opt/speedtest/speedtest_cli.py']

logMsg = 'Initiating Speed Test'
syslogger(logMsg, 'info')
remoteLogger(logMsg)
processMessage(logMsg)

p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
for line in iter(p.stdout.readline,''):
sys.stdout.flush()
remoteLogger(line)
channel.basic_publish(exchange='',
routing_key="sendQueue",
body=line)
processMessage(line)
p.wait()
return not p.returncode

#Implement function to validate that dns can be resolved

def remoteLogger(msg):
#TODO: error handling
headers = {'Content-type': 'application/x-www-form-urlencoded ', 'Accept': 'text/plain'}
json_data = json.dumps({'deviceId':deviceId, 'sessionId':str(sessionId), 'timeCreated':str(datetime.now()), 'msg':msg})
payload = {'speedtest':json_data}
r = requests.post(uri, data=payload, headers=headers)
#If http error log this to syslog and rabbitmq retry?
print r

def callback(ch, method, properties, body):
global sessionId
sessionId = uuid.uuid1()
logMsg = 'Speed device has been plugged in. Initiating speed test process...'
syslogger(logMsg, 'info')
remoteLogger(logMsg)

channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg)
#print " [x] Received %r" % (body,)
logMsg = 'Speed device has been plugged in. Initiating speed test process...' #Start tag to identify session start
processMessage(logMsg)
#Retry if unsuccessful
if waitForPing("8.8.8.8") and speedtest():
logMsg = 'Speed Test Completed!'
channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg)
syslogger(logMsg, 'info')
remoteLogger(logMsg)
processMessage(logMsg)
else:
logMsg = 'Speed Test Unsuccessful!'
channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg)
syslogger(logMsg, 'critical')
remoteLogger(logMsg)
processMessage(logMsg, severity='critical')

channel.queue_declare(queue='linkup')
channel.queue_declare(queue=syslogQueue)

channel.basic_consume(callback,
queue='linkup',
queue=syslogQueue,
no_ack=True)

syslogger('Started speed test message receiver', 'info')
Expand Down
16 changes: 13 additions & 3 deletions pushMessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@
from datetime import datetime
import logging
import logging.handlers
import yaml

with open('/opt/speedtest/config.yaml', 'r') as f:
config = yaml.load(f)

#Global variables
##Rabbit MQ configuration
username = config["rabbit-mq"]["username"]
password = config["rabbit-mq"]["password"]
syslogQueue = config["rabbit-mq"]["syslog-queue"]

syslog = logging.getLogger('Syslog')
syslog.setLevel(logging.DEBUG)
Expand All @@ -30,15 +40,15 @@ def syslogger(message, severity):
message = sys.stdin.readline().rstrip()

if len(message) > 1:
credentials = pika.PlainCredentials('speedtest', '1nfield')
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost', 5672, '/', credentials))
channel = connection.channel()

channel.queue_declare(queue='linkup')
channel.queue_declare(queue=syslogQueue)

channel.basic_publish(exchange='',
routing_key='linkup',
routing_key=syslogQueue,
body=message)

connection.close()
Expand Down
16 changes: 12 additions & 4 deletions sendQueue.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
#!/usr/bin/env python

import pika
import yaml

credentials = pika.PlainCredentials('speedtest', '1nfield')
with open('/opt/speedtest/config.yaml', 'r') as f:
config = yaml.load(f)

##Rabbit MQ configuration
username = config["rabbit-mq"]["username"]
password = config["rabbit-mq"]["password"]
sendQueue = config["rabbit-mq"]["send-queue"]

credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='sendQueue')
channel.queue_declare(queue=sendQueue)

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)

channel.basic_consume(callback,
queue='sendQueue',
queue=sendQueue,
no_ack=True)

channel.start_consuming()