forked from fanzeyi/Treeholes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoauth.py
115 lines (103 loc) · 4.09 KB
/
oauth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# -*- coding: utf-8 -*-
import uuid
import hmac
import time
import urllib
import hashlib
import binascii
import urlparse
from google.appengine.api import urlfetch
try:
from urlparse import parse_qs # Python 2.6+
except ImportError:
from cgi import parse_qs
if str is unicode:
def b(s):
return s.encode('latin1')
bytes_type = bytes
else:
def b(s):
return s
bytes_type = str
def _oauth_escape(val):
if isinstance(val, unicode):
val = val.encode("utf-8")
return urllib.quote(val, safe="~")
_UTF8_TYPES = (type(None))
def escape_utf8(value):
if isinstance(value, _UTF8_TYPES):
return value
# assert isinstance(value, unicode)
return value.encode("utf-8")
def _signature_request(consumer_token, method, url, args = {}, token = None):
parts = urlparse.urlparse(url)
scheme, netloc, path = parts[:3]
normalized_url = scheme.lower() + "://" + netloc.lower() + path
base_elems = []
base_elems.append(method.upper())
base_elems.append(normalized_url)
base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v)))
for k, v in sorted(args.items())))
base_string = "&".join(_oauth_escape(e) for e in base_elems)
key_elems = [escape_utf8(urllib.quote(consumer_token["secret"], safe='~'))]
key_elems.append(escape_utf8(urllib.quote(token["secret"], safe='~') if token else ""))
key = b("&").join(key_elems)
hash = hmac.new(key, escape_utf8(base_string), hashlib.sha1)
return binascii.b2a_base64(hash.digest())[:-1]
class FanfouOAuth(object):
_API_BASE_URL = "http://api.fanfou.com"
_REQUEST_TOKEN_URL = "http://fanfou.com/oauth/request_token"
_AUTHENTICATE_URL = "http://fanfou.com/oauth/authenticate"
_ACCESS_TOKEN_URL = "http://fanfou.com/oauth/access_token"
def __init__(self, key, secret, callback = None):
self.key = key
self.secret = secret
self.callback = callback
def _get_consumer_token(self):
return dict(key = self.key,
secret = self.secret)
def _get_access_token(self):
if self._ACCESS_TOKEN_KEY and self._ACCESS_TOKEN_SECRET:
return dict( key = self._ACCESS_TOKEN_KEY,
secret = self._ACCESS_TOKEN_SECRET)
return None
def _oauth_request_parameters(self, url, access_token, parameters={},
method="GET"):
consumer_token = self._get_consumer_token()
base_args = dict(
oauth_consumer_key = consumer_token["key"],
oauth_token = access_token["key"],
oauth_signature_method = "HMAC-SHA1",
oauth_timestamp = str(int(time.time())),
oauth_nonce = binascii.b2a_hex(uuid.uuid4().bytes),
oauth_version = "1.0a",
)
args = {}
args.update(base_args)
args.update(parameters)
signature = _signature_request(consumer_token, method, url, args,
access_token)
base_args["oauth_signature"] = signature
return base_args
def set_access_token(self, key, secret):
self._ACCESS_TOKEN_KEY = key
self._ACCESS_TOKEN_SECRET = secret
def fanfou_request(self, path, access_token = None, \
post_args = None, **args):
url = self._API_BASE_URL + path + ".json"
if not access_token:
access_token = self._get_access_token()
if access_token:
all_args = {}
all_args.update(args)
all_args.update(post_args or {})
method = "POST" if post_args is not None else "GET"
oauth = self._oauth_request_parameters(url, access_token, all_args, method = method)
args.update(oauth)
if args: url += "?" + urllib.urlencode(args)
if method == "GET":
result = urlfetch.fetch(url, method = urlfetch.GET)
else:
result = urlfetch.fetch(url, method = urlfetch.POST, \
payload = urllib.urlencode(post_args))
return result.content