#!/usr/bin/python
# -*- coding: utf-8 -*-
 
"""wikibot.py: A mid-level wrapper for using MediaWiki API."""
 
from simplemediawiki import MediaWiki, DEFAULT_UA, build_user_agent
from urllib import urlencode
from langcodes import iso639_1 as isolangs
from time import strftime, gmtime
#from hashlib import md5
from BeautifulSoup import BeautifulSoup
import sys
import urllib2
import re
import cookielib
 
__all__ = ["WikiBot", "Page", "User"]
__version__ = "0.0.3"
__status__ = "Prototype"
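
# A minimal usage sketch (hedged; the wiki URL and credentials below are
# placeholders, and login() is inherited from simplemediawiki's MediaWiki):
#
#     bot = WikiBot("http://community.wikia.com/api.php")
#     bot.login("ExampleBot", "example-password")
#     print repr(bot)  # e.g. "English Community Central (MediaWiki 1.19.20)"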
 
class WikiBot(MediaWiki):
    def __init__(self, api_url, cookie_file=None, user_agent=DEFAULT_UA):
        MediaWiki.__init__(self, api_url, cookie_file, user_agent)
        # setting attributes to save API queries
        self.setedittoken()
        self.setmovetoken()
        self.setindexurl()
        self.siteinfo = self.call({"action":"query", "meta":"siteinfo"})["query"]["general"]
 
    def __repr__(self):
        q = self.siteinfo
        text = "{lang} {sitename} ({mw})"
        return text.format(lang=isolangs[q["lang"]]["name"], sitename=q["sitename"], mw=q["generator"])
 
    @staticmethod
    def safeprint(text, errors="replace"):
        print text.encode(sys.stdout.encoding or "utf-8", errors)
 
    def query(self, **kwargs):
        return self.call(kwargs)
 
    def setedittoken(self):
        query = {"action":"query", "prop":"info", "intoken":"edit", "titles":"Main Page"}
        q = self.call(query)["query"]["pages"]
        for c in q:
            try:
                self.edittoken = q[c]["edittoken"]
            except KeyError: # no edit privilege for us
                self.edittoken = None
                return False
            else:
                return True
 
    def setmovetoken(self):
        query = {"action":"query", "prop":"info", "intoken":"move", "titles":"Main Page"}
        q = self.call(query)["query"]["pages"]
        for x in q:
            try:
                self.movetoken = q[x]["movetoken"]
            except KeyError: # no move privilege for us
                self.movetoken = None
                return False
            else:
                return True
 
    def setindexurl(self):
        apiurl = self.normalize_api_url()
        if apiurl is None:
            self.index_url = None
        else:
            self.index_url = apiurl.split("api.php")[0]+"index.php"
 
    def page(self, title):
        return Page(self, title)
 
    def user(self, name):
        return User(self, name)
 
    def diff(self, diffid, oldid):
        # e.g. diff(484144434, 484144169), diff(487104284, 486959056) on enwp
        # action=render strips the skin and makes the page much more machine-readable
        params = {"diff":diffid, "oldid":oldid, "action":"render", "diffonly":1}
        ua = "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:11.0) Gecko/20100101 Firefox/11.0"
        req = urllib2.Request(self.index_url+"?"+urlencode(params), headers={"User-Agent":ua})
        diff_ = urllib2.urlopen(req)
        html = diff_.read()
        diff_.close()
        soup = BeautifulSoup(html)

        def extract(cells):
            # Strip the <td> wrapper, leaving e.g. <div>bla<span...>h</span></div>;
            # take the inline diffchange spans where present, else the whole cell.
            lines = []
            for tags in cells:
                if len(tags.contents) == 0:
                    lines.extend(BeautifulSoup("\n"))
                    continue
                spans = tags.contents[0].findAll("span", attrs="diffchange diffchange-inline")
                if len(spans) == 0:
                    lines.extend(tags.contents[0])
                else:
                    lines.extend(spans)
            return [str(x.string) for x in lines]

        added = extract(soup.findAll("td", attrs="diff-addedline"))
        deleted = extract(soup.findAll("td", attrs="diff-deletedline"))
        return added, deleted
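
    # Hedged example of diff(): the revision IDs are the enwp samples noted
    # above; any valid (newer, older) pair on the target wiki works the same:
    #
    #     added, removed = bot.diff(484144434, 484144169)
    #     for line in added:
    #         WikiBot.safeprint(line)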
 
    def getpage(self, title, revid=None):
        query = {"action":"query", "titles":title, "prop":"revisions", "rvprop":"content"}
        if revid is not None:
            query["rvstartid"] = revid
        content = self.call(query)
        content = content["query"]["pages"]
        return content[content.keys()[0]]["revisions"][0]["*"]
 
    def editcount(self, user):
        # Note: aufrom is a starting point, so this returns the count for the
        # first user name at or after `user`; it assumes the user exists.
        query = self.call({"action":"query", "list":"allusers", "auprop":"editcount", "aulimit":1, "aufrom":user})
        return query["query"]["allusers"][0]["editcount"]
 
    def gettransclusions(self, page, ns=[], eicontinue=None):
        # Generator: yields the title of every page transcluding `page`,
        # following query-continue automatically.
        while True:
            query = {"action":"query", "list":"embeddedin", "eititle":page, "eilimit":100}
            if ns:
                query["einamespace"] = "|".join(str(n) for n in ns)
            if eicontinue is not None:
                query["eicontinue"] = eicontinue
            result = self.call(query)
            for x in result["query"]["embeddedin"]:
                yield x["title"]
            if "query-continue" not in result:
                break
            eicontinue = result["query-continue"]["embeddedin"]["eicontinue"]
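
    # Hedged example: the generator hides the query-continue loop, so a plain
    # for-loop walks every transclusion. The template name and namespace list
    # are placeholders:
    #
    #     for title in bot.gettransclusions("Template:Stub", ns=["0"]):
    #         WikiBot.safeprint(title)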
 
    def prefixindex(self, page, ns=[], apfrom=None):
        # Generator: yields every page whose title starts with `page`.
        while True:
            query = {"action":"query", "list":"allpages", "apprefix":page, "aplimit":100}
            if ns:
                query["apnamespace"] = "|".join(str(n) for n in ns)
            if apfrom is not None:
                query["apfrom"] = apfrom
            result = self.call(query)
            for x in result["query"]["allpages"]:
                yield x["title"]
            if "query-continue" not in result:
                break
            apfrom = result["query-continue"]["allpages"]["apfrom"]
 
    def allpages(self, start, limit=500):
        # Generator: yields all page titles, starting from `start`.
        while True:
            query = {"action":"query", "list":"allpages", "aplimit":limit}
            if start is not None:
                query["apfrom"] = start
            result = self.call(query)
            for x in result["query"]["allpages"]:
                yield x["title"]
            if "query-continue" not in result:
                break
            start = result["query-continue"]["allpages"]["apfrom"]
 
    def randompages(self, ns=None, redir=False, limit=1):
        query = {"action":"query", "list":"random", "rnlimit":limit}
        if ns is not None:
            query["rnnamespace"] = "|".join(str(n) for n in ns)
        if redir:
            query["rnredirect"] = ""
        for page in self.call(query)["query"]["random"]:
            yield page["title"]
 
    def exturlusage(self, url=None, limit=500, eucontinue=None):
        # Generator: yields the URLs of matching external links in use,
        # following query-continue (this list continues via euoffset).
        while True:
            query = {"action":"query", "list":"exturlusage", "eulimit":limit}
            if eucontinue is not None:
                query["euoffset"] = eucontinue
            if url is not None:
                query["euquery"] = url
            result = self.call(query)
            for x in result["query"]["exturlusage"]:
                yield x["url"]
            if "query-continue" not in result:
                break
            eucontinue = result["query-continue"]["exturlusage"]["euoffset"]
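
    # Hedged example: listing external-link URLs in use that match a query
    # (the domain is a placeholder; euquery matches without the protocol):
    #
    #     for link in bot.exturlusage("example.com", limit=100):
    #         WikiBot.safeprint(link)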
 
    def movepage(self, old, target, reason="", *args):
        token_query = {"action":"query", "prop":"info", "intoken":"move", "titles":old}
        pages = self.call(token_query)["query"]["pages"]
        for x in pages:
            token = pages[x]["movetoken"]
        move_params = {"action":"move", "from":old, "to":target, "reason":reason, "token":token}
        for arg in args: # e.g. "movetalk", "noredirect"; an empty value switches the flag on
            move_params[arg] = ""
        return self.call(move_params)
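
    # Hedged example: extra positional args become empty-valued API flags, so
    # move options are passed by name:
    #
    #     bot.movepage("Old title", "New title", "housekeeping", "movetalk", "noredirect")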
 
    def rollback(self, page, user, reason=None, bot=False):
        token_query = {"action":"query", "prop":"revisions", "rvtoken":"rollback", "titles":page}
        pages = self.call(token_query)["query"]["pages"]
        for pageid in pages:
            token = pages[pageid]["revisions"][0]["rollbacktoken"]
        rollback_query = {"action":"rollback", "title":page, "user":user, "token":token}
        if reason is not None:
            rollback_query["summary"] = reason
        if bot:
            rollback_query["markbot"] = 1
        return self.call(rollback_query)
 
    def createaccount(self, username, password, real_name="", email=""):
        if getattr(self, "index_url", None) is None:
            return False
        post_params = {"wpName":username,
                       "wpPassword":password,
                       "wpRetype":password,
                       "wpEmail":email,
                       "wpRemember":"",
                       "wpIgnoreAntiSpoof":"",
                       "wpRealName":real_name,
                       "wpCreateaccount":""}
        signup_params = {"title":"Special:UserLogin", "type":"signup", "action":"submitlogin"}
        jar = cookielib.LWPCookieJar("cookies.tmp.cookies")
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(jar))
        # Fetch the signup form first to obtain the session cookie and the
        # hidden wpCreateaccountToken anti-CSRF field.
        form = opener.open(self.index_url, data=urlencode(signup_params))
        jar.save()
        text = form.read()
        form.close()
        create_token = re.search("<input type=\"hidden\" name=\"wpCreateaccountToken\" value=\"([0-9a-zA-Z]+)\" />", text)
        if create_token is not None:
            post_params["wpCreateaccountToken"] = create_token.group(1)
        for cookies in jar:
            opener.addheaders.append(("Cookie", "{name}={val}".format(name=cookies.name, val=cookies.value)))
        return opener.open(self.index_url+"?"+urlencode(signup_params), data=urlencode(post_params))
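
    # Hedged example: account creation goes through index.php rather than the
    # API, so it needs the session cookie plus the scraped wpCreateaccountToken.
    # All values are placeholders:
    #
    #     resp = bot.createaccount("ExampleUser", "example-password", email="user@example.com")
    #     if resp is not False:
    #         print resp.geturl()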
 
 
class User(object):
    """
    This was not designed for standalone use. Use WikiBot_instance.user instead.
    @param site - an instance of WikiBot
    @param name - string
    """
    def __init__(self, site, name):
        self.name = str(name)
        self.site = site
 
    def __repr__(self):
        return "User:"+self.name
 
    def editcount(self, force=False):
        if hasattr(self, "edits") and not force:
            return self.edits
        query = self.site.call({"action":"query", "list":"allusers", "auprop":"editcount", "aulimit":1, "aufrom":self.name})
        self.edits = query["query"]["allusers"][0]["editcount"]
        return self.edits
 
    def email(self, target, subject, text, cc=False):
        # Assumption: the cached edit token is accepted for action=emailuser
        # on the MediaWiki versions this wrapper targets.
        params = {"action":"emailuser", "target":target, "subject":subject, "text":text, "token":self.site.edittoken}
        if cc: params["ccme"] = True
        return self.site.call(params)
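
    # Hedged example (names are placeholders; e-mail must be enabled on the
    # wiki and for the target user):
    #
    #     u = bot.user("ExampleUser")
    #     print u.editcount()
    #     u.email("ExampleUser", "Hello", "Sent via wikibot.py")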
 
 
class Page(object):
    """
    This was not designed for standalone use. Use WikiBot_instance.page instead.
    @param site - an instance of WikiBot
    @param title - string
    """
    def __init__(self, site, title):
        self.title = title
        # naive namespace guess: text before the first colon, "" for mainspace
        self.namespace = title.split(":", 1)[0] if ":" in title else ""
        self.site = site
 
    def exists(self, force=True):
        if hasattr(self, "_exists") and not force:
            return self._exists
        else:
            query = {"action":"query", "titles":self.title, "prop":"info"}
            q = self.site.call(query)["query"]["pages"]
            if "missing" in q[q.keys()[0]]:
                self._exists = False
            else:
                self._exists = True
            return self._exists
 
    def getcontent(self, follow_redir=True, force=False, revid=None):
        """
        Fetches the page text, caches it as self.content and returns it
        @param follow_redir - If the page is a redirect, get the content of the target
        @param force - Refetch even if self.content is already cached
        @param revid - Get the content of the page at revision ID revid
        """
        if hasattr(self, "content") and not force and revid is None:
            return self.content
        query = {"action":"query", "titles":self.title, "prop":"revisions", "rvprop":"content"}
        if follow_redir:
            query["redirects"] = ""
        if revid is not None:
            query["rvstartid"] = revid
        content = self.site.call(query)["query"]["pages"]
        self.content = content[content.keys()[0]]["revisions"][0]["*"]
        return self.content
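
    # Hedged example (title and revision ID are placeholders):
    #
    #     p = bot.page("Project:Sandbox")
    #     text = p.getcontent()            # current text, cached on the instance
    #     old = p.getcontent(revid=12345)  # text as of an older revision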
 
    def edit(self, content, summary="", minor=False, bot=False, force=False):
        page = self.title
        token = self.site.edittoken
        edit_params = {"action":"edit", "title":page, "text":content, "token":token, "summary":summary}
        if minor:
            edit_params["minor"] = 1
        else:
            edit_params["notminor"] = 1
        if bot:
            edit_params["bot"] = 1
        if not force:
            # basetimestamp/starttimestamp let the API detect edit conflicts
            detect_ec_query = {"action":"query", "prop":"revisions", "rvprop":"timestamp", "titles":page}
            ec_timestamp_ = self.site.call(detect_ec_query)["query"]["pages"]
            for x in ec_timestamp_:
                if "missing" in ec_timestamp_[x]:
                    return False, "{} does not exist".format(page)
                elif ec_timestamp_[x]["ns"] == -1:
                    return False, "{} is in the Special: namespace".format(page)
                ec_timestamp = ec_timestamp_[x]["revisions"][0]["timestamp"]
            edit_params["basetimestamp"] = ec_timestamp
            edit_params["starttimestamp"] = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
        return self.site.call(edit_params)
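

if __name__ == "__main__":
    # Smoke-test sketch, not part of the library proper. The URL and
    # credentials are placeholders; edit() takes the conflict-safe path
    # (basetimestamp/starttimestamp) because force is left False.
    bot = WikiBot("http://community.wikia.com/api.php")
    bot.login("ExampleBot", "example-password")  # login() comes from simplemediawiki
    sandbox = bot.page("Project:Sandbox")
    if sandbox.exists():
        WikiBot.safeprint(sandbox.getcontent()[:80])
    print sandbox.edit("Test edit.", summary="testing wikibot.py", minor=True, bot=True)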
