Distributed Decision Trees and Random Forests on Ray


    WeChat official account: 大数据高性能计算

    In financial and similar scenarios we often need to build rules or strategies, and using an algorithm to split the model's input features into rules, at scale, is something we must be able to do. There are two points here. First, the split should produce interpretable rules, which naturally suggests decision trees, random forests, and the like. Second, in production the data volume is usually far too large for a single machine, so how do we make the whole pipeline distributed to meet production needs? The rest of this article works through these two problems.

    1 Distributed Decision Trees and Random Forests on Ray

    1.1 A first Ray implementation and its problem

    We implement a first version of the decision tree on top of the Ray API and the scikit-learn package:

    import ray
    import numpy as np  # needed below for np.array_split
    from sklearn.datasets import load_iris
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.model_selection import train_test_split
    
    # Initialize Ray
    ray.init()
    
    @ray.remote
    def train_decision_tree(subset_x, subset_y):
        """Train a decision tree on a subset of data."""
        clf = DecisionTreeClassifier()
        clf.fit(subset_x, subset_y)
        return clf
    
    def distribute_training(data, labels, num_splits=4):
        """Distribute training across multiple cores using Ray."""
        # Split the data into multiple subsets
        chunked_data = np.array_split(data, num_splits)
        chunked_labels = np.array_split(labels, num_splits)
    
        # Distribute the training tasks
        futures = [train_decision_tree.remote(chunked_data[i], chunked_labels[i]) for i in range(num_splits)]
        
        # Fetch results
        models = ray.get(futures)
        return models
    
    if __name__ == "__main__":
        iris = load_iris()
        X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, test_size=0.2, random_state=42)
        
        models = distribute_training(X_train, y_train)
        for model in models:
            score = model.score(X_test, y_test)
            print(f"Model accuracy: {score:.2f}")
    
    # Shutdown Ray
    ray.shutdown()
    

    The output after running:

    [screenshot: the accuracy printed for each of the four models]

    Looking at the code above, though, the training is distributed but the loading is still single-machine. In practice our datasets are often very large, so we also need to load the data in a distributed way and only then train on it in parallel.
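
    As an aside (not used in this post), newer Ray releases also ship Ray Data, whose ray.data.read_csv reads many files in parallel across the cluster; a minimal sketch, assuming the data1.csv..data4.csv shards created in the next section already exist:

    import ray

    ray.init()
    # Ray Data turns the file list into read tasks that run in parallel
    ds = ray.data.read_csv(["data1.csv", "data2.csv", "data3.csv", "data4.csv"])
    print(ds.schema())   # column names and types
    df = ds.to_pandas()  # collect to the driver; only sensible for small data
    ray.shutdown()

    Below, however, we follow the hand-rolled approach and parallelize the loading with plain Ray tasks.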

    1.2 A fully distributed DecisionTree version on Ray

    To make this genuinely distributed, we will mock up several dataset files ourselves, read those files in parallel, and then train in parallel as well.

    Assume the dataset is stored across multiple files, e.g. CSVs; in reality these would likely be tables in some data warehouse. Below we show how to load the data in a distributed, parallel fashion and upgrade the decision-tree training accordingly.

    1.2.1 Mocking the data

    Write the mock data out to CSV files:

    from sklearn.datasets import load_iris
    import pandas as pd
    import numpy as np
    
    def create_mock_csv_files(num_files=4):
        iris = load_iris()
        df = pd.DataFrame(data=np.c_[iris['data'], iris['target']], columns=iris['feature_names'] + ['target'])
        
        # Split and save to CSV
        chunked_data = np.array_split(df, num_files)
        file_paths = []
        for i, chunk in enumerate(chunked_data):
            file_name = f"data{i + 1}.csv"
            chunk.to_csv(file_name, index=False)
            file_paths.append(file_name)
        
        return file_paths
    
    file_paths = create_mock_csv_files()
    print(f"Created CSV files: {file_paths}")
    

    1.2.2 Distributed loading, distributed training

    import ray
    import numpy as np
    import pandas as pd
    from sklearn.tree import DecisionTreeClassifier
    
    # Initialize Ray
    ray.init()
    
    @ray.remote
    def load_data(file_path):
        """Load data from a given file."""
        # For demonstration, I'm assuming the data is in CSV format.
        data = pd.read_csv(file_path)
        X = data.drop('target', axis=1).values
        y = data['target'].values
        return X, y
    
    @ray.remote
    def train_decision_tree(X, y):
        """Train a decision tree on given data."""
        clf = DecisionTreeClassifier()
        clf.fit(X, y)
        return clf
    
    def distributed_loading_and_training(file_paths):
        """Distribute data loading and training across cores using Ray."""
        
        # Distribute the data loading tasks
        data_futures = [load_data.remote(file) for file in file_paths]
        datasets = ray.get(data_futures)
    
        # Distribute the training tasks
        training_futures = [train_decision_tree.remote(X, y) for X, y in datasets]
        models = ray.get(training_futures)
        
        return models
    
    if __name__ == "__main__":
        # Assume you have data split across 4 CSV files
        file_paths = ["data1.csv", "data2.csv", "data3.csv", "data4.csv"]
    
        models = distributed_loading_and_training(file_paths)
        # Note: For demonstration purposes, this example lacks testing and accuracy reporting. 
        # You can extend it as per the previous example.
    
    # Shutdown Ray
    ray.shutdown()
    

    1.2.3 Testing model performance

    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score
    
    # ... [create_mock_csv_files, load_data, train_decision_tree and
    #      distributed_loading_and_training code from above] ...
    
    if __name__ == "__main__":
        file_paths = create_mock_csv_files()
        print(f"Created CSV files: {file_paths}")
        
        models = distributed_loading_and_training(file_paths)
        
        # Test the performance of one of the models
        iris = load_iris()
        X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, test_size=0.2, random_state=42)
        predictions = models[0].predict(X_test)
        accuracy = accuracy_score(y_test, predictions)
        print(f"Model accuracy: {accuracy:.2f}")
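
    This only scores models[0], a single chunk model. A quick extension (a sketch, in the spirit of the "extend it" note earlier) scores every chunk model on the same holdout:

    # Score each chunk model on the shared test split
    for i, model in enumerate(models):
        acc = accuracy_score(y_test, model.predict(X_test))
        print(f"Model {i} accuracy: {acc:.2f}")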
    

    The complete implementation:

    from sklearn.datasets import load_iris
    import pandas as pd
    import numpy as np
    import ray
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score
    from sklearn.tree import DecisionTreeClassifier
    
    def create_mock_csv_files(num_files=4):
        iris = load_iris()
        df = pd.DataFrame(data=np.c_[iris['data'], iris['target']], columns=iris['feature_names'] + ['target'])
    
        # Split and save to CSV
        chunked_data = np.array_split(df, num_files)
        file_paths = []
        for i, chunk in enumerate(chunked_data):
            file_name = f"data{i + 1}.csv"
            chunk.to_csv(file_name, index=False)
            file_paths.append(file_name)
    
        return file_paths
    
    
    
    
    @ray.remote
    def load_data(file_path):
        """Load data from a given file."""
        # For demonstration, I'm assuming the data is in CSV format.
        data = pd.read_csv(file_path)
        X = data.drop('target', axis=1).values
        y = data['target'].values
        return X, y
    
    
    @ray.remote
    def train_decision_tree(X, y):
        """Train a decision tree on given data."""
        clf = DecisionTreeClassifier()
        clf.fit(X, y)
        return clf
    
    
    def distributed_loading_and_training(file_paths):
        """Distribute data loading and training across cores using Ray."""
    
        # Distribute the data loading tasks
        data_futures = [load_data.remote(file) for file in file_paths]
        datasets = ray.get(data_futures)
    
        # Distribute the training tasks
        training_futures = [train_decision_tree.remote(X, y) for X, y in datasets]
        models = ray.get(training_futures)
    
        return models
    
    
    
    if __name__ == "__main__":
        file_paths = create_mock_csv_files()
        print(f"Created CSV files: {file_paths}")
        # Initialize Ray
        ray.init()
        models = distributed_loading_and_training(file_paths)
    
        # Test the performance of one of the models
        iris = load_iris()
        X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, test_size=0.2, random_state=42)
        predictions = models[0].predict(X_test)
        accuracy = accuracy_score(y_test, predictions)
        print(f"Model accuracy: {accuracy:.2f}")
        # Shutdown Ray
        ray.shutdown()
    

    [screenshot: the printed model accuracy, 0.33]

    Now a problem appears: the accuracy is quite low, only 0.33, much worse than the single-machine run. What causes this?

    The root cause is that our data is small to begin with, just a subset of Iris; if we slice it further, each model sees incomplete data, which can introduce sample bias, so the accuracy naturally suffers.

    To fix this we can bring in the idea of bagging. Bagging aggregates models trained on different samples, which balances out individual errors, mitigates model bias, and improves accuracy; a classic embodiment of this is the Random Forest.
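
    For intuition, bagging is also available directly in scikit-learn as sklearn.ensemble.BaggingClassifier (not used in this post); a minimal single-machine sketch:

    from sklearn.datasets import load_iris
    from sklearn.ensemble import BaggingClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeClassifier

    X, y = load_iris(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # 10 decision trees, each fit on a bootstrap sample of the training data
    bag = BaggingClassifier(DecisionTreeClassifier(), n_estimators=10, random_state=42)
    bag.fit(X_train, y_train)
    print(f"Bagging accuracy: {bag.score(X_test, y_test):.2f}")

    A random forest goes one step further: besides bootstrapping the rows, it also samples a random subset of features at each split.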

    1.3 Distributed Random Forests on Ray

    So how do we adapt the code above?

    First, change the training function to use a random forest:

    
    from sklearn.ensemble import RandomForestClassifier

    @ray.remote
    def train_random_forest(X, y):
        """Train a random forest on given data."""
        clf = RandomForestClassifier(n_estimators=10)  # Using 10 trees for demonstration
        clf.fit(X, y)
        return clf
    

    We also need to update the distributed training function and the main entry point:

    def distributed_loading_and_training(file_paths):
        """Distribute data loading and training across cores using Ray."""

        # Distribute the data loading tasks
        data_futures = [load_data.remote(file) for file in file_paths]
        datasets = ray.get(data_futures)

        # Distribute the training tasks
        training_futures = [train_random_forest.remote(X, y) for X, y in datasets]
        models = ray.get(training_futures)

        return models

    if __name__ == "__main__":
        file_paths = create_mock_csv_files()
        print(f"Created CSV files: {file_paths}")

        models = distributed_loading_and_training(file_paths)

        # Testing the ensemble's performance
        iris = load_iris()
        X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, test_size=0.2, random_state=42)

        # Combine the predictions from all models to get the final prediction
        predictions = []
        for x in X_test:
            model_predictions = [model.predict([x])[0] for model in models]
            # Using majority voting for classification
            final_prediction = max(set(model_predictions), key=model_predictions.count)
            predictions.append(final_prediction)

        accuracy = accuracy_score(y_test, predictions)
        print(f"Ensemble model accuracy: {accuracy:.2f}")
    
    

    The complete code:

    from sklearn.datasets import load_iris
    import pandas as pd
    import numpy as np
    import ray
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score
    from sklearn.ensemble import RandomForestClassifier
    
    def create_mock_csv_files(num_files=4):
        iris = load_iris()
        df = pd.DataFrame(data=np.c_[iris['data'], iris['target']], columns=iris['feature_names'] + ['target'])
    
        # Split and save to CSV
        chunked_data = np.array_split(df, num_files)
        file_paths = []
        for i, chunk in enumerate(chunked_data):
            file_name = f"data{i + 1}.csv"
            chunk.to_csv(file_name, index=False)
            file_paths.append(file_name)
    
        return file_paths
    
    
    
    
    @ray.remote
    def load_data(file_path):
        """Load data from a given file."""
        # For demonstration, I'm assuming the data is in CSV format.
        data = pd.read_csv(file_path)
        X = data.drop('target', axis=1).values
        y = data['target'].values
        return X, y
    
    
    @ray.remote
    def train_random_forest(X, y):
        """Train a random forest on given data."""
        clf = RandomForestClassifier(n_estimators=10)  # Using 10 trees for demonstration
        clf.fit(X, y)
        return clf
    
    
    def distributed_loading_and_training(file_paths):
        """Distribute data loading and training across cores using Ray."""
    
        # Distribute the data loading tasks
        data_futures = [load_data.remote(file) for file in file_paths]
        datasets = ray.get(data_futures)
    
        # Distribute the training tasks
        training_futures = [train_random_forest.remote(X, y) for X, y in datasets]
        models = ray.get(training_futures)
    
        return models
    
    
    if __name__ == "__main__":
        file_paths = create_mock_csv_files()
        print(f"Created CSV files: {file_paths}")
        # Initialize Ray
        ray.init()
        models = distributed_loading_and_training(file_paths)
    
        # Testing the ensemble's performance
        iris = load_iris()
        X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, test_size=0.2, random_state=42)
    
        # Combine the predictions from all models via majority voting
        predictions = []
        for x in X_test:
            model_predictions = [model.predict([x])[0] for model in models]
            # Using majority voting for classification
            final_prediction = max(set(model_predictions), key=model_predictions.count)
            predictions.append(final_prediction)
    
        accuracy = accuracy_score(y_test, predictions)
        print(f"Ensemble model accuracy: {accuracy:.2f}")
        # Shutdown Ray
        ray.shutdown()
    
    
    

    [screenshot: the printed ensemble accuracy]

    We can see that the accuracy immediately improves.
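
    One practical note: the voting loop above predicts one sample at a time, which is slow. A vectorized majority vote over the same models list would look like this (an optimization sketch, not from the original post):

    import numpy as np

    # Each row holds one model's predictions for the whole test set
    all_preds = np.stack([model.predict(X_test) for model in models]).astype(int)

    # Majority vote per test sample (column-wise)
    ensemble_pred = np.apply_along_axis(lambda votes: np.bincount(votes).argmax(), 0, all_preds)

    accuracy = accuracy_score(y_test, ensemble_pred)
    print(f"Ensemble model accuracy: {accuracy:.2f}")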

  Original article: https://blog.csdn.net/zhangkai1992/article/details/134002292