What's new
Runion

This is a sample guest message. Register a free account today to become a member! Once signed in, you'll be able to participate on this site by adding your own topics and posts, as well as connect with other members through your own private inbox!

TXT MultiTool или комбайн для работы со строками в текстовых файлах

rand

Light Weight
Депозит
$0
Написал: rand
Эксклюзивно для: Для работы требуется установить:
Bash: Скопировать в буфер обмена
pip install colorlog==6.9.0

Для работы модуля копирования строк из файла:
Bash: Скопировать в буфер обмена
pip install pyperclip==1.9.0

Для работы модуля изменения кодировки файла:
Bash: Скопировать в буфер обмена
pip install chardet==5.2.0

Всем привет, после того как был написан алгоритм по удалению дублей в больших файлах, меня некоторые просили сделать мультитул для работы с текстовиками. В одной ветке мне показали мультитул от разработчика под ником Lays, но судя по Virus Total он протроянен. Поэтому была поставлена задача перед собой повторить его функционал на петухоне. Над продуктом я работаю уже плотно около недели, и получилось уже большую часть функций реализовать. Остальные попробую сделать в течении месяца и буду обновлять этот продукт через редактирование поста (исправление найденных багов, добавление функционала). Продукт тестировался на сгенерированных строках, и может некорректно работать с вашими файлами, по найденым багам и пожеланиям просьба отписаться в ветке.

Структура ПО:
multitool_txt.py
Это основной файл который отрисовывает консольный интерфейс и вызывает основные функции из модулей:

Как выглядит интерфейс:

Код:
Python: Скопировать в буфер обмена
Code:
import os
import time
from colorama import init, Fore, Style

# Инициализация colorama
init(autoreset=True)

# Импорт всех функций
from utils.remove_duplicates import remove_duplicates
from utils.compare_databases import compare_files
from utils.normalize_base import process_lines as normalize
from utils.extract_email import extract_emails
from utils.extract_logins import extract_logins
from utils.extract_passwords import extract_passwords
from utils.sort_by_domains import sort_domains
from utils.remove_domains import remove_domains
from utils.merge_with_passwords import append_passwords
from utils.merge_databases import merge_files_in_directory
from utils.split_database import split_file
from utils.randomize import randomize_file
from utils.copy_to_clipboard import copy_to_clipboard
from utils.remove_non_latin import clean_file
from utils.format_database import convert_file_encoding, POPULAR_ENCODINGS

def main_multitool():
 def display_menu():
  print(Fore.CYAN + Style.BRIGHT + "\nВыберите операцию:")
  print(Fore.GREEN + "1. Удалить дубликаты")
  print(Fore.YELLOW + "2. Сравнить базы данных")
  print(Fore.MAGENTA + "3. Найти и извлечь Email из строк")
  print(Fore.BLUE + "4. Извлечь логины")
  print(Fore.RED + "5. Извлечь пароли")
  print(Fore.CYAN + "6. Сортировать по доменам (Email/URL)")
  print(Fore.GREEN + "7. Удалить домены в строках Email")
  print(Fore.YELLOW + "8. Склеить строки с паролями")
  print(Fore.MAGENTA + "9. Объединить базы")
  print(Fore.BLUE + "10. Разбить базу на части")
  print(Fore.RED + "11. Рандомизировать базу")
  print(Fore.CYAN + "12. Скопировать строки в буфер обмена")
  print(Fore.GREEN + "13. Удалить строки с не латинскими символами")
  print(Fore.YELLOW + "14. Изменить кодировку файла")
  print(Fore.MAGENTA + "15. Нормализовать базу")
  print(Fore.RED + Style.BRIGHT + "0. Выйти\n")

 def get_file():
  """Позволяет выбрать файл через диалоговое окно или ввод пути вручную."""
  print("\nВыберите способ ввода файла:")
  print(Fore.CYAN + "1. Ввести путь к файлу вручную")
  print(Fore.MAGENTA + "2. Выбрать файл через окно (пока в разработке)")

  while True:
   try:
    choice = int(input("Ваш выбор: "))
    if choice == 1:
     file_path = input("Введите полный путь к файлу: ").strip()
     if os.path.isfile(file_path):
      return file_path
     else:
      print(Fore.RED + "Файл не найден. Попробуйте снова.")
    elif choice == 2:
     print(Fore.RED + Style.BRIGHT + "Функция в разработке, ожидайте в новой версии!\n")
     continue
    else:
     print(Fore.YELLOW + "Введите 1 или 2.")
   except ValueError:
    print(Fore.RED + "Введите число.")

 def get_input(prompt):
  try:
   return int(input(prompt))
  except ValueError:
   print(Fore.RED + "Введите число.")
   return None

 # ASCII-заставка
 print(Fore.LIGHTGREEN_EX + """
 # #   ##  #  #  #      ##    ##### # # #####
 ## ##   #  #    #      #    #  # #  # 
 # ## # # # #  ##### ###  ##### #### ####  #    #  # #  # 
 # # # # #  #  #  #  # # # # #  ###### #  #  # 
 # # # # #  #  #  #  # # # # #    #  # #  # 
 # # # ## #  #  #  #  # # # # #    #  # #  # 
 # # ### # ###  ### #####  ### #### #### ###    #  # #  # 
                          
 #             #   ###       # # #### ####    #   
 #             #   #        # # # # # #      
 ##### # #   # ### ##### # ### #####   ####  #### # ###   # # #  #    ###  #####
 # # # #   ##  # # ## # # #   #  # # ##    #  #### ####    #  # 
 # # # #   #  # # # # # #   #  # # #    # #   #  #   #  ####
 # # #####   #  # ## # # # #   #  # # #    # # # # # # ##  #   #
 #####  #   #  ### # # # #####   #  #### #    # # #### #### ##  ##### #####
   ####                                
 """)

 while True:
  display_menu()
  choice = get_input(Fore.CYAN + "Ваш выбор: ")

  if choice is None:
   continue

  if choice == 0:
   print(Fore.RED + "Выход из программы.")
   break

  # Выбор файла
  if not choice == 2 and not choice == 9:
   file_name = get_file()
   print(Fore.GREEN + f"Выбран файл: {file_name}")
  else:
   pass

  # Выполнение операций
  if choice == 1: # Удаление дублей
   remove_duplicates(file_name)
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 2: # Сравнение файлов
   print(Fore.RED + "Предупреждение!!! Файлы при сравнении полностью загрузятся в ОЗУ, при большом размере файла возможно переполнение памяти и аварийное завершение работы!")
   input_dir = input("Введите путь к директории с файлами для сравнения: ").strip()
   output_dir = input("Введите путь к директории для сохранения результатов: ").strip()
   compare_files(input_dir, output_dir)
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 3: # Извлекаем Email по регулярке
   output_file = input("Введите путь и имя выходного файла с Email (с расширением): ").strip()
   extract_emails(file_name, output_file)
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 4: # Извлекаем логины
   output_file = input("Введите путь и имя выходного файла с логинами (с расширением): ").strip()
   delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
   position = int(input("Введите позицию логина по разделителю (0 — первая колонка): ").strip())
   extract_logins(file_name, output_file, delimiter, position)
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 5: # Извлекаем пароли из строк
   output_file = input("Введите путь и имя выходного файла с паролями (с расширением): ").strip()
   delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
   position = int(input("Введите позицию пароля по разделителю (0 — первая колонка): ").strip())
   extract_passwords(file_name, output_file, delimiter, position)
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 6: # Сортировка по доменам
   output_dir = input("Введите путь к директории для сохранения: ").strip()
   # Меню выбора режима
   while True:
    print(Fore.YELLOW + "\nВыберите режим обработки:")
    print(Fore.LIGHTMAGENTA_EX + "1. Сортировка всех строк в один файл (one_file)")
    print(Fore.LIGHTCYAN_EX + "2. Сортировка по доменам в отдельные файлы (by_domains)")
    print(Fore.LIGHTRED_EX + "3. Экспорт строк по указанным доменам (filter_domains)")
    print(Fore.LIGHTBLUE_EX + "0. Перезапуск меню.")

    choice = input("Введите номер режима: ").strip()

    if choice == "1":
     output_mode = "one_file"
     filter_domains = None
     break
    elif choice == "2":
     output_mode = "by_domains"
     filter_domains = None
     break
    elif choice == "3":
     output_mode = "filter_domains"
     domains_input = input("Введите домены для фильтрации (через запятую): ").strip()
     filter_domains = [domain.strip() for domain in domains_input.split(',')]
     break
    elif choice == "0":
     print("Выход из программы.")
     time.sleep(3)
     main_multitool()
    else:
     print("Неверный выбор. Попробуйте снова.")
   sort_domains(file_name, output_dir, output_mode, filter_domains)
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 7: # Удаление доменов из Email адресов
   output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
   remove_domains(file_name, output_file)
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 8: # Склеить строки с паролями
   print(Fore.RED + "Предупреждение!!! Данная функция очень требовательна к системным ресурсам.")
   output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
   mode = input("Выберите режим работы ('manual' или 'from_file'): ").strip().lower()
   delimiter = input("Введите разделитель для строки и пароля (например, ':', '|', '/', ';'): ").strip()

   if not delimiter:
    print(Fore.RED + "ОШИБКА: Разделитель не указан.")
    return

   if mode == 'manual':
    password = input("Введите пароль для добавления: ").strip()
    append_passwords(file_name, output_file, mode, password=password, delimiter=delimiter)

   elif mode == 'from_file':
    passwords_file = input("Введите путь к файлу с паролями: ").strip()
    append_passwords(file_name, output_file, mode, passwords_file=passwords_file, delimiter=delimiter)

   else:
    print(Fore.RED + "ОШИБКА: Неверный режим. Используйте 'manual' или 'from_file'.")
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 9: # Объединить базы
   directory = input("Введите путь к директории с файлами для слияния: ").strip()
   output_file = input("Введите имя выходного файла (с расширением): ").strip()
   merge_files_in_directory(directory, output_file)
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 10: # Разделить один файл на несколько частей
   output_directory = input("Введите путь для сохранения разделённых файлов: ").strip()
   lines_per_file = int(input("Введите количество строк в каждом выходном файле: ").strip())
   split_file(file_name, output_directory, lines_per_file)
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 11: # Рандомизация строк в файле
   print(Fore.RED + "Предупреждение!!! Файл полностью загрузится в ОЗУ, при большом размере файла возможно переполнение памяти и аварийное завершение работы!")
   output_file = input("Введите путь для сохранения перемешанного файла (с расширением): ").strip()
   buffer_size = int(input("Введите размер буфера (число строк, загружаемых за раз): ").strip())
   randomize_file(file_name, output_file, buffer_size)
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 12: # Копирование строк в буфер.
   try:
    print(Fore.LIGHTGREEN_EX + "Запуск процесса")
    mode = input("Выберите режим (1 — весь файл, 2 — диапазон строк): ").strip()

    if mode == '1':
     copy_to_clipboard(file_name)
    elif mode == '2':
     start_line = int(input("Введите номер начальной строки: ").strip())
     end_line = int(input("Введите номер конечной строки: ").strip())
     copy_to_clipboard(file_name, start_line, end_line)
    else:
     print("Неверный выбор режима.")
     print(Fore.LIGHTRED_EX + "Выбран неверный режим. Перезапуск меню...")
     time.sleep(3)
     main_multitool()
   except Exception as e:
    print(Fore.RED + f"Ошибка: {e}")
   finally:
    print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
    time.sleep(3)
    main_multitool()
  elif choice == 13: # Удалить строки с не латинскими буквами, почистить лог от мусора
   output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
   print("Выберите режим обработки спецсимволов:")
   print(Fore.GREEN + "1 - Разрешить все спецсимволы")
   print(Fore.YELLOW + "2 - Разрешить только специальные символы (@, ., _, -)")
   print(Fore.RED + "3 - Не разрешать спецсимволы")
   special_mode_choice = input("Введите номер режима (1, 2 или 3): ").strip()
   special_mode = ''
   if special_mode_choice == '1':
    special_mode = 'all'
   elif special_mode_choice == '2':
    special_mode = 'custom'
   elif special_mode_choice == '3':
    special_mode = 'none'
   else:
    print(Fore.RED + "Неверный выбор режима. Завершение операции.")
    return
   clean_file(file_name, output_file, special_mode)
   print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
   time.sleep(3)
   main_multitool()
  elif choice == 14: # Изменение кодировки и сортировка строк файла
   try:
    print(Fore.LIGHTGREEN_EX + "Запуск операции")

    # Ввод параметров пользователем
    output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
    print(Fore.LIGHTBLUE_EX + "Доступные кодировки:")
    for i, encoding in enumerate(POPULAR_ENCODINGS, 1):
     print(Fore.LIGHTGREEN_EX + f"{i}. {encoding}")
    while True:
     try:
      choice = int(input("Выберите номер целевой кодировки: "))
      if 1 <= choice <= len(POPULAR_ENCODINGS):
       target_encoding = POPULAR_ENCODINGS[choice - 1]
       break
      else:
       print("Некорректный выбор. Попробуйте снова.")
     except ValueError:
      print("Введите корректный номер из списка.")

    convert_file_encoding(file_name, output_file, target_encoding)

   except KeyboardInterrupt:
    print(Fore.LIGHTRED_EX + "Операция прервана. Перезапуск меню...")
    time.sleep(3)
    main_multitool()
   except Exception as e:
    print(Fore.LIGHTRED_EX + f"Ошибка в основной функции: {e}, перезапуск меню")
    time.sleep(3)
    main_multitool()
   finally:
    print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
    time.sleep(3)
    main_multitool()
  elif choice == 15: # Нормализатор баз
   while True:
    print(Fore.YELLOW + "\nВыберите режим обработки:")
    print(Fore.LIGHTMAGENTA_EX + "1. Форматирование строк (format)")
    print(Fore.LIGHTCYAN_EX + "2. Фильтрация строк по формату (filter)")
    print(Fore.LIGHTRED_EX + "3. Очистка строк от спецсимволов (clean)")
    print(Fore.LIGHTBLUE_EX + "4. Обрезка строк по длине (trim)")
    print(Fore.LIGHTGREEN_EX + "0. Выход.")

    choice = input("Введите номер режима: ").strip()

    if choice == "1":
     mode = "format"
     output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
     delimiter = input("Введите разделитель: ").strip()
     input_format = input("Введите ожидаемый формат входящей строки (например, log:pass:link): ").strip()
     format_order = input("Введите новый порядок формата (например, link:log:pass): ").strip()
     normalize(file_name, output_file, mode=mode, delimiter=delimiter, input_format=input_format, format_order=format_order)
     print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
     time.sleep(3)
     main_multitool()

    elif choice == "2":
     mode = "filter"
     output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
     delimiter = input("Введите разделитель: ").strip()
     valid_format = input("Введите допустимый формат (например, mail:pass): ").strip()
     normalize(file_name, output_file, mode=mode, delimiter=delimiter, valid_format=valid_format)
     print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
     time.sleep(3)
     main_multitool()

    elif choice == "3":
     mode = "clean"
     output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
     exclude_chars = input("Введите символы для исключения (например, !@#$%^&*): ").strip()
     normalize(file_name, output_file, mode=mode, delimiter=None, exclude_chars=exclude_chars)
     print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
     time.sleep(3)
     main_multitool()

    elif choice == "4":
     mode = "trim"
     output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
     min_len = int(input("Введите минимальную длину строки: ").strip())
     max_len = int(input("Введите максимальную длину строки: ").strip())
     normalize(file_name, output_file, mode=mode, min_len=min_len, max_len=max_len, delimiter=None)
     print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
     time.sleep(3)
     main_multitool()

    elif choice == "0":
     print("Выход из программы.")
     break

    else:
     print("Неверный выбор. Попробуйте снова.")
  else:
   print(Fore.RED + "Эта функция пока не реализована. Попробуйте другую опцию.")

if __name__ == "__main__":
 main_multitool()

Реализованные модули:

1. Удаление дубликатов (remove_duplicates.py), это просто копипаст мультипроцессорного клинера и сортировщика из моей предыдущей ветки.
Код:
Python: Скопировать в буфер обмена
Code:
# Импорт необходимых библиотек
import os # Для работы с операционной системой и файловой системой
import heapq # Для эффективного слияния отсортированных последовательностей
import time # Для измерения времени выполнения скрипта
import logging # Для ведения логов
import shutil # Для удаления файлов и директории TEMP
import uuid # Для генерации уникальных идентификаторов
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed # Для параллельного выполнения задач
from colorlog import ColoredFormatter # Для создания цветных логов

# Настройка цветного логирования
formatter = ColoredFormatter(
 "%(log_color)s%(asctime)s - %(levelname)s - %(message)s",
 datefmt=None,
 reset=True,
 log_colors={
  'DEBUG': 'cyan',
  'INFO': 'green',
  'WARNING': 'yellow',
  'ERROR': 'red',
  'CRITICAL': 'bold_red',
 }
)

# Создание и настройка обработчика логов
handler = logging.StreamHandler() # Создаем обработчик для вывода логов в консоль
handler.setFormatter(formatter) # Устанавливаем форматтер для обработчика
logger = logging.getLogger() # Получаем объект логгера
logger.addHandler(handler) # Добавляем обработчик к логгеру
logger.setLevel(logging.INFO) # Устанавливаем уровень логирования

# Создание временной директории
temp_dir = os.path.join(os.getcwd(), 'TEMP') # Путь к временной директории в текущей рабочей директории
if not os.path.exists(temp_dir): # Если директория не существует
 os.makedirs(temp_dir) # Создаем её

def create_temp_merged_file(temp_dir):
 """
 Создает временный файл для слияния данных.

 :param temp_dir: Путь к временной директории
 """
 unique_filename = os.path.join(temp_dir, f"tempfile_{uuid.uuid4().hex}.tmp") # Генерируем уникальное имя файла
 temp_merged_file = open(unique_filename, 'w', encoding='utf-8') # Открываем файл для записи
 return temp_merged_file, unique_filename # Возвращаем объект файла и его имя

def heavy_computation(n):
 """
 Выполняет тяжелое вычисление (для симуляции нагрузки, процентов на 5 по моим замерам увеличивает скорость обработки лога).

 :param n: Число для вычислений
 :return: Результат вычисления
 """
 logger.debug(f"Запуск тяжелого вычисления с параметром: {n}")
 result = 0
 for i in range(n):
  result += i * i # Выполняем сложение квадратов чисел
 logger.debug(f"Результат тяжелого вычисления: {result}")
 return result

def process_chunk_and_write_multiprocess(chunk, temp_dir):
 """
 Обрабатывает чанк данных и записывает результат во временный файл.

 :param chunk: Список строк для обработки
 :param temp_dir: Путь к временной директории
 :return: Имя созданного временного файла или None в случае ошибки
 """
 try:
  logger.info(f"Начало обработки чанка размером {len(chunk)} строк (процесс)")
  heavy_computation(10000000) # Симуляция тяжелых вычислений
  unique_items = set(chunk) # Убираем дубликаты на уровне чанка
  sorted_chunk = sorted(unique_items) # Сортируем уникальные элементы
  unique_filename = os.path.join(temp_dir, f"tempfile_{uuid.uuid4()}.tmp") # Генерируем уникальное имя файла
  with open(unique_filename, 'w', encoding='utf-8') as temp_file:
   temp_file.write("\n".join(sorted_chunk) + "\n") # Сохраняем только уникальные строки
  return unique_filename
 except Exception as e:
  logger.error(f"Ошибка при обработке чанка: {e}")
  return None

def merge_files_parallel(temp_files, output_file, num_threads=24):
 """
 Выполняет параллельное слияние временных файлов.

 :param temp_files: Список временных файлов для слияния
 :param output_file: Имя выходного файла
 :param num_threads: Количество потоков для использования
 :return: (количество уникальных строк, количество дубликатов, имя выходного файла)
 """
 try:
  logger.info(f"Параллельное слияние {len(temp_files)} временных файлов с использованием {num_threads} потоков")
  unique_count = 0
  duplicate_count = 0

  file_iters = []
  try:
   for temp_file in temp_files:
    file_iters.append \
     (open(temp_file, 'r', encoding='utf-8', errors='replace')) # Открываем все временные файлы

   merged_iter = heapq.merge(*[iter(f) for f in file_iters]) # Создаем итератор для слияния

   with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
    prev_line = None
    for line in merged_iter:
     line = line.strip()
     if line != prev_line: # Если текущая строка отличается от предыдущей
      outfile.write(line + '\n') # Записываем её в выходной файл
      prev_line = line
      unique_count += 1
     else:
      duplicate_count += 1 # Увеличиваем счетчик дубликатов

  except Exception as e:
   logger.error(f"Ошибка при слиянии файлов: {e}")
   return 0, 0

  finally:
   for f in file_iters:
    f.close() # Закрываем все открытые файлы

  logger.info(f"Слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")
  return unique_count, duplicate_count, output_file

 except Exception as e:
  logger.error(f"Ошибка при параллельном слиянии файлов: {e}")
  return 0, 0

def batch_merge(temp_files, batch_size, temp_dir, num_merge_processes=24):
 """
 Выполняет пакетное слияние временных файлов.

 :param temp_files: Список временных файлов
 :param batch_size: Размер пакета для слияния
 :param temp_dir: Путь к временной директории
 :param num_merge_processes: Количество процессов для слияния
 :return: (список объединенных файлов, общее количество уникальных строк, общее количество дубликатов)
 """
 try:
  logger.info(f"Начало пакетного слияния с размером пакета {batch_size}")
  merged_files = []
  total_unique_count = 0
  total_duplicate_count = 0

  with ProcessPoolExecutor(max_workers=num_merge_processes) as merge_executor:
   futures = []

   for i in range(0, len(temp_files), batch_size):
    batch = temp_files[i:i + batch_size] # Формируем пакет файлов
    logger.info(f"Слияние пакета с файлов {i + 1} по {min(i + batch_size, len(temp_files))}")

    temp_merged_file, unique_filename = create_temp_merged_file(temp_dir)
    temp_merged_file.close()
    futures.append(merge_executor.submit(merge_files_parallel, batch, unique_filename))

   for future in as_completed(futures):
    unique_count, duplicate_count, temp_file = future.result() # Получаем результат слияния
    if unique_count or duplicate_count:
     merged_files.append(temp_file) # Добавляем временный файл в список
    total_unique_count += unique_count
    total_duplicate_count += duplicate_count

   for temp_file in temp_files:
    if os.path.exists(temp_file):
     logger.info(f"Удаление временного файла: {temp_file}")
     os.remove(temp_file) # Удаляем обработанные временные файлы
    else:
     logger.warning(f"Файл не найден для удаления: {temp_file}")

  return merged_files, total_unique_count, total_duplicate_count

 except Exception as e:
  logger.error(f"Ошибка при пакетном слиянии: {e}")
  return temp_files, 0, 0

def final_merge(temp_dir, output_file):
 """
 Выполняет финальное слияние всех оставшихся временных файлов.

 :param temp_dir: Путь к временной директории
 :param output_file: Имя выходного файла
 :return: (количество уникальных строк, количество удаленных дубликатов)
 """
 logger.info(f"Финальная стадия слияния временных файлов из папки {temp_dir}.")
 temp_files = [os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if os.path.isfile(os.path.join(temp_dir, f))] # Список временных файлов для финального слияния

 if len(temp_files) > 1: # Если файлов больше одного, запускаем слияние
  unique_count, duplicate_count, output_file = merge_files_parallel(temp_files, output_file) # Параллельно сливаем файлы
 elif len(temp_files) == 1: # Если остался только один файл
  logger.info(f"Остался один файл. Переименование {temp_files[0]} в {output_file}")
  os.rename(temp_files[0], output_file) # Переименовываем файл в выходной
  unique_count, duplicate_count = 0, 0 # Устанавливаем нулевые значения для счетчиков
 else:
  logger.error("Не осталось временных файлов для слияния!")
  return 0, 0 # Возвращаем нули в случае ошибки

 logger.info(f"Финальное слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")

 try:
  shutil.rmtree(temp_dir) # Удаляем временную директорию
  logger.info(f"Временная папка {temp_dir} успешно удалена.")
 except Exception as e:
  logger.error(f"Ошибка при удалении временной папки {temp_dir}: {e}")

 return unique_count, duplicate_count # Возвращаем количество уникальных строк и дубликатов

def read_and_process_chunks_multiprocess(input_file, chunk_size=2000000, num_processes=24):
 """
 Читает входной файл по чанкам и обрабатывает их в многопроцессорном режиме.

 :param input_file: Имя входного файла
 :param chunk_size: Размер чанка (количество строк)
 :param num_processes: Количество процессов для обработки
 :return: (список временных файлов, общее количество прочитанных строк)
 """
 temp_files = [] # Список для временных файлов
 chunk = [] # Буфер для хранения чанка строк
 original_count = 0 # Счетчик общего числа строк

 logger.info(f"Чтение и обработка файла {input_file} в {num_processes} процессах...")

 try:
  with open(input_file, 'r', encoding='utf-8', errors='ignore') as infile: # Открываем входной файл для чтения
   with ProcessPoolExecutor \
     (max_workers=num_processes) as executor: # Создаем процессный пул для обработки чанков
    futures = [] # Список задач для выполнения

    for line in infile: # Читаем файл построчно
     chunk.append(line.strip()) # Добавляем строку в чанк
     original_count += 1 # Увеличиваем счетчик строк
     if len(chunk) >= chunk_size: # Если чанк достиг нужного размера
      futures.append(executor.submit(process_chunk_and_write_multiprocess, chunk, temp_dir)) # Отправляем чанк на обработку в пул процессов
      chunk = [] # Очищаем чанк

    if chunk: # Если остались строки после завершения цикла
     futures.append(executor.submit(process_chunk_and_write_multiprocess, chunk, temp_dir)) # Обрабатываем оставшийся чанк

    for future in as_completed(futures): # Ожидаем завершения всех задач
     temp_file = future.result() # Получаем результат задачи
     if temp_file:
      temp_files.append(temp_file) # Добавляем временный файл в список

  logger.info \
   (f"Чтение и обработка завершены (процессами). Всего строк: {original_count}") # Логируем завершение обработки файла

 except Exception as e:
  logger.error(f"Ошибка при чтении файла (процессы): {e}") # Логируем ошибку в случае сбоя

 return temp_files, original_count # Возвращаем список временных файлов и общее количество строк

def sort_and_uniq_streaming_multiprocess(input_file, output_file, chunk_size=2000000, batch_size=10, num_processes=24, num_merge_processes=24):
 """
 Основная функция для сортировки и удаления дубликатов из большого файла с использованием многопроцессорной обработки.

 :param input_file: Имя входного файла
 :param output_file: Имя выходного файла
 :param chunk_size: Размер чанка для обработки
 :param batch_size: Размер пакета для слияния
 :param num_processes: Количество процессов для обработки чанков
 :param num_merge_processes: Количество процессов для слияния
 :return: (общее количество строк, количество уникальных строк)
 """
 logger.info(f"Старт обработки файла {input_file}...") # Логируем начало процесса

 temp_files, original_count = read_and_process_chunks_multiprocess(input_file, chunk_size, num_processes) # Читаем и обрабатываем файл по чанкам
 logger.info(f"Начинается пакетное слияние временных файлов...") # Логируем начало пакетного слияния

 total_unique_count = 0 # Инициализируем счетчик всех уникальных строк
 total_duplicate_count = 0 # Инициализируем счетчик всех дубликатов

 while len(temp_files) > batch_size: # Пока временных файлов больше, чем размер пакета
  temp_files, unique_count, duplicate_count = batch_merge(temp_files, batch_size, temp_dir, num_merge_processes) # Выполняем пакетное слияние
  total_unique_count += unique_count # Обновляем общий счетчик уникальных строк
  total_duplicate_count += duplicate_count # Обновляем общий счетчик дубликатов

 unique_count, duplicate_count = final_merge(temp_dir, output_file) # Выполняем финальное слияние
 total_unique_count += unique_count # Обновляем счетчик уникальных строк
 total_duplicate_count += duplicate_count # Обновляем счетчик дубликатов

 return original_count, unique_count # Возвращаем общее количество строк и количество уникальных строк

# Основная функция модуля
def remove_duplicates(data):
 tic = time.perf_counter() # Начало отсчета времени
 input_file = data
 output_file = "output.txt" # Указываем экспортируемый файл
 original_count, unique_count = sort_and_uniq_streaming_multiprocess(input_file, output_file, num_processes=24, num_merge_processes=24) # Запускаем основную функцию обработки
 tac = time.perf_counter() # Конец отсчета времени
 logging.info(f"Все временные файлы удалены. Уникальных строк: {unique_count}, Дублей удалено (на всех этапах процессов слияния): {original_count - unique_count}") # Логируем результаты
 logging.info(f"Всего обработано строк: {original_count}") # Логируем количество обработанных строк
 logging.info(f"Удаление дублей и сортировка заняли {tac - tic:0.2f} секунд") # Логируем время выполнения
 logging.info(f"Файл успешно сохранен в {output_file}")

2. Сравнение текстовых баз (compare_databases.py). Работает следующим образом, указываете директорию где находятся текстовые файлы с базами, происходит их сравнение на дубли, дубли очищаются на выходе, получаете базы без дублей+файл с удаленными дублями (типа антипаблика).
Код:
Python: Скопировать в буфер обмена
Code:
import os
import logging
import time
from collections import defaultdict

# Настройка логирования
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def read_file_lines(filepath):
 """
 Читает строки из файла построчно и возвращает их как множество.
 :param filepath: Путь к файлу.
 :return: Генератор строк.
 """
 logger.info(f"Начато чтение строк из файла: {filepath}")
 total_lines = 0
 try:
  with open(filepath, 'r', encoding='utf-8') as file:
   for line in file:
    total_lines += 1
    yield line.strip()
  logger.info(f"Успешно прочитано {total_lines} строк из файла: {filepath}")
 except Exception as e:
  logger.error(f"Ошибка при чтении файла {filepath}: {e}")
  raise


def compare_files(input_dir, output_dir):
 """
 Сравнивает строки в текстовых файлах из указанной директории.

 :param input_dir: Директория с текстовыми файлами.
 :param output_dir: Директория для сохранения результатов.
 """
 try:
  start_time = time.time()

  # Проверка входной и выходной директории
  logger.info(f"Проверка существования директории {input_dir}")
  if not os.path.exists(input_dir):
   raise FileNotFoundError(f"Директория {input_dir} не найдена.")

  logger.info(f"Проверка существования директории {output_dir}")
  if not os.path.exists(output_dir):
   logger.info(f"Директория не найдена, создаём: {output_dir}")
   os.makedirs(output_dir)

  # Получаем список текстовых файлов
  logger.info(f"Получение списка файлов в директории: {input_dir}")
  files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
  if len(files) < 2:
   raise ValueError("Для сравнения необходимо минимум два файла.")

  logger.info(f"Найдено файлов для сравнения: {len(files)}. Список файлов: {files}")

  # Хранилище строк
  file_lines = defaultdict(set)
  all_lines = set()

  # Чтение строк из файлов
  for filepath in files:
   logger.info(f"Чтение строк из файла: {filepath}")
   for line in read_file_lines(filepath):
    file_lines[filepath].add(line)
    all_lines.add(line)

  # Определяем общие строки
  logger.info("Вычисление совпадающих строк...")
  common_lines = set.intersection(*(lines for lines in file_lines.values()))
  logger.info(f"Обнаружено {len(common_lines)} совпадающих строк.")

  # Уникальные строки для каждого файла
  for filepath, lines in file_lines.items():
   unique_lines = lines - common_lines
   unique_output_path = os.path.join(output_dir, f"unique_{os.path.basename(filepath)}")
   logger.info(f"Сохранение уникальных строк для {os.path.basename(filepath)}. Всего строк: {len(unique_lines)}.")
   with open(unique_output_path, 'w', encoding='utf-8', errors='replace') as file:
    file.writelines(f"{line}\n" for line in unique_lines)
   logger.info(f"Уникальные строки для {os.path.basename(filepath)} сохранены в {unique_output_path}")

  # Сохраняем совпадающие строки
  common_output_path = os.path.join(output_dir, "common_lines.txt")
  logger.info(f"Сохранение совпадающих строк. Всего строк: {len(common_lines)}.")
  with open(common_output_path, 'w', encoding='utf-8', errors='replace') as file:
   file.writelines(f"{line}\n" for line in common_lines)
  logger.info(f"Совпадающие строки сохранены в {common_output_path}")

  end_time = time.time()
  logger.info(f"Сравнение завершено за {end_time - start_time:.2f} секунд.")

 except Exception as e:
  logger.error(f"Ошибка при сравнении файлов: {e}")
  raise e


def main():
 input_dir = input("Введите путь к директории с файлами для сравнения: ").strip()
 output_dir = input("Введите путь к директории для сохранения результатов: ").strip()

 try:
  logger.info("Начало выполнения программы сравнения текстовых файлов.")
  compare_files(input_dir, output_dir)
  logger.info("Сравнение завершено успешно.")
 except Exception as e:
  logger.error(f"Ошибка: {e}")


if __name__ == "__main__":
 main()
3. Найти и извлечь Email из строк (extract_email.py), находит по регулярке в текстовом файле Email адреса и извлекает их.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import re
import heapq
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - %(message)s",
 handlers=[
  logging.StreamHandler(),
  logging.FileHandler("extract_email.log", encoding="utf-8")
 ]
)

# Регулярное выражение для извлечения email
EMAIL_REGEX = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')

def extract_emails(input_file: str, output_file: str):
 """
 Извлекает email-адреса из строк входного файла и сохраняет их в отдельный файл.

 :param input_file: Путь к входному файлу
 :param output_file: Имя выходного файла для сохранения email-адресов
 """
 start_time = time.time()
 logger.info(f"Начало извлечения email-адресов. Входной файл: {input_file}, Выходной файл: {output_file}")

 try:
  # Проверяем наличие директории для выходного файла
  output_dir = os.path.dirname(output_file)
  if output_dir and not os.path.exists(output_dir):
   logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
   os.makedirs(output_dir)

  total_emails_extracted = 0 # Счётчик извлечённых email-адресов

  # Открываем входной и выходной файлы
  with open(input_file, 'r', encoding='utf-8', errors='replace') as infile, \
    open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:

   # Используем heapq.merge для обработки строк
   for line in heapq.merge(iter(infile)):
    try:
     # Извлекаем все email из строки
     emails = EMAIL_REGEX.findall(line)
     if emails:
      for email in emails:
       outfile.write(email + '\n')
      total_emails_extracted += len(emails)
    except Exception as e:
     logger.warning(f"Ошибка обработки строки: {line.strip()}. Детали: {e}")

  logger.info(f"Извлечение завершено. Всего email-адресов извлечено: {total_emails_extracted}")

 except FileNotFoundError:
  logger.error(f"Файл {input_file} не найден.")
  return
 except Exception as e:
  logger.error(f"Ошибка в процессе извлечения: {e}")
  return

 finally:
  # Рассчитываем время выполнения
  end_time = time.time()
  elapsed_time = end_time - start_time
  logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
 try:
  logger.info("Запуск программы")
  # Запрос параметров у пользователя
  input_file = input("Введите путь к входному файлу (с расширением): ").strip()
  output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()

  extract_emails(input_file, output_file)

 except KeyboardInterrupt:
  logger.warning("Операция прервана пользователем.")
 except Exception as e:
  logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
 main()
4. Извлечь логины (extract_logins.py). Работает по принципу, указываете разделитель в вашем логе и столбец с логином (начинается с 0), извлекаете логины.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import heapq
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - %(message)s",
 handlers=[
  logging.StreamHandler(),
  logging.FileHandler("extract_logins.log", encoding="utf-8")
 ]
)

def extract_logins(input_file: str, output_file: str, delimiter: str, position: int):
 """
 Извлекает логины из строк файла и сохраняет их в отдельный файл.

 :param input_file: Путь к входному файлу
 :param output_file: Имя выходного файла для сохранения логинов
 :param delimiter: Разделитель, используемый для извлечения логина
 :param position: Позиция логина по разделителю (0 — первая колонка)
 """
 start_time = time.time()
 logger.info(f"Начало извлечения логинов. Входной файл: {input_file}, Выходной файл: {output_file}, "
    f"Разделитель: '{delimiter}', Позиция: {position}")

 try:
  # Проверяем наличие директории для выходного файла
  output_dir = os.path.dirname(output_file)
  if output_dir and not os.path.exists(output_dir):
   logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
   os.makedirs(output_dir)

  total_login_extracted = 0 # Счётчик извлечённых логинов

  # Открываем входной и выходной файлы
  with open(input_file, 'r', encoding='utf-8', errors='replace') as infile, \
    open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:

   # Использую heapq для обработки строк
   for line in heapq.merge(iter(infile)):
    try:
     # Разбиваю строку по разделителю
     parts = line.strip().split(delimiter)
     if len(parts) > position:
      login = parts[position].strip()
      outfile.write(login + '\n')
      total_login_extracted += 1
     else:
      logger.warning(f"Пропуск строки: {line.strip()} (мало частей для позиции {position})")
    except Exception as e:
     logger.warning(f"Ошибка обработки строки: {line.strip()}. Детали: {e}")

  logger.info(f"Извлечение завершено. Всего логинов извлечено: {total_login_extracted}")

 except FileNotFoundError:
  logger.error(f"Файл {input_file} не найден.")
  return
 except Exception as e:
  logger.error(f"Ошибка в процессе извлечения: {e}")
  return

 finally:
  # Рассчитываем время выполнения
  end_time = time.time()
  elapsed_time = end_time - start_time
  logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
 try:
  logger.info("Запуск программы")
  # Запрос параметров у пользователя
  input_file = input("Введите путь к входному файлу (с раширением): ").strip()
  output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
  delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
  position = int(input("Введите позицию логина по разделителю (0 — первая колонка): ").strip())

  extract_logins(input_file, output_file, delimiter, position)

 except KeyboardInterrupt:
  logger.warning("Операция прервана пользователем.")
 except Exception as e:
  logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
 main()

5. Извлечь пароли (extract_passwords.py). Все тоже самое что и для логинов, просто дубль функционала.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import heapq
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - %(message)s",
 handlers=[
  logging.StreamHandler(),
  logging.FileHandler("extract_password.log", encoding="utf-8")
 ]
)

def extract_passwords(input_file: str, output_file: str, delimiter: str, position: int):
 """
 Извлекает пароли из строк файла и сохраняет их в отдельный файл.

 :param input_file: Путь к входному файлу
 :param output_file: Имя выходного файла для сохранения паролей
 :param delimiter: Разделитель, используемый для извлечения пароля
 :param position: Позиция пароля по разделителю (0 — первая колонка)
 """
 start_time = time.time()
 logger.info(f"Начало извлечения паролей. Входной файл: {input_file}, Выходной файл: {output_file}, "
    f"Разделитель: '{delimiter}', Позиция: {position}")

 try:
  # Проверяем наличие директории для выходного файла
  output_dir = os.path.dirname(output_file)
  if output_dir and not os.path.exists(output_dir):
   logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
   os.makedirs(output_dir)

  total_passwords_extracted = 0 # Счётчик извлечённых паролей

  # Открываем входной и выходной файлы
  with open(input_file, 'r', encoding='utf-8', errors='replace') as infile, \
    open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:

   # Использую heapq для обработки строк
   for line in heapq.merge(iter(infile)):
    try:
     # Разбиваю строку по разделителю
     parts = line.strip().split(delimiter)
     if len(parts) > position:
      password = parts[position].strip()
      outfile.write(password + '\n')
      total_passwords_extracted += 1
     else:
      logger.warning(f"Пропуск строки: {line.strip()} (мало частей для позиции {position})")
    except Exception as e:
     logger.warning(f"Ошибка обработки строки: {line.strip()}. Детали: {e}")

  logger.info(f"Извлечение завершено. Всего паролей извлечено: {total_passwords_extracted}")

 except FileNotFoundError:
  logger.error(f"Файл {input_file} не найден.")
  return
 except Exception as e:
  logger.error(f"Ошибка в процессе извлечения: {e}")
  return

 finally:
  # Рассчитываем время выполнения
  end_time = time.time()
  elapsed_time = end_time - start_time
  logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
 try:
  logger.info("Запуск программы")
  # Запрос параметров у пользователя
  input_file = input("Введите путь к входному файлу (с раширением): ").strip()
  output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
  delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
  position = int(input("Введите позицию пароля по разделителю (0 — первая колонка): ").strip())

  extract_passwords(input_file, output_file, delimiter, position)

 except KeyboardInterrupt:
  logger.warning("Операция прервана пользователем.")
 except Exception as e:
  logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
 main()

6. Сортировать по доменам (Email/URL) - (sort_by_domains.py), работает в трех режимах (one_file/by_domains/filter_domains). Режим когда просто выполняется сортировка строк по доменам в 1 выходной файл, и когда файлы разбиваются по доменам, а также чтение лога и фильтрация в файл или по файлам по конкретно указанным доменам. Email или URL вычисляется динамически по регулярке.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import re
import heapq
import logging
import time
import tempfile
import shutil

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s",
 handlers=[logging.StreamHandler(),
    logging.FileHandler("sort_by_domains.log", encoding="utf-8", mode='a')]
)

# Регулярные выражения для извлечения email и URL
EMAIL_REGEX = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
URL_REGEX = re.compile(r'(https?://)?([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})')


def extract_domain(string: str):
 """
 Извлекает домен из строки (email или URL).

 :param string: Строка, содержащая email или URL
 :return: Домен или None
 """
 # Проверка для email
 email_match = EMAIL_REGEX.search(string)
 if email_match:
  domain = email_match.group(0).split('@')[1]
  logger.debug(f"Извлечен домен электронной почты: {domain}")
  return domain

 # Проверка для URL
 url_match = URL_REGEX.search(string)
 if url_match:
  domain = url_match.group(2)
  # Извлекаем только зону первого уровня (например, '.ru', '.com')
  parts = domain.split('.')
  if len(parts) > 1:
   top_level_domain = parts[-1]
   # Собираем домен из первого уровня
   domain = f".{top_level_domain}"
  logger.debug(f"Извлечен домен URL (по зоне первого уровня): {domain}")
  return domain

 logger.debug(f"Не удалось извлечь домен из строки: {string}")
 return None


def sort_domains(input_file: str, output_dir: str, output_mode: str, filter_domains=None):
 """
 Обрабатывает строки из входного файла и сохраняет домены в файлы.

 :param input_file: Путь к входному файлу
 :param output_dir: Путь к директории для сохранения результатов
 :param output_mode: Режим обработки ('one_file', 'by_domains', 'filter_domains')
 :param filter_domains: Список доменов для фильтрации (только для режима 'filter_domains')
 """
 try:
  # Засекаем время начала обработки
  start_time = time.time()
  logger.info(f"Начало обработки файла: {input_file}")
  logger.info(f"Режим обработки: {output_mode}")
  if filter_domains:
   logger.info(f"Фильтрация по доменам: {', '.join(filter_domains)}")

  # Проверяем наличие директории для выходных файлов
  if not os.path.exists(output_dir):
   logger.warning(f"Директория {output_dir} не существует. Создаём новую.")
   os.makedirs(output_dir)

  if output_mode == 'one_file':
   # Создаем временную папку для хранения промежуточных файлов
   temp_dir = tempfile.mkdtemp(dir=output_dir)
   logger.info(f"Создана временная папка для промежуточных файлов: {temp_dir}")

  # Статистика обработки
  total_lines = 0
  processed_lines = 0
  unique_domains = set()
  domain_line_counts = {}

  # Открываем входной файл
  with open(input_file, 'r', encoding='utf-8', errors='replace') as infile:
   # Словарь для хранения доменов по файлам
   domain_files = {}

   # Чтение строк из файла с использованием heapq.merge для эффективности и экономии ОЗУ
   for line in heapq.merge(infile):
    line = line.strip()
    total_lines += 1

    # Извлечение домена из строки
    domain = extract_domain(line)
    if domain:
     processed_lines += 1
     unique_domains.add(domain)

     # Подсчет строк для каждого домена
     domain_line_counts[domain] = domain_line_counts.get(domain, 0) + 1

     if output_mode == "one_file":
      # В режиме 'one_file' записываем все строки в файлы по доменам
      temp_file_path = os.path.join(temp_dir, f"{domain}.tmp")
      if domain not in domain_files:
       domain_files[domain] = open(temp_file_path, 'w', encoding='utf-8')
      domain_files[domain].write(line + '\n')
     elif output_mode == "by_domains":
      # В режиме 'by_domains' создаём файлы для каждого домена
      if domain not in domain_files:
       domain_files[domain] = open(os.path.join(output_dir, f"{domain}.txt"), 'a', encoding='utf-8')
      domain_files[domain].write(line + '\n')
     elif output_mode == "filter_domains" and filter_domains:
      # В режиме 'filter_domains' сохраняем строки только для указанных доменов
      if domain in filter_domains:
       if domain not in domain_files:
        domain_files[domain] = open(os.path.join(output_dir, f"{domain}.txt"), 'a', encoding='utf-8')
       domain_files[domain].write(line + '\n')

   # Закрытие временных файлов для доменов
   for file in domain_files.values():
    file.close()

  # В режиме 'one_file' объединяем все временные файлы и сортируем их
  if output_mode == "one_file":
   logger.info("Начинаем слияние и сортировку временных файлов...")
   with open(os.path.join(output_dir, "output.txt"), 'w', encoding='utf-8') as output_file:
    # Создаём генератор для всех временных файлов
    all_temp_files = [open(os.path.join(temp_dir, f"{domain}.tmp"), 'r', encoding='utf-8')
         for domain in domain_files.keys()]

    # Слияние временных файлов с сортировкой по домену в один файл
    merged_lines = heapq.merge(*all_temp_files, key=lambda line: extract_domain(line))
    output_file.writelines(merged_lines)

    # Закрытие всех временных файлов
    for file in all_temp_files:
     file.close()

    logger.info("Слияние и сортировка завершены. Все данные записаны в output.txt.")

   # Удаляем временную папку и все ее содержимое
   shutil.rmtree(temp_dir)
   logger.info(f"Временная папка {temp_dir} была удалена.")

  # Засекаем время окончания обработки
  end_time = time.time()
  elapsed_time = end_time - start_time

  # Логирование статистики обработки
  logger.info(f"Статистика обработки:")
  logger.info(f"- Всего строк в файле: {total_lines}")
  logger.info(f"- Обработано строк с доменами: {processed_lines}")
  logger.info(f"- Уникальных доменов найдено: {len(unique_domains)}")
  logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")

  # Логирование распределения доменов (топ-5)
  sorted_domains = sorted(domain_line_counts.items(), key=lambda x: x[1], reverse=True)
  logger.info("Топ-5 доменов по количеству строк:")
  for domain, count in sorted_domains[:5]:
   logger.info(f" - {domain}: {count} строк")

 except FileNotFoundError:
  logger.error(f"ОШИБКА: Файл {input_file} не найден.")
 except PermissionError:
  logger.error(f"ОШИБКА: Нет прав доступа к файлу {input_file} или директории {output_dir}.")
 except Exception as e:
  logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в процессе обработки: {e}")


def main():
 try:
  logger.info("=" * 50)
  logger.info("СТАРТ СКРИПТА: Сортировка доменов")
  logger.info("=" * 50)

  # Запрос параметров у пользователя
  input_file = input("Введите путь к входному файлу (с расширением): ").strip()
  output_dir = input("Введите путь к директории для сохранения: ").strip()

  # Меню выбора режима
  while True:
   print("\nВыберите режим обработки:")
   print("1. Сортировка всех строк в один файл (one_file)")
   print("2. Сортировка по доменам в отдельные файлы (by_domains)")
   print("3. Экспорт строк по указанным доменам (filter_domains)")
   print("0. Выход")

   choice = input("Введите номер режима: ").strip()

   if choice == "1":
    output_mode = "one_file"
    filter_domains = None
    break
   elif choice == "2":
    output_mode = "by_domains"
    filter_domains = None
    break
   elif choice == "3":
    output_mode = "filter_domains"
    domains_input = input("Введите домены для фильтрации (через запятую): ").strip()
    filter_domains = [domain.strip() for domain in domains_input.split(',')]
    break
   elif choice == "0":
    print("Выход из программы.")
    return
   else:
    print("Неверный выбор. Попробуйте снова.")

  # Обработка строк в зависимости от выбранного режима
  sort_domains(input_file, output_dir, output_mode, filter_domains)

  logger.info("=" * 50)
  logger.info("ЗАВЕРШЕНИЕ СКРИПТА")
  logger.info("=" * 50)

 except KeyboardInterrupt:
  logger.warning("ВНИМАНИЕ: Операция прервана пользователем.")
 except Exception as e:
  logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в основной функции: {e}")



if __name__ == "__main__":
 main()

7. Удаление доменов из Email адресов. (remove_domains.py) Прописываете путь к файлу c мыльниками, на выходе получаете файл без @domain.com
Код:
Python: Скопировать в буфер обмена
Code:
import os
import heapq
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s",
 handlers=[
  logging.StreamHandler(),
  logging.FileHandler("remove_domains.log", encoding="utf-8", mode='a')
 ]
)


def extract_name(email: str) -> str:
 """
 Извлекает имя пользователя из email (часть до @).

 :param email: Строка с email.
 :return: Имя пользователя или пустая строка.
 """
 if "@" in email:
  return email.split("@")[0]
 return email


def line_generator(file_path: str):
 """
 Генератор для построчного чтения файла.

 :param file_path: Путь к входному файлу.
 :yield: Очищенная строка из файла.
 """
 with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
  for line in file:
   yield line.strip()


def remove_domains(input_file: str, output_file: str):
 """
 Удаляет домены из строк электронной почты и записывает результат в выходной файл.

 :param input_file: Путь к входному файлу.
 :param output_file: Путь к выходному файлу.
 """
 try:
  start_time = time.time()
  logger.info(f"Начало обработки файла: {input_file}")

  # Проверяем наличие директории для выходного файла и создаём её при необходимости
  output_dir = os.path.dirname(output_file)
  if output_dir and not os.path.exists(output_dir):
   logger.warning(f"Директория {output_dir} не существует. Создаём новую.")
   os.makedirs(output_dir)

  # Проверяем наличие расширения у выходного файла
  if not os.path.splitext(output_file)[1]:
   logger.warning(f"У файла {output_file} отсутствует расширение. Добавляем '.txt'.")
   output_file += ".txt"

  # Открыть файл для записи и обработать данные
  with open(output_file, 'w', encoding='utf-8') as outfile:
   processed_lines = (
    extract_name(email) for email in line_generator(input_file)
   )
   for name in heapq.merge(processed_lines):
    outfile.write(name + '\n')

  end_time = time.time()
  logger.info(f"Обработка завершена. Время выполнения: {end_time - start_time:.2f} секунд.")
  logger.info(f"Результат записан в файл: {output_file}")

 except FileNotFoundError:
  logger.error(f"ОШИБКА: Файл {input_file} не найден.")
 except PermissionError:
  logger.error(f"ОШИБКА: Нет прав доступа к файлу {input_file} или {output_file}.")
 except Exception as e:
  logger.error(f"КРИТИЧЕСКАЯ ОШИБКА: {e}")


def main():
 try:
  logger.info("=" * 50)
  logger.info("СТАРТ СКРИПТА: Удаление доменов из email")
  logger.info("=" * 50)

  # Запрос параметров у пользователя
  input_file = input("Введите путь к входному файлу (с расширением): ").strip()
  output_file = input("Введите путь к выходному файлу (с расширением): ").strip()

  if not os.path.exists(input_file):
   logger.error(f"Входной файл {input_file} не найден.")
   return

  remove_domains(input_file, output_file)

  logger.info("=" * 50)
  logger.info("ЗАВЕРШЕНИЕ СКРИПТА")
  logger.info("=" * 50)

 except KeyboardInterrupt:
  logger.warning("ВНИМАНИЕ: Операция прервана пользователем.")
 except Exception as e:
  logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в основной функции: {e}")


if __name__ == "__main__":
 main()
8. Склеить строки с паролями(merge_with_passwords.py) - (допустим надо склеить строки для брута). Работает в двух режимах. ('manual' или 'from_file'). В первом режиме вы задаете 1 пароль на строку, во втором склеиваете 2 списка это логины и пароли (допустим).
Код:
Python: Скопировать в буфер обмена
Code:
import os
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s",
 handlers=[
  logging.StreamHandler(),
  logging.FileHandler("append_passwords.log", encoding='utf-8', mode='a')
 ]
)


def line_generator(file_path: str):
 """
 Генератор для построчного чтения файла.

 :param file_path: Путь к входному файлу.
 """
 with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
  for line in file:
   yield line.strip()


def password_generator(file_path: str):
 """
 Генератор для построчного чтения файла с паролями.

 :param file_path: Путь к файлу с паролями.
 :yield: Пароль из файла.
 """
 with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
  for line in file:
   yield line.strip()


def append_passwords_with_password_file(input_file: str, output_file: str, passwords_file: str, delimiter: str):
 """
 Режим добавления паролей из файла, добавляя все пароли на каждую строку.

 :param input_file: Путь к входному файлу.
 :param output_file: Путь к выходному файлу.
 :param passwords_file: Путь к файлу с паролями.
 :param delimiter: Разделитель для строки и пароля.
 """
 with open(output_file, 'w', encoding='utf-8') as outfile:
  input_lines = line_generator(input_file)
  password_lines = list(password_generator(passwords_file)) # Сохраняем все пароли в список

  for line in input_lines:
   for password in password_lines:
    outfile.write(f"{line}{delimiter}{password}\n")


def append_passwords_with_manual_password(input_file: str, output_file: str, password: str, delimiter: str):
 """
 Режим добавления указанного пользователем пароля к строкам.

 :param input_file: Путь к входному файлу.
 :param output_file: Путь к выходному файлу.
 :param password: Пароль для добавления.
 :param delimiter: Разделитель для строки и пароля.
 """
 with open(output_file, 'w', encoding='utf-8') as outfile:
  for line in line_generator(input_file):
   outfile.write(f"{line}{delimiter}{password}\n")


def append_passwords(input_file: str, output_file: str, mode: str, password: str = None, passwords_file: str = None, delimiter: str = ':'):
 """
 Добавляет пароли к строкам в зависимости от выбранного режима.

 :param input_file: Путь к входному файлу.
 :param output_file: Путь к выходному файлу.
 :param mode: Режим работы ('manual' или 'from_file').
 :param password: Пароль для режима 'manual'.
 :param passwords_file: Путь к файлу с паролями для режима 'from_file'.
 :param delimiter: Разделитель для строки и пароля.
 """
 try:
  start_time = time.time()
  logger.info(f"Начало обработки файла: {input_file}")
  logger.info(f"Выбранный режим: {mode}")

  # Проверяем наличие директории для выходного файла и создаём её при необходимости
  output_dir = os.path.dirname(output_file)
  if output_dir and not os.path.exists(output_dir):
   logger.warning(f"Директория {output_dir} не существует. Создаём новую.")
   os.makedirs(output_dir)

  # Проверяем наличие расширения у выходного файла
  if not os.path.splitext(output_file)[1]:
   logger.warning(f"У файла {output_file} отсутствует расширение. Добавляем '.txt'.")
   output_file += ".txt"

  if mode == 'manual':
   if password is None:
    raise ValueError("Для режима 'manual' необходимо указать пароль.")
   append_passwords_with_manual_password(input_file, output_file, password, delimiter)

  elif mode == 'from_file':
   if passwords_file is None or not os.path.exists(passwords_file):
    raise ValueError("Для режима 'from_file' необходимо указать существующий файл с паролями.")
   append_passwords_with_password_file(input_file, output_file, passwords_file, delimiter)

  else:
   raise ValueError("Неверный режим. Используйте 'manual' или 'from_file'.")

  end_time = time.time()
  logger.info(f"Обработка завершена. Время выполнения: {end_time - start_time:.2f} секунд.")
  logger.info(f"Результат записан в файл: {output_file}")

 except Exception as e:
  logger.error(f"КРИТИЧЕСКАЯ ОШИБКА: {e}")


def main():
 try:
  logger.info("=" * 50)
  logger.info("СТАРТ СКРИПТА: Добавление пароля к строкам")
  logger.info("=" * 50)

  # Запрос параметров у пользователя
  input_file = input("Введите путь к входному файлу (с расширением): ").strip()
  output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
  mode = input("Выберите режим работы ('manual' или 'from_file'): ").strip().lower()
  delimiter = input("Введите разделитель для строки и пароля (например, ':', '|', '/', ';'): ").strip()

  if not delimiter:
   logger.error("ОШИБКА: Разделитель не указан.")
   return

  if mode == 'manual':
   password = input("Введите пароль для добавления: ").strip()
   append_passwords(input_file, output_file, mode, password=password, delimiter=delimiter)

  elif mode == 'from_file':
   passwords_file = input("Введите путь к файлу с паролями: ").strip()
   append_passwords(input_file, output_file, mode, passwords_file=passwords_file, delimiter=delimiter)

  else:
   logger.error("ОШИБКА: Неверный режим. Используйте 'manual' или 'from_file'.")

  logger.info("=" * 50)
  logger.info("ЗАВЕРШЕНИЕ СКРИПТА")
  logger.info("=" * 50)

 except KeyboardInterrupt:
  logger.warning("ВНИМАНИЕ: Операция прервана пользователем.")
 except Exception as e:
  logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в основной функции: {e}")


if __name__ == "__main__":
 main()

9. Объединение баз. (merge_databases.py) Выполнение слияния нескольких текстовиков в один файл. Указываете директорию с текстовиками, на выходе получаете один файл.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import heapq
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - %(message)s",
 handlers=[
  logging.StreamHandler(),
  logging.FileHandler("merge_databases.log", encoding="utf-8")
 ]
)


def merge_files_in_directory(directory: str, output_file: str):
 """
 Сливает все файлы из указанной директории в один файл.

 :param directory: Путь к директории с файлами для слияния
 :param output_file: Имя выходного файла
 """
 start_time = time.time()
 logger.info(f"Начало слияния файлов. Директория: {directory}, Выходной файл: {output_file}")

 try:
  # Получаем список всех файлов в директории
  temp_files = [os.path.join(directory, f) for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]

  if not temp_files:
   logger.error(f"В указанной директории {directory} не найдено файлов для слияния.")
   return

  logger.info(f"Обнаружено {len(temp_files)} файлов для обработки.")

  file_iters = []
  total_lines_written = 0 # Счётчик строк

  try:
   # Открываем все файлы для чтения
   for temp_file in temp_files:
    logger.info(f"Открытие файла: {temp_file}")
    file_iters.append(open(temp_file, 'r', encoding='utf-8', errors='replace'))

   # Открываем выходной файл для записи
   with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
    logger.info(f"Запись в файл: {output_file}")

    # Записываем строки из всех файлов, используя merge
    for line in heapq.merge(*[iter(f) for f in file_iters]):
     outfile.write(line)
     total_lines_written += 1

   logger.info(f"Слияние завершено. Всего строк записано: {total_lines_written}")

  except Exception as e:
   logger.error(f"Ошибка при обработке файлов: {e}")
   return

  finally:
   # Закрываем все открытые файлы
   for f in file_iters:
    logger.info(f"Закрытие файла: {f.name}")
    f.close()

 except Exception as e:
  logger.error(f"Ошибка в процессе слияния: {e}")
  return

 finally:
  # Рассчитываем время выполнения
  end_time = time.time()
  elapsed_time = end_time - start_time
  logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
 """
 Основная функция для выбора директории и вызова слияния.
 """
 try:
  logger.info("Запуск программы")
  # Запрос директории и имени выходного файла у пользователя
  directory = input("Введите путь к директории с файлами для слияния: ").strip()
  output_file = input("Введите имя выходного файла (с расширением): ").strip()

  if not os.path.isdir(directory):
   logger.error(f"Указанный путь {directory} не является директорией.")
   return

  merge_files_in_directory(directory, output_file)

 except KeyboardInterrupt:
  logger.warning("Операция прервана пользователем.")
 except Exception as e:
  logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
 main()

10. Разделить один файл на несколько частей (split_database.py). Ну тут уже идет реверс, отдаете один файл, указываете по какому количеству строк его разбить, получаете несколько сплит файлов.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - %(message)s",
 handlers=[
  logging.StreamHandler(),
  logging.FileHandler("split_database.log", encoding="utf-8")
 ]
)


def split_file(input_file: str, output_directory: str, lines_per_file: int):
 """
 Разделяет файл на несколько частей с минимальным использованием памяти.

 :param input_file: Путь к входному файлу
 :param output_directory: Директория для сохранения разделённых файлов
 :param lines_per_file: Количество строк в каждом выходном файле
 """
 start_time = time.time()
 logger.info(f"Начало разделения файла. Входной файл: {input_file}, Директория: {output_directory}, Строк на файл: {lines_per_file}")

 try:
  # Проверка входного файла
  if not os.path.isfile(input_file):
   logger.error(f"Файл {input_file} не найден или не является файлом.")
   return

  # Проверка выходной директории
  if not os.path.exists(output_directory):
   logger.info(f"Выходная директория {output_directory} не найдена. Создаём её.")
   os.makedirs(output_directory)

  # Итеративное чтение и запись
  def file_iterator(file_path):
   with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
    for line in file:
     yield line

  # Итератор для входного файла
  file_iter = file_iterator(input_file)
  file_index = 1
  current_file = None
  lines_written = 0

  for line in file_iter:
   if lines_written % lines_per_file == 0: # Новый файл
    if current_file:
     current_file.close()
     logger.info(f"Закрыт файл: {current_file.name} (строк: {lines_written})")
    output_file = os.path.join(output_directory, f"split_part_{file_index}.txt")
    current_file = open(output_file, 'w', encoding='utf-8', errors='replace')
    logger.info(f"Создан файл: {output_file}")
    file_index += 1
    lines_written = 0

   current_file.write(line)
   lines_written += 1

  if current_file:
   current_file.close()
   logger.info(f"Закрыт файл: {current_file.name} (строк: {lines_written})")

 except Exception as e:
  logger.error(f"Ошибка в процессе разделения: {e}")
  return

 finally:
  # Рассчитываем время выполнения
  end_time = time.time()
  elapsed_time = end_time - start_time
  logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
 """
 Основная функция для выбора входного файла, выходной директории и параметров разделения.
 """
 try:
  logger.info("Запуск программы")

  # Запрос данных у пользователя
  input_file = input("Введите путь к файлу для разделения: ").strip()
  output_directory = input("Введите путь для сохранения разделённых файлов: ").strip()
  lines_per_file = int(input("Введите количество строк в каждом выходном файле: ").strip())

  split_file(input_file, output_directory, lines_per_file)

 except KeyboardInterrupt:
  logger.warning("Операция прервана пользователем.")
 except Exception as e:
  logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
 main()

11. Рандомизация строк в файле (randomize.py). Тут выполняется перемешивание строк в файле который отдаете рандомным образом, на выходе получаете рандомизированные позиции строк в файле.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import random
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - %(message)s",
 handlers=[
  logging.StreamHandler(),
  logging.FileHandler("randomize.log", encoding="utf-8")
 ]
)


def randomize_file(input_file: str, output_file: str, buffer_size: int):
 """
 Перемешивает строки в файле случайным образом.

 :param input_file: Путь к входному файлу
 :param output_file: Путь к выходному файлу
 :param buffer_size: Количество строк, загружаемых в память за раз
 """
 start_time = time.time()
 logger.info(f"Начало рандомизации файла. Входной файл: {input_file}, Выходной файл: {output_file}, Размер буфера: {buffer_size}")

 try:
  # Проверка входного файла
  if not os.path.isfile(input_file):
   logger.error(f"Файл {input_file} не найден или не является файлом.")
   return

  # Проверка выхода
  output_dir = os.path.dirname(output_file)
  if output_dir and not os.path.exists(output_dir):
   logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
   os.makedirs(output_dir)

  # Чтение строк по частям
  all_lines = []
  with open(input_file, 'r', encoding='utf-8', errors='replace') as infile:
   logger.info(f"Чтение строк из {input_file} с использованием буфера")
   buffer = []
   for line in infile:
    buffer.append(line)
    if len(buffer) >= buffer_size:
     logger.debug(f"Загружен буфер из {len(buffer)} строк")
     all_lines.extend(buffer)
     buffer = []
   if buffer:
    all_lines.extend(buffer)

  logger.info(f"Файл загружен в память. Всего строк: {len(all_lines)}")

  # Перемешивание строк
  random.shuffle(all_lines)
  logger.info(f"Строки успешно перемешаны")

  # Запись результата
  with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
   logger.info(f"Запись перемешанных строк в {output_file}")
   outfile.writelines(all_lines)

  logger.info(f"Успешное завершение. Перемешанный файл сохранён в {output_file}")

 except Exception as e:
  logger.error(f"Ошибка при обработке: {e}")

 finally:
  # Рассчитываем время выполнения
  end_time = time.time()
  elapsed_time = end_time - start_time
  logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
 """
 Основная функция для выбора входного и выходного файлов, а также настроек рандомизации.
 """
 try:
  logger.info("Запуск программы")

  # Запрос данных у пользователя
  input_file = input("Введите путь к файлу для рандомизации: ").strip()
  output_file = input("Введите путь для сохранения перемешанного файла: ").strip()
  buffer_size = int(input("Введите размер буфера (число строк, загружаемых за раз): ").strip())

  randomize_file(input_file, output_file, buffer_size)

 except KeyboardInterrupt:
  logger.warning("Операция прервана пользователем.")
 except ValueError:
  logger.error("Ошибка: размер буфера должен быть числом.")
 except Exception as e:
  logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
 main()

12. Копирование строк из текстового файла, работает с использованием библиотеки pyperclip, так что не забудьте сделать pip install pyperclip==1.9.0 , работает в двух режимах: 1. Копирование строк из файла полностью; 2. Копирование строк по диапазону из файла. Предупреждение: следите за объемом ОЗУ при работе этого модуля.
Код:

Python: Скопировать в буфер обмена
Code:
import logging
import pyperclip # Для работы с буфером обмена

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - %(message)s",
 handlers=[
  logging.StreamHandler(),
  logging.FileHandler("copy_to_clipboard.log", encoding="utf-8")
 ]
)


def read_file_lines(file_path, start_line=None, end_line=None):
 """
 Генератор для чтения строк из файла с поддержкой диапазона.

 :param file_path: Путь к файлу
 :param start_line: Номер начальной строки (1-based, включительно)
 :param end_line: Номер конечной строки (1-based, включительно)
 """
 with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
  for current_line_num, line in enumerate(f, start=1):
   if start_line and current_line_num < start_line:
    continue
   if end_line and current_line_num > end_line:
    break
   yield line.strip()


def copy_to_clipboard(file_path, start_line=None, end_line=None):
 """
 Копирует содержимое файла или его части в буфер обмена.

 :param file_path: Путь к файлу
 :param start_line: Номер начальной строки (1-based, включительно)
 :param end_line: Номер конечной строки (1-based, включительно)
 """
 try:
  logger.info(f"Начало копирования. Файл: {file_path}, Диапазон: {start_line}-{end_line}")
  lines = list(read_file_lines(file_path, start_line, end_line))

  if not lines:
   logger.warning("Выбранный диапазон пуст.")
   return

  # Отображаем первую и последнюю строки диапазона
  print(f"Первая строка: {lines[0]}")
  print(f"Последняя строка: {lines[-1]}")
  confirm = input("Скопировать в буфер обмена? (y/n): ").strip().lower()

  if confirm != 'y':
   print("Операция отменена пользователем.")
   return

  # Копируем в буфер обмена
  pyperclip.copy('\n'.join(lines))
  print("Данные скопированы в буфер обмена.")
  logger.info("Копирование завершено успешно.")

 except FileNotFoundError:
  logger.error(f"Файл {file_path} не найден.")
 except Exception as e:
  logger.error(f"Ошибка в процессе копирования: {e}")


def main():
 try:
  logger.info("Запуск программы")
  file_path = input("Введите путь к файлу: ").strip()
  mode = input("Выберите режим (1 — весь файл, 2 — диапазон строк): ").strip()

  if mode == '1':
   copy_to_clipboard(file_path)
  elif mode == '2':
   start_line = int(input("Введите номер начальной строки: ").strip())
   end_line = int(input("Введите номер конечной строки: ").strip())
   copy_to_clipboard(file_path, start_line, end_line)
  else:
   print("Неверный выбор режима.")
   logger.warning("Выбран неверный режим.")

 except KeyboardInterrupt:
  logger.warning("Операция прервана пользователем.")
 except Exception as e:
  logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
 main()

13. Чистка текстового файла от мусора. Работает в 3х режимах по выбору (1. Разрешить все спецсимволы, 2. Разрешить только специальные символы (@, ., _, -), 3. Не разрешать спецсимволы)
Python: Скопировать в буфер обмена
Code:
import os
import logging
import time
import string

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - %(message)s",
 handlers=[
  logging.StreamHandler(),
  logging.FileHandler("remove_non_latin.log", encoding="utf-8")
 ]
)

def is_latin_line(line: str, special_mode: str) -> bool:
 """
 Проверяет, состоит ли строка из латинских символов и спецсимволов в зависимости от режима.

 :param line: Входная строка
 :param special_mode: Режим обработки спецсимволов:
       - 'all' - разрешить все спецсимволы.
       - 'custom' - разрешить только определенные спецсимволы.
       - 'none' - не разрешать спецсимволы.
 :return: True, если строка соответствует требованиям.
 """
 # Основной набор символов: латинские буквы, цифры и пробелы
 allowed_chars = string.ascii_letters + string.digits + ' '

 if special_mode == 'all':
  # Разрешаем все печатаемые символы
  allowed_chars += string.punctuation
 elif special_mode == 'custom':
  # Разрешаем только указанные символы
  allowed_chars += "@._-"
 # special_mode == 'none': никаких дополнительных символов

 return all(char in allowed_chars for char in line)

def line_generator(file_path: str):
 """
 Генератор для построчного чтения файла.

 :param file_path: Путь к файлу
 """
 with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
  for line in file:
   yield line.strip()

def clean_file(input_file: str, output_file: str, special_mode: str):
 """
 Очищает файл, удаляя строки с нелатинскими символами или нежелательными символами.

 :param input_file: Путь к входному файлу
 :param output_file: Имя выходного файла
 :param special_mode: Режим обработки спецсимволов ('all', 'custom', 'none')
 """
 start_time = time.time()
 logger.info(f"Начало очистки файла. Входной файл: {input_file}, Выходной файл: {output_file}, "
    f"Режим спецсимволов: {special_mode}")

 try:
  # Проверяем наличие директории для выходного файла
  output_dir = os.path.dirname(output_file)
  if output_dir and not os.path.exists(output_dir):
   logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
   os.makedirs(output_dir)

  total_lines = 0 # Общее количество строк
  kept_lines = 0 # Количество оставленных строк
  removed_lines = 0 # Количество удаленных строк

  # Открываем файл для записи
  with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
   for line in line_generator(input_file):
    total_lines += 1
    if is_latin_line(line, special_mode):
     outfile.write(line + '\n')
     kept_lines += 1
    else:
     removed_lines += 1

  logger.info(f"Очистка завершена. Обработано строк: {total_lines}, Сохранено строк: {kept_lines}, "
     f"Удалено строк: {removed_lines}")

 except FileNotFoundError:
  logger.error(f"Файл {input_file} не найден.")
  return
 except Exception as e:
  logger.error(f"Ошибка в процессе очистки: {e}")
  return
 finally:
  # Рассчитываем время выполнения
  end_time = time.time()
  elapsed_time = end_time - start_time
  logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")

def main():
 try:
  logger.info("Запуск программы")
  # Запрос параметров у юзера
  input_file = input("Введите путь к входному файлу (с расширением): ").strip()
  output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()

  print("Выберите режим обработки спецсимволов:")
  print("1 - Разрешить все спецсимволы")
  print("2 - Разрешить только специальные символы (@, ., _, -)")
  print("3 - Не разрешать спецсимволы")
  special_mode_choice = input("Введите номер режима (1, 2 или 3): ").strip()

  special_mode = ''
  if special_mode_choice == '1':
   special_mode = 'all'
  elif special_mode_choice == '2':
   special_mode = 'custom'
  elif special_mode_choice == '3':
   special_mode = 'none'
  else:
   logger.error("Неверный выбор режима. Завершение программы.")
   return

  clean_file(input_file, output_file, special_mode)

 except KeyboardInterrupt:
  logger.warning("Операция прервана пользователем.")
 except Exception as e:
  logger.error(f"Ошибка в основной функции: {e}")

if __name__ == "__main__":
 main()

14. Изменение кодировки файла. Дисклеймер: Модуль может работать кривовато, на моих тестах вроде нормально преобразует, на ваших боевых файлах как работать будет не знаю. Юзать на свой страх и риск. По найденным ошибкам просьба отписаться в ПМ или в ветке. Поддерживает преобразование в следующие кодировки (исходная кодировка в теории у файла может быть любой):
  1. UTF-8
  2. UTF-16
  3. UTF-16LE
  4. UTF-16BE
  5. ASCII
  6. Windows-1251
  7. ISO-8859-1
  8. KOI8-R
  9. GB18030
  10. Big5
Код:

Python: Скопировать в буфер обмена
Code:
import os
import logging
import chardet
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s - %(levelname)s - %(message)s",
 handlers=[
  logging.StreamHandler(),
  logging.FileHandler("format_database.log", encoding="utf-8")
 ]
)

POPULAR_ENCODINGS = [
 "UTF-8", "UTF-16", "UTF-16LE", "UTF-16BE", "ASCII",
 "Windows-1251", "ISO-8859-1", "KOI8-R", "GB18030", "Big5"
]


def detect_file_encoding(file_path, sample_size=262144):
 """
 Определяет кодировку текстового файла.

 :param file_path: Путь к файлу
 :param sample_size: Количество байт для анализа
 :return: Предполагаемая кодировка файла
 """
 try:
  with open(file_path, 'rb') as file:
   raw_data = file.read(sample_size)
  result = chardet.detect(raw_data)
  detected_encoding = result.get('encoding', None)
  confidence = result.get('confidence', 0)

  logger.warning(f"Результат анализа кодировки: {result}")

  if detected_encoding is None or detected_encoding.lower() == 'ascii':
   logger.warning(f"Кодировка файла '{file_path}' не определена или определена как ASCII.")
   print("Укажите предполагаемую кодировку (например, UTF-8, Windows-1251): ")
   detected_encoding = input("Введите кодировку: ").strip()

  elif confidence < 0.8:
   logger.warning(f"Кодировка '{detected_encoding}' определена с низкой уверенностью ({confidence:.2f}).")
   print("Обнаружена кодировка с низкой уверенностью. Хотите указать свою? (y/n): ")
   if input().strip().lower() == 'y':
    detected_encoding = input("Введите кодировку: ").strip()

  logger.info(f"Обнаружена кодировка файла '{file_path}': {detected_encoding}")
  return detected_encoding

 except Exception as e:
  logger.error(f"Ошибка определения кодировки файла {file_path}: {e}")
  return 'utf-8'


def add_bom_if_needed(file_path, encoding):
 """
 Добавляет BOM (маркер порядка байт) в файл, если кодировка поддерживает его.

 :param file_path: Путь к файлу
 :param encoding: Кодировка файла
 """
 bom_map = {
  "utf-8": b'\xef\xbb\xbf',
  "utf-16": b'\xff\xfe', # UTF-16 LE по умолчанию
  "utf-16le": b'\xff\xfe',
  "utf-16be": b'\xfe\xff'
 }

 if encoding.lower() in bom_map:
  logger.info(f"Добавление BOM для кодировки {encoding}")
  bom = bom_map[encoding.lower()]
  with open(file_path, 'rb') as f:
   content = f.read()

  with open(file_path, 'wb') as f:
   f.write(bom + content)
  logger.info(f"BOM успешно добавлен для файла: {file_path}")


def convert_file_encoding(input_file, output_file, target_encoding):
 """
 Преобразует кодировку текстового файла построчно и добавляет BOM, если это необходимо.

 :param input_file: Путь к входному файлу
 :param output_file: Путь к выходному файлу
 :param target_encoding: Целевая кодировка
 """
 start_time = time.time()
 logger.info(f"Начало преобразования кодировки. Входной файл: {input_file}, Целевая кодировка: {target_encoding}")

 try:
  source_encoding = detect_file_encoding(input_file)
  output_dir = os.path.dirname(output_file)
  if output_dir and not os.path.exists(output_dir):
   os.makedirs(output_dir)

  with open(input_file, 'r', encoding=source_encoding, errors='replace') as infile, \
    open(output_file, 'w', encoding=target_encoding, errors='replace') as outfile:
   for line in infile:
    outfile.write(line.rstrip('\r\n') + '\n')

  # Добавление BOM, если требуется
  add_bom_if_needed(output_file, target_encoding)

  logger.info(f"Преобразование завершено. Файл сохранён: {output_file}")

  # Проверка записи
  verify_result = verify_encoding(output_file)
  logger.info(f"Кодировка выходного файла определена как: {verify_result.get('encoding')}")

 except FileNotFoundError:
  logger.error(f"Файл {input_file} не найден.")
 except Exception as e:
  logger.error(f"Ошибка в процессе преобразования: {e}")
 finally:
  elapsed_time = time.time() - start_time
  logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def verify_encoding(file_path):
 """
 Проверяет кодировку файла.

 :param file_path: Путь к файлу
 :return: Результаты анализа кодировки
 """
 with open(file_path, 'rb') as f:
  raw_data = f.read()
 result = chardet.detect(raw_data)
 return result


def main():
 """
 Основная функция для выполнения программы.
 """
 try:
  logger.info("Запуск программы")

  input_file = input("Введите путь к входному файлу (с расширением): ").strip()
  output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()

  print("Доступные кодировки:")
  for i, encoding in enumerate(POPULAR_ENCODINGS, 1):
   print(f"{i}. {encoding}")

  while True:
   try:
    choice = int(input("Выберите номер целевой кодировки: "))
    if 1 <= choice <= len(POPULAR_ENCODINGS):
     target_encoding = POPULAR_ENCODINGS[choice - 1]
     break
    else:
     print("Некорректный выбор. Попробуйте снова.")
   except ValueError:
    print("Введите корректный номер из списка.")

  convert_file_encoding(input_file, output_file, target_encoding)

 except KeyboardInterrupt:
  logger.warning("Операция прервана пользователем.")
 except Exception as e:
  logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
 main()

15. Нормализатор строк в файлах. Работает в 4-х режимах:
  1. Форматирование строк (format) - выполняет форматирование строк в экспортируемый файл, указываете разделитель строки входящего файла и их формат к примеру "ip:mail:pass", указываете нужный формат "mail:pass:ip" на выходе волучаете отформатированные строки.
  2. Фильтрация строк по формату (filter) - указываете разделитель, и формат тех строк которые нужно вытащить из файла, например только mail:pass, на выходе получаете отфильтрованные строки и зфайла.
  3. Очистка строк от спецсимволов (clean) - указываете спецсимволы которые вы хотите почистить из строк, на выходе получаете файл со строками без спецсимволов указанных алгоритму.
  4. Обрезка строк по длине (trim) - указываете диапазон по какой длине отсеивать строки в экспортируемый файл, получаете файл со строками по указанному количеству символов диапазона.
Код: Не влез в размер этого сообщения.

P.S. Модули можно использовать как по отдельности утилитами, так и через основной файл из консольного меню.
P.S.S. Первую версию на публику прикрепил в атаче. По мере свободного времени буду обновлять и исправлять найденные ошибки.



Вложения​


  • 36 КБ · Просмотры: 2
 
Rand ты красавчик!!! Спасибо что радуешь нас своими софтами для работы со строками, а то какой-то софт есть, но току от него не много. А твой как всегда красота.
 
Undergrowth сказал(а):
Посмотреть вложение 99119
цены не будет, если сделаешь такое контекстное меню
Нажмите, чтобы раскрыть...
Да, сделаю. Тогда придется держать задеплоенный интерпретатор в целевой операционной системе, а это не совсем безопасно. Я бы хотел сделать GUI в будущем, но мне нужна мультиплатформенность, например не все вызывают графический интерфейс в Linux, а работают только из консоли. Сейчас же задача все оттестировать на 100+ раз, поправить возможные ошибки, внести модификации по желанию комьюнити, добить по плану все модули. А потом уже можно выкатить отдельно GUI версию на том же PyQT, когда все будет работать правильно. Это пока черновой вариант.
 
rand сказал(а):
Да, сделаю. Тогда придется держать задеплоенный интерпретатор в целевой операционной системе, а это не совсем безопасно. Я бы хотел сделать GUI в будущем, но мне нужна мультиплатформенность, например не все вызывают графический интерфейс в Linux, а работают только из консоли. Сейчас же задача все оттестировать на 100+ раз, поправить возможные ошибки, добить по плану все модули. А потом уже можно выкатить отдельно GUI версию на том же PyQT, когда все будет работать правильно. Это пока черновой вариант.
Нажмите, чтобы раскрыть...
смотри в сторону flet
 
спасибо за софт в копилку, буду тестить) 👍
для меня он будет очень полезен, так как я работаю с базами
 
rand сказал(а):
Да, сделаю. Тогда придется держать задеплоенный интерпретатор в целевой операционной системе, а это не совсем безопасно. Я бы хотел сделать GUI в будущем, но мне нужна мультиплатформенность, например не все вызывают графический интерфейс в Linux, а работают только из консоли. Сейчас же задача все оттестировать на 100+ раз, поправить возможные ошибки, внести модификации по желанию комьюнити, добить по плану все модули. А потом уже можно выкатить отдельно GUI версию на том же PyQT, когда все будет работать правильно. Это пока черновой вариант.
Нажмите, чтобы раскрыть...
Set прикрути от дублей на выходе там где необходимо, а то забавная ситуация получается, после прогона парсом мыл, прогоняешь на дубли через тот же text util pack и ох.еваешь удивляешься от их кол-ва.
 
Stagex сказал(а):
Set прикрути от дублей на выходе там где необходимо, а то забавная ситуация получается, после прогона парсом мыл, прогоняешь на дубли через тот же text util pack и ох.еваешь удивляешься от их кол-ва.
Нажмите, чтобы раскрыть...
Не нужен set. Set грузит весь массив в память для успешного выполнения, а если допустим файл будет 30 гигабайт, а ОЗУ в системе 16? Я стараюсь там где можно в функциях оптимизировать для работы с большими файлами. Какие-то функции не получится без полной загрузки в память реализовать на генераторах и итераторах или с применением кучи (а также комплексно), в операциях где требуется полный объем памяти на обрабатываемые входящие файлы, выкатывается предупреждение. Так что если нужно почистить от дублей применяйте для этого специальную функцию очистки дублей для работы с большими файлами.

Предупреждение вот такого плана:

P.S. Одна из целей повторить весь функционал в операциях (мораль цели: не делать очередной говнософт который задыхается на файлах от 10 гигабайт и больше, пропускает строки и уходит в аут. Так если это не попытаться реализовать, тогда смысл от всего этого?) как это было сделано в алгоритме очистки дублей, тогда я очень сильно постарался чтобы это реализовать на петухоне и на основе этого алгоритма в основном и были сделаны дополнительные модули.
P.S.S. Предсказываю снова критику в мою сторону почему я не сделал динамическое определение кодировки перед началом обработки входящего файла? Ну пока в моих тестах все правильно отрабатывает на кодировках ANSI и UTF8. Если по остальным будут ошибки, просьба отписаться. Постараюсь что-нибудь придумать. И желательно прикреплять свой текстовик как пруф и описание операции (модуля) которая вызвала ошибку со скриншотом.
P.S.S.S. Конечно можно добавить в модули дополнительный режим использовать ли петухоновский set() при обработке файла, но это не основная задача пока-что. Так что учту на заметку.
 
Undergrowth сказал(а):
Скрытое содержимое
Нажмите, чтобы раскрыть...
Если без шлака, покажи мне плиз тоже, потому-что я даже не тестил функционал того софта. =) И не понимаю как работает тот же нормализатор и что он должен делать. =)
 
rand сказал(а):
Если без шлака, покажи мне плиз тоже, потому-что я даже не тестил функционал того софта. =) И не понимаю как работает тот же нормализатор и что он должен делать. =)
Нажмите, чтобы раскрыть...
gofile.io

TextUtils.exe.zip

47.5 KiB
gofile.io
 
rand сказал(а):
Да не, это обычный паблик. Я конечно запущу его на отдельно созданной виртуалке, непонятно что там понапихано=)

Посмотреть вложение 99161
Нажмите, чтобы раскрыть...
этому софту лет 10 мб больше, так к слову, но конечно лучше на виртуалке.
Береженного Бог бережет
 
rand сказал(а):
Да не, это обычный паблик. Я конечно запущу его на отдельно созданной виртуалке, непонятно что там понапихано=)

Посмотреть вложение 99161
Нажмите, чтобы раскрыть...
Я пользуюсь версией от vanko, код вроде чистый 👇

Вложения​


  • Cleaner x64(Origin&Deobf_versions).zip
    95 КБ · Просмотры: 11
 
Stagex сказал(а):
Я пользуюсь версией от vanko, код вроде чистый 👇
Нажмите, чтобы раскрыть...
можно и text utils раскодить(там .Net под DeepSea вроде), но лень, да и не думаю о наличии вредокода в нём. А уникальные машины на ВТ и Нello World давно уже считают зловред строкой ;)
 
Stagex сказал(а):
можно и text utils раскодить(там .Net под DeepSea вроде), но лень, да и не думаю о наличии вредокода в нём. А уникальные машины на ВТ и Нello World давно уже считают зловред строкой ;)
Нажмите, чтобы раскрыть...
популярный был даже очень, у интересной ца.
там где высокочастотный запрос там везде малварь.
 
m00r сказал(а):
популярный был даже очень, у интересной ца.
там где высокочастотный запрос там везде малварь.
Нажмите, чтобы раскрыть...
Ну у меня та же версия, что и по ссылке выше, тот же детект всмысле, но них.я там нет по факту.
 
Top