• ECG Signal Anomaly Detection Based on Autoencoders and LSTM Autoencoders


    import os
    import numpy as np
    import pandas as pd
    import seaborn as sns
    import matplotlib as mpl
    import matplotlib.pyplot as plt
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.model_selection import train_test_split
    import tensorflow as tf
    from tensorflow.keras import layers, losses
    from tensorflow.keras.layers import Input, Dense, LSTM, RepeatVector, TimeDistributed
    from tensorflow.keras.models import Model
    from tensorflow.keras.utils import plot_model

    RANDOM_SEED = 42
    np.random.seed(RANDOM_SEED)

    # Import the dataset files (read_csv already returns DataFrames)
    ecg_train_file = 'ECG5000_TRAIN.csv'
    ecg_test_file = 'ECG5000_TEST.csv'
    df_train = pd.read_csv(ecg_train_file)
    df_test = pd.read_csv(ecg_test_file)

    # Merge the original train/test splits into a single DataFrame
    df = pd.concat([df_train, df_test], ignore_index=True)
    df = df.drop(labels='id', axis=1)

    # 140 timesteps + target
    new_columns = list(df.columns)
    new_columns[-1] = 'target'
    df.columns = new_columns
    df
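
As a quick sanity check, the merged frame should now hold every signal with its label (a hedged expectation: the 5,000-row figure assumes the standard ECG5000 release, which matches the class counts printed below):

    print(df.shape)  # expected: (5000, 141) -- 140 timesteps plus the target column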

    normal_class = 1
    class_names = ['Normal', 'R on T', 'PVC', 'SP', 'UB']
    df['target'].value_counts()
    # Output:
    # 1    2919
    # 2    1767
    # 4     194
    # 3      96
    # 5      24
    # Name: target, dtype: int64

    plt.figure(figsize=(12, 8))
    ax = sns.countplot(x=df.target)
    ax.set_xticklabels(class_names);

    def plot_time_series_class(data, class_name, ax, n_steps=10):
        '''
        Plot the averaged time series of one class, smoothed with a
        rolling mean and a +/- 2 standard deviation band around it.
        '''
        time_series_df = pd.DataFrame(data)
        # sliding window iterator: rolling(win_size)
        smooth_path = time_series_df.rolling(n_steps).mean()
        path_deviation = 2 * time_series_df.rolling(n_steps).std()
        # shade the band under and over the smoothed line
        under_line = (smooth_path - path_deviation)[0]
        over_line = (smooth_path + path_deviation)[0]
        ax.plot(smooth_path, linewidth=2)
        ax.fill_between(path_deviation.index, under_line, over_line, alpha=.2)
        ax.set_title(class_name)

    # sort the classes so labels line up with class_names (unique() preserves
    # order of appearance, which is not guaranteed to be 1..5)
    classes = np.sort(df.target.unique())
    fig, axs = plt.subplots(
        nrows=len(classes) // 3 + 1,
        ncols=3,
        sharey=True,
        figsize=(18, 8)
    )

    # plot the averaged signal for each class (1-5)
    for i, cls in enumerate(classes):
        ax = axs.flat[i]
        data = df[df.target == cls] \
            .drop(labels='target', axis=1) \
            .mean(axis=0) \
            .to_numpy()
        plot_time_series_class(data, class_names[int(cls) - 1], ax)

    fig.delaxes(axs.flat[-1])
    fig.tight_layout();

    X_train, X_test, y_train, y_test = train_test_split(df.values,
                                                        df.values[:, -1],
                                                        test_size=0.33,
                                                        random_state=RANDOM_SEED)
    X_test, X_val, y_test, y_val = train_test_split(X_test,
                                                    y_test,
                                                    test_size=0.5,
                                                    random_state=RANDOM_SEED)
    print('The shape of the training set: ', X_train.shape)
    print('The shape of the validation set: ', X_val.shape)
    print('The shape of the test set: ', X_test.shape)
    # Output:
    # The shape of the training set:  (3350, 141)
    # The shape of the validation set:  (825, 141)
    # The shape of the test set:  (825, 141)

    # Scale the 140 signal columns to [0, 1]; the scaler is fit on the
    # training set only, then applied to the validation and test sets
    scaler = MinMaxScaler()
    scaler.fit(X_train[:, :-1])
    X_train[:, :-1] = scaler.transform(X_train[:, :-1])
    X_val[:, :-1] = scaler.transform(X_val[:, :-1])
    X_test[:, :-1] = scaler.transform(X_test[:, :-1])

    df_train = pd.DataFrame(X_train)
    df_val = pd.DataFrame(X_val)
    df_test = pd.DataFrame(X_test)

    def filter_normal_signals(df):
        '''Keep only the normal signals (target == 1), dropping the target column.'''
        df = df[df.iloc[:, -1] == 1].drop(columns=df.columns[-1])
        return df.values

    def filter_anomalous_signals(df):
        '''Keep only the anomalous signals (target > 1), dropping the target column.'''
        df = df[df.iloc[:, -1] > 1].drop(columns=df.columns[-1])
        return df.values

    normal_X_train = filter_normal_signals(df_train)
    normal_X_val = filter_normal_signals(df_val)
    normal_X_test = filter_normal_signals(df_test)
    print('The shape of the normal signals training set: ', normal_X_train.shape)
    print('The shape of the normal signals validation set: ', normal_X_val.shape)
    print('The shape of the normal signals test set: ', normal_X_test.shape)
    # Output:
    # The shape of the normal signals training set:  (1932, 140)
    # The shape of the normal signals validation set:  (492, 140)
    # The shape of the normal signals test set:  (495, 140)

    anomaly_X_train = filter_anomalous_signals(df_train)
    anomaly_X_val = filter_anomalous_signals(df_val)
    anomaly_X_test = filter_anomalous_signals(df_test)
    print('The shape of the anomalous signals training set: ', anomaly_X_train.shape)
    print('The shape of the anomalous signals validation set: ', anomaly_X_val.shape)
    print('The shape of the anomalous signals test set: ', anomaly_X_test.shape)
    # Output:
    # The shape of the anomalous signals training set:  (1418, 140)
    # The shape of the anomalous signals validation set:  (333, 140)
    # The shape of the anomalous signals test set:  (330, 140)

    # normal signals
    plt.figure(figsize=(12, 8))
    for i in range(5, 10):
        plt.plot(normal_X_train[i])

    # anomalous signals
    plt.figure(figsize=(12, 8))
    for i in range(5, 10):
        plt.plot(anomaly_X_train[i])

    class AutoEncoder(Model):
        def __init__(self):
            super(AutoEncoder, self).__init__()
            # Define the encoder: 140 -> 64 -> 32 -> 16 -> 8
            self.encoder = tf.keras.Sequential([
                layers.Dense(64, activation="relu"),
                layers.Dense(32, activation="relu"),
                layers.Dense(16, activation="relu"),
                layers.Dense(8, activation="relu")])
            # Define the decoder: 8 -> 16 -> 32 -> 64 -> 140
            self.decoder = tf.keras.Sequential([
                layers.Dense(16, activation="relu"),
                layers.Dense(32, activation="relu"),
                layers.Dense(64, activation="relu"),
                layers.Dense(140, activation="sigmoid")])

        def call(self, x):
            # Define how a forward pass of the network is performed
            encoded = self.encoder(x)
            decoded = self.decoder(encoded)
            return decoded
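
A minimal sketch to trace the bottleneck before training (the throwaway `probe` instance and dummy batch are illustrative, not part of the original post):

    probe = AutoEncoder()
    dummy = tf.zeros((1, 140))          # one fake 140-sample signal
    print(probe.encoder(dummy).shape)   # (1, 8)   -- the compressed code
    print(probe(dummy).shape)           # (1, 140) -- the reconstruction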

    autoencoder = AutoEncoder()
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=5,
        mode='min',
        restore_best_weights=True)
    lr_plateau = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        mode='min')
    autoencoder.compile(optimizer='adam', loss='mse')
    history = autoencoder.fit(normal_X_train, normal_X_train,
                              epochs=30,
                              batch_size=128,
                              validation_data=(normal_X_val, normal_X_val),
                              callbacks=[early_stopping, lr_plateau],
                              shuffle=True)
    autoencoder.decoder.summary()
    # Model: "sequential_1"
    # _________________________________________________________________
    #  Layer (type)                Output Shape              Param #
    # =================================================================
    #  dense_4 (Dense)             (None, 16)                144
    #  dense_5 (Dense)             (None, 32)                544
    #  dense_6 (Dense)             (None, 64)                2112
    #  dense_7 (Dense)             (None, 140)               9100
    # =================================================================
    # Total params: 11,900
    # Trainable params: 11,900
    # Non-trainable params: 0

    plt.figure(figsize=(12, 8))
    plt.plot(history.history["loss"], label="Training Loss")
    plt.plot(history.history["val_loss"], label="Validation Loss")
    plt.legend()

    encoded_signal = autoencoder.encoder(normal_X_test).numpy()
    decoded_signal = autoencoder.decoder(encoded_signal).numpy()
    plt.figure(figsize=(12, 8))
    plt.plot(normal_X_test[100], 'b')
    plt.plot(decoded_signal[100], 'r')
    plt.fill_between(np.arange(140), decoded_signal[100], normal_X_test[100], color='lightcoral')
    plt.legend(labels=["Input", "Reconstruction", "Error"])
    plt.show()
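
The shaded area can be condensed into a single number, the mean squared error of this one reconstruction (a quick check added here for illustration; index 100 matches the plot above):

    err = np.mean((decoded_signal[100] - normal_X_test[100]) ** 2)
    print('Reconstruction MSE for normal test signal 100: {:.5f}'.format(err))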

    encoded_signal = autoencoder.encoder(anomaly_X_test).numpy()
    decoded_signal = autoencoder.decoder(encoded_signal).numpy()
    plt.figure(figsize=(12, 8))
    plt.plot(anomaly_X_test[100], 'b')
    plt.plot(decoded_signal[100], 'r')
    plt.fill_between(np.arange(140), decoded_signal[100], anomaly_X_test[100], color='lightcoral')
    plt.legend(labels=["Input", "Reconstruction", "Error"])
    plt.show()

    reconstruction = autoencoder.predict(normal_X_train)
    train_loss = tf.keras.losses.mse(reconstruction, normal_X_train)
    fig, ax = plt.subplots(figsize=(12, 8))
    ax.grid()
    sns.distplot(train_loss, bins=50, kde=True, ax=ax).set(title='Distribution of normal train data reconstruction loss')

    # Set the anomaly threshold two standard deviations above the mean
    # reconstruction loss of the normal training data
    mean_normal = np.mean(train_loss)
    std_normal = np.std(train_loss)
    threshold = mean_normal + 2 * std_normal
    print("Threshold: ", threshold)
    # Output:
    # Threshold:  0.008762959466782183

    reconstruction_test = autoencoder.predict(normal_X_test)
    test_loss_normal = tf.keras.losses.mse(reconstruction_test, normal_X_test)
    fig, ax = plt.subplots(figsize=(12, 8))
    ax.grid()
    sns.distplot(test_loss_normal, bins=50, kde=True, ax=ax).set(title='Distribution of normal test data reconstruction loss')

    reconstruction_anomalies = autoencoder.predict(anomaly_X_test)
    test_loss_anomalies = tf.keras.losses.mse(reconstruction_anomalies, anomaly_X_test)
    fig, ax = plt.subplots(figsize=(12, 8))
    ax.grid()
    sns.distplot(test_loss_anomalies, bins=50, kde=True, ax=ax).set(title='Distribution of anomalous test data reconstruction loss')

    sns.set(rc={'figure.figsize': (12, 8)})
    # Plot two histograms of normal and anomalous data
    sns.distplot(test_loss_normal.numpy(), bins=50, label='normal')
    sns.distplot(test_loss_anomalies.numpy(), bins=50, label='anomaly')
    # Plot a vertical line representing the threshold
    plt.axvline(threshold, color='r', linewidth=2, linestyle='dashed', label='{:0.3f}'.format(threshold))
    # Add legend and title
    plt.legend(loc='upper right')
    plt.title('Threshold for detecting anomalies')
    plt.show()
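
The decision rule visualized above can be wrapped into one reusable helper (a minimal sketch; `detect_anomalies` is a hypothetical name, not defined in the original post):

    def detect_anomalies(model, signals, threshold):
        '''Return a boolean mask that is True where a signal is flagged as anomalous.'''
        reconstruction = model.predict(signals)
        loss = tf.keras.losses.mse(reconstruction, signals)
        return tf.math.greater(loss, threshold).numpy()

    flags = detect_anomalies(autoencoder, anomaly_X_test, threshold)
    print('Flagged {} of {} anomalous test signals'.format(flags.sum(), len(flags)))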

    def predict_metrics(normal, anomalous, threshold):
        '''Compute the True Positives (TP), True Negatives (TN),
        False Positives (FP) and False Negatives (FN) given the
        normal losses, anomalous losses and the threshold.
        '''
        # A normal signal is correctly detected if its loss is smaller than the threshold
        pred_normal = tf.math.less(normal, threshold)
        # An anomalous signal is correctly detected if its loss is greater than the threshold
        pred_anomaly = tf.math.greater(anomalous, threshold)
        tn = tf.math.count_nonzero(pred_normal).numpy()
        fp = normal.shape[0] - tn
        tp = tf.math.count_nonzero(pred_anomaly).numpy()
        fn = anomalous.shape[0] - tp
        return tp, tn, fp, fn

    tp, tn, fp, fn = predict_metrics(test_loss_normal, test_loss_anomalies, threshold)
    cm = [[tn, fp],
          [fn, tp]]
    categories = ['Normal', 'Anomalies']
    plt.figure(figsize=(8, 6))
    g = sns.heatmap(cm, annot=True, fmt="d",
                    xticklabels=categories,
                    yticklabels=categories,
                    cmap="GnBu")
    g.set(xlabel='Predicted', ylabel='Actual', title='Confusion Matrix')
    plt.show()

    tpr_val = []
    fpr_val = []
    # Sweep the threshold from 0 to 0.1 (note the t/10), since the losses are small
    for t in np.linspace(0, 1, 100):
        tp, tn, fp, fn = predict_metrics(test_loss_normal,
                                         test_loss_anomalies,
                                         t / 10)
        tpr = tp / (tp + fn)
        fpr = fp / (fp + tn)
        tpr_val.append(tpr)
        fpr_val.append(fpr)

    fig, ax = plt.subplots(figsize=(10, 7))
    ax.plot(fpr_val, tpr_val)
    ax.plot(np.linspace(0, 1, 100),
            np.linspace(0, 1, 100),
            label='baseline',
            linestyle='--')
    plt.title('Receiver Operating Characteristic Curve', fontsize=18)
    plt.ylabel('TPR', fontsize=16)
    plt.xlabel('FPR', fontsize=16)
    plt.legend(fontsize=12);
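
The curve can be summarized by its area (a hedged estimate via the trapezoidal rule; the absolute value is needed because FPR decreases as the threshold grows, and the original post does not report an AUC value). The same two lines apply unchanged to the LSTM autoencoder's ROC curve further below:

    auc = abs(np.trapz(tpr_val, fpr_val))
    print('Approximate AUC: {:.4f}'.format(auc))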

    def compute_stats(tp, tn, fp, fn):
        '''Compute accuracy, precision, recall and F1 scores
        given TP, TN, FP and FN.'''
        acc = (tp + tn) / (tp + tn + fp + fn)
        prec = tp / (tp + fp)
        rec = tp / (tp + fn)
        f1 = 2 * (prec * rec) / (prec + rec)
        return acc, prec, rec, f1

    tp, tn, fp, fn = predict_metrics(test_loss_normal, test_loss_anomalies, threshold)
    accuracy, precision, recall, f1 = compute_stats(tp, tn, fp, fn)
    print("Accuracy = {}".format(np.round(accuracy, 4)))
    print("Precision = {}".format(np.round(precision, 4)))
    print("Recall = {}".format(np.round(recall, 4)))
    print("F1-score = {}".format(np.round(f1, 4)))
    # Output:
    # Accuracy = 0.9745
    # Precision = 0.9558
    # Recall = 0.9818
    # F1-score = 0.9686

    class LSTMAutoencoder(Model):
        def __init__(self):
            super(LSTMAutoencoder, self).__init__()
            # Encoder: reshape each signal to (timesteps, 1), compress to a 64-d code
            self.encoder = tf.keras.Sequential([
                layers.Reshape((normal_X_train.shape[1], 1)),
                layers.LSTM(128, return_sequences=True),
                layers.LSTM(64, return_sequences=False),
            ])
            # Repeat the code across all timesteps before decoding
            self.repeat_vector = layers.RepeatVector(normal_X_train.shape[1])
            # Decoder: expand back to one value per timestep
            self.decoder = tf.keras.Sequential([
                layers.LSTM(64, return_sequences=True),
                layers.LSTM(128, return_sequences=True),
                layers.TimeDistributed(layers.Dense(1)),
                layers.Reshape((normal_X_train.shape[1],))
            ])

        def call(self, x):
            encoded = self.encoder(x)
            repeated = self.repeat_vector(encoded)
            decoded = self.decoder(repeated)
            return decoded

    LSTMautoencoder = LSTMAutoencoder()
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=5,
        mode='min',
        verbose=1,
        restore_best_weights=True)
    lr_plateau = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        verbose=1,
        mode='min')
    LSTMautoencoder.compile(optimizer='adam', loss='mse')
    history = LSTMautoencoder.fit(normal_X_train, normal_X_train,
                                  epochs=20,
                                  validation_data=(normal_X_val, normal_X_val),
                                  callbacks=[early_stopping, lr_plateau],
                                  shuffle=True)

    plt.figure(figsize=(12, 8))
    plt.plot(history.history["loss"], label="Training Loss")
    plt.plot(history.history["val_loss"], label="Validation Loss")
    plt.legend()
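
A minimal shape trace of the encode-repeat-decode pipeline (the dummy batch is illustrative; shapes assume the 140-sample signals used throughout):

    dummy = tf.zeros((1, 140))
    z = LSTMautoencoder.encoder(dummy)       # (1, 64)      -- latent code
    rep = LSTMautoencoder.repeat_vector(z)   # (1, 140, 64) -- code repeated per timestep
    out = LSTMautoencoder.decoder(rep)       # (1, 140)     -- reconstructed signal
    print(z.shape, rep.shape, out.shape)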

    encoded_signal_lstm = LSTMautoencoder.encoder(normal_X_test).numpy()
    repeated_signal_lstm = LSTMautoencoder.repeat_vector(encoded_signal_lstm).numpy()
    decoded_signal_lstm = LSTMautoencoder.decoder(repeated_signal_lstm).numpy()
    plt.figure(figsize=(12, 8))
    plt.plot(normal_X_test[100], 'b')
    plt.plot(decoded_signal_lstm[100], 'r')
    plt.fill_between(np.arange(140), decoded_signal_lstm[100], normal_X_test[100], color='lightcoral')
    plt.legend(labels=["Input", "Reconstruction", "Error"])
    plt.show()

    encoded_data_lstm = LSTMautoencoder.encoder(anomaly_X_test).numpy()
    # repeat the *anomalous* encoding here (not encoded_signal_lstm from the previous cell)
    repeated_signal_lstm = LSTMautoencoder.repeat_vector(encoded_data_lstm).numpy()
    decoded_data_lstm = LSTMautoencoder.decoder(repeated_signal_lstm).numpy()
    plt.figure(figsize=(12, 8))
    plt.plot(anomaly_X_test[100], 'b')
    plt.plot(decoded_data_lstm[100], 'r')
    plt.fill_between(np.arange(140), decoded_data_lstm[100], anomaly_X_test[100], color='lightcoral')
    plt.legend(labels=["Input", "Reconstruction", "Error"])
    plt.show()

    reconstruction = LSTMautoencoder.predict(normal_X_train)
    train_loss = tf.keras.losses.mse(reconstruction, normal_X_train)
    fig, ax = plt.subplots(figsize=(12, 8))
    ax.grid()
    sns.distplot(train_loss, bins=50, kde=True, ax=ax).set(title='Distribution of normal train data reconstruction loss')

    # For the LSTM autoencoder the threshold is set one standard deviation
    # above the mean reconstruction loss of the normal training data
    mean_normal = np.mean(train_loss)
    std_normal = np.std(train_loss)
    threshold = mean_normal + std_normal
    print("Threshold: ", threshold)
    # Output:
    # Threshold:  0.009467508550873416

    reconstruction_test = LSTMautoencoder.predict(normal_X_test)
    test_loss_normal = tf.keras.losses.mse(reconstruction_test, normal_X_test)
    fig, ax = plt.subplots(figsize=(12, 8))
    ax.grid()
    sns.distplot(test_loss_normal, bins=50, kde=True, ax=ax).set(title='Distribution of normal test data reconstruction loss')

    reconstruction_anomalies = LSTMautoencoder.predict(anomaly_X_test)
    test_loss_anomalies = tf.keras.losses.mse(reconstruction_anomalies, anomaly_X_test)
    fig, ax = plt.subplots(figsize=(12, 8))
    ax.grid()
    sns.distplot(test_loss_anomalies, bins=50, kde=True, ax=ax).set(title='Distribution of anomalous test data reconstruction loss')

    sns.set(rc={'figure.figsize': (12, 8)})
    # Plot two histograms of normal and anomalous data
    sns.distplot(test_loss_normal.numpy(), bins=50, label='normal')
    sns.distplot(test_loss_anomalies.numpy(), bins=50, label='anomaly')
    # Plot a vertical line representing the threshold
    plt.axvline(threshold, color='r', linewidth=2, linestyle='dashed', label='{:0.3f}'.format(threshold))
    # Add legend and title
    plt.legend(loc='upper right')
    plt.title('Threshold for detecting anomalies')
    plt.show()

    tp, tn, fp, fn = predict_metrics(test_loss_normal, test_loss_anomalies, threshold)
    cm = [[tn, fp],
          [fn, tp]]
    categories = ['Normal', 'Anomalies']
    plt.figure(figsize=(8, 6))
    g = sns.heatmap(cm, annot=True, fmt="d",
                    xticklabels=categories,
                    yticklabels=categories,
                    cmap="GnBu")
    g.set(xlabel='Predicted', ylabel='Actual', title='Confusion Matrix')
    plt.show()

    tpr_val = []
    fpr_val = []
    # Same threshold sweep from 0 to 0.1 (t/10) as for the plain autoencoder
    for t in np.linspace(0, 1, 100):
        tp, tn, fp, fn = predict_metrics(test_loss_normal,
                                         test_loss_anomalies,
                                         t / 10)
        tpr = tp / (tp + fn)
        fpr = fp / (fp + tn)
        tpr_val.append(tpr)
        fpr_val.append(fpr)

    fig, ax = plt.subplots(figsize=(10, 7))
    ax.plot(fpr_val, tpr_val)
    ax.plot(np.linspace(0, 1, 100),
            np.linspace(0, 1, 100),
            label='baseline',
            linestyle='--')
    plt.title('Receiver Operating Characteristic Curve', fontsize=18)
    plt.ylabel('TPR', fontsize=16)
    plt.xlabel('FPR', fontsize=16)
    plt.legend(fontsize=12);

    tp, tn, fp, fn = predict_metrics(test_loss_normal, test_loss_anomalies, threshold)
    accuracy, precision, recall, f1 = compute_stats(tp, tn, fp, fn)
    print("Accuracy = {}".format(np.round(accuracy, 4)))
    print("Precision = {}".format(np.round(precision, 4)))
    print("Recall = {}".format(np.round(recall, 4)))
    print("F1-score = {}".format(np.round(f1, 4)))
    # Output:
    # Accuracy = 0.9552
    # Precision = 0.9104
    # Recall = 0.9848
    # F1-score = 0.9461

On this split, the plain autoencoder edges out the LSTM autoencoder on accuracy, precision and F1-score (0.9745 / 0.9558 / 0.9686 vs. 0.9552 / 0.9104 / 0.9461), while the LSTM autoencoder attains slightly higher recall (0.9848 vs. 0.9818).


• Original article: https://blog.csdn.net/weixin_39402231/article/details/139759235