This repository has been archived on 2023-04-13. You can view files and clone it, but cannot push or open issues or pull requests.
CloudBot/lib/tweepy/streaming.py

249 lines
8.2 KiB
Python

# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
import httplib
from socket import timeout
from threading import Thread
from time import sleep
from tweepy.models import Status
from tweepy.api import API
from tweepy.error import TweepError
from tweepy.utils import import_simplejson, urlencode_noplus
# JSON module used to decode stream payloads. It is obtained through tweepy's
# import_simplejson() helper (presumably simplejson with a stdlib fallback --
# confirm in tweepy.utils).
json = import_simplejson()
# API version string used as the leading path segment of the versioned
# streaming endpoint URLs (e.g. '/1.1/statuses/sample.json').
STREAM_VERSION = '1.1'
class StreamListener(object):
    """Base class for objects that receive events from a ``Stream``.

    Subclass it and override the ``on_*`` hooks of interest. Returning
    ``False`` from ``on_data``, ``on_status``, ``on_delete``, ``on_limit``,
    ``on_error`` or ``on_timeout`` asks the stream to stop; any other
    return value (including ``None``) keeps it running.
    """

    # Markers identifying the three message kinds on_data() dispatches.
    _HANDLED_KEYS = ('in_reply_to_status_id', 'delete', 'limit')

    def __init__(self, api=None):
        """Store the API object passed to ``Status.parse``.

        :param api: API instance to use; a default ``API()`` is created
            when omitted (or falsy).
        """
        self.api = api or API()

    def on_connect(self):
        """Called once connected to streaming server.

        This will be invoked once a successful response
        is received from the server. Allows the listener
        to perform some work prior to entering the read loop.
        """
        pass

    def on_data(self, data):
        """Called when raw data is received from connection.

        Override this method if you wish to manually handle
        the stream data. Return False to stop stream and close connection.

        :param data: raw JSON text of a single stream message.
        :returns: ``False`` when the dispatched hook returned ``False``,
            otherwise ``None``.
        """
        # Cheap pre-filter: text that cannot be one of the handled message
        # kinds is ignored without being parsed at all.
        if not any(key in data for key in self._HANDLED_KEYS):
            return

        # Parse exactly once and dispatch on the *top-level* keys of the
        # decoded object. Matching on the raw text alone would misroute any
        # message that merely mentions "delete" or "limit" somewhere inside
        # (for instance in a text field) and then fail with KeyError.
        message = json.loads(data)
        if not isinstance(message, dict):
            return

        if 'in_reply_to_status_id' in message:
            status = Status.parse(self.api, message)
            if self.on_status(status) is False:
                return False
        elif 'delete' in message:
            deleted = message['delete']['status']
            if self.on_delete(deleted['id'], deleted['user_id']) is False:
                return False
        elif 'limit' in message:
            if self.on_limit(message['limit']['track']) is False:
                return False

    def on_status(self, status):
        """Called when a new status arrives"""
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_limit(self, track):
        """Called when a limitation notice arrives"""
        return

    def on_error(self, status_code):
        """Called when a non-200 status code is returned.

        The default returns ``False``, which stops the stream instead of
        retrying.
        """
        return False

    def on_timeout(self):
        """Called when stream connection times out"""
        return
class Stream(object):
host = 'stream.twitter.com'
def __init__(self, auth, listener, **options):
self.auth = auth
self.listener = listener
self.running = False
self.timeout = options.get("timeout", 300.0)
self.retry_count = options.get("retry_count")
self.retry_time = options.get("retry_time", 10.0)
self.snooze_time = options.get("snooze_time", 5.0)
self.buffer_size = options.get("buffer_size", 1500)
if options.get("secure", True):
self.scheme = "https"
else:
self.scheme = "http"
self.api = API()
self.headers = options.get("headers") or {}
self.parameters = None
self.body = None
def _run(self):
# Authenticate
url = "%s://%s%s" % (self.scheme, self.host, self.url)
# Connect and process the stream
error_counter = 0
conn = None
exception = None
while self.running:
if self.retry_count is not None and error_counter > self.retry_count:
# quit if error count greater than retry count
break
try:
if self.scheme == "http":
conn = httplib.HTTPConnection(self.host)
else:
conn = httplib.HTTPSConnection(self.host)
self.auth.apply_auth(url, 'POST', self.headers, self.parameters)
conn.connect()
conn.sock.settimeout(self.timeout)
conn.request('POST', self.url, self.body, headers=self.headers)
resp = conn.getresponse()
if resp.status != 200:
if self.listener.on_error(resp.status) is False:
break
error_counter += 1
sleep(self.retry_time)
else:
error_counter = 0
self.listener.on_connect()
self._read_loop(resp)
except timeout:
if self.listener.on_timeout() == False:
break
if self.running is False:
break
conn.close()
sleep(self.snooze_time)
except Exception, exception:
# any other exception is fatal, so kill loop
break
# cleanup
self.running = False
if conn:
conn.close()
if exception:
raise
def _data(self, data):
if self.listener.on_data(data) is False:
self.running = False
def _read_loop(self, resp):
while self.running and not resp.isclosed():
# Note: keep-alive newlines might be inserted before each length value.
# read until we get a digit...
c = '\n'
while c == '\n' and self.running and not resp.isclosed():
c = resp.read(1)
delimited_string = c
# read rest of delimiter length..
d = ''
while d != '\n' and self.running and not resp.isclosed():
d = resp.read(1)
delimited_string += d
# read the next twitter status object
if delimited_string.strip().isdigit():
next_status_obj = resp.read( int(delimited_string) )
self._data(next_status_obj)
if resp.isclosed():
self.on_closed(resp)
def _start(self, async):
self.running = True
if async:
Thread(target=self._run).start()
else:
self._run()
def on_closed(self, resp):
""" Called when the response has been closed by Twitter """
pass
def userstream(self, count=None, async=False, secure=True):
self.parameters = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/2/user.json?delimited=length'
self.host='userstream.twitter.com'
self._start(async)
def firehose(self, count=None, async=False):
self.parameters = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/statuses/firehose.json?delimited=length' % STREAM_VERSION
if count:
self.url += '&count=%s' % count
self._start(async)
def retweet(self, async=False):
self.parameters = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/statuses/retweet.json?delimited=length' % STREAM_VERSION
self._start(async)
def sample(self, count=None, async=False):
self.parameters = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/statuses/sample.json?delimited=length' % STREAM_VERSION
if count:
self.url += '&count=%s' % count
self._start(async)
def filter(self, follow=None, track=None, async=False, locations=None,
count = None, stall_warnings=False, languages=None):
self.parameters = {}
self.headers['Content-type'] = "application/x-www-form-urlencoded"
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/statuses/filter.json?delimited=length' % STREAM_VERSION
if follow:
self.parameters['follow'] = ','.join(map(str, follow))
if track:
self.parameters['track'] = ','.join(map(str, track))
if locations and len(locations) > 0:
assert len(locations) % 4 == 0
self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
if count:
self.parameters['count'] = count
if stall_warnings:
self.parameters['stall_warnings'] = stall_warnings
if languages:
self.parameters['language'] = ','.join(map(str, languages))
self.body = urlencode_noplus(self.parameters)
self.parameters['delimited'] = 'length'
self._start(async)
def disconnect(self):
if self.running is False:
return
self.running = False