Optimized the root .gitignore to exclude virtual environments, node modules, and temp folders to ensure clean and lightweight version tracking. Co-authored-by: Cursor <cursoragent@cursor.com>
184 lines
6.3 KiB
Python
184 lines
6.3 KiB
Python
#!/usr/bin/env python3
|
||
"""加氢订单按合同编号匹配客户名称,并生成「月份-客户-项目」汇总子表。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
from pathlib import Path
|
||
|
||
import pandas as pd
|
||
from openpyxl import load_workbook
|
||
from openpyxl.styles import Alignment
|
||
|
||
|
||
def _line_total_yuan(row: pd.Series) -> float:
|
||
"""单行金额:对客费用非 0 时取对客费用,否则取成本费用(与台账常见口径一致)。"""
|
||
c = row.get("对客费用(元)")
|
||
if pd.notna(c) and float(c) != 0:
|
||
return float(c)
|
||
v = row.get("成本费用(元)")
|
||
return float(v) if pd.notna(v) else 0.0
|
||
|
||
|
||
def _merge_month_customer_cells(
|
||
workbook_path: Path,
|
||
sheet_name: str = "月度客户项目汇总",
|
||
col_month: int = 1,
|
||
col_customer: int = 2,
|
||
header_rows: int = 1,
|
||
) -> None:
|
||
"""对「月份」「客户名称」列:连续且(月份、客户名称)均相同的行做纵向合并。"""
|
||
wb = load_workbook(workbook_path)
|
||
if sheet_name not in wb.sheetnames:
|
||
wb.close()
|
||
raise SystemExit(f"工作簿中未找到工作表: {sheet_name}")
|
||
ws = wb[sheet_name]
|
||
max_r = ws.max_row
|
||
data_start = header_rows + 1
|
||
if max_r < data_start:
|
||
wb.save(workbook_path)
|
||
wb.close()
|
||
return
|
||
|
||
align_m = Alignment(vertical="center", horizontal="center", wrap_text=True)
|
||
align_c = Alignment(vertical="center", horizontal="left", wrap_text=True)
|
||
|
||
def norm(v) -> str:
|
||
if v is None:
|
||
return ""
|
||
return str(v).strip()
|
||
|
||
r = data_start
|
||
while r <= max_r:
|
||
m0 = norm(ws.cell(r, col_month).value)
|
||
c0 = norm(ws.cell(r, col_customer).value)
|
||
start = r
|
||
r += 1
|
||
while r <= max_r:
|
||
if norm(ws.cell(r, col_month).value) != m0 or norm(ws.cell(r, col_customer).value) != c0:
|
||
break
|
||
r += 1
|
||
end = r - 1
|
||
if end > start:
|
||
ws.merge_cells(
|
||
start_row=start,
|
||
start_column=col_month,
|
||
end_row=end,
|
||
end_column=col_month,
|
||
)
|
||
ws.merge_cells(
|
||
start_row=start,
|
||
start_column=col_customer,
|
||
end_row=end,
|
||
end_column=col_customer,
|
||
)
|
||
ws.cell(start, col_month).alignment = align_m
|
||
ws.cell(start, col_customer).alignment = align_c
|
||
wb.save(workbook_path)
|
||
wb.close()
|
||
|
||
|
||
def run(
|
||
orders_path: Path,
|
||
contract_map_path: Path,
|
||
out_path: Path,
|
||
orders_sheet: str = "加氢订单",
|
||
map_sheet: str = "Sheet1",
|
||
merge_summary_cells: bool = True,
|
||
) -> None:
|
||
orders = pd.read_excel(orders_path, sheet_name=orders_sheet)
|
||
cmap = pd.read_excel(contract_map_path, sheet_name=map_sheet)
|
||
|
||
need_o = {"合同编号", "项目名称", "加氢时间", "加氢量(kg)", "对客费用(元)", "成本费用(元)", "订单编号"}
|
||
miss_o = need_o - set(orders.columns)
|
||
if miss_o:
|
||
raise SystemExit(f"加氢订单表缺少列: {miss_o}")
|
||
|
||
need_m = {"contract_no", "customer_name"}
|
||
miss_m = need_m - set(cmap.columns)
|
||
if miss_m:
|
||
raise SystemExit(f"合同映射表缺少列: {miss_m},当前为 {list(cmap.columns)}")
|
||
|
||
cmap = cmap[["contract_no", "customer_name"]].copy()
|
||
cmap["contract_no"] = cmap["contract_no"].astype(str).str.strip()
|
||
|
||
o = orders.copy()
|
||
o["_合同键"] = o["合同编号"].astype(str).str.strip()
|
||
|
||
merged = o.merge(
|
||
cmap,
|
||
left_on="_合同键",
|
||
right_on="contract_no",
|
||
how="left",
|
||
suffixes=("", "_映射表"),
|
||
)
|
||
merged["客户名称"] = merged["customer_name"].fillna("")
|
||
merged = merged.drop(columns=["customer_name", "contract_no"], errors="ignore")
|
||
|
||
# 月份
|
||
ts = pd.to_datetime(merged["加氢时间"], errors="coerce")
|
||
merged["月份"] = ts.dt.strftime("%Y-%m")
|
||
merged.loc[ts.isna(), "月份"] = ""
|
||
|
||
merged["行加氢总额(元)"] = merged.apply(_line_total_yuan, axis=1)
|
||
|
||
# 汇总:月份、客户名称、项目名称、加氢次数、加气量(kg)、加氢总额(元)
|
||
gcols = ["月份", "客户名称", "项目名称"]
|
||
sub = (
|
||
merged.groupby(gcols, dropna=False)
|
||
.agg(
|
||
加氢次数=("订单编号", "count"),
|
||
**{"加气量(kg)": ("加氢量(kg)", "sum")},
|
||
**{"加氢总额(元)": ("行加氢总额(元)", "sum")},
|
||
)
|
||
.reset_index()
|
||
)
|
||
sub = sub.sort_values(gcols).reset_index(drop=True)
|
||
sub["客户名称"] = sub["客户名称"].fillna("").astype(str)
|
||
sub["项目名称"] = sub["项目名称"].fillna("").astype(str)
|
||
sub["月份"] = sub["月份"].fillna("").astype(str)
|
||
drop_from_detail = {"_合同键", "行加氢总额(元)", "月份"}
|
||
base = [c for c in merged.columns if c not in drop_from_detail and c != "客户名称"]
|
||
if "项目名称" in base:
|
||
i = base.index("项目名称") + 1
|
||
detail_out = base[:i] + ["客户名称"] + base[i:]
|
||
else:
|
||
detail_out = base + ["客户名称"]
|
||
|
||
detail_df = merged[detail_out].copy()
|
||
detail_df["客户名称"] = detail_df["客户名称"].fillna("").astype(str)
|
||
|
||
with pd.ExcelWriter(out_path, engine="openpyxl") as w:
|
||
detail_df.to_excel(w, sheet_name=orders_sheet, index=False)
|
||
sub.to_excel(w, sheet_name="月度客户项目汇总", index=False)
|
||
|
||
if merge_summary_cells:
|
||
_merge_month_customer_cells(out_path, sheet_name="月度客户项目汇总")
|
||
|
||
print(f"已写入: {out_path}")
|
||
print(f" 明细行数: {len(detail_df)},已匹配客户名称: {(detail_df['客户名称'] != '').sum()} 行")
|
||
print(f" 汇总行数: {len(sub)}")
|
||
|
||
|
||
def main() -> None:
|
||
ap = argparse.ArgumentParser()
|
||
ap.add_argument("--orders", type=Path, required=True)
|
||
ap.add_argument("--contracts", type=Path, required=True)
|
||
ap.add_argument("-o", "--out", type=Path, required=True)
|
||
ap.add_argument(
|
||
"--no-merge-summary-cells",
|
||
action="store_true",
|
||
help="不对「月度客户项目汇总」做月份/客户名称列合并",
|
||
)
|
||
args = ap.parse_args()
|
||
run(
|
||
args.orders.resolve(),
|
||
args.contracts.resolve(),
|
||
args.out.resolve(),
|
||
merge_summary_cells=not args.no_merge_summary_cells,
|
||
)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|