# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Caching HTTP Proxy for use with the Talos pageload tests
Author: Rob Arnold

This file implements a multithreaded caching HTTP/1.1 proxy. The HEAD and GET
methods are supported; POST is not yet supported.

Each incoming request is handled on its own thread; Python does not ship a
thread pool library, so a new thread is spawned per request. I have tried to
use the Python 2.4 standard library wherever possible.

Caching:
The cache is implemented in the Cache class. Items can only be added to the
cache. The only way to remove items from the cache is to blow it all away,
either by deleting the file (default: proxy_cache.db) or by passing the -c or
--clear flag on the command line. It is technically possible to remove items
individually from the cache, but so far there has been no need to do so.

The cache is implemented with the shelve module. The key is the combination of
host, port and request (path + query + fragment), and the values stored are the
HTTP status code, headers and content that were received from the remote server.
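For example, a hypothetical GET for http://example.com/page?x=1 would be stored
under a key of roughly the form 'example.com:None//page?x=1' (the port component
is None when the URL names no explicit port; see Cache.get_key).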

Access to the cache is guarded by a semaphore which allows concurrent read
access. The semaphore is guarded by a simple mutex which prevents a deadlock
from occurring when two threads try to add an item to the cache at the same time.

Memory usage is kept to a minimum by the shelve module; only items in the cache
that are currently being served stay in memory.

Proxy:
The BaseHTTPServer.BaseHTTPRequestHandler takes care of parsing incoming
requests and managing the socket connection. See the documentation of the
BaseHTTPServer module for more information. When do_HEAD or do_GET is called,
the URL that we are supposed to fetch is in self.path.

TODO:
* Implement POST requests. This requires implementing the do_POST method and
  passing the post data along.
* Implement different cache policies.
* Add an interface to allow administrators to probe the cache and remove
  items from the database and such.
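
Usage:
A rough sketch of how the proxy is meant to be driven; the file name proxy.py
below is an assumption, so adjust the name and flags to taste:

  python proxy.py -p 8000 -v    # run on port 8000 with verbose logging
  python proxy.py -c            # clear the cache on startup
  python proxy.py -l            # only serve objects already in the cache

or, embedded in a harness through the module interface:

  import proxy
  proxy.configure_proxy(port=8000, verbose=True)
  proxy.run_proxy()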
"""

__version__ = "0.1"

import os
import sys
import time
import threading
import shelve
from optparse import OptionParser, OptionValueError

import SocketServer
import BaseHTTPServer
import socket
import httplib
from urlparse import urlsplit, urlunsplit

class HTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    server_version = "TalosProxy/" + __version__
    protocol_version = "HTTP/1.1"

    def do_GET(self):
        content = self.send_head()
        if content:
            try:
                self.wfile.write(content)
            except socket.error, e:
                if options.verbose:
                    print "Got socket error %s" % e
        #self.close_connection = 1

    def do_HEAD(self):
        self.send_head()

    def getHeaders(self):
        h = {}
        for name in self.headers.keys():
            h[name] = self.headers[name]

        return h

    def send_head(self, method="GET"):
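        # For a proxy, the request line carries the absolute URI, so self.path
        # holds the full URL to fetch (e.g. "http://example.com/page") rather
        # than just a path; urlsplit below pulls the pieces back apart.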
        o = urlsplit(self.path)

        #sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        headers = self.getHeaders()
        for k in "Proxy-Connection", "Connection":
            if k in headers:
                headers[k] = "Close"
        if "Keep-Alive" in headers:
            del headers["Keep-Alive"]

        reqstring = urlunsplit(('', '', o.path, o.query, o.fragment))

        if options.no_cache:
            cache_result = None
        else:
            cache_result = cache.get(o.hostname, o.port, reqstring)

        if not cache_result:
            if options.localonly:
                self.send_error(404, "Object not in cache")
                return None
            else:
                if options.verbose:
                    print "Object %s was not in the cache" % self.path
                conn = httplib.HTTPConnection(o.netloc)
                conn.request("GET", reqstring, headers=headers)
                res = conn.getresponse()

                content = res.read()
                conn.close()

                status, headers = res.status, res.getheaders()

                if not options.no_cache:
                    cache.add(o.hostname, o.port, reqstring, status, headers, content)
        else:
            status, headers, content = cache_result

        try:
            self.send_response(status)
            sent_content_length = False
            for name, value in headers:
                # kill the transfer-encoding header because we don't support it
                # when we send data to the client
                if name not in ('transfer-encoding',):
                    self.send_header(name, value)
                    if name.lower() == 'content-length':
                        sent_content_length = True
            # Only synthesize a Content-Length if the response headers we are
            # forwarding do not already carry one.
            if not sent_content_length:
                self.send_header("Content-Length", str(len(content)))
            self.end_headers()
        except socket.error, e:
            if options.verbose:
                print "Got socket error %s" % e
            return None
        return content

    def log_message(self, format, *args):
        if options.verbose:
            BaseHTTPServer.BaseHTTPRequestHandler.log_message(self, format, *args)

class HTTPServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    def __init__(self, address, handler):
        BaseHTTPServer.HTTPServer.__init__(self, address, handler)

class Cache(object):
    """Multithreaded cache uses the shelve module to store pages"""
    # 20 concurrent threads ought to be enough for one browser
    max_concurrency = 20

    def __init__(self, name='', max_concurrency=20):
        name = name or options.cache or "proxy_cache.db"
        self.name = name
        self.max_concurrency = max_concurrency
        self.entries = {}
        self.sem = threading.Semaphore(self.max_concurrency)
        self.semlock = threading.Lock()
        if options.clear_cache:
            flag = 'n'
        else:
            flag = 'c'
        self.db = shelve.DbfilenameShelf(name, flag)

    def __del__(self):
        if hasattr(self, 'db'):
            self.db.close()

    def get_key(self, host, port, resource):
        return '%s:%s/%s' % (host, port, resource)

    def get(self, host, port, resource):
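        # Readers each take a single semaphore slot, so up to max_concurrency
        # lookups can run in parallel; see add() for the exclusive write path.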
        key = self.get_key(host, port, resource)
        self.semlock.acquire()
        self.sem.acquire()
        self.semlock.release()
        try:
            if not self.db.has_key(key):
                return None
            # returns status, headers, content
            return self.db[key]
        finally:
            self.sem.release()

    def add(self, host, port, resource, status, headers, content):
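        # Writers grab every slot of the semaphore so they hold the cache
        # exclusively; semlock keeps two writers from interleaving their
        # acquires and deadlocking each other.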
        key = self.get_key(host, port, resource)
        self.semlock.acquire()
        for i in range(self.max_concurrency):
            self.sem.acquire()
        self.semlock.release()
        try:
            self.db[key] = (status, headers, content)
            self.db.sync()
        finally:
            for i in range(self.max_concurrency):
                self.sem.release()

class Options(object):
    port = 8000
    localonly = False
    clear_cache = False
    no_cache = False
    cache = 'proxy_cache.db'
    verbose = False

def _parseOptions():
    def port_callback(option, opt, value, parser):
        # Valid TCP ports are 1-65535.
        if 0 < value < 2 ** 16:
            setattr(parser.values, option.dest, value)
        else:
            raise OptionValueError("Port number is out of range")

    global options
    parser = OptionParser(version="Talos Proxy " + __version__)
    parser.add_option("-p", "--port", dest="port",
                      help="The port to run the proxy server on", metavar="PORT",
                      type="int", action="callback", callback=port_callback)
    parser.add_option("-v", "--verbose", action="store_true", dest="verbose",
                      help="Include additional debugging information")
    parser.add_option("-l", "--localonly", action="store_true", dest="localonly",
                      help="Only serve pages from the local database")
    parser.add_option("-c", "--clear", action="store_true", dest="clear_cache",
                      help="Clear the cache on startup")
    parser.add_option("-n", "--no-cache", action="store_true", dest="no_cache",
                      help="Do not use a cache")
    parser.add_option("-u", "--use-cache", dest="cache",
                      help="The filename of the cache to use", metavar="NAME.db")
    parser.set_defaults(verbose=Options.verbose,
                        port=Options.port,
                        localonly=Options.localonly,
                        clear_cache=Options.clear_cache,
                        no_cache=Options.no_cache,
                        cache=Options.cache)
    options, args = parser.parse_args()

def configure_proxy(**kwargs):
    """Configures the proxy server. This should be called before run_proxy. It
    can be called afterwards, but note that it is not threadsafe and some
    options (namely port) will not take effect."""
    global options
    options = Options()
    for key in kwargs:
        setattr(options, key, kwargs[key])

def _run():
    global cache
    cache = Cache()
    server_address = ('', options.port)
    httpd = HTTPServer(server_address, HTTPRequestHandler)
    httpd.serve_forever()

def run_proxy():
    """Starts the proxy; it runs on a separate daemon thread."""
    thr = threading.Thread(target=_run)
    # now when we die, the daemon thread will die too
    thr.setDaemon(1)
    thr.start()

if __name__ == '__main__':
    _parseOptions()
    try:
        run_proxy()
        # thr.join() doesn't terminate on keyboard interrupt
        while 1: time.sleep(1)
    except KeyboardInterrupt:
        if options.verbose:
            print "Quittin' time..."

__all__ = ['run_proxy', 'configure_proxy']