Optimized the root .gitignore to exclude virtual environments, node modules, and temp folders to ensure clean and lightweight version tracking. Co-authored-by: Cursor <cursoragent@cursor.com>
353 lines
12 KiB
Python
353 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""比对「小许表格」与「新表」(匹配键:加氢日期+车牌号+加氢站;5月范围)。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
import shutil
|
||
from collections import defaultdict
|
||
from pathlib import Path
|
||
|
||
import pandas as pd
|
||
from openpyxl import load_workbook
|
||
from openpyxl.styles import Font, PatternFill
|
||
|
||
NAME_XU = "小许表格"
|
||
NAME_NEW = "新表"
|
||
|
||
FILE_XU = Path(
|
||
"/Users/sylvawong/Library/Containers/com.tencent.xinWeChat/Data/Documents/"
|
||
"xwechat_files/wxid_l80gh7d3x7u012_385c/temp/drag/最新版加氢记录5.13(2).xlsx"
|
||
)
|
||
FILE_NEW = Path(
|
||
"/Users/sylvawong/Library/Containers/com.tencent.xinWeChat/Data/Documents/"
|
||
"xwechat_files/wxid_l80gh7d3x7u012_385c/temp/drag/"
|
||
"氢费台账总表2026.5.21(1)(2)(2).xlsx"
|
||
)
|
||
OUT_FILE = Path("/Users/sylvawong/Desktop/CURSOR/ONE-OS/小许表格与新表比对结果_5月.xlsx")
|
||
OUT_DESKTOP = Path("/Users/sylvawong/Desktop/小许表格与新表比对结果_5月.xlsx")
|
||
|
||
# 新表:仅 2026 年 5 月;小许表格:2026 年 5 月及之后
|
||
NEW_MONTH_START = "2026-05-01"
|
||
NEW_MONTH_END = "2026-06-01"
|
||
XU_MONTH_START = "2026-05-01"
|
||
|
||
COMPARE_FIELDS = [
|
||
("加氢站", "加氢站名称", "加氢站"),
|
||
("加气量(kg)", "加氢量(kg)", "加气量(kg)"),
|
||
("成本单价", "成本单价(元/kg)", "成本单价"),
|
||
("成本金额", "成本费用(元)", "成本金额"),
|
||
("加氢单价", "加氢单价(元/kg)", "加氢单价"),
|
||
("加氢金额", "加氢金额(元)", "加氢金额"),
|
||
("客户名", "客户名称", "客户名"),
|
||
("氢费计算方式", "结算类别", "氢费计算方式"),
|
||
]
|
||
|
||
YELLOW = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")
|
||
RED = PatternFill(start_color="FF9999", end_color="FF9999", fill_type="solid")
|
||
RED_FONT = Font(color="9C0006", bold=True)
|
||
REMARK_COL = "差异说明"
|
||
|
||
|
||
def new_col_prefix(label: str) -> str:
|
||
return f"{NAME_NEW}_{label}"
|
||
|
||
|
||
def read_xu_sheet(path: Path) -> pd.DataFrame:
|
||
"""最新版加氢记录:第 0 行汇总,第 1 行表头,数据从第 2 行起。"""
|
||
raw = pd.read_excel(path, sheet_name="明细台账", header=None)
|
||
raw.columns = [
|
||
str(c).strip() if pd.notna(c) else f"_col{i}" for i, c in enumerate(raw.iloc[1])
|
||
]
|
||
df = raw.iloc[2:].copy().reset_index(drop=True)
|
||
if "加气量(kg)" in df.columns:
|
||
df = df.rename(columns={"加气量(kg)": "加气量(kg)"})
|
||
return df
|
||
|
||
|
||
def read_new_sheet(path: Path) -> pd.DataFrame:
|
||
raw = pd.read_excel(path, sheet_name="加氢订单", header=None)
|
||
cols_raw = list(raw.iloc[1])
|
||
cols = []
|
||
seen: dict[str, int] = {}
|
||
for i, c in enumerate(cols_raw):
|
||
name = str(c).strip() if pd.notna(c) else f"_col{i}"
|
||
if name in seen:
|
||
seen[name] += 1
|
||
name = f"{name}_{seen[name]}"
|
||
else:
|
||
seen[name] = 0
|
||
cols.append(name)
|
||
df = raw.iloc[2:].copy().reset_index(drop=True)
|
||
df.columns = cols
|
||
return df
|
||
|
||
|
||
def parse_date(val) -> str:
|
||
if pd.isna(val):
|
||
return ""
|
||
if isinstance(val, pd.Timestamp):
|
||
return val.strftime("%Y-%m-%d")
|
||
if isinstance(val, (int, float)) and val > 40000:
|
||
try:
|
||
return (pd.Timestamp("1899-12-30") + pd.Timedelta(days=int(val))).strftime(
|
||
"%Y-%m-%d"
|
||
)
|
||
except Exception:
|
||
pass
|
||
s = str(val).strip()
|
||
try:
|
||
return pd.to_datetime(s).strftime("%Y-%m-%d")
|
||
except Exception:
|
||
return s[:10] if len(s) >= 10 else s
|
||
|
||
|
||
def norm_plate(val) -> str:
|
||
if pd.isna(val):
|
||
return ""
|
||
return re.sub(r"[\s\-·]", "", str(val).strip().upper())
|
||
|
||
|
||
def norm_station(val) -> str:
|
||
if pd.isna(val):
|
||
return ""
|
||
return re.sub(r"\s+", "", str(val).strip())
|
||
|
||
|
||
def norm_str(val) -> str:
|
||
if pd.isna(val):
|
||
return ""
|
||
return str(val).strip()
|
||
|
||
|
||
def norm_num(val) -> float | None:
|
||
if pd.isna(val) or str(val).strip() in ("", "-", "nan"):
|
||
return None
|
||
try:
|
||
return float(val)
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def values_equal(a, b, numeric: bool) -> bool:
|
||
if numeric:
|
||
na, nb = norm_num(a), norm_num(b)
|
||
if na is None and nb is None:
|
||
return True
|
||
if na is None or nb is None:
|
||
return False
|
||
return abs(na - nb) <= 0.02
|
||
return norm_str(a) == norm_str(b)
|
||
|
||
|
||
def make_key(row, date_col, plate_col, station_col) -> str:
|
||
"""同日期+同车牌重复时,用加氢站区分,避免误判为缺失。"""
|
||
d = parse_date(row[date_col])
|
||
p = norm_plate(row[plate_col])
|
||
s = norm_station(row[station_col]) or "_未填加氢站_"
|
||
return f"{d}|{p}|{s}" if d and p else ""
|
||
|
||
|
||
def filter_by_date_range(df, date_col, start: str, end: str | None) -> pd.DataFrame:
|
||
df = df.copy()
|
||
df["_date_parsed"] = df[date_col].apply(parse_date)
|
||
mask = df["_date_parsed"] >= start
|
||
if end:
|
||
mask = mask & (df["_date_parsed"] < end)
|
||
return df[mask].reset_index(drop=True)
|
||
|
||
|
||
def build_index(df, date_col, plate_col, station_col):
|
||
idx: dict[str, list[int]] = defaultdict(list)
|
||
for i, row in df.iterrows():
|
||
k = make_key(row, date_col, plate_col, station_col)
|
||
if k:
|
||
idx[k].append(i)
|
||
return idx
|
||
|
||
|
||
def pair_rows(a_list: list[int], b_list: list[int]):
|
||
n = min(len(a_list), len(b_list))
|
||
return list(zip(a_list[:n], b_list[:n]))
|
||
|
||
|
||
def compare_pair(row_xu: pd.Series, row_new: pd.Series) -> tuple[list[str], list[str]]:
|
||
diffs = []
|
||
notes = []
|
||
for col_x, col_n, label in COMPARE_FIELDS:
|
||
numeric = label in ("加气量(kg)", "成本单价", "成本金额", "加氢单价", "加氢金额")
|
||
vx = row_xu.get(col_x)
|
||
vn = row_new.get(col_n)
|
||
if not values_equal(vx, vn, numeric):
|
||
diffs.append(label)
|
||
notes.append(f"{label}:{NAME_XU}[{norm_str(vx)}]≠{NAME_NEW}[{norm_str(vn)}]")
|
||
return diffs, notes
|
||
|
||
|
||
def new_to_xu_fields(row_new: pd.Series) -> dict:
|
||
return {
|
||
"序号": row_new.get("订单编号", row_new.get("订单编号_1", "")),
|
||
"日期": row_new.get("加氢时间"),
|
||
"车牌": row_new.get("车牌号"),
|
||
"加气量(kg)": row_new.get("加氢量(kg)"),
|
||
"加氢站": row_new.get("加氢站名称"),
|
||
"成本单价": row_new.get("成本单价(元/kg)"),
|
||
"成本金额": row_new.get("成本费用(元)"),
|
||
"加氢单价": row_new.get("加氢单价(元/kg)"),
|
||
"加氢金额": row_new.get("加氢金额(元)"),
|
||
"客户名": row_new.get("客户名称"),
|
||
"氢费计算方式": row_new.get("结算类别"),
|
||
}
|
||
|
||
|
||
def add_new_cols(row_new: pd.Series | None) -> dict:
|
||
if row_new is None:
|
||
return {new_col_prefix(label): "" for _, _, label in COMPARE_FIELDS}
|
||
return {
|
||
new_col_prefix(label): row_new.get(col_n, "")
|
||
for _x, col_n, label in COMPARE_FIELDS
|
||
}
|
||
|
||
|
||
def run():
|
||
df_xu_all = read_xu_sheet(FILE_XU)
|
||
df_new_all = read_new_sheet(FILE_NEW)
|
||
|
||
df_xu = filter_by_date_range(df_xu_all, "日期", XU_MONTH_START, None)
|
||
df_new = filter_by_date_range(df_new_all, "加氢时间", NEW_MONTH_START, NEW_MONTH_END)
|
||
|
||
idx_xu = build_index(df_xu, "日期", "车牌", "加氢站")
|
||
idx_new = build_index(df_new, "加氢时间", "车牌号", "加氢站名称")
|
||
|
||
all_keys = set(idx_xu) | set(idx_new)
|
||
pairs: list[tuple[int | None, int | None, str]] = []
|
||
for k in sorted(all_keys):
|
||
xa, xb = idx_xu.get(k, []), idx_new.get(k, [])
|
||
paired = pair_rows(xa, xb)
|
||
for ia, ib in paired:
|
||
pairs.append((ia, ib, k))
|
||
for ia in xa[len(paired) :]:
|
||
pairs.append((ia, None, k))
|
||
for ib in xb[len(paired) :]:
|
||
pairs.append((None, ib, k))
|
||
|
||
rows_out = []
|
||
diff_cells: dict[int, set[str]] = defaultdict(set)
|
||
red_rows: set[int] = set()
|
||
xu_cols = list(df_xu.columns)
|
||
|
||
for ia, ib, k in pairs:
|
||
excel_row = len(rows_out) + 2
|
||
remark_parts = []
|
||
is_red = False
|
||
|
||
if ia is not None:
|
||
base = df_xu.loc[ia].to_dict()
|
||
base["数据来源"] = NAME_XU
|
||
else:
|
||
row_new = df_new.loc[ib]
|
||
base = {c: "" for c in xu_cols}
|
||
base.update(new_to_xu_fields(row_new))
|
||
base["数据来源"] = f"{NAME_NEW}补录"
|
||
is_red = True
|
||
remark_parts.append(
|
||
f"{NAME_XU}缺失:本条由【{NAME_NEW}】补录({NAME_NEW}有相同日期+车牌+加氢站,{NAME_XU}无)"
|
||
)
|
||
|
||
row_new = df_new.loc[ib] if ib is not None else None
|
||
if ib is None and ia is not None:
|
||
is_red = True
|
||
if k in idx_new:
|
||
remark_parts.append(
|
||
f"同键重复:{NAME_XU}行数多于{NAME_NEW}(日期+车牌+加氢站相同,本条未配对)"
|
||
)
|
||
else:
|
||
remark_parts.append(
|
||
f"{NAME_NEW}缺失:【{NAME_NEW}】5月订单中无相同日期+车牌+加氢站"
|
||
)
|
||
|
||
if ia is not None and ib is not None:
|
||
diffs, notes = compare_pair(df_xu.loc[ia], row_new)
|
||
if diffs:
|
||
remark_parts.append("字段差异:" + ";".join(notes))
|
||
for label in diffs:
|
||
diff_cells[excel_row].add(label)
|
||
diff_cells[excel_row].add(new_col_prefix(label))
|
||
|
||
out_row = base
|
||
out_row.update(add_new_cols(row_new))
|
||
out_row[REMARK_COL] = ";".join(remark_parts) if remark_parts else "一致"
|
||
if is_red:
|
||
red_rows.add(excel_row)
|
||
rows_out.append(out_row)
|
||
|
||
df_out = pd.DataFrame(rows_out)
|
||
|
||
matched = sum(1 for ia, ib, _ in pairs if ia is not None and ib is not None)
|
||
only_xu = sum(1 for ia, ib, _ in pairs if ia is not None and ib is None)
|
||
only_new = sum(1 for ia, ib, _ in pairs if ia is None and ib is not None)
|
||
diff_count = sum(
|
||
1
|
||
for ia, ib, _ in pairs
|
||
if ia is not None
|
||
and ib is not None
|
||
and compare_pair(df_xu.loc[ia], df_new.loc[ib])[0]
|
||
)
|
||
|
||
summary = pd.DataFrame(
|
||
[
|
||
[f"{NAME_XU}范围", f"日期 >= {XU_MONTH_START}(5月及之后)"],
|
||
[f"{NAME_NEW}范围", f"{NEW_MONTH_START} <= 加氢时间 < {NEW_MONTH_END}(仅5月)"],
|
||
[f"{NAME_XU}行数", len(df_xu)],
|
||
[f"{NAME_NEW}行数", len(df_new)],
|
||
["匹配键", "加氢日期 + 车牌号 + 加氢站(同站不判缺失)"],
|
||
["成功配对", matched],
|
||
[f"仅{NAME_XU}有", only_xu],
|
||
[f"仅{NAME_NEW}有(已补录)", only_new],
|
||
["配对但字段有差异", diff_count],
|
||
],
|
||
columns=["项目", "数量/说明"],
|
||
)
|
||
|
||
OUT_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||
with pd.ExcelWriter(OUT_FILE, engine="openpyxl") as writer:
|
||
summary.to_excel(writer, sheet_name="比对汇总", index=False)
|
||
df_out.to_excel(writer, sheet_name="比对结果", index=False)
|
||
|
||
apply_styles(df_out, diff_cells, red_rows)
|
||
shutil.copy(OUT_FILE, OUT_DESKTOP)
|
||
print(f"已输出: {OUT_FILE}")
|
||
print(f"已复制: {OUT_DESKTOP}")
|
||
print(summary.to_string(index=False))
|
||
|
||
|
||
def col_letter_map(columns: list) -> dict[str, int]:
|
||
return {str(c): i + 1 for i, c in enumerate(columns)}
|
||
|
||
|
||
def apply_styles(df: pd.DataFrame, diff_cells: dict, red_rows: set):
|
||
wb = load_workbook(OUT_FILE)
|
||
ws = wb["比对结果"]
|
||
cmap = col_letter_map(list(df.columns))
|
||
label_to_col = {label: label for _, _, label in COMPARE_FIELDS}
|
||
for _, _, label in COMPARE_FIELDS:
|
||
label_to_col[new_col_prefix(label)] = new_col_prefix(label)
|
||
|
||
for row_idx, labels in diff_cells.items():
|
||
for label in labels:
|
||
col = label_to_col.get(label, label)
|
||
if col in cmap:
|
||
ws.cell(row=row_idx, column=cmap[col]).fill = YELLOW
|
||
|
||
for row_idx in red_rows:
|
||
for c in range(1, ws.max_column + 1):
|
||
ws.cell(row=row_idx, column=c).fill = RED
|
||
if REMARK_COL in cmap:
|
||
ws.cell(row=row_idx, column=cmap[REMARK_COL]).font = RED_FONT
|
||
|
||
wb.save(OUT_FILE)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
run()
|