# import helpers21
import math

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib import colors
from pandas import DataFrame

import sklearn
from sklearn import preprocessing, svm
from sklearn.datasets import make_blobs
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
from sklearn.metrics import zero_one_loss
from sklearn.model_selection import train_test_split

import tensorflow.keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation
visual = True         # draw and save the plots
verbose_show = False  # show per-epoch Keras training output
# generate a 2d classification dataset
def datagen(x_c, y_c, n_samples, n_features):
    center = [[x_c, y_c]] if n_features == 2 else None
    X, Y = make_blobs(n_samples=n_samples, centers=center, n_features=n_features, cluster_std=0.1)
    if n_features == 2:
        plt.figure(figsize=(12, 8))
        plt.scatter(X[:, 0], X[:, 1], marker='o', s=7, color='b', label='Training set')
        plt.legend(loc='upper left', fontsize=12)
        plt.title('Training set')
        plt.xlabel('x')
        plt.ylabel('y')
        plt.savefig('out/train_set.png')
        plt.show()
    np.savetxt('data.txt', X)
    return X
def ire(vector1, vector2):
    # IRE: input reconstruction error, i.e. the Euclidean distance between an
    # input vector and its reconstruction, rounded to two decimal places
    x = 0
    for i in range(len(vector1)):
        x += (vector2[i] - vector1[i]) ** 2
    return round(math.sqrt(x), 2)


def ire_array(array1, array2):
    # element-wise IRE for two arrays of vectors with the same shape
    ire_list = []
    for index in range(array1.shape[0]):
        ire_list.append(ire(array1[index], array2[index]))
    return np.array(ire_list)
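

# A quick sanity check of the reconstruction-error helpers (hypothetical
# values, for illustration only):
#   ire(np.array([0.0, 0.0]), np.array([3.0, 4.0]))  # -> 5.0
#   ire_array(X, ae.predict(X))                      # -> per-sample errors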
class EarlyStoppingOnValue(tensorflow.keras.callbacks.Callback):
    # stop training as soon as the monitored metric drops below `baseline`
    def __init__(self, monitor='loss', baseline=None):
        super().__init__()
        self.baseline = baseline
        self.monitor = monitor

    def on_epoch_end(self, epoch, logs=None):
        current_value = self.get_monitor_value(logs)
        if current_value is not None and current_value < self.baseline:
            self.model.stop_training = True

    def get_monitor_value(self, logs):
        logs = logs or {}
        monitor_value = logs.get(self.monitor)
        if monitor_value is None:
            print('Early stopping conditioned on metric `%s` '
                  'which is not available. Available metrics are: %s'
                  % (self.monitor, ','.join(list(logs.keys()))))
        return monitor_value
# create, train and save the autoencoder model
def create_fit_save_ae(cl_train, ae_file, irefile, epohs, verbose_show, patience):
    size = cl_train.shape[1]
    # ans = '2'
    ans = input('Define the autoencoder architecture manually or use the default architecture? (1/2): ')
    if ans == '1':
        n = int(input("Enter the number of hidden layers (an odd number): "))
        # read the user-supplied layer sizes with map()
        ae_arch = list(map(int, input("Enter the hidden-layer architecture of the autoencoder, e.g. 3 1 3: ").strip().split()))[:n]
        ae = tensorflow.keras.models.Sequential()
        # input layer
        ae.add(tensorflow.keras.layers.Dense(size))
        ae.add(tensorflow.keras.layers.Activation('tanh'))
        # hidden layers
        for i in range(len(ae_arch)):
            ae.add(tensorflow.keras.layers.Dense(ae_arch[i]))
            ae.add(tensorflow.keras.layers.Activation('tanh'))
        # output layer
        ae.add(tensorflow.keras.layers.Dense(size))
        ae.add(tensorflow.keras.layers.Activation('linear'))
    else:
        ae = tensorflow.keras.models.Sequential()
        # input layer
        ae.add(tensorflow.keras.layers.Dense(size))
        ae.add(tensorflow.keras.layers.Activation('tanh'))
        # hidden layers: default bottleneck architecture 3-2-1-2-3
        ae.add(tensorflow.keras.layers.Dense(3))
        ae.add(tensorflow.keras.layers.Activation('tanh'))
        ae.add(tensorflow.keras.layers.Dense(2))
        ae.add(tensorflow.keras.layers.Activation('tanh'))
        ae.add(tensorflow.keras.layers.Dense(1))
        ae.add(tensorflow.keras.layers.Activation('tanh'))
        ae.add(tensorflow.keras.layers.Dense(2))
        ae.add(tensorflow.keras.layers.Activation('tanh'))
        ae.add(tensorflow.keras.layers.Dense(3))
        ae.add(tensorflow.keras.layers.Activation('tanh'))
        # output layer
        ae.add(tensorflow.keras.layers.Dense(size))
        ae.add(tensorflow.keras.layers.Activation('linear'))
    optimizer = tensorflow.keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, amsgrad=False)
    ae.compile(loss='mean_squared_error', optimizer=optimizer)
    error_stop = 0.0001
    epo = epohs
    early_stopping_callback_on_error = EarlyStoppingOnValue(monitor='loss', baseline=error_stop)
    early_stopping_callback_on_improving = tensorflow.keras.callbacks.EarlyStopping(monitor='loss',
                                                                                    min_delta=0.0001, patience=patience,
                                                                                    verbose=1, mode='auto',
                                                                                    baseline=None,
                                                                                    restore_best_weights=False)
    history_callback = tensorflow.keras.callbacks.History()
    verbose = 1 if verbose_show else 0
    history_object = ae.fit(cl_train, cl_train,
                            batch_size=cl_train.shape[0],
                            epochs=epo,
                            callbacks=[early_stopping_callback_on_error, history_callback,
                                       early_stopping_callback_on_improving],
                            verbose=verbose)
    ae_trainned = ae
    ae_pred = ae_trainned.predict(cl_train)
    ae_trainned.save(ae_file)
    # per-sample reconstruction errors on the training set and the detection threshold
    IRE_array = np.round(ire_array(cl_train, ae_pred), 2)
    IREth = np.amax(IRE_array)
    with open(irefile, 'w') as file:
        file.write(str(IREth))
    print()
    print()
    return ae_trainned, IRE_array, IREth
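

# Typical call (a sketch; the file names and parameter values below are
# assumptions, not taken from the original scripts):
#   ae1, IRE_train, IREth = create_fit_save_ae(X_train, 'out/ae1.h5', 'out/ire_th1.txt',
#                                              epohs=2000, verbose_show=False, patience=500)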
def test(y_pred, Y_test):
    # mark the misclassified samples and count them (y_pred is modified in place)
    y_pred[y_pred != Y_test] = -100
    n_errors = (y_pred == -100).astype(int).sum()
    return n_errors
def predict_ae(nn, x_test, threshold):
    # a sample is labelled anomalous (1.0) when its reconstruction error exceeds the threshold
    x_test_predicted = nn.predict(x_test)
    ire = ire_array(x_test, x_test_predicted)
    predicted_labels = (ire > threshold).astype(float)
    predicted_labels = predicted_labels.reshape((predicted_labels.shape[0], 1))
    ire = np.transpose(np.array([ire]))
    return predicted_labels, ire


def load_ae(path_to_ae_file):
    return tensorflow.keras.models.load_model(path_to_ae_file)
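

# Reloading a saved autoencoder and scoring new data (hypothetical file name,
# for illustration only):
#   ae1 = load_ae('out/ae1.h5')
#   labels, ire_values = predict_ae(ae1, X_test, IREth)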
def square_calc(numb_square, X_train, ae, IRE_th, num, visual):
    # scan the bounding box of the training set on a regular grid
    x_min, x_max = X_train[:, 0].min() - 2, X_train[:, 0].max() + 1
    y_min, y_max = X_train[:, 1].min() - 1, X_train[:, 1].max() + 1
    h_x = (x_max - x_min) / 100
    h_y = (y_max - y_min) / 100
    h_y = h_x
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h_x), np.arange(y_min, y_max, h_y))
    X_plot = np.c_[xx.ravel(), yy.ravel()]
    # autoencoder answers for every grid point
    Z, ire = predict_ae(ae, X_plot, IRE_th)
    # X_def: grid points recognised by the autoencoder as belonging to the class
    X_def = np.array([0, 0], ndmin=2)
    for ind, ans in enumerate(Z):
        if ans == 0:
            X_def = np.append(X_def, [X_plot[ind]], axis=0)
    X_def = np.delete(X_def, 0, axis=0)
    # plot the coverage area and the class boundary
    Z = Z.reshape(xx.shape)
    if visual:
        plt.figure(figsize=(12, 6))
        plt.contourf(xx, yy, Z, cmap=plt.cm.tab10, alpha=0.5)
        plt.scatter(X_train[:, 0], X_train[:, 1], marker='o', s=7, color='b')
        plt.legend(['C1'])
        plt.xlabel('X')
        plt.ylabel('Y')
        plt.xlim(xx.min(), xx.max())
        plt.ylim(yy.min(), yy.max())
        plt.title('Autoencoder AE' + str(num) + '. Training set. Class boundary')
        plt.savefig('out/AE' + str(num) + '_train_def.png')
        plt.show()
    # coarse grid of numb_square x numb_square cells for the area estimates
    h_x = abs((x_max - x_min) / numb_square)
    h_y = abs((y_max - y_min) / numb_square)
    # columns of the coarse grid that contain at least one point
    col_id = np.zeros(numb_square)
    col_id_ae = np.zeros(numb_square)
    for i in range(numb_square):
        for x in X_train[:, 0]:
            if x_min + i * h_x <= x < x_min + (i + 1) * h_x:
                col_id[i] = 1
        for x in X_def[:, 0]:
            if x_min + i * h_x <= x < x_min + (i + 1) * h_x:
                col_id_ae[i] = 1
    # occupancy map of the training set; amount = number of occupied cells
    amount = 0
    cart = np.zeros((numb_square, numb_square))
    for_rect = np.array([0, 0], ndmin=2)
    for index, element in enumerate(col_id):
        if element == 1:
            for i in range(numb_square):
                for xy in X_train:
                    if (y_min + i * h_y <= xy[1] < y_min + (i + 1) * h_y
                            and x_min + index * h_x <= xy[0] < x_min + (index + 1) * h_x):
                        amount = amount + 1
                        cart[numb_square - i - 1, index] = 1
                        for_rect = np.append(for_rect, np.array([x_min + index * h_x, y_min + i * h_y], ndmin=2),
                                             axis=0)
                        break
    for_rect = np.delete(for_rect, 0, axis=0)
    # occupancy map of the set recognised by the autoencoder
    amount_ae = 0
    cart_ae = np.zeros((numb_square, numb_square))
    for_rect_ae = np.array([0, 0], ndmin=2)
    for index, element in enumerate(col_id_ae):
        if element == 1:
            for i in range(numb_square):
                for xy in X_def:
                    if (y_min + i * h_y <= xy[1] < y_min + (i + 1) * h_y
                            and x_min + index * h_x <= xy[0] < x_min + (index + 1) * h_x):
                        amount_ae = amount_ae + 1
                        cart_ae[numb_square - i - 1, index] = 1
                        for_rect_ae = np.append(for_rect_ae, np.array([x_min + index * h_x, y_min + i * h_y], ndmin=2),
                                                axis=0)
                        break
    for_rect_ae = np.delete(for_rect_ae, 0, axis=0)
    print('amount: ', amount)
    print('amount_ae: ', amount_ae)
    if visual:
        label0_ae = 'Set recognised by AE' + str(num)
        s0_ae = 0.3
        label0 = 'Training set'
        s0 = 12
        fig = plt.figure(figsize=(16, 7))
        ax_1 = fig.add_subplot(1, 2, 1)
        ax_2 = fig.add_subplot(1, 2, 2)
        ax_1.grid(which='major', axis='both', linestyle='-', color='k', linewidth=0.5)
        ax_1.set_xticks(np.arange(x_min, x_max, h_x))
        ax_1.set_yticks(np.arange(y_min, y_max, h_y))
        x_lbl = np.round(np.arange(x_min, x_max, h_x), 1).tolist()
        y_lbl = np.round(np.arange(y_min, y_max, h_y), 1).tolist()
        ax_1.set_xticklabels(x_lbl)
        ax_1.set_yticklabels(y_lbl)
        for xy in for_rect:
            rect = patches.Rectangle((xy[0], xy[1]), h_x, h_y, linewidth=1, edgecolor='none', facecolor='royalblue',
                                     alpha=0.3)
            ax_1.add_patch(rect)
        ax_1.scatter(X_train[:, 0], X_train[:, 1], marker='o', s=s0, color='indigo', label=label0)
        ax_1.legend(loc='upper left', fontsize=12)
        ax_1.set_title('Area of the training set |Xt|', fontsize=14)
        ax_1.set_xlabel('X')
        ax_1.set_ylabel('Y')
        ax_1.set_xlim(x_min, x_max)
        ax_1.set_ylim(y_min, y_max)
        ax_2.grid(which='major', axis='both', linestyle='-', color='k', linewidth=0.5)
        ax_2.set_xticks(np.arange(x_min, x_max, h_x))
        ax_2.set_yticks(np.arange(y_min, y_max, h_y))
        ax_2.set_xticklabels(x_lbl)
        ax_2.set_yticklabels(y_lbl)
        for xy in for_rect_ae:
            rect = patches.Rectangle((xy[0], xy[1]), h_x, h_y, linewidth=1, edgecolor='none', facecolor='coral',
                                     alpha=0.4)
            ax_2.add_patch(rect)
        ax_2.scatter(X_def[:, 0], X_def[:, 1], marker='o', s=s0, color='b', label=label0_ae)
        ax_2.legend(loc='upper left', fontsize=12)
        ax_2.set_title('Area of the deformed set |Xd|', fontsize=14)
        ax_2.set_xlabel('X')
        ax_2.set_ylabel('Y')
        ax_2.set_xlim(x_min, x_max)
        ax_2.set_ylim(y_min, y_max)
        plt.savefig('out/XtXd_' + str(num) + '.png')
        plt.show()
    if visual:
        # three panels: Excess, Deficit and Coating of |Xd| with respect to |Xt|
        fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(22, 8))
        for ax in axes.flat:
            ax.grid(which='major', axis='both', linestyle='-', color='k', linewidth=0.5)
            ax.set_xticks(np.arange(x_min, x_max, h_x))
            ax.set_yticks(np.arange(y_min, y_max, h_y))
            x_lbl = np.round(np.arange(x_min, x_max, h_x), 1).tolist()
            y_lbl = np.round(np.arange(y_min, y_max, h_y), 1).tolist()
            ax.set_xticklabels(x_lbl)
            ax.set_yticklabels(y_lbl)
            ax.set_xlim(x_min, x_max)
            ax.set_ylim(y_min, y_max)
            ax.set_xlabel('X')
            ax.set_ylabel('Y')
            for xy in for_rect_ae:
                rect = patches.Rectangle((xy[0], xy[1]), h_x, h_y, linewidth=1, edgecolor='k', facecolor='coral')
                ax.add_patch(rect)
            # legend proxy patch so the |Xd| area appears only once in the legend
            rect = patches.Rectangle((xy[0], xy[1]), h_x, h_y, linewidth=1, edgecolor='none', facecolor='coral',
                                     label='Area of the set |Xd|')
            ax.add_patch(rect)
            for xy in for_rect:
                rect = patches.Rectangle((xy[0], xy[1]), h_x, h_y, linewidth=1, edgecolor='k', facecolor='royalblue')
                ax.add_patch(rect)
            rect = patches.Rectangle((xy[0], xy[1]), h_x, h_y, linewidth=1, edgecolor='none', facecolor='royalblue',
                                     label='Area of the set |Xt|')
            ax.add_patch(rect)
            ax.legend(loc='upper left', fontsize=16)
        # hatch the cells where |Xt| and |Xd| intersect (Coating, panel 3)
        nn = 0
        for xy_ae in for_rect_ae:
            for xy in for_rect:
                if xy_ae[0] == xy[0] and xy_ae[1] == xy[1]:
                    nn = nn + 1
                    if nn == 1:
                        rect1 = patches.Rectangle((xy_ae[0], xy_ae[1]), h_x, h_y, linewidth=1, edgecolor='k',
                                                  facecolor='none', hatch='/',
                                                  label='Area of the intersection of |Xt| and |Xd|')
                    else:
                        rect1 = patches.Rectangle((xy_ae[0], xy_ae[1]), h_x, h_y, linewidth=1, edgecolor='k',
                                                  facecolor='none', hatch='/')
                    axes[2].add_patch(rect1)
        axes[2].legend(loc='upper left', fontsize=16)
        # hatch the cells of |Xd| that lie outside |Xt| (Excess, panel 1)
        n = 0
        for xy_ae in for_rect_ae:
            flag = 1
            for xy in for_rect:
                if xy_ae[0] == xy[0] and xy_ae[1] == xy[1]:
                    flag = 0
            if flag == 1:
                n = n + 1
                if n == 1:
                    rect2 = patches.Rectangle((xy_ae[0], xy_ae[1]), h_x, h_y, linewidth=1, edgecolor='k',
                                              facecolor='none', hatch='/',
                                              label='Area of |Xd| excluding |Xt| (|Xd\\Xt|)')
                else:
                    rect2 = patches.Rectangle((xy_ae[0], xy_ae[1]), h_x, h_y, linewidth=1, edgecolor='k',
                                              facecolor='none', hatch='/')
                axes[0].add_patch(rect2)
        # legend-only patch for the Deficit panel (panel 2)
        rect1 = patches.Rectangle((for_rect_ae[0, 0], for_rect_ae[0, 1]), h_x, h_y, linewidth=1, edgecolor='k',
                                  facecolor='none', label='Area of |Xt| excluding |Xd| (|Xt\\Xd|)')
        axes[1].add_patch(rect1)
        axes[0].legend(loc='upper left', fontsize=16)
        axes[1].legend(loc='upper left', fontsize=16)
        axes[0].set_title('Excess. AE' + str(num), fontsize=20)
        axes[1].set_title('Deficit. AE' + str(num), fontsize=20)
        axes[2].set_title('Coating. AE' + str(num), fontsize=20)
        plt.savefig('out/XtXd_' + str(num) + '_metrics.png')
        plt.show()
    # area-based quality metrics computed on the coarse occupancy grids
    square_ov = amount * h_x * h_y
    square_ae = amount_ae * h_x * h_y
    print()
    print('Quality assessment of AE' + str(num))
    extra_pre_ae = square_ov / square_ae
    # Excess: share of cells covered by |Xd| but not by |Xt| (ideal 0)
    Ex = cart_ae - cart
    Excess = np.sum(Ex == 1) / amount
    print('IDEAL = 0. Excess: ', Excess)
    # Deficit: share of training cells not covered by |Xd| (ideal 0)
    Def = cart - cart_ae
    Deficit = np.sum(Def == 1) / amount
    print('IDEAL = 0. Deficit: ', Deficit)
    # Coating: share of training cells covered by |Xd| (ideal 1)
    cart[cart > 0] = 5
    Coa = cart - cart_ae
    Coating = np.sum(Coa == 4) / amount
    print('IDEAL = 1. Coating: ', Coating)
    summa = Deficit + Coating
    print('summa: ', summa)
    print('IDEAL = 1. Extrapolation precision (Approx): ', extra_pre_ae)
    print()
    print()
    with open('out/result.txt', 'w') as file:
        file.write(
            '------------Quality assessment of AE' + str(num) + ' USING THE NEW METRICS------------' + '\n' +
            'Approx = ' + str(extra_pre_ae) + '\n' +
            'Excess = ' + str(Excess) + '\n' +
            'Deficit = ' + str(Deficit) + '\n' +
            'Coating = ' + str(Coating) + '\n')
    return xx, yy, Z
##### 2D plotting helpers
def plot_xdef(X_train, xx, yy, Z):
    plt.contourf(xx, yy, Z, cmap=plt.cm.tab10, alpha=0.5)
    plt.scatter(X_train[:, 0], X_train[:, 1], marker='o', s=7, color='b')
    plt.legend(['C1'])
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.xlim(xx.min(), xx.max())
    plt.ylim(yy.min(), yy.max())


def plot2in1(X_train, xx, yy, Z1, Z2):
    # class boundaries of two autoencoders side by side
    plt.subplot(1, 2, 1)
    plot_xdef(X_train, xx, yy, Z1)
    plt.title('Autoencoder AE1')
    plt.subplot(1, 2, 2)
    plot_xdef(X_train, xx, yy, Z2)
    plt.title('Autoencoder AE2')
    plt.savefig('out/AE1_AE2_train_def.png')
    plt.show()
def anomaly_detection_ae(predicted_labels, ire, ire_th):
    ire = np.round(ire, 2)
    ire_th = np.round(ire_th, 2)
    if predicted_labels.sum() == 0:
        print("No anomalies detected")
    else:
        print()
        print('%-10s%-10s%-10s%-10s' % ('i', 'Labels', 'IRE', 'IREth'))
        for i, pred in enumerate(predicted_labels):
            print('%-10s%-10s%-10s%-10s' % (i, pred, ire[i], ire_th))
        print('Detected', predicted_labels.sum(), 'anomalies')
def plot2in1_anomaly(X_train, xx, yy, Z1, Z2, anomalies):
    plt.subplot(1, 2, 1)
    plot_xdef(X_train, xx, yy, Z1)
    plt.scatter(anomalies[:, 0], anomalies[:, 1], marker='o', s=12, color='r')
    plt.title('Autoencoder AE1')
    plt.subplot(1, 2, 2)
    plot_xdef(X_train, xx, yy, Z2)
    plt.scatter(anomalies[:, 0], anomalies[:, 1], marker='o', s=12, color='r')
    plt.title('Autoencoder AE2')
    plt.savefig('out/AE1_AE2_train_def_anomalies.png')
    plt.show()
def ire_plot(title, IRE_test, IREth, ae_name):
    x = range(1, len(IRE_test) + 1)
    IREth_array = [IREth for _ in x]
    plt.figure(figsize=(16, 8))
    plt.title('IRE for ' + title + ' set. ' + ae_name, fontsize=24)
    plt.plot(x, IRE_test, linestyle='-', color='r', lw=2, label='IRE')
    plt.plot(x, IREth_array, linestyle='-', color='k', lw=2, label='IREth')
    ymax = 1.5 * max(np.amax(IRE_test), IREth)
    plt.ylim(0, ymax)
    plt.xlabel('Vector number', fontsize=20)
    plt.ylabel('IRE', fontsize=20)
    plt.grid()
    plt.legend(loc='upper left', fontsize=16)
    plt.gcf().savefig('out/IRE_' + title + ae_name + '.png')
    plt.show()
    return
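

# A minimal end-to-end sketch of how these helpers could be combined.
# This is an illustration only, not part of the original pipeline; the file
# names 'out/ae1.h5' and 'out/ire_th1.txt' and the parameter values are assumptions.
if __name__ == '__main__':
    import os
    os.makedirs('out', exist_ok=True)  # plots and results are written to out/
    X_train = datagen(1.0, 1.0, n_samples=500, n_features=2)
    ae1, IRE_train, IREth = create_fit_save_ae(X_train, 'out/ae1.h5', 'out/ire_th1.txt',
                                               epohs=2000, verbose_show=verbose_show, patience=500)
    ire_plot('train', IRE_train, IREth, 'AE1')
    labels, ire_values = predict_ae(ae1, X_train, IREth)
    anomaly_detection_ae(labels, ire_values, IREth)
    square_calc(20, X_train, ae1, IREth, 1, visual)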