From 696f2b7c1a2d52ca99567ee288148ec7aa95a2ea Mon Sep 17 00:00:00 2001 From: Luke Rogers Date: Tue, 2 Jul 2013 03:28:46 +1200 Subject: [PATCH] Early version of new Twitter plugin --- plugins/twitter.py | 214 +++++++++++++++------------------------------ 1 file changed, 71 insertions(+), 143 deletions(-) diff --git a/plugins/twitter.py b/plugins/twitter.py index d798e12..866cec9 100755 --- a/plugins/twitter.py +++ b/plugins/twitter.py @@ -1,166 +1,94 @@ -# written by Scaevolus, modified by Lukeroge - -from util import hook, http - -import random +from util import hook, timesince +import tweepy import re -from time import strftime, strptime -from datetime import datetime - -from util.timesince import timesince - - -def unescape_xml(string): - """Unescapes XML""" - return string.replace('>', '>').replace('<', '<').replace(''', - "'").replace('"e;', '"').replace('&', '&') - -history = [] -history_max_size = 250 - - -def parseDateTime(s): - """Parses the date from a string""" - if s is None: - return None - m = re.match(r'(.*?)(?:\.(\d+))?(([-+]\d{1,2}):(\d{2}))?$', - str(s)) - datestr, fractional, tzname, tzhour, tzmin = m.groups() - - if tzname is None: - tz = None - else: - tzhour, tzmin = int(tzhour), int(tzmin) - if tzhour == tzmin == 0: - tzname = 'UTC' - tz = FixedOffset(timedelta(hours=tzhour, - minutes=tzmin), tzname) - - x = datetime.strptime(datestr, "%Y-%m-%d %H:%M:%S") - if fractional is None: - fractional = '0' - fracpower = 6 - len(fractional) - fractional = float(fractional) * (10 ** fracpower) - - return x.replace(microsecond=int(fractional), tzinfo=tz) @hook.command -def twitter(inp): - "twitter / //#/@ -- Gets last/th " \ - "tweet from /gets tweet /Gets random tweet with #/" \ - "gets replied tweet from @." +def twitter(inp, bot=None): + "twitter [n] -- Gets last/[n]th tweet from " - def add_reply(reply_name, reply_id): - if len(history) == history_max_size: - history.pop() - history.insert(0, (reply_name, reply_id)) + consumer_key = bot.config.get("api_keys", {}).get("twitter_consumer_key") + consumer_secret = bot.config.get("api_keys", {}).get("twitter_consumer_secret") - def find_reply(reply_name): - for name, id in history: - if name == reply_name: - return id if id != -1 else name + oauth_token = bot.config.get("api_keys", {}).get("twitter_access_token") + oauth_secret = bot.config.get("api_keys", {}).get("twitter_access_secret") - if inp[0] == '@': - reply_inp = find_reply(inp[1:]) - if reply_inp == None: - return 'No replies to %s found.' % inp - inp = reply_inp + if not consumer_key: + return "Error: No Twitter API details." - url = 'http://api.twitter.com' - getting_nth = False - getting_id = False - searching_hashtag = False + auth = tweepy.OAuthHandler(consumer_key, consumer_secret) + auth.set_access_token(oauth_token, oauth_secret) - time = 'status/created_at' - text = 'status/text' - retweeted_text = 'status/retweeted_status/text' - retweeted_screen_name = 'status/retweeted_status/user/screen_name' - reply_name = 'status/in_reply_to_screen_name' - reply_id = 'status/in_reply_to_status_id' - reply_user = 'status/in_reply_to_user_id' + api = tweepy.API(auth) - if re.match(r'^\d+$', inp): - getting_id = True - url += '/statuses/show/%s.xml' % inp - screen_name = 'user/screen_name' - time = 'created_at' - text = 'text' - reply_name = 'in_reply_to_screen_name' - reply_id = 'in_reply_to_status_id' - reply_user = 'in_reply_to_user_id' - elif re.match(r'^\w{1,15}$', inp) or re.match(r'^\w{1,15}\s+\d+$', inp): - getting_nth = True + if re.match(r'^\w{1,15}$', inp) or re.match(r'^\w{1,15}\s+\d+$', inp): if inp.find(' ') == -1: - name = inp - num = 1 + username = inp + tweet_number = 0 else: - name, num = inp.split() - if int(num) > 3200: - return 'error: only supports up to the 3200th tweet' - url += ('/1/statuses/user_timeline.xml?include_rts=true&' - 'screen_name=%s&count=1&page=%s' % (name, num)) - screen_name = 'status/user/screen_name' - elif re.match(r'^#\w+$', inp): - url = 'http://search.twitter.com/search.atom?q=%23' + inp[1:] - searching_hashtag = True + username, tweet_number = inp.split() + tweet_number = int(tweet_number) - 1 + + if tweet_number > 300: + return "This command can only find the last \x02300\x02 tweets." else: - return 'Error: Invalid request.' + username = inp + tweet_number = 0 try: - tweet = http.get_xml(url) - except http.HTTPError as e: - errors = {400: 'Bad request (ratelimited?)', - 401: 'Tweet is private', - 403: 'Tweet is private', - 404: 'Invalid user/id', - 500: 'Twitter is broken', - 502: 'Twitter is down ("getting upgraded")', - 503: 'Twitter is overloaded'} - if e.code == 404: - return 'error: invalid ' + ['username', 'tweet id'][getting_id] - if e.code in errors: - return 'Error: %s.' % errors[e.code] - return 'Unknown Error: %s' % e.code - except http.URLError as e: - return 'Error: Request timed out.' + # try to get user by username + user = api.get_user(username) + except tweepy.error.TweepError as e: + if e[0][0]['code'] == 34: + return "Could not find user." + else: + return "Error {}: {}".format(e[0][0]['code'], e[0][0]['message']) - if searching_hashtag: - ns = '{http://www.w3.org/2005/Atom}' - tweets = tweet.findall(ns + 'entry/' + ns + 'id') - if not tweets: - return 'Hashtag not found!' - id = random.choice(tweets).text - id = id[id.rfind(':') + 1:] - return twitter(id) + # get the users tweets + user_timeline = api.user_timeline(id=user.id, count=tweet_number + 1) - if getting_nth: - if tweet.find('status') is None: - return "User doesn't have that many tweets!" + # if the timeline is empty, return an error + if not user_timeline: + return "The user \x02{}\x02 has no tweets.".format(user.screen_name) - time = tweet.find(time) - if time is None: - return "User has no tweets!" + # grab the newest tweet from the users timeline + try: + tweet = user_timeline[tweet_number] + except IndexError: + tweet_count = len(user_timeline) + return "The user \x02{}\x02 only has \x02{}\x02 tweets.".format(user.screen_name, tweet_count) - reply_name = tweet.find(reply_name).text - reply_id = tweet.find(reply_id).text - reply_user = tweet.find(reply_user).text - if reply_name is not None and (reply_id is not None or - reply_user is not None): - add_reply(reply_name, reply_id or -1) + time = timesince.timesince(tweet.created_at) - time_raw = strftime('%Y-%m-%d %H:%M:%S', - strptime(time.text, - '%a %b %d %H:%M:%S +0000 %Y')) + return u"@\x02{}\x02 ({}): {} ({} ago)".format(user.screen_name, user.name, tweet.text, time) - time_nice = timesince(parseDateTime(time_raw), datetime.utcnow()) - if tweet.find(retweeted_text) is not None: - text = 'RT @%s:' % tweet.find(retweeted_screen_name).text - text += unescape_xml(tweet.find(retweeted_text).text.replace('\n', '')) - else: - text = unescape_xml(tweet.find(text).text.replace('\n', '')) - - screen_name = tweet.find(screen_name).text +@hook.command +def twuser(inp, bot=None): + "twuser -- Get info on the Twitter user " - return "\x02@%s\x02: %s (%s ago)" % (screen_name, text, time_nice) + consumer_key = bot.config.get("api_keys", {}).get("twitter_consumer_key") + consumer_secret = bot.config.get("api_keys", {}).get("twitter_consumer_secret") + + oauth_token = bot.config.get("api_keys", {}).get("twitter_access_token") + oauth_secret = bot.config.get("api_keys", {}).get("twitter_access_secret") + + if not consumer_key: + return "Error: No Twitter API details." + + auth = tweepy.OAuthHandler(consumer_key, consumer_secret) + auth.set_access_token(oauth_token, oauth_secret) + + api = tweepy.API(auth) + + try: + # try to get user by username + user = api.get_user(inp) + except tweepy.error.TweepError as e: + if e[0][0]['code'] == 34: + return "Could not find user." + else: + return "Unknown error" + + return u"@\x02{}\x02 ({}) is located in \x02{}\x02 and has \x02{}\x02 tweet(s). The users description is \"{}\" " \ + "".format(user.screen_name, user.name, user.location, user.statuses_count, user.description)