From 103521b83c8e5d1d7226b51e0dbaa59adfeed601 Mon Sep 17 00:00:00 2001 From: Rasl Date: Sun, 9 Nov 2025 15:06:27 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D1=80=D0=B0=D0=B1=D0=BE=D1=82?= =?UTF-8?q?=D0=BA=D0=B0=20=D1=81=20=D0=BA=D0=BB=D0=B0=D1=81=D1=82=D0=B5?= =?UTF-8?q?=D1=80=D0=B8=D0=B7=D0=B0=D1=86=D0=B8=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main_new.py | 148 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 97 insertions(+), 51 deletions(-) diff --git a/main_new.py b/main_new.py index ff62a3c..67944b4 100644 --- a/main_new.py +++ b/main_new.py @@ -1,11 +1,15 @@ import clickhouse_connect import time -from scipy.sparse import coo_matrix - -# Замер времени +from scipy.sparse import lil_matrix, vstack +from sklearn.preprocessing import normalize +from sklearn.feature_extraction import FeatureHasher +from sklearn.decomposition import TruncatedSVD +import hdbscan +import numpy as np +# ----------------- Замер времени ----------------- start_time = time.time() - -# Подключение к ClickHouse +start_time_all = start_time +# ----------------- Подключение к ClickHouse ----------------- try: client = clickhouse_connect.get_client( host='localhost', @@ -24,7 +28,7 @@ except Exception as e: interval = 5 # интервал времени в секундах -# Получаем активные IP +# ----------------- Получаем активные IP ----------------- active_ips_query = """ SELECT ip FROM DIPLOM_NEW @@ -33,35 +37,29 @@ HAVING COUNT() >= 25 """ active_ips_result = client.query(active_ips_query) data_IP = [row[0] for row in active_ips_result.result_rows] -#Пронумеруем IP ip_to_idx = {ip: idx for idx, ip in enumerate(data_IP)} - print(f"Активных IP: {len(data_IP)}") -# Временной диапазон. Для его оценки возьмем из БД максимальное и минимальное время +# ----------------- Временной диапазон ----------------- time_range_query = "SELECT MIN(timestamp), MAX(timestamp) FROM DIPLOM_NEW" time_result = client.query(time_range_query) min_time, max_time = time_result.result_rows[0] lenn = int((max_time.timestamp() - min_time.timestamp()) // interval) + 1 - print(f"Интервалов: {lenn}") -# Будем решать задачу через csr_matrix -rows = [] -cols = [] +# ----------------- Создание матрицы активности ----------------- +print("Формируем разреженную матрицу активности...") +activity_matrix = lil_matrix((len(data_IP), lenn), dtype=np.int8) -# Временная таблица для join +batch_size = 10000 # для обработки частями, чтобы не былоо ошибок по памяти client.command("DROP TEMPORARY TABLE IF EXISTS active_ips") client.command("CREATE TEMPORARY TABLE active_ips (ip String)") -# Загружаем IP частями -batch_size = 10000 for i in range(0, len(data_IP), batch_size): batch = data_IP[i:i+batch_size] values = ", ".join([f"('{ip}')" for ip in batch]) client.command(f"INSERT INTO active_ips VALUES {values}") -#Получаем данные data_query = """ SELECT t.ip, t.timestamp FROM DIPLOM_NEW t @@ -71,66 +69,114 @@ ORDER BY t.timestamp data_result = client.query(data_query) data = data_result.result_rows -# Заполняем матрицу for ip, ts in data: - rows.append(ip_to_idx[ip]) - cols.append(int((ts.timestamp() - min_time.timestamp()) // interval)) + row = ip_to_idx[ip] + col = int((ts.timestamp() - min_time.timestamp()) // interval) + activity_matrix[row, col] = 1 -# Преобразуем в разреженную матрицу -values = [1] * len(rows) -activity_matrix = coo_matrix((values, (rows, cols)), shape=(len(data_IP), lenn)).tocsr() - -print(f"Обработано IP: {len(data_IP)}") +activity_matrix = activity_matrix.tocsr() print(f"Время подготовки данных: {time.time() - start_time:.2f} сек") -start_time = time.time() +# ----------------- Хеширование ----------------- +use_fraction = 20 # на сколько мы делим число интервалов +n_features = max(1, lenn // use_fraction) -from sklearn.preprocessing import normalize -from sklearn.decomposition import TruncatedSVD -import hdbscan -from collections import defaultdict +hasher = FeatureHasher(n_features=n_features, input_type='pair') +batch_size = 1000 +batches = len(data_IP) // batch_size + 1 +X_hashed_parts = [] + +for b in range(batches): + start = b * batch_size + end = min((b + 1) * batch_size, len(data_IP)) + batch_rows = [] + + for i in range(start, end): + row = activity_matrix[i] + if row.nnz == 0: + batch_rows.append(hasher.transform([[]])) + else: + features = [(str(idx), 1) for idx in row.indices] + batch_rows.append(hasher.transform([features])) + + batch_matrix = vstack(batch_rows) + X_hashed_parts.append(batch_matrix) + del batch_rows -#Нормализация -activity_matrix = normalize(activity_matrix, norm='l2', axis=1, copy=True) +# Объединяем части матрицы +X_hashed = vstack(X_hashed_parts) +del X_hashed_parts -#Снижение размерности SVD -# Оставляем только главные компоненты — ускоряет кластеризацию и отсекает шум -n_components = 100 #Число можно варировать -svd = TruncatedSVD(n_components=n_components, random_state=42) -X_reduced = svd.fit_transform(activity_matrix) +# Нормализация +X_hashed = normalize(X_hashed, norm='l2', axis=1) -#Кластеризация HDBSCAN -# min_cluster_size — минимальный размер группы (ботнета) -# min_samples — чувствительность к шуму + +# Снижение размерности +svd_components = 100 # колличество компонент +svd = TruncatedSVD(n_components=svd_components) +X_reduced = svd.fit_transform(X_hashed) + +# ----------------- Кластеризация ----------------- +print("Запускаем HDBSCAN...") clusterer = hdbscan.HDBSCAN( - min_cluster_size=5, #минимальный размер группы (ботнета) - min_samples=3, #чувствительность к шуму + min_cluster_size=5, + min_samples=3, metric='euclidean', cluster_selection_epsilon=0.05, cluster_selection_method='eom' ) labels = clusterer.fit_predict(X_reduced) -#Сохранение групп IP в txt файл +# ----------------- Подсчёт внутрикластерной схожести ----------------- +from sklearn.metrics.pairwise import cosine_similarity +from collections import defaultdict +import gc + +print("Вычисляем внутрикластерную схожесть (cosine similarity)...") + +cluster_similarities = {} clusters = defaultdict(list) + for ip, label in zip(data_IP, labels): clusters[label].append(ip) -#Сохраняем только кластеры (без шума) -with open("botnet_clusters.txt", "w") as f: +for cluster_id in set(labels): + if cluster_id == -1: + continue + + indices = [i for i, l in enumerate(labels) if l == cluster_id] + n = len(indices) + + # Ограничиваем слишком большие кластеры, чтобы не было проблем по памяти + if n > 500: + sample_indices = np.random.choice(indices, size=500, replace=False) + else: + sample_indices = indices + + submatrix = activity_matrix[sample_indices] + + sims = cosine_similarity(submatrix, dense_output=False) + mean_sim = sims.mean() + cluster_similarities[cluster_id] = float(mean_sim) + gc.collect() + +# ----------------- Сохранение результатов ----------------- +output_file = "botnet_clusters.txt" + +with open(output_file, "w") as f: for cluster_id, ips in clusters.items(): if cluster_id == -1: - continue # пропускаем шум - f.write(f"=== Кластер {cluster_id} (IP: {len(ipпше s)}) ===\n") + continue + sim = cluster_similarities.get(cluster_id, 0) + f.write(f"=== Кластер {cluster_id} (IP: {len(ips)}, средняя схожесть={sim:.4f}) ===\n") for ip in ips: f.write(ip + "\n") f.write("\n") - -#Вывод статистики +# ----------------- Финальная статистика ----------------- n_clusters = len([c for c in clusters if c != -1]) -print(f"Время выполнения кластеризации: {time.time() - start_time:.2f} сек") print(f"Найдено кластеров: {n_clusters}") -print(f"Результаты сохранены в: botnet_clusters.txt") \ No newline at end of file +print(f"Результаты сохранены в: {output_file}") +print(f"Время обработки всего скрипта: {time.time()-start_time_all:.2f}")