Files
ONE-OS/scripts/compare_h2_records_v2.py
王冕 a27e3b8e43 feat: sync full workspace including web modules, docs, and configurations to Gitea
Optimized the root .gitignore to exclude virtual environments, node modules,
and temp folders to ensure clean and lightweight version tracking.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-09 18:12:25 +08:00

334 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Compare the "桃子表格" workbook with the "新表" workbook.

Rows are matched on refuelling date + licence plate; the result is written
to an Excel file in which differing cells are filled yellow and rows
without a counterpart are filled red.
"""
from __future__ import annotations
import re
import shutil
from collections import defaultdict
from pathlib import Path
import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill
# Display names of the two data sources; used in output column names,
# remarks and the summary sheet.
NAME_PEACH = "桃子表格"
NAME_NEW = "新表"
# Input workbooks (absolute, machine-specific paths).
FILE_PEACH = Path(
"/Users/sylvawong/Library/Containers/com.tencent.xinWeChat/Data/Documents/"
"xwechat_files/wxid_l80gh7d3x7u012_385c/temp/drag/"
"加氢记录-2026.1月-4月(2)(1).xlsx"
)
FILE_NEW = Path(
"/Users/sylvawong/Library/Containers/com.tencent.xinWeChat/Data/Documents/"
"xwechat_files/wxid_l80gh7d3x7u012_385c/temp/drag/"
"氢费台账总表2026.5.21(1)(2)(2).xlsx"
)
# Result workbook, plus the location it is copied to after styling.
OUT_FILE = Path("/Users/sylvawong/Desktop/CURSOR/ONE-OS/桃子表格与新表比对结果.xlsx")
OUT_DESKTOP = Path("/Users/sylvawong/Desktop/桃子表格与新表比对结果.xlsx")
# Fields compared between paired rows, as
# (peach-sheet column, new-sheet column, display label).
COMPARE_FIELDS = [
("加氢站", "加氢站名称", "加氢站"),
("加气量kg)", "加氢量(kg)", "加气量kg)"),
("成本单价", "成本单价(元/kg)", "成本单价"),
("成本金额", "成本费用(元)", "成本金额"),
("加氢单价", "加氢单价(元/kg)", "加氢单价"),
("加氢金额", "加氢金额(元)", "加氢金额"),
("客户名", "客户名称", "客户名"),
("氢费计算方式", "结算类别", "氢费计算方式"),
]
# Cell styles: yellow fill for differing cells, red fill for rows flagged
# red, and a bold dark-red font for the remark cell of those rows.
YELLOW = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")
RED = PatternFill(start_color="FF9999", end_color="FF9999", fill_type="solid")
RED_FONT = Font(color="9C0006", bold=True)
# Name of the output column that holds the per-row remark.
REMARK_COL = "差异说明"
def new_col_prefix(label: str) -> str:
    """Return the output column name that carries the new-sheet value of *label*.

    The name is ``NAME_NEW`` and *label* joined by an underscore.
    """
    return "{}_{}".format(NAME_NEW, label)
def read_peach_sheet(path: Path) -> pd.DataFrame:
    """Load the first worksheet of the peach workbook as a DataFrame.

    The second spreadsheet row (index 1) supplies the column names; empty
    header cells become ``_col<position>``. Data starts at the third row.
    A column named "加气量kg" is renamed to "加气量kg)".

    Args:
        path: Location of the workbook.

    Returns:
        The data rows with a fresh 0-based index.
    """
    sheet = pd.read_excel(path, sheet_name=0, header=None)

    headers = []
    for pos, cell in enumerate(sheet.iloc[1]):
        if pd.notna(cell):
            headers.append(str(cell).strip())
        else:
            headers.append(f"_col{pos}")
    sheet.columns = headers

    body = sheet.iloc[2:].copy().reset_index(drop=True)
    if "加气量kg" in body.columns:
        body = body.rename(columns={"加气量kg": "加气量kg)"})
    return body
def read_new_sheet(path: Path) -> pd.DataFrame:
    """Load the "加氢订单" worksheet of the new workbook as a DataFrame.

    The second spreadsheet row (index 1) supplies the column names; empty
    header cells become ``_col<position>``. A header that repeats keeps its
    plain name on first use and gets ``_1``, ``_2``, ... appended on later
    occurrences. Data starts at the third row.

    Args:
        path: Location of the workbook.

    Returns:
        The data rows with a fresh 0-based index.
    """
    sheet = pd.read_excel(path, sheet_name="加氢订单", header=None)

    repeats: dict[str, int] = {}
    headers = []
    for pos, cell in enumerate(list(sheet.iloc[1])):
        label = str(cell).strip() if pd.notna(cell) else f"_col{pos}"
        # 0 on first sight, then 1, 2, ... for each further occurrence.
        occurrence = repeats.get(label, -1) + 1
        repeats[label] = occurrence
        headers.append(label if occurrence == 0 else f"{label}_{occurrence}")

    body = sheet.iloc[2:].copy().reset_index(drop=True)
    body.columns = headers
    return body
def parse_date(val) -> str:
    """Normalise a date-like cell value to a ``YYYY-MM-DD`` string.

    Handling order:
      * missing values give ``""``;
      * ``pd.Timestamp`` values are formatted directly;
      * ``int``/``float`` values above 40000 are tried as day counts from
        1899-12-30 (spreadsheet serial dates);
      * anything else (or a failed serial conversion) is parsed from its
        stripped string form with ``pd.to_datetime``.

    If parsing fails, the first ten characters of the stripped text are
    returned (or the whole text when it is shorter).
    """
    if pd.isna(val):
        return ""
    if isinstance(val, pd.Timestamp):
        return val.strftime("%Y-%m-%d")

    looks_like_serial = isinstance(val, (int, float)) and val > 40000
    if looks_like_serial:
        try:
            origin = pd.Timestamp("1899-12-30")
            shifted = origin + pd.Timedelta(days=int(val))
            return shifted.strftime("%Y-%m-%d")
        except Exception:
            # Not a usable serial number; fall through to text parsing.
            pass

    text = str(val).strip()
    try:
        parsed = pd.to_datetime(text)
        return parsed.strftime("%Y-%m-%d")
    except Exception:
        if len(text) >= 10:
            return text[:10]
        return text
def norm_plate(val) -> str:
    """Normalise a licence-plate cell for matching.

    Missing values give ``""``. Otherwise the value is converted to text,
    stripped, upper-cased, and every whitespace character, hyphen and
    middle dot (``·``) is removed.
    """
    if pd.isna(val):
        return ""
    plate = str(val).strip().upper()
    return re.sub(r"[\s\-·]", "", plate)
def norm_str(val) -> str:
    """Return the stripped text form of *val*, or ``""`` when it is missing."""
    return "" if pd.isna(val) else str(val).strip()
def norm_num(val) -> float | None:
    """Convert a cell value to ``float``, or ``None`` when it holds no number.

    ``None`` is returned for missing values, for the placeholder texts
    ``""``, ``"-"`` and ``"nan"``, for anything ``float()`` cannot convert,
    and for values that convert to NaN (e.g. the text ``"NaN"``), so callers
    never receive a NaN that would silently compare unequal to everything.

    Args:
        val: Raw cell value (number, text, or a missing marker).

    Returns:
        The numeric value, or ``None`` if the cell is blank / not numeric.
    """
    if pd.isna(val):
        return None
    if str(val).strip() in ("", "-", "nan"):
        return None
    try:
        number = float(val)
    except (TypeError, ValueError, OverflowError):
        return None
    # float() accepts "NaN"/"NAN" in any case; treat those like a blank cell.
    if pd.isna(number):
        return None
    return number
def values_equal(a, b, numeric: bool, tol: float = 0.02) -> bool:
    """Decide whether two cell values count as equal.

    Args:
        a: First cell value.
        b: Second cell value.
        numeric: When true, both values are converted with ``norm_num`` and
            compared within ``tol``; otherwise their ``norm_str`` forms must
            be identical.
        tol: Largest absolute difference (inclusive) that still counts as
            equal in numeric mode.

    Returns:
        ``True`` if the values are considered equal. In numeric mode two
        non-numeric/blank values are equal to each other, and a blank value
        never equals a number.
    """
    if not numeric:
        return norm_str(a) == norm_str(b)

    na, nb = norm_num(a), norm_num(b)
    if na is None or nb is None:
        return na is None and nb is None

    # Round away binary floating-point noise so a difference of exactly
    # ``tol`` stays inside the inclusive tolerance (1.02 - 1.00 evaluates to
    # 0.020000000000000018, which would otherwise fail ``<= 0.02``).
    return round(abs(na - nb), 6) <= tol
def make_key(row, date_col, plate_col) -> str:
    """Build the ``"<date>|<plate>"`` match key for one row.

    The date comes from ``parse_date(row[date_col])`` and the plate from
    ``norm_plate(row[plate_col])``. If either part is empty the row has no
    usable key and ``""`` is returned.
    """
    date_part = parse_date(row[date_col])
    plate_part = norm_plate(row[plate_col])
    if not (date_part and plate_part):
        return ""
    return f"{date_part}|{plate_part}"
def build_index(df, date_col, plate_col):
    """Group the row labels of *df* by their match key.

    Rows for which ``make_key`` yields an empty key are left out.

    Returns:
        A ``defaultdict(list)`` mapping each key to the index labels of the
        rows carrying it, in iteration order.
    """
    lookup: dict[str, list[int]] = defaultdict(list)
    for row_label, record in df.iterrows():
        key = make_key(record, date_col, plate_col)
        if not key:
            continue
        lookup[key].append(row_label)
    return lookup
def pair_rows(a_list: list[int], b_list: list[int]):
    """Pair the two lists position by position up to the shorter length.

    Returns:
        A list of ``(a, b)`` tuples; surplus items of the longer list are
        not included.
    """
    shared = min(len(a_list), len(b_list))
    return [(a_list[pos], b_list[pos]) for pos in range(shared)]
def compare_pair(row_peach: pd.Series, row_new: pd.Series) -> tuple[list[str], list[str]]:
    """Compare one paired peach row and new-sheet row over ``COMPARE_FIELDS``.

    Fields whose label is one of the quantity/price/amount labels are
    compared numerically; all others are compared as text.

    Returns:
        ``(labels, notes)``: the labels of the fields that differ, and one
        human-readable note per differing field showing both values.
    """
    numeric_labels = ("加气量kg)", "成本单价", "成本金额", "加氢单价", "加氢金额")
    changed: list[str] = []
    messages: list[str] = []
    for peach_col, new_col, label in COMPARE_FIELDS:
        peach_val = row_peach.get(peach_col)
        new_val = row_new.get(new_col)
        if values_equal(peach_val, new_val, label in numeric_labels):
            continue
        changed.append(label)
        messages.append(
            f"{label}:{NAME_PEACH}[{norm_str(peach_val)}]≠{NAME_NEW}[{norm_str(new_val)}]"
        )
    return changed, messages
def new_to_peach_fields(row_new: pd.Series) -> dict:
    """Re-key a new-sheet row under the peach sheet's column names.

    "序号" is taken from "订单编号", falling back to "订单编号_1" and then
    to ``""``. Every other field is looked up with ``.get`` and is ``None``
    when the new-sheet column is absent.
    """
    # (peach column, new-sheet column) in output order.
    renames = (
        ("日期", "加氢时间"),
        ("车牌", "车牌号"),
        ("加气量kg)", "加氢量(kg)"),
        ("加氢站", "加氢站名称"),
        ("成本单价", "成本单价(元/kg)"),
        ("成本金额", "成本费用(元)"),
        ("加氢单价", "加氢单价(元/kg)"),
        ("加氢金额", "加氢金额(元)"),
        ("客户名", "客户名称"),
        ("氢费计算方式", "结算类别"),
    )
    fallback_order_no = row_new.get("订单编号_1", "")
    mapped = {"序号": row_new.get("订单编号", fallback_order_no)}
    for peach_name, new_name in renames:
        mapped[peach_name] = row_new.get(new_name)
    return mapped
def add_new_cols(row_new: pd.Series | None) -> dict:
    """Build the new-sheet columns appended to an output row.

    One entry per ``COMPARE_FIELDS`` item, keyed by ``new_col_prefix(label)``.
    The value is the new-sheet cell (``""`` if that column is absent), or
    ``""`` for every entry when there is no new-sheet row.
    """
    columns = {}
    for _peach_col, new_col, label in COMPARE_FIELDS:
        if row_new is None:
            value = ""
        else:
            value = row_new.get(new_col, "")
        columns[new_col_prefix(label)] = value
    return columns
def run():
    """Compare the two workbooks and write the highlighted result workbook.

    Steps:
      1. Load both sheets; keep new-sheet rows whose parsed date lies in
         [2026-01-01, 2026-05-01).
      2. Index both sides by "date|plate" key and pair rows sharing a key in
         order of appearance; surplus rows on either side stay unpaired.
      3. Emit one output row per pair / unpaired row, recording which cells
         differ (later filled yellow) and which rows have no counterpart
         (later filled red).
      4. Write the summary and result sheets to ``OUT_FILE``, apply the
         highlighting, copy the file to ``OUT_DESKTOP`` and print the
         summary.

    Rows whose key is empty (no date or no plate) are not indexed and so do
    not appear in the result sheet.
    """
    df_peach = read_peach_sheet(FILE_PEACH)
    df_new = read_new_sheet(FILE_NEW)

    # The parsed dates are compared as text; this is chronological for every
    # value parse_date managed to normalise to YYYY-MM-DD.
    df_new["_date_parsed"] = df_new["加氢时间"].apply(parse_date)
    in_range = (df_new["_date_parsed"] >= "2026-01-01") & (
        df_new["_date_parsed"] < "2026-05-01"
    )
    df_new_14 = df_new[in_range].copy()

    idx_peach = build_index(df_peach, "日期", "车牌")
    idx_new = build_index(df_new_14, "加氢时间", "车牌号")

    # (peach row label | None, new row label | None, key)
    pairs: list[tuple[int | None, int | None, str]] = []
    for k in sorted(set(idx_peach) | set(idx_new)):
        pa, pb = idx_peach.get(k, []), idx_new.get(k, [])
        paired = pair_rows(pa, pb)
        pairs.extend((ia, ib, k) for ia, ib in paired)
        pairs.extend((ia, None, k) for ia in pa[len(paired):])
        pairs.extend((None, ib, k) for ib in pb[len(paired):])

    rows_out = []
    diff_cells: dict[int, set[str]] = defaultdict(set)
    red_rows: set[int] = set()
    peach_cols = list(df_peach.columns)
    # Counters are filled in the same pass so each pair is compared only once.
    matched = only_peach = only_new = diff_count = 0

    for ia, ib, k in pairs:
        excel_row = len(rows_out) + 2  # sheet row 1 holds the header
        remark_parts = []
        is_red = False
        row_new = df_new_14.loc[ib] if ib is not None else None

        if ia is None:
            # Only in the new sheet: back-fill the row into the peach layout.
            only_new += 1
            is_red = True
            out_row = {c: "" for c in peach_cols}
            out_row.update(new_to_peach_fields(row_new))
            out_row["数据来源"] = f"{NAME_NEW}补录"
            if k in idx_peach:
                # The key exists on the peach side too, but all of its rows
                # were already paired; mirror the peach-side duplicate remark.
                remark_parts.append(
                    f"同键重复:{NAME_NEW}行数多于{NAME_PEACH},本条由【{NAME_NEW}】补录添加(未与{NAME_PEACH}配对)"
                )
            else:
                remark_parts.append(
                    f"{NAME_PEACH}缺失:本条由【{NAME_NEW}】补录添加(日期+车牌在新表中存在、桃子表格中无对应行)"
                )
        else:
            out_row = df_peach.loc[ia].to_dict()
            out_row["数据来源"] = NAME_PEACH
            if ib is None:
                # Only in the peach sheet.
                only_peach += 1
                is_red = True
                if k in idx_new:
                    remark_parts.append(
                        f"同键重复:{NAME_PEACH}行数多于{NAME_NEW},本条在{NAME_PEACH}有但未与{NAME_NEW}配对"
                    )
                else:
                    remark_parts.append(
                        f"{NAME_NEW}缺失:【{NAME_NEW}】加氢订单(1-4月)中无相同加氢日期+车牌"
                    )
            else:
                # Present on both sides: compare field by field.
                matched += 1
                diffs, notes = compare_pair(df_peach.loc[ia], row_new)
                if diffs:
                    diff_count += 1
                    # Separate the per-field notes so they stay readable.
                    remark_parts.append("字段差异:" + "; ".join(notes))
                    for label in diffs:
                        diff_cells[excel_row].add(label)
                        diff_cells[excel_row].add(new_col_prefix(label))

        out_row.update(add_new_cols(row_new))
        out_row[REMARK_COL] = "; ".join(remark_parts) if remark_parts else "一致"
        if is_red:
            red_rows.add(excel_row)
        rows_out.append(out_row)

    df_out = pd.DataFrame(rows_out)
    summary = pd.DataFrame(
        [
            [f"{NAME_PEACH}行数", len(df_peach)],
            [f"{NAME_NEW}行数(加氢订单1-4月)", len(df_new_14)],
            ["匹配键", "加氢日期 + 车牌号"],
            ["成功配对", matched],
            # Spell out what is counted; the bare sheet name was ambiguous
            # next to the "行数" row above.
            [f"{NAME_PEACH}有({NAME_NEW}缺失)", only_peach],
            [f"{NAME_NEW}有(已补录)", only_new],
            ["配对但字段有差异", diff_count],
        ],
        columns=["项目", "数量/说明"],
    )

    OUT_FILE.parent.mkdir(parents=True, exist_ok=True)
    with pd.ExcelWriter(OUT_FILE, engine="openpyxl") as writer:
        summary.to_excel(writer, sheet_name="比对汇总", index=False)
        df_out.to_excel(writer, sheet_name="比对结果", index=False)
    # Styling re-opens the saved file, so it must run after the writer closes.
    apply_styles(df_out, diff_cells, red_rows)
    shutil.copy(OUT_FILE, OUT_DESKTOP)

    print(f"已输出: {OUT_FILE}")
    print(f"已复制: {OUT_DESKTOP}")
    print(summary.to_string(index=False))
def col_letter_map(columns: list) -> dict[str, int]:
    """Map each column name (as text) to its 1-based position in *columns*.

    If a name occurs more than once, the last position wins.
    """
    positions: dict[str, int] = {}
    for position, name in enumerate(columns, start=1):
        positions[str(name)] = position
    return positions
def apply_styles(df: pd.DataFrame, diff_cells: dict, red_rows: set):
    """Highlight the "比对结果" sheet of ``OUT_FILE`` and save it in place.

    Args:
        df: The frame that was written to the sheet; its column order gives
            the sheet column positions.
        diff_cells: Sheet row number -> column names to fill yellow. Names
            that are not columns of *df* are ignored.
        red_rows: Sheet row numbers to fill red across every used column;
            their remark cell also gets the red bold font.
    """
    workbook = load_workbook(OUT_FILE)
    sheet = workbook["比对结果"]
    column_of = col_letter_map(list(df.columns))

    # Yellow: individual cells whose values differ between the two sources.
    for excel_row, names in diff_cells.items():
        for name in names:
            position = column_of.get(name)
            if position is not None:
                sheet.cell(row=excel_row, column=position).fill = YELLOW

    # Red: whole rows, plus an emphasised remark cell.
    remark_position = column_of.get(REMARK_COL)
    for excel_row in red_rows:
        for position in range(1, sheet.max_column + 1):
            sheet.cell(row=excel_row, column=position).fill = RED
        if remark_position is not None:
            sheet.cell(row=excel_row, column=remark_position).font = RED_FONT

    workbook.save(OUT_FILE)
# Script entry point: run the comparison only when executed directly.
if __name__ == "__main__":
    run()