Source code for magboltz_gui.util.output_parser

"""Parse raw Magboltz stdout into the structured run-result model."""

from __future__ import annotations

import re
from datetime import datetime, timezone
from typing import List, Optional

from magboltz_gui.util.run_result import (
    RunResult,
    MixtureGas,
    RunMeta,
    RunInput,
    RunConditions,
    RunIntegration,
    RunFlags,
    RunCounts,
    RunTransport,
    DriftVelocity,
    RunDiffusion,
    DiffusionComponent,
    RunFrequenciesTotal,
    GasFrequencies,
    CollisionProcess,
    ConvergenceRow,
    EnergyRow,
    RunTables,
    RunRaw,
)


# Matches one numeric token: an optional sign followed by either a decimal
# with digits after the point or a plain integer, each with an optional
# exponent introduced by D/E/d/e. The D/d exponent markers are rewritten to
# "E" by _parse_float before conversion.
_FLOAT_RE = re.compile(r"[-+]?\d*\.\d+(?:[DEde][-+]?\d+)?|[-+]?\d+(?:[DEde][-+]?\d+)?")


def _parse_float(text: str) -> Optional[float]:
    try:
        return float(text.replace("D", "E").replace("d", "E"))
    except Exception:
        return None


def _parse_int(text: str) -> Optional[int]:
    try:
        return int(text)
    except Exception:
        return None


def _find_first_float(line: str) -> Optional[float]:
    """Return the first numeric token found in *line*, or ``None`` if there is none."""
    hit = _FLOAT_RE.search(line)
    return _parse_float(hit.group(0)) if hit else None


def _floats_in_line(line: str) -> List[float]:
    """Return every numeric token in *line*, left to right, as floats.

    Tokens that the pattern matches but that fail conversion are dropped.
    """
    converted = (_parse_float(token.group(0)) for token in _FLOAT_RE.finditer(line))
    return [value for value in converted if value is not None]


# Patterns used by the section helpers below; compiled once at import time.
_ANISO_TYPE_RE = re.compile(r"TYPE\s+(\d+)")
_DECORRELATION_RE = re.compile(r"=\s*([0-9]+)")
_ELOSS_RE = re.compile(r"ELOSS=\s*([-+0-9\.DEde]+)")
_FREQ_WITH_ERR_RE = re.compile(r"[-+0-9\.DEde]+\s*\+\-\s*[-+0-9\.DEde]+\s*%?")
_ELOSS_TAIL_RE = re.compile(r"\s+ELOSS=.*")


def parse_magboltz_output(
    stdout_text: str,
    input_text: Optional[str] = None,
    input_path: Optional[str] = None,
) -> RunResult:
    """Extract transport quantities, tables, and metadata from one Magboltz run.

    The text is scanned section by section; each section is located by the
    label strings Magboltz prints. Sections that are absent simply leave the
    corresponding fields at their defaults.

    Args:
        stdout_text: Complete captured stdout of the run.
        input_text: Input deck that produced the run, stored verbatim.
        input_path: Path of the input deck, stored verbatim.

    Returns:
        A populated ``RunResult``. ``raw.parser_warnings`` is handed the list
        that non-fatal parse problems are appended to, and
        ``meta.timestamp_utc`` holds the time of parsing (UTC, ISO 8601).
    """
    warnings: List[str] = []
    lines = stdout_text.splitlines()

    result = RunResult(
        meta=RunMeta(timestamp_utc=datetime.now(timezone.utc).isoformat()),
        input=RunInput(input_text=input_text, input_path=input_path),
        conditions=RunConditions(integration=RunIntegration(), flags=RunFlags()),
        counts=RunCounts(),
        transport=RunTransport(diffusion=RunDiffusion()),
        frequencies_total=RunFrequenciesTotal(),
        tables=RunTables(units={
            "vel": "um/ns",
            "pos": "dimensionless",
            "time": "ps",
            "energy": "eV",
            "difxx": "cm^2/s",
            "difyy": "cm^2/s",
            "difzz": "cm^2/s",
        }),
        raw=RunRaw(stdout_text=stdout_text, parser_warnings=warnings),
    )

    _extract_tool_version(lines, result)
    _extract_mixture(lines, result, warnings)
    _extract_conditions(lines, result)
    _extract_null_collision_components(lines, result)
    _extract_convergence_table(lines, result)
    _extract_collision_counts(lines, result)
    _extract_drift_velocities(lines, result)
    _extract_diffusion(lines, result)
    _extract_rates_and_mean_energy(lines, result)
    _extract_frequency_totals(lines, result)
    _extract_detailed_frequencies(lines, result)
    _extract_energy_distribution(lines, result)
    return result


def _extract_tool_version(lines: List[str], result: RunResult) -> None:
    """Store the whitespace-normalised program/version banner line, if present."""
    for line in lines:
        if "PROGRAM MAGBOLTZ" in line and "VERSION" in line:
            result.meta.tool_version = " ".join(line.split())
            break


def _extract_mixture(lines: List[str], result: RunResult, warnings: List[str]) -> None:
    """Read the gas-mixture rows that follow the GASES USED header."""
    start = None
    for i, line in enumerate(lines):
        if "GASES USED" in line and "PERCENTAGE USED" in line:
            start = i + 1
            break
    if start is None:
        return

    for line in lines[start:]:
        parts = line.split()
        if not parts:
            # The first blank line ends the mixture section.
            break
        try:
            # First token is the gas name, last token the percentage, and
            # anything in between is kept as a free-form model tag.
            result.mixture.append(
                MixtureGas(
                    name=parts[0],
                    model_tag=" ".join(parts[1:-1]) or None,
                    fraction_percent=_parse_float(parts[-1]),
                )
            )
        except Exception:
            # Best effort: a row the model rejects is recorded, not fatal.
            warnings.append(f"Failed to parse mixture line: {line}")


def _extract_conditions(lines: List[str], result: RunResult) -> None:
    """Fill run conditions, flags, and the real-collision count from labelled lines."""
    conditions = result.conditions
    flags = conditions.flags
    for line in lines:
        if "GAS TEMPERATURE" in line:
            conditions.gas_temperature_C = _find_first_float(line)
        elif "GAS PRESSURE" in line:
            conditions.gas_pressure_torr = _find_first_float(line)
        elif "INTEGRATION FROM" in line and "STEPS" in line:
            vals = _floats_in_line(line)
            if len(vals) >= 3:
                conditions.integration.E_min_eV = vals[0]
                conditions.integration.E_max_eV = vals[1]
                conditions.integration.n_steps = int(vals[2])
        elif "PENNING EFFECTS" in line:
            flags.penning_included = "INCLUDED" in line and "NOT" not in line
        elif "ANISOTROPIC SCATTERING TYPE" in line:
            m = _ANISO_TYPE_RE.search(line)
            if m:
                flags.anisotropic_scattering_type = _parse_int(m.group(1))
        elif "SHORT DECORRELATION LENGTH" in line:
            m = _DECORRELATION_RE.search(line)
            if m:
                flags.short_decorrelation_length_collisions = _parse_int(m.group(1))
        elif "THERMAL MOTION OF GAS" in line:
            # Same test as the Penning flag so "NOT INCLUDED" reads as False.
            flags.thermal_motion_included = "INCLUDED" in line and "NOT" not in line
        elif "ANGLE BETWEEN ELECTRIC AND MAGNETIC FIELD" in line:
            # Must be tested before "MAGNETIC FIELD": the angle label contains
            # that substring and would otherwise overwrite the field value.
            conditions.angle_E_B_deg = _find_first_float(line)
        elif "ELECTRIC FIELD" in line:
            conditions.electric_field_V_cm = _find_first_float(line)
        elif "MAGNETIC FIELD" in line:
            conditions.magnetic_field_kG = _find_first_float(line)
        elif "CYCLOTRON FREQ." in line:
            conditions.cyclotron_freq_rad_ps = _find_first_float(line)
        elif "INITIAL ELECTRON ENERGY" in line:
            conditions.initial_electron_energy_eV = _find_first_float(line)
        elif "TOTAL NUMBER OF REAL COLLISIONS" in line:
            val = _find_first_float(line)
            if val is not None:
                result.counts.total_real_collisions = int(val)


def _extract_null_collision_components(lines: List[str], result: RunResult) -> None:
    """Collect per-gas null collision frequencies from up to five following lines."""
    for i, line in enumerate(lines):
        if "NULL COLLISION FREQUENCY FOR EACH GAS COMPONENT" in line:
            for follow in lines[i + 1:i + 6]:
                nums = _floats_in_line(follow)
                if not nums:
                    # First line without numbers ends the list.
                    break
                result.counts.null_collision_frequency_components.extend(nums)
            break


def _extract_convergence_table(lines: List[str], result: RunResult) -> None:
    """Parse rows under the VEL ... DIFZZ header into convergence-table entries."""
    for i, line in enumerate(lines):
        if not (line.strip().startswith("VEL") and "DIFZZ" in line):
            continue
        for raw_row in lines[i + 1:]:
            row = raw_row.strip()
            if not row:
                continue
            if row.startswith("-"):
                # A line starting with "-" terminates the table.
                break
            vals = _floats_in_line(row)
            if len(vals) >= 8:
                result.tables.convergence_table.append(
                    ConvergenceRow(
                        vel=vals[0],
                        pos=vals[1],
                        time=vals[2],
                        energy=vals[3],
                        count=vals[4],
                        difxx=vals[5],
                        difyy=vals[6],
                        difzz=vals[7],
                    )
                )
        # Only the first header occurrence is parsed.
        break


def _extract_collision_counts(lines: List[str], result: RunResult) -> None:
    """Read the calculated maximum collision time and the null-collision count."""
    for line in lines:
        if "CALCULATED MAX. COLLISION TIME" in line:
            result.counts.calculated_max_collision_time_ps = _find_first_float(line)
        elif line.strip().startswith("NUMBER OF NULL COLLISIONS"):
            val = _find_first_float(line)
            if val is not None:
                result.counts.num_null_collisions = int(val)


def _extract_drift_velocities(lines: List[str], result: RunResult) -> None:
    """Read the Z/Y/X drift velocity lines (value followed by its error)."""
    transport = result.transport
    for line in lines:
        if "Z DRIFT VELOCITY" in line:
            vals = _floats_in_line(line)
            if len(vals) >= 2:
                transport.vz_um_ns = DriftVelocity(v_um_ns=vals[0], err_pct=vals[1])
        elif "Y DRIFT VELOCITY" in line:
            vals = _floats_in_line(line)
            if len(vals) >= 2:
                transport.vy_um_ns = DriftVelocity(v_um_ns=vals[0], err_pct=vals[1])
        elif "X DRIFT VELOCITY" in line:
            vals = _floats_in_line(line)
            if len(vals) >= 2:
                transport.vx_um_ns = DriftVelocity(v_um_ns=vals[0], err_pct=vals[1])


def _extract_diffusion_group(lines: List[str], label: str, target: dict, keys: List[str]) -> None:
    """Store value/error pairs from the labelled line and the lines right after it.

    ``keys[k]`` names the entry read from the k-th line counted from the first
    line containing *label*; lines with fewer than two numbers are skipped.
    """
    first = next((i for i, line in enumerate(lines) if label in line), None)
    if first is None:
        return
    for offset, key in enumerate(keys):
        if first + offset >= len(lines):
            break
        vals = _floats_in_line(lines[first + offset])
        if len(vals) >= 2:
            target[key] = DiffusionComponent(vals[0], vals[1])


def _extract_diffusion(lines: List[str], result: RunResult) -> None:
    """Read the transverse and longitudinal diffusion blocks (three lines each)."""
    diffusion = result.transport.diffusion
    _extract_diffusion_group(
        lines, "TRANSVERSE DIFFUSION", diffusion.transverse,
        ["DT_cm2_s", "DT_eV", "DT_um_cm05"],
    )
    _extract_diffusion_group(
        lines, "LONGITUDINAL DIFFUSION", diffusion.longitudinal,
        ["DL_cm2_s", "DL_eV", "DL_um_cm05"],
    )


def _extract_rates_and_mean_energy(lines: List[str], result: RunResult) -> None:
    """Read ionisation rate, attachment rate, and mean electron energy with errors."""
    transport = result.transport
    for line in lines:
        if "IONISATION RATE /CM" in line:
            vals = _floats_in_line(line)
            if len(vals) >= 2:
                transport.ionisation_rate_per_cm = vals[0]
                transport.ionisation_rate_err_pct = vals[1]
        elif "ATTACHMENT RATE /CM" in line:
            vals = _floats_in_line(line)
            if len(vals) >= 2:
                transport.attachment_rate_per_cm = vals[0]
                transport.attachment_rate_err_pct = vals[1]
        elif "MEAN ELECTRON ENERGY" in line:
            vals = _floats_in_line(line)
            if len(vals) >= 2:
                transport.mean_electron_energy_eV = vals[0]
                transport.mean_electron_energy_err_pct = vals[1]


def _extract_frequency_totals(lines: List[str], result: RunResult) -> None:
    """Read the summary collision-frequency lines (matched at line start)."""
    totals = result.frequencies_total
    for line in lines:
        head = line.strip()
        if head.startswith("TOTAL COLL. FREQ."):
            totals.total_coll_freq_1e12_s = _find_first_float(line)
        elif head.startswith("ELASTIC COLL. FREQ."):
            totals.elastic_coll_freq_1e12_s = _find_first_float(line)
        elif head.startswith("INELASTIC COLL. FREQ."):
            totals.inelastic_coll_freq_1e12_s = _find_first_float(line)
        elif head.startswith("IONISATION COLL. FREQ."):
            totals.ionisation_coll_freq_1e12_s = _find_first_float(line)
        elif head.startswith("ATTACHMENT COLL. FREQ."):
            totals.attachment_coll_freq_1e12_s = _find_first_float(line)


def _extract_detailed_frequencies(lines: List[str], result: RunResult) -> None:
    """Parse the per-gas process list between the DETAILED COLLISION FREQUENCIES
    header and the NORMALISED ENERGY DISTRIBUTION header."""
    in_detail = False
    gas_section: Optional[str] = None
    for i, line in enumerate(lines):
        if "DETAILED COLLISION FREQUENCIES" in line:
            in_detail = True
            continue
        if not in_detail:
            continue

        stripped = line.strip()
        if stripped.startswith("NORMALISED ENERGY DISTRIBUTION"):
            break
        if not stripped:
            continue

        # A gas header is any non-blank line directly followed by a dashed rule.
        if i + 1 < len(lines) and lines[i + 1].strip().startswith("---"):
            gas_section = " ".join(stripped.split())
            result.frequencies_by_gas.append(GasFrequencies(gas_name=gas_section))
            continue
        if stripped.startswith("---"):
            continue

        # Process lines carry "value +- error"; ignore anything before the
        # first gas header.
        if not (gas_section and "+-" in line):
            continue
        nums = _floats_in_line(line)
        if len(nums) < 2:
            continue

        eloss_match = _ELOSS_RE.search(line)
        eloss = _parse_float(eloss_match.group(1)) if eloss_match else None

        # The label is what remains once the "value +- error" text and any
        # trailing ELOSS= field are removed.
        label = _FREQ_WITH_ERR_RE.sub("", stripped).strip()
        label = _ELOSS_TAIL_RE.sub("", label).strip()
        category = label.split()[0] if label else None

        result.frequencies_by_gas[-1].processes.append(
            CollisionProcess(
                label=label,
                eloss_eV=eloss,
                # Frequency and its error are taken as the last two numbers.
                freq_1e12_s=nums[-2],
                err_pct=nums[-1],
                category=category,
            )
        )


def _extract_energy_distribution(lines: List[str], result: RunResult) -> None:
    """Parse E=/SPEC= rows following the NORMALISED ENERGY DISTRIBUTION header."""
    for i, line in enumerate(lines):
        if "NORMALISED ENERGY DISTRIBUTION" not in line:
            continue
        for raw_row in lines[i + 1:]:
            row = raw_row.strip()
            # A blank line or the runner's "Process finished" trailer ends the table.
            if not row or row.startswith("Process finished"):
                break
            if "E=" in row and "SPEC=" in row:
                vals = _floats_in_line(row)
                if len(vals) >= 2:
                    result.tables.energy_distribution.append(
                        EnergyRow(E_eV=vals[0], spec=vals[1])
                    )
        # Only the first header occurrence is parsed.
        break