Thu, 22 Jan 2015 13:21:57 +0100
Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6
michael@0 | 1 | # This Source Code Form is subject to the terms of the Mozilla Public |
michael@0 | 2 | # License, v. 2.0. If a copy of the MPL was not distributed with this |
michael@0 | 3 | # file, You can obtain one at http://mozilla.org/MPL/2.0/. |
michael@0 | 4 | |
michael@0 | 5 | """ |
michael@0 | 6 | Caching HTTP Proxy for use with the Talos pageload tests |
michael@0 | 7 | Author: Rob Arnold |
michael@0 | 8 | |
michael@0 | 9 | This file implements a multithreaded caching http 1.1 proxy. HEAD and GET |
michael@0 | 10 | methods are supported; POST is not supported yet. |
michael@0 | 11 | |
michael@0 | 12 | Each incoming request is put onto a new thread; python does not have a thread |
michael@0 | 13 | pool library, so a new thread is spawned for each request. I have tried to use |
michael@0 | 14 | the python 2.4 standard library wherever possible. |
michael@0 | 15 | |
michael@0 | 16 | Caching: |
michael@0 | 17 | The cache is implemented in the Cache class. Items can only be added to the |
michael@0 | 18 | cache. The only way to remove items from the cache is to blow it all away, |
michael@0 | 19 | either by deleting the file (default: proxy_cache.db) or passing the -c or |
michael@0 | 20 | --clear-cache flags on the command line. It is technically possible to remove |
michael@0 | 21 | items individually from the cache, but so far there has been no need to do so. |
michael@0 | 22 | |
michael@0 | 23 | The cache is implemented with the shelve module. The key is the combination of |
michael@0 | 24 | host, port and request (path + params + fragment) and the values stored are the |
michael@0 | 25 | http status code, headers and content that were received from the remote server. |
michael@0 | 26 | |
michael@0 | 27 | Access to the cache is guarded by a semaphore which allows concurrent read |
michael@0 | 28 | access. The semaphore is guarded by a simple mutex which prevents a deadlock |
michael@0 | 29 | from occurring when two threads try to add an item to the cache at the same time. |
michael@0 | 30 | |
michael@0 | 31 | Memory usage is kept to a minimum by the shelve module; only items in the cache |
michael@0 | 32 | that are currently being served stay in memory. |
michael@0 | 33 | |
michael@0 | 34 | Proxy: |
michael@0 | 35 | The BaseHTTPServer.BaseHTTPRequestHandler takes care of parsing incoming |
michael@0 | 36 | requests and managing the socket connection. See the documentation of the |
michael@0 | 37 | BaseHTTPServer module for more information. When do_HEAD or do_GET is called, |
michael@0 | 38 | the url that we are supposed to fetch is in self.path. |
michael@0 | 39 | |
michael@0 | 40 | TODO: |
michael@0 | 41 | * Implement POST requests. This requires implementing the do_POST method and |
michael@0 | 42 | passing the post data along. |
michael@0 | 43 | * Implement different cache policies |
michael@0 | 44 | * Add an interface to allow administrators to probe the cache and remove |
michael@0 | 45 | items from the database and such. |
michael@0 | 46 | """ |
michael@0 | 47 | |
# Proxy version; embedded in the HTTP server banner and the --version output.
__version__ = "0.1"

michael@0 | 50 | import os |
michael@0 | 51 | import sys |
michael@0 | 52 | import time |
michael@0 | 53 | import threading |
michael@0 | 54 | import shelve |
michael@0 | 55 | from optparse import OptionParser, OptionValueError |
michael@0 | 56 | |
michael@0 | 57 | import SocketServer |
michael@0 | 58 | import BaseHTTPServer |
michael@0 | 59 | import socket |
michael@0 | 60 | import httplib |
michael@0 | 61 | from urlparse import urlsplit, urlunsplit |
michael@0 | 62 | |
class HTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Caching proxy handler for GET and HEAD requests.

    The absolute URL being requested is in self.path. Responses are served
    from the module-level ``cache`` when possible and fetched from the remote
    server otherwise. Behavior is controlled by the module-level ``options``
    object (no_cache, localonly, verbose).
    """
    server_version = "TalosProxy/" + __version__
    protocol_version = "HTTP/1.1"

    # Upstream response headers never copied through to the client (compared
    # lower-cased). The body is always sent un-chunked, and send_head emits its
    # own Content-Length computed from the bytes actually sent; also copying
    # the upstream one would produce a duplicate header.
    _DROPPED_RESPONSE_HEADERS = ('transfer-encoding', 'content-length')

    def do_GET(self):
        """Send the status line and headers, then the body if there is one."""
        content = self.send_head()
        if content:
            try:
                self.wfile.write(content)
            except socket.error as e:
                # The client went away mid-response; nothing more to do.
                if options.verbose:
                    print("Got socket error %s" % e)

    def do_HEAD(self):
        """Send only the status line and headers."""
        self.send_head()

    def getHeaders(self):
        """Return a plain dict copy of the client's request headers."""
        return dict((name, self.headers[name]) for name in self.headers.keys())

    def _upstream_request_headers(self):
        """Return the client's request headers adjusted for the upstream fetch.

        Connection and Proxy-Connection are forced to "Close" and Keep-Alive is
        removed, because the upstream connection is closed after one response.
        Names are matched case-insensitively: Python 2's mimetools.Message
        reports header names lower-cased, so an exact "Connection" lookup
        would never match.
        """
        headers = self.getHeaders()
        # Iterate over a snapshot because entries may be deleted.
        for name in list(headers.keys()):
            lname = name.lower()
            if lname in ('proxy-connection', 'connection'):
                headers[name] = "Close"
            elif lname == 'keep-alive':
                del headers[name]
        return headers

    def _fetch_upstream(self, netloc, reqstring, request_headers):
        """GET reqstring from netloc.

        Returns a (status, headers, content) tuple. On a connection or HTTP
        protocol error, a 502 is sent to the client and None is returned.
        The upstream connection is always closed.
        """
        conn = httplib.HTTPConnection(netloc)
        try:
            conn.request("GET", reqstring, headers=request_headers)
            res = conn.getresponse()
            content = res.read()
            return res.status, res.getheaders(), content
        except (socket.error, httplib.HTTPException) as e:
            if options.verbose:
                print("Error fetching %s: %s" % (self.path, e))
            self.send_error(502, "Error fetching object from remote server")
            return None
        finally:
            conn.close()

    def send_head(self, method="GET"):
        """Send the status line and headers for self.path.

        The response comes from the cache when possible. Otherwise it is
        fetched from the remote server and stored in the cache, unless
        options.no_cache is set. In options.localonly mode, a cache miss is
        answered with 404.

        ``method`` is accepted for compatibility but is not used: the remote
        request is always a GET.

        Returns the response body (possibly empty) once the headers have been
        sent. Returns None if an error response was sent instead or if writing
        the headers to the client failed.
        """
        o = urlsplit(self.path)
        # Cache key / upstream request: path + query + fragment, no scheme/host.
        reqstring = urlunsplit(('', '', o.path, o.query, o.fragment))

        if options.no_cache:
            cache_result = None
        else:
            cache_result = cache.get(o.hostname, o.port, reqstring)

        if cache_result:
            status, response_headers, content = cache_result
        elif options.localonly:
            self.send_error(404, "Object not in cache")
            return None
        else:
            if options.verbose:
                print("Object %s was not in the cache" % self.path)
            fetched = self._fetch_upstream(o.netloc, reqstring,
                                           self._upstream_request_headers())
            if fetched is None:
                return None
            status, response_headers, content = fetched
            if not options.no_cache:
                cache.add(o.hostname, o.port, reqstring,
                          status, response_headers, content)

        try:
            self.send_response(status)
            for name, value in response_headers:
                if name.lower() not in self._DROPPED_RESPONSE_HEADERS:
                    self.send_header(name, value)
            # Always describe the exact body we send (exactly one header).
            self.send_header("Content-Length", str(len(content)))
            self.end_headers()
        except socket.error as e:
            if options.verbose:
                print("Got socket error %s" % e)
            return None
        return content

    def log_message(self, format, *args):
        """Forward request logging to the base class only in verbose mode."""
        if options.verbose:
            BaseHTTPServer.BaseHTTPRequestHandler.log_message(self, format, *args)

class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    """HTTP server that handles each request on its own thread."""
    # Make per-request threads daemonic. Otherwise a request blocked on an
    # unresponsive remote server keeps the process alive after the main
    # thread exits, defeating the daemon proxy thread started by run_proxy.
    daemon_threads = True

    def __init__(self, address, handler):
        BaseHTTPServer.HTTPServer.__init__(self, address, handler)

class Cache(object):
    """Multithreaded cache that uses the shelve module to store pages.

    Entries are keyed by host, port and request string. Each value is the
    (status, headers, content) tuple received from the remote server.

    Each reader takes one permit of a counting semaphore, so up to
    ``max_concurrency`` lookups can run at once. A writer takes every permit,
    which gives it exclusive access. ``semlock`` serializes permit acquisition
    so that two writers cannot each hold part of the permits and deadlock.
    """
    # 20 concurrent threads ought to be enough for one browser
    max_concurrency = 20

    def __init__(self, name='', max_concurrency=20):
        """Open (or create) the cache database.

        name: database filename. If empty, falls back to options.cache and
            then to "proxy_cache.db".
        max_concurrency: maximum number of simultaneous readers.

        If options.clear_cache is set, any existing database is replaced by a
        new, empty one.
        """
        self.name = name or options.cache or "proxy_cache.db"
        self.max_concurrency = max_concurrency
        # Not used within this class; kept for compatibility.
        self.entries = {}
        self.sem = threading.Semaphore(self.max_concurrency)
        self.semlock = threading.Lock()
        # 'n' always creates a new empty database; 'c' opens an existing one
        # (creating it if it does not exist).
        flag = 'n' if options.clear_cache else 'c'
        self.db = shelve.DbfilenameShelf(self.name, flag)

    def __del__(self):
        # __init__ may have failed before self.db was assigned.
        if hasattr(self, 'db'):
            self.db.close()

    def get_key(self, host, port, resource):
        """Return the shelve key for a (host, port, resource) triple."""
        return '%s:%s/%s' % (host, port, resource)

    def get(self, host, port, resource):
        """Return the cached (status, headers, content) tuple, or None."""
        key = self.get_key(host, port, resource)
        # Taking the permit under semlock stops readers from slipping in while
        # a writer is collecting all of the permits.
        with self.semlock:
            self.sem.acquire()
        try:
            return self.db.get(key)
        finally:
            self.sem.release()

    def add(self, host, port, resource, status, headers, content):
        """Store (status, headers, content) under the key and sync to disk."""
        key = self.get_key(host, port, resource)
        # Take every permit so that no reader is active during the write.
        with self.semlock:
            for _ in range(self.max_concurrency):
                self.sem.acquire()
        try:
            self.db[key] = (status, headers, content)
            self.db.sync()
        finally:
            for _ in range(self.max_concurrency):
                self.sem.release()

class Options(object):
    """Default proxy settings; configure_proxy and _parseOptions start from these."""
    # Network
    port = 8000
    # Cache behavior
    cache = 'proxy_cache.db'
    clear_cache = False
    no_cache = False
    localonly = False
    # Diagnostics
    verbose = False

def _parseOptions():
    """Parse sys.argv into the module-level ``options`` global.

    Options not given on the command line take the class defaults from
    Options. An out-of-range port is reported through the parser, which
    prints a usage error and exits.
    """
    def port_callback(option, opt, value, parser):
        # Valid TCP ports are 1..65535 inclusive.
        if 0 < value <= 65535:
            setattr(parser.values, option.dest, value)
        else:
            raise OptionValueError("Port number is out of range")

    global options
    parser = OptionParser(version="Talos Proxy " + __version__)
    parser.add_option("-p", "--port", dest="port",
                      help="The port to run the proxy server on", metavar="PORT", type="int",
                      action="callback", callback=port_callback)
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
                      help="Include additional debugging information")
    parser.add_option("-l", "--localonly", action="store_true", dest="localonly",
                      help="Only serve pages from the local database")
    parser.add_option("-c", "--clear", action="store_true", dest="clear_cache",
                      help="Clear the cache on startup")
    parser.add_option("-n", "--no-cache", action="store_true", dest="no_cache",
                      help="Do not use a cache")
    parser.add_option("-u", "--use-cache", dest="cache",
                      help="The filename of the cache to use", metavar="NAME.db")
    parser.set_defaults(verbose=Options.verbose,
                        port=Options.port,
                        localonly=Options.localonly,
                        clear_cache=Options.clear_cache,
                        no_cache=Options.no_cache,
                        cache=Options.cache)
    # Positional arguments are not used.
    options, _ = parser.parse_args()

def configure_proxy(**kwargs):
    """Configure the proxy server.

    Resets the module-level ``options`` to the defaults from Options, then sets
    each attribute named in ``kwargs`` (e.g. port=8080, verbose=True).

    This should be called before run_proxy. It can be called afterwards, but
    note that it is not threadsafe and some options (namely port) will not
    take effect.
    """
    global options
    options = Options()
    for key, value in kwargs.items():
        setattr(options, key, value)

def _run():
    """Create the shared cache, then serve proxy requests forever on options.port."""
    global cache
    cache = Cache()
    httpd = HTTPServer(('', options.port), HTTPRequestHandler)
    httpd.serve_forever()

def run_proxy():
    """Start the proxy on a separate daemon thread and return that thread.

    The module-level ``options`` must already be set (see configure_proxy),
    because the server reads its port and cache settings from it.
    """
    thr = threading.Thread(target=_run)
    # Now when we die, the daemon thread will die too.
    thr.daemon = True
    thr.start()
    return thr

__all__ = ['run_proxy', 'configure_proxy']

if __name__ == '__main__':
    _parseOptions()
    try:
        run_proxy()
        # Poll instead of joining the proxy thread: thr.join() doesn't
        # terminate on keyboard interrupt.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        if options.verbose:
            print("Quittin' time...")