Browse Source

Add Minecraft backup cron script

master
Noah Petherbridge 6 years ago
parent
commit
832001e1ef
  1. 2
      .gitignore
  2. 176
      misc/cron/mc-backup.py
  3. 175
      misc/cron/mcclient.py

2
.gitignore

@ -0,0 +1,2 @@
__pycache__
*.pyc

176
misc/cron/mc-backup.py

@ -0,0 +1,176 @@
#!/usr/bin/env python3
"""mc-backup.py: automate backups of my Minecraft servers, using the
minecraft-control wrapper.
Usage: mc-backup.py -c minecraft_control.ini -s /path/to/server
The `minecraft-control` password is obtained from the settings.ini for
minecraft-control. Backups are placed in the `backups/` directory under the
Minecraft server root, named with datetime stamps.
See `mc-backup.py --help` for command usage."""
import argparse
from configparser import RawConfigParser
import datetime
import logging
import os
import subprocess
import sys
import time
# Import the minecraft-control client library.
from mcclient import MinecraftClient
logging.basicConfig()
log = logging.getLogger("mc-backup")
log.setLevel(logging.INFO)
class Application:
def __init__(self, args):
"""Initialize the application."""
self.args = args
# Verify settings.
self.verify_args()
self.config = dict() # minecraft-control configuration
self.client = None # MinecraftClient instance
self.today = datetime.datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S")
self.world = self.get_world_name()
log.info("Today's date/time: {}".format(self.today))
# Load the minecraft-control configuration.
self.load_mc_config()
def verify_args(self):
"""Do some sanity checking on input arguments."""
# The minecraft-control config file.
if not os.path.isfile(self.args.config):
log.error("{}: not a file".format(self.args.config))
sys.exit(1)
# The server directory.
if not os.path.isfile("{}/server.properties".format(self.args.server)):
log.error("{}: not a Minecraft server directory (no "
"server.properties file present)".format(self.args.server))
sys.exit(1)
def load_mc_config(self):
"""Load the configuration from minecraft-control."""
log.debug("Loading minecraft-control settings from {}".format(
args.config
))
parser = RawConfigParser()
with open(self.args.config, "r") as fh:
parser.readfp(fh)
self.config["host"] = parser.get("tcp-server", "address")
self.config["port"] = parser.get("tcp-server", "port")
self.config["password"] = parser.get("auth", "password")
self.config["method"] = parser.get("auth", "method")
def get_world_name(self):
"""Load the server.properties to find the world name."""
with open("{}/server.properties".format(self.args.server), "r") as fh:
for line in fh:
if not "=" in line: continue
parts = line.split("=", 1)
key = parts[0].strip()
value = parts[1].strip()
if key == "level-name":
return value
raise ValueError("No level-name found in server.properties!")
def run(self):
"""Run the main program's logic."""
# Change into the server's directory.
os.chdir(self.args.server)
# Backups directory.
backups = os.path.join(self.args.server, "backups")
if not os.path.isdir(backups):
log.info("Creating backups directory: {}".format(backups))
os.mkdir(backups)
# Connect to the Minecraft-Control server.
log.info("Connecting to Minecraft control server...")
self.client = MinecraftClient(
host=self.config["host"],
port=self.config["port"],
password=self.config["password"],
methods=[self.config["method"]],
)
self.client.add_handler("auth_ok", self.on_auth_ok)
self.client.add_handler("auth_error", self.on_auth_error)
self.client.add_handler("server_message", self.on_message)
self.client.connect()
self.client.start()
def on_auth_ok(self, mc):
"""Handle successful authentication."""
log.info("Connection to server established and authenticated!")
# Target file name.
fname = self.today + ".tar.gz"
target = os.path.join("backups", fname)
# Turn off saving and save the world now.
log.info("Turning off auto-saving and saving the world now!")
self.client.sendline("save-off")
self.client.sendline("save-all")
time.sleep(5)
# Archive the world.
log.info("Backing up the world as: {}".format(target))
subprocess.call(["tar", "czvf", target, self.world])
# Turn saving back on.
time.sleep(5)
log.info("Turning auto-saving back on!")
self.client.sendline("save-on")
# Fin
sys.exit(0)
def on_auth_error(self, mc, error):
"""Handle unsuccessful authentication."""
log.error(error)
sys.exit(1)
def on_message(self, mc, message):
"""Handle a Minecraft server message."""
log.info("Minecraft server says: {}".format(message))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Minecraft Backup Taker")
parser.add_argument("--debug", "-d",
help="Turn on debug mode.",
action="store_true",
)
parser.add_argument("--config", "-c",
help="Path to minecraft-control configuration file (settings.ini)",
type=str,
required=True,
)
parser.add_argument("--server", "-s",
help="Path to the Minecraft server on disk (should contain "
"./server.properties file)",
type=str,
required=True,
)
args = parser.parse_args()
if args.debug:
log.setLevel(logging.DEBUG)
Application(args).run()

175
misc/cron/mcclient.py

@ -0,0 +1,175 @@
#!/usr/bin/env python
from __future__ import print_function
import socket
import hashlib
import time
"""mcclient.py: A client library for minecraft-control."""
# Quick methods for making MD5 or SHA1 hashes.
def md5(message):
h = hashlib.md5()
h.update(message)
return h.hexdigest()
def sha1(message):
h = hashlib.sha1()
h.update(message)
return h.hexdigest()
class MinecraftClient(object):
"""A client library for the minecraft-control server."""
def __init__(self, host, port, password, methods=None, debug=False):
"""Create a connection to a minecraft-control server.
* `host` and `port` provide the server information for connecting.
* `password` is the authentication password.
* `methods` is the list of acceptable auth methods that the server
must support one of (to prevent accidentally leaking the password
if the server uses plain authentication and you aren't expecting it).
* `debug` turns on some debug messages.
"""
self.host = host
self.port = int(port)
self.password = password
self.debug = debug
self.methods = methods
# Handlers.
self.handlers = dict()
# Current state.
self.auth_method = None
self.authed = False
self.challenge = None
def say(self, message):
"""Emit a debug message."""
if self.debug:
print("[DEBUG] {}".format(message))
def add_handler(self, event, handler):
"""Add an event handler.
Supported handlers and their arguments are:
* auth_ok(self):
Called when the auth handshake is successfully completed. You can now
get/receive messages from the Minecraft server.
* auth_error(self, error_message):
Called when an authentication error was received, i.e. that the
password is invalid OR the server uses an auth method not in the
`methods` whitelist. The `error_message` will tell you which it is.
* server_message(self, message):
A line of text from the Minecraft server was received.
"""
self.handlers[event] = handler
def _event(self, event, *args):
"""Call an event handler."""
if event in self.handlers:
self.handlers[event](self, *args)
def connect(self):
"""Establishes the connection to the minecraft-control server."""
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
def start(self):
"""Start an infinite loop of `do_one_loop()`"""
while self.do_one_loop():
time.sleep(0.01)
def do_one_loop(self):
"""Perform a single event loop with the server."""
data = self.sock.recv(4192)
if data == "":
self.say("The server has gone away!")
self.sock.close()
# Reset the state.
self.auth_method = None
self.authed = False
self.challenge = None
return False
data = data.decode("utf-8").strip()
self.say("<<< {}".format(data))
# Are we authed yet?
if not self.authed:
# Look for the auth method packet.
if data.startswith("AUTH_METHOD"):
fields = data.split(" ")
if len(fields) > 1: # sanity test
auth_method = fields[1]
# Is the auth method acceptable?
if type(self.methods) == list:
if not auth_method in self.methods:
# Not ok!
error = "Server is using an auth method that we aren't allowing: {}".format(auth_method)
self.say(error)
self._event("auth_error", error)
return False
# Store the auth method used.
self.auth_method = auth_method
# Is there a challenge?
if len(fields) >= 3:
self.challenge = fields[2]
# Send the password.
self._send_auth()
elif data.startswith("AUTH_OK"):
# Password accepted!
self.authed = True
self._event("auth_ok")
elif data.startswith("AUTH_ERROR"):
# Password error!
self._event("auth_error", "Password not accepted by server.")
else:
# We're in!
self._event("server_message", data)
return True
def sendline(self, line):
"""Send a line of text to the server, with an implied line feed ending."""
self.say(">>> {}".format(line))
self.sock.send("{}\n".format(line).encode("utf-8"))
def send(self, text):
"""Send a raw message to the server. No line ending is added."""
self.say(">>> {}".format(text).encode("utf-8"))
self.sock.send(text)
def _send_auth(self):
"""Send the AUTH packet to log in."""
response = None
if self.auth_method == "plain":
response = self.password
elif self.auth_method == "md5":
response = md5(self.challenge + md5(self.password))
elif self.auth_method == "sha1":
response = sha1(self.challenge + sha1(self.password))
else:
raise Exception("Server is using unsupported auth method: {}".format(self.auth_method))
self.sendline("AUTH {}".format(response))
Loading…
Cancel
Save