Написал: rand
Эксклюзивно для: Для работы требуется установить:
Bash: Скопировать в буфер обмена
Для работы модуля копирования строк из файла:
Bash: Скопировать в буфер обмена
Для работы модуля изменения кодировки файла:
Bash: Скопировать в буфер обмена
Всем привет, после того как был написан алгоритм по удалению дублей в больших файлах, меня некоторые просили сделать мультитул для работы с текстовиками. В одной ветке мне показали мультитул от разработчика под ником Lays, но судя по Virus Total он протроянен. Поэтому была поставлена задача перед собой повторить его функционал на петухоне. Над продуктом я работаю уже плотно около недели, и получилось уже большую часть функций реализовать. Остальные попробую сделать в течении месяца и буду обновлять этот продукт через редактирование поста (исправление найденных багов, добавление функционала). Продукт тестировался на сгенерированных строках, и может некорректно работать с вашими файлами, по найденым багам и пожеланиям просьба отписаться в ветке.
Структура ПО:
Это основной файл который отрисовывает консольный интерфейс и вызывает основные функции из модулей:
Как выглядит интерфейс:
Код:
Python: Скопировать в буфер обмена
Реализованные модули:
1. Удаление дубликатов (remove_duplicates.py), это просто копипаст мультипроцессорного клинера и сортировщика из моей предыдущей ветки.
Код:
Python: Скопировать в буфер обмена
2. Сравнение текстовых баз (compare_databases.py). Работает следующим образом, указываете директорию где находятся текстовые файлы с базами, происходит их сравнение на дубли, дубли очищаются на выходе, получаете базы без дублей+файл с удаленными дублями (типа антипаблика).
Код:
Python: Скопировать в буфер обмена
3. Найти и извлечь Email из строк (extract_email.py), находит по регулярке в текстовом файле Email адреса и извлекает их.
Код:
Python: Скопировать в буфер обмена
4. Извлечь логины (extract_logins.py). Работает по принципу, указываете разделитель в вашем логе и столбец с логином (начинается с 0), извлекаете логины.
Код:
Python: Скопировать в буфер обмена
5. Извлечь пароли (extract_passwords.py). Все тоже самое что и для логинов, просто дубль функционала.
Код:
Python: Скопировать в буфер обмена
6. Сортировать по доменам (Email/URL) - (sort_by_domains.py), работает в трех режимах (one_file/by_domains/filter_domains). Режим когда просто выполняется сортировка строк по доменам в 1 выходной файл, и когда файлы разбиваются по доменам, а также чтение лога и фильтрация в файл или по файлам по конкретно указанным доменам. Email или URL вычисляется динамически по регулярке.
Код:
Python: Скопировать в буфер обмена
7. Удаление доменов из Email адресов. (remove_domains.py) Прописываете путь к файлу c мыльниками, на выходе получаете файл без @domain.com
Код:
Python: Скопировать в буфер обмена
8. Склеить строки с паролями(merge_with_passwords.py) - (допустим надо склеить строки для брута). Работает в двух режимах. ('manual' или 'from_file'). В первом режиме вы задаете 1 пароль на строку, во втором склеиваете 2 списка это логины и пароли (допустим).
Код:
Python: Скопировать в буфер обмена
9. Объединение баз. (merge_databases.py) Выполнение слияния нескольких текстовиков в один файл. Указываете директорию с текстовиками, на выходе получаете один файл.
Код:
Python: Скопировать в буфер обмена
10. Разделить один файл на несколько частей (split_database.py). Ну тут уже идет реверс, отдаете один файл, указываете по какому количеству строк его разбить, получаете несколько сплит файлов.
Код:
Python: Скопировать в буфер обмена
11. Рандомизация строк в файле (randomize.py). Тут выполняется перемешивание строк в файле который отдаете рандомным образом, на выходе получаете рандомизированные позиции строк в файле.
Код:
Python: Скопировать в буфер обмена
12. Копирование строк из текстового файла, работает с использованием библиотеки pyperclip, так что не забудьте сделать
Код:
Python: Скопировать в буфер обмена
13. Чистка текстового файла от мусора. Работает в 3х режимах по выбору (1. Разрешить все спецсимволы, 2. Разрешить только специальные символы (@, ., _, -), 3. Не разрешать спецсимволы)
Python: Скопировать в буфер обмена
14. Изменение кодировки файла. Дисклеймер: Модуль может работать кривовато, на моих тестах вроде нормально преобразует, на ваших боевых файлах как работать будет не знаю. Юзать на свой страх и риск. По найденным ошибкам просьба отписаться в ПМ или в ветке. Поддерживает преобразование в следующие кодировки (исходная кодировка в теории у файла может быть любой):
Python: Скопировать в буфер обмена
15. Нормализатор строк в файлах. Работает в 4-х режимах:
P.S. Модули можно использовать как по отдельности утилитами, так и через основной файл из консольного меню.
P.S.S. Первую версию на публику прикрепил в атаче. По мере свободного времени буду обновлять и исправлять найденные ошибки.
Эксклюзивно для: Для работы требуется установить:
Bash: Скопировать в буфер обмена
pip install colorlog==6.9.0
Для работы модуля копирования строк из файла:
Bash: Скопировать в буфер обмена
pip install pyperclip==1.9.0
Для работы модуля изменения кодировки файла:
Bash: Скопировать в буфер обмена
pip install chardet==5.2.0
Всем привет, после того как был написан алгоритм по удалению дублей в больших файлах, меня некоторые просили сделать мультитул для работы с текстовиками. В одной ветке мне показали мультитул от разработчика под ником Lays, но судя по Virus Total он протроянен. Поэтому была поставлена задача перед собой повторить его функционал на петухоне. Над продуктом я работаю уже плотно около недели, и получилось уже большую часть функций реализовать. Остальные попробую сделать в течении месяца и буду обновлять этот продукт через редактирование поста (исправление найденных багов, добавление функционала). Продукт тестировался на сгенерированных строках, и может некорректно работать с вашими файлами, по найденым багам и пожеланиям просьба отписаться в ветке.
Структура ПО:
multitool_txt.py
Это основной файл который отрисовывает консольный интерфейс и вызывает основные функции из модулей:
Как выглядит интерфейс:
Код:
Python: Скопировать в буфер обмена
Code:
import os
import time
from colorama import init, Fore, Style
# Инициализация colorama
init(autoreset=True)
# Импорт всех функций
from utils.remove_duplicates import remove_duplicates
from utils.compare_databases import compare_files
from utils.normalize_base import process_lines as normalize
from utils.extract_email import extract_emails
from utils.extract_logins import extract_logins
from utils.extract_passwords import extract_passwords
from utils.sort_by_domains import sort_domains
from utils.remove_domains import remove_domains
from utils.merge_with_passwords import append_passwords
from utils.merge_databases import merge_files_in_directory
from utils.split_database import split_file
from utils.randomize import randomize_file
from utils.copy_to_clipboard import copy_to_clipboard
from utils.remove_non_latin import clean_file
from utils.format_database import convert_file_encoding, POPULAR_ENCODINGS
def main_multitool():
def display_menu():
print(Fore.CYAN + Style.BRIGHT + "\nВыберите операцию:")
print(Fore.GREEN + "1. Удалить дубликаты")
print(Fore.YELLOW + "2. Сравнить базы данных")
print(Fore.MAGENTA + "3. Найти и извлечь Email из строк")
print(Fore.BLUE + "4. Извлечь логины")
print(Fore.RED + "5. Извлечь пароли")
print(Fore.CYAN + "6. Сортировать по доменам (Email/URL)")
print(Fore.GREEN + "7. Удалить домены в строках Email")
print(Fore.YELLOW + "8. Склеить строки с паролями")
print(Fore.MAGENTA + "9. Объединить базы")
print(Fore.BLUE + "10. Разбить базу на части")
print(Fore.RED + "11. Рандомизировать базу")
print(Fore.CYAN + "12. Скопировать строки в буфер обмена")
print(Fore.GREEN + "13. Удалить строки с не латинскими символами")
print(Fore.YELLOW + "14. Изменить кодировку файла")
print(Fore.MAGENTA + "15. Нормализовать базу")
print(Fore.RED + Style.BRIGHT + "0. Выйти\n")
def get_file():
"""Позволяет выбрать файл через диалоговое окно или ввод пути вручную."""
print("\nВыберите способ ввода файла:")
print(Fore.CYAN + "1. Ввести путь к файлу вручную")
print(Fore.MAGENTA + "2. Выбрать файл через окно (пока в разработке)")
while True:
try:
choice = int(input("Ваш выбор: "))
if choice == 1:
file_path = input("Введите полный путь к файлу: ").strip()
if os.path.isfile(file_path):
return file_path
else:
print(Fore.RED + "Файл не найден. Попробуйте снова.")
elif choice == 2:
print(Fore.RED + Style.BRIGHT + "Функция в разработке, ожидайте в новой версии!\n")
continue
else:
print(Fore.YELLOW + "Введите 1 или 2.")
except ValueError:
print(Fore.RED + "Введите число.")
def get_input(prompt):
try:
return int(input(prompt))
except ValueError:
print(Fore.RED + "Введите число.")
return None
# ASCII-заставка
print(Fore.LIGHTGREEN_EX + """
# # ## # # # ## ##### # # #####
## ## # # # # # # # #
# ## # # # # ##### ### ##### #### #### # # # # #
# # # # # # # # # # # # # ###### # # #
# # # # # # # # # # # # # # # # #
# # # ## # # # # # # # # # # # # #
# # ### # ### ### ##### ### #### #### ### # # # #
# # ### # # #### #### #
# # # # # # # # #
##### # # # ### ##### # ### ##### #### #### # ### # # # # ### #####
# # # # ## # # ## # # # # # # ## # #### #### # #
# # # # # # # # # # # # # # # # # # # # ####
# # ##### # # ## # # # # # # # # # # # # # # ## # #
##### # # ### # # # ##### # #### # # # #### #### ## ##### #####
####
""")
while True:
display_menu()
choice = get_input(Fore.CYAN + "Ваш выбор: ")
if choice is None:
continue
if choice == 0:
print(Fore.RED + "Выход из программы.")
break
# Выбор файла
if not choice == 2 and not choice == 9:
file_name = get_file()
print(Fore.GREEN + f"Выбран файл: {file_name}")
else:
pass
# Выполнение операций
if choice == 1: # Удаление дублей
remove_duplicates(file_name)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 2: # Сравнение файлов
print(Fore.RED + "Предупреждение!!! Файлы при сравнении полностью загрузятся в ОЗУ, при большом размере файла возможно переполнение памяти и аварийное завершение работы!")
input_dir = input("Введите путь к директории с файлами для сравнения: ").strip()
output_dir = input("Введите путь к директории для сохранения результатов: ").strip()
compare_files(input_dir, output_dir)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 3: # Извлекаем Email по регулярке
output_file = input("Введите путь и имя выходного файла с Email (с расширением): ").strip()
extract_emails(file_name, output_file)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 4: # Извлекаем логины
output_file = input("Введите путь и имя выходного файла с логинами (с расширением): ").strip()
delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
position = int(input("Введите позицию логина по разделителю (0 — первая колонка): ").strip())
extract_logins(file_name, output_file, delimiter, position)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 5: # Извлекаем пароли из строк
output_file = input("Введите путь и имя выходного файла с паролями (с расширением): ").strip()
delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
position = int(input("Введите позицию пароля по разделителю (0 — первая колонка): ").strip())
extract_passwords(file_name, output_file, delimiter, position)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 6: # Сортировка по доменам
output_dir = input("Введите путь к директории для сохранения: ").strip()
# Меню выбора режима
while True:
print(Fore.YELLOW + "\nВыберите режим обработки:")
print(Fore.LIGHTMAGENTA_EX + "1. Сортировка всех строк в один файл (one_file)")
print(Fore.LIGHTCYAN_EX + "2. Сортировка по доменам в отдельные файлы (by_domains)")
print(Fore.LIGHTRED_EX + "3. Экспорт строк по указанным доменам (filter_domains)")
print(Fore.LIGHTBLUE_EX + "0. Перезапуск меню.")
choice = input("Введите номер режима: ").strip()
if choice == "1":
output_mode = "one_file"
filter_domains = None
break
elif choice == "2":
output_mode = "by_domains"
filter_domains = None
break
elif choice == "3":
output_mode = "filter_domains"
domains_input = input("Введите домены для фильтрации (через запятую): ").strip()
filter_domains = [domain.strip() for domain in domains_input.split(',')]
break
elif choice == "0":
print("Выход из программы.")
time.sleep(3)
main_multitool()
else:
print("Неверный выбор. Попробуйте снова.")
sort_domains(file_name, output_dir, output_mode, filter_domains)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 7: # Удаление доменов из Email адресов
output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
remove_domains(file_name, output_file)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 8: # Склеить строки с паролями
print(Fore.RED + "Предупреждение!!! Данная функция очень требовательна к системным ресурсам.")
output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
mode = input("Выберите режим работы ('manual' или 'from_file'): ").strip().lower()
delimiter = input("Введите разделитель для строки и пароля (например, ':', '|', '/', ';'): ").strip()
if not delimiter:
print(Fore.RED + "ОШИБКА: Разделитель не указан.")
return
if mode == 'manual':
password = input("Введите пароль для добавления: ").strip()
append_passwords(file_name, output_file, mode, password=password, delimiter=delimiter)
elif mode == 'from_file':
passwords_file = input("Введите путь к файлу с паролями: ").strip()
append_passwords(file_name, output_file, mode, passwords_file=passwords_file, delimiter=delimiter)
else:
print(Fore.RED + "ОШИБКА: Неверный режим. Используйте 'manual' или 'from_file'.")
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 9: # Объединить базы
directory = input("Введите путь к директории с файлами для слияния: ").strip()
output_file = input("Введите имя выходного файла (с расширением): ").strip()
merge_files_in_directory(directory, output_file)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 10: # Разделить один файл на несколько частей
output_directory = input("Введите путь для сохранения разделённых файлов: ").strip()
lines_per_file = int(input("Введите количество строк в каждом выходном файле: ").strip())
split_file(file_name, output_directory, lines_per_file)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 11: # Рандомизация строк в файле
print(Fore.RED + "Предупреждение!!! Файл полностью загрузится в ОЗУ, при большом размере файла возможно переполнение памяти и аварийное завершение работы!")
output_file = input("Введите путь для сохранения перемешанного файла (с расширением): ").strip()
buffer_size = int(input("Введите размер буфера (число строк, загружаемых за раз): ").strip())
randomize_file(file_name, output_file, buffer_size)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 12: # Копирование строк в буфер.
try:
print(Fore.LIGHTGREEN_EX + "Запуск процесса")
mode = input("Выберите режим (1 — весь файл, 2 — диапазон строк): ").strip()
if mode == '1':
copy_to_clipboard(file_name)
elif mode == '2':
start_line = int(input("Введите номер начальной строки: ").strip())
end_line = int(input("Введите номер конечной строки: ").strip())
copy_to_clipboard(file_name, start_line, end_line)
else:
print("Неверный выбор режима.")
print(Fore.LIGHTRED_EX + "Выбран неверный режим. Перезапуск меню...")
time.sleep(3)
main_multitool()
except Exception as e:
print(Fore.RED + f"Ошибка: {e}")
finally:
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 13: # Удалить строки с не латинскими буквами, почистить лог от мусора
output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
print("Выберите режим обработки спецсимволов:")
print(Fore.GREEN + "1 - Разрешить все спецсимволы")
print(Fore.YELLOW + "2 - Разрешить только специальные символы (@, ., _, -)")
print(Fore.RED + "3 - Не разрешать спецсимволы")
special_mode_choice = input("Введите номер режима (1, 2 или 3): ").strip()
special_mode = ''
if special_mode_choice == '1':
special_mode = 'all'
elif special_mode_choice == '2':
special_mode = 'custom'
elif special_mode_choice == '3':
special_mode = 'none'
else:
print(Fore.RED + "Неверный выбор режима. Завершение операции.")
return
clean_file(file_name, output_file, special_mode)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 14: # Изменение кодировки и сортировка строк файла
try:
print(Fore.LIGHTGREEN_EX + "Запуск операции")
# Ввод параметров пользователем
output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
print(Fore.LIGHTBLUE_EX + "Доступные кодировки:")
for i, encoding in enumerate(POPULAR_ENCODINGS, 1):
print(Fore.LIGHTGREEN_EX + f"{i}. {encoding}")
while True:
try:
choice = int(input("Выберите номер целевой кодировки: "))
if 1 <= choice <= len(POPULAR_ENCODINGS):
target_encoding = POPULAR_ENCODINGS[choice - 1]
break
else:
print("Некорректный выбор. Попробуйте снова.")
except ValueError:
print("Введите корректный номер из списка.")
convert_file_encoding(file_name, output_file, target_encoding)
except KeyboardInterrupt:
print(Fore.LIGHTRED_EX + "Операция прервана. Перезапуск меню...")
time.sleep(3)
main_multitool()
except Exception as e:
print(Fore.LIGHTRED_EX + f"Ошибка в основной функции: {e}, перезапуск меню")
time.sleep(3)
main_multitool()
finally:
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == 15: # Нормализатор баз
while True:
print(Fore.YELLOW + "\nВыберите режим обработки:")
print(Fore.LIGHTMAGENTA_EX + "1. Форматирование строк (format)")
print(Fore.LIGHTCYAN_EX + "2. Фильтрация строк по формату (filter)")
print(Fore.LIGHTRED_EX + "3. Очистка строк от спецсимволов (clean)")
print(Fore.LIGHTBLUE_EX + "4. Обрезка строк по длине (trim)")
print(Fore.LIGHTGREEN_EX + "0. Выход.")
choice = input("Введите номер режима: ").strip()
if choice == "1":
mode = "format"
output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
delimiter = input("Введите разделитель: ").strip()
input_format = input("Введите ожидаемый формат входящей строки (например, log:pass:link): ").strip()
format_order = input("Введите новый порядок формата (например, link:log:pass): ").strip()
normalize(file_name, output_file, mode=mode, delimiter=delimiter, input_format=input_format, format_order=format_order)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == "2":
mode = "filter"
output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
delimiter = input("Введите разделитель: ").strip()
valid_format = input("Введите допустимый формат (например, mail:pass): ").strip()
normalize(file_name, output_file, mode=mode, delimiter=delimiter, valid_format=valid_format)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == "3":
mode = "clean"
output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
exclude_chars = input("Введите символы для исключения (например, !@#$%^&*): ").strip()
normalize(file_name, output_file, mode=mode, delimiter=None, exclude_chars=exclude_chars)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == "4":
mode = "trim"
output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
min_len = int(input("Введите минимальную длину строки: ").strip())
max_len = int(input("Введите максимальную длину строки: ").strip())
normalize(file_name, output_file, mode=mode, min_len=min_len, max_len=max_len, delimiter=None)
print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
time.sleep(3)
main_multitool()
elif choice == "0":
print("Выход из программы.")
break
else:
print("Неверный выбор. Попробуйте снова.")
else:
print(Fore.RED + "Эта функция пока не реализована. Попробуйте другую опцию.")
if __name__ == "__main__":
main_multitool()
Реализованные модули:
1. Удаление дубликатов (remove_duplicates.py), это просто копипаст мультипроцессорного клинера и сортировщика из моей предыдущей ветки.
Код:
Python: Скопировать в буфер обмена
Code:
# Импорт необходимых библиотек
import os # Для работы с операционной системой и файловой системой
import heapq # Для эффективного слияния отсортированных последовательностей
import time # Для измерения времени выполнения скрипта
import logging # Для ведения логов
import shutil # Для удаления файлов и директории TEMP
import uuid # Для генерации уникальных идентификаторов
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed # Для параллельного выполнения задач
from colorlog import ColoredFormatter # Для создания цветных логов
# Настройка цветного логирования
formatter = ColoredFormatter(
"%(log_color)s%(asctime)s - %(levelname)s - %(message)s",
datefmt=None,
reset=True,
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red',
}
)
# Создание и настройка обработчика логов
handler = logging.StreamHandler() # Создаем обработчик для вывода логов в консоль
handler.setFormatter(formatter) # Устанавливаем форматтер для обработчика
logger = logging.getLogger() # Получаем объект логгера
logger.addHandler(handler) # Добавляем обработчик к логгеру
logger.setLevel(logging.INFO) # Устанавливаем уровень логирования
# Создание временной директории
temp_dir = os.path.join(os.getcwd(), 'TEMP') # Путь к временной директории в текущей рабочей директории
if not os.path.exists(temp_dir): # Если директория не существует
os.makedirs(temp_dir) # Создаем её
def create_temp_merged_file(temp_dir):
"""
Создает временный файл для слияния данных.
:param temp_dir: Путь к временной директории
"""
unique_filename = os.path.join(temp_dir, f"tempfile_{uuid.uuid4().hex}.tmp") # Генерируем уникальное имя файла
temp_merged_file = open(unique_filename, 'w', encoding='utf-8') # Открываем файл для записи
return temp_merged_file, unique_filename # Возвращаем объект файла и его имя
def heavy_computation(n):
"""
Выполняет тяжелое вычисление (для симуляции нагрузки, процентов на 5 по моим замерам увеличивает скорость обработки лога).
:param n: Число для вычислений
:return: Результат вычисления
"""
logger.debug(f"Запуск тяжелого вычисления с параметром: {n}")
result = 0
for i in range(n):
result += i * i # Выполняем сложение квадратов чисел
logger.debug(f"Результат тяжелого вычисления: {result}")
return result
def process_chunk_and_write_multiprocess(chunk, temp_dir):
"""
Обрабатывает чанк данных и записывает результат во временный файл.
:param chunk: Список строк для обработки
:param temp_dir: Путь к временной директории
:return: Имя созданного временного файла или None в случае ошибки
"""
try:
logger.info(f"Начало обработки чанка размером {len(chunk)} строк (процесс)")
heavy_computation(10000000) # Симуляция тяжелых вычислений
unique_items = set(chunk) # Убираем дубликаты на уровне чанка
sorted_chunk = sorted(unique_items) # Сортируем уникальные элементы
unique_filename = os.path.join(temp_dir, f"tempfile_{uuid.uuid4()}.tmp") # Генерируем уникальное имя файла
with open(unique_filename, 'w', encoding='utf-8') as temp_file:
temp_file.write("\n".join(sorted_chunk) + "\n") # Сохраняем только уникальные строки
return unique_filename
except Exception as e:
logger.error(f"Ошибка при обработке чанка: {e}")
return None
def merge_files_parallel(temp_files, output_file, num_threads=24):
"""
Выполняет параллельное слияние временных файлов.
:param temp_files: Список временных файлов для слияния
:param output_file: Имя выходного файла
:param num_threads: Количество потоков для использования
:return: (количество уникальных строк, количество дубликатов, имя выходного файла)
"""
try:
logger.info(f"Параллельное слияние {len(temp_files)} временных файлов с использованием {num_threads} потоков")
unique_count = 0
duplicate_count = 0
file_iters = []
try:
for temp_file in temp_files:
file_iters.append \
(open(temp_file, 'r', encoding='utf-8', errors='replace')) # Открываем все временные файлы
merged_iter = heapq.merge(*[iter(f) for f in file_iters]) # Создаем итератор для слияния
with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
prev_line = None
for line in merged_iter:
line = line.strip()
if line != prev_line: # Если текущая строка отличается от предыдущей
outfile.write(line + '\n') # Записываем её в выходной файл
prev_line = line
unique_count += 1
else:
duplicate_count += 1 # Увеличиваем счетчик дубликатов
except Exception as e:
logger.error(f"Ошибка при слиянии файлов: {e}")
return 0, 0
finally:
for f in file_iters:
f.close() # Закрываем все открытые файлы
logger.info(f"Слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")
return unique_count, duplicate_count, output_file
except Exception as e:
logger.error(f"Ошибка при параллельном слиянии файлов: {e}")
return 0, 0
def batch_merge(temp_files, batch_size, temp_dir, num_merge_processes=24):
"""
Выполняет пакетное слияние временных файлов.
:param temp_files: Список временных файлов
:param batch_size: Размер пакета для слияния
:param temp_dir: Путь к временной директории
:param num_merge_processes: Количество процессов для слияния
:return: (список объединенных файлов, общее количество уникальных строк, общее количество дубликатов)
"""
try:
logger.info(f"Начало пакетного слияния с размером пакета {batch_size}")
merged_files = []
total_unique_count = 0
total_duplicate_count = 0
with ProcessPoolExecutor(max_workers=num_merge_processes) as merge_executor:
futures = []
for i in range(0, len(temp_files), batch_size):
batch = temp_files[i:i + batch_size] # Формируем пакет файлов
logger.info(f"Слияние пакета с файлов {i + 1} по {min(i + batch_size, len(temp_files))}")
temp_merged_file, unique_filename = create_temp_merged_file(temp_dir)
temp_merged_file.close()
futures.append(merge_executor.submit(merge_files_parallel, batch, unique_filename))
for future in as_completed(futures):
unique_count, duplicate_count, temp_file = future.result() # Получаем результат слияния
if unique_count or duplicate_count:
merged_files.append(temp_file) # Добавляем временный файл в список
total_unique_count += unique_count
total_duplicate_count += duplicate_count
for temp_file in temp_files:
if os.path.exists(temp_file):
logger.info(f"Удаление временного файла: {temp_file}")
os.remove(temp_file) # Удаляем обработанные временные файлы
else:
logger.warning(f"Файл не найден для удаления: {temp_file}")
return merged_files, total_unique_count, total_duplicate_count
except Exception as e:
logger.error(f"Ошибка при пакетном слиянии: {e}")
return temp_files, 0, 0
def final_merge(temp_dir, output_file):
"""
Выполняет финальное слияние всех оставшихся временных файлов.
:param temp_dir: Путь к временной директории
:param output_file: Имя выходного файла
:return: (количество уникальных строк, количество удаленных дубликатов)
"""
logger.info(f"Финальная стадия слияния временных файлов из папки {temp_dir}.")
temp_files = [os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if os.path.isfile(os.path.join(temp_dir, f))] # Список временных файлов для финального слияния
if len(temp_files) > 1: # Если файлов больше одного, запускаем слияние
unique_count, duplicate_count, output_file = merge_files_parallel(temp_files, output_file) # Параллельно сливаем файлы
elif len(temp_files) == 1: # Если остался только один файл
logger.info(f"Остался один файл. Переименование {temp_files[0]} в {output_file}")
os.rename(temp_files[0], output_file) # Переименовываем файл в выходной
unique_count, duplicate_count = 0, 0 # Устанавливаем нулевые значения для счетчиков
else:
logger.error("Не осталось временных файлов для слияния!")
return 0, 0 # Возвращаем нули в случае ошибки
logger.info(f"Финальное слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")
try:
shutil.rmtree(temp_dir) # Удаляем временную директорию
logger.info(f"Временная папка {temp_dir} успешно удалена.")
except Exception as e:
logger.error(f"Ошибка при удалении временной папки {temp_dir}: {e}")
return unique_count, duplicate_count # Возвращаем количество уникальных строк и дубликатов
def read_and_process_chunks_multiprocess(input_file, chunk_size=2000000, num_processes=24):
"""
Читает входной файл по чанкам и обрабатывает их в многопроцессорном режиме.
:param input_file: Имя входного файла
:param chunk_size: Размер чанка (количество строк)
:param num_processes: Количество процессов для обработки
:return: (список временных файлов, общее количество прочитанных строк)
"""
temp_files = [] # Список для временных файлов
chunk = [] # Буфер для хранения чанка строк
original_count = 0 # Счетчик общего числа строк
logger.info(f"Чтение и обработка файла {input_file} в {num_processes} процессах...")
try:
with open(input_file, 'r', encoding='utf-8', errors='ignore') as infile: # Открываем входной файл для чтения
with ProcessPoolExecutor \
(max_workers=num_processes) as executor: # Создаем процессный пул для обработки чанков
futures = [] # Список задач для выполнения
for line in infile: # Читаем файл построчно
chunk.append(line.strip()) # Добавляем строку в чанк
original_count += 1 # Увеличиваем счетчик строк
if len(chunk) >= chunk_size: # Если чанк достиг нужного размера
futures.append(executor.submit(process_chunk_and_write_multiprocess, chunk, temp_dir)) # Отправляем чанк на обработку в пул процессов
chunk = [] # Очищаем чанк
if chunk: # Если остались строки после завершения цикла
futures.append(executor.submit(process_chunk_and_write_multiprocess, chunk, temp_dir)) # Обрабатываем оставшийся чанк
for future in as_completed(futures): # Ожидаем завершения всех задач
temp_file = future.result() # Получаем результат задачи
if temp_file:
temp_files.append(temp_file) # Добавляем временный файл в список
logger.info \
(f"Чтение и обработка завершены (процессами). Всего строк: {original_count}") # Логируем завершение обработки файла
except Exception as e:
logger.error(f"Ошибка при чтении файла (процессы): {e}") # Логируем ошибку в случае сбоя
return temp_files, original_count # Возвращаем список временных файлов и общее количество строк
def sort_and_uniq_streaming_multiprocess(input_file, output_file, chunk_size=2000000, batch_size=10, num_processes=24, num_merge_processes=24):
"""
Основная функция для сортировки и удаления дубликатов из большого файла с использованием многопроцессорной обработки.
:param input_file: Имя входного файла
:param output_file: Имя выходного файла
:param chunk_size: Размер чанка для обработки
:param batch_size: Размер пакета для слияния
:param num_processes: Количество процессов для обработки чанков
:param num_merge_processes: Количество процессов для слияния
:return: (общее количество строк, количество уникальных строк)
"""
logger.info(f"Старт обработки файла {input_file}...") # Логируем начало процесса
temp_files, original_count = read_and_process_chunks_multiprocess(input_file, chunk_size, num_processes) # Читаем и обрабатываем файл по чанкам
logger.info(f"Начинается пакетное слияние временных файлов...") # Логируем начало пакетного слияния
total_unique_count = 0 # Инициализируем счетчик всех уникальных строк
total_duplicate_count = 0 # Инициализируем счетчик всех дубликатов
while len(temp_files) > batch_size: # Пока временных файлов больше, чем размер пакета
temp_files, unique_count, duplicate_count = batch_merge(temp_files, batch_size, temp_dir, num_merge_processes) # Выполняем пакетное слияние
total_unique_count += unique_count # Обновляем общий счетчик уникальных строк
total_duplicate_count += duplicate_count # Обновляем общий счетчик дубликатов
unique_count, duplicate_count = final_merge(temp_dir, output_file) # Выполняем финальное слияние
total_unique_count += unique_count # Обновляем счетчик уникальных строк
total_duplicate_count += duplicate_count # Обновляем счетчик дубликатов
return original_count, unique_count # Возвращаем общее количество строк и количество уникальных строк
# Основная функция модуля
def remove_duplicates(data):
tic = time.perf_counter() # Начало отсчета времени
input_file = data
output_file = "output.txt" # Указываем экспортируемый файл
original_count, unique_count = sort_and_uniq_streaming_multiprocess(input_file, output_file, num_processes=24, num_merge_processes=24) # Запускаем основную функцию обработки
tac = time.perf_counter() # Конец отсчета времени
logging.info(f"Все временные файлы удалены. Уникальных строк: {unique_count}, Дублей удалено (на всех этапах процессов слияния): {original_count - unique_count}") # Логируем результаты
logging.info(f"Всего обработано строк: {original_count}") # Логируем количество обработанных строк
logging.info(f"Удаление дублей и сортировка заняли {tac - tic:0.2f} секунд") # Логируем время выполнения
logging.info(f"Файл успешно сохранен в {output_file}")
2. Сравнение текстовых баз (compare_databases.py). Работает следующим образом, указываете директорию где находятся текстовые файлы с базами, происходит их сравнение на дубли, дубли очищаются на выходе, получаете базы без дублей+файл с удаленными дублями (типа антипаблика).
Код:
Python: Скопировать в буфер обмена
Code:
import os
import logging
import time
from collections import defaultdict
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def read_file_lines(filepath):
"""
Читает строки из файла построчно и возвращает их как множество.
:param filepath: Путь к файлу.
:return: Генератор строк.
"""
logger.info(f"Начато чтение строк из файла: {filepath}")
total_lines = 0
try:
with open(filepath, 'r', encoding='utf-8') as file:
for line in file:
total_lines += 1
yield line.strip()
logger.info(f"Успешно прочитано {total_lines} строк из файла: {filepath}")
except Exception as e:
logger.error(f"Ошибка при чтении файла {filepath}: {e}")
raise
def compare_files(input_dir, output_dir):
"""
Сравнивает строки в текстовых файлах из указанной директории.
:param input_dir: Директория с текстовыми файлами.
:param output_dir: Директория для сохранения результатов.
"""
try:
start_time = time.time()
# Проверка входной и выходной директории
logger.info(f"Проверка существования директории {input_dir}")
if not os.path.exists(input_dir):
raise FileNotFoundError(f"Директория {input_dir} не найдена.")
logger.info(f"Проверка существования директории {output_dir}")
if not os.path.exists(output_dir):
logger.info(f"Директория не найдена, создаём: {output_dir}")
os.makedirs(output_dir)
# Получаем список текстовых файлов
logger.info(f"Получение списка файлов в директории: {input_dir}")
files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
if len(files) < 2:
raise ValueError("Для сравнения необходимо минимум два файла.")
logger.info(f"Найдено файлов для сравнения: {len(files)}. Список файлов: {files}")
# Хранилище строк
file_lines = defaultdict(set)
all_lines = set()
# Чтение строк из файлов
for filepath in files:
logger.info(f"Чтение строк из файла: {filepath}")
for line in read_file_lines(filepath):
file_lines[filepath].add(line)
all_lines.add(line)
# Определяем общие строки
logger.info("Вычисление совпадающих строк...")
common_lines = set.intersection(*(lines for lines in file_lines.values()))
logger.info(f"Обнаружено {len(common_lines)} совпадающих строк.")
# Уникальные строки для каждого файла
for filepath, lines in file_lines.items():
unique_lines = lines - common_lines
unique_output_path = os.path.join(output_dir, f"unique_{os.path.basename(filepath)}")
logger.info(f"Сохранение уникальных строк для {os.path.basename(filepath)}. Всего строк: {len(unique_lines)}.")
with open(unique_output_path, 'w', encoding='utf-8', errors='replace') as file:
file.writelines(f"{line}\n" for line in unique_lines)
logger.info(f"Уникальные строки для {os.path.basename(filepath)} сохранены в {unique_output_path}")
# Сохраняем совпадающие строки
common_output_path = os.path.join(output_dir, "common_lines.txt")
logger.info(f"Сохранение совпадающих строк. Всего строк: {len(common_lines)}.")
with open(common_output_path, 'w', encoding='utf-8', errors='replace') as file:
file.writelines(f"{line}\n" for line in common_lines)
logger.info(f"Совпадающие строки сохранены в {common_output_path}")
end_time = time.time()
logger.info(f"Сравнение завершено за {end_time - start_time:.2f} секунд.")
except Exception as e:
logger.error(f"Ошибка при сравнении файлов: {e}")
raise e
def main():
input_dir = input("Введите путь к директории с файлами для сравнения: ").strip()
output_dir = input("Введите путь к директории для сохранения результатов: ").strip()
try:
logger.info("Начало выполнения программы сравнения текстовых файлов.")
compare_files(input_dir, output_dir)
logger.info("Сравнение завершено успешно.")
except Exception as e:
logger.error(f"Ошибка: {e}")
if __name__ == "__main__":
main()
Код:
Python: Скопировать в буфер обмена
Code:
import os
import re
import heapq
import logging
import time
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("extract_email.log", encoding="utf-8")
]
)
# Регулярное выражение для извлечения email
EMAIL_REGEX = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
def extract_emails(input_file: str, output_file: str):
"""
Извлекает email-адреса из строк входного файла и сохраняет их в отдельный файл.
:param input_file: Путь к входному файлу
:param output_file: Имя выходного файла для сохранения email-адресов
"""
start_time = time.time()
logger.info(f"Начало извлечения email-адресов. Входной файл: {input_file}, Выходной файл: {output_file}")
try:
# Проверяем наличие директории для выходного файла
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
os.makedirs(output_dir)
total_emails_extracted = 0 # Счётчик извлечённых email-адресов
# Открываем входной и выходной файлы
with open(input_file, 'r', encoding='utf-8', errors='replace') as infile, \
open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
# Используем heapq.merge для обработки строк
for line in heapq.merge(iter(infile)):
try:
# Извлекаем все email из строки
emails = EMAIL_REGEX.findall(line)
if emails:
for email in emails:
outfile.write(email + '\n')
total_emails_extracted += len(emails)
except Exception as e:
logger.warning(f"Ошибка обработки строки: {line.strip()}. Детали: {e}")
logger.info(f"Извлечение завершено. Всего email-адресов извлечено: {total_emails_extracted}")
except FileNotFoundError:
logger.error(f"Файл {input_file} не найден.")
return
except Exception as e:
logger.error(f"Ошибка в процессе извлечения: {e}")
return
finally:
# Рассчитываем время выполнения
end_time = time.time()
elapsed_time = end_time - start_time
logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")
def main():
try:
logger.info("Запуск программы")
# Запрос параметров у пользователя
input_file = input("Введите путь к входному файлу (с расширением): ").strip()
output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
extract_emails(input_file, output_file)
except KeyboardInterrupt:
logger.warning("Операция прервана пользователем.")
except Exception as e:
logger.error(f"Ошибка в основной функции: {e}")
if __name__ == "__main__":
main()
Код:
Python: Скопировать в буфер обмена
Code:
import os
import heapq
import logging
import time
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("extract_logins.log", encoding="utf-8")
]
)
def extract_logins(input_file: str, output_file: str, delimiter: str, position: int):
"""
Извлекает логины из строк файла и сохраняет их в отдельный файл.
:param input_file: Путь к входному файлу
:param output_file: Имя выходного файла для сохранения логинов
:param delimiter: Разделитель, используемый для извлечения логина
:param position: Позиция логина по разделителю (0 — первая колонка)
"""
start_time = time.time()
logger.info(f"Начало извлечения логинов. Входной файл: {input_file}, Выходной файл: {output_file}, "
f"Разделитель: '{delimiter}', Позиция: {position}")
try:
# Проверяем наличие директории для выходного файла
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
os.makedirs(output_dir)
total_login_extracted = 0 # Счётчик извлечённых логинов
# Открываем входной и выходной файлы
with open(input_file, 'r', encoding='utf-8', errors='replace') as infile, \
open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
# Использую heapq для обработки строк
for line in heapq.merge(iter(infile)):
try:
# Разбиваю строку по разделителю
parts = line.strip().split(delimiter)
if len(parts) > position:
login = parts[position].strip()
outfile.write(login + '\n')
total_login_extracted += 1
else:
logger.warning(f"Пропуск строки: {line.strip()} (мало частей для позиции {position})")
except Exception as e:
logger.warning(f"Ошибка обработки строки: {line.strip()}. Детали: {e}")
logger.info(f"Извлечение завершено. Всего логинов извлечено: {total_login_extracted}")
except FileNotFoundError:
logger.error(f"Файл {input_file} не найден.")
return
except Exception as e:
logger.error(f"Ошибка в процессе извлечения: {e}")
return
finally:
# Рассчитываем время выполнения
end_time = time.time()
elapsed_time = end_time - start_time
logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")
def main():
try:
logger.info("Запуск программы")
# Запрос параметров у пользователя
input_file = input("Введите путь к входному файлу (с раширением): ").strip()
output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
position = int(input("Введите позицию логина по разделителю (0 — первая колонка): ").strip())
extract_logins(input_file, output_file, delimiter, position)
except KeyboardInterrupt:
logger.warning("Операция прервана пользователем.")
except Exception as e:
logger.error(f"Ошибка в основной функции: {e}")
if __name__ == "__main__":
main()
5. Извлечь пароли (extract_passwords.py). Все тоже самое что и для логинов, просто дубль функционала.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import heapq
import logging
import time
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("extract_password.log", encoding="utf-8")
]
)
def extract_passwords(input_file: str, output_file: str, delimiter: str, position: int):
"""
Извлекает пароли из строк файла и сохраняет их в отдельный файл.
:param input_file: Путь к входному файлу
:param output_file: Имя выходного файла для сохранения паролей
:param delimiter: Разделитель, используемый для извлечения пароля
:param position: Позиция пароля по разделителю (0 — первая колонка)
"""
start_time = time.time()
logger.info(f"Начало извлечения паролей. Входной файл: {input_file}, Выходной файл: {output_file}, "
f"Разделитель: '{delimiter}', Позиция: {position}")
try:
# Проверяем наличие директории для выходного файла
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
os.makedirs(output_dir)
total_passwords_extracted = 0 # Счётчик извлечённых паролей
# Открываем входной и выходной файлы
with open(input_file, 'r', encoding='utf-8', errors='replace') as infile, \
open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
# Использую heapq для обработки строк
for line in heapq.merge(iter(infile)):
try:
# Разбиваю строку по разделителю
parts = line.strip().split(delimiter)
if len(parts) > position:
password = parts[position].strip()
outfile.write(password + '\n')
total_passwords_extracted += 1
else:
logger.warning(f"Пропуск строки: {line.strip()} (мало частей для позиции {position})")
except Exception as e:
logger.warning(f"Ошибка обработки строки: {line.strip()}. Детали: {e}")
logger.info(f"Извлечение завершено. Всего паролей извлечено: {total_passwords_extracted}")
except FileNotFoundError:
logger.error(f"Файл {input_file} не найден.")
return
except Exception as e:
logger.error(f"Ошибка в процессе извлечения: {e}")
return
finally:
# Рассчитываем время выполнения
end_time = time.time()
elapsed_time = end_time - start_time
logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")
def main():
try:
logger.info("Запуск программы")
# Запрос параметров у пользователя
input_file = input("Введите путь к входному файлу (с раширением): ").strip()
output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
position = int(input("Введите позицию пароля по разделителю (0 — первая колонка): ").strip())
extract_passwords(input_file, output_file, delimiter, position)
except KeyboardInterrupt:
logger.warning("Операция прервана пользователем.")
except Exception as e:
logger.error(f"Ошибка в основной функции: {e}")
if __name__ == "__main__":
main()
6. Сортировать по доменам (Email/URL) - (sort_by_domains.py), работает в трех режимах (one_file/by_domains/filter_domains). Режим когда просто выполняется сортировка строк по доменам в 1 выходной файл, и когда файлы разбиваются по доменам, а также чтение лога и фильтрация в файл или по файлам по конкретно указанным доменам. Email или URL вычисляется динамически по регулярке.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import re
import heapq
import logging
import time
import tempfile
import shutil
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s",
handlers=[logging.StreamHandler(),
logging.FileHandler("sort_by_domains.log", encoding="utf-8", mode='a')]
)
# Регулярные выражения для извлечения email и URL
EMAIL_REGEX = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
URL_REGEX = re.compile(r'(https?://)?([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})')
def extract_domain(string: str):
"""
Извлекает домен из строки (email или URL).
:param string: Строка, содержащая email или URL
:return: Домен или None
"""
# Проверка для email
email_match = EMAIL_REGEX.search(string)
if email_match:
domain = email_match.group(0).split('@')[1]
logger.debug(f"Извлечен домен электронной почты: {domain}")
return domain
# Проверка для URL
url_match = URL_REGEX.search(string)
if url_match:
domain = url_match.group(2)
# Извлекаем только зону первого уровня (например, '.ru', '.com')
parts = domain.split('.')
if len(parts) > 1:
top_level_domain = parts[-1]
# Собираем домен из первого уровня
domain = f".{top_level_domain}"
logger.debug(f"Извлечен домен URL (по зоне первого уровня): {domain}")
return domain
logger.debug(f"Не удалось извлечь домен из строки: {string}")
return None
def sort_domains(input_file: str, output_dir: str, output_mode: str, filter_domains=None):
"""
Обрабатывает строки из входного файла и сохраняет домены в файлы.
:param input_file: Путь к входному файлу
:param output_dir: Путь к директории для сохранения результатов
:param output_mode: Режим обработки ('one_file', 'by_domains', 'filter_domains')
:param filter_domains: Список доменов для фильтрации (только для режима 'filter_domains')
"""
try:
# Засекаем время начала обработки
start_time = time.time()
logger.info(f"Начало обработки файла: {input_file}")
logger.info(f"Режим обработки: {output_mode}")
if filter_domains:
logger.info(f"Фильтрация по доменам: {', '.join(filter_domains)}")
# Проверяем наличие директории для выходных файлов
if not os.path.exists(output_dir):
logger.warning(f"Директория {output_dir} не существует. Создаём новую.")
os.makedirs(output_dir)
if output_mode == 'one_file':
# Создаем временную папку для хранения промежуточных файлов
temp_dir = tempfile.mkdtemp(dir=output_dir)
logger.info(f"Создана временная папка для промежуточных файлов: {temp_dir}")
# Статистика обработки
total_lines = 0
processed_lines = 0
unique_domains = set()
domain_line_counts = {}
# Открываем входной файл
with open(input_file, 'r', encoding='utf-8', errors='replace') as infile:
# Словарь для хранения доменов по файлам
domain_files = {}
# Чтение строк из файла с использованием heapq.merge для эффективности и экономии ОЗУ
for line in heapq.merge(infile):
line = line.strip()
total_lines += 1
# Извлечение домена из строки
domain = extract_domain(line)
if domain:
processed_lines += 1
unique_domains.add(domain)
# Подсчет строк для каждого домена
domain_line_counts[domain] = domain_line_counts.get(domain, 0) + 1
if output_mode == "one_file":
# В режиме 'one_file' записываем все строки в файлы по доменам
temp_file_path = os.path.join(temp_dir, f"{domain}.tmp")
if domain not in domain_files:
domain_files[domain] = open(temp_file_path, 'w', encoding='utf-8')
domain_files[domain].write(line + '\n')
elif output_mode == "by_domains":
# В режиме 'by_domains' создаём файлы для каждого домена
if domain not in domain_files:
domain_files[domain] = open(os.path.join(output_dir, f"{domain}.txt"), 'a', encoding='utf-8')
domain_files[domain].write(line + '\n')
elif output_mode == "filter_domains" and filter_domains:
# В режиме 'filter_domains' сохраняем строки только для указанных доменов
if domain in filter_domains:
if domain not in domain_files:
domain_files[domain] = open(os.path.join(output_dir, f"{domain}.txt"), 'a', encoding='utf-8')
domain_files[domain].write(line + '\n')
# Закрытие временных файлов для доменов
for file in domain_files.values():
file.close()
# В режиме 'one_file' объединяем все временные файлы и сортируем их
if output_mode == "one_file":
logger.info("Начинаем слияние и сортировку временных файлов...")
with open(os.path.join(output_dir, "output.txt"), 'w', encoding='utf-8') as output_file:
# Создаём генератор для всех временных файлов
all_temp_files = [open(os.path.join(temp_dir, f"{domain}.tmp"), 'r', encoding='utf-8')
for domain in domain_files.keys()]
# Слияние временных файлов с сортировкой по домену в один файл
merged_lines = heapq.merge(*all_temp_files, key=lambda line: extract_domain(line))
output_file.writelines(merged_lines)
# Закрытие всех временных файлов
for file in all_temp_files:
file.close()
logger.info("Слияние и сортировка завершены. Все данные записаны в output.txt.")
# Удаляем временную папку и все ее содержимое
shutil.rmtree(temp_dir)
logger.info(f"Временная папка {temp_dir} была удалена.")
# Засекаем время окончания обработки
end_time = time.time()
elapsed_time = end_time - start_time
# Логирование статистики обработки
logger.info(f"Статистика обработки:")
logger.info(f"- Всего строк в файле: {total_lines}")
logger.info(f"- Обработано строк с доменами: {processed_lines}")
logger.info(f"- Уникальных доменов найдено: {len(unique_domains)}")
logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")
# Логирование распределения доменов (топ-5)
sorted_domains = sorted(domain_line_counts.items(), key=lambda x: x[1], reverse=True)
logger.info("Топ-5 доменов по количеству строк:")
for domain, count in sorted_domains[:5]:
logger.info(f" - {domain}: {count} строк")
except FileNotFoundError:
logger.error(f"ОШИБКА: Файл {input_file} не найден.")
except PermissionError:
logger.error(f"ОШИБКА: Нет прав доступа к файлу {input_file} или директории {output_dir}.")
except Exception as e:
logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в процессе обработки: {e}")
def main():
try:
logger.info("=" * 50)
logger.info("СТАРТ СКРИПТА: Сортировка доменов")
logger.info("=" * 50)
# Запрос параметров у пользователя
input_file = input("Введите путь к входному файлу (с расширением): ").strip()
output_dir = input("Введите путь к директории для сохранения: ").strip()
# Меню выбора режима
while True:
print("\nВыберите режим обработки:")
print("1. Сортировка всех строк в один файл (one_file)")
print("2. Сортировка по доменам в отдельные файлы (by_domains)")
print("3. Экспорт строк по указанным доменам (filter_domains)")
print("0. Выход")
choice = input("Введите номер режима: ").strip()
if choice == "1":
output_mode = "one_file"
filter_domains = None
break
elif choice == "2":
output_mode = "by_domains"
filter_domains = None
break
elif choice == "3":
output_mode = "filter_domains"
domains_input = input("Введите домены для фильтрации (через запятую): ").strip()
filter_domains = [domain.strip() for domain in domains_input.split(',')]
break
elif choice == "0":
print("Выход из программы.")
return
else:
print("Неверный выбор. Попробуйте снова.")
# Обработка строк в зависимости от выбранного режима
sort_domains(input_file, output_dir, output_mode, filter_domains)
logger.info("=" * 50)
logger.info("ЗАВЕРШЕНИЕ СКРИПТА")
logger.info("=" * 50)
except KeyboardInterrupt:
logger.warning("ВНИМАНИЕ: Операция прервана пользователем.")
except Exception as e:
logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в основной функции: {e}")
if __name__ == "__main__":
main()
7. Удаление доменов из Email адресов. (remove_domains.py) Прописываете путь к файлу c мыльниками, на выходе получаете файл без @domain.com
Код:
Python: Скопировать в буфер обмена
Code:
import os
import heapq
import logging
import time
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("remove_domains.log", encoding="utf-8", mode='a')
]
)
def extract_name(email: str) -> str:
"""
Извлекает имя пользователя из email (часть до @).
:param email: Строка с email.
:return: Имя пользователя или пустая строка.
"""
if "@" in email:
return email.split("@")[0]
return email
def line_generator(file_path: str):
"""
Генератор для построчного чтения файла.
:param file_path: Путь к входному файлу.
:yield: Очищенная строка из файла.
"""
with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
for line in file:
yield line.strip()
def remove_domains(input_file: str, output_file: str):
"""
Удаляет домены из строк электронной почты и записывает результат в выходной файл.
:param input_file: Путь к входному файлу.
:param output_file: Путь к выходному файлу.
"""
try:
start_time = time.time()
logger.info(f"Начало обработки файла: {input_file}")
# Проверяем наличие директории для выходного файла и создаём её при необходимости
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
logger.warning(f"Директория {output_dir} не существует. Создаём новую.")
os.makedirs(output_dir)
# Проверяем наличие расширения у выходного файла
if not os.path.splitext(output_file)[1]:
logger.warning(f"У файла {output_file} отсутствует расширение. Добавляем '.txt'.")
output_file += ".txt"
# Открыть файл для записи и обработать данные
with open(output_file, 'w', encoding='utf-8') as outfile:
processed_lines = (
extract_name(email) for email in line_generator(input_file)
)
for name in heapq.merge(processed_lines):
outfile.write(name + '\n')
end_time = time.time()
logger.info(f"Обработка завершена. Время выполнения: {end_time - start_time:.2f} секунд.")
logger.info(f"Результат записан в файл: {output_file}")
except FileNotFoundError:
logger.error(f"ОШИБКА: Файл {input_file} не найден.")
except PermissionError:
logger.error(f"ОШИБКА: Нет прав доступа к файлу {input_file} или {output_file}.")
except Exception as e:
logger.error(f"КРИТИЧЕСКАЯ ОШИБКА: {e}")
def main():
try:
logger.info("=" * 50)
logger.info("СТАРТ СКРИПТА: Удаление доменов из email")
logger.info("=" * 50)
# Запрос параметров у пользователя
input_file = input("Введите путь к входному файлу (с расширением): ").strip()
output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
if not os.path.exists(input_file):
logger.error(f"Входной файл {input_file} не найден.")
return
remove_domains(input_file, output_file)
logger.info("=" * 50)
logger.info("ЗАВЕРШЕНИЕ СКРИПТА")
logger.info("=" * 50)
except KeyboardInterrupt:
logger.warning("ВНИМАНИЕ: Операция прервана пользователем.")
except Exception as e:
logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в основной функции: {e}")
if __name__ == "__main__":
main()
Код:
Python: Скопировать в буфер обмена
Code:
import os
import logging
import time
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("append_passwords.log", encoding='utf-8', mode='a')
]
)
def line_generator(file_path: str):
"""
Генератор для построчного чтения файла.
:param file_path: Путь к входному файлу.
"""
with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
for line in file:
yield line.strip()
def password_generator(file_path: str):
"""
Генератор для построчного чтения файла с паролями.
:param file_path: Путь к файлу с паролями.
:yield: Пароль из файла.
"""
with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
for line in file:
yield line.strip()
def append_passwords_with_password_file(input_file: str, output_file: str, passwords_file: str, delimiter: str):
"""
Режим добавления паролей из файла, добавляя все пароли на каждую строку.
:param input_file: Путь к входному файлу.
:param output_file: Путь к выходному файлу.
:param passwords_file: Путь к файлу с паролями.
:param delimiter: Разделитель для строки и пароля.
"""
with open(output_file, 'w', encoding='utf-8') as outfile:
input_lines = line_generator(input_file)
password_lines = list(password_generator(passwords_file)) # Сохраняем все пароли в список
for line in input_lines:
for password in password_lines:
outfile.write(f"{line}{delimiter}{password}\n")
def append_passwords_with_manual_password(input_file: str, output_file: str, password: str, delimiter: str):
"""
Режим добавления указанного пользователем пароля к строкам.
:param input_file: Путь к входному файлу.
:param output_file: Путь к выходному файлу.
:param password: Пароль для добавления.
:param delimiter: Разделитель для строки и пароля.
"""
with open(output_file, 'w', encoding='utf-8') as outfile:
for line in line_generator(input_file):
outfile.write(f"{line}{delimiter}{password}\n")
def append_passwords(input_file: str, output_file: str, mode: str, password: str = None, passwords_file: str = None, delimiter: str = ':'):
"""
Добавляет пароли к строкам в зависимости от выбранного режима.
:param input_file: Путь к входному файлу.
:param output_file: Путь к выходному файлу.
:param mode: Режим работы ('manual' или 'from_file').
:param password: Пароль для режима 'manual'.
:param passwords_file: Путь к файлу с паролями для режима 'from_file'.
:param delimiter: Разделитель для строки и пароля.
"""
try:
start_time = time.time()
logger.info(f"Начало обработки файла: {input_file}")
logger.info(f"Выбранный режим: {mode}")
# Проверяем наличие директории для выходного файла и создаём её при необходимости
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
logger.warning(f"Директория {output_dir} не существует. Создаём новую.")
os.makedirs(output_dir)
# Проверяем наличие расширения у выходного файла
if not os.path.splitext(output_file)[1]:
logger.warning(f"У файла {output_file} отсутствует расширение. Добавляем '.txt'.")
output_file += ".txt"
if mode == 'manual':
if password is None:
raise ValueError("Для режима 'manual' необходимо указать пароль.")
append_passwords_with_manual_password(input_file, output_file, password, delimiter)
elif mode == 'from_file':
if passwords_file is None or not os.path.exists(passwords_file):
raise ValueError("Для режима 'from_file' необходимо указать существующий файл с паролями.")
append_passwords_with_password_file(input_file, output_file, passwords_file, delimiter)
else:
raise ValueError("Неверный режим. Используйте 'manual' или 'from_file'.")
end_time = time.time()
logger.info(f"Обработка завершена. Время выполнения: {end_time - start_time:.2f} секунд.")
logger.info(f"Результат записан в файл: {output_file}")
except Exception as e:
logger.error(f"КРИТИЧЕСКАЯ ОШИБКА: {e}")
def main():
try:
logger.info("=" * 50)
logger.info("СТАРТ СКРИПТА: Добавление пароля к строкам")
logger.info("=" * 50)
# Запрос параметров у пользователя
input_file = input("Введите путь к входному файлу (с расширением): ").strip()
output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
mode = input("Выберите режим работы ('manual' или 'from_file'): ").strip().lower()
delimiter = input("Введите разделитель для строки и пароля (например, ':', '|', '/', ';'): ").strip()
if not delimiter:
logger.error("ОШИБКА: Разделитель не указан.")
return
if mode == 'manual':
password = input("Введите пароль для добавления: ").strip()
append_passwords(input_file, output_file, mode, password=password, delimiter=delimiter)
elif mode == 'from_file':
passwords_file = input("Введите путь к файлу с паролями: ").strip()
append_passwords(input_file, output_file, mode, passwords_file=passwords_file, delimiter=delimiter)
else:
logger.error("ОШИБКА: Неверный режим. Используйте 'manual' или 'from_file'.")
logger.info("=" * 50)
logger.info("ЗАВЕРШЕНИЕ СКРИПТА")
logger.info("=" * 50)
except KeyboardInterrupt:
logger.warning("ВНИМАНИЕ: Операция прервана пользователем.")
except Exception as e:
logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в основной функции: {e}")
if __name__ == "__main__":
main()
9. Объединение баз. (merge_databases.py) Выполнение слияния нескольких текстовиков в один файл. Указываете директорию с текстовиками, на выходе получаете один файл.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import heapq
import logging
import time
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("merge_databases.log", encoding="utf-8")
]
)
def merge_files_in_directory(directory: str, output_file: str):
"""
Сливает все файлы из указанной директории в один файл.
:param directory: Путь к директории с файлами для слияния
:param output_file: Имя выходного файла
"""
start_time = time.time()
logger.info(f"Начало слияния файлов. Директория: {directory}, Выходной файл: {output_file}")
try:
# Получаем список всех файлов в директории
temp_files = [os.path.join(directory, f) for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
if not temp_files:
logger.error(f"В указанной директории {directory} не найдено файлов для слияния.")
return
logger.info(f"Обнаружено {len(temp_files)} файлов для обработки.")
file_iters = []
total_lines_written = 0 # Счётчик строк
try:
# Открываем все файлы для чтения
for temp_file in temp_files:
logger.info(f"Открытие файла: {temp_file}")
file_iters.append(open(temp_file, 'r', encoding='utf-8', errors='replace'))
# Открываем выходной файл для записи
with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
logger.info(f"Запись в файл: {output_file}")
# Записываем строки из всех файлов, используя merge
for line in heapq.merge(*[iter(f) for f in file_iters]):
outfile.write(line)
total_lines_written += 1
logger.info(f"Слияние завершено. Всего строк записано: {total_lines_written}")
except Exception as e:
logger.error(f"Ошибка при обработке файлов: {e}")
return
finally:
# Закрываем все открытые файлы
for f in file_iters:
logger.info(f"Закрытие файла: {f.name}")
f.close()
except Exception as e:
logger.error(f"Ошибка в процессе слияния: {e}")
return
finally:
# Рассчитываем время выполнения
end_time = time.time()
elapsed_time = end_time - start_time
logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")
def main():
"""
Основная функция для выбора директории и вызова слияния.
"""
try:
logger.info("Запуск программы")
# Запрос директории и имени выходного файла у пользователя
directory = input("Введите путь к директории с файлами для слияния: ").strip()
output_file = input("Введите имя выходного файла (с расширением): ").strip()
if not os.path.isdir(directory):
logger.error(f"Указанный путь {directory} не является директорией.")
return
merge_files_in_directory(directory, output_file)
except KeyboardInterrupt:
logger.warning("Операция прервана пользователем.")
except Exception as e:
logger.error(f"Ошибка в основной функции: {e}")
if __name__ == "__main__":
main()
10. Разделить один файл на несколько частей (split_database.py). Ну тут уже идет реверс, отдаете один файл, указываете по какому количеству строк его разбить, получаете несколько сплит файлов.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import logging
import time
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("split_database.log", encoding="utf-8")
]
)
def split_file(input_file: str, output_directory: str, lines_per_file: int):
"""
Разделяет файл на несколько частей с минимальным использованием памяти.
:param input_file: Путь к входному файлу
:param output_directory: Директория для сохранения разделённых файлов
:param lines_per_file: Количество строк в каждом выходном файле
"""
start_time = time.time()
logger.info(f"Начало разделения файла. Входной файл: {input_file}, Директория: {output_directory}, Строк на файл: {lines_per_file}")
try:
# Проверка входного файла
if not os.path.isfile(input_file):
logger.error(f"Файл {input_file} не найден или не является файлом.")
return
# Проверка выходной директории
if not os.path.exists(output_directory):
logger.info(f"Выходная директория {output_directory} не найдена. Создаём её.")
os.makedirs(output_directory)
# Итеративное чтение и запись
def file_iterator(file_path):
with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
for line in file:
yield line
# Итератор для входного файла
file_iter = file_iterator(input_file)
file_index = 1
current_file = None
lines_written = 0
for line in file_iter:
if lines_written % lines_per_file == 0: # Новый файл
if current_file:
current_file.close()
logger.info(f"Закрыт файл: {current_file.name} (строк: {lines_written})")
output_file = os.path.join(output_directory, f"split_part_{file_index}.txt")
current_file = open(output_file, 'w', encoding='utf-8', errors='replace')
logger.info(f"Создан файл: {output_file}")
file_index += 1
lines_written = 0
current_file.write(line)
lines_written += 1
if current_file:
current_file.close()
logger.info(f"Закрыт файл: {current_file.name} (строк: {lines_written})")
except Exception as e:
logger.error(f"Ошибка в процессе разделения: {e}")
return
finally:
# Рассчитываем время выполнения
end_time = time.time()
elapsed_time = end_time - start_time
logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")
def main():
"""
Основная функция для выбора входного файла, выходной директории и параметров разделения.
"""
try:
logger.info("Запуск программы")
# Запрос данных у пользователя
input_file = input("Введите путь к файлу для разделения: ").strip()
output_directory = input("Введите путь для сохранения разделённых файлов: ").strip()
lines_per_file = int(input("Введите количество строк в каждом выходном файле: ").strip())
split_file(input_file, output_directory, lines_per_file)
except KeyboardInterrupt:
logger.warning("Операция прервана пользователем.")
except Exception as e:
logger.error(f"Ошибка в основной функции: {e}")
if __name__ == "__main__":
main()
11. Рандомизация строк в файле (randomize.py). Тут выполняется перемешивание строк в файле который отдаете рандомным образом, на выходе получаете рандомизированные позиции строк в файле.
Код:
Python: Скопировать в буфер обмена
Code:
import os
import random
import logging
import time
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("randomize.log", encoding="utf-8")
]
)
def randomize_file(input_file: str, output_file: str, buffer_size: int):
"""
Перемешивает строки в файле случайным образом.
:param input_file: Путь к входному файлу
:param output_file: Путь к выходному файлу
:param buffer_size: Количество строк, загружаемых в память за раз
"""
start_time = time.time()
logger.info(f"Начало рандомизации файла. Входной файл: {input_file}, Выходной файл: {output_file}, Размер буфера: {buffer_size}")
try:
# Проверка входного файла
if not os.path.isfile(input_file):
logger.error(f"Файл {input_file} не найден или не является файлом.")
return
# Проверка выхода
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
os.makedirs(output_dir)
# Чтение строк по частям
all_lines = []
with open(input_file, 'r', encoding='utf-8', errors='replace') as infile:
logger.info(f"Чтение строк из {input_file} с использованием буфера")
buffer = []
for line in infile:
buffer.append(line)
if len(buffer) >= buffer_size:
logger.debug(f"Загружен буфер из {len(buffer)} строк")
all_lines.extend(buffer)
buffer = []
if buffer:
all_lines.extend(buffer)
logger.info(f"Файл загружен в память. Всего строк: {len(all_lines)}")
# Перемешивание строк
random.shuffle(all_lines)
logger.info(f"Строки успешно перемешаны")
# Запись результата
with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
logger.info(f"Запись перемешанных строк в {output_file}")
outfile.writelines(all_lines)
logger.info(f"Успешное завершение. Перемешанный файл сохранён в {output_file}")
except Exception as e:
logger.error(f"Ошибка при обработке: {e}")
finally:
# Рассчитываем время выполнения
end_time = time.time()
elapsed_time = end_time - start_time
logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")
def main():
"""
Основная функция для выбора входного и выходного файлов, а также настроек рандомизации.
"""
try:
logger.info("Запуск программы")
# Запрос данных у пользователя
input_file = input("Введите путь к файлу для рандомизации: ").strip()
output_file = input("Введите путь для сохранения перемешанного файла: ").strip()
buffer_size = int(input("Введите размер буфера (число строк, загружаемых за раз): ").strip())
randomize_file(input_file, output_file, buffer_size)
except KeyboardInterrupt:
logger.warning("Операция прервана пользователем.")
except ValueError:
logger.error("Ошибка: размер буфера должен быть числом.")
except Exception as e:
logger.error(f"Ошибка в основной функции: {e}")
if __name__ == "__main__":
main()
12. Копирование строк из текстового файла, работает с использованием библиотеки pyperclip, так что не забудьте сделать
pip install pyperclip==1.9.0
, работает в двух режимах: 1. Копирование строк из файла полностью; 2. Копирование строк по диапазону из файла. Предупреждение: следите за объемом ОЗУ при работе этого модуля.Код:
Python: Скопировать в буфер обмена
Code:
import logging
import pyperclip # Для работы с буфером обмена
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("copy_to_clipboard.log", encoding="utf-8")
]
)
def read_file_lines(file_path, start_line=None, end_line=None):
"""
Генератор для чтения строк из файла с поддержкой диапазона.
:param file_path: Путь к файлу
:param start_line: Номер начальной строки (1-based, включительно)
:param end_line: Номер конечной строки (1-based, включительно)
"""
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
for current_line_num, line in enumerate(f, start=1):
if start_line and current_line_num < start_line:
continue
if end_line and current_line_num > end_line:
break
yield line.strip()
def copy_to_clipboard(file_path, start_line=None, end_line=None):
"""
Копирует содержимое файла или его части в буфер обмена.
:param file_path: Путь к файлу
:param start_line: Номер начальной строки (1-based, включительно)
:param end_line: Номер конечной строки (1-based, включительно)
"""
try:
logger.info(f"Начало копирования. Файл: {file_path}, Диапазон: {start_line}-{end_line}")
lines = list(read_file_lines(file_path, start_line, end_line))
if not lines:
logger.warning("Выбранный диапазон пуст.")
return
# Отображаем первую и последнюю строки диапазона
print(f"Первая строка: {lines[0]}")
print(f"Последняя строка: {lines[-1]}")
confirm = input("Скопировать в буфер обмена? (y/n): ").strip().lower()
if confirm != 'y':
print("Операция отменена пользователем.")
return
# Копируем в буфер обмена
pyperclip.copy('\n'.join(lines))
print("Данные скопированы в буфер обмена.")
logger.info("Копирование завершено успешно.")
except FileNotFoundError:
logger.error(f"Файл {file_path} не найден.")
except Exception as e:
logger.error(f"Ошибка в процессе копирования: {e}")
def main():
try:
logger.info("Запуск программы")
file_path = input("Введите путь к файлу: ").strip()
mode = input("Выберите режим (1 — весь файл, 2 — диапазон строк): ").strip()
if mode == '1':
copy_to_clipboard(file_path)
elif mode == '2':
start_line = int(input("Введите номер начальной строки: ").strip())
end_line = int(input("Введите номер конечной строки: ").strip())
copy_to_clipboard(file_path, start_line, end_line)
else:
print("Неверный выбор режима.")
logger.warning("Выбран неверный режим.")
except KeyboardInterrupt:
logger.warning("Операция прервана пользователем.")
except Exception as e:
logger.error(f"Ошибка в основной функции: {e}")
if __name__ == "__main__":
main()
13. Чистка текстового файла от мусора. Работает в 3х режимах по выбору (1. Разрешить все спецсимволы, 2. Разрешить только специальные символы (@, ., _, -), 3. Не разрешать спецсимволы)
Python: Скопировать в буфер обмена
Code:
import os
import logging
import time
import string
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("remove_non_latin.log", encoding="utf-8")
]
)
def is_latin_line(line: str, special_mode: str) -> bool:
"""
Проверяет, состоит ли строка из латинских символов и спецсимволов в зависимости от режима.
:param line: Входная строка
:param special_mode: Режим обработки спецсимволов:
- 'all' - разрешить все спецсимволы.
- 'custom' - разрешить только определенные спецсимволы.
- 'none' - не разрешать спецсимволы.
:return: True, если строка соответствует требованиям.
"""
# Основной набор символов: латинские буквы, цифры и пробелы
allowed_chars = string.ascii_letters + string.digits + ' '
if special_mode == 'all':
# Разрешаем все печатаемые символы
allowed_chars += string.punctuation
elif special_mode == 'custom':
# Разрешаем только указанные символы
allowed_chars += "@._-"
# special_mode == 'none': никаких дополнительных символов
return all(char in allowed_chars for char in line)
def line_generator(file_path: str):
"""
Генератор для построчного чтения файла.
:param file_path: Путь к файлу
"""
with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
for line in file:
yield line.strip()
def clean_file(input_file: str, output_file: str, special_mode: str):
"""
Очищает файл, удаляя строки с нелатинскими символами или нежелательными символами.
:param input_file: Путь к входному файлу
:param output_file: Имя выходного файла
:param special_mode: Режим обработки спецсимволов ('all', 'custom', 'none')
"""
start_time = time.time()
logger.info(f"Начало очистки файла. Входной файл: {input_file}, Выходной файл: {output_file}, "
f"Режим спецсимволов: {special_mode}")
try:
# Проверяем наличие директории для выходного файла
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
os.makedirs(output_dir)
total_lines = 0 # Общее количество строк
kept_lines = 0 # Количество оставленных строк
removed_lines = 0 # Количество удаленных строк
# Открываем файл для записи
with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
for line in line_generator(input_file):
total_lines += 1
if is_latin_line(line, special_mode):
outfile.write(line + '\n')
kept_lines += 1
else:
removed_lines += 1
logger.info(f"Очистка завершена. Обработано строк: {total_lines}, Сохранено строк: {kept_lines}, "
f"Удалено строк: {removed_lines}")
except FileNotFoundError:
logger.error(f"Файл {input_file} не найден.")
return
except Exception as e:
logger.error(f"Ошибка в процессе очистки: {e}")
return
finally:
# Рассчитываем время выполнения
end_time = time.time()
elapsed_time = end_time - start_time
logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")
def main():
try:
logger.info("Запуск программы")
# Запрос параметров у юзера
input_file = input("Введите путь к входному файлу (с расширением): ").strip()
output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
print("Выберите режим обработки спецсимволов:")
print("1 - Разрешить все спецсимволы")
print("2 - Разрешить только специальные символы (@, ., _, -)")
print("3 - Не разрешать спецсимволы")
special_mode_choice = input("Введите номер режима (1, 2 или 3): ").strip()
special_mode = ''
if special_mode_choice == '1':
special_mode = 'all'
elif special_mode_choice == '2':
special_mode = 'custom'
elif special_mode_choice == '3':
special_mode = 'none'
else:
logger.error("Неверный выбор режима. Завершение программы.")
return
clean_file(input_file, output_file, special_mode)
except KeyboardInterrupt:
logger.warning("Операция прервана пользователем.")
except Exception as e:
logger.error(f"Ошибка в основной функции: {e}")
if __name__ == "__main__":
main()
14. Изменение кодировки файла. Дисклеймер: Модуль может работать кривовато, на моих тестах вроде нормально преобразует, на ваших боевых файлах как работать будет не знаю. Юзать на свой страх и риск. По найденным ошибкам просьба отписаться в ПМ или в ветке. Поддерживает преобразование в следующие кодировки (исходная кодировка в теории у файла может быть любой):
- UTF-8
- UTF-16
- UTF-16LE
- UTF-16BE
- ASCII
- Windows-1251
- ISO-8859-1
- KOI8-R
- GB18030
- Big5
Python: Скопировать в буфер обмена
Code:
import os
import logging
import chardet
import time
# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("format_database.log", encoding="utf-8")
]
)
POPULAR_ENCODINGS = [
"UTF-8", "UTF-16", "UTF-16LE", "UTF-16BE", "ASCII",
"Windows-1251", "ISO-8859-1", "KOI8-R", "GB18030", "Big5"
]
def detect_file_encoding(file_path, sample_size=262144):
"""
Определяет кодировку текстового файла.
:param file_path: Путь к файлу
:param sample_size: Количество байт для анализа
:return: Предполагаемая кодировка файла
"""
try:
with open(file_path, 'rb') as file:
raw_data = file.read(sample_size)
result = chardet.detect(raw_data)
detected_encoding = result.get('encoding', None)
confidence = result.get('confidence', 0)
logger.warning(f"Результат анализа кодировки: {result}")
if detected_encoding is None or detected_encoding.lower() == 'ascii':
logger.warning(f"Кодировка файла '{file_path}' не определена или определена как ASCII.")
print("Укажите предполагаемую кодировку (например, UTF-8, Windows-1251): ")
detected_encoding = input("Введите кодировку: ").strip()
elif confidence < 0.8:
logger.warning(f"Кодировка '{detected_encoding}' определена с низкой уверенностью ({confidence:.2f}).")
print("Обнаружена кодировка с низкой уверенностью. Хотите указать свою? (y/n): ")
if input().strip().lower() == 'y':
detected_encoding = input("Введите кодировку: ").strip()
logger.info(f"Обнаружена кодировка файла '{file_path}': {detected_encoding}")
return detected_encoding
except Exception as e:
logger.error(f"Ошибка определения кодировки файла {file_path}: {e}")
return 'utf-8'
def add_bom_if_needed(file_path, encoding):
"""
Добавляет BOM (маркер порядка байт) в файл, если кодировка поддерживает его.
:param file_path: Путь к файлу
:param encoding: Кодировка файла
"""
bom_map = {
"utf-8": b'\xef\xbb\xbf',
"utf-16": b'\xff\xfe', # UTF-16 LE по умолчанию
"utf-16le": b'\xff\xfe',
"utf-16be": b'\xfe\xff'
}
if encoding.lower() in bom_map:
logger.info(f"Добавление BOM для кодировки {encoding}")
bom = bom_map[encoding.lower()]
with open(file_path, 'rb') as f:
content = f.read()
with open(file_path, 'wb') as f:
f.write(bom + content)
logger.info(f"BOM успешно добавлен для файла: {file_path}")
def convert_file_encoding(input_file, output_file, target_encoding):
"""
Преобразует кодировку текстового файла построчно и добавляет BOM, если это необходимо.
:param input_file: Путь к входному файлу
:param output_file: Путь к выходному файлу
:param target_encoding: Целевая кодировка
"""
start_time = time.time()
logger.info(f"Начало преобразования кодировки. Входной файл: {input_file}, Целевая кодировка: {target_encoding}")
try:
source_encoding = detect_file_encoding(input_file)
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
with open(input_file, 'r', encoding=source_encoding, errors='replace') as infile, \
open(output_file, 'w', encoding=target_encoding, errors='replace') as outfile:
for line in infile:
outfile.write(line.rstrip('\r\n') + '\n')
# Добавление BOM, если требуется
add_bom_if_needed(output_file, target_encoding)
logger.info(f"Преобразование завершено. Файл сохранён: {output_file}")
# Проверка записи
verify_result = verify_encoding(output_file)
logger.info(f"Кодировка выходного файла определена как: {verify_result.get('encoding')}")
except FileNotFoundError:
logger.error(f"Файл {input_file} не найден.")
except Exception as e:
logger.error(f"Ошибка в процессе преобразования: {e}")
finally:
elapsed_time = time.time() - start_time
logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")
def verify_encoding(file_path):
"""
Проверяет кодировку файла.
:param file_path: Путь к файлу
:return: Результаты анализа кодировки
"""
with open(file_path, 'rb') as f:
raw_data = f.read()
result = chardet.detect(raw_data)
return result
def main():
"""
Основная функция для выполнения программы.
"""
try:
logger.info("Запуск программы")
input_file = input("Введите путь к входному файлу (с расширением): ").strip()
output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
print("Доступные кодировки:")
for i, encoding in enumerate(POPULAR_ENCODINGS, 1):
print(f"{i}. {encoding}")
while True:
try:
choice = int(input("Выберите номер целевой кодировки: "))
if 1 <= choice <= len(POPULAR_ENCODINGS):
target_encoding = POPULAR_ENCODINGS[choice - 1]
break
else:
print("Некорректный выбор. Попробуйте снова.")
except ValueError:
print("Введите корректный номер из списка.")
convert_file_encoding(input_file, output_file, target_encoding)
except KeyboardInterrupt:
logger.warning("Операция прервана пользователем.")
except Exception as e:
logger.error(f"Ошибка в основной функции: {e}")
if __name__ == "__main__":
main()
15. Нормализатор строк в файлах. Работает в 4-х режимах:
- Форматирование строк (format) - выполняет форматирование строк в экспортируемый файл, указываете разделитель строки входящего файла и их формат к примеру "ip:mailass", указываете нужный формат "mailass:ip" на выходе волучаете отформатированные строки.
- Фильтрация строк по формату (filter) - указываете разделитель, и формат тех строк которые нужно вытащить из файла, например только mailass, на выходе получаете отфильтрованные строки и зфайла.
- Очистка строк от спецсимволов (clean) - указываете спецсимволы которые вы хотите почистить из строк, на выходе получаете файл со строками без спецсимволов указанных алгоритму.
- Обрезка строк по длине (trim) - указываете диапазон по какой длине отсеивать строки в экспортируемый файл, получаете файл со строками по указанному количеству символов диапазона.
P.S. Модули можно использовать как по отдельности утилитами, так и через основной файл из консольного меню.
P.S.S. Первую версию на публику прикрепил в атаче. По мере свободного времени буду обновлять и исправлять найденные ошибки.