#!/usr/bin/env python3 import os import pprint import re import sys import zlib import time import struct import curses import pprint import tarfile import logging import datetime import requests import threading import subprocess import yaml import click import colorama from pathlib import Path from PyQt6 import QtWidgets from rcon.source import Client import hlnaui home_dir = Path.home() logging.basicConfig(stream=sys.stderr, level=logging.CRITICAL) def find_file(path): """Находим все конфиги в зависимости от пути""" arr = next(os.walk(path), (None, None, []))[2] # [] if no file if '.directory' in arr: arr.remove('.directory') return arr dir_config = f"{home_dir}/.config/hlna/" dir_maps_ark = f"{dir_config}ARK/" dir_deactivated = f"{dir_maps_ark}deactivated/" list_config = find_file(dir_maps_ark) delist_config = find_file(dir_deactivated) list_allconfigs = list_config + delist_config def create_dir(directory): """Проверка и создание директории""" if not os.path.exists(directory): os.makedirs(directory) create_dir(dir_config) def path_server(): """Получение пути для хранения файлов серверов, записываем путь в yaml файл""" dir_server = input(f"""Укажите путь до каталога, где будут храниться файлы сервера. По-умолчанию {home_dir}/Servers/ :""") if not dir_server: dir_server = f"{home_dir}/Servers/" yaml_create("path_server", dir_server) return dir_server @click.group() def hlna(): pass def unpack(src, dst): """Добавить документацию""" with open(src, 'rb') as f: sigver = struct.unpack('q', f.read(8))[0] unpacked_chunk = f.read(8) packed = f.read(8) unpacked = f.read(8) size_unpacked_chunk = struct.unpack('q', unpacked_chunk)[0] size_packed = struct.unpack('q', packed)[0] size_unpacked = struct.unpack('q', unpacked)[0] # Verify the integrity of the Archive Header if sigver == 2653586369: if isinstance(size_unpacked_chunk, int) and isinstance(size_packed, int) and isinstance(size_unpacked, int): logging.info("Archive is valid.") logging.debug( f"Archive header size information. Unpacked Chunk: {size_unpacked_chunk}({unpacked_chunk}) Full Packed: {size_packed}({packed}) Full Unpacked: {size_unpacked}({unpacked})") # Obtain the Archive Compression Index compression_index = [] size_indexed = 0 while size_indexed < size_unpacked: raw_compressed = f.read(8) raw_uncompressed = f.read(8) compressed = struct.unpack('q', raw_compressed)[0] uncompressed = struct.unpack('q', raw_uncompressed)[0] compression_index.append((compressed, uncompressed)) size_indexed += uncompressed logging.debug( f"{len(compression_index)}: {size_indexed}/{size_unpacked} ({compressed}/{uncompressed}) - {raw_compressed} - {raw_uncompressed}") if size_unpacked != size_indexed: msg = f"Header-Index mismatch. Header indicates it should only have {size_unpacked} bytes when uncompressed but the index indicates {size_indexed} bytes." logging.critical(msg) return print_line(msg, flag="RED") # Read the actual archive data data = b'' read_data = 0 for compressed, uncompressed in compression_index: compressed_data = f.read(compressed) uncompressed_data = zlib.decompress(compressed_data) # Verify the size of the data is consistent with the archives index if len(uncompressed_data) == uncompressed: data += uncompressed_data read_data += 1 # Verify there is only one partial chunk if len(uncompressed_data) != size_unpacked_chunk and read_data != len(compression_index): msg = f"Index contains more than one partial chunk: was {len(uncompressed_data)} when the full chunk size is {size_unpacked_chunk}, chunk {read_data}/{len(compression_index)}" logging.critical(msg) return print_line(msg, flag="RED") else: msg = f"Uncompressed chunk size is not the same as in the index: was {len(uncompressed_data)} but should be {uncompressed}." logging.critical(msg) return print_line(msg, flag="RED") else: msg = f"Data types in the headers should be int's. Size Types: unpacked_chunk({type(size_unpacked_chunk)}), packed({type(size_packed)}), unpacked({type(size_unpacked)})" logging.critical(msg) return print_line(msg, flag="RED") else: msg = "The signature and format version is incorrect. Signature was {} should be 2653586369.".format(sigver) logging.critical(msg) return print_line(msg, flag="RED") # Write the extracted data to disk with open(dst, 'wb') as f: f.write(data) logging.info("Archive has been extracted.") def get_external_ip(): response = requests.get('https://api.ipify.org') return response.text def print_line(*text, flag="", sep=" ", end="\n"): """Добавление обводки вокруг текста, покраска""" if flag == "RED": color = colorama.Fore.RED elif flag == "YELLOW": color = colorama.Fore.YELLOW elif flag == "GREEN": color = colorama.Fore.GREEN elif flag == "CYAN": color = colorama.Fore.CYAN else: color = colorama.Fore.WHITE len_text = str(*text) len_text = len_text.split("\n") max_length = max(len(str(string)) for string in len_text) + 2 print(color + "." * max_length) print(color, *text, sep=sep, end=end) print(color + "." * max_length + colorama.Style.RESET_ALL) def zmeyuka(stdscr, func): print_line(func) curses.curs_set(0) stdscr.nodelay(1) def nachalo_stroki(): while func.do_run: zmejka = '⠇⠋⠙⠸⠴⠦' for i in range(len(zmejka)): print('\r' + zmejka[i], end="", flush=True) time.sleep(0.25) th = threading.Thread(target=func) th.start() # Запуск потока со змеюкой zmeyuka_thread = threading.Thread(target=nachalo_stroki) zmeyuka_thread.start() # Ожидаем завершение функции th.join() # Убираем змеюку zmeyuka_thread.do_run = False zmeyuka_thread.join() curses.curs_set(1) stdscr.nodelay(0) def check_int(number=""): """Проверка на ввод числа""" while True: try: x = input(number) if x == "": return 0 x = int(x) return x except ValueError: print_line("Введите число", flag="CYAN") @hlna.command(help='Восстановление бэкапов серверов в ') @click.argument('g', nargs=1) @click.option('-m', default='all', help="Название карты для запуска или all для запуска всех карт") @click.option('-d', required=True, help="Путь до zip архива") def restore(g, m, d): """Получение пути к файлам внутри архива""" with tarfile.open(d, 'r') as tar_file: files = tar_file.getnames() """Извлечение файлов""" for i in files: with tar_file.extract(d, 'r:gz') as tar_file: path_extarct = "./" if g == 'test' else "/" tar_file.extract(i, path_extarct) print_line(f"i - перемещен", flag="GREEN") print_line(f"Бэкап {d} восстановлен", flag="GREEN") @hlna.command(help='Бэкап серверов выбранной игры (.+)', modname) modname = modname and modname.group(1) if os.path.isfile(f"{dir_mod_ark}.mod"): os.remove(f"{dir_mod_ark}.mod") with open(f"{dir_extract}/mod.info", "rb") as modinfo: data = modinfo.read() mapnamelen = struct.unpack_from("> {dir_logs}{date} 2>&1") with open(f"{dir_logs}{date}.log", "a") as f: f.write(f"[{t}] Сервер {i} перемещён из {state_msg}\n") # переписать эту залупу else: x = os.system(f"mv {dir_maps_ark}{i} {dir_deactivated} >> {dir_logs}{date} 2>&1") with open(f"{dir_logs}{date}.log", "a") as f: f.write(f"[{t}] Сервер {i} перемещён из {state_msg}\n") if x == 0: # start = "start" if e else "stop" enable = "enable" if e else "disable" os.system(f"systemctl --user {enable} ark_{i.lower()}") print_line(f"Выполнено для сервера- {i}", flag="GREEN") else: print_line(f"Ошибка перемещения {i}", flag="RED") except: print_line("ошибка операции", flag="RED") @hlna.command(help='Выводит статус настроеных серверов') @click.argument('g', nargs=1) @click.option("-m", default='all', help="Название cервера") def status(g, m="all", list_config=list_config): """print_status делает вывод, flag - отвечает за показ активных/неактивных, под 7days надо будет дописать""" def print_status(g, status_map, flag=True): data = status_map print_line(data["status"], flag=("YELLOW", "RED")[flag] if data['status'] == "Не запущен" else "GREEN") print_line(f""" Имя сервера: {data['SessionName']} Карта: {data['map']} Моды: {data['ModsId']} Пароль: {data['ServerPassword']} Кластер: {data['Cluster']} Кластер id: {data['clusterid']} Query порт: {data['QueryPort']} Порт сервера: {data['Port']} Rcon включен: {data['RCONEnabled']} Rcon порт : {data['RCONPort']} Максимальное кол-во игроков: {data['MaxPlayers']} steam://connect/{get_external_ip()}:{data['QueryPort']}""", flag=("YELLOW","CYAN")[flag]) def get_param(g, list_con, flag=True): for i in list_con: data = read_yaml(g=g, m=i, flag=flag) status_map["ark"][flag][data["SessionName"]] = data status_map["ark"][flag][data["SessionName"]]['status'] = "Не запущен" if os.system(f"lsof -w -i :{data['Port']}") else "Запущен" if list_config == [] and delist_config == []: print_line("Сервера не сконфигурированы", flag="RED") exit() "Это скорее всего можно как то покрасивее записать, но пока так)" status_map = {} status_map["ark"] = {} status_map["ark"][True] = {} status_map["ark"][False] = {} get_param("ark", list_config) get_param("ark", delist_config, False) if g == "ark": name_servers = choose_map(g, m) for i in name_servers: print_status(g, status_map[g][True][i]) if delist_config != []: x = input("Есть неактивные сервера, показать Y/n: ") if x != "n": for i in delist_config: print_status(g, status_map[g][False][i], False) return status_map @hlna.command(help='Запуск, сконфигурированного сервера или кластера ') @click.argument('g', nargs=1) @click.option('-m', default='all', help="Название карты для запуска или all для запуска все карт") def start(g, m): """Запускает сервер выбранной игры""" # добавить проверку на ввод аргумента ark/7days если else: давать подсказку # если нет конфигов, то выводим что серверов нет g = g.lower() if g == "ark": modupdateall(g, m) start_stop("start", g, m) @hlna.command(help='Остановка, сконфигурированного сервера или кластера ') @click.argument('g', nargs=1) @click.option('-m', default='all', help="Название карты для запуска или all для запуска все карт") def stop(g, m): g = g.lower() if g == "ark": modupdateall(g, m) start_stop("stop", g, m) @hlna.command(help='Перезапуск, сконфигурированного сервера или кластера ') @click.argument('g', nargs=1) @click.option('-m', default='all', help="Название карты для запуска или all для запуска все карт") def restart(g, m): g = g.lower() if g == "ark": modupdateall(g, m) start_stop("restart", g, m) def check_exist_servers(g): """Проверяет наличие конфигов для активных карт""" g = g.lower() if g == "ark" and not list_config: print_line("Нет сконфигурированных серверов", flag="RED") # добавить отсюда вилку на вопрос с конфигурацией elif g == "7days": print_line("7Days", flag="CYAN") else: return 1 def start_stop(action, g, m): """Функция изменения статусов сервера""" g = g.lower() if g == "ark": x = check_exist_servers(g) if x: name_servers = choose_map(g, m) if m != 'all' else list_config for i in name_servers: data = read_yaml(g="ark", m=i, flag=True) if action == "stop" or action == "restart": rcon_local(i, "SaveWorld") x = os.system(f"systemctl --user {action} ark_{data['SessionName'].lower()}.service") if x == 0: print_line(f"Готово {action} для {g} {i}", flag="GREEN") elif g == "7days": x = os.system(f"systemctl --user {action} 7days.service") if x == 0: print_line(f"Готово {action} для {m}", flag="GREEN") else: print_line("доступные игры: ark и 7days") def read_yaml(g="", m="", flag=True): """Читает конфиги активных или неактивных карт в зависимости от флага и отдаёт данные туда где их запросили""" g = g.lower() if g == "ark": if m == "all": print_line("Не правильный вызов yaml, должен вызываться из цикла") path_yaml = f"{dir_maps_ark}{m}" if flag else f"{dir_deactivated}{m}" elif g == "path_server": path_yaml = dir_config + "config" with open(path_yaml, "r") as yamlfile: data = yaml.load(yamlfile, Loader=yaml.FullLoader) return data[0] # возвращаем словарь со всеми значениями def choose_map(g, m, list_config=list_config): """Функция выбора карт""" g = g.lower() new_arr = [] if g == "ark": dict_mapname = {} dict_allmapname = [] print_line(list_config) for i in list_config: data = read_yaml(g="ark", m=i) dict_mapname[data['SessionName']] = data['map'] dict_allmapname.append(data['SessionName']) name_servers = [] for ns, v in dict_mapname.items(): if v in m: name_servers.append(ns) if list_config != []: # Перенести выше для проверки наличия конфигов if m == "all": new_arr = dict_allmapname print_line(f"Выполняется для карт(-ы) {new_arr}", flag="CYAN") else: if name_servers: name_servers = sorted(name_servers) print_line('Найдены сервера с этой картой', flag="CYAN") for i, map in enumerate(name_servers): print_line(f"{i + 1}) {map}", flag="CYAN") print_line(f"{i + 2} Все", flag="CYAN") x = None if i != 0: while True: try: x = input("Выберите сервер из списка, либо несколько через запятую: ").split(',') x = [int(j) for j in x] if i + 2 in x: return name_servers break except: print_line("Неправильный ввод", flag="RED") else: x = [1] for i in x: new_arr.append(name_servers[i - 1]) print_line(f"Выбранные сервера: {name_servers}", flag="CYAN") else: print_line("Не найдено серверов с картой") return new_arr @hlna.command(help='Отправка команд на игровой сервер через rcon ') @click.argument('c', nargs=1) @click.option('-m', required=True, help="Название карты для применения rcon команды") def rcon(m, c): rcon_local(m, c) def rcon_local(m, c): try: dict_mapname = {} dict_adminpwd = {} if list_config: rcon_ports = [] for i in list_config: data = read_yaml(g="ark", m=i) dict_mapname[data['RCONPort']] = data['map'] dict_adminpwd[data['RCONPort']] = data['ServerAdminPassword'] if m == "all": for rcon_p in dict_mapname: rcon_ports.append(rcon_p) else: for rcon_p, name_map in dict_mapname.items(): if name_map in m: rcon_ports.append(rcon_p) for port in rcon_ports: passwd = dict_adminpwd[port] with Client('127.0.0.1', port, passwd=str(passwd)) as client: response = client.run(c) print_line(f"Rcon выполнен {response} {dict_mapname[port]}", flag="GREEN") else: pass except: print_line(f"Ошибка отправки команды {c} в {m}", flag="RED") def zero(x=""): """Потом пригодится (нет)""" return "" if not os.path.exists(dir_config + "config"): dir_server = path_server() else: data = read_yaml(g="path_server") if data['path_server'] == "": path_server() else: dir_server = data['path_server'] dir_unit = f"{home_dir}/.config/systemd/user/" dir_logs = f"{dir_config}logs/" dir_server_ark = f"{dir_server}ARK/" dir_ark_save = f"{dir_server_ark}ShooterGame/Saved/SavedArks/" dir_workshop_ark = f"{home_dir}/.local/share/Steam/steamapps/workshop" dir_shooter = "ShooterGame" dir_mods_ark = f"{dir_server_ark}ShooterGame/Content/Mods" dir_server_7days = f"{dir_server}/7Days/" now = datetime.datetime.now() date = now.strftime("%Y-%m-%d") t = now.strftime("%H:%M:%S") create_dir(dir_server) create_dir(dir_unit) create_dir(dir_logs) class HlnaApp(QtWidgets.QMainWindow, hlnaui.Ui_mainWindow): def __init__(self): super().__init__() self.setupUi(self) def hlnag(): if len(sys.argv) > 1: hlna() else: app = QtWidgets.QApplication(sys.argv) window = HlnaApp() window.show() sys.exit(app.exec()) if __name__ == '__main__': hlnag()