From 6cead02c6a18cc8e3ad5a37703fc7ef09f44e5b0 Mon Sep 17 00:00:00 2001 From: Rasl Date: Mon, 24 Nov 2025 21:02:52 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=BD=D0=BE=20EDA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main_new.py | 360 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 248 insertions(+), 112 deletions(-) diff --git a/main_new.py b/main_new.py index 67944b4..04d5a5f 100644 --- a/main_new.py +++ b/main_new.py @@ -1,34 +1,135 @@ import clickhouse_connect import time -from scipy.sparse import lil_matrix, vstack +import os +import numpy as np +from scipy.sparse import lil_matrix, csr_matrix from sklearn.preprocessing import normalize -from sklearn.feature_extraction import FeatureHasher -from sklearn.decomposition import TruncatedSVD import hdbscan -import numpy as np -# ----------------- Замер времени ----------------- -start_time = time.time() -start_time_all = start_time -# ----------------- Подключение к ClickHouse ----------------- +from sklearn.metrics.pairwise import cosine_similarity +from collections import defaultdict +import gc +import matplotlib.pyplot as plt +from sklearn.decomposition import TruncatedSVD + + +# Начало замера времени всего скрипта +start_time_all = time.time() + +# Подключение к ClickHouse try: client = clickhouse_connect.get_client( host='localhost', port=8123, username='default', password='', - settings={ - 'max_memory_usage': 4_000_000_000, - 'max_execution_time': 3600, - } + settings={'max_memory_usage': 4_000_000_000} ) print("Подключение к ClickHouse установлено") except Exception as e: print(f"Ошибка подключения: {e}") exit() -interval = 5 # интервал времени в секундах +# Интервал +interval = 600 # 10 минут + + +# ---------------------- EDA -------------------------- +# Создаем папку куда будем складывать графики +os.makedirs("eda_plots", exist_ok=True) -# ----------------- Получаем активные IP ----------------- +# События по минутам +events_query = """ +SELECT + toStartOfMinute(timestamp) AS ts, + count() AS events +FROM DIPLOM_NEW +GROUP BY ts +ORDER BY ts +""" +events_result = client.query(events_query) +events_ts = [row[0] for row in events_result.result_rows] +events_cnt = [row[1] for row in events_result.result_rows] + +# Уникальные IP по минутам +unique_query = """ +SELECT + toStartOfMinute(timestamp) AS ts, + uniqExact(ip) AS uniq_ips +FROM DIPLOM_NEW +GROUP BY ts +ORDER BY ts +""" +unique_result = client.query(unique_query) +unique_ts = [row[0] for row in unique_result.result_rows] +unique_cnt = [row[1] for row in unique_result.result_rows] + +# EDA статистика +print("\n--- EDA статистика ---") +events_np = np.array(events_cnt) +unique_np = np.array(unique_cnt) + +mean_events = events_np.mean() +std_events = events_np.std() +mean_unique = unique_np.mean() +std_unique = unique_np.std() + +print(f"Общее количество событий: {sum(events_cnt):,}") +print(f"Среднее событий в минуту: {mean_events:.1f} ± {std_events:.1f}") +print(f"Максимальная активность: {max(events_cnt)} событий/минуту") +print(f"Среднее уникальных IP/минуту: {mean_unique:.1f} ± {std_unique:.1f}") + +# ЛОГИКА ОПРЕДЕЛЕНИЯ АТАК --- +# Атаки определяются по правилу 3-х сигм +# Значения > среднее + 3σ считаются статистически значимыми аномалиями(предпологаем что ботнет) + +#Анализируем данные +attack_threshold_3sigma = mean_events + 3 * std_events # 99.7% доверительный интервал +attack_ts_3sigma = [(ts, cnt) for ts, cnt in zip(events_ts, events_cnt) if cnt > attack_threshold_3sigma] +attack_ts = [ts for ts, cnt in attack_ts_3sigma] +attack_counts = [cnt for ts, cnt in attack_ts_3sigma] + +# График событий с пометкой атак +plt.figure(figsize=(16, 8)) + +# Основной график событий +plt.subplot(2, 1, 1) +plt.plot(events_ts, events_cnt, 'b-', alpha=0.7, label='События') +plt.axhline(y=mean_events, color='green', linestyle='--', alpha=0.7, label=f'Среднее ({mean_events:.1f})') +plt.axhline(y=attack_threshold_3sigma, color='red', linestyle='--',label=f'Порог атаки 3σ ({attack_threshold_3sigma:.1f})') + +# Помечаем атаки красными точками +attack_indices = [i for i, cnt in enumerate(events_cnt) if cnt > attack_threshold_3sigma] +attack_x = [events_ts[i] for i in attack_indices] +attack_y = [events_cnt[i] for i in attack_indices] +plt.scatter(attack_x, attack_y, color='red', s=30, zorder=5, label='Обнаруженные атаки') + +plt.title("Количество событий по минутам с пометкой атак") +plt.ylabel("Событий в минуту") +plt.legend() +plt.grid(True, alpha=0.3) + +# --- График уникальных IP --- +plt.subplot(2, 1, 2) +plt.plot(unique_ts, unique_cnt, color="orange", label='Уникальные IP') +plt.axhline(y=mean_unique, color='green', linestyle='--', alpha=0.7, label=f'Среднее ({mean_unique:.1f})') + +# Помечаем те же интервалы атак +attack_unique_y = [unique_cnt[i] for i in attack_indices] +plt.scatter(attack_x, attack_unique_y, color='red', s=30, zorder=5, label='Интервалы атак') + +plt.title("Количество уникальных IP по минутам") +plt.ylabel("Уникальных IP") +plt.xlabel("Время") +plt.legend() +plt.grid(True, alpha=0.3) + +plt.tight_layout() +plt.savefig("eda_plots/events_with_attacks.png", dpi=150) +plt.close() +print("EDA завершено. Графики сохранены в eda_plots/events_with_attacks.png\n") + + +# Получение активных IP active_ips_query = """ SELECT ip FROM DIPLOM_NEW @@ -37,26 +138,31 @@ HAVING COUNT() >= 25 """ active_ips_result = client.query(active_ips_query) data_IP = [row[0] for row in active_ips_result.result_rows] + +print(f"Активных IP (≥25 событий): {len(data_IP)}") + ip_to_idx = {ip: idx for idx, ip in enumerate(data_IP)} -print(f"Активных IP: {len(data_IP)}") -# ----------------- Временной диапазон ----------------- + +# Временной диапазон time_range_query = "SELECT MIN(timestamp), MAX(timestamp) FROM DIPLOM_NEW" -time_result = client.query(time_range_query) -min_time, max_time = time_result.result_rows[0] +min_time, max_time = client.query(time_range_query).result_rows[0] + lenn = int((max_time.timestamp() - min_time.timestamp()) // interval) + 1 -print(f"Интервалов: {lenn}") +print(f"Временных интервалов по {interval} сек: {lenn}") + + +# Формируем разреженную матрицу +print("Строим разреженную матрицу активности...") -# ----------------- Создание матрицы активности ----------------- -print("Формируем разреженную матрицу активности...") activity_matrix = lil_matrix((len(data_IP), lenn), dtype=np.int8) -batch_size = 10000 # для обработки частями, чтобы не былоо ошибок по памяти +batch_size = 10000 client.command("DROP TEMPORARY TABLE IF EXISTS active_ips") client.command("CREATE TEMPORARY TABLE active_ips (ip String)") for i in range(0, len(data_IP), batch_size): - batch = data_IP[i:i+batch_size] + batch = data_IP[i:i + batch_size] values = ", ".join([f"('{ip}')" for ip in batch]) client.command(f"INSERT INTO active_ips VALUES {values}") @@ -66,117 +172,147 @@ FROM DIPLOM_NEW t INNER JOIN active_ips a ON t.ip = a.ip ORDER BY t.timestamp """ -data_result = client.query(data_query) -data = data_result.result_rows -for ip, ts in data: +for ip, ts in client.query(data_query).result_rows: row = ip_to_idx[ip] col = int((ts.timestamp() - min_time.timestamp()) // interval) activity_matrix[row, col] = 1 activity_matrix = activity_matrix.tocsr() -print(f"Время подготовки данных: {time.time() - start_time:.2f} сек") +print("Матрица построена.\n") + +# ------------- Нормализация и снижение размерности ------------- +activity_matrix = normalize(activity_matrix, norm="l2") +reducer = TruncatedSVD( + n_components=50, + random_state=42, + algorithm='randomized' +) -# ----------------- Хеширование ----------------- -use_fraction = 20 # на сколько мы делим число интервалов -n_features = max(1, lenn // use_fraction) +X_reduced = reducer.fit_transform(activity_matrix) -hasher = FeatureHasher(n_features=n_features, input_type='pair') -batch_size = 1000 -batches = len(data_IP) // batch_size + 1 -X_hashed_parts = [] -for b in range(batches): - start = b * batch_size - end = min((b + 1) * batch_size, len(data_IP)) - batch_rows = [] - for i in range(start, end): - row = activity_matrix[i] - if row.nnz == 0: - batch_rows.append(hasher.transform([[]])) - else: - features = [(str(idx), 1) for idx in row.indices] - batch_rows.append(hasher.transform([features])) +# ----------------- КЛАСТЕРИЗАЦИЯ --------------------- +print("Запускаем HDBSCAN...") +clusterer = hdbscan.HDBSCAN( + min_cluster_size=5, + min_samples=3, + metric="euclidean", + cluster_selection_method="eom" +) - batch_matrix = vstack(batch_rows) - X_hashed_parts.append(batch_matrix) - del batch_rows +labels = clusterer.fit_predict(X_reduced) +print("Кластеризация завершена.\n") -# Объединяем части матрицы -X_hashed = vstack(X_hashed_parts) -del X_hashed_parts +# ---------АНАЛИЗ КЛАСТЕРОВ------------------------- +print("\nАнализируем кластеры...") +#Получим кластеры без шума +clusters = defaultdict(list) +for ip, label in zip(data_IP, labels): + if label != -1: + clusters[label].append(ip) + +# Преобразуем EDA интервалы атак в индексы колонок +attack_intervals = [ + int((ts.timestamp() - min_time.timestamp()) // interval) + for ts in attack_ts +] + +# ПОИСК ПОДОЗРИТЕЛЬНЫХ КЛАСТЕРОВ +cluster_stats = [] + +for cid, ips in clusters.items(): + indices = [ip_to_idx[ip] for ip in ips] + + total_activity = 0 + attack_activity = 0 + + # Считаем активность + for idx in indices: + row = activity_matrix[idx] + total_activity += row.sum() + + # активность в интервалы EDA-атак + for ai in attack_intervals: + if ai < row.shape[1]: + attack_activity += row[0, ai] + + attack_percent = (attack_activity / total_activity * 100) if total_activity else 0 + + # Считаем подозрительных критериев + danger_score = 0 + if len(ips) >= 8: # крупный кластер + danger_score += 1 + if attack_percent >= 10: # участвует в EDA-аномалиях + danger_score += 1 + if total_activity / len(ips) > 5: # активность на один IP высокая + danger_score += 1 + + cluster_stats.append({ + 'cluster_id': cid, + 'ips': ips, + 'ip_indices': indices, + 'ip_count': len(ips), + 'attack_percent': attack_percent, + 'total_activity': int(total_activity), + 'danger_score': danger_score + }) + +# Сортируем по подозрительности (без учета similarity!) +cluster_stats.sort(key=lambda x: x['danger_score'], reverse=True) + +# Смотрим схожесть внутри кластеров +def calc_cluster_similarity(ip_indices, max_size=500): + if len(ip_indices) > max_size: + sample = np.random.choice(ip_indices, size=max_size, replace=False) + else: + sample = ip_indices -# Нормализация -X_hashed = normalize(X_hashed, norm='l2', axis=1) + sims = cosine_similarity(activity_matrix[sample]) + return float(sims.mean()) -# Снижение размерности -svd_components = 100 # колличество компонент -svd = TruncatedSVD(n_components=svd_components) -X_reduced = svd.fit_transform(X_hashed) +#Добавляем информацию о схожести +for c in cluster_stats: + c['similarity'] = calc_cluster_similarity(c['ip_indices']) -# ----------------- Кластеризация ----------------- -print("Запускаем HDBSCAN...") -clusterer = hdbscan.HDBSCAN( - min_cluster_size=5, - min_samples=3, - metric='euclidean', - cluster_selection_epsilon=0.05, - cluster_selection_method='eom' -) -labels = clusterer.fit_predict(X_reduced) -# ----------------- Подсчёт внутрикластерной схожести ----------------- -from sklearn.metrics.pairwise import cosine_similarity -from collections import defaultdict -import gc +# ----ФИНАЛЬНОЕ РЕШЕНИЕ: БОТНЕТ / НЕ БОТНЕТ----------------+ -print("Вычисляем внутрикластерную схожесть (cosine similarity)...") +def final_botnet_decision(c): + """ + Логика такая: + - danger_score — первичный индикатор (до similarity) + - similarity — контрольная проверка координации + """ + if c['danger_score'] < 2: # слишком низкая подозрительность + return False + if c['similarity'] < 0.6: # нет координации — не ботнет + return False + return True # ботнет подтверждён -cluster_similarities = {} -clusters = defaultdict(list) -for ip, label in zip(data_IP, labels): - clusters[label].append(ip) +botnets = [c for c in cluster_stats if final_botnet_decision(c)] +normals = [c for c in cluster_stats if not final_botnet_decision(c)] -for cluster_id in set(labels): - if cluster_id == -1: - continue +print(f"\nИтого ботнет-кластеров: {len(botnets)}") +print(f"Всего IP в ботнетах: {sum(c['ip_count'] for c in botnets)}") - indices = [i for i, l in enumerate(labels) if l == cluster_id] - n = len(indices) - # Ограничиваем слишком большие кластеры, чтобы не было проблем по памяти - if n > 500: - sample_indices = np.random.choice(indices, size=500, replace=False) - else: - sample_indices = indices - - submatrix = activity_matrix[sample_indices] - - sims = cosine_similarity(submatrix, dense_output=False) - mean_sim = sims.mean() - cluster_similarities[cluster_id] = float(mean_sim) - gc.collect() - -# ----------------- Сохранение результатов ----------------- -output_file = "botnet_clusters.txt" - -with open(output_file, "w") as f: - for cluster_id, ips in clusters.items(): - if cluster_id == -1: - continue - sim = cluster_similarities.get(cluster_id, 0) - f.write(f"=== Кластер {cluster_id} (IP: {len(ips)}, средняя схожесть={sim:.4f}) ===\n") - for ip in ips: - f.write(ip + "\n") - f.write("\n") - -# ----------------- Финальная статистика ----------------- -n_clusters = len([c for c in clusters if c != -1]) -print(f"Найдено кластеров: {n_clusters}") -print(f"Результаты сохранены в: {output_file}") -print(f"Время обработки всего скрипта: {time.time()-start_time_all:.2f}") +# -----СОХРАНЯЕМ ТОЛЬКО БОТНЕТЫ------------------------------ + +with open("botnet_clusters.txt", "w", encoding='utf-8') as f: + for i, c in enumerate(botnets, 1): + f.write(f"\n--- БОТНЕТ #{i} ---\n") + f.write(f"Количество IP: {c['ip_count']}\n") + f.write(f"Участие в атаках(EDA): {c['attack_percent']:.1f}%\n") + f.write(f"Схожесть: {c['similarity']:.3f}\n\n") + f.write("Список IP:\n") + for ip in c['ips']: + f.write(f" {ip}\n") + +print("\nРезультаты записаны в botnet_clusters.txt") +print(f"Время выполнения: {time.вtime() - start_time_all:.2f} сек") \ No newline at end of file