import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Model

RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
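Seeding NumPy alone does not make the training runs repeatable: weight initialization and batch shuffling draw from TensorFlow's own RNG. A small optional addition, not in the original notebook:

# Hedged addition: also seed TensorFlow's RNG so weight initialization
# and shuffling are repeatable across runs
tf.random.set_seed(RANDOM_SEED)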
# Load the ECG5000 dataset files
ecg_train_file = 'ECG5000_TRAIN.csv'
ecg_test_file = 'ECG5000_TEST.csv'

df_train = pd.read_csv(ecg_train_file)
df_test = pd.read_csv(ecg_test_file)

# Merge both files into a single frame; the train/val/test split
# is rebuilt below with train_test_split
df = pd.concat([df_train, df_test], ignore_index=True)
df = df.drop(labels='id', axis=1)

# Each row holds 140 time steps plus the class label in the last
# column; rename that column to 'target'
new_columns = list(df.columns)
new_columns[-1] = 'target'
df.columns = new_columns
df

normal_class = 1
class_names = ['Normal', 'R on T', 'PVC', 'SP', 'UB']
df['target'].value_counts()
1    2919
2    1767
4     194
3      96
5      24
Name: target, dtype: int64
plt.figure(figsize=(12, 8))
ax = sns.countplot(x=df.target)
ax.set_xticklabels(class_names);
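The classes are heavily imbalanced: normal beats dominate and the rarest class has only 24 examples. A quick way to see the proportions (a small addition, not in the original):

# Show class frequencies as fractions of the whole dataset
df['target'].value_counts(normalize=True)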

def plot_time_series_class(data, class_name, ax, n_steps=10):
    '''
    Plot the averaged time series for one class, smoothed with a
    rolling mean and shaded with a band of two standard deviations.
    '''
    time_series_df = pd.DataFrame(data)

    # rolling(win_size) gives a sliding-window iterator
    smooth_path = time_series_df.rolling(n_steps).mean()
    path_deviation = 2 * time_series_df.rolling(n_steps).std()

    # band under and over the smoothed line
    under_line = (smooth_path - path_deviation)[0]
    over_line = (smooth_path + path_deviation)[0]

    ax.plot(smooth_path, linewidth=2)
    ax.fill_between(path_deviation.index, under_line, over_line, alpha=0.2)
    ax.set_title(class_name)
classes = df.target.unique()

fig, axs = plt.subplots(
    nrows=len(classes) // 3 + 1,
    ncols=3,
    sharey=True,
    figsize=(18, 8)
)

# plot the averaged signal for each class (1-5); index class_names by
# the class id so the labels stay correct regardless of the order in
# which unique() returns the ids
for i, cls in enumerate(classes):
    ax = axs.flat[i]
    data = df[df.target == cls] \
        .drop(labels='target', axis=1) \
        .mean(axis=0) \
        .to_numpy()
    plot_time_series_class(data, class_names[int(cls) - 1], ax)

fig.delaxes(axs.flat[-1])
fig.tight_layout();

X_train, X_test, y_train, y_test = train_test_split(df.values,
                                                    df.values[:, -1],
                                                    test_size=0.33,
                                                    random_state=RANDOM_SEED)

# split the held-out 33% evenly into validation and test sets
X_test, X_val, y_test, y_val = train_test_split(X_test,
                                                y_test,
                                                test_size=0.5,
                                                random_state=RANDOM_SEED)
print('The shape of the training set: ', X_train.shape)
print('The shape of the validation set: ', X_val.shape)
print('The shape of the test set: ', X_test.shape)
The shape of the training set:  (3350, 141)
The shape of the validation set:  (825, 141)
The shape of the test set:  (825, 141)
# Fit the scaler on the training features only (all columns except the
# target) to avoid leaking validation/test statistics, then apply the
# same transform to all three sets
scaler = MinMaxScaler()
scaler.fit(X_train[:, :-1])
X_train[:, :-1] = scaler.transform(X_train[:, :-1])
X_val[:, :-1] = scaler.transform(X_val[:, :-1])
X_test[:, :-1] = scaler.transform(X_test[:, :-1])
df_train = pd.DataFrame(X_train)
df_val = pd.DataFrame(X_val)
df_test = pd.DataFrame(X_test)
def filter_normal_signals(df):
    '''Keep only normal signals (target == 1) and drop the target column.'''
    df = df[df.iloc[:, -1] == 1].drop(columns=df.columns[-1])
    return df.values


def filter_anomalous_signals(df):
    '''Keep only anomalous signals (target > 1) and drop the target column.'''
    df = df[df.iloc[:, -1] > 1].drop(columns=df.columns[-1])
    return df.values


normal_X_train = filter_normal_signals(df_train)
normal_X_val = filter_normal_signals(df_val)
normal_X_test = filter_normal_signals(df_test)
print('The shape of the normal signals training set: ', normal_X_train.shape)
print('The shape of the normal signals validation set: ', normal_X_val.shape)
print('The shape of the normal signals test set: ', normal_X_test.shape)
The shape of the normal signals training set:  (1932, 140)
The shape of the normal signals validation set:  (492, 140)
The shape of the normal signals test set:  (495, 140)
anomaly_X_train = filter_anomalous_signals(df_train)
anomaly_X_val = filter_anomalous_signals(df_val)
anomaly_X_test = filter_anomalous_signals(df_test)
print('The shape of the anomalous signals training set: ', anomaly_X_train.shape)
print('The shape of the anomalous signals validation set: ', anomaly_X_val.shape)
print('The shape of the anomalous signals test set: ', anomaly_X_test.shape)
The shape of the anomalous signals training set:  (1418, 140)
The shape of the anomalous signals validation set:  (333, 140)
The shape of the anomalous signals test set:  (330, 140)
# plot a few normal signals
plt.figure(figsize=(12, 8))
for i in range(5, 10):
    plt.plot(normal_X_train[i])

# plot a few anomalous signals
plt.figure(figsize=(12, 8))
for i in range(5, 10):
    plt.plot(anomaly_X_train[i])

class AutoEncoder(Model):
    def __init__(self):
        super(AutoEncoder, self).__init__()
        # Encoder: compress the 140-step signal down to 8 latent units
        self.encoder = tf.keras.Sequential([
            layers.Dense(64, activation="relu"),
            layers.Dense(32, activation="relu"),
            layers.Dense(16, activation="relu"),
            layers.Dense(8, activation="relu")])

        # Decoder: expand back to 140 outputs; the sigmoid matches the
        # [0, 1] range produced by MinMaxScaler
        self.decoder = tf.keras.Sequential([
            layers.Dense(16, activation="relu"),
            layers.Dense(32, activation="relu"),
            layers.Dense(64, activation="relu"),
            layers.Dense(140, activation="sigmoid")])

    def call(self, x):
        # A forward pass is encode followed by decode
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded
autoencoder = AutoEncoder()

early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    mode='min',
    restore_best_weights=True)

lr_plateau = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=5,
    min_lr=1e-6,
    mode='min')

autoencoder.compile(optimizer='adam', loss='mse')
history = autoencoder.fit(normal_X_train, normal_X_train,
                          epochs=30,
                          batch_size=128,
                          validation_data=(normal_X_val, normal_X_val),
                          callbacks=[early_stopping, lr_plateau],
                          shuffle=True)
autoencoder.decoder.summary()
Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #
=================================================================
 dense_4 (Dense)             (None, 16)                144
 dense_5 (Dense)             (None, 32)                544
 dense_6 (Dense)             (None, 64)                2112
 dense_7 (Dense)             (None, 140)               9100
=================================================================
Total params: 11,900
Trainable params: 11,900
Non-trainable params: 0
_________________________________________________________________
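Each Dense layer's parameter count is inputs × units + units (weights plus biases): the first decoder layer maps the 8-dimensional latent code to 16 units, so 8 × 16 + 16 = 144, and the output layer gives 64 × 140 + 140 = 9100, matching the summary. A quick check:

# Dense params = inputs * units + units (weights + biases)
for n_in, n_out in [(8, 16), (16, 32), (32, 64), (64, 140)]:
    print(n_in * n_out + n_out)   # 144, 544, 2112, 9100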
plt.figure(figsize=(12, 8))
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.legend()

# reconstruct a normal test signal and shade the reconstruction error
encoded_signal = autoencoder.encoder(normal_X_test).numpy()
decoded_signal = autoencoder.decoder(encoded_signal).numpy()

plt.figure(figsize=(12, 8))
plt.plot(normal_X_test[100], 'b')
plt.plot(decoded_signal[100], 'r')
plt.fill_between(np.arange(140), decoded_signal[100], normal_X_test[100], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

# reconstruct an anomalous test signal for comparison
encoded_signal = autoencoder.encoder(anomaly_X_test).numpy()
decoded_signal = autoencoder.decoder(encoded_signal).numpy()

plt.figure(figsize=(12, 8))
plt.plot(anomaly_X_test[100], 'b')
plt.plot(decoded_signal[100], 'r')
plt.fill_between(np.arange(140), decoded_signal[100], anomaly_X_test[100], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

reconstruction = autoencoder.predict(normal_X_train)
train_loss = tf.keras.losses.mse(reconstruction, normal_X_train)
fig, ax = plt.subplots(figsize=(12, 8))
ax.grid(True)
# histplot replaces the deprecated distplot; plot onto ax directly
sns.histplot(train_loss.numpy(), bins=50, kde=True, ax=ax).set(title='Distribution of normal train data reconstruction loss')

# Threshold rule: mean reconstruction error on normal training data
# plus two standard deviations
mean_normal = np.mean(train_loss)
std_normal = np.std(train_loss)
threshold = mean_normal + 2 * std_normal
print("Threshold: ", threshold)
Threshold:  0.008762959466782183
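The threshold turns the autoencoder into a binary detector: any signal whose reconstruction error exceeds it is flagged as anomalous. A minimal sketch of that decision rule (the helper name is ours, not from the original notebook):

def flag_anomalies(model, signals, threshold):
    # Reconstruct the batch and flag samples whose MSE exceeds the threshold
    reconstructed = model.predict(signals)
    losses = tf.keras.losses.mse(reconstructed, signals)
    return tf.math.greater(losses, threshold)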
reconstruction_test = autoencoder.predict(normal_X_test)
test_loss_normal = tf.keras.losses.mse(reconstruction_test, normal_X_test)
fig, ax = plt.subplots(figsize=(12, 8))
ax.grid(True)
sns.histplot(test_loss_normal.numpy(), bins=50, kde=True, ax=ax).set(title='Distribution of normal test data reconstruction loss')

reconstruction_anomalies = autoencoder.predict(anomaly_X_test)
test_loss_anomalies = tf.keras.losses.mse(reconstruction_anomalies, anomaly_X_test)
fig, ax = plt.subplots(figsize=(12, 8))
ax.grid(True)
sns.histplot(test_loss_anomalies.numpy(), bins=50, kde=True, ax=ax).set(title='Distribution of anomalous test data reconstruction loss')

sns.set(rc={'figure.figsize': (12, 8)})

# Overlay the loss histograms of normal and anomalous test data
sns.histplot(test_loss_normal.numpy(), bins=50, kde=True, label='normal')
sns.histplot(test_loss_anomalies.numpy(), bins=50, kde=True, label='anomaly')

# Mark the detection threshold with a vertical line
plt.axvline(threshold, color='r', linewidth=2, linestyle='dashed', label='{:0.3f}'.format(threshold))

# Add legend and title
plt.legend(loc='upper right')
plt.title('Threshold for detecting anomalies')
plt.show()

def predict_metrics(normal, anomalous, threshold):
    '''Compute the True Positives (TP), True Negatives (TN),
    False Positives (FP) and False Negatives (FN) given the
    reconstruction losses of normal data, anomalous data and
    the threshold.
    '''
    # A normal sample is classified correctly if its loss is below the threshold
    pred_normal = tf.math.less(normal, threshold)

    # An anomalous sample is classified correctly if its loss is above the threshold
    pred_anomaly = tf.math.greater(anomalous, threshold)

    tn = tf.math.count_nonzero(pred_normal).numpy()
    fp = normal.shape[0] - tn

    tp = tf.math.count_nonzero(pred_anomaly).numpy()
    fn = anomalous.shape[0] - tp

    return tp, tn, fp, fn
tp, tn, fp, fn = predict_metrics(test_loss_normal, test_loss_anomalies, threshold)

cm = [[tn, fp],
      [fn, tp]]
categories = ['Normal', 'Anomalies']

plt.figure(figsize=(8, 6))
g = sns.heatmap(cm, annot=True, fmt="d",
                xticklabels=categories,
                yticklabels=categories,
                cmap="GnBu")

g.set(xlabel='Predicted', ylabel='Actual', title='Confusion Matrix')

plt.show()

tpr_val = []
fpr_val = []

# The reconstruction losses are small, so sweep the threshold over
# [0, 0.1] to trace out the ROC curve
for t in np.linspace(0, 0.1, 100):
    tp, tn, fp, fn = predict_metrics(test_loss_normal,
                                     test_loss_anomalies,
                                     t)
    tpr = tp / (tp + fn)
    fpr = fp / (fp + tn)
    tpr_val.append(tpr)
    fpr_val.append(fpr)

fig, ax = plt.subplots(figsize=(10, 7))
ax.plot(fpr_val, tpr_val)
ax.plot(np.linspace(0, 1, 100),
        np.linspace(0, 1, 100),
        label='baseline',
        linestyle='--')
plt.title('Receiver Operating Characteristic Curve', fontsize=18)
plt.ylabel('TPR', fontsize=16)
plt.xlabel('FPR', fontsize=16)
plt.legend(fontsize=12);
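The sweep gives the curve but not a single summary number. Since the reconstruction loss itself serves as the anomaly score, the area under the curve can be computed directly with scikit-learn's roc_auc_score (a small addition, not in the original):

from sklearn.metrics import roc_auc_score

# Label normal test samples 0 and anomalous ones 1; the loss is the score
y_true = np.concatenate([np.zeros(len(test_loss_normal)),
                         np.ones(len(test_loss_anomalies))])
scores = np.concatenate([test_loss_normal.numpy(),
                         test_loss_anomalies.numpy()])
print("AUC:", roc_auc_score(y_true, scores))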

def compute_stats(tp, tn, fp, fn):
    '''Compute accuracy, precision, recall and F1 scores
    given TP, TN, FP and FN.'''
    acc = (tp + tn) / (tp + tn + fp + fn)
    prec = tp / (tp + fp)
    rec = tp / (tp + fn)
    f1 = 2 * (prec * rec) / (prec + rec)

    return acc, prec, rec, f1


tp, tn, fp, fn = predict_metrics(test_loss_normal, test_loss_anomalies, threshold)
accuracy, precision, recall, f1 = compute_stats(tp, tn, fp, fn)

print("Accuracy = {}".format(np.round(accuracy, 4)))
print("Precision = {}".format(np.round(precision, 4)))
print("Recall = {}".format(np.round(recall, 4)))
print("F1-score = {}".format(np.round(f1, 4)))
Accuracy = 0.9745
Precision = 0.9558
Recall = 0.9818
F1-score = 0.9686
class LSTMAutoencoder(Model):
    def __init__(self):
        super(LSTMAutoencoder, self).__init__()
        # Encoder: reshape the flat signal to (timesteps, 1) and
        # compress it into a single 64-dimensional vector
        self.encoder = tf.keras.Sequential([
            layers.Reshape((normal_X_train.shape[1], 1)),
            layers.LSTM(128, return_sequences=True),
            layers.LSTM(64, return_sequences=False),
        ])

        # Repeat the latent vector once per timestep so the decoder
        # receives a full-length sequence
        self.repeat_vector = layers.RepeatVector(normal_X_train.shape[1])

        # Decoder: unroll back to one value per timestep
        self.decoder = tf.keras.Sequential([
            layers.LSTM(64, return_sequences=True),
            layers.LSTM(128, return_sequences=True),
            layers.TimeDistributed(layers.Dense(1)),
            layers.Reshape((normal_X_train.shape[1],))
        ])

    def call(self, x):
        encoded = self.encoder(x)
        repeated = self.repeat_vector(encoded)
        decoded = self.decoder(repeated)
        return decoded
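To see why the RepeatVector is needed: the encoder collapses each (140, 1) sequence into a single 64-dimensional vector, but the decoder LSTMs expect a sequence input. A quick shape check on a dummy batch (model_check is an illustrative throwaway instance, not part of the original pipeline):

model_check = LSTMAutoencoder()
dummy = tf.zeros((2, 140))                  # batch of 2 flat signals
enc = model_check.encoder(dummy)            # -> (2, 64) latent vectors
rep = model_check.repeat_vector(enc)        # -> (2, 140, 64) repeated sequence
dec = model_check.decoder(rep)              # -> (2, 140) reconstruction
print(enc.shape, rep.shape, dec.shape)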
LSTMautoencoder = LSTMAutoencoder()

early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    mode='min',
    verbose=1,
    restore_best_weights=True)

lr_plateau = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=5,
    verbose=1,
    mode='min')

LSTMautoencoder.compile(optimizer='adam', loss='mse')

history = LSTMautoencoder.fit(normal_X_train, normal_X_train,
                              epochs=20,
                              validation_data=(normal_X_val, normal_X_val),
                              callbacks=[early_stopping, lr_plateau],
                              shuffle=True)
plt.figure(figsize=(12, 8))
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.legend()

# reconstruct a normal test signal with the LSTM autoencoder
encoded_signal_lstm = LSTMautoencoder.encoder(normal_X_test).numpy()
repeated_signal_lstm = LSTMautoencoder.repeat_vector(encoded_signal_lstm).numpy()
decoded_signal_lstm = LSTMautoencoder.decoder(repeated_signal_lstm).numpy()

plt.figure(figsize=(12, 8))
plt.plot(normal_X_test[100], 'b')
plt.plot(decoded_signal_lstm[100], 'r')
plt.fill_between(np.arange(140), decoded_signal_lstm[100], normal_X_test[100], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

# reconstruct an anomalous test signal; feed the anomalous encoding
# through the repeat vector and decoder
encoded_data_lstm = LSTMautoencoder.encoder(anomaly_X_test).numpy()
repeated_data_lstm = LSTMautoencoder.repeat_vector(encoded_data_lstm).numpy()
decoded_data_lstm = LSTMautoencoder.decoder(repeated_data_lstm).numpy()

plt.figure(figsize=(12, 8))
plt.plot(anomaly_X_test[100], 'b')
plt.plot(decoded_data_lstm[100], 'r')
plt.fill_between(np.arange(140), decoded_data_lstm[100], anomaly_X_test[100], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

reconstruction = LSTMautoencoder.predict(normal_X_train)
train_loss = tf.keras.losses.mse(reconstruction, normal_X_train)
fig, ax = plt.subplots(figsize=(12, 8))
ax.grid(True)
sns.histplot(train_loss.numpy(), bins=50, kde=True, ax=ax).set(title='Distribution of normal train data reconstruction loss')

# For the LSTM model a tighter threshold of mean plus one standard
# deviation is used (the dense autoencoder above used two)
mean_normal = np.mean(train_loss)
std_normal = np.std(train_loss)
threshold = mean_normal + std_normal
print("Threshold: ", threshold)
Threshold:  0.009467508550873416
reconstruction_test = LSTMautoencoder.predict(normal_X_test)
test_loss_normal = tf.keras.losses.mse(reconstruction_test, normal_X_test)
fig, ax = plt.subplots(figsize=(12, 8))
ax.grid(True)
sns.histplot(test_loss_normal.numpy(), bins=50, kde=True, ax=ax).set(title='Distribution of normal test data reconstruction loss')

reconstruction_anomalies = LSTMautoencoder.predict(anomaly_X_test)
test_loss_anomalies = tf.keras.losses.mse(reconstruction_anomalies, anomaly_X_test)
fig, ax = plt.subplots(figsize=(12, 8))
ax.grid(True)
sns.histplot(test_loss_anomalies.numpy(), bins=50, kde=True, ax=ax).set(title='Distribution of anomalous test data reconstruction loss')

sns.set(rc={'figure.figsize': (12, 8)})

# Overlay the loss histograms of normal and anomalous test data
sns.histplot(test_loss_normal.numpy(), bins=50, kde=True, label='normal')
sns.histplot(test_loss_anomalies.numpy(), bins=50, kde=True, label='anomaly')

# Mark the detection threshold with a vertical line
plt.axvline(threshold, color='r', linewidth=2, linestyle='dashed', label='{:0.3f}'.format(threshold))

# Add legend and title
plt.legend(loc='upper right')
plt.title('Threshold for detecting anomalies')
plt.show()

tp, tn, fp, fn = predict_metrics(test_loss_normal, test_loss_anomalies, threshold)

cm = [[tn, fp],
      [fn, tp]]

categories = ['Normal', 'Anomalies']

plt.figure(figsize=(8, 6))
g = sns.heatmap(cm, annot=True, fmt="d",
                xticklabels=categories,
                yticklabels=categories,
                cmap="GnBu")

g.set(xlabel='Predicted', ylabel='Actual', title='Confusion Matrix')

plt.show()

tpr_val = []
fpr_val = []

# Sweep the threshold over [0, 0.1] again to trace the LSTM ROC curve
for t in np.linspace(0, 0.1, 100):
    tp, tn, fp, fn = predict_metrics(test_loss_normal,
                                     test_loss_anomalies,
                                     t)
    tpr = tp / (tp + fn)
    fpr = fp / (fp + tn)
    tpr_val.append(tpr)
    fpr_val.append(fpr)

fig, ax = plt.subplots(figsize=(10, 7))
ax.plot(fpr_val, tpr_val)
ax.plot(np.linspace(0, 1, 100),
        np.linspace(0, 1, 100),
        label='baseline',
        linestyle='--')
plt.title('Receiver Operating Characteristic Curve', fontsize=18)
plt.ylabel('TPR', fontsize=16)
plt.xlabel('FPR', fontsize=16)
plt.legend(fontsize=12);

tp, tn, fp, fn = predict_metrics(test_loss_normal, test_loss_anomalies, threshold)
accuracy, precision, recall, f1 = compute_stats(tp, tn, fp, fn)

print("Accuracy = {}".format(np.round(accuracy, 4)))
print("Precision = {}".format(np.round(precision, 4)))
print("Recall = {}".format(np.round(recall, 4)))
print("F1-score = {}".format(np.round(f1, 4)))
Accuracy = 0.9552
Precision = 0.9104
Recall = 0.9848
F1-score = 0.9461