- import threading,queue,time
-
class Thread(threading.Thread):
    """Worker thread that drains a shared job queue.

    Each worker repeatedly calls getData() on the shared queue until the
    queue looks empty, printing overall progress (reads the module-level
    lst_length) before handling each job.
    """

    def __init__(self, name, q):
        super().__init__()
        self.name = name  # thread label, e.g. 'Thread-1'
        self.q = q        # shared work queue

    def run(self):
        # qsize() is only a snapshot; the actual item removal happens
        # inside getData().
        while self.q.qsize() > 0:
            done = lst_length - self.q.qsize()
            print('\r%d/%d' % (done, lst_length), end='')  # show progress
            getData(self.q)
def getData(q):
    """Take one job from *q*, "process" it, and record the result.

    Appends an (input, input**2) tuple to the module-level results_list;
    a tuple is used because completion order across threads is
    nondeterministic.

    Returns silently when the queue is already empty: callers test
    qsize() before calling, but another worker may drain the queue in
    between, and the previous blocking q.get() would then hang forever.
    """
    try:
        qu = q.get_nowait()  # non-blocking: avoids the qsize()/get() race
    except queue.Empty:
        return
    time.sleep(1)  # simulate one second of work (e.g. a network request)
    result = (qu, qu ** 2)  # tuple: results may arrive out of order
    results_list.append(result)
def buildQueueAndThreads(lst, threadNum):
    """Queue every item of *lst* and process it with *threadNum* workers.

    Blocks until all worker threads have finished, so on return the
    module-level results_list holds one tuple per input item.
    """
    workQueue = queue.Queue()
    # Plain loop: a list comprehension used only for its side effects
    # builds a throwaway list and hides intent.
    for item in lst:
        workQueue.put(item)
    threads_list = []
    for i in range(1, threadNum + 1):
        thread = Thread('Thread-' + str(i), q=workQueue)
        thread.start()
        threads_list.append(thread)
    print('\nWaiting for finishing...')
    for thread in threads_list:  # wait for the workers before resuming main
        thread.join()
# Demo driver: process 500 items with 100 worker threads.
threadNum=100  # number of worker threads
lst=list(range(500))  # the work items
lst_length=len(lst)  # read by Thread.run for the progress display
start_time=time.time()
results_list=[]  # filled by getData from the worker threads
buildQueueAndThreads(lst,threadNum)
print('\n%d 线程处理耗时: %.fs'%(threadNum,time.time()-start_time))  # elapsed wall-clock time
print('result_list length: ',len(results_list))
print(results_list[:5]) # order may be scrambled; a list of tuples
99/500 Waiting for finishing... 499/500 100 线程处理耗时: 5s result_list length: 500 [(1, 1), (0, 0), (3, 9), (2, 4), (4, 16)]
有时爬虫程序设置较多线程时,频繁地访问会被服务器拒绝,我们希望为每个线程分配一定数量的任务,当任务数量(尝试爬取次数)达到时即自动结束,这样虽然不能保证数据获取完全,但会随着任务数量的减少,线程数自动减少,从而降低访问频率。未获取完全的部分数据可通过多次运行来实现:
- #多线程示例
- import threading,queue,time
-
class Thread(threading.Thread):
    """Worker thread with a per-thread job budget.

    Exits when the queue is empty, when it has taken 10 jobs
    (self.count), or when the module-level continue_all_threads flag is
    cleared by the monitor loop — so the number of live threads shrinks
    automatically as budgets run out.
    """

    def __init__(self, name, q):
        threading.Thread.__init__(self)
        self.name = name  # thread label, e.g. 'Thread-1'
        self.q = q        # shared work queue
        self.count = 0    # jobs this thread has taken so far

    def run(self):
        while self.q.qsize() > 0 and self.count < 10 and continue_all_threads:
            self.count += 1
            print('\r%d/%d continue all threads:%s'
                  % (lst_length - self.q.qsize(), lst_length, continue_all_threads),
                  end='')  # show progress
            # BUG FIX: the original loop never consumed the queue — the
            # work call was missing (cf. the first example and the shown
            # output, where results_list does get filled).
            getData(self.q)
def getData(q):
    """Take one job from *q*, "process" it, and record the result.

    Appends an (input, input**2) tuple to the module-level results_list;
    a tuple is used because completion order across threads is
    nondeterministic.

    Returns silently when the queue is already empty: callers test
    qsize() before calling, but another worker may drain the queue in
    between, and the previous blocking q.get() would then hang forever.
    """
    try:
        qu = q.get_nowait()  # non-blocking: avoids the qsize()/get() race
    except queue.Empty:
        return
    time.sleep(1)  # simulate one second of work (e.g. a network request)
    result = (qu, qu ** 2)  # tuple: results may arrive out of order
    results_list.append(result)
def buildQueueAndThreads(lst, threadNum):
    """Queue every item of *lst*, start *threadNum* workers, and monitor them.

    Unlike the join() variant, the main thread does NOT wait on the
    workers; it polls them, and if the queue stops shrinking for more
    than 5 seconds it assumes the workers are stuck (e.g. a scraper
    being refused) and clears the global continue_all_threads flag so
    every remaining worker exits its loop.
    """
    workQueue = queue.Queue()
    # Plain loop instead of a side-effect-only comprehension.
    for item in lst:
        workQueue.put(item)
    threads_list = []
    for i in range(1, threadNum + 1):
        thread = Thread('Thread-' + str(i), q=workQueue)
        time.sleep(0.1)  # stagger startup so the progress output stays readable
        thread.start()
        threads_list.append(thread)
    print('\nWaiting for finishing...')
    # Deliberately no join(): the main thread runs alongside the workers.
    alive_num = threadNum
    lst_length = len(lst)
    last_workQueue_size = workQueue.qsize()
    last_time = time.time()
    while alive_num > 0:
        alive_num = len([t for t in threads_list if t.is_alive()])
        workQueue_size = workQueue.qsize()
        # BUG FIX: this condition was truncated in the original; reset the
        # stall timer whenever the queue has shrunk (progress was made).
        if workQueue_size < last_workQueue_size:
            last_workQueue_size = workQueue_size
            last_time = time.time()
        delta_time = time.time() - last_time
        if delta_time > 5:
            # No job finished within 5s: give up waiting. (With the pure
            # squaring workload used here this never triggers.)
            break
        print('\rAliving num:%d Progress:%d/%d'
              % (alive_num, lst_length - workQueue.qsize(), lst_length), end='')
        time.sleep(0.1)  # avoid a tight busy-wait while polling
    global continue_all_threads  # must be declared global to rebind it
    continue_all_threads = False  # tell any remaining workers to stop
# Demo driver: 6 workers, 56 items, 10-job budget per worker
# (6 * 10 = 60 >= 56, so all items can still be processed).
threadNum=6  # number of worker threads
lst=list(range(56))  # the work items
lst_length=len(lst)  # read by Thread.run for the progress display
start_time=time.time()
print()
results_list=[]  # filled by getData from the worker threads
continue_all_threads=True  # cleared by buildQueueAndThreads to stop workers
buildQueueAndThreads(lst,threadNum)
print('\n%d 线程处理耗时: %.1fs'%(threadNum,time.time()-start_time))  # elapsed wall-clock time
print('result_list length: ',len(results_list))
print(results_list[:5]) # order may be scrambled; a list of tuples
5/56 continue all threads:True
Waiting for finishing...
Aliving num:0 Progress:56/56rue
6 线程处理耗时: 10.4s
result_list length: 56
[(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)]