# Source code for ogstools.logparser.log_parser

# Copyright (c) 2012-2025, OpenGeoSys Community (http://www.opengeosys.org)
#            Distributed under a Modified BSD License.
#            See accompanying file LICENSE.txt or
#            http://www.opengeosys.org/project/license
#

import re
from dataclasses import asdict
from pathlib import Path
from queue import Empty, Queue
from typing import Any, cast

from ogstools.logparser.regexes import (
    Context,
    Log,
    MPIProcess,
    NoRankOutput,
    Termination,
)


def _try_match_line(
    line: str,
    line_nr: int,
    regex: re.Pattern,
    log_type: type[Log],
    fill_mpi: bool,
) -> Any | None:
    """Apply *regex* to *line* and, on a match, build constructor arguments
    for *log_type*.

    The returned list holds the type string, the line number, an optional
    rank placeholder ``0`` (when *fill_mpi* is true, i.e. the line itself
    carries no MPI rank), and the captured groups cast to the annotated
    field types of *log_type*.  Returns ``None`` when the line does not
    match.
    """
    match = regex.match(line)
    if match is None:
        return None
    # Converters: fixed prefix (type string, line nr, rank) followed by the
    # declared field types of the log record.
    converters = (str, int, int) + tuple(log_type.__annotations__.values())
    prefix: tuple = (log_type.type_str(), line_nr)
    if fill_mpi:
        prefix += (0,)  # rank not present in the line; default to rank 0
    raw_values = prefix + match.groups()
    # plain zip stops at the shorter sequence, same as strict=False
    return [convert(value) for convert, value in zip(converters, raw_values)]


[docs] def read_mpi_processes(file_name: str | Path) -> int: """ Counts the number of MPI processes started by OpenGeoSys-6 by detecting specific log entries in a given file. It assumes that each MPI process will log two specific messages: "This is OpenGeoSys-6 version" and "OGS started on". The function counts occurrences of these messages and divides the count by two to estimate the number of MPI processes. :param file_name: The path to the log file, as either a string or a Path object. :returns: An integer representing the estimated number of MPI processes based on the log file's content. """ occurrences = 0 file_name = Path(file_name) with file_name.open() as file: lines = iter(file) # There is no synchronisation barrier between both info, we count both and divide while re.search( "info: This is OpenGeoSys-6 version|info: OGS started on", next(lines), ): occurrences = occurrences + 1 return int(occurrences / 2)
def normalize_regex(
    ogs_res: list,
    parallel_log: bool = False,
) -> list:
    """
    Compile serial-computation regex patterns, adapting them for parallel
    logs.

    In parallel runs, log lines are prepended with the writing process id,
    e.g. ``[0]`` or ``[1]``; patterns for rank-aware record types capture
    that rank.
    """
    rank_prefix = "\\[(\\d+)\\]\\ "
    compiled = []
    for pattern, log_type in ogs_res:
        needs_rank = (
            parallel_log
            and issubclass(log_type, MPIProcess)
            and not issubclass(log_type, NoRankOutput)
        )
        prefix = rank_prefix if needs_rank else ""
        compiled.append((re.compile(prefix + pattern), log_type))
    return compiled
def simple_consumer(queue: Queue) -> None:
    """Continuously drain *queue* and print each item (debug helper).

    Polls with a one-second timeout so the loop stays responsive and runs
    until interrupted with Ctrl-C.
    """
    print("[Consumer] Started")
    try:
        while True:
            try:
                item = queue.get(timeout=1)  # wait for a log item
            except Empty:
                continue  # no data yet, just keep looping
            print(f"[Consumer] → {item}")
    except KeyboardInterrupt:
        print("[Consumer] Interrupted, exiting...")
def parse_line(
    patterns: list, line: str, parallel_log: bool, number_of_lines_read: int
) -> Log | Termination | None:
    """Match *line* against each compiled pattern in turn and build the
    first corresponding log record; return ``None`` when nothing matches.
    """
    for regex, log_type in patterns:
        ranked = parallel_log and issubclass(log_type, MPIProcess)
        # Records whose line carries no rank get a default rank filled in.
        fill_mpi = (not ranked) or issubclass(log_type, NoRankOutput)
        args = _try_match_line(
            line,
            number_of_lines_read,  # ToDo should not be here
            regex,
            log_type,
            fill_mpi=fill_mpi,
        )
        if args:
            return log_type(*args)
    return None
def read_version(file: Path) -> int:
    """
    Read the version of the OGS log file.

    :param file: Path to the OGS log file.
    :returns: The version number as an integer (defaults to 1 when no
              version marker is found).
    """
    version_re = re.compile(r"Log version: (\d+)")
    with file.open() as stream:
        for line in stream:
            if found := version_re.search(line):
                return int(found.group(1))
            # An OGS-6 banner without an explicit "Log version" marker
            # identifies the legacy format.
            is_v1_banner = (
                "This is OpenGeoSys-6 version " in line
                and "Log version" not in line
            )
            if is_v1_banner:
                return 1
    print("Log version could not be deduced. Please specify it.")
    return 1
def parse_file(
    file_name: str | Path,
    maximum_lines: int | None = None,
    force_parallel: bool = False,
) -> list[Any]:
    """
    Parses a log file from OGS, applying regex patterns to extract specific
    information, The function supports processing files in serial or
    parallel mode. In parallel mode, a specific regex is used to match log
    entries from different processes.

    :param file_name: The path to the log file, as a string or Path object.
    :param maximum_lines: Optional maximum number of lines to read from the
                          file. If not provided, the whole file is read.
    :param force_parallel: Should only be set to True if OGS run with MPI
                           with a single core
    :returns: A list of extracted records based on the applied regex
              patterns. The exact type and structure of these records
              depend on the regex patterns and their associated processing
              functions.
    """
    context = Context()
    file_name = Path(file_name)
    parallel_log = force_parallel or read_mpi_processes(file_name) > 1
    version = read_version(file_name)
    patterns = normalize_regex(select_regex(version), parallel_log)

    number_of_lines_read = 0
    records: list[Any] = []
    with file_name.open() as file:
        for line in file:
            number_of_lines_read += 1
            # Bug fix: stop once MORE than ``maximum_lines`` lines have been
            # read.  The previous condition (maximum_lines >
            # number_of_lines_read) was inverted: it broke immediately on
            # the first line whenever maximum_lines > 1 and never limited
            # anything otherwise.
            if (
                maximum_lines is not None
                and number_of_lines_read > maximum_lines
            ):
                break
            entry = parse_line(
                patterns, line, parallel_log, number_of_lines_read
            )
            if not entry:
                continue
            if version == 2:
                # Version 1 needs a later call to ogs_context; for version 2
                # the running context is merged into each record right away.
                if not isinstance(entry, Termination):
                    valid_log: Log = entry
                    context.update(valid_log)
                    entry_d = asdict(valid_log)
                    context_d = asdict(context)
                    fields_of_interest = valid_log.context_filter()
                    filtered_context = {
                        k: v
                        for k, v in context_d.items()
                        if k in fields_of_interest
                    }
                    records.append(entry_d | filtered_context)
                else:
                    log_entry: Log = cast(Log, entry)
                    context.update(log_entry)
                    records.append(asdict(log_entry))
            else:
                valid_entry: Log = cast(Log, entry)
                records.append(asdict(valid_entry))
    return records
[docs] def select_regex(version: int) -> list[tuple[str, type[Log]]]: if version == 1: from ogstools.logparser.regexes import ogs_regexes return ogs_regexes() if version == 2: from ogstools.logparser.regexes import new_regexes return new_regexes() msg = f"Not supported log version (got: {version})" raise ValueError(msg)