TyBot/GEMWbot/wikibot.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
 
"""wikibot.py: A mid-level wrapper for using MediaWiki API."""
 
from simplemediawiki import MediaWiki, DEFAULT_UA, build_user_agent
from urllib import urlencode
from langcodes import iso639_1 as isolangs
from time import strftime, gmtime
#from hashlib import md5
from BeautifulSoup import BeautifulSoup
import sys
import urllib2
import re
import cookielib
 
__all__ = ["WikiBot", "Page", "User"]
__version__ = "0.0.3"
__status__ = "Prototype"
 
class WikiBot(MediaWiki):
    def __init__(self, api_url, cookie_file=None, user_agent=DEFAULT_UA):
        MediaWiki.__init__(self, api_url, cookie_file, user_agent)
        # setting attributes to save API queries
        self.setedittoken()
        self.setmovetoken()
        self.setindexurl()
        self.siteinfo = self.call({"action":"query", "meta":"siteinfo"})["query"]["general"]
 
    def __repr__(self):
        q = self.siteinfo
        text = "{lang} {sitename} ({mw})"
        return text.format(lang=isolangs[q["lang"]]["name"], sitename=q["sitename"], mw=q["generator"])
 
    @staticmethod
    def safeprint(text, errors="replace"):
        print text.encode(sys.stdout.encoding or "utf-8", errors)
 
    def query(self, **kwargs):
        return self.call(kwargs)
 
    def setedittoken(self):
        query = {"action":"query", "prop":"info", "intoken":"edit", "titles":"Main Page"}
        q = self.call(query)["query"]["pages"]
        for c in q:
            try:
                self.edittoken = q[c]["edittoken"]
            except KeyError: # no edit privilege for us
                self.edittoken = None
                return False
            else:
                return True
 
    def setmovetoken(self):
        query = {"action":"query", "prop":"info", "intoken":"move", "titles":"Main Page"}
        q = self.call(query)["query"]["pages"]
        for x in q:
            try:
                self.movetoken = q[x]["movetoken"]
            except KeyError: # no move privilege for us
                self.movetoken = None
                return False
            else:
                return True
 
    def setindexurl(self):
        apiurl = self.normalize_api_url()
        if apiurl is None:
            self.index_url = None
        else:
            self.index_url = apiurl.split("api.php")[0]+"index.php"
 
    def page(self, title):
        return Page(self, title)
 
    def user(self, name):
        return User(self, name)
 
    def diff(self, diffid, oldid):
        #diff(484144434, 484144169), diff(487104284, 486959056) on enwp
        # action=render makes the page much more machine-readable
        params = {"diff":diffid, "oldid":oldid, "action":"render", "diffonly":1}
        ua = "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:11.0) Gecko/20100101 Firefox/11.0"
        req = urllib2.Request(self.index_url+"?"+urlencode(params), headers={"User-Agent":ua})
        diff_ = urllib2.urlopen(req)
        html = diff_.read()
        diff_.close()
        soup = BeautifulSoup(html)
        deleted2 = []
        added2 = []
        added = soup.findAll("td", attrs="diff-addedline")
        deleted = soup.findAll("td", attrs="diff-deletedline")
        # remove <td>, make it like <div>bla<span...>h</span></div>
        # this will get the span contents too
        for tags in added:
            if len(tags.contents) == 0:
                added2.extend(BeautifulSoup("\n"))
                continue
            elif len(tags.contents[0].findAll("span", attrs="diffchange diffchange-inline")) == 0:
                added2.extend(tags.contents[0])
                continue
            added2.extend(tags.contents[0].findAll("span", attrs="diffchange diffchange-inline"))
        for tags in deleted:
            if len(tags.contents) == 0:
                deleted2.extend(BeautifulSoup("\n"))
                continue
            elif len(tags.contents[0].findAll("span", attrs="diffchange diffchange-inline")) == 0:
                deleted2.extend(tags.contents[0])
                continue
            deleted2.extend(tags.contents[0].findAll("span", attrs="diffchange diffchange-inline"))
        added2 = [str(x.string) for x in added2]
        deleted2 = [str(x.string) for x in deleted2]
        return added2, deleted2
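 
    # A sketch of calling diff() with the revision IDs mentioned in the
    # comment above (enwp); it returns two lists of changed fragments as
    # plain strings:
    #     added, deleted = bot.diff(484144434, 484144169)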
 
    def getpage(self, title, revid=None):
        query = {"action":"query", "titles":title, "prop":"revisions", "rvprop":"content"}
        if revid is not None:
            query["rvstartid"] = revid
        content = self.call(query)
        content = content["query"]["pages"]
        return content[content.keys()[0]]["revisions"][0]["*"]
 
    def editcount(self, user):
        query = self.call({"action":"query", "list":"allusers", "auprop":"editcount", "aulimit":1, "aufrom":user})
        return query["query"]["allusers"][0]["editcount"]
 
    def gettransclusions(self, page, ns=(), eicontinue=None):
        """Generate titles of pages that transclude (embed) `page`."""
        while True:
            query = {"action":"query", "list":"embeddedin", "eititle":page, "eilimit":100}
            if ns: # only send a namespace filter when one was given
                query["einamespace"] = "|".join(str(n) for n in ns)
            if eicontinue is not None:
                query["eicontinue"] = eicontinue
            result = self.call(query)
            for x in result["query"]["embeddedin"]:
                yield x["title"]
            if "query-continue" not in result:
                break
            eicontinue = result["query-continue"]["embeddedin"]["eicontinue"]
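 
    # This and the list methods below are generators, so titles stream across
    # API continuations without buffering; e.g. (template name illustrative):
    #     for title in bot.gettransclusions("Template:Stub", ns=("0",)):
    #         WikiBot.safeprint(title)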
 
    def prefixindex(self, page, ns=(), apfrom=None):
        """Generate titles of all pages whose names begin with `page`."""
        while True:
            query = {"action":"query", "list":"allpages", "apprefix":page, "aplimit":100}
            if ns: # only send a namespace filter when one was given
                query["apnamespace"] = "|".join(str(n) for n in ns)
            if apfrom is not None:
                query["apfrom"] = apfrom
            result = self.call(query)
            for x in result["query"]["allpages"]:
                yield x["title"]
            if "query-continue" not in result:
                break
            apfrom = result["query-continue"]["allpages"]["apfrom"]
 
    def allpages(self, start=None, limit=500):
        """Generate every page title, beginning at `start`."""
        while True:
            query = {"action":"query", "list":"allpages", "aplimit":limit}
            if start is not None:
                query["apfrom"] = start
            result = self.call(query)
            for x in result["query"]["allpages"]:
                yield x["title"]
            if "query-continue" not in result:
                break
            start = result["query-continue"]["allpages"]["apfrom"]
 
    def randompages(self, ns=None, redir=False, limit=1):
        query = {"action":"query", "list":"random", "rnlimit":limit}
        if ns is not None:
            query["rnnamespace"] = ns
        if redir is not False: # rnredirect is presence-based
            query["rnredirect"] = ""
        for page in self.call(query)["query"]["random"]:
            yield page["title"]
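 
    # Example: ten random main-namespace (ns "0") titles:
    #     for title in bot.randompages(ns="0", limit=10):
    #         WikiBot.safeprint(title)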
 
    def exturlusage(self, url=None, limit=500, eucontinue=None):
        """Generate URLs of external links used on pages, filtered by `url`."""
        while True:
            query = {"action":"query", "list":"exturlusage", "eulimit":limit}
            if eucontinue is not None: # this list continues via euoffset
                query["euoffset"] = eucontinue
            if url is not None:
                query["euquery"] = url
            result = self.call(query)
            for x in result["query"]["exturlusage"]:
                yield x["url"]
            if "query-continue" not in result:
                break
            eucontinue = result["query-continue"]["exturlusage"]["euoffset"]
 
    def movepage(self, old, target, reason="", *args):
        token_query = {"action":"query", "prop":"info", "intoken":"move", "titles":old}
        pages = self.call(token_query)["query"]["pages"] # one API call, reused below
        for x in pages:
            token = pages[x]["movetoken"]
        move_params = {"action":"move", "from":old, "to":target, "reason":reason, "token":token}
        for arg in args: # extra args become presence-based flags
            move_params[arg] = ""
        return self.call(move_params)
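 
    # Example move; the trailing arguments are real action=move flags sent as
    # presence-based parameters (titles and reason are illustrative):
    #     bot.movepage("Old title", "New title", "housekeeping", "movetalk", "noredirect")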
 
    def rollback(self, page, user, reason=None, bot=False):
        token_query = {"action":"query", "prop":"revisions", "rvtoken":"rollback", "titles":page}
        pages = self.call(token_query)["query"]["pages"] # the query must be sent before tokens can be read
        for pageid in pages:
            token = pages[pageid]["revisions"][0]["rollbacktoken"]
        rollback_query = {"action":"rollback", "title":page, "user":user, "token":token}
        if reason is not None:
            rollback_query["summary"] = reason
        if bot:
            rollback_query["markbot"] = 1
        return self.call(rollback_query)
 
    def createaccount(self, username, password, real_name="", email=""):
        if getattr(self, "index_url", None) is None:
            return False
        post_params = {"wpName":username,
                       "wpPassword":password,
                       "wpRetype":password,
                       "wpEmail":email,
                       "wpRemember":"",
                       "wpIgnoreAntiSpoof":"",
                       "wpRealName":real_name,
                       "wpCreateaccount":""}
        jar = cookielib.LWPCookieJar("cookies.tmp.cookies")
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(jar))
        # fetch the signup form first to obtain a session cookie and the
        # hidden wpCreateaccountToken
        form = opener.open(self.index_url+"?"+urlencode({"title":"Special:UserLogin", "type":"signup"}))
        jar.save()
        text = form.read()
        form.close()
        create_token = re.search("<input type=\"hidden\" name=\"wpCreateaccountToken\" value=\"([0-9a-zA-Z]+)\" />", text)
        if create_token is not None:
            post_params["wpCreateaccountToken"] = create_token.group(1)
        # the HTTPCookieProcessor re-sends the session cookie automatically
        return opener.open(self.index_url+"?"+urlencode({"title":"Special:UserLogin", "type":"signup", "action":"submitlogin"}), data=urlencode(post_params))
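 
    # createaccount drives the Special:UserLogin signup form over index.php
    # rather than the API, returning the raw HTTP response (or False when no
    # index_url is known). The credentials here are placeholders:
    #     resp = bot.createaccount("ExampleName", "s3cret", email="user@example.com")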
 
 
class User(object):
    """
    This was not designed for standalone use. Use WikiBot_instance.user instead.
    @param site - an instance of WikiBot
    @param name - string
    """
    def __init__(self, site, name):
        self.name = str(name)
        self.site = site
 
    def __repr__(self):
        return "User:"+self.name
 
    def editcount(self, force=False):
        if hasattr(self, "edits") and not force:
            return self.edits # cached by an earlier call
        query = self.site.call({"action":"query", "list":"allusers", "auprop":"editcount", "aulimit":1, "aufrom":self.name})
        self.edits = query["query"]["allusers"][0]["editcount"]
        return self.edits
 
    def email(self, target, subject, text, cc=False):
        # emailuser needs an explicit action and a token; the per-session
        # edit token has historically doubled as the email token
        params = {"action":"emailuser", "target":target, "subject":subject, "text":text, "token":self.site.edittoken}
        if cc is not False:
            params["ccme"] = True
        return self.site.call(params)
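 
    # A sketch of reaching a User through the factory method (the name is
    # illustrative):
    #     u = bot.user("ExampleUser")
    #     print u.editcount()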
 
 
class Page(object):
    """
    This was not designed for standalone use. Use WikiBot_instance.page instead.
    @param site - an instance of WikiBot
    @param title - string
    """
    def __init__(self, site, title):
        self.title = title
        # titles without a prefix live in the main namespace
        self.namespace = title.split(":", 1)[0] if ":" in title else ""
        self.site = site
 
    def exists(self, force=False):
        if hasattr(self, "_exists") and not force:
            return self._exists
        else:
            query = {"action":"query", "titles":self.title, "prop":"info"}
            q = self.site.call(query)["query"]["pages"]
            if "missing" in q[q.keys()[0]]:
                self._exists = False
            else:
                self._exists = True
            return self._exists
 
    def getcontent(self, follow_redir=True, force=False, revid=None):
        """
        Creates a self.content attribute and returns it
        @param follow_redir - If the page is a redirect, get the content of the target
        @param force - Requery the API even if self.content is already set
        @param revid - Get the content of the page, at revision ID revid
        """
        if hasattr(self, "content") and not force and revid is None:
            return self.content
        query = {"action":"query", "titles":self.title, "prop":"revisions", "rvprop":"content"}
        if follow_redir: # resolve redirects server-side
            query["redirects"] = ""
        if revid is not None:
            query["rvstartid"] = revid
        result = self.site.call(query)["query"]["pages"]
        self.content = result[result.keys()[0]]["revisions"][0]["*"]
        return self.content
 
    def edit(self, content, summary="", minor=False, bot=False, force=False):
        page = self.title
        token = self.site.edittoken
        edit_params = {"action":"edit", "title":page, "text":content, "token":token, "summary":summary, "notminor":1, "notbot":1}
        if minor:
            edit_params["minor"] = 1
            del edit_params["notminor"]
        if bot:
            edit_params["bot"] = 1
            del edit_params["notbot"]
        if force is False:
            # edit-conflict detection: basetimestamp is the latest revision's
            # timestamp and starttimestamp is "now"; the API refuses the save
            # if another revision lands in between
            detect_ec_query = {"action":"query", "prop":"revisions", "rvprop":"timestamp", "titles":page}
            ec_timestamp_ = self.site.call(detect_ec_query)["query"]["pages"]
            for x in ec_timestamp_:
                if "missing" in ec_timestamp_[x]:
                    return False, "{} does not exist".format(page)
                elif ec_timestamp_[x]["ns"] == -1:
                    return False, "{} is in the Special: namespace".format(page)
                ec_timestamp = ec_timestamp_[x]["revisions"][0]["timestamp"]
            edit_params["basetimestamp"] = ec_timestamp
            edit_params["starttimestamp"] = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime())
        return self.site.call(edit_params)
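 
    # A sketch of a guarded edit: with force=False (the default) the
    # timestamps above let the API reject the save on an edit conflict.
    # Page title and summary are illustrative:
    #     p = bot.page("Project:Sandbox")
    #     p.edit(p.getcontent() + "\ntest", summary="testing wikibot", bot=True)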
