"""Cluster IP addresses by temporal activity pattern to flag botnet-like hosts.

Pipeline: ClickHouse events -> sparse IP x time-bucket presence matrix ->
feature hashing -> L2 normalization -> TruncatedSVD -> HDBSCAN clustering ->
per-cluster mean cosine similarity -> text report (botnet_clusters.txt).
"""
import gc
import sys
import time
from collections import defaultdict

import clickhouse_connect
import hdbscan
import numpy as np
from scipy.sparse import lil_matrix, vstack
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction import FeatureHasher
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize

# ----------------- Timing -----------------
start_time = time.time()
start_time_all = start_time

# ----------------- ClickHouse connection -----------------
try:
    client = clickhouse_connect.get_client(
        host='localhost',
        port=8123,
        username='default',
        password='',
        settings={
            'max_memory_usage': 4_000_000_000,
            'max_execution_time': 3600,
        }
    )
    print("Подключение к ClickHouse установлено")
except Exception as e:
    print(f"Ошибка подключения: {e}")
    # Bare exit() is an interactive-shell helper; sys.exit() is the
    # correct way to abort a script (and works under `python -O` / frozen apps).
    sys.exit(1)

interval = 5  # time-bucket width in seconds

# ----------------- Fetch active IPs -----------------
# Only IPs with >= 25 events are dense enough to cluster meaningfully.
active_ips_query = """
SELECT ip
FROM DIPLOM_NEW
GROUP BY ip
HAVING COUNT() >= 25
"""
active_ips_result = client.query(active_ips_query)
data_IP = [row[0] for row in active_ips_result.result_rows]
ip_to_idx = {ip: idx for idx, ip in enumerate(data_IP)}
print(f"Активных IP: {len(data_IP)}")

# ----------------- Time range -----------------
time_range_query = "SELECT MIN(timestamp), MAX(timestamp) FROM DIPLOM_NEW"
time_result = client.query(time_range_query)
min_time, max_time = time_result.result_rows[0]
min_epoch = min_time.timestamp()  # hoisted: reused for every event row below
lenn = int((max_time.timestamp() - min_epoch) // interval) + 1
print(f"Интервалов: {lenn}")

# ----------------- Build the activity matrix -----------------
print("Формируем разреженную матрицу активности...")
# Rows = IPs, cols = time buckets; int8 suffices for a 0/1 presence flag.
# LIL is efficient for incremental writes; converted to CSR once filled.
activity_matrix = lil_matrix((len(data_IP), lenn), dtype=np.int8)

batch_size = 10000  # insert in chunks to keep memory bounded

client.command("DROP TEMPORARY TABLE IF EXISTS active_ips")
client.command("CREATE TEMPORARY TABLE active_ips (ip String)")
for i in range(0, len(data_IP), batch_size):
    batch = data_IP[i:i + batch_size]
    # Parameterized insert instead of hand-built "VALUES ('...')" SQL:
    # string-built VALUES breaks (and is injectable) if a value ever
    # contains a quote character, and is slower to parse server-side.
    client.insert('active_ips', [[ip] for ip in batch], column_names=['ip'])

data_query = """
SELECT t.ip, t.timestamp
FROM DIPLOM_NEW t
INNER JOIN active_ips a ON t.ip = a.ip
ORDER BY t.timestamp
"""
data_result = client.query(data_query)

for ip, ts in data_result.result_rows:
    row = ip_to_idx[ip]
    col = int((ts.timestamp() - min_epoch) // interval)
    activity_matrix[row, col] = 1

activity_matrix = activity_matrix.tocsr()
print(f"Время подготовки данных: {time.time() - start_time:.2f} сек")

# ----------------- Feature hashing -----------------
use_fraction = 20  # hash width = interval count divided by this factor
n_features = max(1, lenn // use_fraction)
hasher = FeatureHasher(n_features=n_features, input_type='pair')

batch_size = 1000
X_hashed_parts = []
# range(..., step) never yields an empty trailing batch. The previous
# "len // batch_size + 1" loop produced one when len(data_IP) was an
# exact multiple of batch_size, and vstack([]) raised ValueError.
for start in range(0, len(data_IP), batch_size):
    end = min(start + batch_size, len(data_IP))
    # FeatureHasher.transform accepts an iterable of samples, so hash the
    # whole batch in one call instead of one transform + vstack per row.
    # An IP with no activity simply yields an empty feature list.
    batch_features = [
        [(str(idx), 1) for idx in activity_matrix[i].indices]
        for i in range(start, end)
    ]
    X_hashed_parts.append(hasher.transform(batch_features))

X_hashed = vstack(X_hashed_parts)
del X_hashed_parts  # free the per-batch copies before the next big allocation

# L2-normalize rows so euclidean distances downstream behave like cosine.
X_hashed = normalize(X_hashed, norm='l2', axis=1)

# ----------------- Dimensionality reduction -----------------
# TruncatedSVD requires n_components < n_features; clamp so small datasets
# (lenn // use_fraction <= 100) do not crash the fit.
svd_components = min(100, max(1, n_features - 1))
svd = TruncatedSVD(n_components=svd_components)
X_reduced = svd.fit_transform(X_hashed)

# ----------------- Clustering -----------------
print("Запускаем HDBSCAN...")
clusterer = hdbscan.HDBSCAN(
    min_cluster_size=5,
    min_samples=3,
    metric='euclidean',
    cluster_selection_epsilon=0.05,
    cluster_selection_method='eom'
)
labels = clusterer.fit_predict(X_reduced)

# ----------------- Intra-cluster similarity -----------------
print("Вычисляем внутрикластерную схожесть (cosine similarity)...")
cluster_similarities = {}
clusters = defaultdict(list)
for ip, label in zip(data_IP, labels):
    clusters[label].append(ip)

for cluster_id in set(labels):
    if cluster_id == -1:
        continue  # -1 is HDBSCAN's noise label, not a cluster

    indices = [i for i, lbl in enumerate(labels) if lbl == cluster_id]
    n = len(indices)

    # Cap very large clusters so the pairwise similarity matrix fits in memory.
    if n > 500:
        sample_indices = np.random.choice(indices, size=500, replace=False)
    else:
        sample_indices = indices

    submatrix = activity_matrix[sample_indices]
    sims = cosine_similarity(submatrix, dense_output=False)
    # NOTE(review): the mean includes the diagonal self-similarities (1.0),
    # which slightly inflates the score for small clusters — confirm intended.
    cluster_similarities[cluster_id] = float(sims.mean())

    gc.collect()

# ----------------- Save results -----------------
output_file = "botnet_clusters.txt"
with open(output_file, "w") as f:
    for cluster_id, ips in clusters.items():
        if cluster_id == -1:
            continue
        sim = cluster_similarities.get(cluster_id, 0)
        f.write(f"=== Кластер {cluster_id} (IP: {len(ips)}, средняя схожесть={sim:.4f}) ===\n")
        for ip in ips:
            f.write(ip + "\n")
        f.write("\n")

# ----------------- Final statistics -----------------
n_clusters = len([c for c in clusters if c != -1])
print(f"Найдено кластеров: {n_clusters}")
print(f"Результаты сохранены в: {output_file}")
print(f"Время обработки всего скрипта: {time.time() - start_time_all:.2f}")