-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsend_to_es
executable file
·102 lines (83 loc) · 3.31 KB
/
send_to_es
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python
import sys, json, os, urllib2, datetime, re, Queue, threading, time, optparse
class ExitToken:
    """Marker type whose single instance signals end of input on the queue."""


# The one sentinel instance; the reader enqueues it when stdin is exhausted.
exit_token = ExitToken()
# Bounded hand-off between the stdin reader and the sender thread: once 1000
# entries are waiting, q.put() blocks instead of buffering without limit.
q = Queue.Queue(1000)

# Default value for the 'host' field of every event.
hostname = os.popen("hostname").read().strip()

parser = optparse.OptionParser(usage="Usage: %prog [options] <ES url>")
# The placeholder shown in --help belongs in `metavar`, not in the option
# string itself; embedding it there registers an option whose name contains a
# space and only works by accident through optparse's prefix matching.
parser.add_option('-m', '--message-offset', action="store", dest="offset", metavar="OFFSET", help="Offset of first byte of message (if this byte is a space or tab, the line will be added to the previous event)", default=0, type="int")
parser.add_option('-n', '--host', action="store", dest="hostname", metavar="NAME", help="Override 'host' field (defaults to %s)" % hostname, default=hostname)
parser.add_option('-c', '--cut', action="store_true", dest="cut", help="Discard bytes from start of line until OFFSET", default=False)
options, args = parser.parse_args()

# Fail with the usage message rather than an IndexError traceback when the
# Elasticsearch URL is missing.
if not args:
    parser.error("missing <ES url> argument")
es_url = args[0]
def _local_utc_offset():
    """Return the current local UTC offset formatted as ``+HHMM`` or ``-HHMM``."""
    # time.timezone / time.altzone are seconds *west* of UTC, and altzone is
    # only meaningful while daylight saving time is actually in effect.
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_active else time.timezone
    sign = "+" if seconds_west <= 0 else "-"
    # Work on the absolute value so the sign is emitted exactly once and the
    # minutes part (e.g. +0530) is not lost to integer division.
    hours, minutes = divmod(abs(seconds_west) // 60, 60)
    return "%s%02d%02d" % (sign, hours, minutes)


def to_event(event):
    """Serialize one buffered event as a JSON document for the bulk request.

    ``event`` is a dict with the keys ``'line'`` (the log text, possibly
    several lines joined with newlines) and ``'time'`` (the
    ``datetime.isoformat()`` string taken when the line was read).

    If the line starts with ``YYYY-MM-DD[ T]HH:MM:SS[.,]<digits>[+-HHMM]``,
    that timestamp is used: the fraction is cut to three digits, a missing
    zone becomes ``+0000`` and a space separator becomes ``T``.  Otherwise the
    read time (millisecond precision) plus the local UTC offset is used.

    Returns the JSON text with the fields ``@timestamp``, ``host`` and
    ``message``.
    """
    try:
        match = re.match(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*', event['line'])
        if match:
            timestamp = match.group(1) + match.group(2)[0:3]
            timestamp += match.group(3) if match.group(3) else "+0000"
            if timestamp[10] == ' ':
                timestamp = timestamp[0:10] + 'T' + timestamp[11:]
        else:
            # isoformat() omits the fractional part when microseconds are 0,
            # so pad/cut the fraction explicitly instead of slicing blindly.
            read_time, _, fraction = event['time'].partition('.')
            millis = (fraction + "000")[0:3]
            timestamp = "%s.%s%s" % (read_time, millis, _local_utc_offset())
    except ValueError:
        # Fallback kept from the original: use the raw read time as-is.
        timestamp = event['time']
    data = {
        "@timestamp": timestamp,
        "host": options.hostname,
        "message": event_line(event['line'].strip()),
    }
    return json.dumps(data)
def starts_with_space(line):
    """Tell whether the character at the configured message offset is blank.

    Returns True when ``line`` is longer than ``options.offset`` and the
    character at that offset is a space or a tab; such lines are treated as
    continuations of the previous event.
    """
    offset = options.offset
    if len(line) <= offset:
        return False
    return line[offset] in (' ', '\t')
def event_line(line):
    """Return ``line`` without its first ``options.offset`` characters when
    the --cut option is set, otherwise return it unchanged."""
    if options.cut:
        return line[options.offset:]
    return line
def sending():
    """Sender thread body: batch queued events and POST them to ``<es_url>/_bulk``.

    Events are taken from ``q``.  A line whose character at the message offset
    is a space or tab is appended to the event before it; any other line
    closes the previous event, which is then added to the current bulk
    payload.  A bulk is sent when 1000 events were collected, when the queue
    runs empty, or when no line arrived for one second.  A failed request is
    reported on stderr and retried every three seconds until it succeeds.
    The loop ends after ``exit_token`` has been taken from the queue.
    """
    pending = None  # latest event, held back in case continuation lines follow
    event = None
    while event is not exit_token:
        count = 0
        payload = ''
        # Always wait for at least one event; after that keep collecting only
        # while more lines are already queued.
        while count < 1000 and event is not exit_token and (not q.empty() or count == 0):
            try:
                event = q.get(True, 1)
            except Queue.Empty:
                event = None  # idle: fall through and flush the pending event
            # A continuation needs something to continue.  Without a pending
            # event (e.g. it was already flushed after an idle second) the
            # line starts a new event instead of crashing on None['line'].
            is_continuation = (
                event is not None
                and event is not exit_token
                and pending is not None
                and starts_with_space(event['line'])
            )
            if is_continuation:
                pending['line'] += "\n" + event_line(event['line'])
            else:
                if pending:
                    payload += "{\"index\": {}}\n" + to_event(pending) + "\n"
                    count += 1
                    pending = None
                if not event:
                    break
                pending = event
        if count > 0:
            while True:
                try:
                    urllib2.urlopen("%s/_bulk" % es_url, payload)
                    break
                except urllib2.URLError as e:
                    # URLError lives in urllib2 and sleep in time; the bare
                    # names raised NameError and killed the thread on the
                    # first failed request instead of retrying.
                    sys.stderr.write("Failed sending bulk to ES: %s\n" % str(e))
                    time.sleep(3)
# Ship events from a background thread so reading stdin is never blocked by a
# slow or unreachable Elasticsearch (until the bounded queue fills up).
t = threading.Thread(target=sending)
t.daemon = True
t.start()
try:
    while 1:
        line = sys.stdin.readline()
        if not line:
            # EOF: tell the sender to flush what it has and stop.
            q.put(exit_token)
            break
        # Drop only a real trailing newline; the last line of the input may
        # not have one, and slicing blindly would eat its final character.
        if line.endswith('\n'):
            line = line[:-1]
        if line:
            q.put({'time': datetime.datetime.now().isoformat(), 'line': line})
    # Wait for the sender to deliver the remaining events before exiting.
    t.join()
except KeyboardInterrupt:
    pass