# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.

from urllib2 import Request, urlopen
import base64

from tweepy import oauth
from tweepy.error import TweepError
from tweepy.api import API


class AuthHandler(object):
    """Interface implemented by every authentication handler.

    Subclasses must override both methods; the base versions only raise
    NotImplementedError.
    """

    def apply_auth(self, url, method, headers, parameters):
        """Apply authentication headers to request"""
        raise NotImplementedError

    def get_username(self):
        """Return the username of the authenticated user"""
        raise NotImplementedError


class BasicAuthHandler(AuthHandler):
    """HTTP Basic authentication handler.

    The 'username:password' pair is base64-encoded once at construction and
    reused for every request.
    """

    def __init__(self, username, password):
        """Store the username and pre-compute the Basic credentials token.

        username -- account name; also returned by get_username()
        password -- account password; only kept in its encoded form
        """
        self.username = username
        credentials = '%s:%s' % (username, password)
        # base64 operates on bytes. Text (unicode) credentials are encoded as
        # UTF-8 first so that non-ASCII characters do not make b64encode fail.
        if not isinstance(credentials, bytes):
            credentials = credentials.encode('utf-8')
        encoded = base64.b64encode(credentials)
        # Keep the token a native str so the '%s' in apply_auth() formats it
        # as the bare base64 text.
        if not isinstance(encoded, str):
            encoded = encoded.decode('ascii')
        self._b64up = encoded

    def apply_auth(self, url, method, headers, parameters):
        """Set the 'Authorization' entry of headers (modified in place).

        url, method and parameters are accepted for interface compatibility
        and are not used.
        """
        headers['Authorization'] = 'Basic %s' % self._b64up

    def get_username(self):
        """Return the username given to the constructor."""
        return self.username


class OAuthHandler(AuthHandler):
    """OAuth authentication handler"""

    OAUTH_HOST = 'api.twitter.com'
    OAUTH_ROOT = '/oauth/'

    def __init__(self, consumer_key, consumer_secret, callback=None,
                 secure=False):
        """Create a handler for the given consumer credentials.

        consumer_key, consumer_secret -- used to build the oauth.OAuthConsumer
        callback -- passed as the callback when a request token is fetched
        secure -- when true, every OAuth endpoint URL uses https://
        """
        self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)
        self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1()
        self.request_token = None
        self.access_token = None
        self.callback = callback
        self.username = None
        self.secure = secure

    def _get_oauth_url(self, endpoint, secure=False):
        """Return the full URL of an OAuth endpoint.

        HTTPS is used when either the handler-wide self.secure flag or the
        per-call secure argument is true; plain HTTP otherwise.
        """
        if self.secure or secure:
            prefix = 'https://'
        else:
            prefix = 'http://'

        return prefix + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint

    def _fetch_token(self, http_request):
        """Send http_request and parse the response body as an OAuth token.

        The response is always closed, even when reading it fails. Errors are
        not translated here; callers wrap them in TweepError.
        """
        resp = urlopen(http_request)
        try:
            payload = resp.read()
        finally:
            resp.close()
        return oauth.OAuthToken.from_string(payload)

    def apply_auth(self, url, method, headers, parameters):
        """Sign the request with the access token and add the OAuth headers.

        headers is updated in place with the result of the signed request's
        to_header().
        """
        request = oauth.OAuthRequest.from_consumer_and_token(
            self._consumer, http_url=url, http_method=method,
            token=self.access_token, parameters=parameters
        )
        request.sign_request(self._sigmethod, self._consumer,
                             self.access_token)
        headers.update(request.to_header())

    def _get_request_token(self):
        """Fetch and return a new request token.

        Raises TweepError wrapping any exception raised along the way.
        """
        try:
            url = self._get_oauth_url('request_token')
            request = oauth.OAuthRequest.from_consumer_and_token(
                self._consumer, http_url=url, callback=self.callback
            )
            request.sign_request(self._sigmethod, self._consumer, None)
            return self._fetch_token(
                Request(url, headers=request.to_header())
            )
        except Exception as e:
            raise TweepError(e)

    def set_request_token(self, key, secret):
        """Install a request token built from key and secret."""
        self.request_token = oauth.OAuthToken(key, secret)

    def set_access_token(self, key, secret):
        """Install an access token built from key and secret."""
        self.access_token = oauth.OAuthToken(key, secret)

    def get_authorization_url(self, signin_with_twitter=False):
        """Get the authorization URL to redirect the user

        A fresh request token is fetched and stored in self.request_token.
        The 'authenticate' endpoint is used when signin_with_twitter is true,
        'authorize' otherwise.

        Raises TweepError wrapping any exception raised along the way.
        """
        try:
            # get the request token
            self.request_token = self._get_request_token()

            # build auth request and return as url
            if signin_with_twitter:
                url = self._get_oauth_url('authenticate')
            else:
                url = self._get_oauth_url('authorize')
            request = oauth.OAuthRequest.from_token_and_callback(
                token=self.request_token, http_url=url
            )

            return request.to_url()
        except Exception as e:
            raise TweepError(e)

    def get_access_token(self, verifier=None):
        """
        After user has authorized the request token, get access token
        with user supplied verifier.

        The new token is stored in self.access_token and returned.
        Raises TweepError wrapping any exception raised along the way.
        """
        try:
            url = self._get_oauth_url('access_token')

            # Only forward a verifier that was actually supplied: str(None)
            # is the truthy string 'None' and would be sent as a bogus
            # verifier value.
            extra = {}
            if verifier is not None:
                extra['verifier'] = str(verifier)

            # build request
            request = oauth.OAuthRequest.from_consumer_and_token(
                self._consumer,
                token=self.request_token, http_url=url,
                **extra
            )
            request.sign_request(self._sigmethod, self._consumer,
                                 self.request_token)

            # send request
            self.access_token = self._fetch_token(
                Request(url, headers=request.to_header())
            )
            return self.access_token
        except Exception as e:
            raise TweepError(e)

    def get_xauth_access_token(self, username, password):
        """
        Get an access token from a username and password combination.
        In order to get this working you need to create an app at
        http://twitter.com/apps, after that send a mail to api@twitter.com
        and request activation of xAuth for it.

        The new token is stored in self.access_token and returned.
        Raises TweepError wrapping any exception raised along the way.
        """
        try:
            # must use HTTPS
            url = self._get_oauth_url('access_token', secure=True)
            request = oauth.OAuthRequest.from_consumer_and_token(
                oauth_consumer=self._consumer,
                http_method='POST', http_url=url,
                parameters={
                    'x_auth_mode': 'client_auth',
                    'x_auth_username': username,
                    'x_auth_password': password
                }
            )
            request.sign_request(self._sigmethod, self._consumer, None)

            # The signed parameters travel in the request body.
            self.access_token = self._fetch_token(
                Request(url, data=request.to_postdata())
            )
            return self.access_token
        except Exception as e:
            raise TweepError(e)

    def get_username(self):
        """Return the authenticated user's screen name.

        The name is looked up with API.verify_credentials() on first use and
        cached in self.username. Raises TweepError when that call returns a
        false value.
        """
        if self.username is None:
            api = API(self)
            user = api.verify_credentials()
            if user:
                self.username = user.screen_name
            else:
                raise TweepError(
                    "Unable to get username, invalid oauth token!"
                )
        return self.username