restructured IRC engine (this does not work yet)

This commit is contained in:
Luke Rogers 2013-10-02 16:08:17 +13:00
parent 1cc38cb139
commit 7c399db0ea
6 changed files with 189 additions and 111 deletions

View file

@ -4,16 +4,25 @@ from core import bot as _bot
import os import os
import sys import sys
import signal
# set up enviroment # set up enviroment
os.chdir(sys.path[0] or '.') # do stuff relative to the install directory os.chdir(sys.path[0] or '.') # do stuff relative to the install directory
print 'CloudBot REFRESH <http://git.io/cloudbotirc>' print 'CloudBot REFRESH <http://git.io/cloudbotirc>'
def exit_gracefully(signum, frame):
bot.stop()
# store the original SIGINT handler
original_sigint = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, exit_gracefully)
# create new bot object # create new bot object
bot = _bot.Bot("cloudbot") bot = _bot.Bot("cloudbot")
bot.logger.debug("Bot initalized.") bot.logger.debug("Bot initalized, starting main loop.")
bot.logger.debug("Starting main loop.") while bot.running:
while True:
bot.loop() bot.loop()
bot.logger.debug("Stopped main loop.")

View file

@ -1,5 +1,6 @@
import time import time
import logging import logging
import sys
import re import re
import os import os
import Queue import Queue
@ -17,6 +18,7 @@ class Bot(object):
# basic variables # basic variables
self.name = name self.name = name
self.start_time = time.time() self.start_time = time.time()
self.running = True
# set up config and logging # set up config and logging
self.setup() self.setup()
@ -32,19 +34,44 @@ class Bot(object):
self.logger.debug("Plugin reloader started.") self.logger.debug("Plugin reloader started.")
def stop(self, reason=None):
"""quits all networks and shuts the bot down"""
self.logger.info("Stopping bot.")
self.running = False
# wait for the bot loop to stop
time.sleep(1)
self.config.observer.stop()
self.logger.debug("Config reloader stopped.")
for name, connection in self.connections.iteritems():
# TODO: end connections properly
self.logger.debug("({}) Closing connection.".format(name))
if reason:
connection.cmd("QUIT", [reason])
else:
connection.cmd("QUIT")
connection.stop()
self.logger.debug("Logging engine stopped")
logging.shutdown()
sys.exit()
def loop(self): def loop(self):
"""reloads plugins, then recives input from the IRC engine and processes it""" """reloads plugins, then recives input from the IRC engine and processes it"""
loader.reload(self) # TODO: new plugin loader loader.reload(self) # TODO: new plugin loader
for connection in self.connections.itervalues(): for conn in self.connections.itervalues():
try: try:
incoming = connection.out.get_nowait() incoming = conn.parsed_queue.get_nowait()
main.main(self, connection, incoming) main.main(self, conn, incoming)
except Queue.Empty: except Queue.Empty:
pass pass
# if no messages are in the incoming queue, sleep # if no messages are in the incoming queue, sleep
while all(connection.out.empty() for connection in self.connections.itervalues()): while all(connection.parsed_queue.empty() for connection in self.connections.itervalues()):
time.sleep(.1) time.sleep(.1)

View file

@ -48,6 +48,7 @@ class Config(dict):
recursive=False recursive=False
) )
self.observer.start() self.observer.start()
self.logger.debug("Config reloader started.")
class ConfigReloader(Trick): class ConfigReloader(Trick):

View file

@ -2,6 +2,7 @@ import re
import socket import socket
import time import time
import thread import thread
import threading
import Queue import Queue
from ssl import wrap_socket, CERT_NONE, CERT_REQUIRED, SSLError from ssl import wrap_socket, CERT_NONE, CERT_REQUIRED, SSLError
@ -17,61 +18,43 @@ def decode(txt):
def censor(text): def censor(text):
#text = text.replace('\n', '').replace('\r', '')
#replacement = '[censored]'
#if 'censored_strings' in bot.config:
# if bot.config['censored_strings']:
# words = map(re.escape, bot.config['censored_strings'])
# regex = re.compile('({})'.format("|".join(words)))
# text = regex.sub(replacement, text)
return text return text
class crlf_tcp(object): class RecieveThread(threading.Thread):
"""Handles tcp connections that consist of utf-8 lines ending with crlf""" """recieves messages from IRC and puts them in the input_queue"""
def __init__(self, socket, input_queue, timeout):
def __init__(self, host, port, timeout=300): self.input_buffer = ""
self.ibuffer = "" self.input_queue = input_queue
self.obuffer = "" self.socket = socket
self.oqueue = Queue.Queue() # lines to be sent out
self.iqueue = Queue.Queue() # lines that were received
self.socket = self.create_socket()
self.host = host
self.port = port
self.timeout = timeout self.timeout = timeout
threading.Thread.__init__(self)
def create_socket(self):
return socket.socket(socket.AF_INET, socket.TCP_NODELAY)
def run(self):
self.socket.connect((self.host, self.port))
thread.start_new_thread(self.recv_loop, ())
thread.start_new_thread(self.send_loop, ())
def recv_from_socket(self, nbytes): def recv_from_socket(self, nbytes):
return self.socket.recv(nbytes) return self.socket.recv(nbytes)
def get_timeout_exception_type(self):
return socket.timeout
def handle_receive_exception(self, error, last_timestamp): def handle_receive_exception(self, error, last_timestamp):
print error
if time.time() - last_timestamp > self.timeout: if time.time() - last_timestamp > self.timeout:
self.iqueue.put(StopIteration) self.input_queue.put(StopIteration)
self.socket.close() self.socket.close()
return True return True
return False return False
def recv_loop(self): def get_timeout_exception_type(self):
return socket.timeout
def run(self):
last_timestamp = time.time() last_timestamp = time.time()
while True: while True:
try: try:
data = self.recv_from_socket(4096) data = self.recv_from_socket(4096)
self.ibuffer += data self.input_buffer += data
if data: if data:
last_timestamp = time.time() last_timestamp = time.time()
else: else:
if time.time() - last_timestamp > self.timeout: if time.time() - last_timestamp > self.timeout:
self.iqueue.put(StopIteration) self.input_queue.put(StopIteration)
self.socket.close() self.socket.close()
return return
time.sleep(1) time.sleep(1)
@ -80,44 +63,123 @@ class crlf_tcp(object):
return return
continue continue
while '\r\n' in self.ibuffer: while '\r\n' in self.input_buffer:
line, self.ibuffer = self.ibuffer.split('\r\n', 1) line, self.input_buffer = self.input_buffer.split('\r\n', 1)
self.iqueue.put(decode(line)) self.input_queue.put(decode(line))
def send_loop(self):
class SendThread(threading.Thread):
"""sends messages from output_queue to IRC"""
def __init__(self, socket, conn_name, output_queue):
self.output_buffer = ""
self.output_queue = output_queue
self.conn_name = conn_name
self.socket = socket
threading.Thread.__init__(self)
def run(self):
while True: while True:
line = self.oqueue.get().splitlines()[0][:500] line = self.output_queue.get().splitlines()[0][:500]
print ">>> %r" % line print u"sending {}> {}".format(self.conn_name, line)
self.obuffer += line.encode('utf-8', 'replace') + '\r\n' self.output_buffer += line.encode('utf-8', 'replace') + '\r\n'
while self.obuffer: while self.output_buffer:
sent = self.socket.send(self.obuffer) sent = self.socket.send(self.output_buffer)
self.obuffer = self.obuffer[sent:] self.output_buffer = self.output_buffer[sent:]
class crlf_ssl_tcp(crlf_tcp): class ParseThread(threading.Thread):
"""Handles ssl tcp connetions that consist of utf-8 lines ending with crlf""" """parses messages from input_queue and puts them in parsed_queue"""
def __init__(self, input_queue, output_queue, parsed_queue):
self.input_queue = input_queue # lines that were received
self.output_queue = output_queue # lines to be sent out
self.parsed_queue = parsed_queue # lines that have been parsed
threading.Thread.__init__(self)
def __init__(self, host, port, ignore_cert_errors, timeout=300): def run(self):
self.ignore_cert_errors = ignore_cert_errors while True:
crlf_tcp.__init__(self, host, port, timeout) # get a message from the input queue
msg = self.input_queue.get()
#if msg == StopIteration:
# self.irc.connect()
# continue
# parse the message
if msg.startswith(":"): # has a prefix
prefix, command, params = irc_prefix_rem(msg).groups()
else:
prefix, command, params = irc_noprefix_rem(msg).groups()
nick, user, host = irc_netmask_rem(prefix).groups()
mask = nick + "!" + user + "@" + host
paramlist = irc_param_ref(params)
lastparam = ""
if paramlist:
if paramlist[-1].startswith(':'):
paramlist[-1] = paramlist[-1][1:]
lastparam = paramlist[-1]
# put the parsed message in the response queue
self.parsed_queue.put([msg, prefix, command, params, nick, user, host,
mask, paramlist, lastparam])
# if the server pings us, pong them back
if command == "PING":
print paramlist
str = "PONG " " ".join(paramlist)
self.output_queue.put(str)
class Connection(object):
def __init__(self, name, host, port, input_queue, output_queue):
self.output_queue = output_queue # lines to be sent out
self.input_queue = input_queue # lines that were received
self.socket = self.create_socket()
self.conn_name = name
self.host = host
self.port = port
self.timeout = 300
def create_socket(self): def create_socket(self):
return wrap_socket(crlf_tcp.create_socket(self), server_side=False, return socket.socket(socket.AF_INET, socket.TCP_NODELAY)
cert_reqs=CERT_NONE if self.ignore_cert_errors else
CERT_REQUIRED)
def recv_from_socket(self, nbytes): def connect(self):
return self.socket.read(nbytes) self.socket.connect((self.host, self.port))
def get_timeout_exception_type(self): self.recieve_thread = RecieveThread(self.socket, self.input_queue, self.timeout)
return SSLError self.recieve_thread.start()
def handle_receive_exception(self, error, last_timestamp): self.send_thread = SendThread(self.socket, self.conn_name, self.output_queue)
# this is terrible self.send_thread.start()
if not "timed out" in error.args[0]:
raise
return crlf_tcp.handle_receive_exception(self, error, last_timestamp)
def stop(self):
self.recv_thread.stop()
self.send_thread.stop()
self.socket.disconnect()
##class crlf_ssl_tcp(crlf_tcp):
# """Handles ssl tcp connetions that consist of utf-8 lines ending with crlf"""
# def __init__(self, host, port, ignore_cert_errors, timeout=300):
# self.ignore_cert_errors = ignore_cert_errors
# crlf_tcp.__init__(self, host, port, timeout)
# def create_socket(self):
# return wrap_socket(crlf_tcp.create_socket(self), server_side=False,
# cert_reqs=CERT_NONE if self.ignore_cert_errors else
# CERT_REQUIRED)
# def recv_from_socket(self, nbytes):
# return self.socket.read(nbytes)
# def get_timeout_exception_type(self):
# return SSLError
# def handle_receive_exception(self, error, last_timestamp):
# # this is terrible
# if not "timed out" in error.args[0]:
# raise
# return crlf_tcp.handle_receive_exception(self, error, last_timestamp)
#
irc_prefix_rem = re.compile(r'(.*?) (.*?) (.*)').match irc_prefix_rem = re.compile(r'(.*?) (.*?) (.*)').match
irc_noprefix_rem = re.compile(r'()(.*?) (.*)').match irc_noprefix_rem = re.compile(r'()(.*?) (.*)').match
@ -137,53 +199,36 @@ class IRC(object):
self.nick = nick self.nick = nick
self.vars = {} self.vars = {}
self.out = Queue.Queue() # responses from the server are placed here self.parsed_queue = Queue.Queue() # responses from the server are placed here
# format: [rawline, prefix, command, params, # format: [rawline, prefix, command, params,
# nick, user, host, paramlist, msg] # nick, user, host, paramlist, msg]
self.connect()
thread.start_new_thread(self.parse_loop, ()) self.parsed_queue = Queue.Queue()
self.input_queue = Queue.Queue()
self.output_queue = Queue.Queue()
def create_connection(self): self.connection = Connection(self.name, self.server, self.port,
return crlf_tcp(self.server, self.port) self.input_queue, self.output_queue)
self.connection.connect()
self.parse_thread = ParseThread(self.input_queue, self.output_queue,
self.parsed_queue)
self.parse_thread.start()
def connect(self):
self.conn = self.create_connection()
thread.start_new_thread(self.conn.run, ())
self.set_pass(self.conf.get('server_password')) self.set_pass(self.conf.get('server_password'))
self.set_nick(self.nick) self.set_nick(self.nick)
self.cmd("USER", self.cmd("USER",
[self.conf.get('user', 'cloudbot'), "3", "*", self.conf.get('realname', [self.conf.get('user', 'cloudbot'), "3", "*", self.conf.get('realname',
'CloudBot - http://git.io/cloudbot')]) 'CloudBot - http://git.io/cloudbot')])
def parse_loop(self): def stop(self):
while True: self.parse_thread.stop()
# get a message from the input queue self.parse_thread.stop()
msg = self.conn.iqueue.get()
if msg == StopIteration: def connect(self):
self.connect() self.conn = self.create_connection()
continue self.conn_thread = thread.start_new_thread(self.conn.run, ())
# parse the message
if msg.startswith(":"): # has a prefix
prefix, command, params = irc_prefix_rem(msg).groups()
else:
prefix, command, params = irc_noprefix_rem(msg).groups()
nick, user, host = irc_netmask_rem(prefix).groups()
mask = nick + "!" + user + "@" + host
paramlist = irc_param_ref(params)
lastparam = ""
if paramlist:
if paramlist[-1].startswith(':'):
paramlist[-1] = paramlist[-1][1:]
lastparam = paramlist[-1]
# put the parsed message in the response queue
self.out.put([msg, prefix, command, params, nick, user, host,
mask, paramlist, lastparam])
# if the server pings us, pong them back
if command == "PING":
self.cmd("PONG", paramlist)
def set_pass(self, password): def set_pass(self, password):
if password: if password:
@ -222,7 +267,7 @@ class IRC(object):
self.send(command) self.send(command)
def send(self, str): def send(self, str):
self.conn.oqueue.put(str) self.output_queue.put(str)
class SSLIRC(IRC): class SSLIRC(IRC):
@ -232,4 +277,4 @@ class SSLIRC(IRC):
IRC.__init__(self, name, server, nick, port, channels, conf) IRC.__init__(self, name, server, nick, port, channels, conf)
def create_connection(self): def create_connection(self):
return crlf_ssl_tcp(self.server, self.port, self.ignore_cert_errors) return crlf_ssl_tcp(self.name, self.server, self.port, self.ignore_cert_errors)

View file

@ -2,8 +2,6 @@ import collections
import glob import glob
import os import os
import re import re
import sys
import Queue
import traceback import traceback
from core import main from core import main

View file

@ -103,14 +103,12 @@ def adduser(inp, bot=None, notice=None):
@hook.command("quit", autohelp=False, permissions=["botcontrol"]) @hook.command("quit", autohelp=False, permissions=["botcontrol"])
@hook.command(autohelp=False, permissions=["botcontrol"]) @hook.command(autohelp=False, permissions=["botcontrol"])
def stop(inp, nick=None, conn=None): def stop(inp, bot=None):
"""stop [reason] -- Kills the bot with [reason] as its quit message.""" """stop [reason] -- Kills the bot with [reason] as its quit message."""
if inp: if inp:
conn.cmd("QUIT", ["Killed by {} ({})".format(nick, inp)]) bot.stop(reason=inp)
else: else:
conn.cmd("QUIT", ["Killed by {}.".format(nick)]) bot.stop()
time.sleep(5)
os.execl("./cloudbot", "cloudbot", "stop")
@hook.command(autohelp=False, permissions=["botcontrol"]) @hook.command(autohelp=False, permissions=["botcontrol"])