# Source code for magboltz_gui.util.export_controller

"""Export parsed run results to CSV, JSON, and XML representations."""

from __future__ import annotations

import json
import os
import tempfile
import xml.etree.ElementTree as ET
from dataclasses import asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple, cast

from magboltz_gui.util.export_types import ExportFormat, ExportType, CsvOptions, JsonOptions, XmlOptions
from magboltz_gui.util.run_result import RunResult, GasFrequencies, CollisionProcess


def _safe_float(val: Optional[float]) -> Optional[float]:
    return None if val is None else float(val)


def _unit_header(name: str, unit: Optional[str], include_units: bool) -> str:
    return f"{name} [{unit}]" if include_units and unit else name


def _write_atomic(path: Path, data: bytes) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with tempfile.NamedTemporaryFile("wb", delete=False, dir=path.parent) as tmp:
        tmp.write(data)
        tmp.flush()
        os.fsync(tmp.fileno())
        tmp_path = Path(tmp.name)
    tmp_path.replace(path)


def _mixture_flat_columns(run: RunResult) -> Dict[str, Any]:
    out: Dict[str, Any] = {}
    for i, gas in enumerate(run.mixture, start=1):
        out[f"gas{i}_name"] = gas.name
        out[f"gas{i}_model_tag"] = gas.model_tag
        out[f"gas{i}_fraction_percent"] = gas.fraction_percent
    return out


def _summary_dict(run: RunResult) -> Dict[str, Any]:
    cond = run.conditions
    trans = run.transport
    freq = run.frequencies_total
    counts = run.counts
    dt = trans.diffusion.transverse.get("DT_cm2_s")
    dl = trans.diffusion.longitudinal.get("DL_cm2_s")
    return {
        "gas_temperature_C": cond.gas_temperature_C,
        "gas_pressure_torr": cond.gas_pressure_torr,
        "electric_field_V_cm": cond.electric_field_V_cm,
        "magnetic_field_kG": cond.magnetic_field_kG,
        "angle_E_B_deg": cond.angle_E_B_deg,
        "cyclotron_freq_rad_ps": cond.cyclotron_freq_rad_ps,
        "initial_electron_energy_eV": cond.initial_electron_energy_eV,
        "integration_E_min_eV": cond.integration.E_min_eV,
        "integration_E_max_eV": cond.integration.E_max_eV,
        "integration_n_steps": cond.integration.n_steps,
        "penning_included": cond.flags.penning_included,
        "thermal_motion_included": cond.flags.thermal_motion_included,
        "anisotropic_scattering_type": cond.flags.anisotropic_scattering_type,
        "short_decorrelation_length_collisions": cond.flags.short_decorrelation_length_collisions,
        "total_real_collisions": counts.total_real_collisions,
        "num_null_collisions": counts.num_null_collisions,
        "calculated_max_collision_time_ps": counts.calculated_max_collision_time_ps,
        "vx_um_ns": trans.vx_um_ns.v_um_ns if trans.vx_um_ns else None,
        "vx_err_pct": trans.vx_um_ns.err_pct if trans.vx_um_ns else None,
        "vy_um_ns": trans.vy_um_ns.v_um_ns if trans.vy_um_ns else None,
        "vy_err_pct": trans.vy_um_ns.err_pct if trans.vy_um_ns else None,
        "vz_um_ns": trans.vz_um_ns.v_um_ns if trans.vz_um_ns else None,
        "vz_err_pct": trans.vz_um_ns.err_pct if trans.vz_um_ns else None,
        "DT_cm2_s": dt.value if dt else None,
        "DT_cm2_s_err_pct": dt.err_pct if dt else None,
        "DL_cm2_s": dl.value if dl else None,
        "DL_cm2_s_err_pct": dl.err_pct if dl else None,
        "ionisation_rate_per_cm": trans.ionisation_rate_per_cm,
        "ionisation_rate_err_pct": trans.ionisation_rate_err_pct,
        "attachment_rate_per_cm": trans.attachment_rate_per_cm,
        "attachment_rate_err_pct": trans.attachment_rate_err_pct,
        "mean_electron_energy_eV": trans.mean_electron_energy_eV,
        "mean_electron_energy_err_pct": trans.mean_electron_energy_err_pct,
        "total_coll_freq_1e12_s": freq.total_coll_freq_1e12_s,
        "elastic_coll_freq_1e12_s": freq.elastic_coll_freq_1e12_s,
        "inelastic_coll_freq_1e12_s": freq.inelastic_coll_freq_1e12_s,
        "ionisation_coll_freq_1e12_s": freq.ionisation_coll_freq_1e12_s,
        "attachment_coll_freq_1e12_s": freq.attachment_coll_freq_1e12_s,
    }


def available_export_types(run: RunResult) -> List[ExportType]:
    """Return the export modes that make sense for the current result.

    ``SUMMARY`` is always first and ``FULL_RUN_ARCHIVE`` always last; the
    table-based modes in between are listed only when the corresponding
    dataset on *run* is non-empty.
    """
    optional_views = (
        (run.tables.convergence_table, ExportType.CONVERGENCE_TABLE),
        (run.tables.energy_distribution, ExportType.ENERGY_DISTRIBUTION),
        (run.frequencies_by_gas, ExportType.COLLISION_FREQUENCIES),
    )
    available = [ExportType.SUMMARY]
    available.extend(kind for dataset, kind in optional_views if dataset)
    available.append(ExportType.FULL_RUN_ARCHIVE)
    return available
def _csv_rows_summary(run: RunResult, opts: CsvOptions) -> Tuple[List[str], List[List[Any]]]: data = _summary_dict(run) if opts.flatten_mixture: data.update(_mixture_flat_columns(run)) if opts.include_metadata: data = { "tool_version": run.meta.tool_version, "timestamp_utc": run.meta.timestamp_utc, "input_path": run.input.input_path, **data, } headers = list(data.keys()) units = { "gas_temperature_C": "C", "gas_pressure_torr": "torr", "electric_field_V_cm": "V/cm", "magnetic_field_kG": "kG", "angle_E_B_deg": "deg", "cyclotron_freq_rad_ps": "rad/ps", "initial_electron_energy_eV": "eV", "integration_E_min_eV": "eV", "integration_E_max_eV": "eV", "vx_um_ns": "um/ns", "vy_um_ns": "um/ns", "vz_um_ns": "um/ns", "DT_cm2_s": "cm^2/s", "DL_cm2_s": "cm^2/s", "ionisation_rate_per_cm": "1/cm", "attachment_rate_per_cm": "1/cm", "mean_electron_energy_eV": "eV", "total_coll_freq_1e12_s": "1e12/s", "elastic_coll_freq_1e12_s": "1e12/s", "inelastic_coll_freq_1e12_s": "1e12/s", "ionisation_coll_freq_1e12_s": "1e12/s", "attachment_coll_freq_1e12_s": "1e12/s", } if opts.include_units: headers = [_unit_header(h, units.get(h), True) for h in headers] row = [data[k] for k in data.keys()] return headers, [row] def _csv_rows_convergence(run: RunResult, opts: CsvOptions) -> Tuple[List[str], List[List[Any]]]: headers = ["vel", "pos", "time", "energy", "count", "difxx", "difyy", "difzz"] units = {"vel": "um/ns", "pos": "", "time": "ps", "energy": "eV", "count": "", "difxx": "cm^2/s", "difyy": "cm^2/s", "difzz": "cm^2/s"} if opts.include_units: headers = [_unit_header(h, units.get(h), True) for h in headers] rows: List[List[Any]] = [[r.vel, r.pos, r.time, r.energy, r.count, r.difxx, r.difyy, r.difzz] for r in run.tables.convergence_table] if opts.include_metadata: rows = [[run.meta.tool_version, run.meta.timestamp_utc, run.input.input_path, *row] for row in rows] headers = ["tool_version", "timestamp_utc", "input_path", *headers] return headers, rows def _csv_rows_energy(run: RunResult, opts: 
CsvOptions) -> Tuple[List[str], List[List[Any]]]: headers = ["E_eV", "spec"] units = {"E_eV": "eV", "spec": ""} if opts.include_units: headers = [_unit_header(h, units.get(h), True) for h in headers] rows: List[List[Any]] = [[r.E_eV, r.spec] for r in run.tables.energy_distribution] if opts.include_metadata: rows = [[run.meta.tool_version, run.meta.timestamp_utc, run.input.input_path, *row] for row in rows] headers = ["tool_version", "timestamp_utc", "input_path", *headers] return headers, rows def _csv_rows_collfreq(run: RunResult, opts: CsvOptions) -> Tuple[List[str], List[List[Any]]]: headers = ["gas_name", "process_label", "category", "eloss_eV", "freq_1e12_s", "err_pct"] if opts.include_units: headers = [_unit_header("eloss_eV", "eV", True) if h == "eloss_eV" else h for h in headers] rows: List[List[Any]] = [] for gas in run.frequencies_by_gas: for proc in gas.processes: rows.append([gas.gas_name, proc.label, proc.category, proc.eloss_eV, proc.freq_1e12_s, proc.err_pct]) if opts.include_metadata: rows = [[run.meta.tool_version, run.meta.timestamp_utc, run.input.input_path, *row] for row in rows] headers = ["tool_version", "timestamp_utc", "input_path", *headers] return headers, rows
[docs] def export_csv(run: RunResult, export_type: ExportType, path: Path, opts: CsvOptions) -> None: """Write one export view to CSV.""" if export_type == ExportType.SUMMARY: headers, rows = _csv_rows_summary(run, opts) elif export_type == ExportType.CONVERGENCE_TABLE: headers, rows = _csv_rows_convergence(run, opts) elif export_type == ExportType.ENERGY_DISTRIBUTION: headers, rows = _csv_rows_energy(run, opts) elif export_type == ExportType.COLLISION_FREQUENCIES: headers, rows = _csv_rows_collfreq(run, opts) else: raise ValueError("CSV does not support full run archive") lines = [opts.delimiter.join(str(h) for h in headers)] for row in rows: lines.append(opts.delimiter.join("" if v is None else str(v) for v in row)) data = ("\n".join(lines) + "\n").encode("utf-8") _write_atomic(path, data)
def _filter_run_for_export(run: RunResult, export_type: ExportType, opts: JsonOptions | XmlOptions) -> Any:
    """Return the plain-data payload for *export_type*.

    * ``FULL_RUN_ARCHIVE``: ``run.to_dict()`` with the input text, raw stdout
      and parser warnings blanked out unless the matching ``opts.include_*``
      flag is set.
    * ``SUMMARY``: the flat mapping from ``_summary_dict``.
    * ``CONVERGENCE_TABLE`` / ``ENERGY_DISTRIBUTION``: a list with one
      ``dataclasses.asdict`` mapping per table record.
    * ``COLLISION_FREQUENCIES``: a list with one mapping per (gas, process).

    The full dictionary is only built for the archive export, since it is
    the only view that uses it; the other views no longer pay for (or fail
    on) a full serialisation they then discard.

    Raises:
        ValueError: If *export_type* is none of the above.
    """
    if export_type == ExportType.FULL_RUN_ARCHIVE:
        data = run.to_dict()
        if not opts.include_input_text:
            data["input"]["input_text"] = None
        if not opts.include_raw_stdout:
            data["raw"]["stdout_text"] = None
        if not opts.include_parser_warnings:
            data["raw"]["parser_warnings"] = []
        return data
    if export_type == ExportType.SUMMARY:
        return _summary_dict(run)
    if export_type == ExportType.CONVERGENCE_TABLE:
        return [asdict(record) for record in run.tables.convergence_table]
    if export_type == ExportType.ENERGY_DISTRIBUTION:
        return [asdict(record) for record in run.tables.energy_distribution]
    if export_type == ExportType.COLLISION_FREQUENCIES:
        return [
            {
                "gas_name": gas.gas_name,
                "process_label": proc.label,
                "category": proc.category,
                "eloss_eV": proc.eloss_eV,
                "freq_1e12_s": proc.freq_1e12_s,
                "err_pct": proc.err_pct,
            }
            for gas in run.frequencies_by_gas
            for proc in gas.processes
        ]
    raise ValueError("Unknown export type")
def export_json(run: RunResult, export_type: ExportType, path: Path, opts: JsonOptions) -> None:
    """Write one export view to JSON.

    The document wraps the filtered payload together with a schema version,
    the export type value and ``run.tables.units``. It is indented by two
    spaces when ``opts.pretty_print`` is set, otherwise emitted compactly,
    and stored UTF-8 encoded via ``_write_atomic``.
    """
    document = {
        "schema_version": "1.0",
        "export_type": export_type.value,
        "data": _filter_run_for_export(run, export_type, opts),
        "units": run.tables.units,
    }
    if opts.pretty_print:
        serialized = json.dumps(document, indent=2)
    else:
        serialized = json.dumps(document, indent=None)
    _write_atomic(path, serialized.encode("utf-8"))
def _xml_append(parent: ET.Element, key: str, value: Any) -> None: if isinstance(value, list): list_elem = ET.SubElement(parent, key) for item in value: if isinstance(item, dict): row = ET.SubElement(list_elem, "row") for k, v in item.items(): if v is not None: row.set(k, str(v)) else: ET.SubElement(list_elem, "item").text = str(item) return if isinstance(value, dict): elem = ET.SubElement(parent, key) for k, v in value.items(): _xml_append(elem, k, v) return elem = ET.SubElement(parent, key) if value is not None: elem.text = str(value)
def export_xml(run: RunResult, export_type: ExportType, path: Path, opts: XmlOptions) -> None:
    """Write one export view to XML.

    The root ``<magboltz_export>`` element carries the schema version and
    export type as attributes. It contains a ``<data>`` subtree built from
    the filtered payload and a ``<units>`` element holding every non-empty
    entry of ``run.tables.units`` as an attribute. The document is
    serialised as UTF-8 with an XML declaration and stored via
    ``_write_atomic``.
    """
    root_attrs = {"schema_version": "1.0", "export_type": export_type.value}
    root = ET.Element("magboltz_export", root_attrs)

    payload = _filter_run_for_export(run, export_type, opts)
    _xml_append(root, "data", payload)

    units_elem = ET.SubElement(root, "units")
    for name, unit in run.tables.units.items():
        if not unit:
            continue  # skip columns without a unit
        units_elem.set(name, unit)

    _write_atomic(path, ET.tostring(root, encoding="utf-8", xml_declaration=True))
def export_to_file(
    run: RunResult,
    export_type: ExportType,
    export_format: ExportFormat,
    path: Path,
    csv_options: Optional[CsvOptions] = None,
    json_options: Optional[JsonOptions] = None,
    xml_options: Optional[XmlOptions] = None,
) -> None:
    """Dispatch one export request to the format-specific writer.

    Before dispatching, the dataset backing a table-based *export_type* must
    be non-empty. When the options object for the chosen format is omitted
    (or falsy), a default-constructed one is used.

    Raises:
        ValueError: If the selected dataset is missing, if a full run
            archive is requested as CSV, or if *export_format* is unknown.
    """
    # Each getter is only evaluated for the export type actually requested.
    dataset_checks = (
        (
            ExportType.CONVERGENCE_TABLE,
            lambda: run.tables.convergence_table,
            "Selected dataset missing: convergence table",
        ),
        (
            ExportType.ENERGY_DISTRIBUTION,
            lambda: run.tables.energy_distribution,
            "Selected dataset missing: energy distribution",
        ),
        (
            ExportType.COLLISION_FREQUENCIES,
            lambda: run.frequencies_by_gas,
            "Selected dataset missing: collision frequencies",
        ),
    )
    for kind, get_dataset, message in dataset_checks:
        if export_type == kind and not get_dataset():
            raise ValueError(message)

    if export_format == ExportFormat.CSV:
        if export_type == ExportType.FULL_RUN_ARCHIVE:
            raise ValueError("Full run archive is not supported for CSV")
        export_csv(run, export_type, path, csv_options or CsvOptions())
        return
    if export_format == ExportFormat.JSON:
        export_json(run, export_type, path, json_options or JsonOptions())
        return
    if export_format == ExportFormat.XML:
        export_xml(run, export_type, path, xml_options or XmlOptions())
        return
    raise ValueError("Unknown export format")
def default_filename(run: RunResult, export_type: ExportType, ext: str) -> str:
    """Build a default filename for one exported artefact.

    The name has the form ``<mixture>_<field>_<export type>_<timestamp>.<ext>``:

    * mixture: each gas name followed by its percentage truncated to an
      integer (a falsy percentage counts as 0), joined with underscores, or
      ``"run"`` when the mixture is empty;
    * field: ``E`` followed by the electric field truncated to an integer,
      or just ``E`` when the field is ``None``;
    * timestamp: the current UTC time as ``YYYYMMDD_HHMMSS``.
    """
    gas_tags = [f"{gas.name}{int(gas.fraction_percent or 0)}" for gas in run.mixture]
    mixture_tag = "_".join(gas_tags) or "run"

    e_field = run.conditions.electric_field_V_cm
    if e_field is None:
        field_tag = "E"
    else:
        field_tag = f"E{int(e_field)}"

    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    return f"{mixture_tag}_{field_tag}_{export_type.value}_{stamp}.{ext}"