This commit is contained in:
Евгений Храмов 2023-05-14 16:13:33 +03:00
parent fdabffb1e2
commit 6e50bde067

124
hlna.py

@ -1,9 +1,11 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os import os
import re import re
import sys
import zlib import zlib
import struct import struct
import shutil import shutil
import logging
import datetime import datetime
import subprocess import subprocess
@ -399,33 +401,22 @@ def modextract(id_mod, id_game_workshop):
# if not os.path.isfile(dstfile) or os.path.getmtime(srcfile) > os.path.getmtime(dstfile): # if not os.path.isfile(dstfile) or os.path.getmtime(srcfile) > os.path.getmtime(dstfile):
# print_line(dstfile) # print_line(dstfile)
# shutil.copy2(srcfile, dstfile) # shutil.copy2(srcfile, dstfile)
for root, dirs, files in os.walk(dir_steam_workshop): try:
for curdir, subdirs, files in os.walk(os.path.join(dir_steam_workshop)):
for file in files: for file in files:
if file.endswith('.z'): name, ext = os.path.splitext(file)
filepath = os.path.join(root, file) if ext == ".z":
extract_path = os.path.join(dir_extract, os.path.splitext(file)[0]) src = os.path.join(curdir, file)
dst = os.path.join(curdir, name)
if not os.path.isfile(extract_path) or os.path.getmtime(filepath) > os.path.getmtime(extract_path): uncompressed = os.path.join(curdir, file + ".uncompressed_size")
print(f"{os.path.getsize(filepath):<10} {file:<20} ", end="") arkit.unpack(src, dst)
with open(filepath, "rb") as zfile: #print("[+] Extracted " + file)
data = zfile.read() os.remove(src)
if data[0:8] != b"\xC1\x83\x2A\x9E\x00\x00\x00\x00": if os.path.isfile(uncompressed):
raise ValueError("Bad file magic") os.remove(uncompressed)
chunk_size, compressed_size, uncompressed_size = struct.unpack("<LLL", data[8:20]) except (arkit.UnpackException, arkit.SignatureUnpackException, arkit.CorruptUnpackException) as e:
chunks = [] print("[x] Unpacking .z files failed, aborting mod install")
compressed_used = 0 return False
while compressed_used < compressed_size:
chunk_header = data[20 + compressed_used:24 + compressed_used]
compressed_chunk_size, uncompressed_chunk_size = struct.unpack("<LL", chunk_header)
chunks.append(data[24 + compressed_used:24 + compressed_used + compressed_chunk_size])
compressed_used += compressed_chunk_size
uncompressed_data = zlib.decompress(b"".join(chunks))
with open(extract_path, "wb") as outfile:
outfile.write(uncompressed_data)
os.utime(extract_path, (os.path.getatime(filepath), os.path.getmtime(filepath)))
print("\r\x1b[K", end="")
modname = subprocess.check_output(['curl', '-s', 'https://steamcommunity.com/sharedfiles/filedetails/?id={}'.format(id_mod)]).decode('utf-8') modname = subprocess.check_output(['curl', '-s', 'https://steamcommunity.com/sharedfiles/filedetails/?id={}'.format(id_mod)]).decode('utf-8')
modname = re.search(r'<div class="workshopItemTitle">(.+)</div>', modname) modname = re.search(r'<div class="workshopItemTitle">(.+)</div>', modname)
@ -474,6 +465,87 @@ def modextract(id_mod, id_game_workshop):
os.makedirs(dir_ark_mods) os.makedirs(dir_ark_mods)
logging.basicConfig(stream=sys.stderr, level=logging.CRITICAL)
class UnpackException(Exception):
pass
class SignatureUnpackException(UnpackException):
pass
class CorruptUnpackException(UnpackException):
pass
def arkit(src, dst):
with open(src, 'rb') as f:
sigver = struct.unpack('q', f.read(8))[0]
unpacked_chunk = f.read(8)
packed = f.read(8)
unpacked = f.read(8)
size_unpacked_chunk = struct.unpack('q', unpacked_chunk)[0]
size_packed = struct.unpack('q', packed)[0]
size_unpacked = struct.unpack('q', unpacked)[0]
#Verify the integrity of the Archive Header
if sigver == 2653586369:
if isinstance(size_unpacked_chunk, int) and isinstance(size_packed , int) and isinstance(size_unpacked , int):
logging.info("Archive is valid.")
logging.debug(f"Archive header size information. Unpacked Chunk: {size_unpacked_chunk}({unpacked_chunk}) Full Packed: {size_packed}({packed}) Full Unpacked: {size_unpacked}({unpacked})")
#Obtain the Archive Compression Index
compression_index = []
size_indexed = 0
while size_indexed < size_unpacked:
raw_compressed = f.read(8)
raw_uncompressed = f.read(8)
compressed = struct.unpack('q', raw_compressed)[0]
uncompressed = struct.unpack('q', raw_uncompressed)[0]
compression_index.append((compressed, uncompressed))
size_indexed += uncompressed
logging.debug(f"{len(compression_index)}: {size_indexed}/{size_unpacked} ({compressed}/{uncompressed}) - {raw_compressed} - {raw_uncompressed}")
if size_unpacked != size_indexed:
msg = f"Header-Index mismatch. Header indicates it should only have {size_unpacked} bytes when uncompressed but the index indicates {size_indexed} bytes."
logging.critical(msg)
raise CorruptUnpackException(msg)
#Read the actual archive data
data = b''
read_data = 0
for compressed, uncompressed in compression_index:
compressed_data = f.read(compressed)
uncompressed_data = zlib.decompress(compressed_data)
#Verify the size of the data is consistent with the archives index
if len(uncompressed_data) == uncompressed:
data += uncompressed_data
read_data += 1
#Verify there is only one partial chunk
if len(uncompressed_data) != size_unpacked_chunk and read_data != len(compression_index):
msg = f"Index contains more than one partial chunk: was {len(uncompressed_data)} when the full chunk size is {size_unpacked_chunk}, chunk {read_data}/{len(compression_index)}"
logging.critical(msg)
raise CorruptUnpackException(msg)
else:
msg = f"Uncompressed chunk size is not the same as in the index: was {len(uncompressed_data)} but should be {uncompressed}."
logging.critical(msg)
raise CorruptUnpackException(msg)
else:
msg = f"Data types in the headers should be int's. Size Types: unpacked_chunk({type(size_unpacked_chunk)}), packed({type(size_packed)}), unpacked({type(size_unpacked)})"
logging.critical(msg)
raise CorruptUnpackException(msg)
else:
msg = "The signature and format version is incorrect. Signature was {} should be 2653586369.".format(sigver)
logging.critical(msg)
raise SignatureUnpackException(msg)
#Write the extracted data to disk
with open(dst, 'wb') as f:
f.write(data)
logging.info("Archive has been extracted.")
@ hlna.command() @ hlna.command()
@ click.option("-m", required=True, help="Название Сервера") @ click.option("-m", required=True, help="Название Сервера")
@ click.option("-e/-d", default=True, help="-e активировать карты, -d деактивировать") @ click.option("-e/-d", default=True, help="-e активировать карты, -d деактивировать")