# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Caching HTTP Proxy for use with the Talos pageload tests
Author: Rob Arnold

This file implements a multithreaded caching HTTP 1.1 proxy. HEAD and GET
methods are supported; POST is not yet.

Each incoming request is handled on its own thread: the server mixes in
SocketServer.ThreadingMixIn, which spawns a new thread per request. Only the
Python standard library is used, and the module runs under both Python 2
(2.6+) and Python 3.

Caching:
The cache is implemented in the Cache class. Items can only be added to the
cache. The only way to remove items from the cache is to blow it all away,
either by deleting the file (default: proxy_cache.db) or by passing the -c or
--clear flag on the command line. It is technically possible to remove items
individually from the cache, but there has been no need to do so so far.

The cache is implemented with the shelve module. The key is the combination of
host, port and request (path + query + fragment). The values stored are the
HTTP status code, headers and content that were received from the remote
server.

Access to the cache is guarded by a semaphore, which allows concurrent read
access. The semaphore is guarded by a simple mutex. The mutex prevents a
deadlock from occurring when two threads try to add an item to the cache at
the same time.

Memory usage is kept to a minimum by the shelve module; only items in the
cache that are currently being served stay in memory.

Proxy:
BaseHTTPServer.BaseHTTPRequestHandler takes care of parsing incoming requests
and managing the socket connection. See the documentation of the
BaseHTTPServer module for more information. When do_HEAD or do_GET is called,
the URL that we are supposed to fetch is in self.path.

TODO:
* Implement POST requests. This requires implementing the do_POST method and
  passing the post data along.
* Implement different cache policies.
* Add an interface to allow administrators to probe the cache and remove
  items from the database and such.
"""

__version__ = "0.1"

import os
import sys
import time
import threading
import shelve
from optparse import OptionParser, OptionValueError

import socket

try:  # Python 3: bind the Python 3 modules under their Python 2 names.
    import http.client as httplib
    import http.server as BaseHTTPServer
    import socketserver as SocketServer
    from urllib.parse import urlsplit, urlunsplit
except ImportError:  # Python 2
    import httplib
    import BaseHTTPServer
    import SocketServer
    from urlparse import urlsplit, urlunsplit

# Hop-by-hop headers describe a single connection and are not forwarded from
# the upstream response to the client (RFC 7230, section 6.1).
# Transfer-Encoding is dropped because the body is always sent to the client
# in one piece, with a Content-Length computed by _response_headers.
_HOP_BY_HOP_RESPONSE_HEADERS = frozenset(
    ['connection', 'keep-alive', 'proxy-connection', 'transfer-encoding'])


def _upstream_headers(headers):
    """Return a copy of the client's request headers for the upstream request.

    Header names are compared case-insensitively, because the header object
    may report them lower-cased. Any Connection/Proxy-Connection header is
    rewritten to "Close" and Keep-Alive is dropped, because each upstream
    connection is used for exactly one request and then closed.

    headers: dict mapping header name to value.
    """
    result = {}
    for name, value in headers.items():
        lowered = name.lower()
        if lowered == 'keep-alive':
            continue
        if lowered in ('connection', 'proxy-connection'):
            value = "Close"
        result[name] = value
    return result


def _response_headers(headers, content):
    """Return the (name, value) pairs to send to the client for a response.

    headers: sequence of (name, value) pairs from upstream or from the cache.
    content: the full response body.

    Hop-by-hop headers are removed (case-insensitively). Any upstream
    Content-Length is replaced by one computed from content, so exactly one
    Content-Length header is sent.
    """
    result = [(name, value) for name, value in headers
              if name.lower() not in _HOP_BY_HOP_RESPONSE_HEADERS
              and name.lower() != 'content-length']
    result.append(("Content-Length", str(len(content))))
    return result


class HTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Serve GET/HEAD requests from the cache, fetching misses upstream."""
    server_version = "TalosProxy/" + __version__
    protocol_version = "HTTP/1.1"

    def do_GET(self):
        """Send headers, then the body if there is one."""
        content = self.send_head()
        if content:
            try:
                self.wfile.write(content)
            except socket.error as e:
                if options.verbose:
                    print("Got socket error %s" % e)

    def do_HEAD(self):
        """Send only the headers that a GET for the same URL would send."""
        self.send_head()

    def getHeaders(self):
        """Return the incoming request headers as a plain dict."""
        h = {}
        for name in self.headers.keys():
            h[name] = self.headers[name]
        return h

    def _fetch(self, netloc, reqstring):
        """GET reqstring from netloc and return (status, headers, content).

        The connection is closed even if the request or the read fails.
        """
        conn = httplib.HTTPConnection(netloc)
        try:
            conn.request("GET", reqstring,
                         headers=_upstream_headers(self.getHeaders()))
            res = conn.getresponse()
            content = res.read()
            return res.status, res.getheaders(), content
        finally:
            conn.close()

    def send_head(self, method="GET"):
        """Send the status line and headers for self.path.

        The response is served from the cache when possible. Otherwise it is
        fetched from the origin server with a GET (also for HEAD requests)
        and, unless caching is disabled, added to the cache.

        method is accepted for interface compatibility but is not used.

        Returns the response body, or None when nothing more should be
        written: the object is missing in local-only mode, or a socket
        error occurred while sending the headers.
        """
        o = urlsplit(self.path)
        reqstring = urlunsplit(('', '', o.path, o.query, o.fragment))

        if options.no_cache:
            cache_result = None
        else:
            cache_result = cache.get(o.hostname, o.port, reqstring)

        if cache_result:
            status, headers, content = cache_result
        elif options.localonly:
            self.send_error(404, "Object not in cache")
            return None
        else:
            if options.verbose:
                print("Object %s was not in the cache" % self.path)
            status, headers, content = self._fetch(o.netloc, reqstring)
            if not options.no_cache:
                cache.add(o.hostname, o.port, reqstring,
                          status, headers, content)

        try:
            self.send_response(status)
            for name, value in _response_headers(headers, content):
                self.send_header(name, value)
            self.end_headers()
        except socket.error as e:
            if options.verbose:
                print("Got socket error %s" % e)
            return None
        return content

    def log_message(self, format, *args):
        """Log through the base class only in verbose mode."""
        if options.verbose:
            BaseHTTPServer.BaseHTTPRequestHandler.log_message(self, format, *args)


class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    """HTTP server that handles each request on a new thread."""
    def __init__(self, address, handler):
        BaseHTTPServer.HTTPServer.__init__(self, address, handler)


class Cache(object):
    """Multithreaded cache that uses the shelve module to store pages."""
    # 20 concurrent threads ought to be enough for one browser
    max_concurrency = 20

    def __init__(self, name='', max_concurrency=20):
        name = name or options.cache or "proxy_cache.db"
        self.name = name
        self.max_concurrency = max_concurrency
        self.entries = {}  # not referenced elsewhere in this module
        # Readers take one permit each; a writer takes all of them, which
        # gives the writer exclusive access.
        self.sem = threading.Semaphore(self.max_concurrency)
        # Serializes permit acquisition. Without it, two writers could each
        # hold part of the permits and deadlock waiting for the rest.
        self.semlock = threading.Lock()
        # 'n' starts from an empty database; 'c' opens or creates one.
        flag = 'n' if options.clear_cache else 'c'
        self.db = shelve.DbfilenameShelf(name, flag)

    def __del__(self):
        if hasattr(self, 'db'):
            self.db.close()

    def get_key(self, host, port, resource):
        """Build the shelve key for a host/port/resource triple."""
        return '%s:%s/%s' % (host, port, resource)

    def get(self, host, port, resource):
        """Return the cached (status, headers, content) tuple, or None."""
        key = self.get_key(host, port, resource)
        with self.semlock:
            self.sem.acquire()
        try:
            if key not in self.db:
                return None
            return self.db[key]
        finally:
            self.sem.release()

    def add(self, host, port, resource, status, headers, content):
        """Store (status, headers, content) and sync the database to disk."""
        key = self.get_key(host, port, resource)
        with self.semlock:
            for _ in range(self.max_concurrency):
                self.sem.acquire()
        try:
            self.db[key] = (status, headers, content)
            self.db.sync()
        finally:
            for _ in range(self.max_concurrency):
                self.sem.release()


class Options(object):
    """Default proxy settings; configure_proxy/_parseOptions override them."""
    port = 8000
    localonly = False
    clear_cache = False
    no_cache = False
    cache = 'proxy_cache.db'
    verbose = False


# Module-wide settings. Replaced by configure_proxy() or _parseOptions();
# the default means run_proxy() works even if neither has been called.
options = Options()


def _valid_port(value):
    """Return True if value is a TCP port number in the range 1-65535."""
    return 0 < value <= 65535


def _parseOptions():
    """Parse the command line into the module-level options object."""
    def port_callback(option, opt, value, parser):
        if _valid_port(value):
            setattr(parser.values, option.dest, value)
        else:
            raise OptionValueError("Port number is out of range")

    global options
    parser = OptionParser(version="Talos Proxy " + __version__)
    parser.add_option("-p", "--port", dest="port",
                      help="The port to run the proxy server on", metavar="PORT", type="int",
                      action="callback", callback=port_callback)
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
                      help="Include additional debugging information")
    parser.add_option("-l", "--localonly", action="store_true", dest="localonly",
                      help="Only serve pages from the local database")
    parser.add_option("-c", "--clear", action="store_true", dest="clear_cache",
                      help="Clear the cache on startup")
    parser.add_option("-n", "--no-cache", action="store_true", dest="no_cache",
                      help="Do not use a cache")
    parser.add_option("-u", "--use-cache", dest="cache",
                      help="The filename of the cache to use", metavar="NAME.db")
    parser.set_defaults(verbose=Options.verbose,
                        port=Options.port,
                        localonly=Options.localonly,
                        clear_cache=Options.clear_cache,
                        no_cache=Options.no_cache,
                        cache=Options.cache)
    options, args = parser.parse_args()


def configure_proxy(**kwargs):
    """Configures the proxy server. This should be called before run_proxy.

    It can be called afterwards, but note that it is not threadsafe and some
    options (namely port) will not take effect.

    Each keyword argument overrides the Options attribute of the same name,
    on a fresh Options instance.
    """
    global options
    options = Options()
    for key, value in kwargs.items():
        setattr(options, key, value)


def _run():
    """Open the cache and serve proxy requests forever (blocks)."""
    global cache
    cache = Cache()
    server_address = ('', options.port)
    httpd = HTTPServer(server_address, HTTPRequestHandler)
    httpd.serve_forever()


def run_proxy():
    """Starts the proxy; it runs on a separate daemon thread."""
    thr = threading.Thread(target=_run)
    # now when we die, the daemon thread will die too
    thr.daemon = True
    thr.start()


if __name__ == '__main__':
    _parseOptions()
    try:
        run_proxy()
        # thr.join() doesn't terminate on keyboard interrupt
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        if options.verbose:
            print("Quittin' time...")

__all__ = ['run_proxy', 'configure_proxy']