终究是一个非常痛苦的夜晚。我没想到我会活着这么痛苦。
帮别人做一个小功能。把一个 android 设备里面固定目录下的log 导出到本地,然后打成压缩包.
需求很简单,在听到这个需求之后,我当时的预估时间是10分钟到1小时最多,不会超过1小时。。。。
我没想到,真正让我痛苦的,不是遇到了 windows 文件夹权限的问题,而是没有遇到这个问题,但是我以为我遇到的是这个问题…
这句话有的绕口,但是读完之后你就知道,这句总结还算到位.。
长话短说,我在使用 open()
函数的时候,传入了一个文件夹作为参数,比如open("D:\\Games")
, 然后 python 一直报错,说权限不足。
PermissionError: [Errno 13] Permission denied: 'D:\\Projects'
如果是一个正常人类可能会去查看API,或者网上找一下原因。我直接去网上找原因了。因为我要打开的这个文件夹比较特殊,是从其他设备上面导过来的,导致我以为是 windows 权限的问题。
因为网上有很多很多提示Windows 权限的报错。
基于此,我在网上找了好久,主要的方向就是,怎么去满足这个权限,不让它open
失败。当然,没有效果的,因为根本原因是:通过open(file_path)
, 这个 file_path
如果是一个目录而不是文件的话,就必然报错提示权限不足。
当然,也不能说是一无所获。
我找到了以下几种可能可以避免Windows 权限的解决方案了,只是目前用不上。
这个地方门道还有点多,我一共使用了两种方式:
multiprocessing.Process
的方式去开启子进程,然后把提权的操作放在这个子进程里面,然后在这个子进程执行结束之后,再去在父进程里面去调用 open()
函数。这个 multiprocessing
还是蛮不错的,它可以开任意多个进程,而且可以传参数,可以进行进程之间的数据共享。open
不能去传文件夹路径作为参数,但是当时不清楚),于是就想到,是不是因为这个父子进程之间也是互相关联的呢,导致在父进程依然不能获取到正确的权限。那么,我直接把当前进程关闭,重新开一个进程,这样总不会互相影响了吧。所以就有了第二种实现方法。大体的实现逻辑就是利用 sys.argv
去自己给自己传参数,实现一个脚本在运行不同次数的时候,执行的逻辑不同,然后每次执行都是一个新的进程,完全不会被前面一次执行所影响。为了实现第二种不互相影响的实现,我还搞了一个临时文件用来存储上一次执行的结果。因为不能互相影响导致前面执行的内存数据,在下次执行的时候无法读取,因为上一次的程序已经执行结束了,这些内存里面的数据也就消失了。那么,我就搞一个文本文件,相当于数据库一样,去记录上一次执行的结果,然后第二次执行的时候可以去读这个文件的内容,然后再执行具体的逻辑。
哎,贴一下代码,不过,因为一直失败,所以前面有的代码没有保留,直接被删除了。不过第二种实现方式倒是保留了,有兴趣的可以看看。
这里最困难不是这两种实现方案,而是 Windows 的文件夹权限处理。这一块的文档不是很多,然后也有一些文档写的比较详情,但是里面的专业术语太多了,看不懂。
cacls Music /p everyone:f /t ====> 只有 everyone 了,但是要手动确认
cacls.exe Music /t /e /g everyone:F ====> 多了一个 everyone 但是反应迟钝。
cacls Music /p everyone:f /t /e /g ====》 无效
echo y|cacls Music /p everyone:f /t ===> 只有一个 everyone, 并且不需要手动确认了
这几个命令我找了好久才找到,也都实验了。还有一些其他的无效的命令,就没有记录了。
关于使用 cacls
还有 icacls
, 以及其他的方式给文件夹设置权限,里面的内容很多,感觉很难。
比如这篇:
PowerShell 修改文件夹及文件权限
还有好几篇也是这种比较详细的,但是真的看不懂…
// 第二种实现方案的完整代码:(可以正常运行的)
import os
import shutil
import subprocess
import sys
import time
import zipfile
default_pull_path = "/sdcard/Android/data/com.iauto.navi/files/bllog/"
default_place_path = str(os.getcwd())
default_temp_file = ".pull_temp.txt"
key_pull_path = "KEY_PULL_PATH"
key_place_path = "KEY_PLACE_PATH"
key_target_path = "KEY_TARGET_PATH"
def _read_stored_mappings():
mappings = {}
if not os.path.exists(default_temp_file):
return mappings
with open(default_temp_file, mode='r', newline='') as f:
lines = f.readlines()
for line in lines:
if key_place_path in line:
value = line.strip().split(",")[-1]
mappings[key_place_path] = value
if key_pull_path in line:
value = line.strip().split(",")[-1]
mappings[key_pull_path] = value
if key_target_path in line:
value = line.strip().split(",")[-1]
mappings[key_target_path] = value
return mappings
def _write_2_mapping_file(key, value):
dd = _read_stored_mappings()
dd[key] = value
with open(default_temp_file, mode='w+', newline='') as f:
for k, v in dd.items():
f.write(k)
f.write(",")
f.write(v)
f.write("\n")
def _write_full_2_mapping_file(mappings):
with open(default_temp_file, mode='w+', newline='') as f:
for k, v in mappings:
f.write(k)
f.write(",")
f.write(v)
f.write("\n")
def _e_slow_cmd(command="git -h"):
p = subprocess.Popen(command, shell=True)
return_code = p.wait()
print("result code: ", return_code)
return return_code == 0
def _exe(command):
p = subprocess.Popen(command, shell=True)
return_code = p.wait()
return return_code == 0
def _zip_dirs(output_zip_path, target_dir) -> tuple[bool, str]:
"""
make a zip file (output_zip_path) from target_dir.
:param output_zip_path: like /home/tony/logs/abc.zip
:param target_dir: like /home/tony/resources/
:return: output_zip_path
"""
# try:
# with open(target_dir, mode='r'): ### 在 windows 上面必然报错,只能针对 文件,不能针对 目录。
# pass
# except Exception as ex:
# traceback.print_exc()
# return False, "zip error: {}".format(ex)
# pass
with zipfile.ZipFile(output_zip_path, "w") as zip_obj:
# 压缩目录
# zipfile没有直接压缩目录的功能,要压缩目录只能遍历目录一个一个文件压。
for root, fd, files in os.walk(target_dir):
print("root:", root)
print("fd:", fd)
fpath = root.replace(target_dir, os.path.basename(target_dir))
# 如果想要目录为空时仍将该目录压缩进去,该目录也要压缩一遍;反之请将以下行注释掉
# zip_obj.write(root)
for fn in files:
# 拼接文件完整目录,不然只用文件名代码找不到文件
tmp_file_path = str(os.path.join(root, fn))
zip_obj.write(tmp_file_path, str(os.path.join(fpath, fn)))
return True, ""
def _first_time():
args = sys.argv
return len(args) <= 1 or len(args) >= 3 or (len(args) == 2 and not args[1].strip().isdigit())
def _is_windows():
return sys.platform == "win32"
def _restart_file_resource_mgr():
if _is_windows():
s1 = _exe("taskkill /f /im explorer.exe")
time.sleep(1)
s2 = _exe("start explorer.exe")
return s1 and s2
else:
return True
def _change_dir_mod(target_dir):
print("change_dir_mod.....")
if _is_windows():
# cacls.exe Music /t /e /g everyone:F
command = "echo y|cacls {} /p everyone:f /t".format(target_dir)
# command = "cacls.exe {} /t /e /g everyone:F".format(target_dir)
else:
command = "chmod -R u+w {}".format(target_dir)
success = _e_slow_cmd(command)
print("_change_dir_mod cmd?", command)
print("_change_dir_mod success?", success)
return success
def _get_pull_and_place_paths() -> tuple[str, str]:
mappings = _read_stored_mappings()
if key_pull_path in mappings and key_place_path in mappings:
return mappings[key_pull_path], mappings[key_place_path]
else:
pull_p = default_pull_path
place_p = os.getcwd()
# TODO
pull_p = "/sdcard/Music"
place_p = "C:\\Users\\Stone\\Desktop\\logs"
args = sys.argv
if len(args) == 2 and not args[1].strip().isdigit():
pull_p = args[1].strip()
elif len(args) == 3:
pull_p = args[1].strip()
place_p = args[2].strip()
mappings[key_pull_path] = pull_p
mappings[key_place_path] = place_p
_write_2_mapping_file(key_pull_path, pull_p)
_write_2_mapping_file(key_place_path, place_p)
return pull_p, place_p
def _get_target_dir_path(place_path=None, end_dir_name=None):
print("_get_target_dir_path,,,a={},b={}".format(place_path, end_dir_name))
# traceback.print_stack()
mappings = _read_stored_mappings()
print("_get_target_dir_path, mp:", mappings)
if key_target_path in mappings:
print("_get_target_dir_path, 1 mp:", mappings)
return mappings[key_target_path]
else:
print("_get_target_dir_path, 2 mp:", mappings)
value = os.path.join(place_path, end_dir_name)
mappings[key_target_path] = value
_write_2_mapping_file(key_target_path, value)
return value
def s0_pull_log_2_local() -> tuple[bool, str]:
cmd = "adb pull {} {}".format(*_get_pull_and_place_paths())
ss = _e_slow_cmd(cmd)
return ss, "" if not ss else "[{}] failed.".format(cmd)
def s1_chmod_local_dir() -> tuple[bool, str]:
pull_path, place_path = _get_pull_and_place_paths()
print("s1_chmod_local_dir 1", pull_path, place_path)
if not os.path.isdir(place_path):
return False, "place_path:[{}] is not a valid dir.".format(place_path)
try:
end_dir_name = list(filter(lambda name: len(name) > 0, pull_path.split("/")))[-1]
except Exception as e:
print("error of filter:", e)
return False, "get last pulled dir name failed."
if not end_dir_name or len(end_dir_name) <= 0:
print("pull path: [{}] is not a dir. GET OUT!!!!".format(pull_path))
return False, "get last pulled dir name failed."
target_dir = _get_target_dir_path(place_path, end_dir_name)
print("s1_chmod_local_dir 2", target_dir)
if not os.path.isdir(target_dir):
return False, "[{}] is not a dir. GET OUT!!!".format(target_dir)
success = _change_dir_mod(target_dir)
print("s1_chmod_local_dir 3", success)
return success, "" if success else "chmod [{}] failed. GET OUT!!!!".format(target_dir)
def s2_zip_local_dir() -> tuple[bool, str]:
pull_path, place_path = _get_pull_and_place_paths()
print("pull_path, place_path ", pull_path, place_path)
time_str = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())
zip_fn = "log_{}.zip".format(time_str)
zip_fp = os.path.join(place_path, zip_fn)
print("_get_target_dir_path()", _get_target_dir_path())
return _zip_dirs(zip_fp, _get_target_dir_path())
def s3_delete_local_dir() -> tuple[bool, str]:
target_dir = _get_target_dir_path()
shutil.rmtree(target_dir)
os.remove(default_temp_file)
return True, ""
def handle_args():
print("handle_args in process: {}, {}".format(os.getcwd(), os.getpid()))
# success, text = False, "execute failed"
args = sys.argv
fp = args[0]
if _first_time():
print("exec 0")
success, text = s0_pull_log_2_local()
if not success:
print(text)
else:
_exe("python {} {}".format(fp, 1))
else:
arg = args[1]
num = int(arg.strip())
if 1 == num:
print("exec 1")
success, text = s1_chmod_local_dir()
if not success:
print(text)
else:
if _is_windows():
print("sleep 1 before zip in windows")
time.sleep(1)
print("sleep 1 done.....")
_exe("python {} {}".format(fp, 2))
elif 2 == num:
print("exec 2")
time.sleep(1)
print("exec 2, after sleep...")
success, text = s2_zip_local_dir()
if not success:
print(text)
else:
_exe("python {} {}".format(fp, 3))
pass
elif 3 == num:
print("exec 3")
s3_delete_local_dir()
print("step all done...")
if __name__ == '__main__':
handle_args()
# s2_zip_local_dir()
ss = 'C:\\Users\\Stone\\Desktop\\logs\\Music\\.thumbnails\\.database_uuid'
s2 = "C:\\Users\\Stone\\Desktop\\normal\\nets.txt"
# with open(ss):
# pass