From 7c399db0eaf62f396a1992d476661890c81184e4 Mon Sep 17 00:00:00 2001 From: Luke Rogers Date: Wed, 2 Oct 2013 16:08:17 +1300 Subject: [PATCH] restructured IRC engine (this does not work yet) --- cloudbot.py | 15 ++- core/bot.py | 35 ++++++- core/config.py | 1 + core/irc.py | 239 ++++++++++++++++++++++++++++------------------- core/loader.py | 2 - plugins/admin.py | 8 +- 6 files changed, 189 insertions(+), 111 deletions(-) diff --git a/cloudbot.py b/cloudbot.py index 5d627c3..65489e8 100644 --- a/cloudbot.py +++ b/cloudbot.py @@ -4,16 +4,25 @@ from core import bot as _bot import os import sys +import signal # set up enviroment os.chdir(sys.path[0] or '.') # do stuff relative to the install directory print 'CloudBot REFRESH ' +def exit_gracefully(signum, frame): + bot.stop() + +# store the original SIGINT handler +original_sigint = signal.getsignal(signal.SIGINT) +signal.signal(signal.SIGINT, exit_gracefully) + # create new bot object bot = _bot.Bot("cloudbot") -bot.logger.debug("Bot initalized.") +bot.logger.debug("Bot initalized, starting main loop.") -bot.logger.debug("Starting main loop.") -while True: +while bot.running: bot.loop() + +bot.logger.debug("Stopped main loop.") diff --git a/core/bot.py b/core/bot.py index 8f3bbef..b077f28 100644 --- a/core/bot.py +++ b/core/bot.py @@ -1,5 +1,6 @@ import time import logging +import sys import re import os import Queue @@ -17,6 +18,7 @@ class Bot(object): # basic variables self.name = name self.start_time = time.time() + self.running = True # set up config and logging self.setup() @@ -32,19 +34,44 @@ class Bot(object): self.logger.debug("Plugin reloader started.") + def stop(self, reason=None): + """quits all networks and shuts the bot down""" + + self.logger.info("Stopping bot.") + self.running = False + # wait for the bot loop to stop + time.sleep(1) + self.config.observer.stop() + self.logger.debug("Config reloader stopped.") + + for name, connection in self.connections.iteritems(): + # TODO: end connections properly + self.logger.debug("({}) Closing connection.".format(name)) + + if reason: + connection.cmd("QUIT", [reason]) + else: + connection.cmd("QUIT") + + connection.stop() + + self.logger.debug("Logging engine stopped") + logging.shutdown() + sys.exit() + def loop(self): """reloads plugins, then recives input from the IRC engine and processes it""" loader.reload(self) # TODO: new plugin loader - for connection in self.connections.itervalues(): + for conn in self.connections.itervalues(): try: - incoming = connection.out.get_nowait() - main.main(self, connection, incoming) + incoming = conn.parsed_queue.get_nowait() + main.main(self, conn, incoming) except Queue.Empty: pass # if no messages are in the incoming queue, sleep - while all(connection.out.empty() for connection in self.connections.itervalues()): + while all(connection.parsed_queue.empty() for connection in self.connections.itervalues()): time.sleep(.1) diff --git a/core/config.py b/core/config.py index a0372b6..86b16be 100755 --- a/core/config.py +++ b/core/config.py @@ -48,6 +48,7 @@ class Config(dict): recursive=False ) self.observer.start() + self.logger.debug("Config reloader started.") class ConfigReloader(Trick): diff --git a/core/irc.py b/core/irc.py index 57f435e..f61f092 100755 --- a/core/irc.py +++ b/core/irc.py @@ -2,6 +2,7 @@ import re import socket import time import thread +import threading import Queue from ssl import wrap_socket, CERT_NONE, CERT_REQUIRED, SSLError @@ -17,61 +18,43 @@ def decode(txt): def censor(text): - #text = text.replace('\n', '').replace('\r', '') - #replacement = '[censored]' - #if 'censored_strings' in bot.config: - # if bot.config['censored_strings']: - # words = map(re.escape, bot.config['censored_strings']) - # regex = re.compile('({})'.format("|".join(words))) - # text = regex.sub(replacement, text) return text -class crlf_tcp(object): - """Handles tcp connections that consist of utf-8 lines ending with crlf""" - - def __init__(self, host, port, timeout=300): - self.ibuffer = "" - self.obuffer = "" - self.oqueue = Queue.Queue() # lines to be sent out - self.iqueue = Queue.Queue() # lines that were received - self.socket = self.create_socket() - self.host = host - self.port = port +class RecieveThread(threading.Thread): + """recieves messages from IRC and puts them in the input_queue""" + def __init__(self, socket, input_queue, timeout): + self.input_buffer = "" + self.input_queue = input_queue + self.socket = socket self.timeout = timeout - - def create_socket(self): - return socket.socket(socket.AF_INET, socket.TCP_NODELAY) - - def run(self): - self.socket.connect((self.host, self.port)) - thread.start_new_thread(self.recv_loop, ()) - thread.start_new_thread(self.send_loop, ()) + threading.Thread.__init__(self) def recv_from_socket(self, nbytes): return self.socket.recv(nbytes) - def get_timeout_exception_type(self): - return socket.timeout - def handle_receive_exception(self, error, last_timestamp): + print error if time.time() - last_timestamp > self.timeout: - self.iqueue.put(StopIteration) + self.input_queue.put(StopIteration) self.socket.close() return True return False - def recv_loop(self): + def get_timeout_exception_type(self): + return socket.timeout + + def run(self): last_timestamp = time.time() while True: try: data = self.recv_from_socket(4096) - self.ibuffer += data + self.input_buffer += data if data: last_timestamp = time.time() else: if time.time() - last_timestamp > self.timeout: - self.iqueue.put(StopIteration) + self.input_queue.put(StopIteration) self.socket.close() return time.sleep(1) @@ -80,44 +63,123 @@ class crlf_tcp(object): return continue - while '\r\n' in self.ibuffer: - line, self.ibuffer = self.ibuffer.split('\r\n', 1) - self.iqueue.put(decode(line)) + while '\r\n' in self.input_buffer: + line, self.input_buffer = self.input_buffer.split('\r\n', 1) + self.input_queue.put(decode(line)) - def send_loop(self): + +class SendThread(threading.Thread): + """sends messages from output_queue to IRC""" + def __init__(self, socket, conn_name, output_queue): + self.output_buffer = "" + self.output_queue = output_queue + self.conn_name = conn_name + self.socket = socket + threading.Thread.__init__(self) + + def run(self): while True: - line = self.oqueue.get().splitlines()[0][:500] - print ">>> %r" % line - self.obuffer += line.encode('utf-8', 'replace') + '\r\n' - while self.obuffer: - sent = self.socket.send(self.obuffer) - self.obuffer = self.obuffer[sent:] + line = self.output_queue.get().splitlines()[0][:500] + print u"sending {}> {}".format(self.conn_name, line) + self.output_buffer += line.encode('utf-8', 'replace') + '\r\n' + while self.output_buffer: + sent = self.socket.send(self.output_buffer) + self.output_buffer = self.output_buffer[sent:] -class crlf_ssl_tcp(crlf_tcp): - """Handles ssl tcp connetions that consist of utf-8 lines ending with crlf""" +class ParseThread(threading.Thread): + """parses messages from input_queue and puts them in parsed_queue""" + def __init__(self, input_queue, output_queue, parsed_queue): + self.input_queue = input_queue # lines that were received + self.output_queue = output_queue # lines to be sent out + self.parsed_queue = parsed_queue # lines that have been parsed + threading.Thread.__init__(self) - def __init__(self, host, port, ignore_cert_errors, timeout=300): - self.ignore_cert_errors = ignore_cert_errors - crlf_tcp.__init__(self, host, port, timeout) + def run(self): + while True: + # get a message from the input queue + msg = self.input_queue.get() + + #if msg == StopIteration: + # self.irc.connect() + # continue + + # parse the message + if msg.startswith(":"): # has a prefix + prefix, command, params = irc_prefix_rem(msg).groups() + else: + prefix, command, params = irc_noprefix_rem(msg).groups() + nick, user, host = irc_netmask_rem(prefix).groups() + mask = nick + "!" + user + "@" + host + paramlist = irc_param_ref(params) + lastparam = "" + if paramlist: + if paramlist[-1].startswith(':'): + paramlist[-1] = paramlist[-1][1:] + lastparam = paramlist[-1] + # put the parsed message in the response queue + self.parsed_queue.put([msg, prefix, command, params, nick, user, host, + mask, paramlist, lastparam]) + # if the server pings us, pong them back + if command == "PING": + print paramlist + str = "PONG " " ".join(paramlist) + self.output_queue.put(str) + + +class Connection(object): + + def __init__(self, name, host, port, input_queue, output_queue): + self.output_queue = output_queue # lines to be sent out + self.input_queue = input_queue # lines that were received + self.socket = self.create_socket() + self.conn_name = name + self.host = host + self.port = port + self.timeout = 300 def create_socket(self): - return wrap_socket(crlf_tcp.create_socket(self), server_side=False, - cert_reqs=CERT_NONE if self.ignore_cert_errors else - CERT_REQUIRED) + return socket.socket(socket.AF_INET, socket.TCP_NODELAY) - def recv_from_socket(self, nbytes): - return self.socket.read(nbytes) + def connect(self): + self.socket.connect((self.host, self.port)) - def get_timeout_exception_type(self): - return SSLError + self.recieve_thread = RecieveThread(self.socket, self.input_queue, self.timeout) + self.recieve_thread.start() - def handle_receive_exception(self, error, last_timestamp): - # this is terrible - if not "timed out" in error.args[0]: - raise - return crlf_tcp.handle_receive_exception(self, error, last_timestamp) + self.send_thread = SendThread(self.socket, self.conn_name, self.output_queue) + self.send_thread.start() + def stop(self): + self.recv_thread.stop() + self.send_thread.stop() + self.socket.disconnect() + + +##class crlf_ssl_tcp(crlf_tcp): + # """Handles ssl tcp connetions that consist of utf-8 lines ending with crlf""" + +# def __init__(self, host, port, ignore_cert_errors, timeout=300): + # self.ignore_cert_errors = ignore_cert_errors + # crlf_tcp.__init__(self, host, port, timeout) + + # def create_socket(self): + # return wrap_socket(crlf_tcp.create_socket(self), server_side=False, + # cert_reqs=CERT_NONE if self.ignore_cert_errors else + # CERT_REQUIRED) + + # def recv_from_socket(self, nbytes): + # return self.socket.read(nbytes) + + # def get_timeout_exception_type(self): + # return SSLError + + # def handle_receive_exception(self, error, last_timestamp): + # # this is terrible + # if not "timed out" in error.args[0]: + # raise + # return crlf_tcp.handle_receive_exception(self, error, last_timestamp) +# irc_prefix_rem = re.compile(r'(.*?) (.*?) (.*)').match irc_noprefix_rem = re.compile(r'()(.*?) (.*)').match @@ -137,53 +199,36 @@ class IRC(object): self.nick = nick self.vars = {} - self.out = Queue.Queue() # responses from the server are placed here + self.parsed_queue = Queue.Queue() # responses from the server are placed here # format: [rawline, prefix, command, params, # nick, user, host, paramlist, msg] - self.connect() - thread.start_new_thread(self.parse_loop, ()) + self.parsed_queue = Queue.Queue() + self.input_queue = Queue.Queue() + self.output_queue = Queue.Queue() - def create_connection(self): - return crlf_tcp(self.server, self.port) + self.connection = Connection(self.name, self.server, self.port, + self.input_queue, self.output_queue) + self.connection.connect() + + self.parse_thread = ParseThread(self.input_queue, self.output_queue, + self.parsed_queue) + self.parse_thread.start() - def connect(self): - self.conn = self.create_connection() - thread.start_new_thread(self.conn.run, ()) self.set_pass(self.conf.get('server_password')) self.set_nick(self.nick) self.cmd("USER", [self.conf.get('user', 'cloudbot'), "3", "*", self.conf.get('realname', 'CloudBot - http://git.io/cloudbot')]) - def parse_loop(self): - while True: - # get a message from the input queue - msg = self.conn.iqueue.get() + def stop(self): + self.parse_thread.stop() + self.parse_thread.stop() - if msg == StopIteration: - self.connect() - continue - - # parse the message - if msg.startswith(":"): # has a prefix - prefix, command, params = irc_prefix_rem(msg).groups() - else: - prefix, command, params = irc_noprefix_rem(msg).groups() - nick, user, host = irc_netmask_rem(prefix).groups() - mask = nick + "!" + user + "@" + host - paramlist = irc_param_ref(params) - lastparam = "" - if paramlist: - if paramlist[-1].startswith(':'): - paramlist[-1] = paramlist[-1][1:] - lastparam = paramlist[-1] - # put the parsed message in the response queue - self.out.put([msg, prefix, command, params, nick, user, host, - mask, paramlist, lastparam]) - # if the server pings us, pong them back - if command == "PING": - self.cmd("PONG", paramlist) + def connect(self): + self.conn = self.create_connection() + self.conn_thread = thread.start_new_thread(self.conn.run, ()) + def set_pass(self, password): if password: @@ -222,7 +267,7 @@ class IRC(object): self.send(command) def send(self, str): - self.conn.oqueue.put(str) + self.output_queue.put(str) class SSLIRC(IRC): @@ -232,4 +277,4 @@ class SSLIRC(IRC): IRC.__init__(self, name, server, nick, port, channels, conf) def create_connection(self): - return crlf_ssl_tcp(self.server, self.port, self.ignore_cert_errors) + return crlf_ssl_tcp(self.name, self.server, self.port, self.ignore_cert_errors) diff --git a/core/loader.py b/core/loader.py index 7987c17..25f823e 100644 --- a/core/loader.py +++ b/core/loader.py @@ -2,8 +2,6 @@ import collections import glob import os import re -import sys -import Queue import traceback from core import main diff --git a/plugins/admin.py b/plugins/admin.py index 31d2342..95eaffc 100755 --- a/plugins/admin.py +++ b/plugins/admin.py @@ -103,14 +103,12 @@ def adduser(inp, bot=None, notice=None): @hook.command("quit", autohelp=False, permissions=["botcontrol"]) @hook.command(autohelp=False, permissions=["botcontrol"]) -def stop(inp, nick=None, conn=None): +def stop(inp, bot=None): """stop [reason] -- Kills the bot with [reason] as its quit message.""" if inp: - conn.cmd("QUIT", ["Killed by {} ({})".format(nick, inp)]) + bot.stop(reason=inp) else: - conn.cmd("QUIT", ["Killed by {}.".format(nick)]) - time.sleep(5) - os.execl("./cloudbot", "cloudbot", "stop") + bot.stop() @hook.command(autohelp=False, permissions=["botcontrol"])