P
This commit is contained in:
parent
fbdeacdc76
commit
48aeb9426f
26 changed files with 5550 additions and 0 deletions
6
lib/yql/AUTHORS
Normal file
6
lib/yql/AUTHORS
Normal file
|
@ -0,0 +1,6 @@
|
|||
* Stuart Colville <pypi@muffinresearch.co.uk>
|
||||
|
||||
* Cyril Doussin
|
||||
* Diogo Baeder
|
||||
* Jannis Leidel
|
||||
* *YOUR NAME HERE*
|
27
lib/yql/LICENSE
Normal file
27
lib/yql/LICENSE
Normal file
|
@ -0,0 +1,27 @@
|
|||
::
|
||||
|
||||
Copyright (c) 2009, Stuart Colville, Muffin Research Labs
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above copyright
|
||||
notice, this list of conditions and the following disclaimer in the
|
||||
documentation and/or other materials provided with the distribution.
|
||||
* Neither the name of "Muffin Research Labs" nor the
|
||||
names of its contributors may be used to endorse or promote products
|
||||
derived from this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY "Muffin Research Labs" ''AS IS'' AND ANY
|
||||
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL "Muffin Research Labs" BE LIABLE FOR ANY
|
||||
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
||||
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
||||
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
640
lib/yql/__init__.py
Normal file
640
lib/yql/__init__.py
Normal file
|
@ -0,0 +1,640 @@
|
|||
"""
|
||||
Python YQL
|
||||
==========
|
||||
|
||||
YQL client for Python
|
||||
|
||||
Author: Stuart Colville http://muffinresearch.co.uk/
|
||||
Docs at: http://python-yql.org/
|
||||
|
||||
TODO: More granular error handling
|
||||
|
||||
"""
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
import pprint
|
||||
from urlparse import urlparse
|
||||
from urllib import urlencode
|
||||
from httplib2 import Http
|
||||
|
||||
from yql.utils import get_http_method, clean_url, clean_query
|
||||
from yql.logger import get_logger
|
||||
import oauth2 as oauth
|
||||
|
||||
try:
|
||||
from urlparse import parse_qs, parse_qsl
|
||||
except ImportError: # pragma: no cover
|
||||
from cgi import parse_qs, parse_qsl
|
||||
|
||||
|
||||
__author__ = 'Stuart Colville'
|
||||
__version__ = '0.7.5'
|
||||
__all__ = ['Public', 'TwoLegged', 'ThreeLegged']
|
||||
|
||||
|
||||
QUERY_PLACEHOLDER = re.compile(r"[ =]@(?P<param>[a-z].*?\b)", re.IGNORECASE)
|
||||
|
||||
|
||||
REQUEST_TOKEN_URL = 'https://api.login.yahoo.com/oauth/v2/get_request_token'
|
||||
ACCESS_TOKEN_URL = 'https://api.login.yahoo.com/oauth/v2/get_token'
|
||||
AUTHORIZATION_URL = 'https://api.login.yahoo.com/oauth/v2/request_auth'
|
||||
|
||||
|
||||
PUBLIC_ENDPOINT = "query.yahooapis.com/v1/public/yql"
|
||||
PRIVATE_ENDPOINT = "query.yahooapis.com/v1/yql"
|
||||
HTTP_SCHEME = "http:"
|
||||
HTTPS_SCHEME = "https:"
|
||||
|
||||
|
||||
yql_logger = get_logger()
|
||||
|
||||
|
||||
class YQLObj(object):
|
||||
"""A YQLObject is the object created as the result of a YQL query"""
|
||||
|
||||
def __init__(self, result_dict):
|
||||
"""Init query object"""
|
||||
self._raw = result_dict and result_dict.get('query') or {}
|
||||
|
||||
@property
|
||||
def raw(self):
|
||||
"""The raw data response"""
|
||||
return self._raw
|
||||
|
||||
@property
|
||||
def uri(self):
|
||||
"""The uri used to query the YQL API"""
|
||||
return self._raw.get('uri')
|
||||
|
||||
@property
|
||||
def query_params(self):
|
||||
"""The query parameters of the uri used to call the YQL API"""
|
||||
if self.uri:
|
||||
q_string = urlparse(self.uri)[4]
|
||||
return dict(parse_qsl(q_string))
|
||||
else:
|
||||
return {}
|
||||
|
||||
@property
|
||||
def results(self):
|
||||
"""The query results dict."""
|
||||
return self._raw.get('results')
|
||||
|
||||
def one(self):
|
||||
"""Return just one result directly."""
|
||||
rows = self.rows
|
||||
if len(rows) > 1:
|
||||
raise NotOneError, "More than one result"
|
||||
else:
|
||||
return rows[0]
|
||||
|
||||
@property
|
||||
def rows(self):
|
||||
"""Get a list of rows returned by the query.
|
||||
|
||||
Results is a dict with one key but that key changes depending on the results
|
||||
This provides a way of getting at the rows list in an arbitrary way.
|
||||
|
||||
Added in version: 0.6 fixes results with 1 item so that they are still
|
||||
returned within a list.
|
||||
|
||||
"""
|
||||
result = []
|
||||
if self.results:
|
||||
vals = self.results.values()
|
||||
if len(vals) == 1:
|
||||
result = self.results.values()[0]
|
||||
|
||||
if self.count == 1 and result:
|
||||
result = [result]
|
||||
|
||||
return result
|
||||
|
||||
@property
|
||||
def query(self):
|
||||
"""The YQL query"""
|
||||
return self.query_params.get('q')
|
||||
|
||||
@property
|
||||
def lang(self):
|
||||
"""The language"""
|
||||
return self._raw.get('lang')
|
||||
|
||||
@property
|
||||
def count(self):
|
||||
"""The results count"""
|
||||
count = self._raw.get('count')
|
||||
if count:
|
||||
return int(count)
|
||||
|
||||
@property
|
||||
def diagnostics(self):
|
||||
"""The query diagnostics"""
|
||||
return self._raw.get('diagnostics')
|
||||
|
||||
def pprint_raw(self, indent=4): # pragma: no cover
|
||||
"""Pretty print the raw data"""
|
||||
pprint.pprint(self._raw, indent=indent)
|
||||
|
||||
def pformat_raw(self, indent=4): # pragma: no cover
|
||||
"""Pretty format the raw data"""
|
||||
return pprint.pformat(self._raw, indent=indent)
|
||||
|
||||
|
||||
class YQLError(Exception):
|
||||
"""Default Error"""
|
||||
|
||||
def __init__(self, resp, content, url=None, query=None):
|
||||
yql_logger.error("%s", content)
|
||||
yql_logger.error("Error Response: %s", resp)
|
||||
yql_logger.error("Error url: %s", url)
|
||||
self.response = resp
|
||||
self.content = content
|
||||
self.url = url
|
||||
self.query = query
|
||||
|
||||
def __str__(self):
|
||||
"""Return the error message.
|
||||
|
||||
Attempt to parse the json if it fails
|
||||
simply return the content attribute instead.
|
||||
|
||||
"""
|
||||
try:
|
||||
content = json.loads(self.content)
|
||||
except:
|
||||
content = {}
|
||||
|
||||
if content and content.get("error") and content["error"].get(
|
||||
"description"):
|
||||
return content['error']['description']
|
||||
else:
|
||||
if isinstance(self.content, basestring):
|
||||
return self.content
|
||||
else:
|
||||
return repr(self.content)
|
||||
|
||||
|
||||
class NotOneError(Exception):
|
||||
"""Not One Error."""
|
||||
|
||||
def __init__(self, message):
|
||||
self.message = message
|
||||
|
||||
def __str__(self):
|
||||
"""Return the error message"""
|
||||
return self.message
|
||||
|
||||
|
||||
class Public(object):
|
||||
"""Class for making public YQL queries"""
|
||||
|
||||
def __init__(self, api_key=None, shared_secret=None, httplib2_inst=None):
|
||||
"""Init the base class.
|
||||
|
||||
Optionally you can pass in an httplib2 instance which allows you
|
||||
to set-up the instance in a different way for your own uses.
|
||||
|
||||
Also it's very helpful in a testing scenario.
|
||||
|
||||
"""
|
||||
self.api_key = api_key
|
||||
self.secret = shared_secret
|
||||
self.http = httplib2_inst or Http()
|
||||
self.scheme = HTTPS_SCHEME
|
||||
self.__endpoint = PUBLIC_ENDPOINT
|
||||
self.uri = self.get_endpoint_uri()
|
||||
|
||||
def get_endpoint_uri(self):
|
||||
"""Get endpoint"""
|
||||
return "http://%s" % self.endpoint
|
||||
|
||||
def get_endpoint(self):
|
||||
"""Gets the endpoint for requests"""
|
||||
return self.__endpoint
|
||||
|
||||
def set_endpoint(self, value):
|
||||
"""Sets the endpoint and updates the uri"""
|
||||
if value in (PRIVATE_ENDPOINT, PUBLIC_ENDPOINT):
|
||||
self.__endpoint = value
|
||||
self.uri = self.get_endpoint_uri()
|
||||
else:
|
||||
raise ValueError, "Invalid endpoint: %s" % value
|
||||
|
||||
|
||||
def get_query_params(self, query, params, **kwargs):
|
||||
"""Get the query params and validate placeholders"""
|
||||
query_params = {}
|
||||
keys_from_query = self.get_placeholder_keys(query)
|
||||
|
||||
if keys_from_query and not params or (
|
||||
params and not hasattr(params, 'get')):
|
||||
|
||||
raise ValueError, "If you are using placeholders a dictionary "\
|
||||
"of substitutions is required"
|
||||
|
||||
elif not keys_from_query and params and hasattr(params, 'get'):
|
||||
raise ValueError, "You supplied a dictionary of substitutions "\
|
||||
"but the query doesn't have any placeholders"
|
||||
|
||||
elif keys_from_query and params:
|
||||
keys_from_params = params.keys()
|
||||
|
||||
if set(keys_from_query) != set(keys_from_params):
|
||||
raise ValueError, "Parameter keys don't match the query "\
|
||||
"placeholders"
|
||||
else:
|
||||
query_params.update(params)
|
||||
|
||||
query_params['q'] = query
|
||||
query_params['format'] = 'json'
|
||||
|
||||
env = kwargs.get('env')
|
||||
if env:
|
||||
query_params['env'] = env
|
||||
|
||||
return query_params
|
||||
|
||||
@staticmethod
|
||||
def get_placeholder_keys(query):
|
||||
"""Gets the @var placeholders
|
||||
|
||||
http://developer.yahoo.com/yql/guide/var_substitution.html
|
||||
|
||||
"""
|
||||
result = []
|
||||
for match in QUERY_PLACEHOLDER.finditer(query):
|
||||
result.append(match.group('param'))
|
||||
|
||||
if result:
|
||||
yql_logger.debug("placeholder_keys: %s", result)
|
||||
|
||||
return result
|
||||
|
||||
def get_uri(self, query, params=None, **kwargs):
|
||||
"""Get the the request url"""
|
||||
params = self.get_query_params(query, params, **kwargs)
|
||||
query_string = urlencode(params)
|
||||
uri = '%s?%s' % (self.uri, query_string)
|
||||
uri = clean_url(uri)
|
||||
return uri
|
||||
|
||||
def execute(self, query, params=None, **kwargs):
|
||||
"""Execute YQL query"""
|
||||
query = clean_query(query)
|
||||
url = self.get_uri(query, params, **kwargs)
|
||||
# Just in time change to https avoids
|
||||
# invalid oauth sigs
|
||||
if self.scheme == HTTPS_SCHEME:
|
||||
url = url.replace(HTTP_SCHEME, HTTPS_SCHEME)
|
||||
yql_logger.debug("executed url: %s", url)
|
||||
http_method = get_http_method(query)
|
||||
if http_method in ["DELETE", "PUT", "POST"]:
|
||||
data = {"q": query}
|
||||
|
||||
# Encode as json and set Content-Type header
|
||||
# to reflect we are sending JSON
|
||||
# Fixes LP: 629064
|
||||
data = json.dumps(data)
|
||||
headers = {"Content-Type": "application/json"}
|
||||
resp, content = self.http.request(
|
||||
url, http_method, headers=headers, body=data)
|
||||
yql_logger.debug("body: %s", data)
|
||||
else:
|
||||
resp, content = self.http.request(url, http_method)
|
||||
yql_logger.debug("http_method: %s", http_method)
|
||||
if resp.get('status') == '200':
|
||||
return YQLObj(json.loads(content))
|
||||
else:
|
||||
raise YQLError, (resp, content)
|
||||
|
||||
endpoint = property(get_endpoint, set_endpoint)
|
||||
|
||||
|
||||
class TwoLegged(Public):
|
||||
"""Two legged Auth is simple request which is signed prior to sending"""
|
||||
|
||||
def __init__(self, api_key, shared_secret, httplib2_inst=None):
|
||||
"""Override init to ensure required args"""
|
||||
super(TwoLegged, self).__init__(api_key, shared_secret, httplib2_inst)
|
||||
self.endpoint = PRIVATE_ENDPOINT
|
||||
self.scheme = HTTPS_SCHEME
|
||||
self.hmac_sha1_signature = oauth.SignatureMethod_HMAC_SHA1()
|
||||
self.plaintext_signature = oauth.SignatureMethod_PLAINTEXT()
|
||||
|
||||
@staticmethod
|
||||
def get_base_params():
|
||||
"""Set-up the basic parameters needed for a request"""
|
||||
|
||||
params = {}
|
||||
params['oauth_version'] = "1.0"
|
||||
params['oauth_nonce'] = oauth.generate_nonce()
|
||||
params['oauth_timestamp'] = int(time.time())
|
||||
return params
|
||||
|
||||
|
||||
def __two_legged_request(self, resource_url, parameters=None, method=None):
|
||||
"""Sign a request for two-legged authentication"""
|
||||
|
||||
params = self.get_base_params()
|
||||
if parameters:
|
||||
params.update(parameters)
|
||||
|
||||
yql_logger.debug("params: %s", params)
|
||||
yql_logger.debug("resource_url: %s", resource_url)
|
||||
if not method:
|
||||
method = "GET"
|
||||
|
||||
consumer = oauth.Consumer(self.api_key, self.secret)
|
||||
request = oauth.Request(method=method, url=resource_url,
|
||||
parameters=params)
|
||||
request.sign_request(self.hmac_sha1_signature, consumer, None)
|
||||
return request
|
||||
|
||||
|
||||
def get_uri(self, query, params=None, **kwargs):
|
||||
"""Get the the request url"""
|
||||
query_params = self.get_query_params(query, params, **kwargs)
|
||||
|
||||
http_method = get_http_method(query)
|
||||
request = self.__two_legged_request(self.uri,
|
||||
parameters=query_params, method=http_method)
|
||||
uri = "%s?%s" % (self.uri, request.to_postdata())
|
||||
uri = clean_url(uri)
|
||||
return uri
|
||||
|
||||
|
||||
class ThreeLegged(TwoLegged):
|
||||
|
||||
"""
|
||||
Three-legged Auth is used when it involves private data such as a
|
||||
user's contacts.
|
||||
|
||||
Three-legged auth is most likely to be used in a web-site or
|
||||
web-accessible application. Three-legged auth requires the user
|
||||
to authenticate the request through the Yahoo login.
|
||||
|
||||
Three-legged auth requires the implementation to:
|
||||
|
||||
* Request a token
|
||||
* Get a authentication url
|
||||
* User uses the auth url to login which will redirect to a callback
|
||||
or shows a verfier string on screen
|
||||
* Verifier is read at the callback url or manually provided to get
|
||||
the access token
|
||||
* resources is access
|
||||
|
||||
For an implementation this will require calling the following methods
|
||||
in order the first time the user needs to authenticate
|
||||
|
||||
* :meth:`get_token_and_auth_url` (returns a token and the auth url)
|
||||
* get verifier through callback or from screen
|
||||
* :meth:`get_access_token` (returns the access token)
|
||||
* :meth:`execute` - makes the request to the protected resource.
|
||||
|
||||
Once the access token has been provided subsequent requests can re-use it.
|
||||
|
||||
Access tokens expire after 1 hour, however they can be refreshed with
|
||||
the :meth:`refresh_token` method
|
||||
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, api_key, shared_secret, httplib2_inst=None):
|
||||
"""Override init to add consumer"""
|
||||
super(ThreeLegged, self).__init__(
|
||||
api_key, shared_secret, httplib2_inst)
|
||||
|
||||
self.scheme = HTTP_SCHEME
|
||||
self.endpoint = PRIVATE_ENDPOINT
|
||||
self.consumer = oauth.Consumer(self.api_key, self.secret)
|
||||
|
||||
def get_token_and_auth_url(self, callback_url=None):
|
||||
"""First step is to get the token and then send the request that
|
||||
provides the auth URL
|
||||
|
||||
Returns a tuple of token and the authorisation URL.
|
||||
|
||||
"""
|
||||
|
||||
client = oauth.Client(self.consumer)
|
||||
|
||||
params = {}
|
||||
params['oauth_callback'] = callback_url or 'oob'
|
||||
|
||||
request = oauth.Request(parameters=params)
|
||||
url = REQUEST_TOKEN_URL
|
||||
resp, content = client.request(url, "POST", request.to_postdata())
|
||||
|
||||
if resp.get('status') == '200':
|
||||
token = oauth.Token.from_string(content)
|
||||
yql_logger.debug("token: %s", token)
|
||||
data = dict(parse_qsl(content))
|
||||
yql_logger.debug("data: %s", data)
|
||||
return token, data['xoauth_request_auth_url']
|
||||
else:
|
||||
raise YQLError, (resp, content, url)
|
||||
|
||||
|
||||
def get_access_token(self, token, verifier):
|
||||
|
||||
"""Get the access token
|
||||
|
||||
The verifier (required) should have been provided to the
|
||||
user following login to at the url returned
|
||||
by the :meth:`get_token_and_auth_url` method.
|
||||
|
||||
If not you will need need to extract the auth_verifier
|
||||
parameter from your callback url on the site where you
|
||||
are implementing 3-legged auth in order to pass it to this
|
||||
function.
|
||||
|
||||
The access token can be stored and re-used for subsequent
|
||||
calls.
|
||||
|
||||
The stored token will also need to be refreshed periodically
|
||||
with :meth:`refresh_token`
|
||||
|
||||
"""
|
||||
|
||||
params = {}
|
||||
params['oauth_verifier'] = verifier
|
||||
|
||||
oauth_request = oauth.Request.from_consumer_and_token(
|
||||
self.consumer, token=token,
|
||||
http_url=ACCESS_TOKEN_URL,
|
||||
http_method="POST",
|
||||
parameters=params)
|
||||
|
||||
yql_logger.debug("oauth_request: %s", oauth_request)
|
||||
oauth_request.sign_request(
|
||||
self.hmac_sha1_signature, self.consumer, token)
|
||||
|
||||
url = oauth_request.to_url()
|
||||
|
||||
yql_logger.debug("oauth_url: %s", url)
|
||||
postdata = oauth_request.to_postdata()
|
||||
yql_logger.debug("oauth_postdata: %s", postdata)
|
||||
resp, content = self.http.request(url, "POST", postdata)
|
||||
|
||||
if resp.get('status') == '200':
|
||||
access_token = YahooToken.from_string(content)
|
||||
access_token.timestamp = oauth_request['oauth_timestamp']
|
||||
return access_token
|
||||
else:
|
||||
raise YQLError, (resp, content, url)
|
||||
|
||||
|
||||
def check_token(self, token):
|
||||
"""Check to see if a token has expired"""
|
||||
|
||||
if not hasattr(token, 'timestamp'):
|
||||
raise AttributeError, 'token doesn\'t have a timestamp attrbute'
|
||||
|
||||
if (int(token.timestamp) + 3600) < time.time():
|
||||
token = self.refresh_token(token)
|
||||
|
||||
return token
|
||||
|
||||
|
||||
def refresh_token(self, token):
|
||||
"""Access Tokens only last for one hour from the point of being issued.
|
||||
|
||||
When a token has expired it needs to be refreshed this method takes an
|
||||
expired token and refreshes it.
|
||||
|
||||
token parameter can be either a token object or a token string.
|
||||
|
||||
"""
|
||||
if not hasattr(token, "key"):
|
||||
token = YahooToken.from_string(token)
|
||||
|
||||
params = self.get_base_params()
|
||||
params['oauth_token'] = token.key
|
||||
params['oauth_token_secret'] = token.secret
|
||||
params['oauth_session_handle'] = token.session_handle
|
||||
|
||||
oauth_request = oauth.Request.from_consumer_and_token(
|
||||
self.consumer, token=token,
|
||||
http_url=ACCESS_TOKEN_URL,
|
||||
http_method="POST",
|
||||
parameters=params)
|
||||
|
||||
yql_logger.debug("oauth_request: %s", oauth_request)
|
||||
oauth_request.sign_request(
|
||||
self.hmac_sha1_signature, self.consumer, token)
|
||||
|
||||
url = oauth_request.to_url()
|
||||
yql_logger.debug("oauth_url: %s", url)
|
||||
postdata = oauth_request.to_postdata()
|
||||
yql_logger.debug("oauth_postdata: %s", postdata)
|
||||
resp, content = self.http.request(url, "POST", postdata)
|
||||
|
||||
if resp.get('status') == '200':
|
||||
access_token = YahooToken.from_string(content)
|
||||
yql_logger.debug("oauth_access_token: %s", access_token)
|
||||
access_token.timestamp = oauth_request['oauth_timestamp']
|
||||
return access_token
|
||||
else:
|
||||
raise YQLError, (resp, content, url)
|
||||
|
||||
def get_uri(self, query, params=None, **kwargs):
|
||||
"""Get the the request url"""
|
||||
query_params = self.get_query_params(query, params, **kwargs)
|
||||
|
||||
token = kwargs.get("token")
|
||||
|
||||
if hasattr(token, "yahoo_guid"):
|
||||
query_params["oauth_yahoo_guid"] = getattr(token, "yahoo_guid")
|
||||
|
||||
if not token:
|
||||
raise ValueError, "Without a token three-legged-auth cannot be"\
|
||||
" carried out"
|
||||
|
||||
yql_logger.debug("query_params: %s", query_params)
|
||||
http_method = get_http_method(query)
|
||||
oauth_request = oauth.Request.from_consumer_and_token(
|
||||
self.consumer, http_url=self.uri,
|
||||
token=token, parameters=query_params,
|
||||
http_method=http_method)
|
||||
yql_logger.debug("oauth_request: %s", oauth_request)
|
||||
# Sign request
|
||||
oauth_request.sign_request(
|
||||
self.hmac_sha1_signature, self.consumer, token)
|
||||
|
||||
yql_logger.debug("oauth_signed_request: %s", oauth_request)
|
||||
uri = "%s?%s" % (self.uri, oauth_request.to_postdata())
|
||||
return uri.replace('+', '%20').replace('%7E', '~')
|
||||
|
||||
|
||||
class YahooToken(oauth.Token):
|
||||
"""A subclass of oauth.Token with the addition of a place to
|
||||
stash the session_handler which is required for token refreshing
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def from_string(data_string):
|
||||
"""Deserializes a token from a string like one returned by
|
||||
|
||||
`to_string()`."""
|
||||
|
||||
if not len(data_string):
|
||||
raise ValueError("Invalid parameter string.")
|
||||
|
||||
params = parse_qs(data_string, keep_blank_values=False)
|
||||
if not len(params):
|
||||
raise ValueError("Invalid parameter string.")
|
||||
|
||||
try:
|
||||
key = params['oauth_token'][0]
|
||||
except Exception:
|
||||
raise ValueError("'oauth_token' not found in OAuth request.")
|
||||
|
||||
try:
|
||||
secret = params['oauth_token_secret'][0]
|
||||
except Exception:
|
||||
raise ValueError("'oauth_token_secret' not found in "
|
||||
"OAuth request.")
|
||||
|
||||
token = YahooToken(key, secret)
|
||||
|
||||
session_handle = params.get('oauth_session_handle')
|
||||
if session_handle:
|
||||
setattr(token, 'session_handle', session_handle[0])
|
||||
|
||||
timestamp = params.get('token_creation_timestamp')
|
||||
if timestamp:
|
||||
setattr(token, 'timestamp', timestamp[0])
|
||||
|
||||
try:
|
||||
token.callback_confirmed = params['oauth_callback_confirmed'][0]
|
||||
except KeyError:
|
||||
pass # 1.0, no callback confirmed.
|
||||
|
||||
return token
|
||||
|
||||
|
||||
def to_string(self):
|
||||
"""Returns this token as a plain string, suitable for storage.
|
||||
The resulting string includes the token's secret, so you should never
|
||||
send or store this string where a third party can read it.
|
||||
|
||||
"""
|
||||
|
||||
data = {
|
||||
'oauth_token': self.key,
|
||||
'oauth_token_secret': self.secret,
|
||||
}
|
||||
|
||||
if hasattr(self, 'session_handle'):
|
||||
data['oauth_session_handle'] = self.session_handle
|
||||
|
||||
if hasattr(self, 'timestamp'):
|
||||
data['token_creation_timestamp'] = self.timestamp
|
||||
|
||||
if self.callback_confirmed is not None:
|
||||
data['oauth_callback_confirmed'] = self.callback_confirmed
|
||||
|
||||
return urlencode(data)
|
44
lib/yql/logger.py
Normal file
44
lib/yql/logger.py
Normal file
|
@ -0,0 +1,44 @@
|
|||
"""Logging for Python YQL."""
|
||||
|
||||
import os
|
||||
import logging
|
||||
import logging.handlers
|
||||
|
||||
|
||||
LOG_DIRECTORY_DEFAULT = os.path.join(os.path.dirname(__file__), "../logs")
|
||||
LOG_DIRECTORY = os.environ.get("YQL_LOG_DIR", LOG_DIRECTORY_DEFAULT)
|
||||
LOG_LEVELS = {'debug': logging.DEBUG,
|
||||
'info': logging.INFO,
|
||||
'warning': logging.WARNING,
|
||||
'error': logging.ERROR,
|
||||
'critical': logging.CRITICAL}
|
||||
|
||||
LOG_LEVEL = os.environ.get("YQL_LOGGING_LEVEL", 'debug')
|
||||
LOG_FILENAME = os.path.join(LOG_DIRECTORY, "python-yql.log")
|
||||
MAX_BYTES = 1024 * 1024
|
||||
|
||||
log_level = LOG_LEVELS.get(LOG_LEVEL)
|
||||
yql_logger = logging.getLogger("python-yql")
|
||||
yql_logger.setLevel(LOG_LEVELS.get(LOG_LEVEL))
|
||||
|
||||
|
||||
class NullHandler(logging.Handler):
|
||||
def emit(self, record):
|
||||
pass
|
||||
|
||||
|
||||
def get_logger():
|
||||
"""Set-upt the logger if enabled or fallback to NullHandler."""
|
||||
if os.environ.get("YQL_LOGGING", False):
|
||||
if not os.path.exists(LOG_DIRECTORY):
|
||||
os.mkdir(LOG_DIRECTORY)
|
||||
log_handler = logging.handlers.RotatingFileHandler(
|
||||
LOG_FILENAME, maxBytes=MAX_BYTES,
|
||||
backupCount=5)
|
||||
formatter = logging.Formatter(
|
||||
"%(asctime)s - %(name)s - %(levelname)s - %(message)s")
|
||||
log_handler.setFormatter(formatter)
|
||||
else:
|
||||
log_handler = NullHandler()
|
||||
yql_logger.addHandler(log_handler)
|
||||
return yql_logger
|
72
lib/yql/storage.py
Normal file
72
lib/yql/storage.py
Normal file
|
@ -0,0 +1,72 @@
|
|||
import os
|
||||
from hashlib import md5
|
||||
|
||||
from yql import YahooToken
|
||||
|
||||
SECRET = "FDHSJLUREIRPpieruweruwoeirhfsdjf"
|
||||
|
||||
|
||||
class TokenStoreError(Exception):
|
||||
"""Generic token storage"""
|
||||
pass
|
||||
|
||||
|
||||
class BaseTokenStore(object):
|
||||
"""Base class for storage"""
|
||||
|
||||
def set(self, name, token):
|
||||
raise NotImplementedError
|
||||
|
||||
def get(self, name):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class FileTokenStore(BaseTokenStore):
|
||||
"""A simple filesystem based token store
|
||||
|
||||
Note: this is more intended as an example rather than
|
||||
something for heavy duty production usage.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, dir_path, secret=None):
|
||||
"""Initialize token storage"""
|
||||
|
||||
if not os.path.exists(dir_path):
|
||||
raise TokenStoreError("Path is not valid")
|
||||
|
||||
self.base_dir = dir_path
|
||||
self.secret = secret or SECRET
|
||||
|
||||
def get_filepath(self, name):
|
||||
"""Build filepath"""
|
||||
|
||||
filename = md5("%s%s" % (name, self.secret)).hexdigest()
|
||||
filepath = os.path.join(self.base_dir, filename)
|
||||
|
||||
return filepath
|
||||
|
||||
def set(self, name, token):
|
||||
"""Write a token to file"""
|
||||
|
||||
if hasattr(token, 'key'):
|
||||
token = YahooToken.to_string(token)
|
||||
|
||||
if token:
|
||||
filepath = self.get_filepath(name)
|
||||
f_handle = open(filepath, 'w')
|
||||
f_handle.write(token)
|
||||
f_handle.close()
|
||||
|
||||
def get(self, name):
|
||||
"""Get a token from the filesystem"""
|
||||
|
||||
filepath = self.get_filepath(name)
|
||||
|
||||
if os.path.exists(filepath):
|
||||
f_handle = open(filepath, 'r')
|
||||
token = f_handle.read()
|
||||
f_handle.close()
|
||||
|
||||
token = YahooToken.from_string(token)
|
||||
return token
|
0
lib/yql/tests/__init__.py
Normal file
0
lib/yql/tests/__init__.py
Normal file
29
lib/yql/tests/test_errors.py
Normal file
29
lib/yql/tests/test_errors.py
Normal file
|
@ -0,0 +1,29 @@
|
|||
import json
|
||||
from unittest import TestCase
|
||||
|
||||
from yql import NotOneError, YQLError
|
||||
|
||||
|
||||
class YQLErrorTest(TestCase):
|
||||
def test_error_passed_error_string(self):
|
||||
error = YQLError(resp='some response', content='some content')
|
||||
self.assertEqual("some content", str(error))
|
||||
|
||||
def test_error_passed_object(self):
|
||||
error = YQLError(resp='some response', content={"foo": 1})
|
||||
self.assertEqual(repr({"foo": 1}), str(error))
|
||||
|
||||
def test_error_passed_json(self):
|
||||
content = {
|
||||
'error': {
|
||||
'description': 'some description',
|
||||
}
|
||||
}
|
||||
error = YQLError(resp='some response', content=json.dumps(content))
|
||||
self.assertEqual("some description", str(error))
|
||||
|
||||
|
||||
class NotOneErrorTest(TestCase):
|
||||
def test_is_represented_by_message_as_json(self):
|
||||
error = NotOneError('some message')
|
||||
self.assertEqual("some message", str(error))
|
155
lib/yql/tests/test_live_services.py
Normal file
155
lib/yql/tests/test_live_services.py
Normal file
|
@ -0,0 +1,155 @@
|
|||
"""Tests against live services.
|
||||
|
||||
*** SKIPPED BY DEFAULT ***
|
||||
|
||||
These tests won't normally be run, as part of the main test suite but are run by
|
||||
our hudson instance to tell us should Yahoo's API change in some way that will
|
||||
break python-yql.
|
||||
|
||||
Note to end-users: These tests are dependent on defining a secrets file with API
|
||||
keys and other secrets which are required to carry out these tests.
|
||||
|
||||
If the secrets file isn't present the tests are skipped
|
||||
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
from time import time
|
||||
from unittest import TestCase
|
||||
|
||||
from nose.plugins.skip import SkipTest
|
||||
|
||||
import yql
|
||||
from yql.storage import FileTokenStore
|
||||
|
||||
|
||||
SECRETS_DIR = os.path.join(os.path.dirname(__file__), "../../../secrets")
|
||||
CACHE_DIR = os.path.abspath(os.path.join(SECRETS_DIR, "cache"))
|
||||
|
||||
try:
|
||||
if SECRETS_DIR not in sys.path:
|
||||
sys.path.append(SECRETS_DIR)
|
||||
|
||||
from secrets import *
|
||||
except ImportError:
|
||||
raise SkipTest("Unable to find secrets directory")
|
||||
|
||||
|
||||
class LiveTestCase(TestCase):
|
||||
"""A test case containing live tests"""
|
||||
|
||||
def test_write_bitly_url(self):
|
||||
"""Test writing bit.ly url"""
|
||||
|
||||
query = """USE 'http://www.datatables.org/bitly/bit.ly.shorten.xml';
|
||||
SELECT * from bit.ly.shorten where login='%s' and apiKey='%s' and
|
||||
longUrl='http://yahoo.com'""" % (BITLY_USER, BITLY_API_KEY)
|
||||
|
||||
y = yql.TwoLegged(YQL_API_KEY, YQL_SHARED_SECRET)
|
||||
res = y.execute(query)
|
||||
assert res.one()["data"]["url"] == "http://yhoo.it/9PPTOr"
|
||||
|
||||
def test_public_request(self):
|
||||
"""Test public two-legged request to flickr"""
|
||||
query = """select * from flickr.photos.search where
|
||||
text="panda" and api_key='%s' LIMIT 3""" % FLICKR_API_KEY
|
||||
y = yql.TwoLegged(YQL_API_KEY, YQL_SHARED_SECRET)
|
||||
res = y.execute(query)
|
||||
assert len(res.rows) == 3
|
||||
|
||||
def test_two_legged_weather_select(self):
|
||||
"""Tests the weather tables using two-legged"""
|
||||
query = """select * from weather.forecast where location in
|
||||
(select id from xml where
|
||||
url='http://xoap.weather.com/search/search?where=london'
|
||||
and itemPath='search.loc')"""
|
||||
y = yql.TwoLegged(YQL_API_KEY, YQL_SHARED_SECRET)
|
||||
res = y.execute(query)
|
||||
assert len(res.rows) > 1
|
||||
|
||||
def test_update_social_status(self):
|
||||
"""Updates status"""
|
||||
y = yql.ThreeLegged(YQL_API_KEY, YQL_SHARED_SECRET)
|
||||
|
||||
timestamp = time()
|
||||
query = """UPDATE social.profile.status
|
||||
SET status='Using YQL. %s Update'
|
||||
WHERE guid=me""" % timestamp
|
||||
|
||||
token_store = FileTokenStore(CACHE_DIR, secret='gsfdsfdsfdsfs')
|
||||
stored_token = token_store.get('foo')
|
||||
|
||||
if not stored_token:
|
||||
# Do the dance
|
||||
request_token, auth_url = y.get_token_and_auth_url()
|
||||
print "Visit url %s and get a verifier string" % auth_url
|
||||
verifier = raw_input("Enter the code: ")
|
||||
token = y.get_access_token(request_token, verifier)
|
||||
token_store.set('foo', token)
|
||||
else:
|
||||
# Check access_token is within 1hour-old and if not refresh it
|
||||
# and stash it
|
||||
token = y.check_token(stored_token)
|
||||
if token != stored_token:
|
||||
token_store.set('foo', token)
|
||||
|
||||
res = y.execute(query, token=token)
|
||||
assert res.rows[0] == "ok"
|
||||
new_query = """select message from social.profile.status where guid=me"""
|
||||
res = y.execute(new_query, token=token)
|
||||
assert res.rows[0].get("message") == "Using YQL. %s Update" % timestamp
|
||||
|
||||
def test_update_meme_status(self):
|
||||
"""Updates status"""
|
||||
y = yql.ThreeLegged(YQL_API_KEY, YQL_SHARED_SECRET)
|
||||
query = 'INSERT INTO meme.user.posts (type, content) VALUES("text", "test with pythonyql")'
|
||||
token_store = FileTokenStore(CACHE_DIR, secret='fjdsfjllds')
|
||||
|
||||
store_name = "meme"
|
||||
stored_token = token_store.get(store_name)
|
||||
if not stored_token:
|
||||
# Do the dance
|
||||
request_token, auth_url = y.get_token_and_auth_url()
|
||||
print "Visit url %s and get a verifier string" % auth_url
|
||||
verifier = raw_input("Enter the code: ")
|
||||
token = y.get_access_token(request_token, verifier)
|
||||
token_store.set(store_name, token)
|
||||
else:
|
||||
# Check access_token is within 1hour-old and if not refresh it
|
||||
# and stash it
|
||||
token = y.check_token(stored_token)
|
||||
if token != stored_token:
|
||||
token_store.set(store_name, token)
|
||||
|
||||
# post a meme
|
||||
res = y.execute(query, token=token)
|
||||
assert y.uri == "http://query.yahooapis.com/v1/yql"
|
||||
assert res.rows[0].get("message") == "ok"
|
||||
|
||||
pubid = None
|
||||
if res.rows[0].get("post") and res.rows[0]["post"].get("pubid"):
|
||||
pubid = res.rows[0]["post"]["pubid"]
|
||||
|
||||
# Delete the post we've just created
|
||||
query = 'DELETE FROM meme.user.posts WHERE pubid=@pubid'
|
||||
res2 = y.execute(query, token=token, params={"pubid": pubid})
|
||||
assert res2.rows[0].get("message") == "ok"
|
||||
|
||||
def test_check_env_var(self):
|
||||
"""Testing env variable"""
|
||||
y = yql.Public()
|
||||
env = "http://datatables.org/alltables.env"
|
||||
query = "SHOW tables;"
|
||||
res = y.execute(query, env=env)
|
||||
assert res.count >= 800
|
||||
|
||||
def test_xpath_works(self):
|
||||
y = yql.Public()
|
||||
query = """SELECT * FROM html
|
||||
WHERE url='http://google.co.uk'
|
||||
AND xpath="//input[contains(@name, 'q')]"
|
||||
LIMIT 10"""
|
||||
res = y.execute(query)
|
||||
assert res.rows[0].get("title") == "Search"
|
||||
|
||||
|
23
lib/yql/tests/test_logger.py
Normal file
23
lib/yql/tests/test_logger.py
Normal file
|
@ -0,0 +1,23 @@
|
|||
import os
|
||||
import shutil
|
||||
from unittest import TestCase
|
||||
|
||||
import yql.logger
|
||||
|
||||
|
||||
class LoggerTest(TestCase):
|
||||
def setUp(self):
|
||||
self._logging = os.environ.get('YQL_LOGGING', '')
|
||||
|
||||
def tearDown(self):
|
||||
os.environ['YQL_LOGGING'] = self._logging
|
||||
|
||||
def test_is_instantiated_even_if_log_dir_doesnt_exist(self):
|
||||
os.environ['YQL_LOGGING'] = '1'
|
||||
if os.path.exists(yql.logger.LOG_DIRECTORY):
|
||||
shutil.rmtree(yql.logger.LOG_DIRECTORY)
|
||||
yql.logger.get_logger()
|
||||
|
||||
def test_logs_message_to_file(self):
|
||||
os.environ['YQL_LOGGING'] = '1'
|
||||
yql.logger.get_logger()
|
77
lib/yql/tests/test_query_placeholders.py
Normal file
77
lib/yql/tests/test_query_placeholders.py
Normal file
|
@ -0,0 +1,77 @@
|
|||
"""Set of tests for the placeholder checking"""
|
||||
|
||||
from unittest import TestCase
|
||||
|
||||
from nose.tools import raises
|
||||
|
||||
import yql
|
||||
|
||||
|
||||
class PublicTest(TestCase):
|
||||
@raises(ValueError)
|
||||
def test_empty_args_raises_valueerror(self):
|
||||
y = yql.Public()
|
||||
query = "SELECT * from foo where dog=@dog"
|
||||
params = {}
|
||||
y.execute(query, params)
|
||||
|
||||
@raises(ValueError)
|
||||
def test_incorrect_args_raises_valueerror(self):
|
||||
y = yql.Public()
|
||||
query = "SELECT * from foo where dog=@dog"
|
||||
params = {'test': 'fail'}
|
||||
y.execute(query, params)
|
||||
|
||||
@raises(ValueError)
|
||||
def test_params_raises_when_not_dict(self):
|
||||
y = yql.Public()
|
||||
query = "SELECT * from foo where dog=@dog"
|
||||
params = ['test']
|
||||
y.execute(query, params)
|
||||
|
||||
@raises(ValueError)
|
||||
def test_unecessary_args_raises_valueerror(self):
|
||||
y = yql.Public()
|
||||
query = "SELECT * from foo where dog='test'"
|
||||
params = {'test': 'fail'}
|
||||
y.execute(query, params)
|
||||
|
||||
@raises(ValueError)
|
||||
def test_incorrect_type_raises_valueerror(self):
|
||||
y = yql.Public()
|
||||
query = "SELECT * from foo where dog=@test"
|
||||
params = ('fail')
|
||||
y.execute(query, params)
|
||||
|
||||
def test_placeholder_regex_one(self):
|
||||
y = yql.Public()
|
||||
query = "SELECT * from foo where email='foo@foo.com'"
|
||||
placeholders = y.get_placeholder_keys(query)
|
||||
self.assertEqual(placeholders, [])
|
||||
|
||||
def test_placeholder_regex_two(self):
|
||||
y = yql.Public()
|
||||
query = "SELECT * from foo where email=@foo'"
|
||||
placeholders = y.get_placeholder_keys(query)
|
||||
self.assertEqual(placeholders, ['foo'])
|
||||
|
||||
def test_placeholder_regex_three(self):
|
||||
y = yql.Public()
|
||||
query = "SELECT * from foo where email=@foo and test=@bar'"
|
||||
placeholders = y.get_placeholder_keys(query)
|
||||
self.assertEqual(placeholders, ['foo', 'bar'])
|
||||
|
||||
def test_placeholder_regex_four(self):
|
||||
y = yql.Public()
|
||||
query = "SELECT * from foo where foo='bar' LIMIT @foo"
|
||||
placeholders = y.get_placeholder_keys(query)
|
||||
self.assertEqual(placeholders, ['foo'])
|
||||
|
||||
def test_placeholder_regex_five(self):
|
||||
y = yql.Public()
|
||||
query = """SELECT * from foo
|
||||
where foo='bar' LIMIT
|
||||
@foo"""
|
||||
placeholders = y.get_placeholder_keys(query)
|
||||
self.assertEqual(placeholders, ['foo'])
|
||||
|
248
lib/yql/tests/test_requests_responses.py
Normal file
248
lib/yql/tests/test_requests_responses.py
Normal file
|
@ -0,0 +1,248 @@
|
|||
from email import message_from_file
|
||||
import os
|
||||
from unittest import TestCase
|
||||
import urlparse
|
||||
from urllib import urlencode
|
||||
try:
|
||||
from urlparse import parse_qsl
|
||||
except ImportError:
|
||||
from cgi import parse_qsl
|
||||
|
||||
from nose.tools import raises
|
||||
from nose import with_setup
|
||||
import oauth2 as oauth
|
||||
import httplib2
|
||||
|
||||
import yql
|
||||
|
||||
|
||||
HTTP_SRC_DIR = os.path.join(os.path.dirname(__file__), "http_src/")
|
||||
|
||||
|
||||
class FileDataHttpReplacement(object):
|
||||
"""Build a stand-in for httplib2.Http that takes its
|
||||
response headers and bodies from files on disk
|
||||
|
||||
http://bitworking.org/news/172/Test-stubbing-httplib2
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, cache=None, timeout=None):
|
||||
self.hit_counter = {}
|
||||
|
||||
def request(self, uri, method="GET", body=None, headers=None, redirections=5):
|
||||
path = urlparse.urlparse(uri)[2]
|
||||
fname = os.path.join(HTTP_SRC_DIR, path[1:])
|
||||
|
||||
if not os.path.exists(fname):
|
||||
index = self.hit_counter.get(fname, 1)
|
||||
|
||||
if os.path.exists(fname + "." + str(index)):
|
||||
self.hit_counter[fname] = index + 1
|
||||
fname = fname + "." + str(index)
|
||||
|
||||
if os.path.exists(fname):
|
||||
f = file(fname, "r")
|
||||
response = message_from_file(f)
|
||||
f.close()
|
||||
body = response.get_payload()
|
||||
response_headers = httplib2.Response(response)
|
||||
return (response_headers, body)
|
||||
else:
|
||||
return (httplib2.Response({"status": "404"}), "")
|
||||
|
||||
def add_credentials(self, name, password):
|
||||
pass
|
||||
|
||||
|
||||
class RequestDataHttpReplacement:
|
||||
"""Create an httplib stub that returns request data"""
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def request(self, uri, *args, **kwargs):
|
||||
"""return the request data"""
|
||||
return uri, args, kwargs
|
||||
|
||||
|
||||
class TestPublic(yql.Public):
|
||||
"""Subclass of YQL to allow returning of the request data"""
|
||||
|
||||
execute = yql.Public.get_uri
|
||||
|
||||
|
||||
class TestTwoLegged(yql.TwoLegged):
|
||||
"""Subclass of YQLTwoLegged to allow returning of the request data"""
|
||||
|
||||
execute = yql.TwoLegged.get_uri
|
||||
|
||||
|
||||
class TestThreeLegged(yql.ThreeLegged):
|
||||
"""Subclass of YQLTwoLegged to allow returning of the request data"""
|
||||
|
||||
execute = yql.ThreeLegged.get_uri
|
||||
|
||||
|
||||
class StubbedHttpTestCase(TestCase):
|
||||
stub = None
|
||||
|
||||
def setUp(self):
|
||||
self._http = httplib2.Http
|
||||
httplib2.Http = self.stub
|
||||
|
||||
def tearDown(self):
|
||||
httplib2.Http = self._http
|
||||
|
||||
|
||||
class PublicStubbedRequestTest(StubbedHttpTestCase):
|
||||
stub = RequestDataHttpReplacement
|
||||
|
||||
def test_urlencoding_for_public_yql(self):
|
||||
query = 'SELECT * from foo'
|
||||
y = TestPublic(httplib2_inst=httplib2.Http())
|
||||
uri = y.execute(query)
|
||||
self.assertEqual(uri, "http://query.yahooapis.com/v1/public/yql?q=SELECT+%2A+from+foo&format=json")
|
||||
|
||||
def test_env_for_public_yql(self):
|
||||
query = 'SELECT * from foo'
|
||||
y = TestPublic(httplib2_inst=httplib2.Http())
|
||||
uri = y.execute(query, env="http://foo.com")
|
||||
self.assertTrue(uri.find(urlencode({"env":"http://foo.com"})) > -1)
|
||||
|
||||
def test_name_param_inserted_for_public_yql(self):
|
||||
query = 'SELECT * from foo WHERE dog=@dog'
|
||||
y = TestPublic(httplib2_inst=httplib2.Http())
|
||||
uri = y.execute(query, {"dog": "fifi"})
|
||||
self.assertTrue(uri.find('dog=fifi') >-1)
|
||||
|
||||
|
||||
class PublicStubbedFromFileTest(StubbedHttpTestCase):
|
||||
stub = FileDataHttpReplacement
|
||||
|
||||
def test_json_response_from_file(self):
|
||||
query = 'SELECT * from foo WHERE dog=@dog'
|
||||
y = yql.Public(httplib2_inst=httplib2.Http())
|
||||
content = y.execute(query, {"dog": "fifi"})
|
||||
self.assertEqual(content.count, 3)
|
||||
|
||||
|
||||
class TwoLeggedTest(TestCase):
|
||||
@raises(TypeError)
|
||||
def test_yql_with_2leg_auth_raises_typerror(self):
|
||||
TestTwoLegged()
|
||||
|
||||
def test_api_key_and_secret_attrs(self):
|
||||
y = yql.TwoLegged('test-api-key', 'test-secret')
|
||||
self.assertEqual(y.api_key, 'test-api-key')
|
||||
self.assertEqual(y.secret, 'test-secret')
|
||||
|
||||
def test_get_two_legged_request_keys(self):
|
||||
y = yql.TwoLegged('test-api-key', 'test-secret')
|
||||
# Accessed this was because it's private
|
||||
request = y._TwoLegged__two_legged_request('http://google.com')
|
||||
self.assertEqual(set(['oauth_nonce', 'oauth_version', 'oauth_timestamp',
|
||||
'oauth_consumer_key', 'oauth_signature_method', 'oauth_body_hash',
|
||||
'oauth_version', 'oauth_signature']), set(request.keys()))
|
||||
|
||||
def test_get_two_legged_request_values(self):
|
||||
y = yql.TwoLegged('test-api-key', 'test-secret')
|
||||
# Accessed this was because it's private
|
||||
request = y._TwoLegged__two_legged_request('http://google.com')
|
||||
self.assertEqual(request['oauth_consumer_key'], 'test-api-key')
|
||||
self.assertEqual(request['oauth_signature_method'], 'HMAC-SHA1')
|
||||
self.assertEqual(request['oauth_version'], '1.0')
|
||||
|
||||
def test_get_two_legged_request_param(self):
|
||||
y = yql.TwoLegged('test-api-key', 'test-secret')
|
||||
# Accessed this way because it's private
|
||||
request = y._TwoLegged__two_legged_request('http://google.com',
|
||||
{"test-param": "test"})
|
||||
self.assertEqual(request.get('test-param'), 'test')
|
||||
|
||||
|
||||
class TwoLeggedStubbedRequestTest(StubbedHttpTestCase):
|
||||
stub = RequestDataHttpReplacement
|
||||
|
||||
def test_request_for_two_legged(self):
|
||||
query = 'SELECT * from foo'
|
||||
y = TestTwoLegged('test-api-key', 'test-secret', httplib2_inst=httplib2.Http())
|
||||
signed_url = y.execute(query)
|
||||
qs = dict(parse_qsl(signed_url.split('?')[1]))
|
||||
self.assertEqual(qs['q'], query)
|
||||
self.assertEqual(qs['format'], 'json')
|
||||
|
||||
|
||||
class TwoLeggedStubbedFromFileTest(StubbedHttpTestCase):
|
||||
stub = FileDataHttpReplacement
|
||||
|
||||
def test_get_two_legged_from_file(self):
|
||||
query = 'SELECT * from foo'
|
||||
y = yql.TwoLegged('test-api-key', 'test-secret', httplib2_inst=httplib2.Http())
|
||||
# Accessed this was because it's private
|
||||
self.assertTrue(y.execute(query) is not None)
|
||||
|
||||
|
||||
class ThreeLeggedTest(TestCase):
|
||||
@raises(TypeError)
|
||||
def test_yql_with_3leg_auth_raises_typerror(self):
|
||||
TestThreeLegged()
|
||||
|
||||
def test_api_key_and_secret_attrs2(self):
|
||||
y = yql.ThreeLegged('test-api-key', 'test-secret')
|
||||
self.assertEqual(y.api_key, 'test-api-key')
|
||||
self.assertEqual(y.secret, 'test-secret')
|
||||
|
||||
def test_get_base_params(self):
|
||||
y = yql.ThreeLegged('test-api-key', 'test-secret')
|
||||
result = y.get_base_params()
|
||||
self.assertEqual(set(['oauth_nonce', 'oauth_version', 'oauth_timestamp']),
|
||||
set(result.keys()))
|
||||
|
||||
@raises(ValueError)
|
||||
def test_raises_for_three_legged_with_no_token(self):
|
||||
query = 'SELECT * from foo'
|
||||
y = TestThreeLegged('test-api-key', 'test-secret', httplib2_inst=httplib2.Http())
|
||||
y.execute(query)
|
||||
|
||||
|
||||
class ThreeLeggedStubbedRequestTest(StubbedHttpTestCase):
|
||||
stub = RequestDataHttpReplacement
|
||||
|
||||
def test_request_for_three_legged(self):
|
||||
query = 'SELECT * from foo'
|
||||
y = TestThreeLegged('test-api-key', 'test-secret',
|
||||
httplib2_inst=httplib2.Http())
|
||||
token = oauth.Token.from_string(
|
||||
'oauth_token=foo&oauth_token_secret=bar')
|
||||
signed_url = y.execute(query, token=token)
|
||||
qs = dict(parse_qsl(signed_url.split('?')[1]))
|
||||
self.assertEqual(qs['q'], query)
|
||||
self.assertEqual(qs['format'], 'json')
|
||||
|
||||
|
||||
class ThreeLeggedStubbedFromFileTest(StubbedHttpTestCase):
|
||||
stub = FileDataHttpReplacement
|
||||
|
||||
def test_three_legged_execution(self):
|
||||
query = 'SELECT * from foo WHERE dog=@dog'
|
||||
y = yql.ThreeLegged('test','test2', httplib2_inst=httplib2.Http())
|
||||
token = yql.YahooToken('test', 'test2')
|
||||
content = y.execute(query, {"dog": "fifi"}, token=token)
|
||||
self.assertEqual(content.count, 3)
|
||||
|
||||
@raises(ValueError)
|
||||
def test_three_legged_execution_raises_value_error_with_invalid_uri(self):
|
||||
y = yql.ThreeLegged('test','test2', httplib2_inst=httplib2.Http())
|
||||
y.uri = "fail"
|
||||
token = yql.YahooToken('tes1t', 'test2')
|
||||
y.execute("SELECT foo meh meh ", token=token)
|
||||
|
||||
def test_get_access_token_request3(self):
|
||||
y = yql.ThreeLegged('test', 'test-does-not-exist',
|
||||
httplib2_inst=httplib2.Http())
|
||||
new_token = yql.YahooToken('test', 'test2')
|
||||
new_token.session_handle = 'sess_handle_test'
|
||||
token = y.refresh_token(token=new_token)
|
||||
self.assertTrue(hasattr(token, 'key'))
|
||||
self.assertTrue(hasattr(token, 'secret'))
|
12
lib/yql/tests/test_services.py
Normal file
12
lib/yql/tests/test_services.py
Normal file
|
@ -0,0 +1,12 @@
|
|||
from unittest import TestCase
|
||||
|
||||
from nose.tools import raises
|
||||
|
||||
import yql
|
||||
|
||||
|
||||
class PublicTest(TestCase):
|
||||
@raises(ValueError)
|
||||
def test_cannot_use_unrecognizable_endpoint(self):
|
||||
y = yql.Public()
|
||||
y.endpoint = 'some-strange-endpoint'
|
61
lib/yql/tests/test_storage.py
Normal file
61
lib/yql/tests/test_storage.py
Normal file
|
@ -0,0 +1,61 @@
|
|||
import os
|
||||
import tempfile
|
||||
from unittest import TestCase
|
||||
|
||||
from nose.tools import raises
|
||||
|
||||
from yql import YahooToken
|
||||
from yql.storage import BaseTokenStore, FileTokenStore, TokenStoreError
|
||||
|
||||
|
||||
class BaseTokenStoreTest(TestCase):
|
||||
@raises(NotImplementedError)
|
||||
def test_must_implement_set(self):
|
||||
class FooStore(BaseTokenStore):
|
||||
pass
|
||||
store = FooStore()
|
||||
store.set('some name', 'some token')
|
||||
|
||||
@raises(NotImplementedError)
|
||||
def test_must_implement_get(self):
|
||||
class FooStore(BaseTokenStore):
|
||||
pass
|
||||
store = FooStore()
|
||||
store.get('some name')
|
||||
|
||||
|
||||
class FileTokenStoreTest(TestCase):
|
||||
@raises(TokenStoreError)
|
||||
def test_must_be_instanced_with_an_existant_path(self):
|
||||
FileTokenStore('/some/inexistant/path')
|
||||
|
||||
def test_saves_token_string_to_filesystem(self):
|
||||
directory = tempfile.mkdtemp()
|
||||
store = FileTokenStore(directory)
|
||||
store.set('foo', '?key=some-token')
|
||||
with open(store.get_filepath('foo')) as stored_file:
|
||||
self.assertTrue('some-token' in stored_file.read())
|
||||
|
||||
def test_retrieves_token_from_filesystem(self):
|
||||
directory = tempfile.mkdtemp()
|
||||
store = FileTokenStore(directory)
|
||||
store.set('foo', '?key=%s&oauth_token=some-oauth-token&'\
|
||||
'oauth_token_secret=some-token-secret' % 'some-token')
|
||||
token = store.get('foo')
|
||||
self.assertTrue('some-token' in token.to_string())
|
||||
|
||||
def test_cannot_retrieve_token_if_path_doesnt_exist(self):
|
||||
directory = tempfile.mkdtemp()
|
||||
store = FileTokenStore(directory)
|
||||
store.set('foo', '?key=%s&oauth_token=some-oauth-token&'\
|
||||
'oauth_token_secret=some-token-secret' % 'some-token')
|
||||
os.remove(store.get_filepath('foo'))
|
||||
self.assertTrue(store.get('foo') is None)
|
||||
|
||||
def test_saves_token_to_filesystem(self):
|
||||
directory = tempfile.mkdtemp()
|
||||
store = FileTokenStore(directory)
|
||||
token = YahooToken('some-token', 'some-secret')
|
||||
store.set('foo', token)
|
||||
with open(store.get_filepath('foo')) as stored_file:
|
||||
self.assertTrue('some-token' in stored_file.read())
|
40
lib/yql/tests/test_utilities.py
Normal file
40
lib/yql/tests/test_utilities.py
Normal file
|
@ -0,0 +1,40 @@
|
|||
from unittest import TestCase
|
||||
|
||||
from yql.utils import get_http_method
|
||||
|
||||
|
||||
class UtilitiesTest(TestCase):
|
||||
def test_finds_get_method_for_select_query(self):
|
||||
self.assertEqual(get_http_method("SELECT foo"), "GET")
|
||||
|
||||
def test_finds_get_method_for_select_query_with_leading_space(self):
|
||||
self.assertEqual(get_http_method(" SELECT foo"), "GET")
|
||||
|
||||
def test_finds_get_method_for_lowercase_select_query(self):
|
||||
self.assertEqual(get_http_method("select foo"), "GET")
|
||||
|
||||
def test_finds_post_method_for_insert_query(self):
|
||||
self.assertEqual(get_http_method("INSERT into"), "POST")
|
||||
|
||||
def test_finds_post_method_for_multiline_insert_query(self):
|
||||
query = """
|
||||
INSERT INTO yql.queries.query (name, query)
|
||||
VALUES ("weather", "SELECT * FROM weather.forecast
|
||||
WHERE location=90210")
|
||||
"""
|
||||
self.assertEqual(get_http_method(query), "POST")
|
||||
|
||||
def test_finds_put_method_for_update_query(self):
|
||||
self.assertEqual(get_http_method("update foo"), "PUT")
|
||||
|
||||
def test_finds_post_method_for_delete_query(self):
|
||||
self.assertEqual(get_http_method("DELETE from"), "POST")
|
||||
|
||||
def test_finds_post_method_for_lowercase_delete_query(self):
|
||||
self.assertEqual(get_http_method("delete from"), "POST")
|
||||
|
||||
def test_finds_get_method_for_show_query(self):
|
||||
self.assertEqual(get_http_method("SHOW tables"), "GET")
|
||||
|
||||
def test_finds_get_method_for_describe_query(self):
|
||||
self.assertEqual(get_http_method("DESC tablename"), "GET")
|
81
lib/yql/tests/test_yahoo_token.py
Normal file
81
lib/yql/tests/test_yahoo_token.py
Normal file
|
@ -0,0 +1,81 @@
|
|||
from unittest import TestCase
|
||||
|
||||
from nose.tools import raises
|
||||
try:
|
||||
from urlparse import parse_qs, parse_qsl
|
||||
except ImportError:
|
||||
from cgi import parse_qs, parse_qsl
|
||||
|
||||
import yql
|
||||
|
||||
|
||||
class YahooTokenTest(TestCase):
|
||||
def test_create_yahoo_token(self):
|
||||
token = yql.YahooToken('test-key', 'test-secret')
|
||||
self.assertEqual(token.key, 'test-key')
|
||||
self.assertEqual(token.secret, 'test-secret')
|
||||
|
||||
def test_y_token_to_string(self):
|
||||
token = yql.YahooToken('test-key', 'test-secret')
|
||||
token_to_string = token.to_string()
|
||||
string_data = dict(parse_qsl(token_to_string))
|
||||
self.assertEqual(string_data.get('oauth_token'), 'test-key')
|
||||
self.assertEqual(string_data.get('oauth_token_secret'), 'test-secret')
|
||||
|
||||
def test_y_token_to_string2(self):
|
||||
token = yql.YahooToken('test-key', 'test-secret')
|
||||
|
||||
token.timestamp = '1111'
|
||||
token.session_handle = 'poop'
|
||||
token.callback_confirmed = 'basilfawlty'
|
||||
|
||||
token_to_string = token.to_string()
|
||||
string_data = dict(parse_qsl(token_to_string))
|
||||
self.assertEqual(string_data.get('oauth_token'), 'test-key')
|
||||
self.assertEqual(string_data.get('oauth_token_secret'), 'test-secret')
|
||||
self.assertEqual(string_data.get('token_creation_timestamp'), '1111')
|
||||
self.assertEqual(string_data.get('oauth_callback_confirmed'), 'basilfawlty')
|
||||
self.assertEqual(string_data.get('oauth_session_handle'), 'poop')
|
||||
|
||||
def test_y_token_from_string(self):
|
||||
token_string = "oauth_token=foo&oauth_token_secret=bar&"\
|
||||
"oauth_session_handle=baz&token_creation_timestamp=1111"
|
||||
token_from_string = yql.YahooToken.from_string(token_string)
|
||||
self.assertEqual(token_from_string.key, 'foo')
|
||||
self.assertEqual(token_from_string.secret, 'bar')
|
||||
self.assertEqual(token_from_string.session_handle, 'baz')
|
||||
self.assertEqual(token_from_string.timestamp, '1111')
|
||||
|
||||
@raises(ValueError)
|
||||
def test_y_token_raises_value_error(self):
|
||||
yql.YahooToken.from_string('')
|
||||
|
||||
@raises(ValueError)
|
||||
def test_y_token_raises_value_error2(self):
|
||||
yql.YahooToken.from_string('foo')
|
||||
|
||||
@raises(ValueError)
|
||||
def test_y_token_raises_value_error3(self):
|
||||
yql.YahooToken.from_string('oauth_token=bar')
|
||||
|
||||
@raises(ValueError)
|
||||
def test_y_token_raises_value_error4(self):
|
||||
yql.YahooToken.from_string('oauth_token_secret=bar')
|
||||
|
||||
@raises(AttributeError)
|
||||
def test_y_token_without_timestamp_raises(self):
|
||||
token = yql.YahooToken('test', 'test2')
|
||||
y = yql.ThreeLegged('test', 'test2')
|
||||
y.check_token(token)
|
||||
|
||||
def test_y_token_without_timestamp_raises2(self):
|
||||
|
||||
def refresh_token_replacement(token):
|
||||
return 'replaced'
|
||||
|
||||
y = yql.ThreeLegged('test', 'test2')
|
||||
y.refresh_token = refresh_token_replacement
|
||||
|
||||
token = yql.YahooToken('test', 'test2')
|
||||
token.timestamp = 11111
|
||||
self.assertEqual(y.check_token(token), 'replaced')
|
105
lib/yql/tests/test_yql_object.py
Normal file
105
lib/yql/tests/test_yql_object.py
Normal file
|
@ -0,0 +1,105 @@
|
|||
"""Tests for the YQL object"""
|
||||
|
||||
import json
|
||||
from unittest import TestCase
|
||||
|
||||
from nose.tools import raises
|
||||
|
||||
from yql import YQLObj, NotOneError
|
||||
|
||||
|
||||
data_dict = json.loads("""{"query":{"count":"3","created":"2009-11-20T12:11:56Z","lang":"en-US","updated":"2009-11-20T12:11:56Z","uri":"http://query.yahooapis.com/v1/yql?q=select+*+from+flickr.photos.search+where+text%3D%22panda%22+limit+3","diagnostics":{"publiclyCallable":"true","url":{"execution-time":"742","content":"http://api.flickr.com/services/rest/?method=flickr.photos.search&text=panda&page=1&per_page=10"},"user-time":"745","service-time":"742","build-version":"3805"},"results":{"photo":[{"farm":"3","id":"4117944207","isfamily":"0","isfriend":"0","ispublic":"1","owner":"12346075@N00","secret":"ce1f6092de","server":"2510","title":"Pandas"},{"farm":"3","id":"4118710292","isfamily":"0","isfriend":"0","ispublic":"1","owner":"12346075@N00","secret":"649632a3e2","server":"2754","title":"Pandas"},{"farm":"3","id":"4118698318","isfamily":"0","isfriend":"0","ispublic":"1","owner":"28451051@N02","secret":"ec0b508684","server":"2586","title":"fuzzy flowers (Kalanchoe tomentosa)"}]}}}""")
|
||||
data_dict2 = json.loads("""{"query":{"count":"1","created":"2009-11-20T12:11:56Z","lang":"en-US","updated":"2009-11-20T12:11:56Z","uri":"http://query.yahooapis.com/v1/yql?q=select+*+from+flickr.photos.search+where+text%3D%22panda%22+limit+3","diagnostics":{"publiclyCallable":"true","url":{"execution-time":"742","content":"http://api.flickr.com/services/rest/?method=flickr.photos.search&text=panda&page=1&per_page=10"},"user-time":"745","service-time":"742","build-version":"3805"},"results":{"photo":{"farm":"3","id":"4117944207","isfamily":"0","isfriend":"0","ispublic":"1","owner":"12346075@N00","secret":"ce1f6092de","server":"2510","title":"Pandas"}}}}""")
|
||||
|
||||
|
||||
yqlobj = YQLObj(data_dict)
|
||||
yqlobj2 = YQLObj({})
|
||||
yqlobj3 = YQLObj(data_dict2)
|
||||
|
||||
|
||||
class YQLObjTest(TestCase):
|
||||
@raises(AttributeError)
|
||||
def test_yql_object_one(self):
|
||||
"""Test that invalid query raises AttributeError"""
|
||||
yqlobj.query = 1
|
||||
|
||||
def test_yqlobj_uri(self):
|
||||
"""Test that the query uri is as expected."""
|
||||
self.assertEqual(yqlobj.uri, u"http://query.yahooapis.com/v1/yql?q=select+*+"\
|
||||
"from+flickr.photos.search+where+text%3D%22panda%22+limit+3")
|
||||
|
||||
def test_yqlobj_query(self):
|
||||
"""Test retrieval of the actual query"""
|
||||
self.assertEqual(yqlobj.query, u'select * from flickr.photos.search '\
|
||||
'where text="panda" limit 3')
|
||||
|
||||
def test_yqlobj_count(self):
|
||||
"""Check we have 3 records"""
|
||||
self.assertEqual(yqlobj.count, 3)
|
||||
|
||||
def test_yqlobj_lang(self):
|
||||
"""Check the lang attr."""
|
||||
self.assertEqual(yqlobj.lang, u"en-US")
|
||||
|
||||
def test_yqlobj_results(self):
|
||||
"""Check the results."""
|
||||
expected_results = {u'photo': [
|
||||
{u'isfamily': u'0',
|
||||
u'title': u'Pandas',
|
||||
u'farm': u'3',
|
||||
u'ispublic': u'1',
|
||||
u'server': u'2510',
|
||||
u'isfriend': u'0',
|
||||
u'secret': u'ce1f6092de',
|
||||
u'owner': u'12346075@N00',
|
||||
u'id': u'4117944207'},
|
||||
{u'isfamily': u'0',
|
||||
u'title': u'Pandas',
|
||||
u'farm': u'3',
|
||||
u'ispublic': u'1',
|
||||
u'server': u'2754',
|
||||
u'isfriend': u'0',
|
||||
u'secret': u'649632a3e2',
|
||||
u'owner': u'12346075@N00',
|
||||
u'id': u'4118710292'},
|
||||
{u'isfamily': u'0',
|
||||
u'title': u'fuzzy flowers (Kalanchoe tomentosa)',
|
||||
u'farm': u'3',
|
||||
u'ispublic': u'1',
|
||||
u'server': u'2586',
|
||||
u'isfriend': u'0',
|
||||
u'secret': u'ec0b508684',
|
||||
u'owner': u'28451051@N02',
|
||||
u'id': u'4118698318'}
|
||||
]}
|
||||
self.assertEqual(yqlobj.results, expected_results)
|
||||
|
||||
def test_yqlobj_raw(self):
|
||||
"""Check the raw attr."""
|
||||
self.assertEqual(yqlobj.raw, data_dict.get('query'))
|
||||
|
||||
def test_yqlobj_diagnostics(self):
|
||||
"""Check the diagnostics"""
|
||||
self.assertEqual(yqlobj.diagnostics, data_dict.get('query').get('diagnostics'))
|
||||
|
||||
def test_query_is_none(self):
|
||||
"""Check query is None with no data."""
|
||||
self.assertTrue(yqlobj2.query is None)
|
||||
|
||||
def test_rows(self):
|
||||
"""Test we can iterate over the rows."""
|
||||
stuff = []
|
||||
for row in yqlobj.rows:
|
||||
stuff.append(row.get('server'))
|
||||
|
||||
self.assertEqual(stuff, [u'2510', u'2754', u'2586'])
|
||||
|
||||
@raises(NotOneError)
|
||||
def test_one(self):
|
||||
"""Test that accessing one result raises exception"""
|
||||
yqlobj.one()
|
||||
|
||||
def test_one_with_one_result(self):
|
||||
"""Test accessing data with one result."""
|
||||
res = yqlobj3.one()
|
||||
self.assertEqual(res.get("title"), "Pandas")
|
38
lib/yql/utils.py
Normal file
38
lib/yql/utils.py
Normal file
|
@ -0,0 +1,38 @@
|
|||
""""Utility functions"""
|
||||
import re
|
||||
|
||||
|
||||
METHOD_MAP = (
|
||||
("insert", "POST"),
|
||||
("update", "PUT"),
|
||||
("delete", "POST"),
|
||||
)
|
||||
MULTI_PLUS = re.compile(r"\+{2,}")
|
||||
MULTI_SPACE = re.compile(r" {2,}")
|
||||
|
||||
|
||||
def get_http_method(query):
|
||||
"""Work out if this should be GET, POST, PUT or DELETE"""
|
||||
lower_query = query.strip().lower()
|
||||
|
||||
http_method = "GET"
|
||||
for method in METHOD_MAP:
|
||||
if method[0] in lower_query:
|
||||
http_method = method[1]
|
||||
break
|
||||
|
||||
return http_method
|
||||
|
||||
|
||||
def clean_url(url):
|
||||
"""Cleans up a uri/url"""
|
||||
url = url.replace("\n", "")
|
||||
url = MULTI_PLUS.sub("+", url)
|
||||
return url
|
||||
|
||||
|
||||
def clean_query(query):
|
||||
"""Cleans up a query"""
|
||||
query = query.replace("\n", "")
|
||||
query = MULTI_SPACE.sub(" ", query)
|
||||
return query
|
Reference in a new issue