Files
ONE-OS/scripts/merge_h2_fuel_tables.py
王冕 a27e3b8e43 feat: sync full workspace including web modules, docs, and configurations to Gitea
Optimized the root .gitignore to exclude virtual environments, node modules,
and temp folders to ensure clean and lightweight version tracking.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-09 18:12:25 +08:00

332 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Merge 表2(嘉锦明细) into 表1(车辆氢费明细), generate result + comparison report."""
import pandas as pd
import numpy as np
from datetime import datetime
T1_PATH = "/Users/sylvawong/Desktop/车辆氢费明细_2026-06-06_嘉兴嘉锦亭桥北综合供能服务站.xlsx"
T2_PATH = (
"/Users/sylvawong/Library/Containers/com.tencent.xinWeChat/Data/Documents/"
"xwechat_files/wxid_l80gh7d3x7u012_385c/temp/drag/嘉锦 4月明细.xls"
)
OUT_DATA = "/Users/sylvawong/Desktop/车辆氢费明细_嘉兴嘉锦亭_4月合并结果.xlsx"
OUT_COMPARE = "/Users/sylvawong/Desktop/车辆氢费明细_嘉兴嘉锦亭_4月比对结果.xlsx"
FILTER_YEAR = 2026
FILTER_MONTH = 4
def norm_date(val):
return pd.to_datetime(val).strftime("%Y-%m-%d")
def norm_plate(val):
return str(val).strip().upper()
def norm_kg(val):
return round(float(val), 2)
def make_key(date_val, plate_val, kg_val):
return f"{norm_date(date_val)}|{norm_plate(plate_val)}|{norm_kg(kg_val)}"
def fmt_datetime(val):
ts = pd.to_datetime(val)
if ts.hour == 0 and ts.minute == 0 and ts.second == 0:
return ts.strftime("%Y-%m-%dT00:00")
return ts.strftime("%Y-%m-%d %H:%M:%S")
def parse_status(val):
if pd.isna(val):
return "未对账"
s = str(val)
if "已对账" in s:
return "已对账"
if "未对账" in s:
return "未对账"
return s
def num_equal(a, b, tol=0.01):
if pd.isna(a) and pd.isna(b):
return True
if pd.isna(a) or pd.isna(b):
return False
return abs(float(a) - float(b)) <= tol
def str_equal(a, b):
if pd.isna(a) and pd.isna(b):
return True
if pd.isna(a) or pd.isna(b):
return False
return str(a).strip() == str(b).strip()
def load_table1():
return pd.read_excel(T1_PATH, sheet_name=0)
def load_table2():
df = pd.read_excel(T2_PATH, sheet_name=0, header=1)
df.columns = [
"日期",
"车牌",
"加气量kg",
"公里数",
"加氢站",
"是否对账",
"车架号",
"成本单价",
"成本金额",
]
return df
def filter_april(df, date_col):
dt = pd.to_datetime(df[date_col])
mask = (dt.dt.year == FILTER_YEAR) & (dt.dt.month == FILTER_MONTH)
return df.loc[mask].copy()
def row_from_table2(r2, seq_no=None):
dt = pd.to_datetime(r2["日期"])
return {
"序号": seq_no,
"": int(dt.year),
"": int(dt.month),
"加氢日期": norm_date(r2["日期"]),
"加氢时间": fmt_datetime(r2["日期"]),
"加氢站名称": r2["加氢站"],
"客户名称": np.nan,
"车牌号": norm_plate(r2["车牌"]),
"加氢量(kg)": norm_kg(r2["加气量kg"]),
"成本单价(元/kg)": float(r2["成本单价"]) if pd.notna(r2["成本单价"]) else np.nan,
"成本总价(元)": float(r2["成本金额"]) if pd.notna(r2["成本金额"]) else np.nan,
"加氢单价(元/kg)": 0,
"加氢总价(元)": 0.0,
"行驶里程(km)": float(r2["公里数"]) if pd.notna(r2["公里数"]) else np.nan,
"备注": np.nan,
"业务员": np.nan,
"承担方式": np.nan,
"状态": parse_status(r2["是否对账"]),
"订单编号": np.nan,
}
def main():
df1_all = load_table1()
df2_all = load_table2()
df1 = filter_april(df1_all, "加氢日期")
df2 = filter_april(df2_all, "日期")
df2 = df2.copy()
df2["_key"] = df2.apply(
lambda r: make_key(r["日期"], r["车牌"], r["加气量kg"]), axis=1
)
df1["_key"] = df1.apply(
lambda r: make_key(r["加氢日期"], r["车牌号"], r["加氢量(kg)"]), axis=1
)
df1_orig = df1.copy()
t2_map = df2.set_index("_key", drop=False)
keys1 = set(df1["_key"])
keys2 = set(df2["_key"])
matched_keys = keys1 & keys2
only_t2_keys = keys2 - keys1
only_t1_keys = keys1 - keys2
overwrite_rows = []
diff_rows = []
unchanged_matched = 0
for idx, row in df1.iterrows():
key = row["_key"]
if key not in matched_keys:
continue
r2 = t2_map.loc[key]
if isinstance(r2, pd.DataFrame):
r2 = r2.iloc[0]
before = row.copy()
updates = {
"加氢日期": norm_date(r2["日期"]),
"加氢时间": fmt_datetime(r2["日期"]),
"加氢站名称": r2["加氢站"],
"车牌号": norm_plate(r2["车牌"]),
"加氢量(kg)": norm_kg(r2["加气量kg"]),
"成本单价(元/kg)": float(r2["成本单价"]) if pd.notna(r2["成本单价"]) else np.nan,
"成本总价(元)": float(r2["成本金额"]) if pd.notna(r2["成本金额"]) else np.nan,
"行驶里程(km)": float(r2["公里数"]) if pd.notna(r2["公里数"]) else np.nan,
}
changed_fields = []
for field, new_val in updates.items():
old_val = before[field]
is_diff = False
if field in ("成本单价(元/kg)", "成本总价(元)", "加氢量(kg)", "行驶里程(km)"):
is_diff = not num_equal(old_val, new_val)
else:
is_diff = not str_equal(old_val, new_val)
if is_diff:
changed_fields.append(field)
diff_rows.append(
{
"匹配键": key,
"订单编号": before.get("订单编号"),
"差异字段": field,
"表1原值": old_val,
"表2正确值": new_val,
}
)
df1.at[idx, field] = new_val
overwrite_rows.append(
{
"操作": "覆盖更新",
"匹配键": key,
"加氢日期": updates["加氢日期"],
"车牌号": updates["车牌号"],
"加氢量(kg)": updates["加氢量(kg)"],
"表1原客户名称": before["客户名称"],
"表1原加氢站名称": before["加氢站名称"],
"表1原成本总价(元)": before["成本总价(元)"],
"表1原行驶里程(km)": before["行驶里程(km)"],
"表2加氢站": r2["加氢站"],
"表2成本金额": r2["成本金额"],
"表2公里数": r2["公里数"],
"变更字段数": len(changed_fields),
"变更字段": "".join(changed_fields) if changed_fields else "",
"订单编号": before.get("订单编号"),
}
)
if not changed_fields:
unchanged_matched += 1
# 表2有、表1无 -> 新增(不生成订单编号)
new_rows = []
next_seq = int(df1["序号"].max()) + 1 if len(df1) else 1
for key in sorted(only_t2_keys):
r2 = t2_map.loc[key]
if isinstance(r2, pd.DataFrame):
r2 = r2.iloc[0]
new_row = row_from_table2(r2, seq_no=next_seq)
new_row["_key"] = key
new_rows.append(new_row)
next_seq += 1
df_new = pd.DataFrame(new_rows) if new_rows else pd.DataFrame()
if len(df_new):
df1 = pd.concat([df1, df_new], ignore_index=True)
out_cols = [c for c in df1_orig.columns if c != "_key"]
df1 = df1.drop(columns=["_key"], errors="ignore")[out_cols]
# 比对报告
summary = pd.DataFrame(
[
{"项目": "比对范围", "数量": f"{FILTER_YEAR}{FILTER_MONTH}"},
{"项目": "表1全量行数", "数量": len(df1_all)},
{"项目": "表1四月行数", "数量": len(df1_orig)},
{"项目": "表2全量行数", "数量": len(df2_all)},
{"项目": "表2四月行数", "数量": len(df2)},
{"项目": "三条件完全命中(覆盖更新)", "数量": len(matched_keys)},
{"项目": "命中且字段完全一致", "数量": unchanged_matched},
{"项目": "命中且存在字段差异", "数量": len(matched_keys) - unchanged_matched},
{"项目": "表2有表1无新增", "数量": len(only_t2_keys)},
{"项目": "表1有表2无保留表1原样", "数量": len(only_t1_keys)},
{"项目": "合并后四月总行数", "数量": len(df1)},
{
"项目": "说明",
"数量": "仅比对/合并四月数据表2无客户名称列匹配记录客户名称保留表1原值新增记录客户名称为空",
},
]
)
df_overwrite = pd.DataFrame(overwrite_rows)
df_diff = pd.DataFrame(diff_rows)
df_add = pd.DataFrame(new_rows)[
[
"序号",
"加氢日期",
"加氢时间",
"加氢站名称",
"车牌号",
"加氢量(kg)",
"成本单价(元/kg)",
"成本总价(元)",
"行驶里程(km)",
"状态",
"订单编号",
]
] if len(new_rows) else pd.DataFrame(
columns=[
"序号",
"加氢日期",
"加氢时间",
"加氢站名称",
"车牌号",
"加氢量(kg)",
"成本单价(元/kg)",
"成本总价(元)",
"行驶里程(km)",
"状态",
"订单编号",
]
)
only_t2_detail = df2[df2["_key"].isin(only_t2_keys)][
["日期", "车牌", "加气量kg", "公里数", "加氢站", "是否对账", "成本单价", "成本金额"]
].copy()
only_t1_detail = df1_orig[df1_orig["_key"].isin(only_t1_keys)][
["加氢日期", "车牌号", "加氢量(kg)", "加氢站名称", "客户名称", "成本总价(元)", "订单编号"]
].copy()
field_map = pd.DataFrame(
[
{"表1字段": "加氢日期", "表2字段": "日期", "处理方式": "覆盖"},
{"表1字段": "加氢时间", "表2字段": "日期", "处理方式": "覆盖(与加氢日期对应)"},
{"表1字段": "加氢站名称", "表2字段": "加氢站", "处理方式": "覆盖"},
{"表1字段": "客户名称", "表2字段": "表2无此列", "处理方式": "匹配记录保留表1原值"},
{"表1字段": "车牌号", "表2字段": "车牌", "处理方式": "覆盖"},
{"表1字段": "加氢量(kg)", "表2字段": "加气量kg", "处理方式": "覆盖"},
{"表1字段": "成本单价(元/kg)", "表2字段": "成本单价", "处理方式": "覆盖"},
{"表1字段": "成本总价(元)", "表2字段": "成本金额", "处理方式": "覆盖"},
{"表1字段": "加氢单价(元/kg)", "表2字段": "-", "处理方式": "保留表1"},
{"表1字段": "加氢总价(元)", "表2字段": "-", "处理方式": "保留表1"},
{"表1字段": "行驶里程(km)", "表2字段": "公里数", "处理方式": "覆盖"},
{"表1字段": "备注", "表2字段": "-", "处理方式": "保留表1"},
{"表1字段": "业务员", "表2字段": "-", "处理方式": "保留表1"},
{"表1字段": "承担方式", "表2字段": "-", "处理方式": "保留表1"},
{"表1字段": "订单编号", "表2字段": "-", "处理方式": "保留表1新增记录不生成"},
]
)
with pd.ExcelWriter(OUT_DATA, engine="openpyxl") as writer:
df1.to_excel(writer, sheet_name="车辆氢费明细", index=False)
with pd.ExcelWriter(OUT_COMPARE, engine="openpyxl") as writer:
summary.to_excel(writer, sheet_name="比对总览", index=False)
field_map.to_excel(writer, sheet_name="字段映射规则", index=False)
df_overwrite.to_excel(writer, sheet_name="覆盖更新明细", index=False)
df_diff.to_excel(writer, sheet_name="字段差异明细", index=False)
df_add.to_excel(writer, sheet_name="新增记录", index=False)
only_t2_detail.to_excel(writer, sheet_name="仅表2有", index=False)
only_t1_detail.to_excel(writer, sheet_name="仅表1有", index=False)
print("合并结果:", OUT_DATA)
print("比对结果:", OUT_COMPARE)
print(summary.to_string(index=False))
if __name__ == "__main__":
main()