# A2K: an IRC bot
# Copyright (C) 2000, 2001 Mark Cornick
# <mcornick@users.sourceforge.net> and contributors.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
#
# $Id: a2kbot.py,v 1.13 2001/01/23 17:05:51 mcornick Exp $
# WARNING: The Surgeon General has determined that taking IRC too
# seriously is dangerous to your health.
# ############################################################
# Configuration #
# ############################################################
# Version string. Assigned to a2kurlopener's `version` attribute, returned
# (with the project URL appended) by a2kbot.get_version(), and included
# in the quit message of the "die" command.
VERSION="A2K/2.0 (Festivus)"
# Pull in necessary python bits
import formatter,htmllib,ircbot,irclib,os,re,string,sys,urllib,time
# Load the configuration module (settings plus the op list). The bot
# cannot do anything without it, so report the problem and exit with a
# non-zero status if the import fails.
try:
    import a2kconf
except ImportError:
    print("Couldn't read configuration from a2kconf.py. Abort!")
    sys.exit(1)
# Regular expression used to pick URLs out of chat text: an ftp, gopher,
# http or https scheme followed by every non-whitespace character after
# it. Deliberately simplistic -- trailing punctuation is captured too.
# Written as a raw string so the \S escapes reach the re module untouched
# instead of relying on Python leaving an unrecognised escape alone.
urlmatcher = re.compile(r'(ftp://\S*|gopher://\S*|https?://\S*)')
############################################################
# Miscellaneous utility subroutines #
############################################################
def rightnow():
    """Return the current local time and date as a single string.

    The result is the locale's time representation, the word "on", and
    the locale's date representation (strftime codes %X and %x).
    """
    now = time.localtime(time.time())
    return time.strftime('%X on %x', now)
class a2kurlopener(urllib.FancyURLopener):
    """Replacement for FancyURLopener that doesn't attempt authentication.

    Identifies itself with the bot's VERSION string and turns an HTTP 401
    response into an IOError instead of handling it in the base class.
    """

    def __init__(self, *args, **kwargs):
        """Set the user-agent version, then run the base constructor.

        All positional and keyword arguments are passed through to
        urllib.FancyURLopener unchanged.
        """
        # The urllib documentation says `version` must be set before the
        # base constructor is called; assigning it afterwards (as this
        # class used to) has no effect on the User-Agent that is sent.
        self.version = VERSION
        urllib.FancyURLopener.__init__(self, *args, **kwargs)

    def http_error_401(self, url, fp, errcode, errmsg, headers, data=None):
        """Refuse HTTP authentication.

        `data` is accepted (and ignored) because urllib passes it as an
        extra argument when the failed request carried POST data.

        Raises:
            IOError: always.
        """
        raise IOError('HTTP authentication required')
# Make urllib use this instead of FancyURLopener: install an instance as
# urllib's module-level opener so that urllib.urlopen() -- which
# a2kbot.catchurls calls to verify URLs -- goes through a2kurlopener.
urllib._urlopener = a2kurlopener()
def validateprefs():
    """Check that every preference is defined and has been customised.

    Verifies that a2kconf defines each required setting, and that the
    server list, the password and the op list no longer contain the
    'you.need.to.change.this' placeholder. On the first problem found,
    prints which preference is at fault and exits with status 1.
    """
    required = ('server', 'nick', 'name', 'channel', 'chanmodes',
                'botsnacks', 'password', 'urllist', 'opdelay', 'op')
    placeholder = 'you.need.to.change.this'
    problem = None

    # Name the missing preference ourselves rather than printing the
    # interpreter's AttributeError text, whose wording varies by version.
    for pref in required:
        if not hasattr(a2kconf, pref):
            problem = pref
            break

    if problem is None:
        if (placeholder, 6667) in a2kconf.server:
            problem = 'server'
        elif a2kconf.password == placeholder:
            problem = 'password'
        else:
            try:
                if placeholder in a2kconf.op.keys():
                    problem = 'the op list'
            except AttributeError:
                # op exists but is not a dictionary-like object.
                problem = 'the op list'

    if problem is not None:
        print('%s is not properly set in a2kconf.py.' % problem)
        sys.exit(1)
############################################################
# The bot itself, and its event handlers #
############################################################
class a2kbot(ircbot.SingleServerIRCBot):
    """The A2K bot object.

    Joins a2kconf.channel, gives ops to users listed in a2kconf.op when
    they join, records URLs seen on the channel in the HTML file named by
    a2kconf.urllist, and accepts password-protected commands from listed
    ops via private message.
    """

    def __init__(self):
        """Validate the configuration, load the seen URLs, connect and run.

        Finishes by calling self.start().
        """
        # Check to make sure all our preferences are there.
        validateprefs()
        # URLs already recorded in the urllist file.
        self.seenurls = []
        # Try to read and parse the urllist file. If it can't be read,
        # create a new one.
        try:
            listfile = open(a2kconf.urllist)
            try:
                contents = listfile.read()
            finally:
                listfile.close()
        except IOError:
            self.newurllist()
        else:
            urllistparser = htmllib.HTMLParser(formatter.NullFormatter())
            urllistparser.feed(contents)
            urllistparser.close()
            self.seenurls = urllistparser.anchorlist
        # Set up the underlying bot object.
        ircbot.SingleServerIRCBot.__init__(self, a2kconf.server, a2kconf.nick, a2kconf.name, 5)
        # Set the channel and go.
        self.channel = a2kconf.channel
        self.start()

    def newurllist(self):
        """Start a new urllist file and forget all previously seen URLs.

        Overwrites a2kconf.urllist with a fresh HTML header. If the file
        cannot be written, prints a message and exits with status 1.
        """
        header = '<html>\n<head>\n<meta http-equiv="pragma" content="no-cache">\n<title>%s URLs</title>\n</head>\n<body>\n<h1>URLs seen by %s on %s</h1>\n<h2>Starting at %s</h2>\n<ul>\n' % (a2kconf.channel, a2kconf.nick, a2kconf.channel, rightnow())
        try:
            urllist = open(a2kconf.urllist, "w")
            try:
                urllist.write(header)
            finally:
                # Close the file even when the write fails.
                urllist.close()
        except IOError:
            print("Couldn't open urllist file %s. Check to make sure it's writeable." % a2kconf.urllist)
            sys.exit(1)
        self.seenurls = []

    def catchurls(self, whonick, linein):
        """Record any not-yet-seen URLs found in a line of text.

        whonick is the nick of whoever posted the text, linein the text
        itself. Each new URL is first fetched with urllib; only URLs that
        load are appended to the urllist file and remembered in
        self.seenurls. Failures are ignored (best effort).
        """
        import cgi
        for url in urlmatcher.findall(linein):
            # Only URLs we haven't seen yet get written out.
            if url in self.seenurls:
                continue
            try:
                # Keep the Free State litter free... don't add
                # URLs we can't load.
                urlverify = urllib.urlopen(url)
                urlverify.close()
                # The URL and nick come straight off IRC, so escape them
                # before embedding them in the HTML page.
                # NOTE(review): whether the HTML parser used at startup
                # decodes entities in href values depends on the Python
                # version; if it does not, a URL containing '&' may be
                # logged once more after a restart.
                safeurl = cgi.escape(url, 1)
                safenick = cgi.escape(whonick, 1)
                urllist = open(a2kconf.urllist, "a")
                try:
                    urllist.write('<li><a href="%s">%s</a> (posted by %s at %s)\n' % (safeurl, safeurl, safenick, rightnow()))
                finally:
                    urllist.close()
                self.seenurls.append(url)
            except (KeyboardInterrupt, SystemExit):
                # Never swallow a shutdown request.
                raise
            except:
                # Either the URL verify failed or the write failed.
                # Just ignore it.
                pass

    def get_version(self):
        "Returns the bot version."
        return VERSION + ' (http://accutron2000.sourceforge.net/)'

    def is_chanop(self, whonick):
        """Return 1 if the user has ops, 0 otherwise.

        Looks at the first channel found that contains whonick; a user
        who is on none of the bot's channels counts as not having ops.
        """
        for channelname, channelobject in self.channels.items():
            if channelobject.has_user(whonick):
                if channelobject.is_oper(whonick):
                    return 1
                return 0
        return 0

    def make_chanop(self, connection, whonick):
        "Give someone ops on a2kconf.channel, unless they already have them."
        if not self.is_chanop(whonick):
            connection.mode(a2kconf.channel, '+o ' + whonick)

    def _is_listed_op(self, whonick, userhost):
        """Return 1 if the op list entry for whonick matches userhost.

        Returns 0 when whonick has no entry in a2kconf.op or when the
        entry's regular expression does not match userhost.
        """
        try:
            pattern = a2kconf.op[whonick]
        except KeyError:
            return 0
        if pattern.search(userhost):
            return 1
        return 0

    def on_welcome(self, connection, event):
        "Run after connecting to the IRC server: join the channel."
        connection.join(a2kconf.channel)

    def on_nicknameinuse(self, connection, event):
        "Handles nickname-in-use errors by appending '_' and retrying."
        a2kconf.nick = a2kconf.nick + '_'
        self._nickname = a2kconf.nick
        connection.nick(a2kconf.nick)

    def on_join(self, connection, event):
        """Handles clients joining the channel.

        When the bot itself joins, it sets the configured channel modes.
        When someone else joins and is on the op list, an op grant is
        scheduled a2kconf.opdelay seconds later.
        """
        whonick = irclib.nm_to_n(event.source())
        userhost = irclib.nm_to_uh(event.source())
        # If the bot joined, set channel modes if possible.
        if whonick == a2kconf.nick:
            connection.mode(a2kconf.channel, a2kconf.chanmodes)
            return
        # Otherwise try the op list: by nick first, then brute force over
        # every entry in case a listed op joined under another nick.
        wants_ops = self._is_listed_op(whonick, userhost)
        if not wants_ops:
            for ii in a2kconf.op.keys():
                if a2kconf.op[ii].search(userhost):
                    wants_ops = 1
                    # One match is enough; don't schedule several grants.
                    break
        if wants_ops:
            self.ircobj.execute_delayed(a2kconf.opdelay, self.make_chanop, (connection, whonick))

    def on_ctcp(self, connection, event):
        """Handles CTCP requests.

        Answers VERSION and PING, and scans the text of an ACTION for URLs.
        """
        whonick = irclib.nm_to_n(event.source())
        arguments = event.arguments()
        if not arguments:
            return
        message = string.lower(arguments[0])
        if message == 'version':
            connection.notice(whonick, self.get_version())
        elif message == 'ping':
            connection.pong(whonick)
        elif message == 'action' and len(arguments) > 1:
            # An ACTION sent without any text has nothing to scan.
            self.catchurls(whonick, arguments[1])

    def on_kick(self, connection, event):
        "Handles clients being kicked from the channel: rejoin if it was us."
        arguments = event.arguments()
        if not arguments:
            return
        channel = event.target()
        victim = arguments[0]
        # Did some twit decide to kick us?
        if channel == a2kconf.channel and victim == a2kconf.nick:
            connection.join(a2kconf.channel)

    def on_pubmsg(self, connection, event):
        """Handles public messages on the channel.

        Answers botsnacks addressed to the bot (when enabled) and scans
        every message for URLs.
        """
        whonick = irclib.nm_to_n(event.source())
        message = event.arguments()[0]
        # Things the bot should respond to only if it hears its nick.
        # Plain substring tests: IRC nicks may contain characters such as
        # [ ] \ ^ { | } that are special in regular expressions.
        if string.find(message, a2kconf.nick) != -1:
            # Botsnacks. Of course.
            if string.find(message, 'botsnack') != -1:
                if a2kconf.botsnacks:
                    connection.privmsg(a2kconf.channel, ':)')
        # Check for catchable URLs.
        self.catchurls(whonick, message)

    def on_privmsg(self, connection, event):
        """Handles private messages to the bot.

        "inviteme" and "hello" need no password. Anything else is treated
        as "<password> <command> [args]" and is only honoured when the
        sender is on the op list and the password matches.
        """
        whonick = irclib.nm_to_n(event.source())
        userhost = irclib.nm_to_uh(event.source())
        message = event.arguments()[0]
        is_op = self._is_listed_op(whonick, userhost)

        # Invite requests. This doesn't require a password, as
        # we may not want to give out the password to all the
        # ops.
        if message == "inviteme":
            if is_op:
                connection.invite(whonick, a2kconf.channel)
            else:
                connection.notice(a2kconf.channel, whonick + ' (' + userhost + ') requests an invitation to ' + a2kconf.channel + '.')
            return

        # 1) frustrate people who think this is eggdrop
        # 2) provide an easy way to keep connections alive for people
        #    with anal firewalls
        if message == "hello":
            connection.notice(whonick, 'Hello, ' + whonick + '.')
            return

        # Everything else is a remote command. Senders who are not on the
        # op list are ignored completely, so they cannot use the bot's
        # replies to find out whether a guessed password is right.
        if not is_op:
            return

        # Check for password.
        # authmessage is what remains after stripping the p/w.
        try:
            (password, authmessage) = string.split(message, ' ', 1)
        except ValueError:
            password = None
            authmessage = message

        # If password is not OK, or no command given:
        if password != a2kconf.password:
            if authmessage == a2kconf.password:
                connection.notice(whonick, 'No command given!')
            return

        # The first word of authmessage will be a command -
        # split it off from any args.
        try:
            (authcmd, authargs) = string.split(authmessage, ' ', 1)
        except ValueError:
            authcmd = authmessage
            authargs = None
        self._run_authcmd(connection, whonick, authcmd, authargs)

    def _run_authcmd(self, connection, whonick, authcmd, authargs):
        """Carry out one authenticated remote command.

        authcmd is the command word and authargs the rest of the line
        (None when nothing followed the command). Replies are sent to
        whonick as notices. Unknown commands are ignored.
        """
        # /msg bot die: kill the bot
        if authcmd == "die":
            self.die(VERSION + " dying, ordered by " + whonick)
        # Changing servers
        elif authcmd == "server":
            connection.notice(whonick, 'This command is obsolete - use the "jump" command instead.')
        elif authcmd == "jump":
            self.jump_server()
        # Add an op.
        elif authcmd == "addop":
            if authargs is None:
                connection.notice(whonick, 'Not enough arguments given to addop!')
            else:
                try:
                    (nickname, nickre) = string.split(authargs, ' ', 1)
                    a2kconf.op[nickname] = re.compile(nickre)
                    connection.notice(whonick, nickname + ' added to op list as ' + nickre + '.')
                except re.error:
                    connection.notice(whonick, 'Bad regular expression!')
                except ValueError:
                    connection.notice(whonick, 'Not enough arguments given to addop!')
        # Delete an op.
        elif authcmd == "delop":
            if authargs is None:
                connection.notice(whonick, 'Not enough arguments given to delop!')
            else:
                try:
                    del a2kconf.op[authargs]
                    connection.notice(whonick, authargs + ' removed from op list.')
                except KeyError:
                    connection.notice(whonick, 'No such op ' + authargs + '!')
        # Show an op list entry.
        elif authcmd == "showop":
            if authargs is None:
                connection.notice(whonick, 'Not enough arguments given to showop!')
            else:
                try:
                    connection.notice(whonick, authargs + ' is ' + a2kconf.op[authargs].pattern)
                except KeyError:
                    connection.notice(whonick, 'No such op ' + authargs + '!')
        # Change the bot's nick.
        elif authcmd == "nick":
            if authargs is None:
                connection.notice(whonick, 'Not enough arguments given to nick!')
            else:
                a2kconf.nick = authargs
                self._nickname = a2kconf.nick
                connection.nick(a2kconf.nick)
        # Change the password.
        elif authcmd == "passwd":
            if authargs is None:
                connection.notice(whonick, 'Not enough arguments given to passwd!')
            elif string.find(authargs, ' ') != -1:
                # The password is read as the first space-delimited word of
                # a message, so one containing a space could never match.
                connection.notice(whonick, 'Password may not contain spaces!')
            else:
                a2kconf.password = authargs
                connection.notice(whonick, 'Password changed. Do a writeconf to make permanent.')
        # Botsnacks on/off.
        elif authcmd == "botsnacks":
            a2kconf.botsnacks = 1
            connection.notice(whonick, 'Botsnacks on.')
        elif authcmd == "nobotsnacks":
            a2kconf.botsnacks = 0
            connection.notice(whonick, 'Botsnacks off.')
        # Write the current conf file.
        elif authcmd == "writeconf" or authcmd == "writeops":
            try:
                self._write_conf(whonick)
                connection.notice(whonick, 'writeconf successful.')
            except IOError:
                connection.notice(whonick, 'writeconf failed!')
        # Start a new urllist file. Argument given is the
        # name to be given the old file.
        elif authcmd == 'cycleurls':
            if authargs is None:
                connection.notice(whonick, 'Not enough arguments given to cycleurls!')
            else:
                try:
                    os.rename(a2kconf.urllist, authargs)
                except OSError:
                    connection.notice(whonick, 'cycleurls failed!')
                else:
                    self.newurllist()
                    connection.notice(whonick, 'URLs cycled. Old URLs saved as ' + authargs + '.')

    def _write_conf(self, whonick):
        """Write the current settings and op list to a2kconf.py.

        whonick is recorded in the file's header comment. String values
        and op patterns are written with repr() so that quotes and
        backslashes in them survive being read back as Python source.

        Raises:
            IOError: if the file cannot be opened or written.
        """
        conftext = """# A2K 2.x configuration module
# This file created by %s using writeconf at %s
# WARNING: The Surgeon General has determined that taking IRC too
# seriously is dangerous to your health.
# The IRC server(s) (host and port) to connect to. This is a list,
# and can contain one or more sets of hosts or ports, e.g.
# server = [('irc.foo.com',6667),('irc.bar.com',6667)]
server = %s
# The nickname to use:
nick = %s
# The name to use:
name = %s
# The channel to join:
channel = %s
# What modes to set there:
chanmodes = %s
# Want to eat botsnacks? 1 = yes, 0 = no:
botsnacks = %s
# A password for remote commands via /msg:
password = %s
# The name of a file to save posted URLs in:
urllist = %s
# Number of seconds to wait after someone joins before giving them ops:
opdelay = %s
# Op list. Do not remove the following two lines:
import re
op = {}
# Add new entries below this line.
""" % (whonick, rightnow(), a2kconf.server, repr(a2kconf.nick), repr(a2kconf.name), repr(a2kconf.channel), repr(a2kconf.chanmodes), a2kconf.botsnacks, repr(a2kconf.password), repr(a2kconf.urllist), a2kconf.opdelay)
        oplines = []
        for ii in a2kconf.op.keys():
            oplines.append("op[%s] = re.compile(%s)\n" % (repr(ii), repr(a2kconf.op[ii].pattern)))
        writeconf = open("a2kconf.py", "w")
        try:
            writeconf.write(conftext)
            writeconf.writelines(oplines)
        finally:
            # Close the file even when a write fails.
            writeconf.close()
|