Add tweepy module
This commit is contained in:
parent
696f2b7c1a
commit
6b76ba3e2a
12 changed files with 3261 additions and 0 deletions
248
lib/tweepy/streaming.py
Normal file
248
lib/tweepy/streaming.py
Normal file
|
@ -0,0 +1,248 @@
|
|||
# Tweepy
|
||||
# Copyright 2009-2010 Joshua Roesslein
|
||||
# See LICENSE for details.
|
||||
|
||||
import httplib
|
||||
from socket import timeout
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
|
||||
from tweepy.models import Status
|
||||
from tweepy.api import API
|
||||
from tweepy.error import TweepError
|
||||
|
||||
from tweepy.utils import import_simplejson, urlencode_noplus
|
||||
json = import_simplejson()
|
||||
|
||||
STREAM_VERSION = '1.1'
|
||||
|
||||
|
||||
class StreamListener(object):
|
||||
|
||||
def __init__(self, api=None):
|
||||
self.api = api or API()
|
||||
|
||||
def on_connect(self):
|
||||
"""Called once connected to streaming server.
|
||||
|
||||
This will be invoked once a successful response
|
||||
is received from the server. Allows the listener
|
||||
to perform some work prior to entering the read loop.
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_data(self, data):
|
||||
"""Called when raw data is received from connection.
|
||||
|
||||
Override this method if you wish to manually handle
|
||||
the stream data. Return False to stop stream and close connection.
|
||||
"""
|
||||
|
||||
if 'in_reply_to_status_id' in data:
|
||||
status = Status.parse(self.api, json.loads(data))
|
||||
if self.on_status(status) is False:
|
||||
return False
|
||||
elif 'delete' in data:
|
||||
delete = json.loads(data)['delete']['status']
|
||||
if self.on_delete(delete['id'], delete['user_id']) is False:
|
||||
return False
|
||||
elif 'limit' in data:
|
||||
if self.on_limit(json.loads(data)['limit']['track']) is False:
|
||||
return False
|
||||
|
||||
def on_status(self, status):
|
||||
"""Called when a new status arrives"""
|
||||
return
|
||||
|
||||
def on_delete(self, status_id, user_id):
|
||||
"""Called when a delete notice arrives for a status"""
|
||||
return
|
||||
|
||||
def on_limit(self, track):
|
||||
"""Called when a limitation notice arrvies"""
|
||||
return
|
||||
|
||||
def on_error(self, status_code):
|
||||
"""Called when a non-200 status code is returned"""
|
||||
return False
|
||||
|
||||
def on_timeout(self):
|
||||
"""Called when stream connection times out"""
|
||||
return
|
||||
|
||||
|
||||
class Stream(object):
|
||||
|
||||
host = 'stream.twitter.com'
|
||||
|
||||
def __init__(self, auth, listener, **options):
|
||||
self.auth = auth
|
||||
self.listener = listener
|
||||
self.running = False
|
||||
self.timeout = options.get("timeout", 300.0)
|
||||
self.retry_count = options.get("retry_count")
|
||||
self.retry_time = options.get("retry_time", 10.0)
|
||||
self.snooze_time = options.get("snooze_time", 5.0)
|
||||
self.buffer_size = options.get("buffer_size", 1500)
|
||||
if options.get("secure", True):
|
||||
self.scheme = "https"
|
||||
else:
|
||||
self.scheme = "http"
|
||||
|
||||
self.api = API()
|
||||
self.headers = options.get("headers") or {}
|
||||
self.parameters = None
|
||||
self.body = None
|
||||
|
||||
def _run(self):
|
||||
# Authenticate
|
||||
url = "%s://%s%s" % (self.scheme, self.host, self.url)
|
||||
|
||||
# Connect and process the stream
|
||||
error_counter = 0
|
||||
conn = None
|
||||
exception = None
|
||||
while self.running:
|
||||
if self.retry_count is not None and error_counter > self.retry_count:
|
||||
# quit if error count greater than retry count
|
||||
break
|
||||
try:
|
||||
if self.scheme == "http":
|
||||
conn = httplib.HTTPConnection(self.host)
|
||||
else:
|
||||
conn = httplib.HTTPSConnection(self.host)
|
||||
self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
|
||||
conn.connect()
|
||||
conn.sock.settimeout(self.timeout)
|
||||
conn.request('POST', self.url, self.body, headers=self.headers)
|
||||
resp = conn.getresponse()
|
||||
if resp.status != 200:
|
||||
if self.listener.on_error(resp.status) is False:
|
||||
break
|
||||
error_counter += 1
|
||||
sleep(self.retry_time)
|
||||
else:
|
||||
error_counter = 0
|
||||
self.listener.on_connect()
|
||||
self._read_loop(resp)
|
||||
except timeout:
|
||||
if self.listener.on_timeout() == False:
|
||||
break
|
||||
if self.running is False:
|
||||
break
|
||||
conn.close()
|
||||
sleep(self.snooze_time)
|
||||
except Exception, exception:
|
||||
# any other exception is fatal, so kill loop
|
||||
break
|
||||
|
||||
# cleanup
|
||||
self.running = False
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
if exception:
|
||||
raise
|
||||
|
||||
def _data(self, data):
|
||||
if self.listener.on_data(data) is False:
|
||||
self.running = False
|
||||
|
||||
def _read_loop(self, resp):
|
||||
|
||||
while self.running and not resp.isclosed():
|
||||
|
||||
# Note: keep-alive newlines might be inserted before each length value.
|
||||
# read until we get a digit...
|
||||
c = '\n'
|
||||
while c == '\n' and self.running and not resp.isclosed():
|
||||
c = resp.read(1)
|
||||
delimited_string = c
|
||||
|
||||
# read rest of delimiter length..
|
||||
d = ''
|
||||
while d != '\n' and self.running and not resp.isclosed():
|
||||
d = resp.read(1)
|
||||
delimited_string += d
|
||||
|
||||
# read the next twitter status object
|
||||
if delimited_string.strip().isdigit():
|
||||
next_status_obj = resp.read( int(delimited_string) )
|
||||
self._data(next_status_obj)
|
||||
|
||||
if resp.isclosed():
|
||||
self.on_closed(resp)
|
||||
|
||||
def _start(self, async):
|
||||
self.running = True
|
||||
if async:
|
||||
Thread(target=self._run).start()
|
||||
else:
|
||||
self._run()
|
||||
|
||||
def on_closed(self, resp):
|
||||
""" Called when the response has been closed by Twitter """
|
||||
pass
|
||||
|
||||
def userstream(self, count=None, async=False, secure=True):
|
||||
self.parameters = {'delimited': 'length'}
|
||||
if self.running:
|
||||
raise TweepError('Stream object already connected!')
|
||||
self.url = '/2/user.json?delimited=length'
|
||||
self.host='userstream.twitter.com'
|
||||
self._start(async)
|
||||
|
||||
def firehose(self, count=None, async=False):
|
||||
self.parameters = {'delimited': 'length'}
|
||||
if self.running:
|
||||
raise TweepError('Stream object already connected!')
|
||||
self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
|
||||
if count:
|
||||
self.url += '&count=%s' % count
|
||||
self._start(async)
|
||||
|
||||
def retweet(self, async=False):
|
||||
self.parameters = {'delimited': 'length'}
|
||||
if self.running:
|
||||
raise TweepError('Stream object already connected!')
|
||||
self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
|
||||
self._start(async)
|
||||
|
||||
def sample(self, count=None, async=False):
|
||||
self.parameters = {'delimited': 'length'}
|
||||
if self.running:
|
||||
raise TweepError('Stream object already connected!')
|
||||
self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
|
||||
if count:
|
||||
self.url += '&count=%s' % count
|
||||
self._start(async)
|
||||
|
||||
def filter(self, follow=None, track=None, async=False, locations=None,
|
||||
count = None, stall_warnings=False, languages=None):
|
||||
self.parameters = {}
|
||||
self.headers['Content-type'] = "application/x-www-form-urlencoded"
|
||||
if self.running:
|
||||
raise TweepError('Stream object already connected!')
|
||||
self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
|
||||
if follow:
|
||||
self.parameters['follow'] = ','.join(map(str, follow))
|
||||
if track:
|
||||
self.parameters['track'] = ','.join(map(str, track))
|
||||
if locations and len(locations) > 0:
|
||||
assert len(locations) % 4 == 0
|
||||
self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
|
||||
if count:
|
||||
self.parameters['count'] = count
|
||||
if stall_warnings:
|
||||
self.parameters['stall_warnings'] = stall_warnings
|
||||
if languages:
|
||||
self.parameters['language'] = ','.join(map(str, languages))
|
||||
self.body = urlencode_noplus(self.parameters)
|
||||
self.parameters['delimited'] = 'length'
|
||||
self._start(async)
|
||||
|
||||
def disconnect(self):
|
||||
if self.running is False:
|
||||
return
|
||||
self.running = False
|
||||
|
Reference in a new issue