• py并发编程实践-demo


    需求

    已知条件:appX -请求-> api  
    
    • 1

    多进程实现并发请求api

    • 给定app应用列表,请求api核数
    from datetime import datetime, timedelta
    from multiprocessing import Process
    
    
    class ProcessTest(object):
        """
         多进程并发请求API,并批量写入django表
         要点:1)并发;2)读写批量原则,批量读、批量写
         需求:已知1000个app,通过api获取其CPU核数
         思路:将app列表 按并发数 分段
        """
        def __init__(self, mon_day):
            self.mon_day = mon_day
    
        @staticmethod
        def requests_mon_api(app_id):
            import random
            return {app_id: random.randint(100, 5000)}
    
        @staticmethod
        def get_app_list():
            import time
            time.sleep(2)   # 耗时
            return ["app_"+str(i) for i in range(1000)]
    
        def records_to_db(self, records):
            # django table bulk create to db
            print("[{0}] -------->>>>>>>>>{1}".format(self.mon_day, records))
    
        def app_cores_to_db(self, app_id):
            # api 无限重试。。
            flag = 0
            while flag == 0:
                try:
                    app_records = self.requests_mon_api(app_id)
                    self.records_to_db(app_records)
                    flag = 1
                except Exception as e:
                    print(e.args, "retry", app_id)
    
        def batch_run(self, start, end, app_arr):
            batch_app = app_arr[start:end + 1]
            for app in batch_app:
                self.app_cores_to_db(app)
    
        def process_run(self, process_num, process_batch, app_arr):
    
            process_arr = []
            # from django import db
            for i in range(process_num):
                # db.close_old_connections()
                p = Process(target=self.batch_run, args=(i * process_batch, (i+1)*process_batch, app_arr))
                print("第{0}个进程,拉取范围[{1}:{2}],共拉取{3}条记录".format(i+1, i*process_batch, (i+1)*process_batch, process_batch))
                process_arr.append(p)
    
            for p in process_arr:
                p.start()
    
            for p in process_arr:
                p.join()
    
        def to_db(self):
            app_arr = self.get_app_list()
            process_num = 15
            total = len(app_arr)
            process_batch = total // process_num
    
            self.process_run(process_num=process_num, process_batch=process_batch, app_arr=app_arr)
            remain_index = process_batch * process_num + 1
    
            for app_id in app_arr[remain_index:]:
                try:
                    self.app_cores_to_db(app_id)
                except Exception as e:
                    print(e.args, app_id, "error")
    
    
    if __name__ == '__main__':
        day = (datetime.now() + timedelta(days=-0)).strftime("%Y-%m-%d")
        tp = ProcessTest(mon_day=day)
        tp.to_db()
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
  • 相关阅读:
    R语言实战应用精讲50篇(二十三)-贝叶斯理论重要概念: 可信度Credibility, 模型Models, 和参数Parameters
    Selenium打开浏览器闪退问题(浏览器驱动是对应的前提)-解决办法
    LeetCode 2511. 最多可以摧毁的敌人城堡数目
    Python之第十一章 面向对象 --- 三大特征
    在前端开发领域,如何将AI技术应用于产品开发中?
    选择排序(C++实现)
    LINUX挂载共享盘
    mTD-SCDMA与TD-LTE双网络垂直切换matlab仿真
    springboot如何整个Swagger呢?
    进程调度的原理和算法探析
  • 原文地址:https://blog.csdn.net/Sunny_Future/article/details/134471757