Files
ONE-OS/scripts/compare_h2_xiaoxu.py
王冕 a27e3b8e43 feat: sync full workspace including web modules, docs, and configurations to Gitea
Optimized the root .gitignore to exclude virtual environments, node modules,
and temp folders to ensure clean and lightweight version tracking.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-09 18:12:25 +08:00

353 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Compare the 「小许表格」 workbook with the 「新表」 workbook (match key: refuelling date + plate number + station; May range)."""
from __future__ import annotations
import re
import shutil
from collections import defaultdict
from pathlib import Path
import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill
# Display names of the two workbooks being compared. They are embedded in
# output column names, per-row remarks and the summary sheet.
NAME_XU = "小许表格"
NAME_NEW = "新表"
# Absolute, machine-specific locations of the two input workbooks.
FILE_XU = Path(
    "/Users/sylvawong/Library/Containers/com.tencent.xinWeChat/Data/Documents/"
    "xwechat_files/wxid_l80gh7d3x7u012_385c/temp/drag/最新版加氢记录5.13(2).xlsx"
)
FILE_NEW = Path(
    "/Users/sylvawong/Library/Containers/com.tencent.xinWeChat/Data/Documents/"
    "xwechat_files/wxid_l80gh7d3x7u012_385c/temp/drag/"
    "氢费台账总表2026.5.21(1)(2)(2).xlsx"
)
# Result workbook, and the second location it is copied to after writing.
OUT_FILE = Path("/Users/sylvawong/Desktop/CURSOR/ONE-OS/小许表格与新表比对结果_5月.xlsx")
OUT_DESKTOP = Path("/Users/sylvawong/Desktop/小许表格与新表比对结果_5月.xlsx")
# New sheet: May 2026 only (end bound is exclusive).
# Xu sheet: May 2026 and later (no upper bound).
NEW_MONTH_START = "2026-05-01"
NEW_MONTH_END = "2026-06-01"
XU_MONTH_START = "2026-05-01"
# Fields compared for each matched pair, as
# (column name in the Xu sheet, column name in the new sheet, output label).
COMPARE_FIELDS = [
    ("加氢站", "加氢站名称", "加氢站"),
    ("加气量kg)", "加氢量(kg)", "加气量kg)"),
    ("成本单价", "成本单价(元/kg)", "成本单价"),
    ("成本金额", "成本费用(元)", "成本金额"),
    ("加氢单价", "加氢单价(元/kg)", "加氢单价"),
    ("加氢金额", "加氢金额(元)", "加氢金额"),
    ("客户名", "客户名称", "客户名"),
    ("氢费计算方式", "结算类别", "氢费计算方式"),
]
# Yellow marks individual cells whose values differ; red marks whole rows that
# have no counterpart in the other sheet (font applied to the remark cell).
YELLOW = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")
RED = PatternFill(start_color="FF9999", end_color="FF9999", fill_type="solid")
RED_FONT = Font(color="9C0006", bold=True)
# Name of the output column that carries the per-row remark.
REMARK_COL = "差异说明"
def new_col_prefix(label: str) -> str:
    """Return the output column name holding the new sheet's value for *label*."""
    return "{}_{}".format(NAME_NEW, label)
def read_xu_sheet(path: Path) -> pd.DataFrame:
    """Load the "明细台账" sheet of the Xu workbook.

    Row 0 of the sheet is a summary row, row 1 holds the headers and the data
    starts at row 2. Blank header cells are named ``_col<position>``.
    """
    sheet = pd.read_excel(path, sheet_name="明细台账", header=None)

    headers = []
    for position, cell in enumerate(sheet.iloc[1]):
        if pd.notna(cell):
            headers.append(str(cell).strip())
        else:
            headers.append(f"_col{position}")
    sheet.columns = headers

    data = sheet.iloc[2:].copy().reset_index(drop=True)
    # Align the quantity column with the name used in COMPARE_FIELDS.
    if "加气量kg" in data.columns:
        data = data.rename(columns={"加气量kg": "加气量kg)"})
    return data
def read_new_sheet(path: Path) -> pd.DataFrame:
    """Load the "加氢订单" sheet of the new workbook.

    Row 1 holds the headers and the data starts at row 2. Blank header cells
    are named ``_col<position>``; a header seen before gets a ``_<n>`` suffix
    (``_1`` for the second occurrence, ``_2`` for the third, ...).
    """
    sheet = pd.read_excel(path, sheet_name="加氢订单", header=None)

    repeats: dict[str, int] = {}
    headers = []
    for position, cell in enumerate(list(sheet.iloc[1])):
        base = f"_col{position}" if not pd.notna(cell) else str(cell).strip()
        if base not in repeats:
            repeats[base] = 0
            headers.append(base)
            continue
        repeats[base] += 1
        headers.append(f"{base}_{repeats[base]}")

    data = sheet.iloc[2:].copy().reset_index(drop=True)
    data.columns = headers
    return data
def parse_date(val) -> str:
    """Normalise a cell value to a ``YYYY-MM-DD`` string.

    Missing values give ``""``. Timestamps are formatted directly. Numbers
    above 40000 are first tried as Excel serial day counts (epoch 1899-12-30).
    Everything else is parsed from its stripped text form; when that fails the
    first ten characters of the text are returned unchanged.
    """
    fmt = "%Y-%m-%d"
    if pd.isna(val):
        return ""
    if isinstance(val, pd.Timestamp):
        return val.strftime(fmt)

    is_serial = isinstance(val, (int, float)) and val > 40000
    if is_serial:
        try:
            moment = pd.Timestamp("1899-12-30") + pd.Timedelta(days=int(val))
            return moment.strftime(fmt)
        except Exception:
            # Not a usable serial (e.g. out of range): fall back to text parsing.
            pass

    text = str(val).strip()
    try:
        return pd.to_datetime(text).strftime(fmt)
    except Exception:
        # Slicing already returns the whole text when it is shorter than ten.
        return text[:10]
def norm_plate(val) -> str:
    """Return the plate upper-cased with whitespace, hyphens and middle dots removed."""
    if pd.isna(val):
        return ""
    plate = str(val).strip().upper()
    return re.sub(r"[\s\-·]", "", plate)
def norm_station(val) -> str:
    """Return the station name as text with every whitespace run removed."""
    if pd.isna(val):
        return ""
    station = str(val).strip()
    return re.sub(r"\s+", "", station)
def norm_str(val) -> str:
    """Return *val* as stripped text, or ``""`` when it is missing."""
    return "" if pd.isna(val) else str(val).strip()
def norm_num(val) -> float | None:
if pd.isna(val) or str(val).strip() in ("", "-", "nan"):
return None
try:
return float(val)
except Exception:
return None
def values_equal(a, b, numeric: bool) -> bool:
    """Tell whether two cell values count as equal.

    Text fields are compared after ``norm_str``. Numeric fields are compared
    after ``norm_num`` with an absolute tolerance of 0.02; two missing numbers
    are equal, a missing and a present number are not.
    """
    if not numeric:
        return norm_str(a) == norm_str(b)

    left, right = norm_num(a), norm_num(b)
    if left is None or right is None:
        return left is None and right is None
    return abs(left - right) <= 0.02
def make_key(row, date_col, plate_col, station_col) -> str:
    """Build the ``date|plate|station`` matching key for one row.

    The station is part of the key so that several refuellings of the same
    plate on the same day are told apart instead of being reported as missing.
    A blank station is replaced by a placeholder; a row without a date or
    without a plate yields ``""``.
    """
    day = parse_date(row[date_col])
    plate = norm_plate(row[plate_col])
    station = norm_station(row[station_col]) or "_未填加氢站_"
    if not (day and plate):
        return ""
    return "|".join((day, plate, station))
def filter_by_date_range(df, date_col, start: str, end: str | None) -> pd.DataFrame:
    """Keep the rows whose date falls in ``[start, end)``.

    Args:
        df: Source frame; it is copied, not modified.
        date_col: Name of the column holding the date values.
        start: Inclusive lower bound as ``YYYY-MM-DD``.
        end: Exclusive upper bound as ``YYYY-MM-DD``; a falsy value means no
            upper bound.

    Returns:
        A re-indexed copy with an extra ``_date_parsed`` column holding the
        normalised date text of each kept row.
    """
    iso_date = re.compile(r"\d{4}-\d{2}-\d{2}")
    df = df.copy()
    df["_date_parsed"] = df[date_col].apply(parse_date)

    keep = []
    for day in df["_date_parsed"]:
        # The bounds are compared as text, which is only meaningful for
        # YYYY-MM-DD values. parse_date hands back unparseable text verbatim,
        # and such text (e.g. a totals-row label) would otherwise sort above
        # `start` and slip through an open-ended range.
        in_range = (
            isinstance(day, str)
            and iso_date.fullmatch(day) is not None
            and day >= start
        )
        if in_range and end:
            in_range = day < end
        keep.append(in_range)

    mask = pd.Series(keep, index=df.index, dtype=bool)
    return df[mask].reset_index(drop=True)
def build_index(df, date_col, plate_col, station_col):
    """Group the row labels of *df* by matching key; rows with an empty key are left out."""
    groups: dict[str, list[int]] = defaultdict(list)
    for label, record in df.iterrows():
        key = make_key(record, date_col, plate_col, station_col)
        if not key:
            continue
        groups[key].append(label)
    return groups
def pair_rows(a_list: list[int], b_list: list[int]):
    """Pair the two lists position by position, stopping at the shorter one."""
    # zip already truncates to the shorter input, so no explicit slicing is needed.
    return list(zip(a_list, b_list))
def compare_pair(row_xu: pd.Series, row_new: pd.Series) -> tuple[list[str], list[str]]:
    """Compare one matched pair of rows field by field.

    Returns:
        ``(labels, notes)``: the labels of the fields that differ and, in the
        same order, one human-readable note per differing field.
    """
    numeric_labels = ("加气量kg)", "成本单价", "成本金额", "加氢单价", "加氢金额")
    changed: list[str] = []
    details: list[str] = []
    for xu_col, new_col, label in COMPARE_FIELDS:
        xu_value = row_xu.get(xu_col)
        new_value = row_new.get(new_col)
        if values_equal(xu_value, new_value, label in numeric_labels):
            continue
        changed.append(label)
        details.append(
            f"{label}:{NAME_XU}[{norm_str(xu_value)}]≠{NAME_NEW}[{norm_str(new_value)}]"
        )
    return changed, details
def new_to_xu_fields(row_new: pd.Series) -> dict:
    """Map a new-sheet row onto the Xu sheet's column names.

    The serial number comes from "订单编号", falling back to "订单编号_1" and
    then to an empty string. Every other field is ``None`` when the new-sheet
    column is absent.
    """
    fallback_order_no = row_new.get("订单编号_1", "")
    mapped = {"序号": row_new.get("订单编号", fallback_order_no)}

    # (Xu sheet column, new sheet column)
    column_pairs = (
        ("日期", "加氢时间"),
        ("车牌", "车牌号"),
        ("加气量kg)", "加氢量(kg)"),
        ("加氢站", "加氢站名称"),
        ("成本单价", "成本单价(元/kg)"),
        ("成本金额", "成本费用(元)"),
        ("加氢单价", "加氢单价(元/kg)"),
        ("加氢金额", "加氢金额(元)"),
        ("客户名", "客户名称"),
        ("氢费计算方式", "结算类别"),
    )
    for xu_col, new_col in column_pairs:
        mapped[xu_col] = row_new.get(new_col)
    return mapped
def add_new_cols(row_new: pd.Series | None) -> dict:
    """Build the new-sheet value columns for one output row.

    Each compared field gets one entry keyed by ``new_col_prefix(label)``. The
    value is an empty string when there is no new-sheet row or the column is
    absent from it.
    """
    extra = {}
    for _xu_col, new_col, label in COMPARE_FIELDS:
        if row_new is None:
            extra[new_col_prefix(label)] = ""
        else:
            extra[new_col_prefix(label)] = row_new.get(new_col, "")
    return extra
def _pair_indices(idx_xu, idx_new):
    """Pair the row labels of both sheets key by key.

    Returns a list of ``(xu_label, new_label, key)`` tuples in sorted key
    order. Rows left over once the shorter side of a key is exhausted are
    emitted with ``None`` in place of the missing partner.
    """
    pairs: list[tuple[int | None, int | None, str]] = []
    for key in sorted(set(idx_xu) | set(idx_new)):
        xu_rows, new_rows = idx_xu.get(key, []), idx_new.get(key, [])
        matched = pair_rows(xu_rows, new_rows)
        pairs.extend((ia, ib, key) for ia, ib in matched)
        pairs.extend((ia, None, key) for ia in xu_rows[len(matched):])
        pairs.extend((None, ib, key) for ib in new_rows[len(matched):])
    return pairs


def run():
    """Compare the two workbooks and write the styled result workbook.

    Steps: load both sheets, restrict them to their date ranges, pair rows by
    the date+plate+station key, build one output row per pair (or per unpaired
    row), write the summary and result sheets to ``OUT_FILE``, apply the cell
    styles, copy the file to ``OUT_DESKTOP`` and print the summary.

    Rows whose key is empty (no date or no plate) are not indexed and so do
    not appear in the result sheet, although they count in the row totals.
    """
    df_xu_all = read_xu_sheet(FILE_XU)
    df_new_all = read_new_sheet(FILE_NEW)
    df_xu = filter_by_date_range(df_xu_all, "日期", XU_MONTH_START, None)
    df_new = filter_by_date_range(df_new_all, "加氢时间", NEW_MONTH_START, NEW_MONTH_END)
    idx_xu = build_index(df_xu, "日期", "车牌", "加氢站")
    idx_new = build_index(df_new, "加氢时间", "车牌号", "加氢站名称")
    pairs = _pair_indices(idx_xu, idx_new)

    rows_out = []
    diff_cells: dict[int, set[str]] = defaultdict(set)
    red_rows: set[int] = set()
    xu_cols = list(df_xu.columns)
    # Counters are filled in the same pass that builds the rows, so each pair
    # is compared exactly once.
    matched = only_xu = only_new = diff_count = 0

    for ia, ib, k in pairs:
        # Row 1 of the sheet is the header, so the first data row is row 2.
        excel_row = len(rows_out) + 2
        remark_parts = []
        row_new = df_new.loc[ib] if ib is not None else None

        if ia is None:
            # Only the new sheet has this row: fill the Xu columns from it.
            only_new += 1
            base = {c: "" for c in xu_cols}
            base.update(new_to_xu_fields(row_new))
            base["数据来源"] = f"{NAME_NEW}补录"
            if k in idx_xu:
                # The key exists on the Xu side too; this row is a surplus
                # duplicate, not a record the Xu sheet lacks entirely.
                remark_parts.append(
                    f"同键重复:{NAME_NEW}行数多于{NAME_XU}(日期+车牌+加氢站相同,本条未配对,由【{NAME_NEW}】补录)"
                )
            else:
                remark_parts.append(
                    f"{NAME_XU}缺失:本条由【{NAME_NEW}】补录({NAME_NEW}有相同日期+车牌+加氢站,{NAME_XU}无)"
                )
        else:
            base = df_xu.loc[ia].to_dict()
            base["数据来源"] = NAME_XU
            if ib is None:
                only_xu += 1
                if k in idx_new:
                    remark_parts.append(
                        f"同键重复:{NAME_XU}行数多于{NAME_NEW}(日期+车牌+加氢站相同,本条未配对)"
                    )
                else:
                    remark_parts.append(
                        f"{NAME_NEW}缺失:【{NAME_NEW}】5月订单中无相同日期+车牌+加氢站"
                    )
            else:
                matched += 1
                diffs, notes = compare_pair(df_xu.loc[ia], row_new)
                if diffs:
                    diff_count += 1
                    # Separate the per-field notes so they stay readable.
                    remark_parts.append("字段差异:" + ";".join(notes))
                    for label in diffs:
                        # Highlight both the Xu value and the new-sheet value.
                        diff_cells[excel_row].add(label)
                        diff_cells[excel_row].add(new_col_prefix(label))

        out_row = base
        out_row.update(add_new_cols(row_new))
        out_row[REMARK_COL] = ";".join(remark_parts) if remark_parts else "一致"
        # Every row without a partner in the other sheet is marked red.
        if ia is None or ib is None:
            red_rows.add(excel_row)
        rows_out.append(out_row)

    df_out = pd.DataFrame(rows_out)
    summary = pd.DataFrame(
        [
            [f"{NAME_XU}范围", f"日期 >= {XU_MONTH_START}(5月及之后)"],
            [f"{NAME_NEW}范围", f"{NEW_MONTH_START} <= 加氢时间 < {NEW_MONTH_END}(仅5月)"],
            [f"{NAME_XU}行数", len(df_xu)],
            [f"{NAME_NEW}行数", len(df_new)],
            ["匹配键", "加氢日期 + 车牌号 + 加氢站(同站不判缺失)"],
            ["成功配对", matched],
            [f"仅{NAME_XU}有({NAME_NEW}无)", only_xu],
            [f"{NAME_NEW}有(已补录)", only_new],
            ["配对但字段有差异", diff_count],
        ],
        columns=["项目", "数量/说明"],
    )

    OUT_FILE.parent.mkdir(parents=True, exist_ok=True)
    with pd.ExcelWriter(OUT_FILE, engine="openpyxl") as writer:
        summary.to_excel(writer, sheet_name="比对汇总", index=False)
        df_out.to_excel(writer, sheet_name="比对结果", index=False)
    # Styling reopens the saved file, so it must run after the writer closes.
    apply_styles(df_out, diff_cells, red_rows)
    shutil.copy(OUT_FILE, OUT_DESKTOP)
    print(f"已输出: {OUT_FILE}")
    print(f"已复制: {OUT_DESKTOP}")
    print(summary.to_string(index=False))
def col_letter_map(columns: list) -> dict[str, int]:
    """Map each column name (as text) to its 1-based position.

    When two names have the same text form, the later position wins.
    """
    positions: dict[str, int] = {}
    for position, column in enumerate(columns, start=1):
        positions[str(column)] = position
    return positions
def apply_styles(df: pd.DataFrame, diff_cells: dict, red_rows: set):
    """Colour the "比对结果" sheet of the saved result workbook in place.

    Args:
        df: The frame that was written to the sheet; only its column order is
            used, to translate column names into sheet positions.
        diff_cells: Sheet row number -> names of the columns to fill yellow.
        red_rows: Sheet row numbers to fill red across every column; their
            remark cell also gets the red font.
    """
    workbook = load_workbook(OUT_FILE)
    sheet = workbook["比对结果"]
    positions = col_letter_map(list(df.columns))

    for row_number, column_names in diff_cells.items():
        for column_name in column_names:
            # Names without a matching output column are skipped.
            if column_name not in positions:
                continue
            sheet.cell(row=row_number, column=positions[column_name]).fill = YELLOW

    for row_number in red_rows:
        for column_number in range(1, sheet.max_column + 1):
            sheet.cell(row=row_number, column=column_number).fill = RED
        if REMARK_COL in positions:
            sheet.cell(row=row_number, column=positions[REMARK_COL]).font = RED_FONT

    workbook.save(OUT_FILE)
# Run the comparison only when this file is executed as a script.
if __name__ == "__main__":
    run()