• ECG Anomaly Detection with an Autoencoder (PyTorch)


    The code is fairly simple and easy to follow: an autoencoder is trained on normal heartbeats only, and at test time any sequence whose reconstruction loss exceeds a chosen threshold is flagged as anomalous.

    # Core libraries for data manipulation and plotting
    import copy
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns
    from sklearn.model_selection import train_test_split
    # PyTorch modules for the neural network implementation
    import torch
    from torch import nn, optim
    import torch.nn.functional as F
    # Ignore warnings to keep the output clean
    import warnings
    warnings.filterwarnings('ignore')

    df = pd.read_csv('http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv', header=None)
    df.head().T

    df.describe()

    df.isna().sum()
    0      0
    1      0
    2      0
    3      0
    4      0
          ..
    136    0
    137    0
    138    0
    139    0
    140    0
    Length: 141, dtype: int64

    df.dtypes
    0      float64
    1      float64
    2      float64
    3      float64
    4      float64
            ...
    136    float64
    137    float64
    138    float64
    139    float64
    140    float64
    Length: 141, dtype: object

    # Rename the last column to 'target' (the class label)
    new_columns = list(df.columns)
    new_columns[-1] = 'target'
    df.columns = new_columns

    df.target.value_counts()
    1.0    2919
    0.0    2079
    Name: target, dtype: int64

    value_counts = df['target'].value_counts()
    # Plot the class distribution
    plt.figure(figsize=(8, 6))
    value_counts.plot(kind='bar', color='skyblue')
    plt.title('Value Counts of Target Column')
    plt.xlabel('Target Values')
    plt.ylabel('Count')
    # Display the count values on top of the bars
    for i, count in enumerate(value_counts):
        plt.text(i, count + 0.1, str(count), ha='center', va='bottom')
    plt.show()

    classes = df.target.unique()

    def plot_ecg(data, class_name, ax, n_steps=10):
        # Convert data to a DataFrame
        time_series_df = pd.DataFrame(data)
        # Apply a moving average for smoothing
        smooth_data = time_series_df.rolling(window=n_steps, min_periods=1).mean()
        # Calculate upper and lower bounds for the confidence band
        deviation = time_series_df.rolling(window=n_steps, min_periods=1).std()
        upper_bound = smooth_data + deviation
        lower_bound = smooth_data - deviation
        # Plot the smoothed data
        ax.plot(smooth_data, color='black', linewidth=2)
        # Plot the confidence band
        ax.fill_between(time_series_df.index, lower_bound[0], upper_bound[0], color='black', alpha=0.2)
        # Set the title
        ax.set_title(class_name)

    # Plotting setup
    fig, axs = plt.subplots(
        nrows=len(classes) // 3 + 1,
        ncols=3,
        sharey=True,
        figsize=(14, 8)
    )
    # Plot the average ECG for each class
    for i, cls in enumerate(classes):
        ax = axs.flat[i]
        data = df[df.target == cls].drop(labels='target', axis=1).mean(axis=0).to_numpy()
        plot_ecg(data, cls, ax)  # Using 'cls' directly as the class name
    # Adjust layout and remove the unused axes
    fig.delaxes(axs.flat[-1])
    fig.tight_layout()
    plt.show()

    normal_df = df[df.target == 1].drop(labels='target', axis=1)
    normal_df.shape
    (2919, 140)
    anomaly_df = df[df.target != 1].drop(labels='target', axis=1)
    anomaly_df.shape
    (2079, 140)

    # Splitting the dataset
    # Initial train-validation split:
    # 'normal_df' is divided into training and validation sets,
    # with 15% of the data allocated to the validation set.
    # 'random_state=42' ensures reproducibility.
    train_df, val_df = train_test_split(
        normal_df,
        test_size=0.15,
        random_state=42
    )
    # Further split into validation and test:
    # 33% of the validation set is allocated to the test set
    # (consistent with the 145 normal test sequences reported below),
    # using the same 'random_state=42' for reproducibility.
    val_df, test_df = train_test_split(
        val_df,
        test_size=0.33,
        random_state=42
    )

    # Function to create a dataset of per-sequence tensors
    def create_dataset(df):
        # Convert the DataFrame to a list of sequences, each a list of floats
        sequences = df.astype(np.float32).to_numpy().tolist()
        # Convert each sequence to a PyTorch tensor of shape (seq_len, 1)
        dataset = [torch.tensor(s).unsqueeze(1).float() for s in sequences]
        # Extract the dimensions of the stacked dataset: (n_seq, seq_len, n_features)
        n_seq, seq_len, n_features = torch.stack(dataset).shape
        # Return the dataset, sequence length, and number of features
        return dataset, seq_len, n_features

    # Create the training dataset from train_df
    train_dataset, seq_len, n_features = create_dataset(train_df)
    # Create the validation dataset from val_df
    val_dataset, _, _ = create_dataset(val_df)
    # Create the test dataset of normal sequences from test_df
    test_normal_dataset, _, _ = create_dataset(test_df)
    # Create the test dataset of anomalous sequences from anomaly_df
    test_anomaly_dataset, _, _ = create_dataset(anomaly_df)
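
    As a quick sanity check (a sketch added here, not part of the original post), the dimensions reported by create_dataset can be printed; the expected sizes follow from the 85/15 and 67/33 splits of the 2,919 normal sequences:

    # Sanity check (hypothetical, not in the original): sequence length,
    # feature count, and split sizes
    print(seq_len, n_features)                # expected: 140 1
    print(len(train_dataset), len(val_dataset), len(test_normal_dataset))
                                              # expected: 2481 293 145
    print(train_dataset[0].shape)             # expected: torch.Size([140, 1])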

    Implementation of an LSTM-Based Autoencoder for ECG Anomaly Detection

    class Encoder(nn.Module):
        def __init__(self, seq_len, n_features, embedding_dim=64):
            super(Encoder, self).__init__()
            self.seq_len, self.n_features = seq_len, n_features
            self.embedding_dim, self.hidden_dim = embedding_dim, 2 * embedding_dim
            self.rnn1 = nn.LSTM(
                input_size=n_features,
                hidden_size=self.hidden_dim,
                num_layers=1,
                batch_first=True
            )
            self.rnn2 = nn.LSTM(
                input_size=self.hidden_dim,
                hidden_size=embedding_dim,
                num_layers=1,
                batch_first=True
            )

        def forward(self, x):
            x = x.reshape((1, self.seq_len, self.n_features))
            x, (_, _) = self.rnn1(x)
            x, (hidden_n, _) = self.rnn2(x)
            # The final hidden state of rnn2 is the embedding of the whole sequence
            return hidden_n.reshape((self.n_features, self.embedding_dim))

    class Decoder(nn.Module):
        def __init__(self, seq_len, input_dim=64, n_features=1):
            super(Decoder, self).__init__()
            self.seq_len, self.input_dim = seq_len, input_dim
            self.hidden_dim, self.n_features = 2 * input_dim, n_features
            self.rnn1 = nn.LSTM(
                input_size=input_dim,
                hidden_size=input_dim,
                num_layers=1,
                batch_first=True
            )
            self.rnn2 = nn.LSTM(
                input_size=input_dim,
                hidden_size=self.hidden_dim,
                num_layers=1,
                batch_first=True
            )
            self.output_layer = nn.Linear(self.hidden_dim, n_features)

        def forward(self, x):
            # Repeat the embedding once per time step, then unroll it through the LSTMs
            x = x.repeat(self.seq_len, self.n_features)
            x = x.reshape((self.n_features, self.seq_len, self.input_dim))
            x, (hidden_n, cell_n) = self.rnn1(x)
            x, (hidden_n, cell_n) = self.rnn2(x)
            x = x.reshape((self.seq_len, self.hidden_dim))
            return self.output_layer(x)

    class Autoencoder(nn.Module):
        def __init__(self, seq_len, n_features, embedding_dim=64):
            super(Autoencoder, self).__init__()
            self.encoder = Encoder(seq_len, n_features, embedding_dim).to(device)
            self.decoder = Decoder(seq_len, embedding_dim, n_features).to(device)

        def forward(self, x):
            x = self.encoder(x)
            x = self.decoder(x)
            return x

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = Autoencoder(seq_len, n_features, 128)
    model = model.to(device)
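
    Before training, a quick smoke test (a minimal sketch, assuming the seq_len of 140 and n_features of 1 derived above) confirms that the autoencoder maps a (140, 1) sequence to a reconstruction of the same shape:

    # Hypothetical smoke test: push one normal sequence through the untrained model
    with torch.no_grad():
        sample = train_dataset[0].to(device)   # shape: torch.Size([140, 1])
        reconstruction = model(sample)         # shape: torch.Size([140, 1])
    print(sample.shape, reconstruction.shape)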

    Training and Visualization of ECG Autoencoder Model

    def plot_input_reconstruction(model, dataset, epoch):
        model = model.eval()
        plt.figure(figsize=(10, 5))
        # Take the first sequence from the dataset
        seq_true = dataset[0].to(device)
        with torch.no_grad():
            # Run the model without tracking gradients
            seq_pred = model(seq_true)
            # Squeeze the sequences to ensure they are 1-dimensional
            input_sequence = seq_true.squeeze().cpu().numpy()
            reconstruction_sequence = seq_pred.squeeze().cpu().numpy()
        # Check the shapes after squeezing
        if input_sequence.ndim != 1 or reconstruction_sequence.ndim != 1:
            raise ValueError("Input and reconstruction sequences must be 1-dimensional after squeezing.")
        # Plot the sequences
        plt.plot(input_sequence, label='Input Sequence', color='black')
        plt.plot(reconstruction_sequence, label='Reconstruction Sequence', color='red')
        plt.fill_between(range(len(input_sequence)), input_sequence, reconstruction_sequence, color='gray', alpha=0.5)
        plt.title(f'Input vs Reconstruction - Epoch {epoch}')
        plt.legend()
        plt.show()

    def train_model(model, train_dataset, val_dataset, n_epochs, save_path):
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
        criterion = torch.nn.L1Loss(reduction='sum').to(device)
        history = {'train': [], 'val': []}
        best_model_wts = copy.deepcopy(model.state_dict())
        best_loss = float('inf')
        for epoch in range(1, n_epochs + 1):
            # Training pass over individual sequences (batch size 1)
            model.train()
            train_losses = []
            for seq_true in train_dataset:
                optimizer.zero_grad()
                seq_true = seq_true.to(device)
                seq_pred = model(seq_true)
                loss = criterion(seq_pred, seq_true)
                loss.backward()
                optimizer.step()
                train_losses.append(loss.item())
            # Validation pass without gradient tracking
            val_losses = []
            model.eval()
            with torch.no_grad():
                for seq_true in val_dataset:
                    seq_true = seq_true.to(device)
                    seq_pred = model(seq_true)
                    loss = criterion(seq_pred, seq_true)
                    val_losses.append(loss.item())
            train_loss = np.mean(train_losses)
            val_loss = np.mean(val_losses)
            history['train'].append(train_loss)
            history['val'].append(val_loss)
            # Keep and save the weights with the lowest validation loss
            if val_loss < best_loss:
                best_loss = val_loss
                best_model_wts = copy.deepcopy(model.state_dict())
                print("Saving best model")
                torch.save(model.state_dict(), save_path)
            print(f'Epoch {epoch}: train loss {train_loss} val loss {val_loss}')
            if epoch == 1 or epoch % 5 == 0:
                plot_input_reconstruction(model, val_dataset, epoch)
        # Load the best model weights before returning
        model.load_state_dict(best_model_wts)
        return model.eval(), history

    save_path = 'best_model.pth'  # Replace with your actual path
    model, history = train_model(model, train_dataset, val_dataset, 100, save_path)

    ax = plt.figure().gca()
    ax.plot(history['train'], label='Train Loss', color='black')
    ax.plot(history['val'], label='Val Loss', color='red')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend()
    plt.title('Loss over training epochs')
    plt.show()

    ECG Anomaly Detection Model Evaluation and Visualization

    model = Autoencoder(seq_len, n_features, 128)
    model.load_state_dict(torch.load('best_model.pth'))
    model = model.to(device)
    model.eval()
    Autoencoder(
      (encoder): Encoder(
        (rnn1): LSTM(1, 256, batch_first=True)
        (rnn2): LSTM(256, 128, batch_first=True)
      )
      (decoder): Decoder(
        (rnn1): LSTM(128, 128, batch_first=True)
        (rnn2): LSTM(128, 256, batch_first=True)
        (output_layer): Linear(in_features=256, out_features=1, bias=True)
      )
    )

    def predict(model, dataset):
        predictions, losses = [], []
        criterion = nn.L1Loss(reduction='sum').to(device)
        with torch.no_grad():
            model = model.eval()
            for seq_true in dataset:
                seq_true = seq_true.to(device)
                seq_pred = model(seq_true)
                loss = criterion(seq_pred, seq_true)
                predictions.append(seq_pred.cpu().numpy().flatten())
                losses.append(loss.item())
        return predictions, losses

    # Visualising the distribution of reconstruction losses on the training set
    # (sns.distplot is deprecated; histplot is the current seaborn API)
    _, losses = predict(model, train_dataset)
    sns.histplot(losses, bins=50, kde=True, label='Train', color='black')
    THRESHOLD = 25
    predictions, pred_losses = predict(model, test_normal_dataset)
    sns.histplot(pred_losses, bins=50, kde=True, color='black')

    correct = sum(l <= THRESHOLD for l in pred_losses)
    print(f'Correct normal predictions: {correct}/{len(test_normal_dataset)}')
    Correct normal predictions: 141/145

    anomaly_dataset = test_anomaly_dataset[:len(test_normal_dataset)]
    predictions, pred_losses = predict(model, anomaly_dataset)
    sns.histplot(pred_losses, bins=50, kde=True, color='red')

    correct = sum(l > THRESHOLD for l in pred_losses)
    print(f'Correct anomaly predictions: {correct}/{len(anomaly_dataset)}')

    Correct anomaly predictions: 145/145
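
    The two per-class counts can also be folded into standard classification metrics. A minimal sketch using scikit-learn, assuming normal_losses and anomaly_losses are copies of the two pred_losses lists computed above (these names are not in the original code):

    # Hypothetical summary metrics; 'normal_losses' and 'anomaly_losses' are
    # assumed copies of the two 'pred_losses' lists from the cells above
    from sklearn.metrics import classification_report
    y_true = [0] * len(normal_losses) + [1] * len(anomaly_losses)            # 1 = anomaly
    y_pred = [int(l > THRESHOLD) for l in normal_losses + anomaly_losses]    # flag high loss
    print(classification_report(y_true, y_pred, target_names=['normal', 'anomaly']))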

    def plot_prediction(data, model, title, ax):
        predictions, pred_losses = predict(model, [data])
        ax.plot(data, label='true', color='black')
        ax.plot(predictions[0], label='reconstructed', color='red')
        ax.set_title(f'{title} (loss: {np.around(pred_losses[0], 2)})')
        ax.legend()

    fig, axs = plt.subplots(
        nrows=2,
        ncols=4,
        sharey=True,
        sharex=True,
        figsize=(22, 8)
    )
    # Top row: normal sequences; bottom row: anomalies
    for i, data in enumerate(test_normal_dataset[:4]):
        plot_prediction(data, model, title='Normal', ax=axs[0, i])
    for i, data in enumerate(test_anomaly_dataset[:4]):
        plot_prediction(data, model, title='Anomaly', ax=axs[1, i])
    fig.tight_layout()

    About the author: Doctor of Engineering, reviewer for journals including Mechanical System and Signal Processing, 中国电机工程学报 (Proceedings of the CSEE), and 控制与决策 (Control and Decision). Areas of expertise: modern signal processing, machine learning, deep learning, digital twins, time series analysis, equipment defect detection, equipment anomaly detection, and intelligent fault diagnosis and prognostics and health management (PHM).

  • Original article: https://blog.csdn.net/weixin_39402231/article/details/139755059