import clickhouse_connect import time import os import numpy as np from scipy.sparse import lil_matrix, csr_matrix from sklearn.preprocessing import normalize import hdbscan from sklearn.metrics.pairwise import cosine_similarity from collections import defaultdict import gc import matplotlib.pyplot as plt from sklearn.decomposition import TruncatedSVD # Начало замера времени всего скрипта start_time_all = time.time() # Подключение к ClickHouse try: client = clickhouse_connect.get_client( host='localhost', port=8123, username='default', password='', settings={'max_memory_usage': 4_000_000_000} ) print("Подключение к ClickHouse установлено") except Exception as e: print(f"Ошибка подключения: {e}") exit() # Интервал interval = 600 # 10 минут # ---------------------- EDA -------------------------- # Создаем папку куда будем складывать графики os.makedirs("eda_plots", exist_ok=True) # События по минутам events_query = """ SELECT toStartOfMinute(timestamp) AS ts, count() AS events FROM DIPLOM_NEW GROUP BY ts ORDER BY ts """ events_result = client.query(events_query) events_ts = [row[0] for row in events_result.result_rows] events_cnt = [row[1] for row in events_result.result_rows] # Уникальные IP по минутам unique_query = """ SELECT toStartOfMinute(timestamp) AS ts, uniqExact(ip) AS uniq_ips FROM DIPLOM_NEW GROUP BY ts ORDER BY ts """ unique_result = client.query(unique_query) unique_ts = [row[0] for row in unique_result.result_rows] unique_cnt = [row[1] for row in unique_result.result_rows] # EDA статистика print("\n--- EDA статистика ---") events_np = np.array(events_cnt) unique_np = np.array(unique_cnt) mean_events = events_np.mean() std_events = events_np.std() mean_unique = unique_np.mean() std_unique = unique_np.std() print(f"Общее количество событий: {sum(events_cnt):,}") print(f"Среднее событий в минуту: {mean_events:.1f} ± {std_events:.1f}") print(f"Максимальная активность: {max(events_cnt)} событий/минуту") print(f"Среднее уникальных IP/минуту: {mean_unique:.1f} ± {std_unique:.1f}") # ЛОГИКА ОПРЕДЕЛЕНИЯ АТАК --- # Атаки определяются по правилу 3-х сигм # Значения > среднее + 3σ считаются статистически значимыми аномалиями(предпологаем что ботнет) #Анализируем данные attack_threshold_3sigma = mean_events + 3 * std_events # 99.7% доверительный интервал attack_ts_3sigma = [(ts, cnt) for ts, cnt in zip(events_ts, events_cnt) if cnt > attack_threshold_3sigma] attack_ts = [ts for ts, cnt in attack_ts_3sigma] attack_counts = [cnt for ts, cnt in attack_ts_3sigma] # График событий с пометкой атак plt.figure(figsize=(16, 8)) # Основной график событий plt.subplot(2, 1, 1) plt.plot(events_ts, events_cnt, 'b-', alpha=0.7, label='События') plt.axhline(y=mean_events, color='green', linestyle='--', alpha=0.7, label=f'Среднее ({mean_events:.1f})') plt.axhline(y=attack_threshold_3sigma, color='red', linestyle='--',label=f'Порог атаки 3σ ({attack_threshold_3sigma:.1f})') # Помечаем атаки красными точками attack_indices = [i for i, cnt in enumerate(events_cnt) if cnt > attack_threshold_3sigma] attack_x = [events_ts[i] for i in attack_indices] attack_y = [events_cnt[i] for i in attack_indices] plt.scatter(attack_x, attack_y, color='red', s=30, zorder=5, label='Обнаруженные атаки') plt.title("Количество событий по минутам с пометкой атак") plt.ylabel("Событий в минуту") plt.legend() plt.grid(True, alpha=0.3) # --- График уникальных IP --- plt.subplot(2, 1, 2) plt.plot(unique_ts, unique_cnt, color="orange", label='Уникальные IP') plt.axhline(y=mean_unique, color='green', linestyle='--', alpha=0.7, label=f'Среднее ({mean_unique:.1f})') # Помечаем те же интервалы атак attack_unique_y = [unique_cnt[i] for i in attack_indices] plt.scatter(attack_x, attack_unique_y, color='red', s=30, zorder=5, label='Интервалы атак') plt.title("Количество уникальных IP по минутам") plt.ylabel("Уникальных IP") plt.xlabel("Время") plt.legend() plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig("eda_plots/events_with_attacks.png", dpi=150) plt.close() print("EDA завершено. Графики сохранены в eda_plots/events_with_attacks.png\n") # Получение активных IP active_ips_query = """ SELECT ip FROM DIPLOM_NEW GROUP BY ip HAVING COUNT() >= 25 """ active_ips_result = client.query(active_ips_query) data_IP = [row[0] for row in active_ips_result.result_rows] print(f"Активных IP (≥25 событий): {len(data_IP)}") ip_to_idx = {ip: idx for idx, ip in enumerate(data_IP)} # Временной диапазон time_range_query = "SELECT MIN(timestamp), MAX(timestamp) FROM DIPLOM_NEW" min_time, max_time = client.query(time_range_query).result_rows[0] lenn = int((max_time.timestamp() - min_time.timestamp()) // interval) + 1 print(f"Временных интервалов по {interval} сек: {lenn}") # Формируем разреженную матрицу print("Строим разреженную матрицу активности...") activity_matrix = lil_matrix((len(data_IP), lenn), dtype=np.int8) batch_size = 10000 client.command("DROP TEMPORARY TABLE IF EXISTS active_ips") client.command("CREATE TEMPORARY TABLE active_ips (ip String)") for i in range(0, len(data_IP), batch_size): batch = data_IP[i:i + batch_size] values = ", ".join([f"('{ip}')" for ip in batch]) client.command(f"INSERT INTO active_ips VALUES {values}") data_query = """ SELECT t.ip, t.timestamp FROM DIPLOM_NEW t INNER JOIN active_ips a ON t.ip = a.ip ORDER BY t.timestamp """ for ip, ts in client.query(data_query).result_rows: row = ip_to_idx[ip] col = int((ts.timestamp() - min_time.timestamp()) // interval) activity_matrix[row, col] = 1 activity_matrix = activity_matrix.tocsr() print("Матрица построена.\n") # ------------- Нормализация и снижение размерности ------------- activity_matrix = normalize(activity_matrix, norm="l2") reducer = TruncatedSVD( n_components=50, random_state=42, algorithm='randomized' ) X_reduced = reducer.fit_transform(activity_matrix) # ----------------- КЛАСТЕРИЗАЦИЯ --------------------- print("Запускаем HDBSCAN...") clusterer = hdbscan.HDBSCAN( min_cluster_size=5, min_samples=3, metric="euclidean", cluster_selection_method="eom" ) labels = clusterer.fit_predict(X_reduced) print("Кластеризация завершена.\n") # ---------АНАЛИЗ КЛАСТЕРОВ------------------------- print("\nАнализируем кластеры...") #Получим кластеры без шума clusters = defaultdict(list) for ip, label in zip(data_IP, labels): if label != -1: clusters[label].append(ip) # Преобразуем EDA интервалы атак в индексы колонок attack_intervals = [ int((ts.timestamp() - min_time.timestamp()) // interval) for ts in attack_ts ] # ПОИСК ПОДОЗРИТЕЛЬНЫХ КЛАСТЕРОВ cluster_stats = [] for cid, ips in clusters.items(): indices = [ip_to_idx[ip] for ip in ips] total_activity = 0 attack_activity = 0 # Считаем активность for idx in indices: row = activity_matrix[idx] total_activity += row.sum() # активность в интервалы EDA-атак for ai in attack_intervals: if ai < row.shape[1]: attack_activity += row[0, ai] attack_percent = (attack_activity / total_activity * 100) if total_activity else 0 # Считаем подозрительных критериев danger_score = 0 if len(ips) >= 8: # крупный кластер danger_score += 1 if attack_percent >= 10: # участвует в EDA-аномалиях danger_score += 1 if total_activity / len(ips) > 5: # активность на один IP высокая danger_score += 1 cluster_stats.append({ 'cluster_id': cid, 'ips': ips, 'ip_indices': indices, 'ip_count': len(ips), 'attack_percent': attack_percent, 'total_activity': int(total_activity), 'danger_score': danger_score }) # Сортируем по подозрительности (без учета similarity!) cluster_stats.sort(key=lambda x: x['danger_score'], reverse=True) # Смотрим схожесть внутри кластеров def calc_cluster_similarity(ip_indices, max_size=500): if len(ip_indices) > max_size: sample = np.random.choice(ip_indices, size=max_size, replace=False) else: sample = ip_indices sims = cosine_similarity(activity_matrix[sample]) return float(sims.mean()) #Добавляем информацию о схожести for c in cluster_stats: c['similarity'] = calc_cluster_similarity(c['ip_indices']) # ----ФИНАЛЬНОЕ РЕШЕНИЕ: БОТНЕТ / НЕ БОТНЕТ----------------+ def final_botnet_decision(c): """ Логика такая: - danger_score — первичный индикатор (до similarity) - similarity — контрольная проверка координации """ if c['danger_score'] < 2: # слишком низкая подозрительность return False if c['similarity'] < 0.6: # нет координации — не ботнет return False return True # ботнет подтверждён botnets = [c for c in cluster_stats if final_botnet_decision(c)] normals = [c for c in cluster_stats if not final_botnet_decision(c)] print(f"\nИтого ботнет-кластеров: {len(botnets)}") print(f"Всего IP в ботнетах: {sum(c['ip_count'] for c in botnets)}") # -----СОХРАНЯЕМ ТОЛЬКО БОТНЕТЫ------------------------------ with open("botnet_clusters.txt", "w", encoding='utf-8') as f: for i, c in enumerate(botnets, 1): f.write(f"\n--- БОТНЕТ #{i} ---\n") f.write(f"Количество IP: {c['ip_count']}\n") f.write(f"Участие в атаках(EDA): {c['attack_percent']:.1f}%\n") f.write(f"Схожесть: {c['similarity']:.3f}\n\n") f.write("Список IP:\n") for ip in c['ips']: f.write(f" {ip}\n") print("\nРезультаты записаны в botnet_clusters.txt") print(f"Время выполнения: {time.вtime() - start_time_all:.2f} сек")