# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

""" deki.py - Access the wiki pages on a MindTouch Deki server via the API.

Here's what this code can do:

  wiki = deki.Deki("http://developer.mozilla.org/@api/deki/", username, password)
  page = wiki.get_page("Sheep")
  print(page.title)
  print(page.doc.toxml())

  page.title = "Bananas"
  page.save()

There are also some additional methods:
  wiki.create_page(path, content, title=, overwrite=)
  wiki.attach_file(page, name, data, mimetype, description=)
  wiki.move_page(old, new)
  wiki.get_subpages(page)

This module does not try to mimic the MindTouch "Plug" API.  It's meant to be
higher-level than that.
"""

import sys
import xml.dom.minidom as dom
from datetime import datetime
import re

try:
    # Python 2 module layout.
    import urllib2
    import cookielib
    import httplib
    import urlparse
    from urllib import quote as _urllib_quote
    from urllib import urlencode as _urlencode
except ImportError:
    # Python 3 relocated these modules; bind them to the names used below.
    import urllib.request as urllib2
    import http.cookiejar as cookielib
    import http.client as httplib
    import urllib.parse as urlparse
    from urllib.parse import quote as _urllib_quote
    from urllib.parse import urlencode as _urlencode

try:
    _string_types = basestring
except NameError:
    # Python 3 has no basestring; text is always str there.
    _string_types = str

__all__ = ['Deki']


# === Utils

def _check(fact):
    """ Raise AssertionError('check failed') unless `fact` is truthy.

    Unlike an `assert` statement, this is not removed when Python runs
    with -O.
    """
    if not fact:
        raise AssertionError('check failed')


def _urlquote(s, *args):
    """ Percent-encode the text `s` after encoding it as UTF-8.

    Any extra positional arguments are passed straight through to the
    standard library's quote() (e.g. '' to leave no character unescaped).
    """
    return _urllib_quote(s.encode('utf-8'), *args)


def _make_url(*dirs, **params):
    """ Join path segments with '/' and append an encoded query string.

    dirs must already be url-encoded, params must not.
    """
    url = '/'.join(dirs)
    if params:
        url += '?' + _urlencode(params)
    return url


class PutRequest(urllib2.Request):
    """ A Request that is always sent with the HTTP PUT method. """

    def get_method(self):
        return "PUT"


# === Dream framework client code

# This handler causes python to "always be logged in" when it's talking to the
# server.  If you're just accessing public pages, it generates more requests
# than are strictly needed, but this is the behavior you want for a bot.
#
# The users/authenticate request is sent twice: once without any basic auth and
# once with.  Dumb.  Feel free to fix.
#
# NOTE(review): only http_request is overridden here; confirm whether an
# https:// base URL needs the same treatment.
#
class _LoginHandler(urllib2.HTTPCookieProcessor):
    """ Cookie processor that logs in before any cookie-less request.

    `server` is the DreamClient whose login() is called and whose `base`
    identifies the authentication URL.
    """

    def __init__(self, server):
        policy = cookielib.DefaultCookiePolicy(rfc2965=True)
        cookiejar = cookielib.CookieJar(policy)
        urllib2.HTTPCookieProcessor.__init__(self, cookiejar)
        self.server = server

    def http_request(self, req):
        """ Attach cookies to `req`, logging in first if there are none.

        The authentication request itself is exempt, otherwise login()
        would recurse.  Raises AssertionError if there is still no cookie
        after logging in.
        """
        s = self.server
        req = urllib2.HTTPCookieProcessor.http_request(self, req)
        if ('Cookie' not in req.unredirected_hdrs
                and req.get_full_url() != s.base + 'users/authenticate'):
            s.login()
            # Retry - should have a new cookie.
            req = urllib2.HTTPCookieProcessor.http_request(self, req)
            _check('Cookie' in req.unredirected_hdrs)
        return req


class DreamClient:
    """ Minimal authenticated HTTP client for a Dream/Deki API endpoint. """

    def __init__(self, base, user, password):
        """
        base - The base URI of the Deki API, with trailing slash.
               Typically, 'http://wiki.example.org/@api/deki/'.
        user, password - Your Deki login information.
        """
        self.base = base
        pm = urllib2.HTTPPasswordMgrWithDefaultRealm()
        pm.add_password(None, self.base, user, password)
        ah = urllib2.HTTPBasicAuthHandler(pm)
        lh = _LoginHandler(self)
        self._opener = urllib2.build_opener(ah, lh)

    def login(self):
        """ Request users/authenticate and discard the response body. """
        response = self._opener.open(self.base + 'users/authenticate')
        response.close()

    def open(self, url):
        """ Open `url` (relative to the API base); return the response. """
        return self._opener.open(self.base + url)

    def _handleResponse(self, req):
        """ Helper method shared between post() and put().

        Sends `req` and returns the parsed DOM Document when the response
        Content-Type mentions XML; otherwise returns None.  The response
        is always closed.
        """
        resp = self._opener.open(req)
        try:
            ct = resp.headers.get('Content-Type', '(none)')
            if '/xml' in ct or '+xml' in ct:
                return dom.parse(resp)
            # Not XML: read and discard the body; there is nothing to parse.
            resp.read()
            return None
        finally:
            resp.close()

    def post(self, url, data, type):
        """ POST `data` with Content-Type `type` to `url` (relative to base).

        Returns a DOM Document for XML replies, else None.
        """
        req = urllib2.Request(self.base + url, data, {'Content-Type': type})
        return self._handleResponse(req)

    def put(self, url, data, type):
        """ PUT `data` with Content-Type `type` to `url` (relative to base).

        Returns a DOM Document for XML replies, else None.
        """
        req = PutRequest(self.base + url, data, {'Content-Type': type})
        return self._handleResponse(req)

    def get_xml(self, url):
        """ GET `url` (relative to base) and parse the reply as XML. """
        resp = self.open(url)
        try:
            return dom.parse(resp)
        finally:
            resp.close()


# === DOM

def _text_of(node):
    """ Return the concatenated text of `node` and its element descendants.

    Nodes that are neither elements nor text (comments, etc.) contribute
    an empty string.
    """
    if node.nodeType == node.ELEMENT_NODE:
        return u''.join(_text_of(n) for n in node.childNodes)
    elif node.nodeType == node.TEXT_NODE:
        return node.nodeValue
    else:
        return u''


def _the_element_by_name(doc, tagName):
    """ Return the single <tagName> element under `doc`.

    Raises ValueError unless exactly one such element exists.
    """
    elts = doc.getElementsByTagName(tagName)
    if len(elts) != 1:
        raise ValueError("Expected exactly one <%s> tag, got %d." % (tagName, len(elts)))
    return elts[0]


def _first_element(node):
    """ Return the first child of `node` that is an element, or None. """
    n = node.firstChild
    while n is not None:
        if n.nodeType == n.ELEMENT_NODE:
            return n
        # Advance along the children.  Stepping from `node` instead would
        # leave the child list, so leading whitespace or comments would hide
        # the element (or loop forever).
        n = n.nextSibling
    return None


def _find_elements(node, path):
    """ Yield the elements reached from `node` by a '/'-separated path.

    Each path segment is matched against the nodeName of direct child
    elements only.
    """
    if u'/' in path:
        [first, rest] = path.split(u'/', 1)
        for child in _find_elements(node, first):
            for desc in _find_elements(child, rest):
                yield desc
    else:
        for n in node.childNodes:
            if n.nodeType == node.ELEMENT_NODE and n.nodeName == path:
                yield n


# === Deki

def _format_page_id(id):
    """ Convert a page id into the URL path segment used by the API.

    id may be an int (used as-is), Deki.HOME ('home'), or a string page
    name ('=' followed by the double-url-encoded name).  Any other type
    raises TypeError.
    """
    if isinstance(id, int):
        return str(id)
    elif id is Deki.HOME:
        return 'home'
    elif isinstance(id, _string_types):
        # Double-encoded, per the Deki API reference.
        return '=' + _urlquote(_urlquote(id, ''))
    raise TypeError("page id must be an int, a string or Deki.HOME, not %r"
                    % (id,))


class Deki(DreamClient):
    """ High-level client for the pages of a Deki wiki. """

    # Sentinel page id; compared by identity in _format_page_id.
    HOME = object()

    def get_page(self, page_id):
        """ Get the content of a page from the wiki.

        The page_id argument must be one of:
          an int - The page id (an arbitrary number assigned by Deki)
          a str - The page name (not the title, the full path that shows up in the URL)
          Deki.HOME - Refers to the main page of the wiki.

        Returns a Page object.
        """
        p = Page(self)
        p._load(page_id)
        return p

    def create_page(self, path, content, title=None, overwrite=False):
        """ Create a new wiki page.

        Parameters:
          path - str - The page id.
          content - str - The XML content to put in the new page.
            The document element must be a <body>.
          title - str - The page title.  Optional.
            Defaults to the last path-segment of path.
          overwrite - bool - Whether to overwrite an existing page. If false,
            and the page already exists, the method will throw an error.

        Raises AssertionError if the document element is not <body>.
        """
        if title is None:
            title = path.split('/')[-1]
        doc = dom.parseString(content)
        _check(doc.documentElement.tagName == 'body')
        p = Page(self)
        p._create(path, title, doc, overwrite)

    def attach_file(self, page, name, data, mimetype, description=None):
        """Create or update a file attachment.

        Parameters:
          page - str - the page ID this file is related to
          name - str - the name of the file
          data - str - the file data
          mimetype - str - the MIME type of the file
          description - str - a description of the file

        Raises AssertionError if the reply's root element is not <file>.
        """
        p = {}
        if description is not None:
            p['description'] = description

        url = _make_url('pages', _format_page_id(page),
                        'files', _format_page_id(name), **p)

        r = self.put(url, data, mimetype)
        _check(r.documentElement.nodeName == u'file')

    def get_subpages(self, page_id):
        """ Yield the path of each subpage of the given page.

        This is a generator; the request is made when iteration starts.
        """
        doc = self.get_xml(_make_url("pages", _format_page_id(page_id),
                                     "files,subpages"))
        for elt in _find_elements(doc, u'page/subpages/page.subpage/path'):
            yield _text_of(elt)

    def move_page(self, page_id, new_title, redirects=True):
        """ Move an existing page to a new location.

        A page cannot be moved to a destination that already exists, is a
        descendant, or has a protected title (ex.  Special:xxx, User:,
        Template:).

        When a page is moved, subpages under the specified page are also moved.
        For each moved page, the system automatically creates an alias page
        that redirects from the old to the new destination.
        """
        # The POST body is empty; b"" keeps it a byte string on Python 3.
        self.post(_make_url("pages", _format_page_id(page_id), "move",
                            to=new_title,
                            redirects="1" if redirects else "0"),
                  b"", "text/plain")


class Page:
    """ A Deki wiki page.

    To obtain a page, call wiki.get_page(id).
    Attributes:
        title : unicode - The page title.
        doc : Document - The content of the page as a DOM Document.
          The root element of this document is a <body>.
        path : unicode - The path.  Use this to detect redirects, as otherwise
          page.save() will overwrite the redirect with a copy of the content!
        deki : Deki - The Deki object from which the page was loaded.
        page_id : str/int/Deki.HOME - The page id used to load the page.
        load_time : datetime - The time the page was loaded,
          according to the clock on the client machine.
    Methods:
        save() - Save the modified document back to the server.
          Only the page.title and the contents of page.doc are saved.
    """

    def __init__(self, deki):
        self.deki = deki

    def _create(self, path, title, doc, overwrite):
        """ Fill in this page from the given values and save it.

        save() sends load_time as 'edittime' together with abort=modified,
        so a far-future time is used when overwriting and a far-past time
        otherwise.
        """
        self.title = title
        self.doc = doc
        self.page_id = path
        if overwrite:
            self.load_time = datetime(2500, 1, 1)
        else:
            self.load_time = datetime(1900, 1, 1)
        self.path = path
        self.save()

    def _load(self, page_id):
        """ Fetch title, path and content for page_id from the server.

        page_id - See Deki.get_page for the accepted values.
        Raises AssertionError if the reply is not a <content> element whose
        first child element is <body>.
        """
        load_time = datetime.utcnow()

        # Getting the title is a whole separate query!
        url = 'pages/%s/info' % _format_page_id(page_id)
        doc = self.deki.get_xml(url)
        title = _text_of(_the_element_by_name(doc, 'title'))
        path = _text_of(_the_element_by_name(doc, 'path'))

        # If you prefer to sling regexes, you can request format=raw instead.
        # The result is an XML document with one big fat text node in the body.
        url = _make_url('pages', _format_page_id(page_id), 'contents',
                        format='xhtml', mode='edit')
        doc = self.deki.get_xml(url)

        content = doc.documentElement
        _check(content.tagName == u'content')
        body = _first_element(content)
        _check(body is not None)
        _check(body.tagName == u'body')

        # Make <body> the document element in place of <content>.
        doc.removeChild(content)
        doc.appendChild(body)

        self.page_id = page_id
        self.load_time = load_time
        self.title = title
        self.path = path
        self.doc = doc

    def save(self):
        """ Post the title and the inner XML of the <body> to the server.

        Raises AssertionError unless the reply is an <edit> element with
        status="success".  A non-XML reply makes post() return None, which
        surfaces here as an AttributeError.
        """
        # NOTE(review): these values are quoted here and url-encoded again
        # by _make_url; confirm the server expects that.
        p = {'edittime': _urlquote(self.load_time.strftime('%Y%m%d%H%M%S')),
             'abort': 'modified'}

        if self.title is not None:
            p['title'] = _urlquote(self.title)

        url = _make_url('pages', _format_page_id(self.page_id), 'contents', **p)

        body = self.doc.documentElement
        # toxml('utf-8') produces encoded byte strings, so join them as bytes.
        bodyInnerXML = b''.join(n.toxml('utf-8') for n in body.childNodes)

        reply = self.deki.post(url, bodyInnerXML, 'text/plain; charset=utf-8')
        _check(reply.documentElement.nodeName == u'edit')
        _check(reply.documentElement.getAttribute(u'status') == u'success')