import os
import logging
import time
from colorama import Fore
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("normalize_base.log", encoding="utf-8")
]
)
def normalize_lines(input_file: str, output_file: str, mode: str, delimiter: str, input_format: str = None,
format_order: str = None, valid_format: str = None, min_len: int = None, max_len: int = None, exclude_chars: str = None):
"""
Нормализация строк файла в зависимости от заданного режима.
:param input_file: Путь к входному файлу
:param output_file: Путь к выходному файлу
:param mode: Режим работы ("format", "filter", "clean", "trim")
:param delimiter: Разделитель строк
:param input_format: Формат входящих строк для режима "format" (например, "log:pass:link")
:param format_order: Новый формат для режима "format" (например, "link:log:pass")
:param valid_format: Формат для фильтрации строк в режиме "filter" (например, "mail:pass")
:param min_len: Минимальная длина строки для фильтрации в режиме "trim"
:param max_len: Максимальная длина строки для фильтрации в режиме "trim"
:param exclude_chars: Набор символов, которые нужно исключить из строки (для режима "clean")
"""
start_time = time.time()
processed_lines = 0
valid_lines = 0
logger.info(f"Начало обработки. Режим: {mode}, Входной файл: {input_file}, Выходной файл: {output_file}")
try:
# Проверка выходной директории
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
os.makedirs(output_dir)
with open(input_file, 'r', encoding='utf-8', errors='replace') as infile, \
open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
for line in (l.strip() for l in infile):
processed_lines += 1
if mode == "format":
# Проверка формата входящих строк и их переформатирование
input_keys = input_format.split(delimiter)
parts = line.split(delimiter)
if len(parts) == len(input_keys):
format_keys = format_order.split(delimiter)
reordered = delimiter.join(parts[input_keys.index(key)] for key in format_keys)
valid_lines += 1
yield reordered
elif mode == "filter":
# Фильтрация строк по формату
parts = line.split(delimiter)
valid_keys = valid_format.split(delimiter)
if len(parts) == len(valid_keys):
valid_lines += 1
yield line
elif mode == "clean":
# Очистка строки от пробелов и исключённых символов
if exclude_chars:
cleaned_line = ''.join(c for c in line if c not in exclude_chars).strip()
else:
cleaned_line = line.strip()
valid_lines += 1
yield cleaned_line
elif mode == "trim":
# Отсечение строк по длине
if min_len <= len(line) <= max_len:
valid_lines += 1
yield line
except Exception as e:
logger.error(f"Ошибка обработки: {e}")
finally:
elapsed_time = time.time() - start_time
logger.info(f"Обработка завершена за {elapsed_time:.2f} секунд.")
logger.info(f"Всего строк обработано: {processed_lines}")
logger.info(f"Строк, соответствующих условиям: {valid_lines}")
def process_lines(input_file, output_file, **kwargs):
try:
# Проверка выходной директории
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
os.makedirs(output_dir)
processed_lines = 0
with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
for line in normalize_lines(input_file, output_file, **kwargs):
outfile.write(line + '\n')
processed_lines += 1
logger.info(f"Всего строк записано в файл: {processed_lines}")
except Exception as e:
logger.error(f"Ошибка при обработке строк: {e}")
if __name__ == "__main__":
while True:
print(Fore.YELLOW + "\nВыберите режим обработки:")
print(Fore.LIGHTMAGENTA_EX + "1. Форматирование строк (format)")
print(Fore.LIGHTCYAN_EX + "2. Фильтрация строк по формату (filter)")
print(Fore.LIGHTRED_EX + "3. Очистка строк от спецсимволов (clean)")
print(Fore.LIGHTBLUE_EX + "4. Обрезка строк по длине (trim)")
print(Fore.LIGHTGREEN_EX + "0. Выход.")
choice = input("Введите номер режима: ").strip()
if choice == "1":
mode = "format"
input_file = input("Введите путь к входному файлу: ").strip()
output_file = input("Введите путь к выходному файлу: ").strip()
delimiter = input("Введите разделитель: ").strip()
input_format = input("Введите ожидаемый формат входящей строки (например, log:pass:link): ").strip()
format_order = input("Введите новый порядок формата (например, link:log:pass): ").strip()
process_lines(input_file, output_file, mode=mode, delimiter=delimiter, input_format=input_format, format_order=format_order)
elif choice == "2":
mode = "filter"
input_file = input("Введите путь к входному файлу: ").strip()
output_file = input("Введите путь к выходному файлу: ").strip()
delimiter = input("Введите разделитель: ").strip()
valid_format = input("Введите допустимый формат (например, mail:pass): ").strip()
process_lines(input_file, output_file, mode=mode, delimiter=delimiter, valid_format=valid_format)
elif choice == "3":
mode = "clean"
input_file = input("Введите путь к входному файлу: ").strip()
output_file = input("Введите путь к выходному файлу: ").strip()
exclude_chars = input("Введите символы для исключения (например, !@#$%^&*): ").strip()
process_lines(input_file, output_file, mode=mode, delimiter=None, exclude_chars=exclude_chars)
elif choice == "4":
mode = "trim"
input_file = input("Введите путь к входному файлу: ").strip()
output_file = input("Введите путь к выходному файлу: ").strip()
min_len = int(input("Введите минимальную длину строки: ").strip())
max_len = int(input("Введите максимальную длину строки: ").strip())
process_lines(input_file, output_file, mode=mode, min_len=min_len, max_len=max_len)
elif choice == "0":
print("Выход из программы.")
break
else:
print("Неверный выбор. Попробуйте снова.")