hln-a/hlna.py

942 lines
37 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import re
import sys
import zlib
import struct
import logging
import datetime
import requests
import subprocess
import yaml
import click
import colorama
import hlnaui
from pathlib import Path
from rcon.source import Client
from PyQt6 import QtWidgets
home_dir = Path.home()
logging.basicConfig(stream=sys.stderr, level=logging.CRITICAL)
def find_file(path):
"""Находим все конфиги в зависимости от пути"""
arr = next(os.walk(path), (None, None, []))[2] # [] if no file
if '.directory' in arr:
arr.remove('.directory')
return arr
dir_config = f"{home_dir}/.config/hlna/"
dir_maps_ark = f"{dir_config}ARK/"
dir_deactivated = f"{dir_maps_ark}deactivated/"
list_config = find_file(dir_maps_ark)
delist_config = find_file(dir_deactivated)
def create_dir(directory):
"""Проверка и создание директории"""
if not os.path.exists(directory):
os.makedirs(directory)
create_dir(dir_config)
def path_server():
"""Получение пути для хранения файлов серверов, записываем путь в yaml файл"""
dir_server = input(f"""Укажите путь до каталога, где будут храниться файлы сервера. По-умолчанию {home_dir}/Servers/
:""")
if not dir_server:
dir_server = f"{home_dir}/Servers/"
yaml_create("path_server", dir_server)
return dir_server
@click.group()
def hlna():
pass
def unpack(src, dst):
"""Добавить документацию"""
with open(src, 'rb') as f:
sigver = struct.unpack('q', f.read(8))[0]
unpacked_chunk = f.read(8)
packed = f.read(8)
unpacked = f.read(8)
size_unpacked_chunk = struct.unpack('q', unpacked_chunk)[0]
size_packed = struct.unpack('q', packed)[0]
size_unpacked = struct.unpack('q', unpacked)[0]
# Verify the integrity of the Archive Header
if sigver == 2653586369:
if isinstance(size_unpacked_chunk, int) and isinstance(size_packed, int) and isinstance(size_unpacked, int):
logging.info("Archive is valid.")
logging.debug(
f"Archive header size information. Unpacked Chunk: {size_unpacked_chunk}({unpacked_chunk}) Full Packed: {size_packed}({packed}) Full Unpacked: {size_unpacked}({unpacked})")
# Obtain the Archive Compression Index
compression_index = []
size_indexed = 0
while size_indexed < size_unpacked:
raw_compressed = f.read(8)
raw_uncompressed = f.read(8)
compressed = struct.unpack('q', raw_compressed)[0]
uncompressed = struct.unpack('q', raw_uncompressed)[0]
compression_index.append((compressed, uncompressed))
size_indexed += uncompressed
logging.debug(
f"{len(compression_index)}: {size_indexed}/{size_unpacked} ({compressed}/{uncompressed}) - {raw_compressed} - {raw_uncompressed}")
if size_unpacked != size_indexed:
msg = f"Header-Index mismatch. Header indicates it should only have {size_unpacked} bytes when uncompressed but the index indicates {size_indexed} bytes."
logging.critical(msg)
return print_line(msg, flag=False)
# Read the actual archive data
data = b''
read_data = 0
for compressed, uncompressed in compression_index:
compressed_data = f.read(compressed)
uncompressed_data = zlib.decompress(compressed_data)
# Verify the size of the data is consistent with the archives index
if len(uncompressed_data) == uncompressed:
data += uncompressed_data
read_data += 1
# Verify there is only one partial chunk
if len(uncompressed_data) != size_unpacked_chunk and read_data != len(compression_index):
msg = f"Index contains more than one partial chunk: was {len(uncompressed_data)} when the full chunk size is {size_unpacked_chunk}, chunk {read_data}/{len(compression_index)}"
logging.critical(msg)
return print_line(msg, flag=False)
else:
msg = f"Uncompressed chunk size is not the same as in the index: was {len(uncompressed_data)} but should be {uncompressed}."
logging.critical(msg)
return print_line(msg, flag=False)
else:
msg = f"Data types in the headers should be int's. Size Types: unpacked_chunk({type(size_unpacked_chunk)}), packed({type(size_packed)}), unpacked({type(size_unpacked)})"
logging.critical(msg)
return print_line(msg, flag=False)
else:
msg = "The signature and format version is incorrect. Signature was {} should be 2653586369.".format(sigver)
logging.critical(msg)
return print_line(msg,flag=False)
# Write the extracted data to disk
with open(dst, 'wb') as f:
f.write(data)
logging.info("Archive has been extracted.")
def print_line(*text, flag=True):
"""Добавление обводки вокруг текста, покраска"""
color = colorama.Fore.GREEN if flag else colorama.Fore.RED
print(colorama.Fore.YELLOW + "." * 30)
print(color, *text)
print(colorama.Fore.YELLOW + "." * 30 + colorama.Style.RESET_ALL)
def check_int(number=""):
"""Проверка на ввод числа"""
while True:
try:
x = input(number)
if x == "":
return 0
x = int(x)
return x
except ValueError:
print_line("Введите число")
@hlna.command(help='Выбор игры и сбор настроек для сервера(-ов)')
def config():
count_game = check_int("""Выберите игру для конфигурирования
1. ARK Survival Evolved
2. 7 Days to Die
: """)
if count_game == 1:
config_ark()
elif count_game == 2:
config_7daystodie()
else:
print_line("Пока есть только ARK и 7Days xD")
def ports_array():
port_s = []
query_p = []
rcon_p = []
for k in list_config:
data_port = read_yaml(k, game="ARK")
port_s.append(data_port['Port'])
query_p.append(data_port['QueryPort'])
rcon_p.append(data_port['RCONPort'])
return port_s, query_p, rcon_p
def ports(port, ports_arr, flag):
while True:
if not port:
if not ports_arr:
if flag == 0:
port = 7777
elif flag == 1:
port = 27015
elif flag == 2:
port = 27044
print_line("Port=", port)
return port
else:
port = max(ports_arr) + 2
print_line("Port=", port)
if port in ports_arr:
print("Порт уже занят")
else:
return port
def config_cluster():
cluster_id = ""
cluster_dir_override = ""
count_cluster = check_int("""Укажите требуется ли кластер? default: Нет
1. Да
2. Нет
: """)
if count_cluster == 1:
cluster_server = True
while True:
cluster_id = input("Укажите id для кластера, любое сочетание символов: \n")
if cluster_id == '':
print("Введите символы: ")
else:
create_dir(dir_server_ark + cluster_id)
cluster_dir_override = (dir_server_ark + cluster_id)
break
else:
cluster_server = False
return cluster_server, cluster_id, cluster_dir_override
def config_nummap():
count_maps = check_int("Укажите количество карт: \n")
if count_maps == 0: # 0 возвращает check_int когда, ничего не было введено
count_maps = 1
return count_maps
def config_maps(i):
while True:
"""Проверка на выбор карты из списка"""
amount_map = check_int("""Выберите карту из списка указав номер
1. The Island
2. The Center
3. Scorched Earth
4. Ragnarok
5. Aberration
6. Extinction
7. Valguero
8. Genesis: Part 1
9. Crystal Isles
10. Genesis: Part 2
11. Lost Island
12. Fjordur
: """)
if amount_map == 0: # 0 возвращает check_int когда, ничего не было введено
amount_map = i + 1
if 0 < amount_map <= 12:
break
if list_config:
port_s, query_p, rcon_p = ports_array()
else:
port_s = query_p = rcon_p = []
if amount_map == 1:
map_s = "TheIsland"
elif amount_map == 2:
map_s = "TheCenter"
elif amount_map == 3:
map_s = "ScorchedEarth_P"
elif amount_map == 4:
map_s = "Ragnarok"
elif amount_map == 5:
map_s = "Aberration_P"
elif amount_map == 6:
map_s = "Extinction"
elif amount_map == 7:
map_s = "Valguero_P"
elif amount_map == 8:
map_s = "Genesis"
elif amount_map == 9:
map_s = "CrystalIsles"
elif amount_map == 10:
map_s = "Gen2"
elif amount_map == 11:
map_s = "LostIsland"
elif amount_map == 12:
map_s = "Fjordur"
else:
# Если вдруг каким-то боком проверка не отработает и не будет нужной цифры
map_s = 'TheIsland'
return map_s, port_s, query_p, rcon_p
def config_nameserver(map_s):
while True:
name_server = input("Укажите название Сервера: \n")
if name_server == "":
if map_s in list_config:
count = 1
new_name = map_s
while new_name in list_config:
new_name = f"{map_s}{str(count)}"
count += 1
list_config.append(new_name)
print_line(list_config)
break
else:
print_line(list_config)
list_config.append(map_s)
break
else:
if name_server in list_config:
print_line("Имя занято", flag=False)
config_nameserver(map_s)
else:
list_config.append(name_server) # если enter, то ставим последним элементом карту
break
return list_config
def config_ports(port_s):
port = check_int("Укажите порт сервера: ")
port_server = ports(port, port_s, 0)
return port_server
def config_query(query_p):
port = check_int("Укажите query порт сервера: ")
query_port = ports(port, query_p, 1)
return query_port
def config_rcon(rcon_p):
port = check_int("Укажите порт сервера: ")
rcon_port = ports(port, rcon_p, 2)
return rcon_port
def config_password():
password_server = input("Укажите пароль Сервера: \n")
return password_server
def config_adminpassword():
adminpassword_server = input("Укажите пароль администратора сервера: \n")
return adminpassword_server
def config_players():
max_players = check_int("Укажите максимальное количество игроков: \n")
if max_players == 0:
max_players = 70
return max_players
def config_listen():
print("Передавать сервер в глобальный список серверов steam?")
listen_server_amount = check_int("""\n
1. Да
2. Нет
:""")
if listen_server_amount == 1:
listen_server = True
elif listen_server_amount == 2:
listen_server = False
else:
listen_server = True
return listen_server
def config_ark(list_config=list_config):
"""конфигурирование сервера арк"""
create_dir(dir_server_ark)
create_dir(dir_maps_ark)
cluster_server, cluster_id, cluster_dir_override = config_cluster()
if list_config:
print("Уже установленные карты: ")
for i in list_config:
data = read_yaml(i, game="ARK")
print(f"{i} : {data['map']}")
count_maps = config_nummap()
print_line(count_maps)
for i in range(count_maps):
map_s, port_s, query_p, rcon_p = config_maps(i)
print_line(map_s, port_s, query_p, rcon_p)
list_config = config_nameserver(map_s)
port_server = config_ports(port_s)
query_port = config_query(query_p)
rcon_port = config_rcon(rcon_p)
rcon_enabled = True
password_server = config_password()
adminpassword_server = config_adminpassword()
max_players = config_players()
listen_server = config_listen()
print_line(cluster_server, map_s, list_config[-1], port_server, query_port,
rcon_enabled, rcon_port, adminpassword_server, password_server, max_players,
cluster_id, cluster_dir_override, listen_server)
yaml_create("ARK", "", cluster_server, map_s, list_config[-1], port_server, query_port,
rcon_enabled, rcon_port, adminpassword_server, password_server, max_players,
cluster_id, cluster_dir_override, listen_server)
def config_7daystodie():
"""конфигурирование сервера 7 days to die"""
list_simvols = ("$", "@", "-", ".", "%", "{", "}", "+", "/")
create_dir(dir_server_7days)
config_7days = input("Введите имя конфига (serverconfig):\n")
if config_7days == "":
config_7days = "serverconfig"
elif config_7days == "serverconfig":
config_7days = "serverconfig"
elif config_7days in list_simvols:
print_line("Запрещённые символы", flag=False)
else:
xml_parser()
systemd_unit_create("7Days", config_7days)
def xml_parser():
"""добавить документацию"""
print("Я пока не умею парсить xml))", flag=False)
def yaml_create(game, dir_server="", cluster_server="", map_s="", name_server="", port_server="", query_port="",
rcon_enabled="", rcon_port="", adminpassword_server="", password_server="", max_players="",
id_mods_ark="", cluster_id="", cluster_dir_override="", listen_server=""):
"""Создаёт на основании собранных данных yaml конфиг"""
if game == "ARK":
path_yaml = dir_maps_ark + name_server
settings = [
{
'map': map_s,
'Cluster': cluster_server,
'SessionName': name_server,
'Port': port_server,
'QueryPort': query_port,
'RCONEnabled': rcon_enabled,
'RCONPort': rcon_port,
'ServerAdminPassword': adminpassword_server,
'ServerPassword': password_server,
'MaxPlayers': max_players,
'ModsId': id_mods_ark,
'Listen': listen_server,
'ServerPath': dir_server_ark,
'clusterid': cluster_id,
'clusterdir': cluster_dir_override
}
]
elif game == "path_server":
path_yaml = dir_config + "config"
settings = [
{
'path_server': dir_server
}
]
with open(path_yaml, 'w') as yamlfile:
yaml.dump(settings, yamlfile)
print(colorama.Fore.GREEN + f"Конфиг {path_yaml} создан" + colorama.Style.RESET_ALL)
if game != "path_server":
systemd_unit_create(game)
def systemd_unit_create(game, config_7days="", name_server=list_config):
"""Создаёт на основании yaml конфига systemd юнит"""
if game == "ARK":
id_game = "376030"
for i in name_server:
data = read_yaml(i, game="ARK")
ntff = "" if not data['Cluster'] else "-NoTransferFromFiltering"
unit_dir_server = dir_server_ark
systemd_unit_exec = f"{dir_server_exec}ShooterGameServer {data['map']}?listen={data['Listen']}?SessionName={data['SessionName']}?Port={data['Port']}?QueryPort={data['QueryPort']}?RCONEnabled={data['RCONEnabled']}?RCONPort={data['RCONPort']}?ServerAdminPassword={data['ServerAdminPassword']}??MaxPlayers={data['MaxPlayers']} -clusterid={data['clusterid']} -ClusterDirOverride={data['clusterdir']} {ntff}"
unit_file = f"{dir_unit}ark_{data['SessionName']}.service".lower()
elif game == "7Days":
id_game = "294420"
# сюда дописать обращение к xml_parser для получения уникального имени сервера
unit_dir_server = dir_server_7days
systemd_unit_exec = f"{dir_server_7days}startserver.sh -configfile={config_7days}.xml"
unit_file = f"{dir_unit}7days.service".lower()
unit_text = f'''[Unit]
Description={game}: Server
Wants=network-online.target
After=syslog.target network.target nss-lookup.target network-online.target
[Service]
ExecStartPre=/usr/bin/steamcmd +force_install_dir {unit_dir_server} +login anonymous +app_update {id_game} +quit
TimeoutStartSec=99999
ExecStart={systemd_unit_exec}
LimitNOFILE=100000
ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -s INT $MAINPID
[Install]
WantedBy=default.target
'''
with open(unit_file, 'w') as systemd_unit:
systemd_unit.write(unit_text)
unit_name = unit_file.split("/")[-1]
os.system(f"systemctl --user unmask {unit_name}")
os.system('systemctl --user daemon-reload')
os.system(f"systemctl --user enable {unit_name}")
@hlna.command(help='Скачивание и установка модов <hlna ark -m all -i 111111111>')
@click.argument('g', nargs=1)
@click.option('-m', default='all', help="Название карты для запуска или all для запуска всех карт")
@click.option("-i/-u", default=True, help="-i установить/обновить моды, -u удалить моды")
@click.argument('id_mods_ark', nargs=-1)
def mod(g, m, i, id_mods_ark):
if g == "ark":
if not os.path.isdir(dir_mods_ark):
create_dir(dir_mods_ark)
id_mods_ark = id_mods_ark[0].split(',')
if id_mods_ark[1] != "":
for id_mod in id_mods_ark:
dir_mod_ark = f"{dir_mods_ark}/{id_mod}"
if not os.path.isfile(f"{dir_mod_ark}.mod"):
if i:
print_line(f"Скачиваем мод {id_mod}")
moddownload(g, m, id_mod, dir_mod_ark)
else:
os.system(f"rm -rf {dir_mod_ark}")
print_line(f"{dir_mod_ark} удалён")
os.system(f"rm {dir_mods_ark}/{id_mod}.mod")
print_line(f"{dir_mods_ark}/{id_mod}.mod удалён")
else:
print_line(f"Мод уже установлен")
modupdate(g, m, id_mod, dir_mod_ark)
else:
print_line("Введите id модов через запятую без пробелов")
else:
print_line("Не поддерживаемая игра")
def modupdate(g, m, id_mod, dir_mod_ark):
if g == "ark":
print_line(f"Проверяем обновление мода {id_mod}")
with open(os.path.join(dir_mod_ark, f"appworkshop_346110.acf"), "r") as f:
content = f.readlines()
content = "".join(content)
locale_date = ""
for line in content.splitlines():
if '\t"WorkshopItemsInstalled"' in line:
for line in content.splitlines():
if f'\t\t"timeupdated"' in line:
locale_date = line.split('"')[3]
if '}' in line:
break
break
data = {
'itemcount': 1,
'publishedfileids[0]': id_mod
}
zapros = requests.post('http://api.steampowered.com/ISteamRemoteStorage/GetPublishedFileDetails/v1', data=data)
json_zapros = zapros.json()
steam_date = json_zapros['response']['publishedfiledetails'][0]['time_updated']
if int(steam_date) != int(locale_date):
print_line(f"Обновляем мод {id_mod}")
moddownload(g, m, id_mod, dir_mod_ark)
else:
print_line(f"Мод {id_mod} обновлен")
def modupdateall(g, m):
print_line("Проверяем обновление всех установленных модов")
for file in os.listdir(dir_mods_ark):
if os.path.isfile(os.path.join(dir_mods_ark, file)):
if file.endswith('.mod'):
id_mod = file.split(".")[0]
if id_mod == "111111111":
continue
dir_mod_ark = f"{dir_mods_ark}/{id_mod}"
modupdate(g, m, id_mod, dir_mod_ark)
def moddownload(g, m, id_mod, dir_mod_ark):
"""Распаковывает файлы мода и создаёт .mod файл для него"""
if g == "ark":
id_game_workshop = "346110"
dir_steam_workshop = f"{dir_workshop_ark}/content/{id_game_workshop}/{id_mod}/WindowsNoEditor"
dir_extract = dir_mod_ark
if id_mod == "111111111":
return
if os.path.isfile(f"{dir_workshop_ark}/appworkshop_{id_game_workshop}.acf"):
os.system(f"rm {dir_workshop_ark}/appworkshop_{id_game_workshop}.acf")
os.system(f"steamcmd +force_install_dir {home_dir}/.local/share/Steam/ +login anonymous +workshop_download_item {id_game_workshop} {id_mod} +quit")
try:
for curdir, subdirs, files in os.walk(os.path.join(dir_steam_workshop)):
for file in files:
name, ext = os.path.splitext(file)
if ext == ".z":
src = os.path.join(curdir, file)
dst = os.path.join(curdir, name)
uncompressed = os.path.join(curdir, file + ".uncompressed_size")
unpack(src, dst)
print("[+] Extracted " + file)
os.remove(src)
if os.path.isfile(uncompressed):
os.remove(uncompressed)
except Exception as e:
print(e)
print("[x] Unpacking .z files failed, aborting mod install")
return False
os.system(f"rm -rf {dir_mod_ark}")
os.system(f"mv -f {dir_steam_workshop} {dir_mod_ark}")
modname = subprocess.check_output(
['curl', '-s', f'https://steamcommunity.com/sharedfiles/filedetails/?id={id_mod}']).decode('utf-8')
modname = re.search(r'<div class="workshopItemTitle">(.+)</div>', modname)
modname = modname and modname.group(1)
if os.path.isfile(f"{dir_mod_ark}.mod"):
os.remove(f"{dir_mod_ark}.mod")
with open(f"{dir_extract}/mod.info", "rb") as modinfo:
data = modinfo.read()
mapnamelen = struct.unpack_from("<L", data, 0)[0]
mapname = data[4:mapnamelen + 3]
nummaps = struct.unpack_from("<L", data, mapnamelen + 4)[0]
pos = mapnamelen + 8
modname = (modname.encode() or mapname.decode()) + b'\x00'
modnamelen = len(modname)
modpath = b"../../../" + dir_shooter.encode() + b"/Content/Mods/" + id_mod.encode() + b'\x00'
modpathlen = len(modpath)
with open(f"{dir_mod_ark}.mod", "wb") as mod:
mod.write(struct.pack(f'<LLL{modnamelen}sL{modpathlen}sL', int(id_mod), 0, modnamelen, modname,
modpathlen, modpath, nummaps))
for mapnum in range(nummaps):
mapfilelen = struct.unpack_from("<L", data, pos)[0]
mapfile = data[mapnamelen + 12:mapnamelen + 12 + mapfilelen]
mod.write(struct.pack("<L%ds" % mapfilelen, mapfilelen, mapfile))
pos = pos + 4 + mapfilelen
mod.write(b"\x33\xFF\x22\xFF\x02\x00\x00\x00\x01")
if os.path.isfile(os.path.join(dir_extract, "modmeta.info")):
with open(os.path.join(dir_extract, "modmeta.info"), "rb") as f:
with open(f"{dir_extract}.mod", "ab") as f_out:
f_out.write(f.read())
else:
with open(f"{dir_mods_ark}.mod", "wb") as f_out:
f_out.write(b'\x01\x00\x00\x00\x08\x00\x00\x00ModType\x00\x02\x00\x00\x001\x00')
x = os.system(f"mv {dir_workshop_ark}/appworkshop_{id_game_workshop}.acf {dir_mod_ark}/appworkshop_{id_game_workshop}.acf")
@hlna.command(help='Выключение/включение серверов (без удаления) <hlna switch -m all -d')
@click.argument('g', nargs=1)
@click.option("-m", required=False, help="Название cервера")
@click.option("-e/-d", default=True, help="-e активировать карты, -d деактивировать")
def switch(g, m, e): #добавить all
if g == "ark":
m = m.split(",")
if not os.path.isdir(dir_deactivated):
create_dir(dir_deactivated)
if e:
port_s, query_p, rcon_p = ports_array()
state_config = list_config if e else delist_config
state_msg = "активных" if e else "не активных"
for i in m:
try:
if i in state_config:
print(f"Карта {i} уже есть в {state_msg}")
continue
data = read_yaml(i, not e)
if e:
data['Port'] = ports(data['Port'], port_s, e)
data['QueryPort'] = ports(data['QueryPort'], port_s, e)
data['RCONPort'] = ports(data['RCONPort'], port_s, e)
yaml_create("ARK", data['map'], data['Cluster'], data['SessionName'], data['Port'], data['QueryPort'],
data['RCONEnabled'], data['RCONPort'], data['ServerAdminPassword'], data['ServerPassword'],
data['MaxPlayers'], data['ModsId'], data['Listen'], data['ServerPath'], data['clusterid'],
data['clusterdir'])
x = os.system(
f"rm {dir_deactivated}{i} >> {dir_logs}{date} 2>&1")
with open(f"{dir_logs}{date}.log", "a") as f:
f.write(f"[{t}] Сервер {i} перемещён из {state_msg}\n")# переписать эту залупу
else:
x = os.system(f"mv {dir_maps_ark}{i} {dir_deactivated} >> {dir_logs}{date} 2>&1")
with open(f"{dir_logs}{date}.log", "a") as f:
f.write(f"[{t}] Сервер {i} перемещён из {state_msg}\n")
if x == 0:
print(f"Готов - {i}")
#start = "start" if e else "stop"
enable = "enable" if e else "disable"
os.system(f"systemctl --user {enable} ark_{i}")
else:
print_line(f"Ошибка перемещения {i}", flag=False)
except:
print_line("ошибка операции", flag=False)
@hlna.command(help='Выводит статус настроеных серверов')
def status(list_config=list_config):
if list_config == [] and delist_config == []:
print_line("Сервера не сконфигурированы", flag=False)
else:
for i in list_config:
data = read_yaml(i, game="ARK")
x = os.system(f"lsof -w -i :{data['Port']}")
if x == 0:
print_line("Сервер запущен")
else:
print_line("Сервер не запущен", flag=False)
# этот принт надо отдельной функцией сделать, чтобы убрать дублирование текста
print(f"""
Имя сервера: {i}
Карта: {data['map']}
Моды: {data['ModsId']}
Пароль: {data['ServerPassword']}S
Кластер: {data['Cluster']}
Кластер id: {data['clusterid']}
Query порт: {data['QueryPort']}
Порт сервера: {data['Port']}
Rcon включен: {data['RCONEnabled']}
Rcon порт : {data['RCONPort']}
Максимальное кол-во игроков: {data['MaxPlayers']}""")
print("-" * 40)
if delist_config != []:
x = input("Есть неактивные сервера, показать Y/n: ")
if x != "n":
for i in delist_config:
data = read_yaml(i, False)
print(f"""
Имя сервера: {i}
Карта: {data['map']}
Моды: {data['ModsId']}
Пароль: {data['ServerPassword']}
Кластер: {data['Cluster']}
Кластер id: {data['clusterid']}
Query порт: {data['QueryPort']}
Порт сервера: {data['Port']}
Rcon включен: {data['RCONEnabled']}
Rcon порт : {data['RCONPort']}
Максимальное кол-во игроков: {data['MaxPlayers']}""")
print("-" * 40)
@hlna.command(help='Запуск, сконфигурированного сервера или кластера <hlna start -g ark -m all>')
@click.option('-g', required=True, help="Название игры для запуска. (ark, 7days")
@click.option('-m', default='all', help="Название карты для запуска или all для запуска все карт")
def start(g, m):
"""Запускает сервер выбранной игры"""
# добавить проверку на ввод аргумента ark/7day если else: давать подсказку
# если нет конфигов, то выводим что серверов нет
modupdateall(g, m)
start_stop("start", g, m)
@hlna.command(help='Остановка, сконфигурированного сервера или кластера <hlna stop -g ark -m all>')
@click.option('-g', required=True, help="Название игры для запуска. (ark, 7days")
@click.option('-m', default='all', help="Название карты для запуска или all для запуска все карт")
def stop(g, m):
start_stop("stop", g, m)
@hlna.command(help='Перезапуск, сконфигурированного сервера или кластера <hlna restart -g ark -m all>')
@click.option('-g', required=True, help="Название игры для запуска. (ark, 7days")
@click.option('-m', default='all', help="Название карты для запуска или all для запуска все карт")
def restart(g, m):
modupdateall(g, m)
start_stop("restart", g, m)
def check_exist_servers(g):
"""Проверяет наличие конфигов для активных карт"""
if g == "ark":
if list_config == []:
print_line("Нет сконфигурированных серверов", flag=False) # добавить отсюда вилку на вопрос с конфигурацией
else:
return
elif g == "7days":
print_line("7Days")
def start_stop(action, g, m, list_config=list_config):
"""Функция изменения статусов сервера"""
if g == "ark":
check_exist_servers(g)
dict_mapname = {}
dict_allmapname = []
for i in list_config:
data = read_yaml(i, game="ARK")
dict_mapname[data['SessionName']] = data['map']
dict_allmapname.append(data['SessionName'])
names_serverstart = []
for ns, v in dict_mapname.items():
if v in m:
names_serverstart.append(ns)
if list_config != []: # Перенести выше для проверки наличия конфигов
if m == "all":
names_serverstart = dict_allmapname
print(f"Выполняется для карт(-ы) {names_serverstart}")
else:
names_serverstart = choose_map(names_serverstart)
for i in names_serverstart:
data = read_yaml(i, game="ARK")
if action == "stop" or action == "restart":
rcon_local(i, "SaveWorld")
x = os.system(f"systemctl --user {action} ark_{data['SessionName'].lower()}.service")
if x == 0:
print_line(f"Готово {action} для {g}")
elif g == "7days":
x = os.system(f"systemctl --user {action} 7days.service")
if x == 0:
print_line("Готово")
else:
return
def read_yaml(list_config=list_config, flag=True, game=""):
"""Читает конфиги и отдаёт данные туда где их запросили"""
# Читаем конфиги активных или неактивных карт в зависимости от флага
if game == "ARK":
path_yaml = f"{dir_maps_ark}{list_config}" if flag else f"{dir_deactivated}{list_config}"
elif game == "path_server":
path_yaml = dir_config + "config"
with open(path_yaml, "r") as yamlfile:
data = yaml.load(yamlfile, Loader=yaml.FullLoader)
return data[0] # возвращаем словарь со всеми значениями
def choose_map(arr):
"""Функция выбора карт"""
new_arr = []
arr = sorted(arr)
print('Найдены сервера с этой картой')
for i, map in enumerate(arr):
print(f"{i + 1}) {map}")
while True:
try:
x = input("Выберите сервер из списка, либо несколько через запятую: ").split(',')
x = [int(i) for i in x]
break
except:
print_line("Неправильный ввод",flag=False)
for i in x:
new_arr.append(arr[i - 1])
print('Выбранные сервера:', new_arr)
return new_arr
@hlna.command(help='Отправка команд на игровой сервер через rcon <rcon SaveWorld -m all>')
@click.argument('c', nargs=1)
@click.option('-m', required=True, help="Название карты для применения rcon команды")
def rcon(m, c):
rcon_local(m, c)
def rcon_local(m, c):
try:
dict_mapname = {}
dict_adminpwd = {}
if list_config:
rcon_ports = []
for i in list_config:
data = read_yaml(i, game="ARK")
dict_mapname[data['RCONPort']] = data['map']
dict_adminpwd[data['RCONPort']] = data['ServerAdminPassword']
if m == "all":
for rcon_p in dict_mapname:
rcon_ports.append(rcon_p)
else:
for rcon_p, name_map in dict_mapname.items():
if name_map in m:
rcon_ports.append(rcon_p)
for port in rcon_ports:
passwd = dict_adminpwd[port]
with Client('127.0.0.1', port, passwd=str(passwd)) as client:
response = client.run(c)
print_line(f"Rcon выполнен {response} {dict_mapname[port]}")
else:
pass
except:
print_line(f"Ошибка отправки команды {c} в {m}", flag=False)
def zero(x=""):
"""Потом пригодится (нет)"""
return ""
if not os.path.exists(dir_config + "config"):
dir_server = path_server()
else:
data = read_yaml(game="path_server")
if data['path_server'] == "":
path_server()
else:
dir_server = data['path_server']
dir_unit = f"{home_dir}/.config/systemd/user/"
dir_logs = f"{dir_config}logs/"
dir_server_ark = f"{dir_server}ARK/"
dir_server_exec = f"{dir_server_ark}ShooterGame/Binaries/Linux/"
dir_workshop_ark = f"{home_dir}/.local/share/Steam/steamapps/workshop"
dir_shooter = "ShooterGame"
dir_mods_ark = f"{dir_server_ark}ShooterGame/Content/Mods"
dir_server_7days = f"{dir_server}/7Days/"
now = datetime.datetime.now()
date = now.strftime("%Y-%m-%d")
t = now.strftime("%H:%M:%S")
create_dir(dir_server)
create_dir(dir_unit)
create_dir(dir_logs)
class HlnaApp(QtWidgets.QMainWindow, hlnaui.Ui_mainWindow):
def __init__(self):
super().__init__()
self.setupUi(self)
def hlnag():
if len(sys.argv) > 1:
hlna()
else:
app = QtWidgets.QApplication(sys.argv)
window = HlnaApp()
window.show()
sys.exit(app.exec())
if __name__ == '__main__':
hlnag()