This repository has been archived on 2023-04-13. You can view files and clone it, but cannot push or open issues or pull requests.
CloudBot/plugins/minecraft_ping.py
2014-03-01 13:12:48 +13:00

232 lines
6.5 KiB
Python

import socket
import struct
import json
import traceback
from util import hook
try:
import DNS
has_dns = True
except ImportError:
has_dns = False
mc_colors = [(u'\xa7f', u'\x0300'), (u'\xa70', u'\x0301'), (u'\xa71', u'\x0302'), (u'\xa72', u'\x0303'),
(u'\xa7c', u'\x0304'), (u'\xa74', u'\x0305'), (u'\xa75', u'\x0306'), (u'\xa76', u'\x0307'),
(u'\xa7e', u'\x0308'), (u'\xa7a', u'\x0309'), (u'\xa73', u'\x0310'), (u'\xa7b', u'\x0311'),
(u'\xa71', u'\x0312'), (u'\xa7d', u'\x0313'), (u'\xa78', u'\x0314'), (u'\xa77', u'\x0315'),
(u'\xa7l', u'\x02'), (u'\xa79', u'\x0310'), (u'\xa7o', u'\t'), (u'\xa7m', u'\x13'),
(u'\xa7r', u'\x0f'), (u'\xa7n', u'\x15')]
## EXCEPTIONS
class PingError(Exception):
def __init__(self, text):
self.text = text
def __str__(self):
return self.text
class ParseError(Exception):
def __init__(self, text):
self.text = text
def __str__(self):
return self.text
## MISC
def unpack_varint(s):
d = 0
i = 0
while True:
b = ord(s.recv(1))
d |= (b & 0x7F) << 7 * i
i += 1
if not b & 0x80:
return d
pack_data = lambda d: struct.pack('>b', len(d)) + d
pack_port = lambda i: struct.pack('>H', i)
## DATA FUNCTIONS
def mcping_modern(host, port):
""" pings a server using the modern (1.7+) protocol and returns data """
try:
# connect to the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((host, port))
except socket.gaierror:
raise PingError("Invalid hostname")
except socket.timeout:
raise PingError("Request timed out")
# send handshake + status request
s.send(pack_data("\x00\x00" + pack_data(host.encode('utf8')) + pack_port(port) + "\x01"))
s.send(pack_data("\x00"))
# read response
unpack_varint(s) # Packet length
unpack_varint(s) # Packet ID
l = unpack_varint(s) # String length
if not l > 1:
raise PingError("Invalid response")
d = ""
while len(d) < l:
d += s.recv(1024)
# Close our socket
s.close()
except socket.error:
raise PingError("Socket Error")
# Load json and return
data = json.loads(d.decode('utf8'))
try:
version = data["version"]["name"]
try:
desc = u" ".join(data["description"]["text"].split())
except TypeError:
desc = u" ".join(data["description"].split())
max_players = data["players"]["max"]
online = data["players"]["online"]
except Exception as e:
# TODO: except Exception is bad
traceback.print_exc(e)
raise PingError("Unknown Error: {}".format(e))
output = {
"motd": format_colors(desc),
"motd_raw": desc,
"version": version,
"players": online,
"players_max": max_players
}
return output
def mcping_legacy(host, port):
""" pings a server using the legacy (1.6 and older) protocol and returns data """
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((host, port))
sock.send('\xfe\x01')
response = sock.recv(1)
except socket.gaierror:
raise PingError("Invalid hostname")
except socket.timeout:
raise PingError("Request timed out")
if response[0] != '\xff':
raise PingError("Invalid response")
length = struct.unpack('!h', sock.recv(2))[0]
values = sock.recv(length * 2).decode('utf-16be')
data = values.split(u'\x00') # try to decode data using new format
if len(data) == 1:
# failed to decode data, server is using old format
data = values.split(u'\xa7')
output = {
"motd": format_colors(" ".join(data[0].split())),
"motd_raw": data[0],
"version": None,
"players": data[1],
"players_max": data[2]
}
else:
# decoded data, server is using new format
output = {
"motd": format_colors(" ".join(data[3].split())),
"motd_raw": data[3],
"version": data[2],
"players": data[4],
"players_max": data[5]
}
sock.close()
return output
## FORMATTING/PARSING FUNCTIONS
def check_srv(domain):
""" takes a domain and finds minecraft SRV records """
DNS.DiscoverNameServers()
srv_req = DNS.Request(qtype='srv')
srv_result = srv_req.req('_minecraft._tcp.{}'.format(domain))
for getsrv in srv_result.answers:
if getsrv['typename'] == 'SRV':
data = [getsrv['data'][2], getsrv['data'][3]]
return data
def parse_input(inp):
""" takes the input from the mcping command and returns the host and port """
inp = inp.strip().split(" ")[0]
if ":" in inp:
# the port is defined in the input string
host, port = inp.split(":", 1)
try:
port = int(port)
if port > 65535 or port < 0:
raise ParseError("The port '{}' is invalid.".format(port))
except ValueError:
raise ParseError("The port '{}' is invalid.".format(port))
return host, port
if has_dns:
# the port is not in the input string, but we have PyDNS so look for a SRV record
srv_data = check_srv(inp)
if srv_data:
return str(srv_data[1]), int(srv_data[0])
# return default port
return inp, 25565
def format_colors(motd):
for original, replacement in mc_colors:
motd = motd.replace(original, replacement)
motd = motd.replace(u"\xa7k", "")
return motd
def format_output(data):
if data["version"]:
return u"{motd}\x0f - {version}\x0f - {players}/{players_max}" \
u" players.".format(**data).replace("\n", u"\x0f - ")
else:
return u"{motd}\x0f - {players}/{players_max}" \
u" players.".format(**data).replace("\n", u"\x0f - ")
@hook.command
@hook.command("mcp")
def mcping(inp):
"""mcping <server>[:port] - Ping a Minecraft server to check status."""
try:
host, port = parse_input(inp)
except ParseError as e:
return "Could not parse input ({})".format(e)
try:
data = mcping_modern(host, port)
except PingError:
try:
data = mcping_legacy(host, port)
except PingError as e:
return "Could not ping server, is it offline? ({})".format(e)
return format_output(data)