| """ |
| The MIT License |
| |
| Copyright (c) 2007 Leah Culver |
| |
| Permission is hereby granted, free of charge, to any person obtaining a copy |
| of this software and associated documentation files (the "Software"), to deal |
| in the Software without restriction, including without limitation the rights |
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| copies of the Software, and to permit persons to whom the Software is |
| furnished to do so, subject to the following conditions: |
| |
| The above copyright notice and this permission notice shall be included in |
| all copies or substantial portions of the Software. |
| |
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| THE SOFTWARE. |
| """ |
| |
| from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer |
| import urllib |
| |
| import oauth.oauth as oauth |
| |
| # fake urls for the test server |
| REQUEST_TOKEN_URL = 'https://photos.example.net/request_token' |
| ACCESS_TOKEN_URL = 'https://photos.example.net/access_token' |
| AUTHORIZATION_URL = 'https://photos.example.net/authorize' |
| CALLBACK_URL = 'http://printer.example.com/request_token_ready' |
| RESOURCE_URL = 'http://photos.example.net/photos' |
| REALM = 'http://photos.example.net/' |
| VERIFIER = 'verifier' |
| |
| # example store for one of each thing |
| class MockOAuthDataStore(oauth.OAuthDataStore): |
| |
| def __init__(self): |
| self.consumer = oauth.OAuthConsumer('key', 'secret') |
| self.request_token = oauth.OAuthToken('requestkey', 'requestsecret') |
| self.access_token = oauth.OAuthToken('accesskey', 'accesssecret') |
| self.nonce = 'nonce' |
| self.verifier = VERIFIER |
| |
| def lookup_consumer(self, key): |
| if key == self.consumer.key: |
| return self.consumer |
| return None |
| |
| def lookup_token(self, token_type, token): |
| token_attrib = getattr(self, '%s_token' % token_type) |
| if token == token_attrib.key: |
| ## HACK |
| token_attrib.set_callback(CALLBACK_URL) |
| return token_attrib |
| return None |
| |
| def lookup_nonce(self, oauth_consumer, oauth_token, nonce): |
| if oauth_token and oauth_consumer.key == self.consumer.key and (oauth_token.key == self.request_token.key or oauth_token.key == self.access_token.key) and nonce == self.nonce: |
| return self.nonce |
| return None |
| |
| def fetch_request_token(self, oauth_consumer, oauth_callback): |
| if oauth_consumer.key == self.consumer.key: |
| if oauth_callback: |
| # want to check here if callback is sensible |
| # for mock store, we assume it is |
| self.request_token.set_callback(oauth_callback) |
| return self.request_token |
| return None |
| |
| def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier): |
| if oauth_consumer.key == self.consumer.key and oauth_token.key == self.request_token.key and oauth_verifier == self.verifier: |
| # want to check here if token is authorized |
| # for mock store, we assume it is |
| return self.access_token |
| return None |
| |
| def authorize_request_token(self, oauth_token, user): |
| if oauth_token.key == self.request_token.key: |
| # authorize the request token in the store |
| # for mock store, do nothing |
| return self.request_token |
| return None |
| |
| class RequestHandler(BaseHTTPRequestHandler): |
| |
| def __init__(self, *args, **kwargs): |
| self.oauth_server = oauth.OAuthServer(MockOAuthDataStore()) |
| self.oauth_server.add_signature_method(oauth.OAuthSignatureMethod_PLAINTEXT()) |
| self.oauth_server.add_signature_method(oauth.OAuthSignatureMethod_HMAC_SHA1()) |
| BaseHTTPRequestHandler.__init__(self, *args, **kwargs) |
| |
| # example way to send an oauth error |
| def send_oauth_error(self, err=None): |
| # send a 401 error |
| self.send_error(401, str(err.message)) |
| # return the authenticate header |
| header = oauth.build_authenticate_header(realm=REALM) |
| for k, v in header.iteritems(): |
| self.send_header(k, v) |
| |
| def do_GET(self): |
| |
| # debug info |
| #print self.command, self.path, self.headers |
| |
| # get the post data (if any) |
| postdata = None |
| if self.command == 'POST': |
| try: |
| length = int(self.headers.getheader('content-length')) |
| postdata = self.rfile.read(length) |
| except: |
| pass |
| |
| # construct the oauth request from the request parameters |
| oauth_request = oauth.OAuthRequest.from_request(self.command, self.path, headers=self.headers, query_string=postdata) |
| |
| # request token |
| if self.path.startswith(REQUEST_TOKEN_URL): |
| try: |
| # create a request token |
| token = self.oauth_server.fetch_request_token(oauth_request) |
| # send okay response |
| self.send_response(200, 'OK') |
| self.end_headers() |
| # return the token |
| self.wfile.write(token.to_string()) |
| except oauth.OAuthError, err: |
| self.send_oauth_error(err) |
| return |
| |
| # user authorization |
| if self.path.startswith(AUTHORIZATION_URL): |
| try: |
| # get the request token |
| token = self.oauth_server.fetch_request_token(oauth_request) |
| # authorize the token (kind of does nothing for now) |
| token = self.oauth_server.authorize_token(token, None) |
| token.set_verifier(VERIFIER) |
| # send okay response |
| self.send_response(200, 'OK') |
| self.end_headers() |
| # return the callback url (to show server has it) |
| self.wfile.write(token.get_callback_url()) |
| except oauth.OAuthError, err: |
| self.send_oauth_error(err) |
| return |
| |
| # access token |
| if self.path.startswith(ACCESS_TOKEN_URL): |
| try: |
| # create an access token |
| token = self.oauth_server.fetch_access_token(oauth_request) |
| # send okay response |
| self.send_response(200, 'OK') |
| self.end_headers() |
| # return the token |
| self.wfile.write(token.to_string()) |
| except oauth.OAuthError, err: |
| self.send_oauth_error(err) |
| return |
| |
| # protected resources |
| if self.path.startswith(RESOURCE_URL): |
| try: |
| # verify the request has been oauth authorized |
| consumer, token, params = self.oauth_server.verify_request(oauth_request) |
| # send okay response |
| self.send_response(200, 'OK') |
| self.end_headers() |
| # return the extra parameters - just for something to return |
| self.wfile.write(str(params)) |
| except oauth.OAuthError, err: |
| self.send_oauth_error(err) |
| return |
| |
| def do_POST(self): |
| return self.do_GET() |
| |
| def main(): |
| try: |
| server = HTTPServer(('', 8080), RequestHandler) |
| print 'Test server running...' |
| server.serve_forever() |
| except KeyboardInterrupt: |
| server.socket.close() |
| |
| if __name__ == '__main__': |
| main() |