testing/tools/proxyserver/proxyserver.py

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Caching HTTP Proxy for use with the Talos pageload tests
Author: Rob Arnold

This file implements a multithreaded caching HTTP/1.1 proxy. HEAD and GET
methods are supported; POST is not yet.

Each incoming request is put onto a new thread; Python does not have a thread
pool library, so a new thread is spawned for each request. I have tried to use
the Python 2.4 standard library wherever possible.

Caching:
The cache is implemented in the Cache class. Items can only be added to the
cache. The only way to remove items from the cache is to blow it all away,
either by deleting the file (default: proxy_cache.db) or passing the -c or
--clear flag on the command line. It is technically possible to remove items
individually from the cache, but so far there has been no need to do so.

The cache is implemented with the shelve module. The key is the combination of
host, port and request (path + params + fragment) and the values stored are the
HTTP status code, headers and content that were received from the remote server.
(A short illustrative sketch of this layout follows the imports below.)

Access to the cache is guarded by a semaphore which allows concurrent read
access. The semaphore is guarded by a simple mutex which prevents a deadlock
from occurring when two threads try to add an item to the cache at the same time.

Memory usage is kept to a minimum by the shelve module; only items in the cache
that are currently being served stay in memory.

Proxy:
The BaseHTTPServer.BaseHTTPRequestHandler takes care of parsing incoming
requests and managing the socket connection. See the documentation of the
BaseHTTPServer module for more information. When do_HEAD or do_GET is called,
the URL that we are supposed to fetch is in self.path.

TODO:
* Implement POST requests. This requires implementing the do_POST method and
  passing the post data along.
* Implement different cache policies.
* Add an interface to allow administrators to probe the cache and remove
  items from the database and such.
"""

__version__ = "0.1"

import os
import sys
import time
import threading
import shelve
from optparse import OptionParser, OptionValueError

import SocketServer
import BaseHTTPServer
import socket
import httplib
from urlparse import urlsplit, urlunsplit

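# A minimal sketch of the cache layout described in the module docstring above
# (illustration only; the real logic lives in the Cache class below). The
# filename, host, port and stored response tuple are arbitrary examples, and
# nothing in the proxy calls this helper.
def _cache_layout_example():
    db = shelve.open('example_cache.db', 'c')
    key = '%s:%s/%s' % ('example.com', 80, '/index.html?q=1')
    # the stored value is the (status, headers, content) tuple
    db[key] = (200, [('content-type', 'text/html')], '<html>ok</html>')
    status, headers, content = db[key]
    db.close()
    return status, headers, content
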
class HTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    server_version = "TalosProxy/" + __version__
    protocol_version = "HTTP/1.1"

    def do_GET(self):
        content = self.send_head()
        if content:
            try:
                self.wfile.write(content)
            except socket.error, e:
                if options.verbose:
                    print "Got socket error %s" % e
                #self.close_connection = 1

    def do_HEAD(self):
        self.send_head()

    def getHeaders(self):
        h = {}
        for name in self.headers.keys():
            h[name] = self.headers[name]

        return h

    def send_head(self, method="GET"):
        o = urlsplit(self.path)

        #sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        headers = self.getHeaders()
        for k in "Proxy-Connection", "Connection":
            if k in headers:
                headers[k] = "Close"
        if "Keep-Alive" in headers:
            del headers["Keep-Alive"]

        reqstring = urlunsplit(('', '', o.path, o.query, o.fragment))

        if options.no_cache:
            cache_result = None
        else:
            cache_result = cache.get(o.hostname, o.port, reqstring)

        if not cache_result:
            if options.localonly:
                self.send_error(404, "Object not in cache")
                return None
            else:
                if options.verbose:
                    print "Object %s was not in the cache" % self.path
                conn = httplib.HTTPConnection(o.netloc)
                conn.request("GET", reqstring, headers=headers)
                res = conn.getresponse()

                content = res.read()
                conn.close()

                status, headers = res.status, res.getheaders()

                if not options.no_cache:
                    cache.add(o.hostname, o.port, reqstring, status, headers, content)
        else:
            status, headers, content = cache_result

        try:
            self.send_response(status)
            for name, value in headers:
                # kill the transfer-encoding header because we don't support it when
                # we send data to the client
                if name not in ('transfer-encoding',):
                    self.send_header(name, value)
            # headers is a list of (name, value) tuples here, so test the header
            # names rather than the tuples before adding our own Content-Length
            if "content-length" not in [name.lower() for name, value in headers]:
                self.send_header("Content-Length", str(len(content)))
            self.end_headers()
        except socket.error, e:
            if options.verbose:
                print "Got socket error %s" % e
            return None
        return content

    def log_message(self, format, *args):
        if options.verbose:
            BaseHTTPServer.BaseHTTPRequestHandler.log_message(self, format, *args)

class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    def __init__(self, address, handler):
        BaseHTTPServer.HTTPServer.__init__(self, address, handler)

class Cache(object):
    """Multithreaded cache uses the shelve module to store pages"""
    # 20 concurrent threads ought to be enough for one browser
    max_concurrency = 20

    def __init__(self, name='', max_concurrency=20):
        name = name or options.cache or "proxy_cache.db"
        self.name = name
        self.max_concurrency = max_concurrency
        self.entries = {}
        self.sem = threading.Semaphore(self.max_concurrency)
        self.semlock = threading.Lock()
        if options.clear_cache:
            flag = 'n'
        else:
            flag = 'c'
        self.db = shelve.DbfilenameShelf(name, flag)

    def __del__(self):
        if hasattr(self, 'db'):
            self.db.close()

    def get_key(self, host, port, resource):
        return '%s:%s/%s' % (host, port, resource)

    def get(self, host, port, resource):
        key = self.get_key(host, port, resource)
        self.semlock.acquire()
        self.sem.acquire()
        self.semlock.release()
        try:
            if not self.db.has_key(key):
                return None
            # returns status, headers, content
            return self.db[key]
        finally:
            self.sem.release()

    def add(self, host, port, resource, status, headers, content):
        key = self.get_key(host, port, resource)
        self.semlock.acquire()
        for i in range(self.max_concurrency):
            self.sem.acquire()
        self.semlock.release()
        try:
            self.db[key] = (status, headers, content)
            self.db.sync()
        finally:
            for i in range(self.max_concurrency):
                self.sem.release()

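# A hypothetical direct use of the Cache class (illustration only; the proxy
# exercises it through HTTPRequestHandler.send_head above). Cache.__init__
# reads the global `options`, so configure_proxy below must run first. The
# cache filename, host, port and stored response tuple are arbitrary examples,
# and nothing in the proxy calls this helper.
def _cache_usage_example():
    configure_proxy(cache='example_cache.db')
    c = Cache()
    # add() takes every semaphore slot, so it blocks until in-flight reads
    # finish; get() takes a single slot, so reads can run concurrently.
    c.add('example.com', 80, '/index.html', 200,
          [('content-type', 'text/html')], '<html>ok</html>')
    return c.get('example.com', 80, '/index.html')
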
class Options(object):
    port = 8000
    localonly = False
    clear_cache = False
    no_cache = False
    cache = 'proxy_cache.db'
    verbose = False

def _parseOptions():
    def port_callback(option, opt, value, parser):
        # valid TCP ports are 1-65535
        if 0 < value < 2 ** 16:
            setattr(parser.values, option.dest, value)
        else:
            raise OptionValueError("Port number is out of range")

    global options
    parser = OptionParser(version="Talos Proxy " + __version__)
    parser.add_option("-p", "--port", dest="port",
                      help="The port to run the proxy server on", metavar="PORT", type="int",
                      action="callback", callback=port_callback)
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
                      help="Include additional debugging information")
    parser.add_option("-l", "--localonly", action="store_true", dest="localonly",
                      help="Only serve pages from the local database")
    parser.add_option("-c", "--clear", action="store_true", dest="clear_cache",
                      help="Clear the cache on startup")
    parser.add_option("-n", "--no-cache", action="store_true", dest="no_cache",
                      help="Do not use a cache")
    parser.add_option("-u", "--use-cache", dest="cache",
                      help="The filename of the cache to use", metavar="NAME.db")
    parser.set_defaults(verbose=Options.verbose,
                        port=Options.port,
                        localonly=Options.localonly,
                        clear_cache=Options.clear_cache,
                        no_cache=Options.no_cache,
                        cache=Options.cache)
    options, args = parser.parse_args()

def configure_proxy(**kwargs):
    """Configures the proxy server. This should be called before run_proxy. It can
    be called afterwards, but note that it is not threadsafe and some options
    (namely port) will not take effect."""
    global options
    options = Options()
    for key in kwargs:
        setattr(options, key, kwargs[key])

def _run():
    global cache
    cache = Cache()
    server_address = ('', options.port)
    httpd = HTTPServer(server_address, HTTPRequestHandler)
    httpd.serve_forever()

def run_proxy():
    """Starts the proxy; it runs on a separate daemon thread."""
    thr = threading.Thread(target=_run)
    # now when we die, the daemon thread will die too
    thr.setDaemon(1)
    thr.start()

if __name__ == '__main__':
    _parseOptions()
    try:
        run_proxy()
        # thr.join() doesn't terminate on keyboard interrupt
        while 1: time.sleep(1)
    except KeyboardInterrupt:
        if options.verbose:
            print "Quittin' time..."

__all__ = ['run_proxy', 'configure_proxy']
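
# A minimal sketch of embedding the proxy from another script, using the
# configure_proxy/run_proxy API exported above. The port and cache filename
# are arbitrary examples, and nothing in the proxy calls this helper; because
# run_proxy serves from a daemon thread, the caller must keep its main thread
# alive for as long as the proxy is needed.
def _example_embedding():
    configure_proxy(port=8001, cache='talos_example.db', verbose=True)
    run_proxy()
    time.sleep(60)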
