• python实现的一些方法,可以直接拿来用的那种


    1、日期生成

    很多时候我们需要批量生成日期,方法有很多,这里分享两段代码

    获取过去 N 天的日期:

    import datetime
    
    def get_nday_list(n):
        before_n_days = []
        for i in range(1, n + 1)[::-1]:
            before_n_days.append(str(datetime.date.today() - datetime.timedelta(days=i)))
        return before_n_days
    
    a = get_nday_list(30)
    print(a)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    输出:

    ['2021-12-23', '2021-12-24', '2021-12-25', '2021-12-26', '2021-12-27', '2021-12-28', '2021-12-29', '2021-12-30', '2021-12-31', '2022-01-01', '2022-01-02', '2022-01-03', '2022-01-04', '2022-01-05', '2022-01-06', '2022-01-07', '2022-01-08', '2022-01-09', '2022-01-10', '2022-01-11', '2022-01-12', '2022-01-13', '2022-01-14', '2022-01-15', '2022-01-16', '2022-01-17', '2022-01-18', '2022-01-19', '2022-01-20', '2022-01-21']
    
    
    • 1
    • 2

    生成一段时间区间内的日期:

    import datetime
    
    def create_assist_date(datestart = None,dateend = None):
        # 创建日期辅助表
    
        if datestart is None:
            datestart = '2016-01-01'
        if dateend is None:
            dateend = datetime.datetime.now().strftime('%Y-%m-%d')
    
        # 转为日期格式
        datestart=datetime.datetime.strptime(datestart,'%Y-%m-%d')
        dateend=datetime.datetime.strptime(dateend,'%Y-%m-%d')
        date_list = []
        date_list.append(datestart.strftime('%Y-%m-%d'))
        while datestart<dateend:
            # 日期叠加一天
            datestart+=datetime.timedelta(days=+1)
            # 日期转字符串存入列表
            date_list.append(datestart.strftime('%Y-%m-%d'))
        return date_list
    
    d_list = create_assist_date(datestart='2021-12-27', dateend='2021-12-30')
    d_list
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    输出:

    ['2021-12-27', '2021-12-28', '2021-12-29', '2021-12-30']
    
    • 1

    2、保存数据到CSV

    保存数据到 CSV 是太常见的操作了

    def save_data(data, date):
        if not os.path.exists(r'2021_data_%s.csv' % date):
            with open("2021_data_%s.csv" % date, "a+", encoding='utf-8') as f:
                f.write("标题,热度,时间,url\n")
                for i in data:
                    title = i["title"]
                    extra = i["extra"]
                    time = i['time']
                    url = i["url"]
                    row = '{},{},{},{}'.format(title,extra,time,url)
                    f.write(row)
                    f.write('\n')
        else:
            with open("2021_data_%s.csv" % date, "a+", encoding='utf-8') as f:
                for i in data:
                    title = i["title"]
                    extra = i["extra"]
                    time = i['time']
                    url = i["url"]
                    row = '{},{},{},{}'.format(title,extra,time,url)
                    f.write(row)
                    f.write('\n')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    3、带背景颜色的 Pyecharts

    Pyecharts 作为 Echarts 的优秀 Python 实现,受到众多开发者的青睐,用 Pyecharts 作图时,使用一个舒服的背景也会给我们的图表增色不少

    饼图为例,通过添加 JavaScript 代码来改变背景颜色

    def pie_rosetype(data) -> Pie:
        background_color_js = (
        "new echarts.graphic.LinearGradient(0, 0, 0, 1, "
        "[{offset: 0, color: '#c86589'}, {offset: 1, color: '#06a7ff'}], false)"
    )
        c = (
            Pie(init_opts=opts.InitOpts(bg_color=JsCode(background_color_js)))
            .add(
                "",
                data,
                radius=["30%", "75%"],
                center=["45%", "50%"],
                rosetype="radius",
                label_opts=opts.LabelOpts(formatter="{b}: {c}"),
            )
            .set_global_opts(title_opts=opts.TitleOpts(title=""),
                            )
        )
        return c
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    4、requests 库调用

    据统计,requests 库是 Python 家族里被引用的最多的第三方库,足见其江湖地位之高大!

    发送 GET 请求

    import requests
    
    
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
      'cookie': 'some_cookie'
    }
    response = requests.request("GET", url, headers=headers)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    发送 POST 请求

    import requests
    
    
    payload={}
    files=[]
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
      'cookie': 'some_cookie'
    }
    response = requests.request("POST", url, headers=headers, data=payload, files=files)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    根据某些条件循环请求,比如根据生成的日期

    def get_data(mydate):
        date_list = create_assist_date(mydate)
        url = "https://test.test"
        files=[]
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
            'cookie': ''
            }
        for d in date_list:
            payload={'p': '10',
            'day': d,
            'nodeid': '1',
            't': 'itemsbydate',
            'c': 'node'}
            for i in range(1, 100):
                payload['p'] = str(i)
                print("get data of %s in page %s" % (d, str(i)))
                response = requests.request("POST", url, headers=headers, data=payload, files=files)
                items = response.json()['data']['items']
                if items:
                    save_data(items, d)
                else:
                    break
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    5、Python 操作各种数据库

    操作 Redis

    连接 Redis

    import redis
    
    
    def redis_conn_pool():
        pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
        rd = redis.Redis(connection_pool=pool)
        return rd
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    写入 Redis

    from redis_conn import redis_conn_pool
    
    
    rd = redis_conn_pool()
    rd.set('test_data', 'mytest')
    
    • 1
    • 2
    • 3
    • 4
    • 5

    操作 MongoDB

    连接 MongoDB

    from pymongo import MongoClient
    
    
    conn = MongoClient("mongodb://%s:%s@ipaddress:49974/mydb" % ('username', 'password'))
    db = conn.mydb
    mongo_collection = db.mydata
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    批量插入数据

    res = requests.get(url, params=query).json()
    commentList = res['data']['commentList']
    mongo_collection.insert_many(commentList)
    
    • 1
    • 2
    • 3

    操作 MySQL

    连接 MySQL

    import MySQLdb
    
    # 打开数据库连接
    db = MySQLdb.connect("localhost", "testuser", "test123", "TESTDB", charset='utf8' )
    
    # 使用cursor()方法获取操作游标 
    cursor = db.cursor()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    执行 SQL 语句

    # 使用 execute 方法执行 SQL 语句
    cursor.execute("SELECT VERSION()")
    
    # 使用 fetchone() 方法获取一条数据
    data = cursor.fetchone()
    
    print "Database version : %s " % data
    
    # 关闭数据库连接
    db.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    输出:

    Database version : 5.0.45
    
    • 1

    6、多线程代码

    多线程也有很多实现方式

    import threading
    import time
    
    exitFlag = 0
    
    class myThread (threading.Thread):
        def __init__(self, threadID, name, delay):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.name = name
            self.delay = delay
        def run(self):
            print ("开始线程:" + self.name)
            print_time(self.name, self.delay, 5)
            print ("退出线程:" + self.name)
    
    def print_time(threadName, delay, counter):
        while counter:
            if exitFlag:
                threadName.exit()
            time.sleep(delay)
            print ("%s: %s" % (threadName, time.ctime(time.time())))
            counter -= 1
    
    # 创建新线程
    thread1 = myThread(1, "Thread-1", 1)
    thread2 = myThread(2, "Thread-2", 2)
    
    # 开启新线程
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print ("退出主线程")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
  • 相关阅读:
    自动驾驶技术
    他没有输出一直卡在后面不懂,看报错应该是卡在# 循环生成诗句了
    腾讯云 BI 数据分析与可视化的快速入门指南
    SwiftUI 中的水平条形图
    快速傅里叶变换(FFT)
    Docker内安装Oracle19c(踩坑版放心安装)
    跳舞机游戏-第13届蓝桥杯Scratch选拔赛真题精选
    单元测试 :Junit框架
    JBoss JMXInvokerServlet 反序列化漏洞(CVE-2015-7501)
    腾讯mini项目-【指标监控服务重构】2023-08-26
  • 原文地址:https://blog.csdn.net/javascript_good/article/details/132626347