本项目主要实现卷积自编码器对于异常心电ECG信号的检测和识别,属于无监督学习中的生理信号检测的典型方法之一。
- normal_df = pd.read_csv("/heartbeat/ptbdb_normal.csv").iloc[:, :-1]
- anomaly_df = pd.read_csv("/heartbeat/ptbdb_abnormal.csv").iloc[:, :-1]
- normal_df.head()
信号可视化
- def plot_sample(normal, anomaly):
- index = np.random.randint(0, len(normal_df), 2)
-
- fig, ax = plt.subplots(1, 2, sharey=True, figsize=(10, 4))
- ax[0].plot(normal.iloc[index[0], :].values, label=f"Case {index[0]}")
- ax[0].plot(normal.iloc[index[1], :].values, label=f"Case {index[1]}")
- ax[0].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)
- ax[0].set_title("Normal")
-
- ax[1].plot(anomaly.iloc[index[0], :].values, label=f"Case {index[0]}")
- ax[1].plot(anomaly.iloc[index[1], :].values, label=f"Case {index[1]}")
- ax[1].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)
- ax[1].set_title("Anomaly")
-
- plt.tight_layout()
- plt.show()
-
-
- plot_sample(normal_df, anomaly_df)
- def plot_smoothed_mean(data, class_name = "normal", step_size=5, ax=None):
- df = pd.DataFrame(data)
- roll_df = df.rolling(step_size)
- smoothed_mean = roll_df.mean().dropna().reset_index(drop=True)
- smoothed_std = roll_df.std().dropna().reset_index(drop=True)
- margin = 3*smoothed_std
- lower_bound = (smoothed_mean - margin).values.flatten()
- upper_bound = (smoothed_mean + margin).values.flatten()
-
- ax.plot(smoothed_mean.index, smoothed_mean)
- ax.fill_between(smoothed_mean.index, lower_bound, y2=upper_bound, alpha=0.3, color="red")
- ax.set_title(class_name, fontsize=9)
-
- fig, axes = plt.subplots(1, 2, figsize=(8, 4), sharey=True)
- axes = axes.flatten()
- for i, label in enumerate(CLASS_NAMES, start=1):
- data_group = df.groupby("target")
- data = data_group.get_group(label).mean(axis=0, numeric_only=True).to_numpy()
- plot_smoothed_mean(data, class_name=label, step_size=20, ax=axes[i-1])
- fig.suptitle("Plot of smoothed mean for each class", y=0.95, weight="bold")
- plt.tight_layout()
- normal_df.drop("target", axis=1, errors="ignore", inplace=True)
- normal = normal_df.to_numpy()
- anomaly_df.drop("target", axis=1, errors="ignore", inplace=True)
- anomaly = anomaly_df.to_numpy()
-
- X_train, X_test = train_test_split(normal, test_size=0.15, random_state=45, shuffle=True)
- print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}, anomaly shape: {anomaly.shape}")
- class AutoEncoder(Model):
- def __init__(self, input_dim, latent_dim):
- super(AutoEncoder, self).__init__()
- self.input_dim = input_dim
- self.latent_dim = latent_dim
-
- self.encoder = tf.keras.Sequential([
- layers.Input(shape=(input_dim,)),
- layers.Reshape((input_dim, 1)), # Reshape to 3D for Conv1D
- layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),
- layers.BatchNormalization(),
- layers.MaxPooling1D(2, padding="same"),
- layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),
- layers.BatchNormalization(),
- layers.MaxPooling1D(2, padding="same"),
- layers.Conv1D(latent_dim, 3, strides=1, activation='relu', padding="same"),
- layers.BatchNormalization(),
- layers.MaxPooling1D(2, padding="same"),
- ])
- # Previously, I was using UpSampling. I am trying Transposed Convolution this time around.
- self.decoder = tf.keras.Sequential([
- layers.Conv1DTranspose(latent_dim, 3, strides=1, activation='relu', padding="same"),
- # layers.UpSampling1D(2),
- layers.BatchNormalization(),
- layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
- # layers.UpSampling1D(2),
- layers.BatchNormalization(),
- layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
- # layers.UpSampling1D(2),
- layers.BatchNormalization(),
- layers.Flatten(),
- layers.Dense(input_dim)
- ])
-
- def call(self, X):
- encoded = self.encoder(X)
- decoded = self.decoder(encoded)
- return decoded
-
-
- input_dim = X_train.shape[-1]
- latent_dim = 32
-
- model = AutoEncoder(input_dim, latent_dim)
- model.build((None, input_dim))
- model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss="mae")
- model.summary()
- epochs = 100
- batch_size = 128
- early_stopping = EarlyStopping(patience=10, min_delta=1e-3, monitor="val_loss", restore_best_weights=True)
-
-
- history = model.fit(X_train, X_train, epochs=epochs, batch_size=batch_size,
- validation_split=0.1, callbacks=[early_stopping])
- plt.plot(history.history['loss'], label="Training loss")
- plt.plot(history.history['val_loss'], label="Validation loss", ls="--")
- plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
- plt.title("Training loss")
- plt.ylabel("Loss")
- plt.xlabel("Epoch")
- plt.show()
- fig, axes = plt.subplots(2, 5, sharey=True, sharex=True, figsize=(12, 6))
- random_indexes = np.random.randint(0, len(X_train), size=5)
-
- for i, idx in enumerate(random_indexes):
- data = X_train[[idx]]
- plot_examples(model, data, ax=axes[0, i], title="Normal")
-
- for i, idx in enumerate(random_indexes):
- data = anomaly[[idx]]
- plot_examples(model, data, ax=axes[1, i], title="anomaly")
-
- plt.tight_layout()
- fig.suptitle("Sample plots (Actual vs Reconstructed by the CNN autoencoder)", y=1.04, weight="bold")
- fig.savefig("autoencoder.png")
- plt.show()
- train_mae = model.evaluate(X_train, X_train, verbose=0)
- test_mae = model.evaluate(X_test, X_test, verbose=0)
- anomaly_mae = model.evaluate(anomaly_df, anomaly_df, verbose=0)
-
- print("Training dataset error: ", train_mae)
- print("Testing dataset error: ", test_mae)
- print("Anormaly dataset error: ", anomaly_mae)
MAE误差阈值=正常数据重建MAE均值+正常数据重建MAE标准差,此阈值可以用来直接检测某信号为正常信号还是异常心电信号。
- def predict(model, X):
- pred = model.predict(X, verbose=False)
- loss = mae(pred, X)
- return pred, loss
-
-
- _, train_loss = predict(model, X_train)
- _, test_loss = predict(model, X_test)
- _, anomaly_loss = predict(model, anomaly)
- threshold = np.mean(train_loss) + np.std(train_loss) # Setting threshold for distinguish normal data from anomalous data
-
- bins = 40
- plt.figure(figsize=(9, 5), dpi=100)
- sns.histplot(np.clip(train_loss, 0, 0.5), bins=bins, kde=True, label="Train Normal")
- sns.histplot(np.clip(test_loss, 0, 0.5), bins=bins, kde=True, label="Test Normal")
- sns.histplot(np.clip(anomaly_loss, 0, 0.5), bins=bins, kde=True, label="anomaly")
-
- ax = plt.gca() # Get the current Axes
- ylim = ax.get_ylim()
- plt.vlines(threshold, 0, ylim[-1], color="k", ls="--")
- plt.annotate(f"Threshold: {threshold:.3f}", xy=(threshold, ylim[-1]), xytext=(threshold+0.009, ylim[-1]),
- arrowprops=dict(facecolor='black', shrink=0.05), fontsize=9)
- plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
- plt.show()
- plot_confusion_matrix(model, X_train, X_test, anomaly, threshold=threshold)
-
- ytrue, ypred = prepare_labels(model, X_train, X_test, anomaly, threshold=threshold)
- print(classification_report(ytrue, ypred, target_names=CLASS_NAMES))
相关项目开发和问题,欢迎后台沟通交流。