#!/usr/bin/env python3 import os import re import sys import zlib import struct import shutil import logging import datetime import subprocess import yaml import click import colorama from pathlib import Path from rcon.source import Client home_dir = Path.home() logging.basicConfig(stream=sys.stderr, level=logging.CRITICAL) class UnpackException(Exception): pass class SignatureUnpackException(UnpackException): pass class CorruptUnpackException(UnpackException): pass def find_file(path): """Находим все конфиги в зависимости от пути""" arr = next(os.walk(path), (None, None, []))[2] # [] if no file x = arr.count('.directory') y = arr.count('logs') if x > 0: arr.remove('.directory') if y > 0: arr.remove('logs') return arr dir_config = f"{home_dir}/.config/hlna/" dir_maps_ark = f"{dir_config}/ARK/" dir_deactivated = f"{dir_maps_ark}deactivated/" list_config = find_file(dir_maps_ark) delist_config = find_file(dir_deactivated) def path_server(): dir_server = input(f"""Укажите путь до каталога, где будут храниться файлы сервера. По-умолчанию {home_dir}/Servers/ :""") if dir_server == "": dir_server = f"{home_dir}/Servers/" yaml_create(game := "path_server", dir_server) return dir_server @click.group() def hlna(): pass def unpack(src, dst): with open(src, 'rb') as f: sigver = struct.unpack('q', f.read(8))[0] unpacked_chunk = f.read(8) packed = f.read(8) unpacked = f.read(8) size_unpacked_chunk = struct.unpack('q', unpacked_chunk)[0] size_packed = struct.unpack('q', packed)[0] size_unpacked = struct.unpack('q', unpacked)[0] #Verify the integrity of the Archive Header if sigver == 2653586369: if isinstance(size_unpacked_chunk, int) and isinstance(size_packed , int) and isinstance(size_unpacked , int): logging.info("Archive is valid.") logging.debug(f"Archive header size information. Unpacked Chunk: {size_unpacked_chunk}({unpacked_chunk}) Full Packed: {size_packed}({packed}) Full Unpacked: {size_unpacked}({unpacked})") #Obtain the Archive Compression Index compression_index = [] size_indexed = 0 while size_indexed < size_unpacked: raw_compressed = f.read(8) raw_uncompressed = f.read(8) compressed = struct.unpack('q', raw_compressed)[0] uncompressed = struct.unpack('q', raw_uncompressed)[0] compression_index.append((compressed, uncompressed)) size_indexed += uncompressed logging.debug(f"{len(compression_index)}: {size_indexed}/{size_unpacked} ({compressed}/{uncompressed}) - {raw_compressed} - {raw_uncompressed}") if size_unpacked != size_indexed: msg = f"Header-Index mismatch. Header indicates it should only have {size_unpacked} bytes when uncompressed but the index indicates {size_indexed} bytes." logging.critical(msg) raise CorruptUnpackException(msg) #Read the actual archive data data = b'' read_data = 0 for compressed, uncompressed in compression_index: compressed_data = f.read(compressed) uncompressed_data = zlib.decompress(compressed_data) #Verify the size of the data is consistent with the archives index if len(uncompressed_data) == uncompressed: data += uncompressed_data read_data += 1 #Verify there is only one partial chunk if len(uncompressed_data) != size_unpacked_chunk and read_data != len(compression_index): msg = f"Index contains more than one partial chunk: was {len(uncompressed_data)} when the full chunk size is {size_unpacked_chunk}, chunk {read_data}/{len(compression_index)}" logging.critical(msg) raise CorruptUnpackException(msg) else: msg = f"Uncompressed chunk size is not the same as in the index: was {len(uncompressed_data)} but should be {uncompressed}." logging.critical(msg) raise CorruptUnpackException(msg) else: msg = f"Data types in the headers should be int's. Size Types: unpacked_chunk({type(size_unpacked_chunk)}), packed({type(size_packed)}), unpacked({type(size_unpacked)})" logging.critical(msg) raise CorruptUnpackException(msg) else: msg = "The signature and format version is incorrect. Signature was {} should be 2653586369.".format(sigver) logging.critical(msg) raise SignatureUnpackException(msg) #Write the extracted data to disk with open(dst, 'wb') as f: f.write(data) logging.info("Archive has been extracted.") def create_dir(directory): """Проверка и создание директории""" if not os.path.exists(directory): os.makedirs(directory) def print_line(text): """Добавление тире вокруг текста, покраска""" print(colorama.Fore.YELLOW + "-" * 30) print(f"{colorama.Fore.GREEN} + {text}") print(colorama.Fore.YELLOW + "-" * 30 + colorama.Style.RESET_ALL) def check_int(number=""): """Проверка на ввод числа""" while True: try: x = input(number) if x == "": return 0 x = int(x) return x except ValueError: print_line("Введите число") @hlna.command(help='Сбор настроек для сервера или кластера') def config(): while True: count_game = check_int("""Выберите игру для конфигурирования 1. ARK Survival Evolved 2. 7 Days to Die : """) if count_game == 1: config_ark() break elif count_game == 2: config_7daystodie() break else: print_line("Пока есть только ARK и 7Days xD") def config_ark(list_config=list_config): create_dir(dir_server_ark) create_dir(dir_maps_ark) """Сбор данных для конфига""" data = {} port_s = [] rcon_p = [] query_p = [] cluster_id = "" cluster_dir_override = "" count_cluster = check_int("""Укажите требуется ли кластер? default: Нет 1. Да 2. Нет : """) if count_cluster == 1: cluster_server = True cluster_id = input("Укажите id для кластера, любое сочетание символов: \n") create_dir(dir_server_ark + cluster_id) cluster_dir_override = (dir_server_ark + cluster_id) else: cluster_server = False if list_config: print("Уже установленные карты: ") for i in list_config: data = read_yaml(i, game="ARK") print(f"{i} : {data['map']}") count_maps = check_int("Укажите количество карт: \n") if count_maps == 0: # 0 возвращает check_int когда, ничего не было введено count_maps = 1 for i in range(count_maps): while True: "Проверка на выбор карты из списка" amount_map = check_int("""Выберите карту из списка указав номер 1. The Island 2. The Center 3. Scorched Earth 4. Ragnarok 5. Aberration 6. Extinction 7. Valguero 8. Genesis: Part 1 9. Crystal Isles 10. Genesis: Part 2 11. Lost Island 12. Fjordur : """) if amount_map == 0: # 0 возвращает check_int когда, ничего не было введено amount_map = i + 1 if 0 < amount_map <= 12: break if list_config: for i in list_config: data = read_yaml(i, game="ARK") port_s.append(data['Port']) rcon_p.append(data['RCONPort']) query_p.append(data['QueryPort']) if amount_map == 1: map_s = "TheIsland" elif amount_map == 2: map_s = "TheCenter" elif amount_map == 3: map_s = "ScorchedEarth_P" elif amount_map == 4: map_s = "Ragnarok" elif amount_map == 5: map_s = "Aberration_P" elif amount_map == 6: map_s = "Extinction" elif amount_map == 7: map_s = "Valguero_P" elif amount_map == 8: map_s = "Genesis" elif amount_map == 9: map_s = "CrystalIsles" elif amount_map == 10: map_s = "Gen2" elif amount_map == 11: map_s = "LostIsland" elif amount_map == 12: map_s = "Fjordur" else: # Если вдруг каким-то боком проверка не отработает и не будет нужной цифры map_s = 'TheIsland' def ports(ports_arr): while True: port = check_int("") if not port: if not ports_arr: return 0 else: port = max(ports_arr) + 2 if port in ports_arr: print("Порт уже занят") else: return port if list_config: data = read_yaml(list_config[-1], game="ARK") while True: name_server = input("Укажите название Сервера: \n") if name_server == "": if map_s in list_config: count = 1 new_name = map_s while new_name in list_config: new_name = f"{map_s}{str(count)}" count += 1 list_config.append(new_name) print(list_config) break else: list_config.append(map_s) break else: if name_server in list_config: print_line("Имя занято") else: list_config.append(name_server) # если enter, то ставим последним элементом карту break print("Укажите порт сервера: ") port_server = ports(port_s) if port_server == 0: port_server = 7777 print("Порт Сервера=", port_server) print("Укажите query порт сервера: ") query_port = ports(query_p) if query_port == 0: query_port = 27015 print("Query Port=", query_port) rcon_enabled = True if rcon_p == []: rcon_port = 27020 else: rcon_port = max(rcon_p) + 1 password_server = input("Укажите пароль Сервера: \n") adminpassword_server = 123 max_players = check_int("Укажите максимальное количество игроков: \n") if max_players == 0: max_players = 70 print("Передавать сервер в глобальный список серверов steam?") listen_server_amount = check_int("""\n 1. Да 2. Нет :""") if listen_server_amount == 1: listen_server = True elif listen_server_amount == 2: listen_server = False else: listen_server = True yaml_create(game := "ARK", dir_server := "", cluster_server, map_s, list_config[-1], port_server, query_port, rcon_enabled, rcon_port, adminpassword_server, password_server, max_players, cluster_id, cluster_dir_override, listen_server) def config_7daystodie(): list_simvols = "$@-.%{}+/".split() create_dir(dir_server_7days) print("Введите имя конфига (serverconfig):\n") config_7days = input() if config_7days == "": config_7days = "serverconfig" elif config_7days == "serverconfig": config_7days = "serverconfig" elif config_7days in list_simvols: print_line("Запрещённые символы") else: xml_parser() systemd_unit_create(game := "7Days", config_7days) def xml_parser(): print("Я не умею парсить))") def yaml_create(game, dir_server="", cluster_server="", map_s="", name_server="", port_server="", query_port="", rcon_enabled="", rcon_port="", adminpassword_server="", password_server="", max_players="", id_mods_ark="", cluster_id="", cluster_dir_override="", listen_server=""): if game == "ARK": print_line(dir_maps_ark) print_line(name_server) path_yaml = dir_maps_ark + name_server settings = [ { 'map': map_s, 'Cluster': cluster_server, 'SessionName': name_server, 'Port': port_server, 'QueryPort': query_port, 'RCONEnabled': rcon_enabled, 'RCONPort': rcon_port, 'ServerAdminPassword': adminpassword_server, 'ServerPassword': password_server, 'MaxPlayers': max_players, 'ModsId': id_mods_ark, 'Listen': listen_server, 'ServerPath': dir_server_ark, 'clusterid': cluster_id, 'clusterdir': cluster_dir_override } ] elif game == "path_server": path_yaml = dir_config + "config" settings = [ { 'path_server': dir_server } ] elif game != "": systemd_unit_create(game) with open(path_yaml, 'w') as yamlfile: yaml.dump(settings, yamlfile) print(colorama.Fore.GREEN + "Конфиг создан" + colorama.Style.RESET_ALL) def systemd_unit_create(game, config_7days="", name_server=list_config): if game == "ARK": id_game = "376030" for i in name_server: data = read_yaml(i, game="ARK") ntff = "" if not data['Cluster'] else "-NoTransferFromFiltering" unit_dir_server = dir_server_ark systemd_unit_exec = f"{dir_server_exec}ShooterGameServer {data['map']}?listen={data['Listen']}?GameModIds=2967069515?SessionName={data['SessionName']}?Port={data['Port']}?QueryPort={data['QueryPort']}?RCONEnabled={data['RCONEnabled']}?RCONPort={data['RCONPort']}?ServerAdminPassword={data['ServerAdminPassword']}??MaxPlayers={data['MaxPlayers']} -clusterid={data['clusterid']} -ClusterDirOverride={data['clusterdir']} {ntff}" unit_file = f"{dir_unit}ark_{data['SessionName']}.service".lower() elif game == "7Days": id_game = "294420" # сюда дописать обращение к xml для получения уникального имени сервера unit_dir_server = dir_server_7days systemd_unit_exec = f"{dir_server_7days}startserver.sh -configfile={config_7days}.xml" unit_file = f"{dir_unit}7days.service".lower() unit_text = f'''[Unit] Description={game}: Server Wants=network-online.target After=syslog.target network.target nss-lookup.target network-online.target [Service] ExecStartPre=/usr/bin/steamcmd +force_install_dir {unit_dir_server} +login anonymous +app_update {id_game} +quit TimeoutStartSec=99999 ExecStart={systemd_unit_exec} LimitNOFILE=100000 ExecReload=/bin/kill -s HUP $MAINPID ExecStop=/bin/kill -s INT $MAINPID [Install] WantedBy=default.target ''' with open(unit_file, 'w') as systemd_unit: systemd_unit.write(unit_text) unit_name = unit_file.split("/")[-1] os.system(f"systemctl --user unmask {unit_name}") os.system('systemctl --user daemon-reload') os.system(f"systemctl --user enable {unit_name}") @hlna.command(help='Для скачивания и установки модов') @click.option('-g', help="Название игры для запуска. (ark, 7days") @click.option('-m', default='all', help="Название карты для запуска или all для запуска все карт") def modinstall(g, m): if g == "ark": id_game_workshop = "346110" if not os.path.isdir(dir_workshop_ark): create_dir(dir_workshop_ark) id_mods_ark = input("""Укажите id модов через запятую :""").split(",") for id_mod in id_mods_ark: os.system(f"steamcmd +login anonymous +workshop_download_item {id_game_workshop} {id_mod} +quit") modextract(id_mod, id_game_workshop) def modextract(id_mod, id_game_workshop): dir_steam_workshop = f"{dir_workshop_ark}/content/{id_game_workshop}/{id_mod}/WindowsNoEditor/" dir_ark_mods = f"{dir_mods_ark}{id_mod}" dir_extract = dir_ark_mods if id_mod == "111111111": return for dirpath, dirnames, filenames in os.walk(dir_steam_workshop): for dname in dirnames: os.makedirs(os.path.join(dir_extract, os.path.relpath(os.path.join(dirpath, dname), dir_steam_workshop)), exist_ok=True) if not os.path.isdir(os.path.join(dir_steam_workshop, dname)): shutil.rmtree(os.path.join(dir_extract, os.path.relpath(os.path.join(dirpath, dname), dir_steam_workshop))) try: for curdir, subdirs, files in os.walk(os.path.join(dir_steam_workshop)): for i, file in enumerate(files): name, ext = os.path.splitext(file) if ext == ".z": src = os.path.join(curdir, file) dst = os.path.join(dir_extract, os.path.relpath(name)) uncompressed = os.path.join(curdir, file + ".uncompressed_size") unpack(src, dst) print("[+] Extracted " + file) os.remove(src) if os.path.isfile(uncompressed): os.remove(uncompressed) except (UnpackException, SignatureUnpackException, CorruptUnpackException) as e: print("[x] Unpacking .z files failed, aborting mod install") return False modname = subprocess.check_output(['curl', '-s', 'https://steamcommunity.com/sharedfiles/filedetails/?id={}'.format(id_mod)]).decode('utf-8') modname = re.search(r'