Early version of new Twitter plugin

This commit is contained in:
Luke Rogers 2013-07-02 03:28:46 +12:00
parent e091a48744
commit 696f2b7c1a

View file

@ -1,166 +1,94 @@
# written by Scaevolus, modified by Lukeroge from util import hook, timesince
import tweepy
from util import hook, http
import random
import re import re
from time import strftime, strptime
from datetime import datetime
from util.timesince import timesince
def unescape_xml(string):
"""Unescapes XML"""
return string.replace('&gt;', '>').replace('&lt;', '<').replace('&apos;',
"'").replace('&quote;', '"').replace('&amp;', '&')
history = []
history_max_size = 250
def parseDateTime(s):
"""Parses the date from a string"""
if s is None:
return None
m = re.match(r'(.*?)(?:\.(\d+))?(([-+]\d{1,2}):(\d{2}))?$',
str(s))
datestr, fractional, tzname, tzhour, tzmin = m.groups()
if tzname is None:
tz = None
else:
tzhour, tzmin = int(tzhour), int(tzmin)
if tzhour == tzmin == 0:
tzname = 'UTC'
tz = FixedOffset(timedelta(hours=tzhour,
minutes=tzmin), tzname)
x = datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S")
if fractional is None:
fractional = '0'
fracpower = 6 - len(fractional)
fractional = float(fractional) * (10 ** fracpower)
return x.replace(microsecond=int(fractional), tzinfo=tz)
@hook.command @hook.command
def twitter(inp): def twitter(inp, bot=None):
"twitter <user>/<user> <n>/<id>/#<hashtag>/@<user> -- Gets last/<n>th " \ "twitter <user> [n] -- Gets last/[n]th tweet from <user>"
"tweet from <user>/gets tweet <id>/Gets random tweet with #<hashtag>/" \
"gets replied tweet from @<user>."
def add_reply(reply_name, reply_id): consumer_key = bot.config.get("api_keys", {}).get("twitter_consumer_key")
if len(history) == history_max_size: consumer_secret = bot.config.get("api_keys", {}).get("twitter_consumer_secret")
history.pop()
history.insert(0, (reply_name, reply_id))
def find_reply(reply_name): oauth_token = bot.config.get("api_keys", {}).get("twitter_access_token")
for name, id in history: oauth_secret = bot.config.get("api_keys", {}).get("twitter_access_secret")
if name == reply_name:
return id if id != -1 else name
if inp[0] == '@': if not consumer_key:
reply_inp = find_reply(inp[1:]) return "Error: No Twitter API details."
if reply_inp == None:
return 'No replies to %s found.' % inp
inp = reply_inp
url = 'http://api.twitter.com' auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
getting_nth = False auth.set_access_token(oauth_token, oauth_secret)
getting_id = False
searching_hashtag = False
time = 'status/created_at' api = tweepy.API(auth)
text = 'status/text'
retweeted_text = 'status/retweeted_status/text'
retweeted_screen_name = 'status/retweeted_status/user/screen_name'
reply_name = 'status/in_reply_to_screen_name'
reply_id = 'status/in_reply_to_status_id'
reply_user = 'status/in_reply_to_user_id'
if re.match(r'^\d+$', inp): if re.match(r'^\w{1,15}$', inp) or re.match(r'^\w{1,15}\s+\d+$', inp):
getting_id = True
url += '/statuses/show/%s.xml' % inp
screen_name = 'user/screen_name'
time = 'created_at'
text = 'text'
reply_name = 'in_reply_to_screen_name'
reply_id = 'in_reply_to_status_id'
reply_user = 'in_reply_to_user_id'
elif re.match(r'^\w{1,15}$', inp) or re.match(r'^\w{1,15}\s+\d+$', inp):
getting_nth = True
if inp.find(' ') == -1: if inp.find(' ') == -1:
name = inp username = inp
num = 1 tweet_number = 0
else: else:
name, num = inp.split() username, tweet_number = inp.split()
if int(num) > 3200: tweet_number = int(tweet_number) - 1
return 'error: only supports up to the 3200th tweet'
url += ('/1/statuses/user_timeline.xml?include_rts=true&' if tweet_number > 300:
'screen_name=%s&count=1&page=%s' % (name, num)) return "This command can only find the last \x02300\x02 tweets."
screen_name = 'status/user/screen_name'
elif re.match(r'^#\w+$', inp):
url = 'http://search.twitter.com/search.atom?q=%23' + inp[1:]
searching_hashtag = True
else: else:
return 'Error: Invalid request.' username = inp
tweet_number = 0
try: try:
tweet = http.get_xml(url) # try to get user by username
except http.HTTPError as e: user = api.get_user(username)
errors = {400: 'Bad request (ratelimited?)', except tweepy.error.TweepError as e:
401: 'Tweet is private', if e[0][0]['code'] == 34:
403: 'Tweet is private', return "Could not find user."
404: 'Invalid user/id',
500: 'Twitter is broken',
502: 'Twitter is down ("getting upgraded")',
503: 'Twitter is overloaded'}
if e.code == 404:
return 'error: invalid ' + ['username', 'tweet id'][getting_id]
if e.code in errors:
return 'Error: %s.' % errors[e.code]
return 'Unknown Error: %s' % e.code
except http.URLError as e:
return 'Error: Request timed out.'
if searching_hashtag:
ns = '{http://www.w3.org/2005/Atom}'
tweets = tweet.findall(ns + 'entry/' + ns + 'id')
if not tweets:
return 'Hashtag not found!'
id = random.choice(tweets).text
id = id[id.rfind(':') + 1:]
return twitter(id)
if getting_nth:
if tweet.find('status') is None:
return "User doesn't have that many tweets!"
time = tweet.find(time)
if time is None:
return "User has no tweets!"
reply_name = tweet.find(reply_name).text
reply_id = tweet.find(reply_id).text
reply_user = tweet.find(reply_user).text
if reply_name is not None and (reply_id is not None or
reply_user is not None):
add_reply(reply_name, reply_id or -1)
time_raw = strftime('%Y-%m-%d %H:%M:%S',
strptime(time.text,
'%a %b %d %H:%M:%S +0000 %Y'))
time_nice = timesince(parseDateTime(time_raw), datetime.utcnow())
if tweet.find(retweeted_text) is not None:
text = 'RT @%s:' % tweet.find(retweeted_screen_name).text
text += unescape_xml(tweet.find(retweeted_text).text.replace('\n', ''))
else: else:
text = unescape_xml(tweet.find(text).text.replace('\n', '')) return "Error {}: {}".format(e[0][0]['code'], e[0][0]['message'])
screen_name = tweet.find(screen_name).text # get the users tweets
user_timeline = api.user_timeline(id=user.id, count=tweet_number + 1)
return "\x02@%s\x02: %s (%s ago)" % (screen_name, text, time_nice) # if the timeline is empty, return an error
if not user_timeline:
return "The user \x02{}\x02 has no tweets.".format(user.screen_name)
# grab the newest tweet from the users timeline
try:
tweet = user_timeline[tweet_number]
except IndexError:
tweet_count = len(user_timeline)
return "The user \x02{}\x02 only has \x02{}\x02 tweets.".format(user.screen_name, tweet_count)
time = timesince.timesince(tweet.created_at)
return u"@\x02{}\x02 ({}): {} ({} ago)".format(user.screen_name, user.name, tweet.text, time)
@hook.command
def twuser(inp, bot=None):
"twuser <user> -- Get info on the Twitter user <user>"
consumer_key = bot.config.get("api_keys", {}).get("twitter_consumer_key")
consumer_secret = bot.config.get("api_keys", {}).get("twitter_consumer_secret")
oauth_token = bot.config.get("api_keys", {}).get("twitter_access_token")
oauth_secret = bot.config.get("api_keys", {}).get("twitter_access_secret")
if not consumer_key:
return "Error: No Twitter API details."
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(oauth_token, oauth_secret)
api = tweepy.API(auth)
try:
# try to get user by username
user = api.get_user(inp)
except tweepy.error.TweepError as e:
if e[0][0]['code'] == 34:
return "Could not find user."
else:
return "Unknown error"
return u"@\x02{}\x02 ({}) is located in \x02{}\x02 and has \x02{}\x02 tweet(s). The users description is \"{}\" " \
"".format(user.screen_name, user.name, user.location, user.statuses_count, user.description)