Files
ONE-OS/scripts/enrich_h2_orders_customer_and_monthly_agg.py
王冕 a27e3b8e43 feat: sync full workspace including web modules, docs, and configurations to Gitea
Optimized the root .gitignore to exclude virtual environments, node modules,
and temp folders to ensure clean and lightweight version tracking.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-09 18:12:25 +08:00

184 lines
6.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""加氢订单按合同编号匹配客户名称,并生成「月份-客户-项目」汇总子表。"""
from __future__ import annotations
import argparse
from pathlib import Path
import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import Alignment
def _line_total_yuan(row: pd.Series) -> float:
"""单行金额:对客费用非 0 时取对客费用,否则取成本费用(与台账常见口径一致)。"""
c = row.get("对客费用(元)")
if pd.notna(c) and float(c) != 0:
return float(c)
v = row.get("成本费用(元)")
return float(v) if pd.notna(v) else 0.0
def _merge_month_customer_cells(
workbook_path: Path,
sheet_name: str = "月度客户项目汇总",
col_month: int = 1,
col_customer: int = 2,
header_rows: int = 1,
) -> None:
"""对「月份」「客户名称」列:连续且(月份、客户名称)均相同的行做纵向合并。"""
wb = load_workbook(workbook_path)
if sheet_name not in wb.sheetnames:
wb.close()
raise SystemExit(f"工作簿中未找到工作表: {sheet_name}")
ws = wb[sheet_name]
max_r = ws.max_row
data_start = header_rows + 1
if max_r < data_start:
wb.save(workbook_path)
wb.close()
return
align_m = Alignment(vertical="center", horizontal="center", wrap_text=True)
align_c = Alignment(vertical="center", horizontal="left", wrap_text=True)
def norm(v) -> str:
if v is None:
return ""
return str(v).strip()
r = data_start
while r <= max_r:
m0 = norm(ws.cell(r, col_month).value)
c0 = norm(ws.cell(r, col_customer).value)
start = r
r += 1
while r <= max_r:
if norm(ws.cell(r, col_month).value) != m0 or norm(ws.cell(r, col_customer).value) != c0:
break
r += 1
end = r - 1
if end > start:
ws.merge_cells(
start_row=start,
start_column=col_month,
end_row=end,
end_column=col_month,
)
ws.merge_cells(
start_row=start,
start_column=col_customer,
end_row=end,
end_column=col_customer,
)
ws.cell(start, col_month).alignment = align_m
ws.cell(start, col_customer).alignment = align_c
wb.save(workbook_path)
wb.close()
def run(
orders_path: Path,
contract_map_path: Path,
out_path: Path,
orders_sheet: str = "加氢订单",
map_sheet: str = "Sheet1",
merge_summary_cells: bool = True,
) -> None:
orders = pd.read_excel(orders_path, sheet_name=orders_sheet)
cmap = pd.read_excel(contract_map_path, sheet_name=map_sheet)
need_o = {"合同编号", "项目名称", "加氢时间", "加氢量(kg)", "对客费用(元)", "成本费用(元)", "订单编号"}
miss_o = need_o - set(orders.columns)
if miss_o:
raise SystemExit(f"加氢订单表缺少列: {miss_o}")
need_m = {"contract_no", "customer_name"}
miss_m = need_m - set(cmap.columns)
if miss_m:
raise SystemExit(f"合同映射表缺少列: {miss_m},当前为 {list(cmap.columns)}")
cmap = cmap[["contract_no", "customer_name"]].copy()
cmap["contract_no"] = cmap["contract_no"].astype(str).str.strip()
o = orders.copy()
o["_合同键"] = o["合同编号"].astype(str).str.strip()
merged = o.merge(
cmap,
left_on="_合同键",
right_on="contract_no",
how="left",
suffixes=("", "_映射表"),
)
merged["客户名称"] = merged["customer_name"].fillna("")
merged = merged.drop(columns=["customer_name", "contract_no"], errors="ignore")
# 月份
ts = pd.to_datetime(merged["加氢时间"], errors="coerce")
merged["月份"] = ts.dt.strftime("%Y-%m")
merged.loc[ts.isna(), "月份"] = ""
merged["行加氢总额(元)"] = merged.apply(_line_total_yuan, axis=1)
# 汇总月份、客户名称、项目名称、加氢次数、加气量kg、加氢总额
gcols = ["月份", "客户名称", "项目名称"]
sub = (
merged.groupby(gcols, dropna=False)
.agg(
加氢次数=("订单编号", "count"),
**{"加气量kg": ("加氢量(kg)", "sum")},
**{"加氢总额(元)": ("行加氢总额(元)", "sum")},
)
.reset_index()
)
sub = sub.sort_values(gcols).reset_index(drop=True)
sub["客户名称"] = sub["客户名称"].fillna("").astype(str)
sub["项目名称"] = sub["项目名称"].fillna("").astype(str)
sub["月份"] = sub["月份"].fillna("").astype(str)
drop_from_detail = {"_合同键", "行加氢总额(元)", "月份"}
base = [c for c in merged.columns if c not in drop_from_detail and c != "客户名称"]
if "项目名称" in base:
i = base.index("项目名称") + 1
detail_out = base[:i] + ["客户名称"] + base[i:]
else:
detail_out = base + ["客户名称"]
detail_df = merged[detail_out].copy()
detail_df["客户名称"] = detail_df["客户名称"].fillna("").astype(str)
with pd.ExcelWriter(out_path, engine="openpyxl") as w:
detail_df.to_excel(w, sheet_name=orders_sheet, index=False)
sub.to_excel(w, sheet_name="月度客户项目汇总", index=False)
if merge_summary_cells:
_merge_month_customer_cells(out_path, sheet_name="月度客户项目汇总")
print(f"已写入: {out_path}")
print(f" 明细行数: {len(detail_df)},已匹配客户名称: {(detail_df['客户名称'] != '').sum()}")
print(f" 汇总行数: {len(sub)}")
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("--orders", type=Path, required=True)
ap.add_argument("--contracts", type=Path, required=True)
ap.add_argument("-o", "--out", type=Path, required=True)
ap.add_argument(
"--no-merge-summary-cells",
action="store_true",
help="不对「月度客户项目汇总」做月份/客户名称列合并",
)
args = ap.parse_args()
run(
args.orders.resolve(),
args.contracts.resolve(),
args.out.resolve(),
merge_summary_cells=not args.no_merge_summary_cells,
)
if __name__ == "__main__":
main()