Reusable BMS ML-SIL-HIL Pipeline

What this is: The complete pipeline that works for any BMS customer. Only the DBC file and a small pack config change.


Pipeline Overview

INPUT (per customer)              PIPELINE (reusable)                OUTPUT (deliverables)
──────────────────                ───────────────────                ─────────────────────

CAN logs (.blf/.asc)  ──→  1. DECODE  ──→  2. ANALYZE  ──→  3. REPORT
DBC file                       cantools       5 ONNX models       PDF / dashboard
Pack specs                     → CSV/numpy    → predictions        findings + numbers
                                                   │
                                                   v
                                        4. DEPLOY (optional)
                                           ML sidecar on CAN
                                           SIL plant model
                                           fault injection

Stage 1: Decode (reusable, 1 hour per log)

Input: Customer CAN log + their DBC file
Output: Time-aligned CSV with all BMS signals

# decode_customer_can.py — works with ANY BMS

import cantools
import can

def decode_log(dbc_path, log_path, output_csv):
    db = cantools.database.load_file(dbc_path)
    log = can.BLFReader(log_path)  # or ASCReader, CSVReader

    signals = {}
    for msg in log:
        try:
            decoded = db.decode_message(msg.arbitration_id, msg.data)
            for name, value in decoded.items():
                signals.setdefault(name, []).append((msg.timestamp, value))
        except Exception:
            pass  # frame ID not in the DBC, or payload mismatch — skip

    # Align to common timebase, write CSV
    write_aligned_csv(signals, output_csv)

What changes per customer: Only dbc_path. The decode logic is universal.

Dependencies: pip install cantools python-can
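write_aligned_csv is referenced above but not shown; a minimal sketch (the 100 ms timebase and linear interpolation are assumptions — pick a period that matches your slowest signal):

```python
import csv
import numpy as np

def write_aligned_csv(signals, output_csv, dt=0.1):
    """Resample every signal onto one common timebase and write a CSV.

    signals: {name: [(timestamp, value), ...]} as built in decode_log.
    Assumes timestamps ascend (CAN logs are chronological).
    """
    t_start = min(pts[0][0] for pts in signals.values())
    t_end = max(pts[-1][0] for pts in signals.values())
    t_common = np.arange(t_start, t_end, dt)

    columns = {"time": t_common}
    for name, pts in signals.items():
        ts = np.array([p[0] for p in pts])
        vs = np.array([p[1] for p in pts], dtype=float)
        columns[name] = np.interp(t_common, ts, vs)  # zero-order alternatives work too

    with open(output_csv, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(columns.keys())
        writer.writerows(zip(*columns.values()))
```

Linear interpolation is fine for slow signals (voltage, temperature); for flags or enums, sample-and-hold is the safer choice.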


Stage 2: Extract Features (reusable, config-driven)

Input: Decoded CSV
Output: Numpy arrays ready for ML inference

# extract_features.py — parameterized by pack_config.json

import json
import numpy as np

def extract(csv_path, config_path):
    with open(config_path) as f:
        config = json.load(f)

    # Config maps customer signal names to model inputs
    # {
    #   "pack_voltage": "BMS_PackVoltage",     ← changes per customer
    #   "pack_current": "BMS_PackCurrent",     ← changes per customer
    #   "cell_temp_signals": ["CellT_01", "CellT_02", ...],
    #   "cell_voltage_signals": ["CellV_01", "CellV_02", ...],
    #   "soc_signal": "BMS_SOC",
    #   "cells_in_series": 96,
    #   "nominal_cell_voltage": 3.7
    # }

    data = load_csv(csv_path)

    # Normalize pack voltage to per-cell (makes models topology-independent)
    pack_v = data[config["pack_voltage"]]
    cell_v = pack_v / config["cells_in_series"]

    pack_i = data[config["pack_current"]]
    temp_avg = np.mean([data[s] for s in config["cell_temp_signals"]], axis=0)
    temp_max = np.max([data[s] for s in config["cell_temp_signals"]], axis=0)

    # Build feature matrix: [V_cell, I, T_avg, T_max, 0]
    # velocity=0 for bench data (model trained with this feature, set to 0)
    features = np.column_stack([cell_v, pack_i, temp_avg, temp_max,
                                np.zeros(len(cell_v))])

    return features, data.get(config.get("soc_signal"), None)

What changes per customer: pack_config.json — signal name mapping + cell count. ~15 minutes to create from their DBC.
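load_csv is assumed in the code above; a minimal version that reads the Stage 1 output back into `{signal_name: array}` form:

```python
import csv
import numpy as np

def load_csv(csv_path):
    """Read the Stage 1 aligned CSV back into {column_name: np.ndarray}."""
    with open(csv_path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = [list(map(float, row)) for row in reader]
    cols = np.array(rows).T  # one array per column
    return {name: cols[i] for i, name in enumerate(header)}
```

For long logs, pandas `read_csv` is the drop-in replacement; the dict-of-arrays interface stays the same.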


Stage 3: ML Inference (fully reusable, zero changes)

Input: Feature numpy arrays
Output: Predictions for all 5 models

# ml_inference.py — same code for every customer

import onnxruntime as ort
import numpy as np
from pathlib import Path

MODEL_DIR = Path(__file__).parent.parent / "taktflow-bms-ml" / "models" / "bms"
NORM_DIR = Path(__file__).parent.parent / "taktflow-bms-ml" / "data" / "bms-processed"

class BMSInference:
    def __init__(self):
        self.soc = ort.InferenceSession(str(MODEL_DIR / "soc_lstm.onnx"))
        self.soh = ort.InferenceSession(str(MODEL_DIR / "soh_lstm.onnx"))
        self.thermal = ort.InferenceSession(str(MODEL_DIR / "thermal_cnn.onnx"))
        self.imbalance = ort.InferenceSession(str(MODEL_DIR / "imbalance_cnn.onnx"))
        self.rul = ort.InferenceSession(str(MODEL_DIR / "rul_transformer.onnx"))

        # Normalization stats from training
        self.soc_mean = np.load(NORM_DIR / "soc_norm_mean.npy")
        self.soc_std = np.load(NORM_DIR / "soc_norm_std.npy")

    def predict_soc(self, features, window=200):
        """Sliding window SOC prediction.
        features: (N, 5) array [V_cell, I, T_avg, T_max, velocity]
        Returns: (M,) SOC predictions, one per window
        """
        predictions = []
        for i in range(window, len(features)):
            window_data = features[i-window:i]
            x = (window_data - self.soc_mean) / self.soc_std
            x = x.astype(np.float32).reshape(1, window, 5)
            soc = self.soc.run(None, {"bms_window": x})[0][0]
            predictions.append(soc)
        return np.array(predictions)

    def predict_thermal_risk(self, features, window=30):
        """Sliding window thermal anomaly scoring.
        features: (N, 4) array [V, I, T, dT/dt]
        Returns: (M,) risk scores 0-1
        """
        risks = []
        for i in range(window, len(features)):
            x = features[i-window:i].astype(np.float32).reshape(1, window, 4)
            risk = self.thermal.run(None, {"bms_window": x})[0][0]
            risks.append(risk)
        return np.array(risks)

    def predict_imbalance(self, cell_voltages, window=20):
        """Cell imbalance detection.
        cell_voltages: (N, num_cells) array
        Returns: per-window binary (0=healthy, 1=imbalanced)
        """
        predictions = []
        for i in range(window, len(cell_voltages)):
            x = cell_voltages[i-window:i].astype(np.float32)
            x = x.reshape(1, window, -1)
            pred = self.imbalance.run(None, {"bms_window": x})[0][0]
            predictions.append(pred)
        return np.array(predictions)

    def predict_soh(self, cycle_features, window=30):
        """SOH from cycle-level features.
        cycle_features: (N_cycles, 6) [V, I, T, capacity, resistance, cycle]
        Returns: SOH %
        """
        if len(cycle_features) < window:
            return None  # need cycling history
        x = cycle_features[-window:].astype(np.float32).reshape(1, window, -1)
        return self.soh.run(None, {"bms_window": x})[0][0]

What changes per customer: Nothing. Models are trained on per-cell-normalized data. Any BMS, any topology.
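The per-window loops above issue one session.run call per timestep. ONNX Runtime will take all windows as a single batch if the exported graph has a free batch dimension (an assumption about these models); building that batch is pure numpy:

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def make_windows(features, window=200):
    """Stack the same sliding windows the loop builds into one
    (N - window, window, F) batch. View-based — no copy until the
    final contiguous materialization."""
    v = sliding_window_view(features, window, axis=0)  # (N-window+1, F, window)
    v = np.swapaxes(v, 1, 2)                           # (N-window+1, window, F)
    return np.ascontiguousarray(v[:-1])                # match range(window, N)
```

Then `self.soc.run(None, {"bms_window": batch})` replaces the loop; if the model was exported with a fixed batch size of 1, the loop stays as-is.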


Stage 4: Report Generation (reusable template)

# generate_report.py — parameterized by customer name + results

import numpy as np

def generate_soc_report(customer, test_name, ml_soc, bms_soc, timestamps):
    rmse = np.sqrt(np.mean((ml_soc - bms_soc)**2))
    max_drift = np.max(np.abs(ml_soc - bms_soc))
    drift_at_end = ml_soc[-1] - bms_soc[-1]

    report = f"""
    # SOC Audit Report — {customer}
    ## Test: {test_name}

    | Metric | Value |
    |--------|-------|
    | ML SOC RMSE vs BMS | {rmse:.2f}% |
    | Max instantaneous difference | {max_drift:.2f}% |
    | End-of-test drift | {drift_at_end:+.2f}% |
    | ML model | SOC LSTM (BiLSTM 128→64, trained BMW i3 + NASA) |
    | Confidence | Based on {len(ml_soc)} prediction windows |

    ## Interpretation
    {"BMS SOC tracking is within industry standard (<2%)." if rmse < 2
     else f"BMS SOC shows {rmse:.1f}% RMSE — investigate coulomb counting drift."}

    [SOC comparison plot attached]
    """
    return report

def generate_thermal_report(customer, test_name, risk_scores, temps, timestamps):
    peak_risk = np.max(risk_scores)
    high_risk_periods = np.sum(risk_scores > 0.5)

    report = f"""
    # Thermal Risk Report — {customer}
    ## Test: {test_name}

    | Metric | Value |
    |--------|-------|
    | Peak thermal risk score | {peak_risk:.3f} |
    | Periods with risk > 0.5 | {high_risk_periods} ({high_risk_periods*100/len(risk_scores):.1f}%) |
    | Max cell temperature | {np.max(temps):.1f}°C |
    | ML model | Thermal CNN (trained NREL 364 abuse tests) |

    ## Findings
    {"No thermal anomalies detected." if peak_risk < 0.3
     else f"Elevated thermal risk detected. {high_risk_periods} windows scored above 0.5."}

    [Thermal risk overlay plot attached]
    """
    return report

def generate_cell_health_report(customer, cell_voltages_at_rest):
    n_cells = cell_voltages_at_rest.shape[1]
    mean_v = np.mean(cell_voltages_at_rest, axis=0)
    spread = np.max(mean_v) - np.min(mean_v)
    weakest = np.argmin(mean_v)
    deviation = mean_v[weakest] - np.mean(mean_v)

    report = f"""
    # Cell Health Report — {customer}

    | Metric | Value |
    |--------|-------|
    | Cells monitored | {n_cells} |
    | Voltage spread at rest | {spread*1000:.1f} mV |
    | Weakest cell | Cell {weakest+1} ({deviation*1000:+.1f} mV from mean) |
    | Imbalance threshold | >30mV = investigate |

    ## Cell Voltage Bar Chart
    [attached]

    ## Recommendation
    {"Cell voltages are well balanced." if spread < 0.020
     else f"Cell {weakest+1} shows {abs(deviation)*1000:.0f}mV deviation — monitor for further degradation."}
    """
    return report

What changes per customer: Customer name string. Everything else is data-driven.
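The headline numbers in the SOC report are three numpy one-liners; a worked example with synthetic values:

```python
import numpy as np

# Synthetic ML-vs-BMS SOC traces (percent), three samples:
ml_soc  = np.array([80.0, 70.0, 61.0])
bms_soc = np.array([80.0, 72.0, 58.0])

rmse = np.sqrt(np.mean((ml_soc - bms_soc) ** 2))  # sqrt(13/3) ≈ 2.08 %
max_drift = np.max(np.abs(ml_soc - bms_soc))      # 3.0 %
drift_at_end = ml_soc[-1] - bms_soc[-1]           # +3.0 % (ML reads higher)
```

Note RMSE and max drift answer different questions: a BMS can pass the RMSE gate while still showing a brief large excursion, which is why the report carries both.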


Stage 5: Live CAN Sidecar (reusable, config-driven)

For bench deployment — reads CAN in real-time, publishes ML predictions.

# ml_sidecar.py — runs on bench laptop, reads CAN, publishes ML predictions

import can
import cantools
import json
import collections
import numpy as np
import onnxruntime as ort
import time

class MLSidecar:
    def __init__(self, dbc_path, config_path, can_interface="can0"):
        self.db = cantools.database.load_file(dbc_path)
        with open(config_path) as f:
            self.config = json.load(f)
        self.bus = can.interface.Bus(channel=can_interface, interface="socketcan")
        self.inference = BMSInference()  # from Stage 3

        # Sliding window buffers
        self.soc_window = collections.deque(maxlen=200)
        self.thermal_window = collections.deque(maxlen=30)

        # Latest decoded values
        self.pack_v = 0.0
        self.pack_i = 0.0
        self.temp_avg = 25.0
        self.temp_max = 25.0
        self.last_inference = 0

    def run(self):
        """Main loop: read CAN → decode → infer → publish."""
        while True:
            msg = self.bus.recv(timeout=0.1)
            if msg is None:
                continue

            # Decode
            try:
                decoded = self.db.decode_message(msg.arbitration_id, msg.data)
                self.update_state(decoded)
            except Exception:
                continue  # frame not in the DBC, or payload mismatch — skip

            # Infer at 1Hz (not per frame)
            now = time.monotonic()
            if now - self.last_inference >= 1.0:
                self.last_inference = now
                self.do_inference()

    def update_state(self, decoded):
        """Update internal state from decoded CAN signals."""
        cfg = self.config
        if cfg["pack_voltage"] in decoded:
            self.pack_v = decoded[cfg["pack_voltage"]]
        if cfg["pack_current"] in decoded:
            self.pack_i = decoded[cfg["pack_current"]]
        # ... temperature signals similarly

        cell_v = self.pack_v / self.config["cells_in_series"]
        self.soc_window.append([cell_v, self.pack_i, self.temp_avg,
                                self.temp_max, 0.0])

    def do_inference(self):
        """Run all models, publish results on CAN."""
        if len(self.soc_window) >= 200:
            features = np.array(self.soc_window, dtype=np.float32)
            x = (features - self.inference.soc_mean) / self.inference.soc_std
            x = x.reshape(1, 200, 5)
            soc = self.inference.soc.run(None, {"bms_window": x})[0][0]

            # Publish ML SOC on CAN ID 0x700 (centi-percent, big-endian, clamped)
            raw = max(0, min(10000, int(soc * 100)))
            self.bus.send(can.Message(arbitration_id=0x700,
                                      data=raw.to_bytes(2, "big") + bytes(6)))

    def publish(self, can_id, data):
        self.bus.send(can.Message(arbitration_id=can_id, data=data))

What changes per customer: dbc_path and config_path (signal name mapping). Same sidecar binary.
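The 0x700 payload is SOC in centi-percent, big-endian uint16, padded to 8 bytes. The receiving side (logger, dashboard) mirrors it — a sketch, with the scaling and ID taken from do_inference and the clamping an addition:

```python
def encode_soc(soc_percent):
    """SOC (0-100 %) → 8-byte CAN payload: uint16 big-endian centi-percent + padding."""
    raw = max(0, min(10000, int(round(soc_percent * 100))))
    return raw.to_bytes(2, "big") + bytes(6)

def decode_soc(payload):
    """Inverse of encode_soc, for the consumer side of the bus."""
    return int.from_bytes(payload[:2], "big") / 100.0
```

In practice this message would also be added to the customer's DBC (or a small overlay DBC) so standard CAN tooling can decode the ML outputs too.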


Stage 6: SIL Plant Model (reusable, parameter-driven)

For customers who want simulation without the bench.

# plant_model_calibrated.py — parameterized from real bench data

import json
import numpy as np

class CalibratedPlantModel:
    def __init__(self, config_path):
        with open(config_path) as f:
            cfg = json.load(f)
        self.n_cells = cfg["cells_in_series"]
        self.n_parallel = cfg["cells_in_parallel"]
        self.capacity_ah = cfg["nominal_capacity_ah"]
        self.r_internal = cfg["r_internal_ohm"]       # from bench data
        self.ocv_soc_table = np.array(cfg["ocv_curve"])  # from bench rest periods
        self.thermal_mass = cfg["thermal_mass_j_per_k"]
        self.cell_spread_mv = cfg["cell_spread_mv"]   # from bench OCV measurement
        self.soc = cfg.get("initial_soc", 0.5)
        self.temp = cfg.get("initial_temp_c", 25.0)

    def step(self, dt, current_a):
        """Advance one timestep. Returns cell voltages, pack voltage, temperature."""
        # SOC update (coulomb counting)
        self.soc -= (current_a * dt) / (self.capacity_ah * 3600)
        self.soc = np.clip(self.soc, 0, 1)

        # OCV from SOC (interpolate table from bench data)
        ocv = np.interp(self.soc, self.ocv_soc_table[:, 0], self.ocv_soc_table[:, 1])

        # Per-cell voltage with IR drop + cell-to-cell spread
        cell_v_base = ocv - current_a * self.r_internal / self.n_parallel
        cell_voltages = cell_v_base + np.random.normal(0, self.cell_spread_mv/1000,
                                                        self.n_cells)

        # Temperature (simple thermal model)
        power_dissipated = current_a**2 * self.r_internal
        self.temp += (power_dissipated * dt) / self.thermal_mass
        self.temp -= 0.1 * (self.temp - 25.0) * dt  # ambient cooling

        pack_v = np.sum(cell_voltages)
        return cell_voltages, pack_v, current_a, self.temp, self.soc

What changes per customer: plant_config.json — extracted from their bench data. The plant model code is identical.
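A quick unit check of the coulomb-counting step — the 3600 converts amp-hours to amp-seconds. Self-contained, synthetic numbers, no config file needed:

```python
import numpy as np

# One 1 s step at 60 A discharge on a 60 Ah pack (a 1C rate):
capacity_ah, dt, current_a = 60.0, 1.0, 60.0
soc = 0.5
soc -= (current_a * dt) / (capacity_ah * 3600)  # 1C drains 1/3600 of capacity per second

# OCV lookup against a synthetic two-point [SOC, V] table:
ocv_table = np.array([[0.0, 3.0], [1.0, 4.2]])
ocv = np.interp(soc, ocv_table[:, 0], ocv_table[:, 1])
```

The real ocv_curve extracted in calibrate_plant.py has many more breakpoints, but the interpolation call is identical.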

Config extraction script (run once on their CAN logs):

# calibrate_plant.py — extract plant parameters from customer CAN logs

import numpy as np

def calibrate(decoded_csv, config):
    data = load_csv(decoded_csv)

    params = {}
    params["cells_in_series"] = config["cells_in_series"]
    params["cells_in_parallel"] = config["cells_in_parallel"]

    # OCV curve: voltage at rest periods (|current| < 0.5A for >60s)
    rest_mask = find_rest_periods(data[config["pack_current"]],
                                  threshold=0.5, min_duration=60)
    params["ocv_curve"] = extract_ocv_soc(
        data[config["pack_voltage"]][rest_mask] / config["cells_in_series"],
        data[config["soc_signal"]][rest_mask]
    )

    # Internal resistance: dV/dI at load steps
    params["r_internal_ohm"] = estimate_resistance(
        data[config["pack_voltage"]],
        data[config["pack_current"]],
        config["cells_in_series"]
    )

    # Cell spread: std of cell voltages at rest
    cell_vs = [data[s] for s in config["cell_voltage_signals"]]
    params["cell_spread_mv"] = float(np.std(np.array(cell_vs), axis=0).mean() * 1000)

    # Thermal mass: from temperature rise during known power
    params["thermal_mass_j_per_k"] = estimate_thermal_mass(
        data[config["cell_temp_signals"][0]],
        data[config["pack_current"]],
        params["r_internal_ohm"]
    )

    params["nominal_capacity_ah"] = config["nominal_capacity_ah"]

    return params
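find_rest_periods and the other helpers are not shown; the rest detection might look like this (the 10 Hz sample rate is an assumption — derive it from the CSV timebase in practice):

```python
import numpy as np

def find_rest_periods(current, threshold=0.5, min_duration=60, fs_hz=10.0):
    """Boolean mask: samples where |I| < threshold is sustained for at
    least min_duration seconds."""
    quiet = np.abs(np.asarray(current)) < threshold
    min_samples = int(min_duration * fs_hz)
    mask = np.zeros(len(quiet), dtype=bool)

    # Run-length encode the quiet flag: diff edges mark run starts/ends.
    padded = np.concatenate(([0], quiet.astype(np.int8), [0]))
    edges = np.flatnonzero(np.diff(padded))
    for start, end in zip(edges[::2], edges[1::2]):
        if end - start >= min_samples:
            mask[start:end] = True
    return mask
```

Dropping the first seconds of each rest period (relaxation transient) before sampling OCV is a worthwhile refinement.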

Per-Customer Config File (the only thing that changes)

{
    "customer": "Example BMS GmbH",
    "pack_voltage": "BMS_HV_Voltage",
    "pack_current": "BMS_HV_Current",
    "soc_signal": "BMS_SOC_Display",
    "soh_signal": "BMS_SOH",
    "cell_voltage_signals": [
        "CMB_CellV_01", "CMB_CellV_02", "CMB_CellV_03",
        "CMB_CellV_04", "CMB_CellV_05", "CMB_CellV_06"
    ],
    "cell_temp_signals": [
        "CMB_CellT_01", "CMB_CellT_02", "CMB_CellT_03"
    ],
    "cells_in_series": 96,
    "cells_in_parallel": 3,
    "nominal_capacity_ah": 60,
    "nominal_cell_voltage": 3.7,
    "chemistry": "NMC"
}

Time to create: 15 minutes with their DBC file open. Map signal names, fill in pack specs.
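Since this config is the only per-customer artifact, it pays to fail fast on a bad one. A minimal validator (the key list mirrors the example above; validate_config is a name introduced here):

```python
REQUIRED_KEYS = [
    "customer", "pack_voltage", "pack_current", "soc_signal",
    "cell_voltage_signals", "cell_temp_signals",
    "cells_in_series", "cells_in_parallel",
    "nominal_capacity_ah", "nominal_cell_voltage",
]

def validate_config(cfg):
    """Raise ValueError listing every problem, not just the first one found."""
    problems = [f"missing key: {k}" for k in REQUIRED_KEYS if k not in cfg]
    if not problems and cfg.get("cells_in_series", 0) <= 0:
        problems.append("cells_in_series must be a positive integer")
    if problems:
        raise ValueError(f"pack_config invalid: {problems}")
    return cfg
```

A natural extension is cross-checking against the DBC itself: every mapped signal name should exist in the loaded cantools database.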


What Gets Reused vs What's Per-Customer

| Component                  | Lines of Code | Reused?                                      | Per-Customer Work |
|----------------------------|---------------|----------------------------------------------|-------------------|
| decode_customer_can.py     | ~50           | 100% reused                                  | 0 |
| extract_features.py        | ~80           | 100% reused                                  | 0 |
| ml_inference.py            | ~120          | 100% reused                                  | 0 |
| generate_report.py         | ~150          | 100% reused                                  | 0 |
| ml_sidecar.py              | ~100          | 100% reused                                  | 0 |
| plant_model_calibrated.py  | ~80           | 100% reused                                  | 0 |
| calibrate_plant.py         | ~100          | 100% reused                                  | 0 |
| 5 ONNX models              |               | 100% reused (fine-tune if chemistry differs) | 0, or 1 day |
| pack_config.json           | ~25           | PER CUSTOMER                                 | 15 minutes |
| Normalization stats (.npy) |               | reused (retrain if fine-tuning)              | 0, or included in fine-tune |
| Total pipeline code        | ~680 lines    | 100% reusable                                | 15 min config + optional 1-day fine-tune |

Delivery Timeline by Customer Number

| Customer          | Calendar Time   | What Happens |
|-------------------|-----------------|--------------|
| #0 (foxBMS demo)  | now → 4-6 weeks | Build everything: foxBMS SIL + fault injection + ML sidecar. |
| #1 (Customer A)   | +2 weeks        | Internal pilot on real Bologna bench data. Validate + fine-tune. |
| #2                | +1-2 weeks      | Swap config, run pipeline, generate reports. |
| #3                | +1 week         | Routine: config + run + deliver. |
| #10               | +2-3 days       | Streamlined, mostly automated. |

File Structure (Final Pipeline)

taktflow-bms-ml/
├── models/bms/                    5 ONNX models (reusable)
├── data/bms-processed/            Normalization stats (reusable)
├── scripts/bms/                   Training scripts (if fine-tuning needed)
│
├── pipeline/                      ← THE REUSABLE PIPELINE
│   ├── decode_customer_can.py     Decode any CAN log with any DBC
│   ├── extract_features.py        Config-driven feature extraction
│   ├── ml_inference.py            All 5 ONNX models, sliding window
│   ├── generate_report.py         SOC audit, thermal, cell health reports
│   ├── ml_sidecar.py              Live CAN inference (bench deployment)
│   ├── plant_model_calibrated.py  Parameter-driven plant model
│   ├── calibrate_plant.py         Extract params from bench CAN logs
│   ├── run_audit.py               One-command: decode → infer → report
│   └── requirements.txt           cantools, python-can, onnxruntime, numpy
│
├── customers/                     ← PER-CUSTOMER CONFIG (15 min each)
│   ├── foxbms-demo/
│   │   └── pack_config.json
│   ├── customer-example/
│   │   └── pack_config.json
│   └── customer-template/
│       └── pack_config.json       Template with comments
│
└── reports/                       Generated output
    ├── foxbms-demo/
    ├── customer-example/
    └── ...

One-Command Audit

# For any customer, any CAN log:
python pipeline/run_audit.py \
    --dbc customers/customer-example/bologna.dbc \
    --config customers/customer-example/pack_config.json \
    --log /path/to/their/test_run.blf \
    --output reports/customer-example/

# Output:
#   reports/customer-example/soc_audit.pdf
#   reports/customer-example/thermal_risk.pdf
#   reports/customer-example/cell_health.pdf
#   reports/customer-example/raw_predictions.csv

15 minutes to create the config. 1 command to run. 3 reports out.
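run_audit.py is listed in the file tree but not shown; the CLI wiring is a few lines of argparse (a sketch — the stage functions come from the earlier pipeline files):

```python
import argparse

def parse_args(argv=None):
    p = argparse.ArgumentParser(description="BMS ML audit: decode, infer, report")
    p.add_argument("--dbc", required=True, help="customer DBC file")
    p.add_argument("--config", required=True, help="pack_config.json")
    p.add_argument("--log", required=True, help="CAN log (.blf/.asc)")
    p.add_argument("--output", required=True, help="report output directory")
    return p.parse_args(argv)

# main() then chains the stages:
#   decode_log(args.dbc, args.log, csv_path)           (Stage 1)
#   features, soc = extract(csv_path, args.config)     (Stage 2)
#   predictions via BMSInference()                     (Stage 3)
#   generate_*_report(...) into args.output            (Stage 4)
```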