# Tweepy # Copyright 2009-2010 Joshua Roesslein # See LICENSE for details. import httplib from socket import timeout from threading import Thread from time import sleep from tweepy.models import Status from tweepy.api import API from tweepy.error import TweepError from tweepy.utils import import_simplejson, urlencode_noplus json = import_simplejson() STREAM_VERSION = '1.1' class StreamListener(object): def __init__(self, api=None): self.api = api or API() def on_connect(self): """Called once connected to streaming server. This will be invoked once a successful response is received from the server. Allows the listener to perform some work prior to entering the read loop. """ pass def on_data(self, data): """Called when raw data is received from connection. Override this method if you wish to manually handle the stream data. Return False to stop stream and close connection. """ if 'in_reply_to_status_id' in data: status = Status.parse(self.api, json.loads(data)) if self.on_status(status) is False: return False elif 'delete' in data: delete = json.loads(data)['delete']['status'] if self.on_delete(delete['id'], delete['user_id']) is False: return False elif 'limit' in data: if self.on_limit(json.loads(data)['limit']['track']) is False: return False def on_status(self, status): """Called when a new status arrives""" return def on_delete(self, status_id, user_id): """Called when a delete notice arrives for a status""" return def on_limit(self, track): """Called when a limitation notice arrvies""" return def on_error(self, status_code): """Called when a non-200 status code is returned""" return False def on_timeout(self): """Called when stream connection times out""" return class Stream(object): host = 'stream.twitter.com' def __init__(self, auth, listener, **options): self.auth = auth self.listener = listener self.running = False self.timeout = options.get("timeout", 300.0) self.retry_count = options.get("retry_count") self.retry_time = options.get("retry_time", 10.0) self.snooze_time = options.get("snooze_time", 5.0) self.buffer_size = options.get("buffer_size", 1500) if options.get("secure", True): self.scheme = "https" else: self.scheme = "http" self.api = API() self.headers = options.get("headers") or {} self.parameters = None self.body = None def _run(self): # Authenticate url = "%s://%s%s" % (self.scheme, self.host, self.url) # Connect and process the stream error_counter = 0 conn = None exception = None while self.running: if self.retry_count is not None and error_counter > self.retry_count: # quit if error count greater than retry count break try: if self.scheme == "http": conn = httplib.HTTPConnection(self.host) else: conn = httplib.HTTPSConnection(self.host) self.auth.apply_auth(url, 'POST', self.headers, self.parameters) conn.connect() conn.sock.settimeout(self.timeout) conn.request('POST', self.url, self.body, headers=self.headers) resp = conn.getresponse() if resp.status != 200: if self.listener.on_error(resp.status) is False: break error_counter += 1 sleep(self.retry_time) else: error_counter = 0 self.listener.on_connect() self._read_loop(resp) except timeout: if self.listener.on_timeout() == False: break if self.running is False: break conn.close() sleep(self.snooze_time) except Exception, exception: # any other exception is fatal, so kill loop break # cleanup self.running = False if conn: conn.close() if exception: raise def _data(self, data): if self.listener.on_data(data) is False: self.running = False def _read_loop(self, resp): while self.running and not resp.isclosed(): # Note: keep-alive newlines might be inserted before each length value. # read until we get a digit... c = '\n' while c == '\n' and self.running and not resp.isclosed(): c = resp.read(1) delimited_string = c # read rest of delimiter length.. d = '' while d != '\n' and self.running and not resp.isclosed(): d = resp.read(1) delimited_string += d # read the next twitter status object if delimited_string.strip().isdigit(): next_status_obj = resp.read( int(delimited_string) ) self._data(next_status_obj) if resp.isclosed(): self.on_closed(resp) def _start(self, async): self.running = True if async: Thread(target=self._run).start() else: self._run() def on_closed(self, resp): """ Called when the response has been closed by Twitter """ pass def userstream(self, count=None, async=False, secure=True): self.parameters = {'delimited': 'length'} if self.running: raise TweepError('Stream object already connected!') self.url = '/2/user.json?delimited=length' self.host='userstream.twitter.com' self._start(async) def firehose(self, count=None, async=False): self.parameters = {'delimited': 'length'} if self.running: raise TweepError('Stream object already connected!') self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION if count: self.url += '&count=%s' % count self._start(async) def retweet(self, async=False): self.parameters = {'delimited': 'length'} if self.running: raise TweepError('Stream object already connected!') self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION self._start(async) def sample(self, count=None, async=False): self.parameters = {'delimited': 'length'} if self.running: raise TweepError('Stream object already connected!') self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION if count: self.url += '&count=%s' % count self._start(async) def filter(self, follow=None, track=None, async=False, locations=None, count = None, stall_warnings=False, languages=None): self.parameters = {} self.headers['Content-type'] = "application/x-www-form-urlencoded" if self.running: raise TweepError('Stream object already connected!') self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION if follow: self.parameters['follow'] = ','.join(map(str, follow)) if track: self.parameters['track'] = ','.join(map(str, track)) if locations and len(locations) > 0: assert len(locations) % 4 == 0 self.parameters['locations'] = ','.join(['%.2f' % l for l in locations]) if count: self.parameters['count'] = count if stall_warnings: self.parameters['stall_warnings'] = stall_warnings if languages: self.parameters['language'] = ','.join(map(str, languages)) self.body = urlencode_noplus(self.parameters) self.parameters['delimited'] = 'length' self._start(async) def disconnect(self): if self.running is False: return self.running = False