232 lines
6.5 KiB
Python
232 lines
6.5 KiB
Python
import socket
|
|
import struct
|
|
import json
|
|
import traceback
|
|
|
|
from util import hook
|
|
|
|
|
|
try:
|
|
import DNS
|
|
has_dns = True
|
|
except ImportError:
|
|
has_dns = False
|
|
|
|
|
|
# Ordered (Minecraft formatting code, IRC control sequence) pairs applied by
# format_colors(). Every Minecraft code appears exactly once: an earlier
# revision listed u'\xa71' twice (the second entry could never match because
# the first had already replaced every occurrence) and sent u'\xa79' to
# u'\x0310', the same IRC colour as u'\xa73'. u'\xa79' now takes the
# otherwise-unused u'\x0312' slot.
mc_colors = [
    (u'\xa7f', u'\x0300'), (u'\xa70', u'\x0301'), (u'\xa71', u'\x0302'), (u'\xa72', u'\x0303'),
    (u'\xa7c', u'\x0304'), (u'\xa74', u'\x0305'), (u'\xa75', u'\x0306'), (u'\xa76', u'\x0307'),
    (u'\xa7e', u'\x0308'), (u'\xa7a', u'\x0309'), (u'\xa73', u'\x0310'), (u'\xa7b', u'\x0311'),
    (u'\xa79', u'\x0312'), (u'\xa7d', u'\x0313'), (u'\xa78', u'\x0314'), (u'\xa77', u'\x0315'),
    (u'\xa7l', u'\x02'), (u'\xa7o', u'\t'), (u'\xa7m', u'\x13'),
    (u'\xa7r', u'\x0f'), (u'\xa7n', u'\x15'),
]
|
|
|
|
|
|
## EXCEPTIONS
|
|
|
|
|
|
class PingError(Exception):
    """Raised when pinging a server fails; ``text`` carries the reason."""

    def __init__(self, text):
        super(PingError, self).__init__(text)
        self.text = text

    def __str__(self):
        # The message shown to the user is exactly the text given at raise time.
        return self.text
|
|
|
|
|
|
class ParseError(Exception):
    """Raised when the command input cannot be parsed; ``text`` carries the reason."""

    def __init__(self, text):
        super(ParseError, self).__init__(text)
        self.text = text

    def __str__(self):
        # The message shown to the user is exactly the text given at raise time.
        return self.text
|
|
|
|
|
|
## MISC
|
|
|
|
|
|
def unpack_varint(s):
    """Read one VarInt from the socket-like object *s* and return it as an int.

    A VarInt stores 7 payload bits per byte, least-significant group first;
    the high bit of each byte signals that another byte follows.

    Raises:
        socket.error: if the peer closes the connection before the value is
            complete, or if the value runs past 5 bytes (the most a 32-bit
            VarInt can occupy). Using socket.error lets callers that already
            handle socket failures treat a truncated reply the same way.
    """
    value = 0
    for index in range(5):
        raw = s.recv(1)
        if not raw:
            # recv() returning nothing means the peer hung up; ord() on the
            # empty result would otherwise fail with an unrelated TypeError.
            raise socket.error("Connection closed while reading VarInt")
        byte = ord(raw)
        value |= (byte & 0x7F) << (7 * index)
        if not byte & 0x80:
            return value
    raise socket.error("VarInt is longer than 5 bytes")
|
|
|
|
def _pack_varint(value):
    """Encode the non-negative integer *value* as a VarInt byte string."""
    out = []
    while True:
        low = value & 0x7F
        value >>= 7
        if value:
            # More groups follow: set the continuation bit.
            out.append(struct.pack('>B', low | 0x80))
        else:
            out.append(struct.pack('>B', low))
            return b"".join(out)


def pack_data(d):
    """Return the byte string *d* prefixed with its length as a VarInt.

    For lengths below 128 the prefix is the same single byte the previous
    signed-byte encoding produced; longer payloads no longer raise
    struct.error.
    """
    return _pack_varint(len(d)) + d


def pack_port(i):
    """Return port number *i* packed as a big-endian unsigned 16-bit integer."""
    return struct.pack('>H', i)
|
|
|
|
## DATA FUNCTIONS
|
|
|
|
|
|
def mcping_modern(host, port):
    """ pings a server using the modern (1.7+) protocol and returns data

    Returns a dict with the keys "motd" (colour codes converted by
    format_colors), "motd_raw", "version", "players" and "players_max".

    Raises PingError when the connection fails, the reply is truncated or is
    not valid JSON, or expected fields are missing from the reply.
    """
    try:
        # connect to the server
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                sock.connect((host, port))
            except socket.gaierror:
                raise PingError("Invalid hostname")
            except socket.timeout:
                raise PingError("Request timed out")

            # send handshake + status request; sendall() retries partial writes
            handshake = b"\x00\x00" + pack_data(host.encode('utf8')) + pack_port(port) + b"\x01"
            sock.sendall(pack_data(handshake))
            sock.sendall(pack_data(b"\x00"))

            # read response
            unpack_varint(sock)  # Packet length
            unpack_varint(sock)  # Packet ID
            length = unpack_varint(sock)  # String length

            if not length > 1:
                raise PingError("Invalid response")

            chunks = []
            received = 0
            while received < length:
                chunk = sock.recv(min(1024, length - received))
                if not chunk:
                    # Peer closed before sending the advertised number of
                    # bytes; without this check the loop would never end.
                    raise PingError("Invalid response")
                chunks.append(chunk)
                received += len(chunk)
            payload = b"".join(chunks)
        finally:
            # Close our socket on success and on every failure path
            sock.close()
    except socket.error:
        raise PingError("Socket Error")

    # Load json; a malformed or badly encoded body is a ping failure, not a crash
    try:
        data = json.loads(payload.decode('utf8'))
    except ValueError:
        raise PingError("Invalid response")

    try:
        version = data["version"]["name"]
        description = data["description"]
        if isinstance(description, dict):
            # The description is either a plain string or an object whose
            # "text" member holds the string.
            description = description["text"]
        desc = u" ".join(description.split())
        max_players = data["players"]["max"]
        online = data["players"]["online"]
    except (KeyError, TypeError, AttributeError) as e:
        # The reply was valid JSON but did not have the expected shape.
        traceback.print_exc()
        raise PingError("Unknown Error: {}".format(e))

    return {
        "motd": format_colors(desc),
        "motd_raw": desc,
        "version": version,
        "players": online,
        "players_max": max_players,
    }
|
|
|
|
|
|
def _recv_exact(sock, count):
    """Read exactly *count* bytes from *sock*; raise PingError if the peer closes early."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise PingError("Invalid response")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def mcping_legacy(host, port):
    """ pings a server using the legacy (1.6 and older) protocol and returns data

    Returns a dict with the keys "motd" (colour codes converted by
    format_colors), "motd_raw", "version" (None when the server answers in
    the old format), "players" and "players_max".

    Raises PingError for every connection failure and for truncated or
    malformed replies, so callers only need to handle that one exception.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        try:
            sock.connect((host, port))
            sock.sendall(b'\xfe\x01')

            # Compare the whole one-byte reply so the check behaves the same
            # whether indexing bytes yields a str or an int.
            if _recv_exact(sock, 1) != b'\xff':
                raise PingError("Invalid response")

            # The length counts UTF-16 code units, hence two bytes each.
            length = struct.unpack('!H', _recv_exact(sock, 2))[0]
            raw = _recv_exact(sock, length * 2)
        except socket.gaierror:
            raise PingError("Invalid hostname")
        except socket.timeout:
            raise PingError("Request timed out")
        except socket.error:
            # e.g. connection refused or reset by the server
            raise PingError("Socket Error")
    finally:
        # Close the socket on success and on every failure path
        sock.close()

    try:
        values = raw.decode('utf-16be')
    except UnicodeDecodeError:
        raise PingError("Invalid response")

    data = values.split(u'\x00')  # try to decode data using new format
    try:
        if len(data) == 1:
            # failed to decode data, server is using old format
            data = values.split(u'\xa7')
            motd_raw, version, players, players_max = data[0], None, data[1], data[2]
        else:
            # decoded data, server is using new format
            motd_raw, version, players, players_max = data[3], data[2], data[4], data[5]
    except IndexError:
        # Fewer fields than either format requires
        raise PingError("Invalid response")

    return {
        "motd": format_colors(" ".join(motd_raw.split())),
        "motd_raw": motd_raw,
        "version": version,
        "players": players,
        "players_max": players_max,
    }
|
|
|
|
|
|
## FORMATTING/PARSING FUNCTIONS
|
|
|
|
def check_srv(domain):
    """ takes a domain and finds minecraft SRV records

    Returns a two-item list built from fields 2 and 3 of the first SRV
    answer's data (parse_input treats them as port and host respectively),
    or None when the lookup yields no SRV answer.
    """
    DNS.DiscoverNameServers()
    request = DNS.Request(qtype='srv')
    result = request.req('_minecraft._tcp.{}'.format(domain))

    for answer in result.answers:
        if answer['typename'] != 'SRV':
            continue
        record = answer['data']
        return [record[2], record[3]]

    return None
|
|
|
|
|
|
def parse_input(inp):
    """ takes the input from the mcping command and returns the host and port

    Only the first space-separated word of *inp* is considered. Raises
    ParseError when an explicit port is not an integer in the range 0-65535.
    """
    target = inp.strip().split(" ")[0]

    if ":" not in target:
        if has_dns:
            # no explicit port, but PyDNS is available, so look for a SRV record
            srv_data = check_srv(target)
            if srv_data:
                return str(srv_data[1]), int(srv_data[0])
        # no port given and no SRV record found: fall back to the default port
        return target, 25565

    # the port is defined in the input string
    host, _, raw_port = target.partition(":")
    try:
        port = int(raw_port)
    except ValueError:
        raise ParseError("The port '{}' is invalid.".format(raw_port))

    if not 0 <= port <= 65535:
        raise ParseError("The port '{}' is invalid.".format(port))
    return host, port
|
|
|
|
|
|
def format_colors(motd):
    """Return *motd* with each code listed in mc_colors swapped for its
    replacement and every u"\\xa7k" code removed."""
    formatted = motd
    for mc_code, irc_code in mc_colors:
        formatted = formatted.replace(mc_code, irc_code)
    # u"\xa7k" has no entry in mc_colors, so it is simply dropped.
    return formatted.replace(u"\xa7k", "")
|
|
|
|
|
|
def format_output(data):
    """Render a ping result dict as one line of text.

    The version segment is included only when data["version"] is truthy.
    Newlines in the rendered text are replaced with a reset code and " - ".
    """
    if data["version"]:
        template = u"{motd}\x0f - {version}\x0f - {players}/{players_max} players."
    else:
        template = u"{motd}\x0f - {players}/{players_max} players."

    line = template.format(**data)
    return line.replace("\n", u"\x0f - ")
|
|
|
|
|
|
@hook.command
@hook.command("mcp")
def mcping(inp):
    """mcping <server>[:port] - Ping a Minecraft server to check status."""
    try:
        host, port = parse_input(inp)
    except ParseError as e:
        return "Could not parse input ({})".format(e)

    # Try the modern protocol first; only a PingError triggers the fallback.
    try:
        status = mcping_modern(host, port)
    except PingError:
        status = None

    if status is None:
        # Modern ping failed, so retry with the legacy protocol.
        try:
            status = mcping_legacy(host, port)
        except PingError as e:
            return "Could not ping server, is it offline? ({})".format(e)

    return format_output(status)
|