Files
ONE-OS/scripts/compare_h2_records.py
王冕 a27e3b8e43 feat: sync full workspace including web modules, docs, and configurations to Gitea
Optimized the root .gitignore to exclude virtual environments, node modules,
and temp folders to ensure clean and lightweight version tracking.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-09 18:12:25 +08:00

362 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""比对「加氢记录」与「氢费台账-加氢订单」,输出标黄/标红 Excel。"""
from __future__ import annotations
import re
from collections import defaultdict
from pathlib import Path
import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill
FILE_RECORD = Path(
"/Users/sylvawong/Library/Containers/com.tencent.xinWeChat/Data/Documents/"
"xwechat_files/wxid_l80gh7d3x7u012_385c/msg/file/2026-05/"
"加氢记录-2026.1月-4月.xlsx"
)
FILE_LEDGER = Path(
"/Users/sylvawong/Library/Containers/com.tencent.xinWeChat/Data/Documents/"
"xwechat_files/wxid_l80gh7d3x7u012_385c/msg/file/2026-05/"
"氢费台账总表2026.5.21(1).xlsx"
)
OUT_DIR = Path("/Users/sylvawong/Desktop/CURSOR/ONE-OS")
OUT_FILE = OUT_DIR / "加氢记录与台账比对结果.xlsx"
COMPARE_FIELDS = [
("加氢站", "加氢站名称", "加氢站"),
("成本单价", "成本单价(元/kg)", "成本单价"),
("成本金额", "成本费用(元)", "成本金额"),
("加氢单价", "加氢单价(元/kg)", "加氢单价"),
("加氢金额", "加氢金额(元)", "加氢金额"),
("客户名", "客户名称", "客户名"),
("氢费计算方式", "结算类别", "氢费计算方式"),
]
YELLOW = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")
RED = PatternFill(start_color="FF9999", end_color="FF9999", fill_type="solid")
RED_FONT = Font(color="9C0006", bold=True)
def read_record_sheet(path: Path) -> pd.DataFrame:
raw = pd.read_excel(path, sheet_name=0, header=None)
# 第 0 行为汇总行,第 1 行为表头
raw.columns = [
str(c).strip() if pd.notna(c) else f"_col{i}" for i, c in enumerate(raw.iloc[1])
]
df = raw.iloc[2:].copy().reset_index(drop=True)
rename = {
"序号": "序号",
"日期": "日期",
"车牌": "车牌",
"加气量kg)": "加气量kg)",
"加气量kg": "加气量kg)",
"加氢站": "加氢站",
"成本单价": "成本单价",
"成本金额": "成本金额",
"加氢单价": "加氢单价",
"加氢金额": "加氢金额",
"客户名": "客户名",
"氢费计算方式": "氢费计算方式",
}
for old, new in rename.items():
if old in df.columns:
df = df.rename(columns={old: new})
return df
def parse_date(val) -> str:
if pd.isna(val):
return ""
if isinstance(val, pd.Timestamp):
return val.strftime("%Y-%m-%d")
if isinstance(val, (int, float)) and val > 40000:
try:
return (pd.Timestamp("1899-12-30") + pd.Timedelta(days=int(val))).strftime("%Y-%m-%d")
except Exception:
pass
s = str(val).strip()
for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%Y-%m-%d %H:%M:%S"):
try:
return pd.to_datetime(s).strftime("%Y-%m-%d")
except Exception:
continue
try:
return pd.to_datetime(val).strftime("%Y-%m-%d")
except Exception:
return s[:10] if len(s) >= 10 else s
def norm_plate(val) -> str:
if pd.isna(val):
return ""
return re.sub(r"[\s\-·]", "", str(val).strip().upper())
def norm_kg(val, tol_decimals=3) -> str:
if pd.isna(val) or str(val).strip() in ("", "-", "nan"):
return ""
try:
return f"{round(float(val), tol_decimals):.{tol_decimals}f}"
except Exception:
return str(val).strip()
def norm_str(val) -> str:
if pd.isna(val):
return ""
return str(val).strip()
def norm_num(val, tol=0.02) -> float | None:
if pd.isna(val) or str(val).strip() in ("", "-", "nan"):
return None
try:
return float(val)
except Exception:
return None
def values_equal(a, b, numeric: bool) -> bool:
if numeric:
na, nb = norm_num(a), norm_num(b)
if na is None and nb is None:
return True
if na is None or nb is None:
return False
return abs(na - nb) <= 0.02
return norm_str(a) == norm_str(b)
def make_key(row, date_col, plate_col, kg_col) -> str:
return f"{parse_date(row[date_col])}|{norm_plate(row[plate_col])}|{norm_kg(row[kg_col])}"
def build_index(df, date_col, plate_col, kg_col):
"""同一 key 可能多行,用列表保存行号。"""
idx: dict[str, list[int]] = defaultdict(list)
for i, row in df.iterrows():
k = make_key(row, date_col, plate_col, kg_col)
parts = k.split("|")
if parts[0] and parts[1] and parts[2]:
idx[k].append(i)
return idx
def pair_rows(keys_a: list[int], keys_b: list[int]):
"""按顺序一对一配对,多余行不配。"""
n = min(len(keys_a), len(keys_b))
return list(zip(keys_a[:n], keys_b[:n]))
def compare_pair(row_a: pd.Series, row_b: pd.Series) -> tuple[list[str], list[str]]:
"""返回 (差异字段名列表, 备注片段列表)。"""
diffs = []
notes = []
for col_a, col_b, label in COMPARE_FIELDS:
numeric = label in ("成本单价", "成本金额", "加氢单价", "加氢金额")
va = row_a.get(col_a)
vb = row_b.get(col_b)
if not values_equal(va, vb, numeric):
diffs.append(label)
notes.append(f"{label}:记录[{norm_str(va)}]≠台账[{norm_str(vb)}]")
return diffs, notes
def add_ledger_cols(df: pd.DataFrame, row_b: pd.Series | None) -> dict:
if row_b is None:
return {f"台账_{label}": "" for _, _, label in COMPARE_FIELDS}
out = {}
for col_a, col_b, label in COMPARE_FIELDS:
out[f"台账_{label}"] = row_b.get(col_b, "")
return out
def run():
df_rec = read_record_sheet(FILE_RECORD)
df_led = pd.read_excel(FILE_LEDGER, sheet_name="加氢订单")
# 台账限定 2026-01 ~ 2026-04与加氢记录范围一致
df_led["_date_parsed"] = df_led["加氢时间"].apply(parse_date)
df_led_14 = df_led[
(df_led["_date_parsed"] >= "2026-01-01") & (df_led["_date_parsed"] < "2026-05-01")
].copy()
idx_rec = build_index(df_rec, "日期", "车牌", "加气量kg)")
idx_led = build_index(df_led_14, "加氢时间", "车牌号", "加氢量(kg)")
all_keys = set(idx_rec) | set(idx_led)
pairs: list[tuple[int | None, int | None, str]] = []
for k in sorted(all_keys):
ra = idx_rec.get(k, [])
rb = idx_led.get(k, [])
paired = pair_rows(ra, rb)
for ia, ib in paired:
pairs.append((ia, ib, k))
for ia in ra[len(paired) :]:
pairs.append((ia, None, k))
for ib in rb[len(paired) :]:
pairs.append((None, ib, k))
# --- 加氢记录侧输出 ---
rows_rec_out = []
rec_diff_cells: dict[int, set[str]] = defaultdict(set) # excel row -> col names
rec_red_rows: set[int] = set()
for ia, ib, k in pairs:
if ia is None:
continue
row_a = df_rec.loc[ia]
row_b = df_led_14.loc[ib] if ib is not None else None
extra = add_ledger_cols(df_rec, row_b)
remark_parts = []
if ib is None:
if k in idx_led:
remark_parts.append("同键重复:加氢记录行数多于台账,本条未配对")
else:
remark_parts.append("台账缺失:加氢订单(1-4月)中无匹配记录")
rec_red_rows.add(len(rows_rec_out) + 2) # +2: header + 1-based
else:
diffs, notes = compare_pair(row_a, row_b)
if diffs:
remark_parts.append("字段差异:" + "".join(notes))
for label in diffs:
rec_diff_cells[len(rows_rec_out) + 2].add(label)
rec_diff_cells[len(rows_rec_out) + 2].add(f"台账_{label}")
out_row = row_a.to_dict()
out_row.update(extra)
out_row["比对备注"] = "".join(remark_parts) if remark_parts else "一致"
rows_rec_out.append(out_row)
df_rec_out = pd.DataFrame(rows_rec_out)
# --- 加氢订单侧输出(含仅台账有、加氢记录无)---
rows_led_out = []
led_diff_cells: dict[int, set[str]] = defaultdict(set)
led_red_rows: set[int] = set()
for ia, ib, k in pairs:
if ib is None:
continue
row_b = df_led_14.loc[ib]
row_a = df_rec.loc[ia] if ia is not None else None
out_row = row_b.drop(labels=["_date_parsed"], errors="ignore").to_dict()
remark_parts = []
if ia is None:
if k in idx_rec:
remark_parts.append("同键重复:台账行数多于加氢记录,本条未配对")
else:
remark_parts.append("加氢记录缺失:加氢记录表中无匹配记录")
led_red_rows.add(len(rows_led_out) + 2)
else:
diffs, notes = compare_pair(row_a, row_b)
if diffs:
remark_parts.append("字段差异:" + "".join(notes))
col_map = {label: col_b for _, col_b, label in COMPARE_FIELDS}
for label in diffs:
led_diff_cells[len(rows_led_out) + 2].add(col_map[label])
out_row["比对备注"] = "".join(remark_parts) if remark_parts else "一致"
rows_led_out.append(out_row)
df_led_out = pd.DataFrame(rows_led_out)
# 汇总
matched = sum(1 for ia, ib, _ in pairs if ia is not None and ib is not None)
only_rec = sum(1 for ia, ib, _ in pairs if ia is not None and ib is None)
only_led = sum(1 for ia, ib, _ in pairs if ia is None and ib is not None)
only_rec_true = sum(
1 for ia, ib, k in pairs if ia is not None and ib is None and k not in idx_led
)
only_rec_dup = only_rec - only_rec_true
only_led_true = sum(
1 for ia, ib, k in pairs if ia is None and ib is not None and k not in idx_rec
)
only_led_dup = only_led - only_led_true
diff_count = sum(
1
for ia, ib, _ in pairs
if ia is not None and ib is not None and compare_pair(df_rec.loc[ia], df_led_14.loc[ib])[0]
)
summary = pd.DataFrame(
[
["加氢记录行数", len(df_rec)],
["台账加氢订单(1-4月)行数", len(df_led_14)],
["匹配键(日期+车牌+加氢量)对数", matched],
["仅加氢记录有-合计", only_rec],
[" 其中台账真缺失", only_rec_true],
[" 其中同键重复多出", only_rec_dup],
["仅台账有-合计", only_led],
[" 其中加氢记录真缺失", only_led_true],
[" 其中同键重复多出", only_led_dup],
["匹配但字段有差异", diff_count],
],
columns=["项目", "数量"],
)
OUT_DIR.mkdir(parents=True, exist_ok=True)
with pd.ExcelWriter(OUT_FILE, engine="openpyxl") as writer:
summary.to_excel(writer, sheet_name="比对汇总", index=False)
df_rec_out.to_excel(writer, sheet_name="加氢记录-比对", index=False)
df_led_out.to_excel(writer, sheet_name="加氢订单-比对", index=False)
apply_styles_rec(df_rec_out, rec_diff_cells, rec_red_rows)
apply_styles_led(df_led_out, led_diff_cells, led_red_rows)
print(f"已输出: {OUT_FILE}")
print(summary.to_string(index=False))
def col_letter_map(columns: list) -> dict[str, int]:
return {str(c): i + 1 for i, c in enumerate(columns)}
def apply_styles_rec(df: pd.DataFrame, diff_cells: dict, red_rows: set):
wb = load_workbook(OUT_FILE)
ws = wb["加氢记录-比对"]
cmap = col_letter_map(list(df.columns))
label_to_col = {label: label for _, _, label in COMPARE_FIELDS}
label_to_col.update({label: f"台账_{label}" for _, _, label in COMPARE_FIELDS})
for row_idx, labels in diff_cells.items():
for label in labels:
col = label_to_col.get(label, label)
if col in cmap:
ws.cell(row=row_idx, column=cmap[col]).fill = YELLOW
for row_idx in red_rows:
for c in range(1, ws.max_column + 1):
ws.cell(row=row_idx, column=c).fill = RED
if "比对备注" in cmap:
ws.cell(row=row_idx, column=cmap["比对备注"]).font = RED_FONT
wb.save(OUT_FILE)
def apply_styles_led(df: pd.DataFrame, diff_cells: dict, red_rows: set):
wb = load_workbook(OUT_FILE)
ws = wb["加氢订单-比对"]
cmap = col_letter_map(list(df.columns))
for row_idx, labels in diff_cells.items():
for col_name in labels:
if col_name in cmap:
ws.cell(row=row_idx, column=cmap[col_name]).fill = YELLOW
for row_idx in red_rows:
for c in range(1, ws.max_column + 1):
ws.cell(row=row_idx, column=c).fill = RED
if "比对备注" in cmap:
ws.cell(row=row_idx, column=cmap["比对备注"]).font = RED_FONT
wb.save(OUT_FILE)
if __name__ == "__main__":
run()