import ipaddress
from urllib.parse import quote

import requests
import urllib3
# Default request headers: identify the client with a desktop Chrome
# User-Agent string instead of the HTTP library's own default.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/107.0.0.0 Safari/398.63"
    ),
}
# Suppress urllib3's InsecureRequestWarning category of warnings.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def get_proxy_url(ip=None, port=None, scheme="http",
                  *, username=None, passwd=None, quote_credentials=False):
    """Build a proxy URL of the form ``scheme://[user[:passwd]@]host:port``.

    Args:
        ip: Proxy host name or IP address. A bare IPv6 literal such as
            ``"::1"`` is wrapped in brackets so that its colons cannot be
            confused with the host/port separator.
        port: Proxy port (int or str); defaults to 80 when ``None``.
        scheme: URL scheme placed in front of ``://``.
        username: Optional user name for proxy authentication.
        passwd: Optional password for proxy authentication.
        quote_credentials: When true, percent-encode ``username`` and
            ``passwd`` so that reserved characters (``@``, ``:``, ``#``,
            ``%`` ...) cannot corrupt the URL. Leave it off (the default)
            when the credentials are already percent-encoded, otherwise
            they would be encoded twice.

    Returns:
        The proxy URL as a string, or ``None`` when no host was given.
    """
    if not ip:
        return None
    if port is None:
        port = 80

    host = str(ip)
    try:
        ipaddress.IPv6Address(host)
    except ValueError:
        # Host name, IPv4 address or already-bracketed literal: keep as is.
        pass
    else:
        host = f"[{host}]"

    def _encode(value):
        # safe="" so that "/" is escaped too; it is not allowed in userinfo.
        return quote(value, safe="") if quote_credentials else value

    user_pwd = ""
    if username:
        user_pwd = _encode(username)
    if passwd:
        user_pwd += ":" + _encode(passwd)
    if user_pwd:
        user_pwd += "@"
    return f"{scheme}://{user_pwd}{host}:{port}"
def my_get(url, proxy_url=None, *, timeout=10):
    """Send a GET request to *url*, optionally through a proxy.

    Args:
        url: Address to request.
        proxy_url: Proxy URL used for both the ``http`` and ``https``
            schemes. When falsy, no explicit proxy mapping is passed to
            ``requests``.
        timeout: Timeout in seconds handed to ``requests.get`` so that an
            unresponsive proxy or server cannot block the call forever.

    Returns:
        A ``(status_code, content)`` tuple taken from the response.

    Raises:
        requests.exceptions.RequestException: On connection failures,
            including ``requests.exceptions.Timeout`` once *timeout*
            elapses.
    """
    proxies = None
    if proxy_url:
        # The proxy mapping is deliberately not printed: the URL may embed
        # the proxy user name and password.
        proxies = {"http": proxy_url, "https": proxy_url}
    # verify=False turns off TLS certificate verification for this request.
    res = requests.get(url, headers=headers, verify=False,
                       proxies=proxies, timeout=timeout)
    return res.status_code, res.content
# Replace the proxy details below (address, port, user name, password)
# with your own.
proxy = get_proxy_url("192.168.111.222", "8080",
username="++++++", passwd="*********")
# Request the page through that proxy and print what my_get returns.
print(my_get("https://www.baidu.com", proxy))
用户名或密码中的特殊字符需要进行百分号编码,例如 ABC#$%123 可以使用 %41%42%43%23%24%25%31%32%33 替代。
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# The rest of the code is the same as above.
def my_get(url, proxy_url=None, *, timeout=10):
    """Send a GET request to *url* via a ``requests.Session``.

    Args:
        url: Address to request.
        proxy_url: Proxy URL used for both the ``http`` and ``https``
            schemes. When falsy, the session's proxy settings are left
            untouched.
        timeout: Timeout in seconds handed to ``Session.get`` so that an
            unresponsive proxy or server cannot block the call forever.

    Returns:
        A ``(status_code, content)`` tuple taken from the response.

    Raises:
        requests.exceptions.RequestException: On connection failures,
            including ``requests.exceptions.Timeout`` once *timeout*
            elapses.
    """
    # The context manager closes the session (and its pooled connections)
    # even when the request raises.
    with requests.Session() as session:
        if proxy_url:
            # Only update when a proxy was given: dict.update(None) raises
            # TypeError.
            session.proxies.update({"http": proxy_url, "https": proxy_url})
        session.headers.update(headers)
        # Turns off TLS certificate verification for this session.
        session.verify = False
        res = session.get(url, timeout=timeout)
        return res.status_code, res.content
import asyncio
import aiohttp
# The rest of the code is the same as above.
def my_get(url, proxy_url=None, *, timeout=3):
    """Send a GET request to *url* with aiohttp, optionally through a proxy.

    Args:
        url: Address to request.
        proxy_url: Proxy URL passed to aiohttp's ``proxy`` argument, or
            ``None`` to pass no proxy.
        timeout: Total time in seconds allowed for the request.

    Returns:
        A ``(status, content)`` tuple on success. Because the request is
        gathered with ``return_exceptions=True``, a failure is *returned*
        as the exception instance instead of being raised; callers should
        check the result with ``isinstance(result, Exception)``.
    """
    async def _fetch(target):
        # ssl=False turns off TLS certificate verification.
        connector = aiohttp.TCPConnector(ssl=False)
        client_timeout = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession(connector=connector) as sess:
            async with sess.get(target, proxy=proxy_url,
                                timeout=client_timeout) as res:
                content = await res.read()
                return res.status, content

    async def _fetch_all(*targets):
        tasks = [asyncio.create_task(_fetch(target)) for target in targets]
        return await asyncio.gather(*tasks, return_exceptions=True)

    # Works around "RuntimeError: Event loop is closed". The selector policy
    # class is only defined on Windows, so it is applied only when present.
    if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    dl_result = asyncio.run(_fetch_all(url))
    return dl_result[0]
注意:为了解决 RuntimeError: Event loop is closed,在 Windows 下需要执行:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())