-
Notifications
You must be signed in to change notification settings - Fork 0
/
twitter_slurper.py
154 lines (127 loc) · 5.46 KB
/
twitter_slurper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/python
"""Twitter stream slurper.

Looks up every hackerspace in the SpaceAPI directory, follows the Twitter
account that a space lists in its SpaceAPI contact details, and prints the
new tweets of those accounts as they come in.
"""
__author__ = "Elmer de Looff <[email protected]>"
__version__ = "0.1"
# Built-in modules
import datetime
import logging
import Queue
import threading
import time
# Thirdparty modules
import requests
import simplejson
# Twitter keys
import twitter_credentials
# SpaceAPI directory: a JSON object mapping each space name to the URL of that
# space's SpaceAPI endpoint (iterated by all_space_info()).
API_DIRECTORY = 'http://openspace.slopjong.de/directory.json'


class TwitterError(Exception):
    """Signals that a Twitter API call came back with an error response."""
    pass
class TwitterFeedSlurper(threading.Thread):
    """Collects all new tweets by a user and puts them on a queue.

    The thread starts itself from the constructor and is a daemon thread.
    Every `interval` seconds it polls the user's timeline. For every tweet
    newer than the cutoff (30 minutes before construction) it puts a dict
    with the keys ``space``, ``link``, ``time`` and ``text`` on `queue`.
    """

    # Seconds to wait for the Twitter API on a single poll. Without a timeout
    # a stalled connection would block this thread forever.
    request_timeout = 30

    def __init__(self, space, twitter_name, queue, token, interval=300):
        """Configure the slurper and immediately start polling.

        :param space: name of the space; copied into every queued item.
        :param twitter_name: screen name whose timeline is polled.
        :param queue: object whose ``put()`` receives one dict per tweet.
        :param token: application-only OAuth2 bearer token.
        :param interval: seconds to sleep between polls.
        """
        super(TwitterFeedSlurper, self).__init__(
            name='TwitterFeedSlurper %s' % space)
        self.space = space
        self.twitter_name = twitter_name
        self.token = token
        self.check_interval = interval
        # Tweet timestamps are parsed into naive datetimes from a '+0000'
        # (UTC) string, so the cutoff must be naive UTC too. datetime.now()
        # would shift the cutoff by the host's UTC offset.
        self.cutoff_date = (datetime.datetime.utcnow() -
                            datetime.timedelta(minutes=30))
        # Initial since_id; presumably lower than any real tweet id, so the
        # first poll is not restricted by it.
        self.last_tweet_id = 1
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        """Poll forever, sleeping `check_interval` seconds between polls.

        A failed poll is logged and retried after the next sleep, instead of
        an uncaught exception silently ending the thread.
        """
        while True:
            try:
                self.process_tweets()
            except TwitterError as error:
                logging.warning('Twitter error: %s', error)
            except (requests.exceptions.RequestException, ValueError) as error:
                # Network failures, bodies that are not JSON and timestamps
                # that do not match the expected format.
                logging.warning('Could not fetch tweets for %s: %s',
                                self.twitter_name, error)
            time.sleep(self.check_interval)

    def process_tweets(self):
        """Loops through all received tweets and puts relevant info on a queue.

        `last_tweet_id` advances past every tweet that is seen, but only tweets
        created after `cutoff_date` are put on the queue.
        """
        for tweet in self.new_tweets():
            self.last_tweet_id = tweet['id']
            # NOTE(review): %a/%b are locale dependent; this assumes the
            # process runs with an English/C locale.
            tweet['created_at'] = datetime.datetime.strptime(
                tweet['created_at'], '%a %b %d %H:%M:%S +0000 %Y')
            if tweet['created_at'] <= self.cutoff_date:
                continue
            link = 'https://twitter.com/%s/status/%d' % (
                tweet['user']['screen_name'], tweet['id'])
            self.queue.put({'space': self.space,
                            'link': link,
                            'time': tweet['created_at'],
                            'text': tweet['text']})

    def new_tweets(self):
        """Returns a list of tweets in chronological order.

        The `since_id` parameter is provided to the API so that tweets that
        have already been collected once aren't collected again and again.

        :raises TwitterError: when the API reports an error or returns
            something other than a list of tweets.
        """
        response = requests.get(
            'https://api.twitter.com/1.1/statuses/user_timeline.json',
            params={'screen_name': self.twitter_name,
                    'since_id': self.last_tweet_id},
            headers={'authorization': 'Bearer %s' % self.token},
            timeout=self.request_timeout).json()
        return self._timeline_from_response(response)

    @staticmethod
    def _timeline_from_response(response):
        """Validate a decoded timeline response and return it oldest-first."""
        if isinstance(response, dict):
            errors = response.get('errors')
            if errors:
                error = errors[0]  # Only raise the first error
                raise TwitterError('%(code)d: %(message)s' % error)
            # Any other object is not a timeline. Reversing it would iterate
            # its keys (or fail) instead of yielding tweets.
            raise TwitterError('Unexpected response: %r' % (response,))
        # The timeline arrives newest-first; callers want chronological order.
        return list(reversed(response))
def all_space_info(timeout=10):
    """Returns a combined dictionary of all space APIs.

    Fetches the directory at API_DIRECTORY, which maps each space name to its
    SpaceAPI URL, and queries every listed endpoint via get_space_api().
    Spaces for which get_space_api() returns None are left out.

    :param timeout: seconds to wait for the directory request. Without it,
        a stalled directory server would hang the program at startup.
    :returns: dict mapping space name to that space's decoded SpaceAPI data.
    Errors while fetching or decoding the directory itself propagate.
    """
    directory = requests.get(API_DIRECTORY, timeout=timeout).json()
    spaces = {}
    # .items() instead of the Python 2-only .iteritems().
    for name, api_url in directory.items():
        status = get_space_api(name, api_url)
        if status is not None:
            spaces[name] = status
    return spaces
def format_twitter_update(tweet):
    """Render a queued tweet dict as a human readable notification.

    `tweet` must provide the keys ``space``, ``time``, ``link`` and ``text``.
    The result spans three lines and ends with a newline.
    """
    template = ('A new tweet by %(space)s!\n'
                'At %(time)s - %(link)s\n'
                'Tweet: %(text)s\n')
    return template % tweet
def get_space_api(name, api_url, connect_timeout=1):
    """Returns the space API result if it is available.

    :param name: space name, used only in log messages.
    :param api_url: URL of the space's SpaceAPI endpoint.
    :param connect_timeout: seconds passed as ``timeout`` to requests.
    :returns: the decoded JSON object. Returns None (after logging a warning)
        when the request fails, the body is not JSON, or the JSON is not an
        object.
    """
    try:
        response = requests.get(api_url, timeout=connect_timeout)
    except requests.exceptions.RequestException:
        logging.warning('Could not request status for %s', name)
        return None
    try:
        status = response.json()
    except ValueError:
        # Decoding is handled separately from the request. All JSON decode
        # errors (simplejson's, the stdlib's, and requests' own JSONDecodeError,
        # which is also a RequestException) subclass ValueError. Some URL
        # errors raised by requests.get are ValueErrors too.
        logging.warning('SpaceAPI request from %s is not JSON :(', name)
        return None
    if not isinstance(status, dict):
        # Callers look up keys such as 'contact' on the result.
        logging.warning('SpaceAPI response from %s is not a JSON object', name)
        return None
    return status
def get_twitter_bearer_token(consumer_key, consumer_secret, timeout=10):
    """Returns a Twitter OAuth2 bearer token using the given key and secret.

    This uses the application-only authentication as explained on
    https://dev.twitter.com/docs/auth/application-only-auth

    :param consumer_key: application consumer key (HTTP basic auth user).
    :param consumer_secret: application consumer secret (basic auth password).
    :param timeout: seconds to wait for the token endpoint.
    :returns: the ``access_token`` string from the response.
    :raises TwitterError: if the API reports an error or the returned token
        is not a bearer token.
    """
    response = requests.post('https://api.twitter.com/oauth2/token',
                             auth=(consumer_key, consumer_secret),
                             data={'grant_type': 'client_credentials'},
                             timeout=timeout).json()
    if 'errors' in response:
        error = response['errors'][0]  # Only raise the first error
        # .get() so that a missing field does not turn into a KeyError.
        raise TwitterError('%s (%s): %s' % (error.get('label', 'error'),
                                            error.get('code'),
                                            error.get('message')))
    # Application-only auth docs ask clients to verify the token type.
    token_type = response.get('token_type') or ''
    if token_type.lower() != 'bearer':
        raise TwitterError('Unexpected token type: %r' % (token_type,))
    return response['access_token']
def twitter_report(token, interval=200, pause=.1):
    """Reports updates of all spaces that have a Twitter account listed.

    Starts one TwitterFeedSlurper per space whose SpaceAPI ``contact`` object
    has a non-empty ``twitter`` entry. It then prints queued updates forever.

    :param token: bearer token handed to every slurper.
    :param interval: seconds between timeline polls of each slurper.
    :param pause: seconds to sleep after printing each update.
    """
    queue = Queue.Queue()
    spaces = all_space_info()
    for space_name in sorted(spaces, key=lambda name: name.lower()):
        info = spaces[space_name]
        contact = info.get('contact') if isinstance(info, dict) else None
        if not isinstance(contact, dict) or not contact.get('twitter'):
            continue
        twitter_name = contact['twitter']
        print('[%s] is on Twitter: %s' % (space_name, twitter_name))
        # SpaceAPI lists Twitter handles with a leading '@'. The timeline
        # API's screen_name parameter takes the bare name.
        TwitterFeedSlurper(space_name, twitter_name.lstrip('@'), queue, token,
                           interval=interval)
    while True:
        try:
            print(format_twitter_update(queue.get(timeout=0.2)))
            time.sleep(pause)
        except Queue.Empty:
            # get() uses a timeout rather than blocking forever so that people
            # can still interrupt this program.
            pass
def main():
    """Command line entry point: obtain a bearer token, then print tweets."""
    credentials = (twitter_credentials.CONSUMER_KEY,
                   twitter_credentials.CONSUMER_SECRET)
    twitter_report(get_twitter_bearer_token(*credentials))


if __name__ == '__main__':
    main()