import clickhouse_connect import time from scipy.sparse import coo_matrix # Замер времени start_time = time.time() # Подключение к ClickHouse try: client = clickhouse_connect.get_client( host='localhost', port=8123, username='default', password='', settings={ 'max_memory_usage': 4_000_000_000, 'max_execution_time': 3600, } ) print("Подключение к ClickHouse установлено") except Exception as e: print(f"Ошибка подключения: {e}") exit() interval = 5 # интервал времени в секундах # Получаем активные IP active_ips_query = """ SELECT ip FROM DIPLOM_NEW GROUP BY ip HAVING COUNT() >= 25 """ active_ips_result = client.query(active_ips_query) data_IP = [row[0] for row in active_ips_result.result_rows] #Пронумеруем IP ip_to_idx = {ip: idx for idx, ip in enumerate(data_IP)} print(f"Активных IP: {len(data_IP)}") # Временной диапазон. Для его оценки возьмем из БД максимальное и минимальное время time_range_query = "SELECT MIN(timestamp), MAX(timestamp) FROM DIPLOM_NEW" time_result = client.query(time_range_query) min_time, max_time = time_result.result_rows[0] lenn = int((max_time.timestamp() - min_time.timestamp()) // interval) + 1 print(f"Интервалов: {lenn}") # Будем решать задачу через csr_matrix rows = [] cols = [] # Временная таблица для join client.command("DROP TEMPORARY TABLE IF EXISTS active_ips") client.command("CREATE TEMPORARY TABLE active_ips (ip String)") # Загружаем IP частями batch_size = 10000 for i in range(0, len(data_IP), batch_size): batch = data_IP[i:i+batch_size] values = ", ".join([f"('{ip}')" for ip in batch]) client.command(f"INSERT INTO active_ips VALUES {values}") #Получаем данные data_query = """ SELECT t.ip, t.timestamp FROM DIPLOM_NEW t INNER JOIN active_ips a ON t.ip = a.ip ORDER BY t.timestamp """ data_result = client.query(data_query) data = data_result.result_rows # Заполняем матрицу for ip, ts in data: rows.append(ip_to_idx[ip]) cols.append(int((ts.timestamp() - min_time.timestamp()) // interval)) # Преобразуем в разреженную матрицу values = [1] * len(rows) activity_matrix = coo_matrix((values, (rows, cols)), shape=(len(data_IP), lenn)).tocsr() print(f"Обработано IP: {len(data_IP)}") print(f"Время подготовки данных: {time.time() - start_time:.2f} сек") start_time = time.time() from sklearn.preprocessing import normalize from sklearn.decomposition import TruncatedSVD import hdbscan from collections import defaultdict #Нормализация activity_matrix = normalize(activity_matrix, norm='l2', axis=1, copy=True) #Снижение размерности SVD # Оставляем только главные компоненты — ускоряет кластеризацию и отсекает шум n_components = 100 #Число можно варировать svd = TruncatedSVD(n_components=n_components, random_state=42) X_reduced = svd.fit_transform(activity_matrix) #Кластеризация HDBSCAN # min_cluster_size — минимальный размер группы (ботнета) # min_samples — чувствительность к шуму clusterer = hdbscan.HDBSCAN( min_cluster_size=5, #минимальный размер группы (ботнета) min_samples=3, #чувствительность к шуму metric='euclidean', cluster_selection_epsilon=0.05, cluster_selection_method='eom' ) labels = clusterer.fit_predict(X_reduced) #Сохранение групп IP в txt файл clusters = defaultdict(list) for ip, label in zip(data_IP, labels): clusters[label].append(ip) #Сохраняем только кластеры (без шума) with open("botnet_clusters.txt", "w") as f: for cluster_id, ips in clusters.items(): if cluster_id == -1: continue # пропускаем шум f.write(f"=== Кластер {cluster_id} (IP: {len(ipпше s)}) ===\n") for ip in ips: f.write(ip + "\n") f.write("\n") #Вывод статистики n_clusters = len([c for c in clusters if c != -1]) print(f"Время выполнения кластеризации: {time.time() - start_time:.2f} сек") print(f"Найдено кластеров: {n_clusters}") print(f"Результаты сохранены в: botnet_clusters.txt")