#!/usr/bin/env python3 """比对「桃子表格」与「新表」(匹配键:加氢日期+车牌号),输出标黄/标红 Excel。""" from __future__ import annotations import re import shutil from collections import defaultdict from pathlib import Path import pandas as pd from openpyxl import load_workbook from openpyxl.styles import Font, PatternFill NAME_PEACH = "桃子表格" NAME_NEW = "新表" FILE_PEACH = Path( "/Users/sylvawong/Library/Containers/com.tencent.xinWeChat/Data/Documents/" "xwechat_files/wxid_l80gh7d3x7u012_385c/temp/drag/" "加氢记录-(2026.1月-4月)(2)(1).xlsx" ) FILE_NEW = Path( "/Users/sylvawong/Library/Containers/com.tencent.xinWeChat/Data/Documents/" "xwechat_files/wxid_l80gh7d3x7u012_385c/temp/drag/" "氢费台账总表2026.5.21(1)(2)(2).xlsx" ) OUT_FILE = Path("/Users/sylvawong/Desktop/CURSOR/ONE-OS/桃子表格与新表比对结果.xlsx") OUT_DESKTOP = Path("/Users/sylvawong/Desktop/桃子表格与新表比对结果.xlsx") # (桃子表格列, 新表列, 展示名) COMPARE_FIELDS = [ ("加氢站", "加氢站名称", "加氢站"), ("加气量(kg)", "加氢量(kg)", "加气量(kg)"), ("成本单价", "成本单价(元/kg)", "成本单价"), ("成本金额", "成本费用(元)", "成本金额"), ("加氢单价", "加氢单价(元/kg)", "加氢单价"), ("加氢金额", "加氢金额(元)", "加氢金额"), ("客户名", "客户名称", "客户名"), ("氢费计算方式", "结算类别", "氢费计算方式"), ] YELLOW = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid") RED = PatternFill(start_color="FF9999", end_color="FF9999", fill_type="solid") RED_FONT = Font(color="9C0006", bold=True) REMARK_COL = "差异说明" def new_col_prefix(label: str) -> str: return f"{NAME_NEW}_{label}" def read_peach_sheet(path: Path) -> pd.DataFrame: raw = pd.read_excel(path, sheet_name=0, header=None) raw.columns = [ str(c).strip() if pd.notna(c) else f"_col{i}" for i, c in enumerate(raw.iloc[1]) ] df = raw.iloc[2:].copy().reset_index(drop=True) if "加气量(kg)" in df.columns: df = df.rename(columns={"加气量(kg)": "加气量(kg)"}) return df def read_new_sheet(path: Path) -> pd.DataFrame: raw = pd.read_excel(path, sheet_name="加氢订单", header=None) cols_raw = list(raw.iloc[1]) cols = [] seen: dict[str, int] = {} for i, c in enumerate(cols_raw): name = str(c).strip() if pd.notna(c) else f"_col{i}" if name in seen: seen[name] += 1 name = f"{name}_{seen[name]}" else: seen[name] = 0 cols.append(name) df = raw.iloc[2:].copy().reset_index(drop=True) df.columns = cols return df def parse_date(val) -> str: if pd.isna(val): return "" if isinstance(val, pd.Timestamp): return val.strftime("%Y-%m-%d") if isinstance(val, (int, float)) and val > 40000: try: return (pd.Timestamp("1899-12-30") + pd.Timedelta(days=int(val))).strftime( "%Y-%m-%d" ) except Exception: pass s = str(val).strip() try: return pd.to_datetime(s).strftime("%Y-%m-%d") except Exception: return s[:10] if len(s) >= 10 else s def norm_plate(val) -> str: if pd.isna(val): return "" return re.sub(r"[\s\-·]", "", str(val).strip().upper()) def norm_str(val) -> str: if pd.isna(val): return "" return str(val).strip() def norm_num(val) -> float | None: if pd.isna(val) or str(val).strip() in ("", "-", "nan"): return None try: return float(val) except Exception: return None def values_equal(a, b, numeric: bool) -> bool: if numeric: na, nb = norm_num(a), norm_num(b) if na is None and nb is None: return True if na is None or nb is None: return False return abs(na - nb) <= 0.02 return norm_str(a) == norm_str(b) def make_key(row, date_col, plate_col) -> str: d = parse_date(row[date_col]) p = norm_plate(row[plate_col]) return f"{d}|{p}" if d and p else "" def build_index(df, date_col, plate_col): idx: dict[str, list[int]] = defaultdict(list) for i, row in df.iterrows(): k = make_key(row, date_col, plate_col) if k: idx[k].append(i) return idx def pair_rows(a_list: list[int], b_list: list[int]): n = min(len(a_list), len(b_list)) return list(zip(a_list[:n], b_list[:n])) def compare_pair(row_peach: pd.Series, row_new: pd.Series) -> tuple[list[str], list[str]]: diffs = [] notes = [] for col_p, col_n, label in COMPARE_FIELDS: numeric = label in ("加气量(kg)", "成本单价", "成本金额", "加氢单价", "加氢金额") vp = row_peach.get(col_p) vn = row_new.get(col_n) if not values_equal(vp, vn, numeric): diffs.append(label) notes.append( f"{label}:{NAME_PEACH}[{norm_str(vp)}]≠{NAME_NEW}[{norm_str(vn)}]" ) return diffs, notes def new_to_peach_fields(row_new: pd.Series) -> dict: return { "序号": row_new.get("订单编号", row_new.get("订单编号_1", "")), "日期": row_new.get("加氢时间"), "车牌": row_new.get("车牌号"), "加气量(kg)": row_new.get("加氢量(kg)"), "加氢站": row_new.get("加氢站名称"), "成本单价": row_new.get("成本单价(元/kg)"), "成本金额": row_new.get("成本费用(元)"), "加氢单价": row_new.get("加氢单价(元/kg)"), "加氢金额": row_new.get("加氢金额(元)"), "客户名": row_new.get("客户名称"), "氢费计算方式": row_new.get("结算类别"), } def add_new_cols(row_new: pd.Series | None) -> dict: if row_new is None: return {new_col_prefix(label): "" for _, _, label in COMPARE_FIELDS} out = {} for _p, col_n, label in COMPARE_FIELDS: out[new_col_prefix(label)] = row_new.get(col_n, "") return out def run(): df_peach = read_peach_sheet(FILE_PEACH) df_new = read_new_sheet(FILE_NEW) df_new["_date_parsed"] = df_new["加氢时间"].apply(parse_date) df_new_14 = df_new[ (df_new["_date_parsed"] >= "2026-01-01") & (df_new["_date_parsed"] < "2026-05-01") ].copy() idx_peach = build_index(df_peach, "日期", "车牌") idx_new = build_index(df_new_14, "加氢时间", "车牌号") all_keys = set(idx_peach) | set(idx_new) pairs: list[tuple[int | None, int | None, str]] = [] for k in sorted(all_keys): pa, pb = idx_peach.get(k, []), idx_new.get(k, []) paired = pair_rows(pa, pb) for ia, ib in paired: pairs.append((ia, ib, k)) for ia in pa[len(paired) :]: pairs.append((ia, None, k)) for ib in pb[len(paired) :]: pairs.append((None, ib, k)) rows_out = [] diff_cells: dict[int, set[str]] = defaultdict(set) red_rows: set[int] = set() peach_cols = list(df_peach.columns) for ia, ib, k in pairs: excel_row = len(rows_out) + 2 remark_parts = [] is_red = False if ia is not None: base = df_peach.loc[ia].to_dict() base["数据来源"] = NAME_PEACH else: row_new = df_new_14.loc[ib] base = {c: "" for c in peach_cols} base.update(new_to_peach_fields(row_new)) base["数据来源"] = f"{NAME_NEW}补录" is_red = True remark_parts.append( f"{NAME_PEACH}缺失:本条由【{NAME_NEW}】补录添加(日期+车牌在新表中存在、桃子表格中无对应行)" ) row_new = df_new_14.loc[ib] if ib is not None else None if ib is None and ia is not None: is_red = True if k in idx_new: remark_parts.append( f"同键重复:{NAME_PEACH}行数多于{NAME_NEW},本条在{NAME_PEACH}有但未与{NAME_NEW}配对" ) else: remark_parts.append( f"{NAME_NEW}缺失:【{NAME_NEW}】加氢订单(1-4月)中无相同加氢日期+车牌" ) if ia is not None and ib is not None: diffs, notes = compare_pair(df_peach.loc[ia], row_new) if diffs: remark_parts.append("字段差异:" + ";".join(notes)) for label in diffs: diff_cells[excel_row].add(label) diff_cells[excel_row].add(new_col_prefix(label)) out_row = base out_row.update(add_new_cols(row_new)) out_row[REMARK_COL] = ";".join(remark_parts) if remark_parts else "一致" if is_red: red_rows.add(excel_row) rows_out.append(out_row) df_out = pd.DataFrame(rows_out) matched = sum(1 for ia, ib, _ in pairs if ia is not None and ib is not None) only_peach = sum(1 for ia, ib, _ in pairs if ia is not None and ib is None) only_new = sum(1 for ia, ib, _ in pairs if ia is None and ib is not None) diff_count = sum( 1 for ia, ib, _ in pairs if ia is not None and ib is not None and compare_pair(df_peach.loc[ia], df_new_14.loc[ib])[0] ) summary = pd.DataFrame( [ [f"{NAME_PEACH}行数", len(df_peach)], [f"{NAME_NEW}行数(加氢订单1-4月)", len(df_new_14)], ["匹配键", "加氢日期 + 车牌号"], ["成功配对", matched], [f"仅{NAME_PEACH}有", only_peach], [f"仅{NAME_NEW}有(已补录)", only_new], ["配对但字段有差异", diff_count], ], columns=["项目", "数量/说明"], ) OUT_FILE.parent.mkdir(parents=True, exist_ok=True) with pd.ExcelWriter(OUT_FILE, engine="openpyxl") as writer: summary.to_excel(writer, sheet_name="比对汇总", index=False) df_out.to_excel(writer, sheet_name="比对结果", index=False) apply_styles(df_out, diff_cells, red_rows) shutil.copy(OUT_FILE, OUT_DESKTOP) print(f"已输出: {OUT_FILE}") print(f"已复制: {OUT_DESKTOP}") print(summary.to_string(index=False)) def col_letter_map(columns: list) -> dict[str, int]: return {str(c): i + 1 for i, c in enumerate(columns)} def apply_styles(df: pd.DataFrame, diff_cells: dict, red_rows: set): wb = load_workbook(OUT_FILE) ws = wb["比对结果"] cmap = col_letter_map(list(df.columns)) label_to_col = {label: label for _, _, label in COMPARE_FIELDS} for _, _, label in COMPARE_FIELDS: label_to_col[label] = label label_to_col[new_col_prefix(label)] = new_col_prefix(label) for row_idx, labels in diff_cells.items(): for label in labels: col = label_to_col.get(label, label) if col in cmap: ws.cell(row=row_idx, column=cmap[col]).fill = YELLOW for row_idx in red_rows: for c in range(1, ws.max_column + 1): ws.cell(row=row_idx, column=c).fill = RED if REMARK_COL in cmap: ws.cell(row=row_idx, column=cmap[REMARK_COL]).font = RED_FONT wb.save(OUT_FILE) if __name__ == "__main__": run()