From f40c19f5f25a46917b698c1c119fb0ababc239ed Mon Sep 17 00:00:00 2001 From: mellanon Date: Fri, 7 Aug 2015 18:14:57 -1200 Subject: [PATCH] Cleaned up logging code and setting --- .messageReceiver.py.swp | Bin 0 -> 16384 bytes config.yaml | 14 ++++- messageReceiver.py | 125 ++++++++++++++++++++++++++++------------ pushMessage.py | 16 ++++- sendQueue.py | 16 +++-- 5 files changed, 123 insertions(+), 48 deletions(-) create mode 100644 .messageReceiver.py.swp diff --git a/.messageReceiver.py.swp b/.messageReceiver.py.swp new file mode 100644 index 0000000000000000000000000000000000000000..286b25acbc73a098efc45f00186c151cf1ea2898 GIT binary patch literal 16384 zcmeHOYm6jS6~3YZgRmGiKKR4k>BZ{a>8{xUabdI}Fbhj3JM8T2ENfiXvAXMacRAHn zwe^_U&borcm>A^s2NNGLK7tY;Q3FK(xQ4_SA;B1bh(;1)gb2nTM*N`>^*i@b{n+iE zh4@30s>#>gb?>?7o_p@Ox6V1Yrn%>i#xmb)e}Lh7J7eGf>Q9cmt8oGQ*0qeqVVF|! zc9-u3*~#sQP(L3=>3k9i;ie)<=X)YaoQ_x(E#Ym7*pB+=!J{B4G*D>Z4QL=s+qLU1 zW7k~0u!mZgW_R*;zT?OnU{o|IG*D=u&_JPqLIZ^c3Jnw*C^S%L;Qv(vX>jlW4DclI2=FD~ZXgEMfZKo%1A71tTnStV z{Bj3lKLmaNJPh0qG=T-+-;c$CT_6;Q31#`xS5oxF5I=xD)s^ z&;^!&UBHWPW9%8=XTT%C319*E=OvIIcnWwDco_IB&;<4aR{(#XVeF^CBfx3kQ@{#P z175qBvA+T@0KWwu0PY9cz>UB&7cuq(@D1Qz;1tjV9N-3E2XGni@`a537!}-&;kwsi@+T4 z8s_x(z|Vnufm473)B&3Be<11e9pK}@KHw@~7jV`!!*P|8e&UCnj)=?nT24AuZh1jF zv?`OUC)db^jQE~JBJqLvYj`P#I&1`~hy#)Gr4aMcO1(`l?c0`Q)Cx1I<@G)CRu72R z*WyetSF~lfw}vsPqRhRBR~A`m+1U_0$zs9Nu7iPgaYt_fae57iV+dYL@Rk!mcTTID z_b+>p9~@UKdSNOKCbx=YYQ?9Pmd2$yydKA5v{@955a%m7HxN&@~j9^HeliV9ijK4Hzi_ z(J$;`W6OyKgLj&jc^lKLuzC+o2E^InLk7`$V)IsC-YNPB?Zj;Rt1F+mNfw z6mK`(Ih|IB(vsX(_}IQuGO83>t$G6NsRHkkQ3 zpOr0V2g^V+Y+Cuv-Eom9|B?I7GL zS1Qtol;GrCoMp3QKO_4iJRR*S$d$Y9ik9!hqAc&~`3_JqWSG{Lew2lQt=pE>E(g_` z%ko30tQ#gNCj8nD?!6ig&OCm$V&RG)_3+S!umetD8+jX!eKXo^zz)shG7QSHd~mn*%P4y6SEFSQrxwODu@fa3g@k2d?MsHpiG+Z>Fon?$Xo;8@p!HirX=(_E7 zj?H#NAY#X#J$`JK2x??JSo^cbS*f&N)oI!+c2e*&+E7n|0VAcjTJHq0QexDk)qquQ zH$xqd(qrSwsm;VpA~yA4Y-V#3si~L4S(PpYhffg;AM3;&A6OQ9LyYudsm1ywY8n?P zB&%iRt%8wy!J3y>r4PkdZ#lfiZ(3Q)|EO|D+^GzrAoTe|#gF+HPj=t0LZ9jC}a=XH| zytK>1NCahT?2U8PzfIIw*H&y*xRda<++UnAP4hOqXvVx=6vioZ2s262#N2-}fsyID zEc0CRn^-gH{F_aVYbtYZD97)yOh#6wq0{B&mQ3IL4s&r}%G{JDSyOufri6AQyz#qp zK1<^HrWeeMV3S9EJdp!N@&7@@pGOg=Qv7eu_y36a{WrkVz_)=zzyk0nV)uK1zaTz; z26!BJ4EQpz1{?%-1D69k0E*!s10DoU0RgZI90J}8ya!;wONiG$4_peogt+`MAO_li z0Immi0T%%eAU4l{j{91bhuR4IBhYz(v5H5o13Gd=B^oumrq{xcWKZo4{v)yMW>< zG%#5M#+OM_Cr;O~MI~7e4>|ZJr6~4-lp8n3o=9QwW4n*@qmFleq~}K^8OnqJdF}c^ zlh}MMM8M#s2sOx4SXx`1%5G|<93BaOepK5_v<;sr)l+6+N-I4>x&G7FL+b_GQ*T2t zrFzPMlvxdghPBDa?A&uarH|p^pC=hv=~Sbd#%Eex6i=_Em2|#CCbQm-+w}VBwXau} z)dO=&mUhFHTTOcfe8ZtMnx4Ge+am`@CI%{lIM_rw>N^P6oz{kmF87?=mcm}-BXV;!E5x{z^E4UG+3RfiHw#A+ z>5Tk;8_S4UNy5^qNaH?7t87RtSv5fnYBes2a;G^`d%P(`z#~6Hbmej{u#qgKs8Ft` zA!E?YZQHhZ=`a}!PaT=E^kgX8?jwyLBQFHxsFe~8$71Bu+VruY(uc~?^P){+Osx=t}$U*9LQsh$W}Kbqp|~=RuIf;W_$^FCiTp5 z!4BZ?xCry@wx8i3NX;VoBKUt}0psT$GAiZBP(jr$)l@A@uf7P197IF*ii`ZaOqNnX4}_Q#Shaqf%`)^2p&o`#~;= literal 0 HcmV?d00001 diff --git a/config.yaml b/config.yaml index 07ea648..b7d526f 100644 --- a/config.yaml +++ b/config.yaml @@ -1,7 +1,15 @@ -device_info: +device-info: deviceId: 1 deviceType: 'RaspberryPi' -message_server: + mac-address: '00:0c:29:59:59:7b' +rabbit-mq: username: 'speedtest' password: '1nfield' - uri: 'http://192.168.56.222/speedtest/postResult.json' + send-queue: 'sendQueue' + syslog-queue: 'syslogQueue' +message-receiver: + uri: 'http://scottf.ddns.net/speedtest/postResult.json' +configuration-server: + uri: 'http://scottf.ddns.net/speedtest/postResult.json' +general: + ping-timeout: 180 #seconds diff --git a/messageReceiver.py b/messageReceiver.py index 94a07f9..430643d 100755 --- a/messageReceiver.py +++ b/messageReceiver.py @@ -1,4 +1,5 @@ #!/usr/bin/env python + import pika import subprocess import sys @@ -13,13 +14,28 @@ import json import requests +#Load configuration file with open('/opt/speedtest/config.yaml', 'r') as f: config = yaml.load(f) #Global variables -deviceId = config["device_info"]["deviceId"] -deviceType = config["device_info"]["deviceType"] -uri = config["message_server"]["uri"] +##Device configuration +deviceId = config["device-info"]["deviceId"] +deviceType = config["device-info"]["deviceType"] +macAddress = config["device-info"]["mac-address"] + +##Message receiver (Remote REST API) +uri = config["message-receiver"]["uri"] + +##Rabbit MQ related configuration +username = config["rabbit-mq"]["username"] +password = config["rabbit-mq"]["password"] +sendQueue = config["rabbit-mq"]["send-queue"] +syslogQueue = config["rabbit-mq"]["syslog-queue"] + +##General configuration +pingTimeOut = config["general"]["ping-timeout"] + sessionId = "" #Initialise syslogger @@ -29,12 +45,13 @@ syslog.addHandler(handler) #Initialise rabbitmq connection -credentials = pika.PlainCredentials('speedtest', '1nfield') +credentials = pika.PlainCredentials(username, password) connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost', 5672, '/', credentials)) channel = connection.channel() -channel.queue_declare(queue='sendQueue') +channel.queue_declare(queue=sendQueue) +#Log to syslog def syslogger(message, severity): logtime = str(datetime.now()) if severity == "info": @@ -43,86 +60,118 @@ def syslogger(message, severity): syslog.critical(logtime + ': ' + message) elif severity == "debug": syslog.debug(logtime + ': ' + message) + +def rabbitmqLog (message, queue, **kwargs): + if kwargs.has_key('exchange'): + exchange = kwargs.get('exchange', None) + else: + exchange = '' + + channel.basic_publish(exchange=exchange,routing_key=queue,body=message) + +#Method for sending and logging messages +def processMessage(message, **kwargs): +#def processMessage(message, severity='info', queue=None, exchange='', remotelog=True, syslog=True, rabbitlog=True): + severity = 'info' + exchange = '' + remotelog = True + syslog = True + rabbitlog = True + + if kwargs.has_key('severity'): + severity = kwargs.get('severity', None) + + if not kwargs.has_key('queue'): + queue = sendQueue #default queue + else: + queue = kwargs.get('queue', None) + + if kwargs.has_key('exchange'): + exchange = kwargs.get('exchange', None) + + if kwargs.has_key('syslog'): + syslog = kwargs.get('syslog', None) + + if kwargs.has_key('rabbitlog'): + rabbitlog = kwargs.get('rabbitlog') + + #Log to syslog? + if syslog: + syslogger(message, severity) + + #Log to rabbitMQ queue? + if rabbitlog: + rabbitmqLog(message, queue=queue) + + #Send log to remote log server + if remotelog: + remoteLogger(message) +#Make sure that the speed test device can reach Internet def waitForPing( ip ): waiting =True counter =0 logMsg = 'Testing Internet Connectivity.' - syslogger(logMsg, 'info') - remoteLogger(logMsg) - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) + processMessage(logMsg) while waiting: t = os.system('ping -c 1 -W 1 {}'.format(ip)+'> /dev/null 2>&1') if not t: waiting=False logMsg = 'Ping reply from '+ip - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) - syslogger(logMsg, 'info') - remoteLogger(logMsg) + processMessage(logMsg) return True else: counter +=1 if (counter%30 == 0): logMsg = 'Trying to connect to Internet for '+str(counter)+' seconds' - syslogger(logMsg, 'info') - remoteLogger(logMsg) - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) - if counter == 300: # this will prevent an never ending loop, set to the number of tries you think it will require + processMessage(logMsg) + if counter == pingTimeOut: # this will prevent an never ending loop, set to the number of tries you think it will require waiting = False logMsg = 'Ping timeout after trying for '+str(counter)+' seconds!\nRestart the speed test by reconnecting the ethernet cable to the speed test device.' - syslogger(logMsg, 'info') - remoteLogger(logMsg) - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) + processMessage(logMsg) return False def speedtest(): cmd = ['/usr/bin/python', '-u', '/opt/speedtest/speedtest_cli.py'] logMsg = 'Initiating Speed Test' - syslogger(logMsg, 'info') - remoteLogger(logMsg) + processMessage(logMsg) p = subprocess.Popen(cmd, stdout=subprocess.PIPE) for line in iter(p.stdout.readline,''): sys.stdout.flush() - remoteLogger(line) - channel.basic_publish(exchange='', - routing_key="sendQueue", - body=line) + processMessage(line) p.wait() return not p.returncode +#Implement function to validate that dns can be resolved + def remoteLogger(msg): + #TODO: error handling headers = {'Content-type': 'application/x-www-form-urlencoded ', 'Accept': 'text/plain'} json_data = json.dumps({'deviceId':deviceId, 'sessionId':str(sessionId), 'timeCreated':str(datetime.now()), 'msg':msg}) payload = {'speedtest':json_data} r = requests.post(uri, data=payload, headers=headers) + #If http error log this to syslog and rabbitmq retry? + print r def callback(ch, method, properties, body): global sessionId sessionId = uuid.uuid1() - logMsg = 'Speed device has been plugged in. Initiating speed test process...' - syslogger(logMsg, 'info') - remoteLogger(logMsg) - - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) - #print " [x] Received %r" % (body,) + logMsg = 'Speed device has been plugged in. Initiating speed test process...' #Start tag to identify session start + processMessage(logMsg) #Retry if unsuccessful if waitForPing("8.8.8.8") and speedtest(): logMsg = 'Speed Test Completed!' - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) - syslogger(logMsg, 'info') - remoteLogger(logMsg) + processMessage(logMsg) else: logMsg = 'Speed Test Unsuccessful!' - channel.basic_publish(exchange='',routing_key="sendQueue",body=logMsg) - syslogger(logMsg, 'critical') - remoteLogger(logMsg) + processMessage(logMsg, severity='critical') -channel.queue_declare(queue='linkup') +channel.queue_declare(queue=syslogQueue) channel.basic_consume(callback, - queue='linkup', + queue=syslogQueue, no_ack=True) syslogger('Started speed test message receiver', 'info') diff --git a/pushMessage.py b/pushMessage.py index d5028fa..241b3ee 100644 --- a/pushMessage.py +++ b/pushMessage.py @@ -6,6 +6,16 @@ from datetime import datetime import logging import logging.handlers +import yaml + +with open('/opt/speedtest/config.yaml', 'r') as f: + config = yaml.load(f) + +#Global variables +##Rabbit MQ configuration +username = config["rabbit-mq"]["username"] +password = config["rabbit-mq"]["password"] +syslogQueue = config["rabbit-mq"]["syslog-queue"] syslog = logging.getLogger('Syslog') syslog.setLevel(logging.DEBUG) @@ -30,15 +40,15 @@ def syslogger(message, severity): message = sys.stdin.readline().rstrip() if len(message) > 1: - credentials = pika.PlainCredentials('speedtest', '1nfield') + credentials = pika.PlainCredentials(username, password) connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost', 5672, '/', credentials)) channel = connection.channel() - channel.queue_declare(queue='linkup') + channel.queue_declare(queue=syslogQueue) channel.basic_publish(exchange='', - routing_key='linkup', + routing_key=syslogQueue, body=message) connection.close() diff --git a/sendQueue.py b/sendQueue.py index 590de0a..e9d9224 100644 --- a/sendQueue.py +++ b/sendQueue.py @@ -1,12 +1,21 @@ #!/usr/bin/env python import pika +import yaml -credentials = pika.PlainCredentials('speedtest', '1nfield') +with open('/opt/speedtest/config.yaml', 'r') as f: + config = yaml.load(f) + +##Rabbit MQ configuration +username = config["rabbit-mq"]["username"] +password = config["rabbit-mq"]["password"] +sendQueue = config["rabbit-mq"]["send-queue"] + +credentials = pika.PlainCredentials(username, password) connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost', 5672, '/', credentials)) channel = connection.channel() -channel.queue_declare(queue='sendQueue') +channel.queue_declare(queue=sendQueue) print ' [*] Waiting for messages. To exit press CTRL+C' @@ -14,8 +23,7 @@ def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, - queue='sendQueue', + queue=sendQueue, no_ack=True) channel.start_consuming() -