• python爬虫企业微信打卡数据,写入数据库


    # 坤坤小仙 2022年9月5日 周一 下午 16时55分05秒
    # 因微信企业号返回的格式为json,所以引入json
    import json
    # 暂时不清楚
    import requests
    # 引入数学方法
    import math
    # 引入时间函数
    import time
    # 引入时间函数
    import datetime
    # 引入连接oracle数据库的工具包
    import cx_Oracle
    
    # 数据库服务器地址
    server = '*'
    # 数据库的用户名
    user = '*'
    pasword = '***'  # 数据库用户对应的密码
    dbName = '*'  # 数据库名称
    corp_id = '*'  # 微信企业号提供的CORP_ID
    corp_secret = '*'
    
    
    # 将字符串转化为时间数组
    def datetime_timestamp(dt):
        # dt时间参数的字符串
        # 中间过程,一般都需要将字符串转化为时间数组
        time.strptime(dt, '%Y-%m-%d %H:%M:%S')
        # 将时间编程时间戳
        s = time.mktime(time.strptime(dt, '%Y-%m-%d %H:%M:%S'))
        # 返回结果
        return int(s)
    
    
    # 定义连接数据库方法
    def get_link_server():
        # 获取数据库的连接
        conn = cx_Oracle.connect(user, password, server)
        # 如果连接不为空
        if conn:
            # 返回结果
            return conn
        else:
            # 否则报错
            raise ValueError('连接数据库失败')
    
    
    # 获取用户账号列表
    def get_userid_list():
        # 连接数据库
        conn = get_link_server()
        # 获取游标
        cursor = conn.cursor()
        # 拼写sql 把用户账号拉出来
        sql = "select userid from EW_Employee_Data"
        # 执行sql语句
        cursor.execute(sql)
        # 获取返回结果中的第一个结果
        row = cursor.fetchone()
        # 定义用户账号的数组
        useridlist = []
        # 遍历从数据库爬取到的数据
        while row:
            # 将账号数据存取到账号的数组
            useridlist.append(row[0])
            # 获取下个数据
            row = cursor.fetchone()
        # 如果账号数组为空的话
        if useridlist:
            # 返回人员数组
            return useridlist
        else:
            # 否则报错
            raise ValueError('获取用户账号列表失败')
        # 关闭与数据库的连接
        conn.close()
    
    
    # 获取access_token
    def get_access_token():
        # 拼接url
        # 参考链接
        # https://developer.work.weixin.qq.com/document/path/91039
        # 如果你看参考文档有压力
        # 可以联系我
        # 电话|WeChat 18252477531
        # 我可以为你提供50RMB一次的答疑服务
        api_access_token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?" \
                               "corpid=*" \
                               "&" \
                               "corpsecret=*"
        # 向企业微信的接口发送请求,获得响应
        response = requests.get(api_access_token_url, verify=False)
        # 如果成功获取响应
        if response.status_code == 200:
            # 获取响应的json文本
            rep_dict = json.loads(response.text)
            # 过去错误码
            errcode = rep_dict.get('errcode')
            # 如果有错误码
            if errcode:
                # 报错
                raise valueerror('get wechat access token failed, errcode=%s.' % errcode)
            else:
                # 获取响应得到的json文本中的access_token的编码
                access_token = rep_dict.get('access_token')
                # 返回access_token的文本形式
                return access_token
        else:
            # 否则报错
            raise valueerror('获取企业微信的access token并未成功')
    
    
    def delete_this_month_pushcard_data():
        # 获取与数据库的连接
        conn = get_link_server()
        # 获取游标
        cursor = conn.cursor()
        # 调用存储过程
        cursor.callproc('delete_this_month_pushcard_data')
        conn.close
    
    
    # 获取微信打卡的json格式
    def get_punchcard_info(access_token, opencheckindatatype, starttime, endtime, useridlist):
        # 这里的url参考链接https://developer.work.weixin.qq.com/document/path/90262
        # 如果你看参考文档有压力
        # 可以联系我
        # WeChat geyikun0001
        # 我可以为你提供50RMB一次的答疑服务
        api_punch_card_url = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token
        # 定义json文本的格式
        json_str = json.dumps({
            "opencheckindatatype": opencheckindatatype,
            'starttime': starttime,
            'endtime': endtime,
            'useridlist': useridlist})
        # 发送获取打卡数据的请求得到响应
        response = requests.post(api_punch_card_url, data=json_str, verify=False)
        # 如果响应的状态码为200
        if response.status_code == 200:
            # 获取响应的json文本对象
            rep_dic = json.loads(response.text)
            # 获取错误码
            errcode = rep_dic.get('errcode')
            if errcode == 42001:
                access_token = get_access_token(true)
                api_punch_card_url = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token
                json_str = json.dumps(
                    {'opencheckindatatype': opencheckindatatype, 'starttime': starttime, 'endtime': endtime,
                     'useridlist': useridlist})
                response = requests.post(api_punch_card_url, data=json_str, verify=false)
                rep_dic = json.loads(response.text)
                errcode = rep_dic.get('errcode')
                if errcode:
                    raise valueerror('get punch data failed1, errcode=%s' % errcode)
                else:
                    value_str = rep_dic.get('checkindata')
                    # print(value_str._class_)
                    if value_str:
                        return value_str
                    else:
                        raise valueerror('get punch data failed2.')
            elif errcode:
                raise valueerror('get punch data failed3, errcode=%s' % errcode)
            # 上面的注释我就不写了,全是报错
            # 如果错误码为0的话,就是啥问题也没有
            else:
                # 获取数据
                value_str = rep_dic.get('checkindata')
                # 如果获取到的数据不为空
                if value_str:
                    # 返回结果
                    return value_str
                else:
                    # 报错
                    raise valueerror('i do not find employee punch data.')
        else:
            # 报错
            raise valueerror('get punch data failed5.')
    
    
    # 调用接口,获得数据
    if __name__ == '__main__':
        # 删除本月的打卡记录
        delete_this_month_pushcard_data()
        # 获取今天凌晨的时间
        today = datetime.date.today()
        print(today)
        # oneday = datetime.timedelta(days=1)  # days,即获取几天内的
        # 获取昨天凌晨的时间
        # yesterday = today - oneday
        # print(yesterday)
        print(today.strftime('%Y-%m') + '-01' + ' 00:00:00')
        # 开始时间的时间戳
        starttime = datetime_timestamp(today.strftime('%Y-%m') + '-01' + ' 00:00:00')
        # 结束时间的时间戳
        endtime = datetime_timestamp(today.strftime('%Y-%m-%d') + ' 00:00:00')
        # 获取access_token
        access_token = get_access_token()
        # 如果获取access_token
        if access_token:
            # 获取用户账号的数组
            useridlist = get_userid_list()
            # 如果数组不为空
            if useridlist:
                # print("11")
                # delete_this_month_pushcard_data()
                # 已100次作为一组,因为返回的数组数量的上线为100
                step = 100
                # 获取用户数组的长度
                total = len(useridlist)
                # 计算要循环的次数
                n = math.ceil(total / step)
                # 遍历
                for i in range(n):
                    # 打印遍历的用户账号的列表
                    print(useridlist[i * step:(i + 1) * step])
                    # 获取用户的打卡数据
                    punch_card = get_punchcard_info(access_token, 3, starttime, endtime,
                                                    useridlist[i * step:(i + 1) * step])
                    # 如果有打卡数据
                    if punch_card:
                        # 获取与数据库的连接
                        conn = get_link_server()
                        # 获取游标
                        cursor = conn.cursor()
                        # 遍历获取到的打卡数据
                        for dic_obj in punch_card:
                            # 获取一条打卡数据的json文本
                            json_data = json.loads(json.dumps(dic_obj))
                            # 拼接sql
                            sql = "insert into EW_PUSH_CARD_RECORD (userid,groupname,checkin_type,exception_type," \
                                  "checkin_time,location_title,location_detail," \
                                  "wifiname,notes,wifimac,deviceid,inputtime) values ('"
                            sql = sql + json_data["userid"] + "','" + json_data["groupname"] + "','" + json_data[
                                "checkin_type"] + "','" + json_data["exception_type"] + "','"
                            sql = sql + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(json_data["checkin_time"])))
                            sql = sql + "','" + json_data["location_title"] + "','" + json_data["location_detail"] + "','"
                            sql = sql + json_data["wifiname"] + "','" + json_data["notes"] + "','" \
                                  + str(json_data["wifimac"]) \
                                  + "','" + json_data["deviceid"] + "','" + time.strftime("%Y-%m-%d %H:%M:%S",
                                                                                          time.localtime(
                                                                                              int(time.time()))) + "')"
                            # 把拼接好的sql打印出来
                            print(sql)
                            # 执行sql
                            cursor.execute(sql)
                        # 提交事务
                        conn.commit()
                        # 关闭与数据库的连接
                        conn.close()
                    else:
                        # 报错
                        raise valueerror('没有获取到用户账号数组')
    
  • 相关阅读:
    2023工博会强势回归!智微工业携八大系列重磅亮相
    Golang——从入门到放弃
    Pycharm远程调试、运行服务器代码
    FOC控制之小A小B小C是如何追求小D的
    生产环境安装、配置、管理PostgreSQL数据库集群
    Spring——自动装配
    Nginx Rewrite
    javaweb-SMBMS
    unbuntu下安装gfortran
    低频量化之指数宏观数据(PE-PB-偏离-风险溢价比)
  • 原文地址:https://blog.csdn.net/FairyKunKun/article/details/126781639