Source code for ogstools.logparser.log_file_handler
# Copyright (c) 2012-2025, OpenGeoSys Community (http://www.opengeosys.org)
#            Distributed under a Modified BSD License.
#            See accompanying file LICENSE.txt or
#            http://www.opengeosys.org/project/license
#
import os
from collections.abc import Callable
from pathlib import Path
from queue import Queue
from typing import Any

from watchdog.events import (
    DirModifiedEvent,
    FileCreatedEvent,
    FileModifiedEvent,
    FileSystemEventHandler,
)

from ogstools.logparser.log_parser import (
    normalize_regex,
    parse_line,
    select_regex,
)
from ogstools.logparser.regexes import Context, Log, Termination
[docs]
class LogFileHandler(FileSystemEventHandler):
    """File system event handler that follows a growing log file.

    Whenever a created/modified event for ``file_name`` arrives, every
    complete line appended since the previous event is passed to
    ``parse_line``. Each parsed entry is put on ``queue`` and handed to
    ``status.update``. When a ``Termination`` entry is parsed, or when
    ``line_limit`` lines have been read, ``stop_callback`` is invoked, the
    file is closed and all further events are ignored.
    """

    def __init__(
        self,
        file_name: str | Path,
        queue: Queue,
        status: Context,
        stop_callback: Callable[[], tuple[None, Any]],
        force_parallel: bool = False,
        line_limit: int = 0,
    ):
        """
        :param file_name: The location of the log file to monitor.
        :param queue: The queue where log entries are put and to be consumed.
        :param status: The status of the simulation (e.g. current time step).
        :param stop_callback: A callback function to stop the simulation.
        :param force_parallel: Only needed for MPI run with 1 process. Then it
            must be set to True. The value is only stored on the instance; the
            methods of this class do not evaluate it.
        :param line_limit: The number of lines to read before stopping the
            simulation. 0 means no limit.
        """
        super().__init__()
        self.file_name = Path(file_name)
        self.queue = queue
        self.status = status
        self.stop_callback = stop_callback
        self.force_parallel = force_parallel
        self.line_limit = line_limit

        self._file: Any = None  # opened lazily by process()
        self._file_read: bool = False  # True once the log file is open
        self._finished: bool = False  # True once the file has been closed
        self._partial_line: str = ""  # line fragment still missing its "\n"
        self.num_lines_read: int = 0  # number of complete lines parsed

        # Real time monitoring only works for log version 2 and serial logs,
        # or for parallel simulations run without `ogs --parallel_log`.
        self.patterns: list = normalize_regex(
            select_regex(version=2), parallel_log=False
        )

    def on_created(self, event: FileCreatedEvent) -> None:
        """Start processing as soon as the monitored log file is created."""
        if not self._is_watched_file(event):
            return
        print(f"{self.file_name} has been created.")
        self.process()

    def on_modified(self, event: FileModifiedEvent | DirModifiedEvent) -> None:
        """Process newly appended lines when the monitored file changes."""
        if not self._is_watched_file(event):
            return
        self.process()

    def process(self) -> None:
        """Parse every complete line appended since the previous call.

        The file is opened on first use; if it does not exist yet the call
        returns and a later event retries. A trailing line without ``"\\n"``
        is buffered and completed on a subsequent call, so a line that is
        written in several chunks is still parsed as one line. Once the
        handler has finished (termination entry or line limit), the call is
        a no-op.
        """
        if self._finished:
            # The file is already closed; events arriving after the stop
            # request must not read from it again.
            return

        if not self._file_read:
            try:
                self._file = self.file_name.open("r")
            except FileNotFoundError:
                print(f"File not found yet: {self.file_name}")
                return
            self._file_read = True

        print(f"{self.file_name} has been modified.")

        while True:
            chunk = self._file.readline()
            if not chunk:
                break  # nothing new has been written yet

            self._partial_line += chunk
            if not self._partial_line.endswith("\n"):
                # readline() already consumed this fragment, so keep it and
                # wait for the rest of the line instead of dropping it.
                break

            line, self._partial_line = self._partial_line, ""
            self.num_lines_read += 1

            log_entry: Log | Termination | None = parse_line(
                self.patterns,
                line,
                parallel_log=False,
                number_of_lines_read=self.num_lines_read,
            )

            if isinstance(log_entry, Termination):
                print("===== Termination =====")
                # Published exactly once (no second put/update).
                self._publish(log_entry)
                self._finish()
                break

            if log_entry:
                self._publish(log_entry)

            # Stop after exactly `line_limit` complete lines (0 = unlimited).
            if 0 < self.line_limit <= self.num_lines_read:
                self._finish()
                break

    def _is_watched_file(
        self, event: FileCreatedEvent | FileModifiedEvent | DirModifiedEvent
    ) -> bool:
        """Return True if ``event`` refers to the monitored log file."""
        # Compare as Path objects so that equivalent spellings (e.g. a
        # leading "./") still match; os.fsdecode accepts str and bytes paths.
        return Path(os.fsdecode(event.src_path)) == self.file_name

    def _publish(self, log_entry: Log | Termination) -> None:
        """Hand one parsed entry to the queue and to the status object."""
        self.queue.put(log_entry)
        self.status.update(log_entry)

    def _finish(self) -> None:
        """Invoke the stop callback and close the log file for good."""
        self._finished = True
        try:
            self.stop_callback()
        finally:
            # Close the file even if the callback raises.
            self._file.close()