xpcom/analysis/deki.py

Thu, 22 Jan 2015 13:21:57 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Thu, 22 Jan 2015 13:21:57 +0100
branch
TOR_BUG_9701
changeset 15
b8a032363ba2
permissions
-rw-r--r--

Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6

     1 # This Source Code Form is subject to the terms of the Mozilla Public
     2 # License, v. 2.0. If a copy of the MPL was not distributed with this
     3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
     5 """ deki.py - Access the wiki pages on a MindTouch Deki server via the API.
     7 Here's what this code can do:
     9   wiki = deki.Deki("http://developer.mozilla.org/@api/deki/", username, password)
    10   page = wiki.get_page("Sheep")
    11   print page.title
    12   print page.doc.toxml()
    14   page.title = "Bananas"
    15   page.save()
    17 There are also some additional methods:
    18   wiki.create_page(path, content, title=, overwrite=)
    19   wiki.move_page(old, new)
    20   wiki.get_subpages(page)
    22 This module does not try to mimic the MindTouch "Plug" API.  It's meant to be
    23 higher-level than that.
    24 """
    26 import sys
    27 import urllib2, cookielib, httplib
    28 import xml.dom.minidom as dom
    29 from urllib import quote as _urllib_quote
    30 from urllib import urlencode as _urlencode
    31 import urlparse
    32 from datetime import datetime
    33 import re
    35 __all__ = ['Deki']
    38 # === Utils
    40 def _check(fact):
    41     if not fact:
    42         raise AssertionError('check failed')
    44 def _urlquote(s, *args):
    45     return _urllib_quote(s.encode('utf-8'), *args)
    47 def _make_url(*dirs, **params):
    48     """ dirs must already be url-encoded, params must not """
    49     url = '/'.join(dirs)
    50     if params:
    51         url += '?' + _urlencode(params)
    52     return url
    54 class PutRequest(urllib2.Request):
    55     def get_method(self):
    56         return "PUT"
    58 # === Dream framework client code
    60 # This handler causes python to "always be logged in" when it's talking to the
    61 # server.  If you're just accessing public pages, it generates more requests
    62 # than are strictly needed, but this is the behavior you want for a bot.
    63 #
    64 # The users/authenticate request is sent twice: once without any basic auth and
    65 # once with.  Dumb.  Feel free to fix.
    66 #
    67 class _LoginHandler(urllib2.HTTPCookieProcessor):
    68     def __init__(self, server):
    69         policy = cookielib.DefaultCookiePolicy(rfc2965=True)
    70         cookiejar = cookielib.CookieJar(policy)
    71         urllib2.HTTPCookieProcessor.__init__(self, cookiejar)
    72         self.server = server
    74     def http_request(self, req):
    75         #print "DEBUG- Requesting " + req.get_full_url()
    76         s = self.server
    77         req = urllib2.HTTPCookieProcessor.http_request(self, req)
    78         if ('Cookie' not in req.unredirected_hdrs
    79               and req.get_full_url() != s.base + 'users/authenticate'):
    80             s.login()
    81             # Retry - should have a new cookie.
    82             req = urllib2.HTTPCookieProcessor.http_request(self, req)
    83             _check('Cookie' in req.unredirected_hdrs)
    84         return req
    86 class DreamClient:
    87     def __init__(self, base, user, password):
    88         """ 
    89         base - The base URI of the Deki API, with trailing slash.
    90                Typically, 'http://wiki.example.org/@api/deki/'.
    91         user, password - Your Deki login information.
    92         """
    93         self.base = base
    94         pm = urllib2.HTTPPasswordMgrWithDefaultRealm()
    95         pm.add_password(None, self.base, user, password)
    96         ah = urllib2.HTTPBasicAuthHandler(pm)
    97         lh = _LoginHandler(self)
    98         self._opener = urllib2.build_opener(ah, lh)
   100     def login(self):
   101         response = self._opener.open(self.base + 'users/authenticate')
   102         response.close()
   104     def open(self, url):
   105         return self._opener.open(self.base + url)
   107     def _handleResponse(self, req):
   108         """Helper method shared between post() and put()"""
   109         resp = self._opener.open(req)
   110         try:
   111             ct = resp.headers.get('Content-Type', '(none)')
   112             if '/xml' in ct or '+xml' in ct:
   113                 return dom.parse(resp)
   114             else:
   115                 #print "DEBUG- Content-Type:", ct
   116                 crud = resp.read()
   117                 #print 'DEBUG- crud:\n---\n%s\n---' % re.sub(r'(?m)^', '    ', crud)
   118                 return None
   119         finally:
   120             resp.close()
   123     def post(self, url, data, type):
   124         #print "DEBUG- posting to:", self.base + url
   125         req = urllib2.Request(self.base + url, data, {'Content-Type': type})
   126         return self._handleResponse(req)
   128     def put(self, url, data, type):
   129         #print "DEBUG- putting to:", self.base + url
   130         req = PutRequest(self.base + url, data, {'Content-Type': type})
   131         return self._handleResponse(req)
   133     def get_xml(self, url):
   134         resp = self.open(url)
   135         try:
   136             return dom.parse(resp)
   137         finally:
   138             resp.close()
   141 # === DOM
   143 def _text_of(node):
   144     if node.nodeType == node.ELEMENT_NODE:
   145         return u''.join(_text_of(n) for n in node.childNodes)
   146     elif node.nodeType == node.TEXT_NODE:
   147         return node.nodeValue
   148     else:
   149         return u''
   151 def _the_element_by_name(doc, tagName):
   152     elts = doc.getElementsByTagName(tagName)
   153     if len(elts) != 1:
   154         raise ValueError("Expected exactly one <%s> tag, got %d." % (tagName, len(elts)))
   155     return elts[0]
   157 def _first_element(node):
   158     n = node.firstChild
   159     while n is not None:
   160         if n.nodeType == n.ELEMENT_NODE:
   161             return n
   162         n = node.nextSibling
   163     return None
   165 def _find_elements(node, path):
   166     if u'/' in path:
   167         [first, rest] = path.split(u'/', 1)
   168         for child in _find_elements(node, first):
   169             for desc in _find_elements(child, rest):
   170                 yield desc
   171     else:
   172         for n in node.childNodes:
   173             if n.nodeType == node.ELEMENT_NODE and n.nodeName == path:
   174                 yield n
   177 # === Deki
   179 def _format_page_id(id):
   180     if isinstance(id, int):
   181         return str(id)
   182     elif id is Deki.HOME:
   183         return 'home'
   184     elif isinstance(id, basestring):
   185         # Double-encoded, per the Deki API reference.
   186         return '=' + _urlquote(_urlquote(id, ''))
   188 class Deki(DreamClient):
   189     HOME = object()
   191     def get_page(self, page_id):
   192         """ Get the content of a page from the wiki.
   194         The page_id argument must be one of:
   195           an int - The page id (an arbitrary number assigned by Deki)
   196           a str - The page name (not the title, the full path that shows up in the URL)
   197           Deki.HOME - Refers to the main page of the wiki.
   199         Returns a Page object.
   200         """
   201         p = Page(self)
   202         p._load(page_id)
   203         return p
   205     def create_page(self, path, content, title=None, overwrite=False):
   206         """ Create a new wiki page.
   208         Parameters:
   209           path - str - The page id.
   210           content - str - The XML content to put in the new page.
   211             The document element must be a <body>.
   212           title - str - The page title.  Keyword argument only.
   213             Defaults to the last path-segment of path.
   214           overwrite - bool - Whether to overwrite an existing page. If false,
   215             and the page already exists, the method will throw an error.
   216         """
   217         if title is None:
   218             title = path.split('/')[-1]
   219         doc = dom.parseString(content)
   220         _check(doc.documentElement.tagName == 'body')
   221         p = Page(self)
   222         p._create(path, title, doc, overwrite)
   224     def attach_file(self, page, name, data, mimetype, description=None):
   225         """Create or update a file attachment.
   227         Parameters:
   228           page - str - the page ID this file is related to
   229           name - str - the name of the file
   230           data - str - the file data
   231           mimetype - str - the MIME type of the file
   232           description - str - a description of the file
   233         """
   235         p = {}
   236         if description is not None:
   237             p['description'] = description
   239         url = _make_url('pages', _format_page_id(page),
   240                         'files', _format_page_id(name), **p)
   242         r = self.put(url, data, mimetype)
   243         _check(r.documentElement.nodeName == u'file')
   245     def get_subpages(self, page_id):
   246         """ Return the ids of all subpages of the given page. """
   247         doc = self.get_xml(_make_url("pages", _format_page_id(page_id),
   248                                      "files,subpages"))
   249         for elt in _find_elements(doc, u'page/subpages/page.subpage/path'):
   250             yield _text_of(elt)
   252     def move_page(self, page_id, new_title, redirects=True):
   253         """ Move an existing page to a new location.
   255         A page cannot be moved to a destination that already exists, is a
   256         descendant, or has a protected title (ex.  Special:xxx, User:,
   257         Template:).
   259         When a page is moved, subpages under the specified page are also moved.
   260         For each moved page, the system automatically creates an alias page
   261         that redirects from the old to the new destination.
   262         """
   263         self.post(_make_url("pages", _format_page_id(page_id), "move",
   264                             to=new_title,
   265                             redirects=redirects and "1" or "0"),
   266                   "", "text/plain")
   268 class Page:
   269     """ A Deki wiki page.
   271     To obtain a page, call wiki.get_page(id).
   272     Attributes:
   273         title : unicode - The page title.
   274         doc : Document - The content of the page as a DOM Document.
   275           The root element of this document is a <body>.
   276         path : unicode - The path.  Use this to detect redirects, as otherwise
   277           page.save() will overwrite the redirect with a copy of the content!
   278         deki : Deki - The Deki object from which the page was loaded.
   279         page_id : str/id/Deki.HOME - The page id used to load the page.
   280         load_time : datetime - The time the page was loaded,
   281           according to the clock on the client machine.
   282     Methods:
   283         save() - Save the modified document back to the server.
   284           Only the page.title and the contents of page.doc are saved.
   285     """
   287     def __init__(self, deki):
   288         self.deki = deki
   290     def _create(self, path, title, doc, overwrite):
   291         self.title = title
   292         self.doc = doc
   293         self.page_id = path
   294         if overwrite:
   295             self.load_time = datetime(2500, 1, 1)
   296         else:
   297             self.load_time = datetime(1900, 1, 1)
   298         self.path = path
   299         self.save()
   301     def _load(self, page_id):
   302         """ page_id - See comment near the definition of `HOME`. """
   303         load_time = datetime.utcnow()
   305         # Getting the title is a whole separate query!
   306         url = 'pages/%s/info' % _format_page_id(page_id)
   307         doc = self.deki.get_xml(url)
   308         title = _text_of(_the_element_by_name(doc, 'title'))
   309         path = _text_of(_the_element_by_name(doc, 'path'))
   311         # If you prefer to sling regexes, you can request format=raw instead.
   312         # The result is an XML document with one big fat text node in the body.
   313         url = _make_url('pages', _format_page_id(page_id), 'contents',
   314                         format='xhtml', mode='edit')
   315         doc = self.deki.get_xml(url)
   317         content = doc.documentElement
   318         _check(content.tagName == u'content')
   319         body = _first_element(content)
   320         _check(body is not None)
   321         _check(body.tagName == u'body')
   323         doc.removeChild(content)
   324         doc.appendChild(body)
   326         self.page_id = page_id
   327         self.load_time = load_time
   328         self.title = title
   329         self.path = path
   330         self.doc = doc
   332     def save(self):
   333         p = {'edittime': _urlquote(self.load_time.strftime('%Y%m%d%H%M%S')),
   334              'abort': 'modified'}
   336         if self.title is not None:
   337             p['title'] = _urlquote(self.title)
   339         url = _make_url('pages', _format_page_id(self.page_id), 'contents', **p)
   341         body = self.doc.documentElement
   342         bodyInnerXML = ''.join(n.toxml('utf-8') for n in body.childNodes)
   344         reply = self.deki.post(url, bodyInnerXML, 'text/plain; charset=utf-8')
   345         _check(reply.documentElement.nodeName == u'edit')
   346         _check(reply.documentElement.getAttribute(u'status') == u'success')

mercurial