Given: appX --request--> api
Use multiprocessing to issue concurrent requests to the api.
import random
import time
from datetime import datetime, timedelta
from multiprocessing import Process

class ProcessTest(object):
    """
    Request the API concurrently with multiple processes and bulk-write the results to a Django table.
    Key points: 1) concurrency; 2) batch reads and batch writes.
    Requirement: given 1000 apps, fetch the CPU core count of each app via the api.
    Approach: split the app list into segments by the number of concurrent processes,
    e.g. 1000 apps / 15 processes -> 66 apps per process, with the remaining 10 handled serially.
    """

    def __init__(self, mon_day):
        self.mon_day = mon_day

    @staticmethod
    def requests_mon_api(app_id):
        # simulated API call: return a random metric for the app
        return {app_id: random.randint(100, 5000)}

    @staticmethod
    def get_app_list():
        # simulated batch read: fetch the full app list in one slow call
        time.sleep(2)
        return ["app_" + str(i) for i in range(1000)]

    def records_to_db(self, records):
        # batch write: bulk-create the records into a Django table (simulated here with a print)
        print("[{0}] -------->>>>>>>>>{1}".format(self.mon_day, records))

    def app_cores_to_db(self, app_id):
        # retry the API call until it succeeds
        while True:
            try:
                app_records = self.requests_mon_api(app_id)
                self.records_to_db(app_records)
                break
            except Exception as e:
                print(e.args, "retry", app_id)

    def batch_run(self, start, end, app_arr):
        # process_run passes a half-open range [start, end); slicing with end + 1 would
        # process each segment boundary twice, so slice [start:end] instead
        batch_app = app_arr[start:end]
        for app in batch_app:
            self.app_cores_to_db(app)

    def process_run(self, process_num, process_batch, app_arr):
        process_arr = []
        # from django import db
        for i in range(process_num):
            # db.close_old_connections()
            p = Process(target=self.batch_run, args=(i * process_batch, (i + 1) * process_batch, app_arr))
            print("Process {0}: fetching range [{1}:{2}], {3} records".format(
                i + 1, i * process_batch, (i + 1) * process_batch, process_batch))
            process_arr.append(p)
        for p in process_arr:
            p.start()
        for p in process_arr:
            p.join()

    def to_db(self):
        app_arr = self.get_app_list()
        process_num = 15
        total = len(app_arr)
        process_batch = total // process_num
        self.process_run(process_num=process_num, process_batch=process_batch, app_arr=app_arr)
        # handle the remainder (total % process_num apps) serially in the parent process
        remain_index = process_batch * process_num
        for app_id in app_arr[remain_index:]:
            try:
                self.app_cores_to_db(app_id)
            except Exception as e:
                print(e.args, app_id, "error")


if __name__ == '__main__':
    day = (datetime.now() + timedelta(days=0)).strftime("%Y-%m-%d")  # today; adjust the offset for other days
    tp = ProcessTest(mon_day=day)
    tp.to_db()
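
records_to_db above only prints the batch. A minimal sketch of the bulk write it stands in for, assuming a hypothetical Django model AppCore(app_id, cores, mon_day) in myapp.models (the model and field names are illustrative, not from the original):

# Hypothetical model import; the real model/fields are not given in the original.
from myapp.models import AppCore

def records_to_db(self, records):
    # records is a dict of {app_id: cores}; build unsaved instances and insert them in one query
    objs = [AppCore(app_id=app_id, cores=cores, mon_day=self.mon_day)
            for app_id, cores in records.items()]
    AppCore.objects.bulk_create(objs)

bulk_create issues a single INSERT for the whole list, which is what the "batch write" principle in the docstring is after.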
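The commented-out db.close_old_connections() lines point at a real Django-plus-fork pitfall: child processes inherit the parent's open database connection, and sharing one connection across processes leads to errors. A sketch of one common workaround (assuming Django is configured; it uses connections.close_all() rather than the original close_old_connections call) is to close all connections in the parent just before forking, so each child lazily opens its own:

from django import db

def process_run(self, process_num, process_batch, app_arr):
    db.connections.close_all()  # children re-open fresh connections on their first query
    process_arr = []
    for i in range(process_num):
        p = Process(target=self.batch_run,
                    args=(i * process_batch, (i + 1) * process_batch, app_arr))
        process_arr.append(p)
    for p in process_arr:
        p.start()
    for p in process_arr:
        p.join()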