• Python Watchdog:高效的文件系统监控


    1. 写在前面

    软件开发中,有时候需要通过 Python 去监听指定区域文件或目录的创建、修改,或者删除,从而引发特定的事件处理。本篇博客为你介绍第三方模块 Watchdog 实现对文件事件的监控。

    公众号: 滑翔的纸飞机

    2. Watchdog

    2.1 什么是 Watchdog?

    用于监视文件系统事件的 Python API 和 shell 实用程序。

    **项目地址:**https://pypi.org/project/watchdog/
    **最新版本:**Watchdog 3.0.0 适用于 Python 3.7+
    **安装:**需要运行以下命令进行安装(确保使用的是 Python 3.7+):

    pip install watchdog
    
    • 1

    2.2 官方快速入门示例

    以下示例程序:将以递归方式监视当前目录文件系统变更,并简单地将它们输出到控制台;

    import sys
    import logging
    from watchdog.observers import Observer
    from watchdog.events import LoggingEventHandler
    
    if __name__ == "__main__":
        # 设置日志信息格式
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s - %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S')
        # 要监控的目录路径
        path = sys.argv[1] if len(sys.argv) > 1 else '.'
        # 创建一个日志事件处理程序
        event_handler = LoggingEventHandler()
        # 创建一个观察者对象
        observer = Observer()
        # 声明一个定时任务
        observer.schedule(event_handler, path, recursive=True)
        # 启动定时任务
        observer.start()
        try:
            while observer.is_alive():
                observer.join(1)
        finally:
            observer.stop()
            observer.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    输出: 跟踪目录变更事件,通过日志输出变更记录。

    例如: 创建 test > 1.txt 控制台输出:

    2023-10-19 00:56:18 - Created directory: /Users/demo/2023/10/watchdog/test
    2023-10-19 00:56:18 - Modified directory: /Users/demo/2023/10/watchdog
    2023-10-19 00:56:27 - Created file: /Users/demo/2023/10/watchdog/test/1.txt
    2023-10-19 00:56:27 - Modified directory: /Users/demo/2023/10/watchdog/test
    
    • 1
    • 2
    • 3
    • 4

    2.3 Event Handler 和 Observer

    Watchdog 的主要实现或者可以说 Watchdog 的构件是基于以下类:

    • Observer:观察者,用于监视目录并将调用分派给事件处理程序;
    • Event handler: 文件系统事件和事件处理程序;

    说白了,Observer 监控目录,触发 Event handler 针对事件做出响应;

    导入方式:

    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler 
    
    • 1
    • 2
    2.3.1 Event handler

    以下是Watchdog中默认提供的4个事件处理类:

    • FileSystemEventHandler:文件,事件处理器的基类,用于处理事件;
    • PatternMatchingEventHandler:模式匹配文件;
    • RegexMatchingEventHandler:正则匹配文件;
    • LoggingEventHandler:记录日志。

    有关处理程序的更多详情,请参阅此链接

    通过扩展 Watchdog 提供的默认事件处理程序类,实现自定义的函数来处理修改、创建、删除和移动事件。还可以覆盖 FileSystemEventHandler 中的函数(以下函数),因为其他事件处理类都继承自该类。

    **on_any_event(event):**捕获所有事件处理程序;
    **on_created(event):**在创建文件或目录时调用;
    **on_deleted(event):**删除文件或目录时调用;
    **on_modified(event):**当文件或目录被修改时调用;
    **on_moved(event):**在移动或重命名文件或目录时调用;
    **on_closed(event):**文件已关闭时调用;
    **on_opened(event):**打开文件时调用;

    每个函数都有一个名为 event 的输入参数,其中包含以下变量:

    • event_type:字符串形式的事件类型(“moved”、“deleted”、“created”、“modified”、“closed”、“opened”),默认为 “无”;
    • is_directory:True:表示事件针对目录;
    • src_path:触发此事件的文件系统对象的源路径;
    2.3.2 Observer

    如果大家熟悉设计模式,那么 Watchdog 就遵循观察设计模式。因此,每个观察者都会有事件,如果文件或目录有任何变化,它就会查看并显示变化。

    Observer,观察目录,针对事件调用处理程序,也可以直接导入特定平台的类,并用它代替 Observer

    2.3.3 回顾上文简单示例

    通过上述介绍,对 Event handler 和 Observer有一个简单的理解,现在我们回过头继续来看官方示例:

    import sys 
    import logging
    from watchdog.observers import Observer
    from watchdog.events import LoggingEventHandler
    
    if __name__ == "__main__":
        # 设置日志信息格式
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s - %(message)s',
                            datefmt='%Y-%m-%d %H:%M:%S')
        # 要监控的目录路径
        path = sys.argv[1] if len(sys.argv) > 1 else '.'
        # 创建一个日志事件处理程序
        event_handler = LoggingEventHandler()
        # 创建一个观察者对象
        observer = Observer()
        # 声明一个定时任务
        observer.schedule(event_handler, path, recursive=True)
        # 启动定时任务
        observer.start()
        try:
            while observer.is_alive():
                observer.join(1)
        finally:
            observer.stop()
            observer.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    (1)event_handler = LoggingEventHandler(): 创建一个日志事件处理程序;
    (2)observer = Observer():创建一个观察者对象;
    (3)observer.schedule(event_handler, path, recursive=True):声明一个定时任务,传入事件处理程序、监控路径、以及是否递归子目录;
    (4)observer.start():启动定时任务;

    进一步分析下:

    schedule(self, event_handler, path, recursive=False):该方法用于监视 path 路径,并调用给定的事情 event_handler 。

    参数 recursive 表示是否递归子目录,即监听子目录,默认为 False。

    start():启动线程,这里开启了新的守护线程,主程序如果结束, 该线程也会停止。
    每个线程对象只能调用1次,它安排对象的 run() 方法在单独的控制线程中调用,如果在同一线程对象上多次调用此方法将引发 RuntimeError。

    2.4. 理解和使用

    基于上述关键概念介绍以及官方示例,自己实现一个文件事件监听;

    在本例中,使用 FileSystemEventHandler 事件类。对一个文件夹设置监视,并在有文件产生时触发另一个函数。处理完成后,将把文件移到另一个文件夹。

    (1)首先,你需要创建一个继承自 FileSystemEventHandler 事件处理类,并创建一个观察者和自定义的事件处理程序实例。

    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    
    class MyHandler(FileSystemEventHandler):
      pass
    
    observer = Observer()
    event_handler = MyHandler()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    (2)创建一个定时任务,传入以下参数

    • event_handler:刚创建的处理程序对象;
    • path:要跟踪的文件夹路径;
    • recursive:是否递归子目录;
    observer.schedule(event_handler, path='./input_files', recursive=True)
    
    • 1

    (3) observer.start() - 启动任务,等待目录产生事件,触发事件处理程序中的代码。

    (4) observer.stop() - 该函数将清理资源。

    (5) 最后用 observer.join() 结束,因为我们在这里使用的是多线程概念。join() 将连接多个线程,直到调用 join 方法的线程终止。

    observer.start()
    try:
        while True:
            time.sleep(300)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    接下去,自定义事件处理类:MyHandler

    在这个示例中,我将检查是否有文件上传到所跟踪的文件夹中。为此,我可以使用 on_created(event):

    def create_directory(file_path=None):
        # 以'年-月-日'的格式获取当前日期
        current_date = datetime.now().strftime('%Y-%m-%d')
    
        # 创建一个包含当前日期的文件夹
        folder_path = f'{file_path}/{current_date}'
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)
            return folder_path
        else:
            return folder_path
    
    
    class MyHandler(FileSystemEventHandler):
        def on_created(self, event):
            dir_path = event.src_path.split('/input_files')
            processed_files = f'{dir_path[0]}/processed_files'
    
            child_processed_dir = create_directory(file_path=processed_files)
    
            if event:
                print("file created:{}".format(event.src_path))
                # 这里调用其他处理函数
                main(file_name=event.src_path)
    
                file_name = event.src_path.split('/')[-1]
                destination_path = f'{child_processed_dir}/{file_name}'
    
                # 将文件移动到其他目录
                shutil.move(event.src_path, destination_path)
                print("file moved:{} to {}".format(event.src_path, destination_path))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    在上面的示例中,我使用函数 create_directory() 来检查目标路径中是否有当前日期的文件夹,否则就创建相同的文件夹。

    然后,在其他 python 脚本函数 main() 中做了一些处理后,使用相同的路径作为目标路径来移动文件

    下面是最终代码:my_event_handler.py

    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    import shutil
    import time
    import os
    from datetime import datetime
    from watchdog_fileobserver import main
    
    
    def create_directory(file_path=None):
        # 以'年-月-日'的格式获取当前日期
        current_date = datetime.now().strftime('%Y-%m-%d')
    
        # 创建一个包含当前日期的文件夹
        folder_path = f'{file_path}/{current_date}'
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)
            return folder_path
        else:
            return folder_path
    
    
    class MyHandler(FileSystemEventHandler):
        def on_created(self, event):
            dir_path = event.src_path.split('/input_files')
            processed_files = f'{dir_path[0]}/processed_files'
    
            child_processed_dir = create_directory(file_path=processed_files)
    
            if event:
                print("file created:{}".format(event.src_path))
                # 这里调用其他处理函数
                main(file_name=event.src_path)
    
                file_name = event.src_path.split('/')[-1]
                destination_path = f'{child_processed_dir}/{file_name}'
    
                shutil.move(event.src_path, destination_path)
                print("file moved:{} to {}".format(event.src_path, destination_path))
    
    
    if __name__ == "__main__":
        observer = Observer()
        event_handler = MyHandler()
        observer.schedule(event_handler, path='./input_files', recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(300)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    watchdog_fileobserver.py:

    import csv
    
    def read_csv_file(file_name):
        try:
            with open(f"{file_name}", 'r') as file:
              csvreader = csv.DictReader(file)
              for row in csvreader:
                print(row)
            return csvreader
        except Exception as e:
            pass
    
    def main(file_name=None):
        if file_name:
            dict_data = read_csv_file(file_name)
            print("Process completed")
        else:
            print("Invalid file path")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这种情况下,需要等待文件上传,然后执行所需的操作。为此,你可以在事件函数中添加以下代码:

    def on_created(self, event):
      file_size = -1
      while file_size != os.path.getsize(event.src_path):
          file_size = os.path.getsize(event.src_path)
          print(file_size)
          time.sleep(1)
    
    ###    OR   ###
    
    def on_created(self, event):
      file = None
      while file is None:
        try:
            file = open(event.src_path)
        except OSError:
            logger.info('Waiting for file transfer....')
            time.sleep(1)
            continue
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    验证:

    脚本所在路径下,创建用于监听目录 input_files, 在该目录下创建一个文件,控制台输出:

    file created:/Users/demo/2023/10/watchdog/input_files/text.txt
    Process completed
    file moved:/Users/demo/2023/10/watchdog/input_files/text.txt to /Users/demo/2023/10/watchdog/processed_files/2023-10-20/text.txt
    
    • 1
    • 2
    • 3

    目录如下:

    .
    ├── input_files
    ├── my_event_handler.py
    ├── processed_files
    │   └── 2023-10-20
    │       └── text.txt
    └── watchdog_fileobserver.py
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.5. Watchdog 使用案例

    2.5.1 忽略子目录或只包含模式匹配文件的情况

    如果要忽略某个目录中的某些文件,可以使用最简单的方法之一,即使用 PatternMatchingEventHandler

    在文件 my_event_handler.py 中,修改 MyHandler 中的继承类(PatternMatchingEventHandler),如下所示:

    class MyHandler(PatternMatchingEventHandler):
        ....
        ....
    
    if __name__ == "__main__":
        event_handler = MyHandler(patterns=["*.csv", "*.pdf"],
                                  ignore_patterns=[],
                                  ignore_directories=True
                                  )
         ....
         ....
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    2.5.2 使用 Celery 来启动/停止 Watchdog

    可以使用下面的示例来实现 Watchdog。不过,这个示例只是一个关于如何将 celery 集成到Watchdog 中的想法。

    from celery import Celery
    from watchdog.observers import Observer
    from watchdog.events import PatternMatchingEventHandler
    import os
    import time
    
    app = Celery('celery_ex.celery_apptask_ex', broker='redis://localhost:6379/0')
    
    
    @app.task
    def process_file(file_path):
        # do something with the file
        with open(file_path, 'r') as f:
            print(f.read())
    
    
    class MyHandler(PatternMatchingEventHandler):
        def on_created(self, event):
            file_size = -1
            while file_size != os.path.getsize(event.src_path):
                file_size = os.path.getsize(event.src_path)
                print(file_size)
                time.sleep(1)
    
            if event:
                print("file created:{}".format(event.src_path))
                # call function here
                process_file.apply_async(args=(event.src_path,))
    
    
    if __name__ == "__main__":
        observer = Observer()
        event_handler = MyHandler(patterns=["*.csv", "*.pdf"],
                                  ignore_patterns=[],
                                  ignore_directories=True
                                  )
        observer.schedule(event_handler, path='./input_files', recursive=True)
        observer.start()
    
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    此示例需要 redis、celery 支持;

    2.5.3 监控目录变化

    观察者(observer)可以设置指定目录及其所有子目录,在文件或目录创建、删除或修改时调用相应的方法(on_created、on_deleted 或 on_modified),观察者以无限循环的方式运行,可以被键盘中断打断。

    import time
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    
    class EventHandler(FileSystemEventHandler):
        def on_created(self, event):
            if event.is_directory:
                print("Directory created:", event.src_path)
            else:
                print("File created:", event.src_path)
    
        def on_deleted(self, event):
            if event.is_directory:
                print("Directory deleted:", event.src_path)
            else:
                print("File deleted:", event.src_path)
    
        def on_modified(self, event):
            if event.is_directory:
                print("Directory modified:", event.src_path)
            else:
                print("File modified:", event.src_path)
    
    event_handler = EventHandler()
    observer = Observer()
    observer.schedule(event_handler, "/path/to/dir", recursive=True)
    observer.start()
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    2.5.4 使用线程和多进程执行 Watchdog 来启动独立进程

    可以运行 Watchdog,使用线程和多进程并行处理多个文件。下面是一个相同的示例:

    from watchdog.observers import Observer
    from watchdog.events import PatternMatchingEventHandler
    import os
    import ntpath
    import time
    import optparse
    import multiprocessing
    import threading
    from collections import OrderedDict
    
    lock = threading.RLock()
    
    
    def process_function(get_event, event_dict):
        print(f"Process started for event: {get_event}")
        dir_path = ntpath.abspath(get_event)
        file_name = ntpath.basename(get_event)
    
        if len(get_event) > 0:
            your_own_function()
            do something....
    
    class Handler(PatternMatchingEventHandler):
        def __init__(self, queue):
            PatternMatchingEventHandler.__init__(self, patterns=['*.csv'],
                                                 ignore_patterns=[],
                                                 ignore_directories=True)
            self.queue = queue
    
        def on_created(self, event):
            # logger.info(f"Wait while the transfer of the file is finished before processing it")
            # file_size = -1
            # while file_size != os.path.getsize(event.src_path):
            #     file_size = os.path. getsize(event.src_path)
            #     time.sleep(1)
    
            file = None
            while file is None:
                try:
                    file = open(event.src_path)
                except OSError:
                    logger.info('Waiting for file transfer')
                    time.sleep(5)
                    continue
    
            self.queue.put(event.src_path)
    
        def on_modified(self, event):
            pass
    
    
    def start_watchdog(watchdog_queue, dir_path):
        logger.info(f"Starting Watchdog Observer\n")
        event_handler = Handler(watchdog_queue)
        observer = Observer()
        observer.schedule(event_handler, dir_path, recursive=False)
        observer.start()
    
        try:
            while True:
                time.sleep(1)
        except Exception as error:
            observer.stop()
            logger.error(f"Error: {str(error)}")
        observer.join()
    
    
    if __name__ == '__main__':
        dir_path = r'//file_path/'
    
        watchdog_queue = Queue()
    
        logger.info(f"Starting Worker Thread")
        worker = threading.Thread(target=start_watchdog, name="Watchdog",
                                  args=(watchdog_queue, dir_path), daemon=True)
        worker.start()
    
        mp = Manager()
        event_dict = mp.dict()
    
        while True:
            if not watchdog_queue.empty():
                logger.info(f"Is Queue empty: {watchdog_queue.empty()}")
                pool = Pool()
                pool.apply_async(process_function, (watchdog_queue.get(), event_dict))
            else:
                time.sleep(1)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    2.5.5 在 Watchdog 中进行日志记录

    要记录事件,可以创建一个继承自 FileSystemEventHandler 类的自定义事件处理程序类,并重写与要记录的事件相对应的方法。

    下面举例说明如何使用 Watchdog 库记录文件创建和修改事件:

    import logging
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    
    class LogEventHandler(FileSystemEventHandler):
        def on_created(self, event):
            if not event.is_directory:
                logging.info(f"File created: {event.src_path}")
    
        def on_modified(self, event):
            if not event.is_directory:
                logging.info(f"File modified: {event.src_path}")
    
    logging.basicConfig(filename='watchdog.log', level=logging.INFO, format='%(asctime)s - %(message)s')
    event_handler = LogEventHandler()
    observer = Observer()
    observer.schedule(event_handler, "/path/to/", recursive=True)
    observer.start()
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    3. 最后

    无论你是在一个需要跟踪多个文件的大型项目中工作,还是只想关注单个文件的任务,Watchdog 库都能满足你的需求。

    感谢您花时间阅读文章
    关注公众号不迷路
  • 相关阅读:
    java计算机毕业设计高校科研信息管理系统源码+mysql数据库+系统+lw文档+部署
    SpringBoot的HandlerInterceptor拦截器使用方法
    Vmware: network 相关
    AI大模型探索之路-训练篇3:大语言模型全景解读
    网络安全(黑客)自学笔记
    [计算几何] 2 二维凸包/笨蛋(我)也能看懂的二维凸包算法
    [SpringCloud] Eureka 与 Ribbon 简介
    新电脑验机步骤(1)
    【超全汇总】HTTP协议
    QT基础教程(GUI程序原理分析)
  • 原文地址:https://blog.csdn.net/u011521019/article/details/133938110