xpcom/analysis/deki.py

branch
TOR_BUG_9701
changeset 15
b8a032363ba2
equal deleted inserted replaced
-1:000000000000 0:a7902f1967c6
1 # This Source Code Form is subject to the terms of the Mozilla Public
2 # License, v. 2.0. If a copy of the MPL was not distributed with this
3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
4
5 """ deki.py - Access the wiki pages on a MindTouch Deki server via the API.
6
7 Here's what this code can do:
8
9 wiki = deki.Deki("http://developer.mozilla.org/@api/deki/", username, password)
10 page = wiki.get_page("Sheep")
11 print page.title
12 print page.doc.toxml()
13
14 page.title = "Bananas"
15 page.save()
16
17 There are also some additional methods:
18 wiki.create_page(path, content, title=, overwrite=)
19 wiki.move_page(old, new)
20 wiki.get_subpages(page)
21
22 This module does not try to mimic the MindTouch "Plug" API. It's meant to be
23 higher-level than that.
24 """
25
26 import sys
27 import urllib2, cookielib, httplib
28 import xml.dom.minidom as dom
29 from urllib import quote as _urllib_quote
30 from urllib import urlencode as _urlencode
31 import urlparse
32 from datetime import datetime
33 import re
34
35 __all__ = ['Deki']
36
37
38 # === Utils
39
40 def _check(fact):
41 if not fact:
42 raise AssertionError('check failed')
43
44 def _urlquote(s, *args):
45 return _urllib_quote(s.encode('utf-8'), *args)
46
47 def _make_url(*dirs, **params):
48 """ dirs must already be url-encoded, params must not """
49 url = '/'.join(dirs)
50 if params:
51 url += '?' + _urlencode(params)
52 return url
53
class PutRequest(urllib2.Request):
    """A urllib2 Request whose HTTP method is always PUT."""

    # The verb reported to urllib2 regardless of whether data is attached.
    _HTTP_VERB = "PUT"

    def get_method(self):
        """Return the HTTP method to use for this request ("PUT")."""
        return self._HTTP_VERB
57
58 # === Dream framework client code
59
60 # This handler causes python to "always be logged in" when it's talking to the
61 # server. If you're just accessing public pages, it generates more requests
62 # than are strictly needed, but this is the behavior you want for a bot.
63 #
64 # The users/authenticate request is sent twice: once without any basic auth and
65 # once with. Dumb. Feel free to fix.
66 #
67 class _LoginHandler(urllib2.HTTPCookieProcessor):
68 def __init__(self, server):
69 policy = cookielib.DefaultCookiePolicy(rfc2965=True)
70 cookiejar = cookielib.CookieJar(policy)
71 urllib2.HTTPCookieProcessor.__init__(self, cookiejar)
72 self.server = server
73
74 def http_request(self, req):
75 #print "DEBUG- Requesting " + req.get_full_url()
76 s = self.server
77 req = urllib2.HTTPCookieProcessor.http_request(self, req)
78 if ('Cookie' not in req.unredirected_hdrs
79 and req.get_full_url() != s.base + 'users/authenticate'):
80 s.login()
81 # Retry - should have a new cookie.
82 req = urllib2.HTTPCookieProcessor.http_request(self, req)
83 _check('Cookie' in req.unredirected_hdrs)
84 return req
85
class DreamClient:
    """Small HTTP client for a Dream/Deki API rooted at a base URI.

    All `url` arguments taken by the methods below are relative to `base`.
    """

    def __init__(self, base, user, password):
        """
        base - The base URI of the Deki API, with trailing slash.
          Typically, 'http://wiki.example.org/@api/deki/'.
        user, password - Your Deki login information.
        """
        self.base = base
        credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
        credentials.add_password(None, self.base, user, password)
        # Basic auth supplies the credentials; the login handler makes sure
        # requests go out with a session cookie.
        self._opener = urllib2.build_opener(
            urllib2.HTTPBasicAuthHandler(credentials),
            _LoginHandler(self))

    def login(self):
        """Request 'users/authenticate' and discard the response."""
        self._opener.open(self.base + 'users/authenticate').close()

    def open(self, url):
        """Open `url` (relative to base) and return the response object."""
        target = self.base + url
        return self._opener.open(target)

    def _handleResponse(self, req):
        """Helper method shared between post() and put().

        Sends `req`.  If the response Content-Type looks like XML, returns
        the parsed DOM document; otherwise reads and discards the body and
        returns None.  The response is always closed.
        """
        resp = self._opener.open(req)
        try:
            content_type = resp.headers.get('Content-Type', '(none)')
            looks_like_xml = '/xml' in content_type or '+xml' in content_type
            if not looks_like_xml:
                # Drain the body even though nothing is done with it.
                resp.read()
                return None
            return dom.parse(resp)
        finally:
            resp.close()

    def post(self, url, data, type):
        """POST `data` with Content-Type `type`; see _handleResponse()."""
        target = self.base + url
        request = urllib2.Request(target, data, {'Content-Type': type})
        return self._handleResponse(request)

    def put(self, url, data, type):
        """PUT `data` with Content-Type `type`; see _handleResponse()."""
        target = self.base + url
        request = PutRequest(target, data, {'Content-Type': type})
        return self._handleResponse(request)

    def get_xml(self, url):
        """GET `url` and return the response parsed as a DOM document."""
        resp = self.open(url)
        try:
            document = dom.parse(resp)
        finally:
            resp.close()
        return document
139
140
141 # === DOM
142
143 def _text_of(node):
144 if node.nodeType == node.ELEMENT_NODE:
145 return u''.join(_text_of(n) for n in node.childNodes)
146 elif node.nodeType == node.TEXT_NODE:
147 return node.nodeValue
148 else:
149 return u''
150
151 def _the_element_by_name(doc, tagName):
152 elts = doc.getElementsByTagName(tagName)
153 if len(elts) != 1:
154 raise ValueError("Expected exactly one <%s> tag, got %d." % (tagName, len(elts)))
155 return elts[0]
156
157 def _first_element(node):
158 n = node.firstChild
159 while n is not None:
160 if n.nodeType == n.ELEMENT_NODE:
161 return n
162 n = node.nextSibling
163 return None
164
165 def _find_elements(node, path):
166 if u'/' in path:
167 [first, rest] = path.split(u'/', 1)
168 for child in _find_elements(node, first):
169 for desc in _find_elements(child, rest):
170 yield desc
171 else:
172 for n in node.childNodes:
173 if n.nodeType == node.ELEMENT_NODE and n.nodeName == path:
174 yield n
175
176
177 # === Deki
178
179 def _format_page_id(id):
180 if isinstance(id, int):
181 return str(id)
182 elif id is Deki.HOME:
183 return 'home'
184 elif isinstance(id, basestring):
185 # Double-encoded, per the Deki API reference.
186 return '=' + _urlquote(_urlquote(id, ''))
187
class Deki(DreamClient):
    """Page-level operations on a Deki wiki, built on top of DreamClient."""

    # Unique marker object; pass it as a page id to refer to the home page.
    HOME = object()

    def get_page(self, page_id):
        """ Get the content of a page from the wiki.

        The page_id argument must be one of:
            an int - The page id (an arbitrary number assigned by Deki)
            a str - The page name (not the title, the full path that shows up in the URL)
            Deki.HOME - Refers to the main page of the wiki.

        Returns a Page object.
        """
        page = Page(self)
        page._load(page_id)
        return page

    def create_page(self, path, content, title=None, overwrite=False):
        """ Create a new wiki page.

        Parameters:
            path - str - The page id.
            content - str - The XML content to put in the new page.
                The document element must be a <body>; otherwise an
                AssertionError is raised.
            title - str - The page title.
                Defaults to the last path-segment of path.
            overwrite - bool - Whether to overwrite an existing page. If false,
                and the page already exists, the method will throw an error.
        """
        if title is None:
            title = path.rsplit('/', 1)[-1]
        body_doc = dom.parseString(content)
        _check(body_doc.documentElement.tagName == 'body')
        Page(self)._create(path, title, body_doc, overwrite)

    def attach_file(self, page, name, data, mimetype, description=None):
        """Create or update a file attachment.

        Parameters:
            page - str - the page ID this file is related to
            name - str - the name of the file
            data - str - the file data
            mimetype - str - the MIME type of the file
            description - str - a description of the file; only sent to
                the server when it is not None

        Raises AssertionError unless the reply's root element is <file>.
        """
        if description is None:
            query = {}
        else:
            query = {'description': description}

        target = _make_url('pages', _format_page_id(page),
                           'files', _format_page_id(name), **query)

        reply = self.put(target, data, mimetype)
        _check(reply.documentElement.nodeName == u'file')

    def get_subpages(self, page_id):
        """ Return the ids of all subpages of the given page.

        This is a generator: the request is only sent once iteration
        starts, and each yielded item is the text of one subpage's <path>.
        """
        listing_url = _make_url("pages", _format_page_id(page_id),
                                "files,subpages")
        listing = self.get_xml(listing_url)
        for path_elt in _find_elements(listing, u'page/subpages/page.subpage/path'):
            yield _text_of(path_elt)

    def move_page(self, page_id, new_title, redirects=True):
        """ Move an existing page to a new location.

        Posts to the page's "move" resource with `to` set to new_title and
        `redirects` set to "1" or "0" according to the `redirects` flag.

        Notes carried over from the original author (server behavior, not
        checked by this code): a page cannot be moved to a destination
        that already exists, is a descendant, or has a protected title
        (ex.  Special:xxx, User:, Template:).  Subpages under the page are
        moved too, and for each moved page the system creates an alias
        page that redirects from the old to the new destination.
        """
        redirect_flag = "1" if redirects else "0"
        target = _make_url("pages", _format_page_id(page_id), "move",
                           to=new_title,
                           redirects=redirect_flag)
        self.post(target, "", "text/plain")
267
class Page:
    """ A Deki wiki page.

    To obtain a page, call wiki.get_page(id).
    Attributes:
        title : unicode - The page title.
        doc : Document - The content of the page as a DOM Document.
          The root element of this document is a <body>.
        path : unicode - The path.  Use this to detect redirects, as otherwise
          page.save() will overwrite the redirect with a copy of the content!
        deki : Deki - The Deki object from which the page was loaded.
        page_id : str/int/Deki.HOME - The page id used to load the page.
        load_time : datetime - The time the page was loaded,
          according to the clock on the client machine.
    Methods:
        save() - Save the modified document back to the server.
          Only the page.title and the contents of page.doc are saved.
    """

    def __init__(self, deki):
        """Bind the page to `deki`; the other attributes are filled in by
        _load() or _create()."""
        self.deki = deki

    def _create(self, path, title, doc, overwrite):
        """Populate the page from the given values and save it at once."""
        # save() sends load_time as 'edittime' together with
        # abort='modified'.  NOTE(review): presumably a far-future time lets
        # the save replace an existing page and a far-past time makes the
        # server refuse - confirm against the Deki API.
        if overwrite:
            pretend_load_time = datetime(2500, 1, 1)
        else:
            pretend_load_time = datetime(1900, 1, 1)
        self.title = title
        self.doc = doc
        self.page_id = path
        self.load_time = pretend_load_time
        self.path = path
        self.save()

    def _load(self, page_id):
        """ Fetch title, path and content for page_id from the server.

        page_id - an int, a page name, or Deki.HOME (the forms accepted
        by _format_page_id).

        The page's attributes are only assigned once both requests and all
        checks have succeeded.
        """
        started = datetime.utcnow()
        id_segment = _format_page_id(page_id)

        # Getting the title is a whole separate query!
        info = self.deki.get_xml('pages/%s/info' % id_segment)
        title = _text_of(_the_element_by_name(info, 'title'))
        path = _text_of(_the_element_by_name(info, 'path'))

        # If you prefer to sling regexes, you can request format=raw instead.
        # The result is an XML document with one big fat text node in the body.
        contents_url = _make_url('pages', id_segment, 'contents',
                                 format='xhtml', mode='edit')
        doc = self.deki.get_xml(contents_url)

        wrapper = doc.documentElement
        _check(wrapper.tagName == u'content')
        body = _first_element(wrapper)
        _check(body is not None)
        _check(body.tagName == u'body')

        # Make <body> the document element in place of the <content> wrapper.
        doc.removeChild(wrapper)
        doc.appendChild(body)

        self.page_id = page_id
        self.load_time = started
        self.title = title
        self.path = path
        self.doc = doc

    def save(self):
        """Post the title and the children of the <body> back to the server.

        Raises AssertionError unless the reply is an <edit> element whose
        status attribute is 'success'.
        """
        # NOTE(review): these values are url-quoted here and then encoded
        # again by _make_url (whose docstring says params must not be
        # pre-encoded) - confirm that the server expects this.
        stamp = self.load_time.strftime('%Y%m%d%H%M%S')
        query = {'edittime': _urlquote(stamp),
                 'abort': 'modified'}
        if self.title is not None:
            query['title'] = _urlquote(self.title)

        target = _make_url('pages', _format_page_id(self.page_id),
                           'contents', **query)

        # Only the children of <body> are sent, not the <body> tag itself.
        body = self.doc.documentElement
        serialized = [child.toxml('utf-8') for child in body.childNodes]
        bodyInnerXML = ''.join(serialized)

        reply = self.deki.post(target, bodyInnerXML, 'text/plain; charset=utf-8')
        outcome = reply.documentElement
        _check(outcome.nodeName == u'edit')
        _check(outcome.getAttribute(u'status') == u'success')

mercurial