From e37f7008900d9ac4579598047e7b7834b7b101e2 Mon Sep 17 00:00:00 2001 From: William Cohen Date: Wed, 2 Dec 2015 16:34:28 -0500 Subject: [PATCH] docs --- emr-extras/emr-bootstrap-sample.sh~ | 16 --- emr-extras/emr-create-cluster-sample.sh~ | 15 --- gpextras.py | 2 + guineapig1_1.py | 2 + guineapig1_2.py | 2 + guineapig1_3.py | 2 + mrs_gp1_0.py | 2 + mrs_test/Makefile | 2 + mrs_test/mrs-phirl-naive.py | 2 +- mrs_test/mrs-wordcount.py | 2 + mrs_test/streamNaiveBayesLearner.py | 2 + mrs_test/sum-events.py | 2 + testgp1_2.py | 2 + testgp1_3.py | 2 + testspyk.py | 2 + try.py | 165 ----------------------- tutorial/README.txt | 3 - 17 files changed, 25 insertions(+), 200 deletions(-) delete mode 100644 emr-extras/emr-bootstrap-sample.sh~ delete mode 100644 emr-extras/emr-create-cluster-sample.sh~ delete mode 100644 try.py diff --git a/emr-extras/emr-bootstrap-sample.sh~ b/emr-extras/emr-bootstrap-sample.sh~ deleted file mode 100644 index c14e638..0000000 --- a/emr-extras/emr-bootstrap-sample.sh~ +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/bash -MY_EMAIL=wcohen@gmail.com -stat=`grep isMaster /mnt/var/lib/info/instance.json | cut -d: -f2` -if [ "$stat" != "" ]; then - #get the code and unpack it - wget http://www.cs.cmu.edu/~wcohen/10-605/gpigtut.tgz - tar -xzf gpigtut.tgz - #this is needed to initialize the HDFS - hadoop jar ~/hadoop-examples.jar pi 10 10000000 >& pi-example.log - #create the default HDFS directory for Guinea Pig on HDFS - hadoop fs -mkdir /user/hadoop/gp_views - ######################################## - #if you want, uncomment this section to get an email - #notification - after defining your own email address above - #echo the cluster is ready now - ssh in and cd to tutorial | mail -s 'cluster is now up' $MY_EMAIL -fi diff --git a/emr-extras/emr-create-cluster-sample.sh~ b/emr-extras/emr-create-cluster-sample.sh~ deleted file mode 100644 index 1222a5b..0000000 --- a/emr-extras/emr-create-cluster-sample.sh~ +++ /dev/null @@ -1,15 +0,0 @@ -VERSION=3.8.0 -MASTER=m3.xlarge -WORKER=m3.xlarge -NWORKERS=2 -BOOTSTRAP_SCRIPT=s3n://wcohen-gpig-input/emr-bootstrap.sh -aws emr create-cluster \ - --ami-version $VERSION \ - --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=$MASTER \ - InstanceGroupType=CORE,InstanceCount=$NWORKERS,InstanceType=$WORKER \ - --ec2-attributes KeyName=MyKeyPair \ - --log-uri s3n://wcohen-gpig-log \ - --bootstrap-action Path=$BOOTSTRAP_SCRIPT \ - | tee emr-cluster-id.txt -echo saved in emr-cluster-id.txt - diff --git a/gpextras.py b/gpextras.py index a60639c..4c0de26 100644 --- a/gpextras.py +++ b/gpextras.py @@ -1,3 +1,5 @@ +# extensions to guineapig + ############################################################################## # (C) Copyright 2014, 2015 William W. Cohen. All rights reserved. ############################################################################## diff --git a/guineapig1_1.py b/guineapig1_1.py index 93c5fbf..afd56fa 100644 --- a/guineapig1_1.py +++ b/guineapig1_1.py @@ -1,3 +1,5 @@ +# earlier version of guineapig.py + ############################################################################## # (C) Copyright 2014 William W. Cohen. All rights reserved. ############################################################################## diff --git a/guineapig1_2.py b/guineapig1_2.py index e4fb776..0a3345c 100644 --- a/guineapig1_2.py +++ b/guineapig1_2.py @@ -1,3 +1,5 @@ +# earlier version of guineapig.py + ############################################################################## # (C) Copyright 2014 William W. Cohen. All rights reserved. ############################################################################## diff --git a/guineapig1_3.py b/guineapig1_3.py index 9b1fc5a..d660ec5 100644 --- a/guineapig1_3.py +++ b/guineapig1_3.py @@ -1,3 +1,5 @@ +# earlier version of guineapig.py + ############################################################################## # (C) Copyright 2014 William W. Cohen. All rights reserved. ############################################################################## diff --git a/mrs_gp1_0.py b/mrs_gp1_0.py index 3f13876..9067c70 100644 --- a/mrs_gp1_0.py +++ b/mrs_gp1_0.py @@ -1,3 +1,5 @@ +# early version of mrs_gp.py + import getopt import sys import os diff --git a/mrs_test/Makefile b/mrs_test/Makefile index 071cb05..5e46087 100644 --- a/mrs_test/Makefile +++ b/mrs_test/Makefile @@ -1,3 +1,5 @@ +# the targets here are basically interactive tests for mrs_gp.py + MRS=pypy ../mrs_gp.py K=5 ASYNC = 0 diff --git a/mrs_test/mrs-phirl-naive.py b/mrs_test/mrs-phirl-naive.py index bc3526d..f1f00e8 100644 --- a/mrs_test/mrs-phirl-naive.py +++ b/mrs_test/mrs-phirl-naive.py @@ -1,7 +1,7 @@ from guineapig import * import gpextras -# a non-trivial GineaPig program +# a non-trivial GineaPig program - modified to work with mrs import sys import math diff --git a/mrs_test/mrs-wordcount.py b/mrs_test/mrs-wordcount.py index 94fd31c..0eb1e60 100644 --- a/mrs_test/mrs-wordcount.py +++ b/mrs_test/mrs-wordcount.py @@ -2,6 +2,8 @@ import sys import gpextras +# wordcount modified for mrs + # supporting routines can go here def tokens(line): for tok in line.split(): diff --git a/mrs_test/streamNaiveBayesLearner.py b/mrs_test/streamNaiveBayesLearner.py index cbd26dc..b10399b 100644 --- a/mrs_test/streamNaiveBayesLearner.py +++ b/mrs_test/streamNaiveBayesLearner.py @@ -1,3 +1,5 @@ +#streaming naive bayes + import sys import math import logging diff --git a/mrs_test/sum-events.py b/mrs_test/sum-events.py index 1e17368..cfd9c29 100644 --- a/mrs_test/sum-events.py +++ b/mrs_test/sum-events.py @@ -1,3 +1,5 @@ +# used with streamNaiveBayesLearner to get event counts + import sys import logging diff --git a/testgp1_2.py b/testgp1_2.py index 13116ee..2c3fb1c 100644 --- a/testgp1_2.py +++ b/testgp1_2.py @@ -1,3 +1,5 @@ +# tests for guineapig1_2 + import unittest import sys diff --git a/testgp1_3.py b/testgp1_3.py index 1d363e5..6c4b610 100644 --- a/testgp1_3.py +++ b/testgp1_3.py @@ -1,3 +1,5 @@ +# tests for guineapig1_3 + import unittest import sys diff --git a/testspyk.py b/testspyk.py index 1c08760..ab2c337 100644 --- a/testspyk.py +++ b/testspyk.py @@ -1,3 +1,5 @@ +# tests for spyk + import spyk if __name__ == "__main__" : diff --git a/try.py b/try.py deleted file mode 100644 index 7728a82..0000000 --- a/try.py +++ /dev/null @@ -1,165 +0,0 @@ -import subprocess -import threading -import Queue -import sys -import time -from fcntl import fcntl, F_GETFL, F_SETFL -from os import O_NONBLOCK, read - - -# testing threads....ok with 2 mappers and one reducer on powerbook, not on sijo -# even 1 1 fails, at read from mpipe, even if the close operation has been performed. -# I guess the issue is that we never know if the pipe is finished...it could -# still spit stuff out after having its stdin closed....map is not a 1-1 mapping -# -# I guess we could use select() to see what's readable/writeable....? -# -# maybe I should have an extended thread for communications, which includes -# stream for inputs/outputs -# -# while True: -# select to see what's readable/writeable -# if you can read, read -# else if you can, write -# else poll and if the process is terminated, stop -# else sleep - -def key(line): - return 1 if (line.find("hello")>=0 or line.find("bye")>=0) else 0 - -def makePipe(proc): - p = subprocess.Popen(proc,shell=True, bufsize=-1, - stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) - # set the O_NONBLOCK flag of p.stdout file descriptor: - if False: - flags = fcntl(p.stdout, F_GETFL) # get current p.stdout flags - fcntl(p.stdout, F_SETFL, flags | O_NONBLOCK) - return p - -def acceptQInput(q,numMappers,pipe): - numPoison = 0 - while True: - try: - task = q.get_nowait() - except Queue.Empty: - pass - else: - if task: - key,line = task - print 'p dequed',key,line, - pipe.stdin.write(line) - q.task_done() - else: - numPoison += 1 - print 'poison read: numPoison',numPoison - if numPoison>=numMappers: - pipe.stdin.close() - print 'returning from acceptQInput' - return - -def shuffleOutput(pipe,qs): - for line in pipe.stdout: - print "p from mpipe:",line, - k = key(line) - qs[k].put((k,line)) - for q in qs: - q.put(None) - -def sendToFile(pipe,filename): - fp = open(filename,'w') - for line in pipe.stdout: - print "p to "+filename,line, - fp.write(line) - fp.close() - print 'file sent' - -def getFromFile(pipe,filename): - for line in open(filename): - print "p to mpipe:",line, - pipe.stdin.write(line) - pipe.stdin.close() - #pipe.wait() - print "p closed mpipe input for ",pipe - - -if __name__ == "__main__": - - try: - p = int(sys.argv[1]) #how far in pipeline to use threads - 0-3 - z = int(sys.argv[2]) #start threads at once - except Exception: - p = 0 - z = 0 - print 'p =',p - km = 2 - kr = 2 - - mpipes = [] - for i in range(km): - mpipe = makePipe('cat') - mpipes.append(mpipe) - mpipe = None - - if p>=1: - readers = map(lambda i: threading.Thread(target=getFromFile,args=(mpipes[i],("test%d.txt" %i ))), range(km)) - for r in readers: r.start() - if z<1: - print 'joining readers' - for r in readers: r.join() - print 'joined readers' - else: - for i in range(km): - for line in open(("test%d.txt" % i )): - mpipes[i].stdin.write(line) - mpipes[i].stdin.close() - - rpipes = [] - for j in range(kr): - rpipe = makePipe('sort | cat -n') - rpipes.append(rpipe) - - if p>=2: - qs = map(lambda i:Queue.Queue(), range(kr)) - shufflers = map(lambda i: threading.Thread(target=shuffleOutput,args=(mpipes[i],qs)), range(km)) - acceptors = map(lambda j:threading.Thread(target=acceptQInput,args=(qs[j],km,rpipes[j])), range(kr)) - for s in shufflers: s.start() - for a in acceptors: a.start() - if z<2: - print 'joining acceptor, shufflers' - for s in shufflers: s.join() - for a in acceptors: a.join() - print 'joined acceptor, shufflers' - else: - for mpipe in mpipes: - print 'starting read from',mpipe - for line in mpipe.stdout: - k = key(line) - print "s from mpipe:",k,line, - rpipes[k].stdin.write(line) - for j in range(kr): - rpipes[j].stdin.close() - - if p>=3: - writers = map(lambda j:threading.Thread(target=sendToFile,args=(rpipes[j],"tmp%d.txt" % j)), range(kr)) - for w in writers: w.start() - if not z: - print 'joining writers' - for w in writers: w.join() - print 'joined writers' - else: - for j in range(kr): - for line in rpipes[j].stdout: - print 's from rpipe',j,line, - - if z: - print 'join readers' - for r in readers: r.join() - print 'join shufflers' - for s in shufflers: s.join() - print 'join acceptors' - for a in acceptors: a.join() - print 'join writers' - for w in writers: w.start() - print 'joined everything' - - diff --git a/tutorial/README.txt b/tutorial/README.txt index 95e94f3..0d7af18 100644 --- a/tutorial/README.txt +++ b/tutorial/README.txt @@ -12,10 +12,7 @@ Recent changes: re-generating views that have been already stored, a new option --alreadyStored v1,...,vk is passed in to the various view-bulding ("doXXX") invocations of the script. - 8/13: Bug fix related to Augment and hadoop in guineapig1_2.py. Added guineapig1_2.py to the tutorial. - 10/9: added SafeEvaluator to 1.3. - 11/11: added LC_COLLATE=C to sort command