Optimized the root .gitignore to exclude virtual environments, node modules, and temp folders to ensure clean and lightweight version tracking. Co-authored-by: Cursor <cursoragent@cursor.com>
337 lines
11 KiB
Python
337 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""赵小峰 维修明细 → 羚牛公司车辆运维成本台账(一车一条、合计入修理费、项目合并备注、去重)."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple
|
||
|
||
import pandas as pd
|
||
from openpyxl import load_workbook
|
||
|
||
LEDGER = Path("/Users/sylvawong/Desktop/羚牛公司车辆运维成本台账.xlsx")
|
||
JAN = Path("/Users/sylvawong/Desktop/26年维修费/赵小峰/2026年1月维修明细(1).xlsx")
|
||
FEB = Path("/Users/sylvawong/Desktop/26年维修费/赵小峰/羚牛26.2月(1).xls")
|
||
|
||
PLATE_RE = re.compile(
|
||
r"(沪[A-Z][A-Z0-9]{5,6}|"
|
||
r"浙[A-Z][A-Z0-9]{4,6}挂?|"
|
||
r"粤[A-Z][A-Z0-9]{5,7}|"
|
||
r"粤[A-Z]{2,3}\d{4,5})",
|
||
re.I,
|
||
)
|
||
|
||
|
||
def normalize_plate(raw: object) -> Optional[str]:
|
||
if raw is None or (isinstance(raw, float) and pd.isna(raw)):
|
||
return None
|
||
s = str(raw).strip().replace(" ", "")
|
||
m = PLATE_RE.search(s)
|
||
if not m:
|
||
return None
|
||
return m.group(1).upper()
|
||
|
||
|
||
def parse_date_jan(s: object) -> Optional[Tuple[int, int, int]]:
|
||
if s is None or (isinstance(s, float) and pd.isna(s)):
|
||
return None
|
||
if isinstance(s, datetime):
|
||
return s.year, s.month, s.day
|
||
t = pd.to_datetime(s, errors="coerce")
|
||
if pd.notna(t):
|
||
tt = t.to_pydatetime()
|
||
return tt.year, tt.month, tt.day
|
||
st = str(s).strip().replace("。", ".")
|
||
m = re.match(r"^(\d{4})\D(\d{1,2})\D(\d{1,2})", st)
|
||
if m:
|
||
return int(m.group(1)), int(m.group(2)), int(m.group(3))
|
||
return None
|
||
|
||
|
||
def parse_date_feb(v: object) -> Optional[Tuple[int, int, int]]:
|
||
if v is None or (isinstance(v, float) and pd.isna(v)):
|
||
return None
|
||
if isinstance(v, datetime):
|
||
return v.year, v.month, v.day
|
||
t = pd.to_datetime(v, errors="coerce")
|
||
if pd.isna(t):
|
||
return None
|
||
tt = t.to_pydatetime()
|
||
return tt.year, tt.month, tt.day
|
||
|
||
|
||
def add_line_amounts(rr: pd.Series, c_amt_a: int, c_amt_b: int) -> float:
|
||
s = 0.0
|
||
for c in (c_amt_a, c_amt_b):
|
||
if c >= len(rr):
|
||
continue
|
||
v = rr[c]
|
||
if pd.notna(v) and isinstance(v, (int, float)):
|
||
s += float(v)
|
||
return s
|
||
|
||
|
||
@dataclass
|
||
class Row:
|
||
y: int
|
||
m: int
|
||
d: int
|
||
plate: str
|
||
fee: float
|
||
remark: str
|
||
|
||
def key(self) -> Tuple:
|
||
fee = round(float(self.fee), 2)
|
||
rmk = re.sub(r"\s+", "", self.remark or "")
|
||
return (self.y, self.m, self.d, self.plate, fee, rmk)
|
||
|
||
|
||
def parse_january_xlsx(path: Path) -> List[Row]:
|
||
"""维修结算汇总明细表:按序号分组;修理费取「总计」列;备注合并项目列。"""
|
||
df = pd.read_excel(path, sheet_name=0, header=None)
|
||
if df.shape[1] < 14:
|
||
return []
|
||
c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else ""
|
||
if "维修结算汇总明细表" not in c00:
|
||
return []
|
||
# 金额(含税)在列 7、11(与赵小峰表结构一致)
|
||
c_amt_a, c_amt_b = 7, 11
|
||
c_total = 13
|
||
out: List[Row] = []
|
||
i = 3
|
||
while i < len(df):
|
||
r = df.iloc[i]
|
||
try:
|
||
int(float(r[0]))
|
||
has_seq = True
|
||
except (TypeError, ValueError):
|
||
has_seq = False
|
||
if not has_seq:
|
||
i += 1
|
||
continue
|
||
if len(r) > 3 and pd.notna(r[3]) and str(r[3]).strip() == "合计":
|
||
break
|
||
ymd = parse_date_jan(r[1])
|
||
plate = normalize_plate(r[2])
|
||
if not ymd or not plate:
|
||
i += 1
|
||
continue
|
||
parts: List[str] = []
|
||
fee_from_total: Optional[float] = None
|
||
if len(r) > 3 and pd.notna(r[3]) and str(r[3]).strip():
|
||
parts.append(str(r[3]).strip())
|
||
tv = r[c_total] if c_total < len(r) else None
|
||
if pd.notna(tv):
|
||
try:
|
||
fee_from_total = float(tv)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
sum_lines = add_line_amounts(r, c_amt_a, c_amt_b)
|
||
anchor = plate
|
||
i += 1
|
||
while i < len(df):
|
||
r2 = df.iloc[i]
|
||
if len(r2) > 3 and pd.notna(r2[3]) and str(r2[3]).strip() == "合计":
|
||
i += 1
|
||
break
|
||
try:
|
||
int(float(r2[0]))
|
||
break
|
||
except (TypeError, ValueError):
|
||
pass
|
||
p2 = normalize_plate(r2[2]) if len(r2) > 2 else None
|
||
if p2 is not None and p2 != anchor:
|
||
break
|
||
if len(r2) > 3 and pd.notna(r2[3]) and str(r2[3]).strip():
|
||
parts.append(str(r2[3]).strip())
|
||
if fee_from_total is None and c_total < len(r2) and pd.notna(r2[c_total]):
|
||
try:
|
||
fee_from_total = float(r2[c_total])
|
||
except (TypeError, ValueError):
|
||
pass
|
||
sum_lines += add_line_amounts(r2, c_amt_a, c_amt_b)
|
||
i += 1
|
||
fee_total = fee_from_total if fee_from_total is not None else sum_lines
|
||
remark = ";".join(parts) if parts else "维修"
|
||
y, mo, d = ymd
|
||
out.append(Row(y, mo, d, anchor, float(fee_total or 0), remark))
|
||
return consolidate_by_plate(out)
|
||
|
||
|
||
def parse_february_xls(path: Path) -> List[Row]:
|
||
df = pd.read_excel(path, sheet_name=0, header=None)
|
||
out: List[Row] = []
|
||
i = 5
|
||
while i < len(df):
|
||
c0 = df.iloc[i, 0]
|
||
if pd.isna(c0):
|
||
i += 1
|
||
continue
|
||
s0 = str(c0).strip()
|
||
if s0 in ("车牌", "Page 2 of 2") or "车主" in s0 or "公司" in s0 and "合计" in str(df.iloc[i, 5]):
|
||
i += 1
|
||
continue
|
||
plate = normalize_plate(s0) or (s0 if re.match(r"^[浙沪粤]", s0) else None)
|
||
if not plate:
|
||
i += 1
|
||
continue
|
||
if len(s0) > 15:
|
||
i += 1
|
||
continue
|
||
try:
|
||
fee = float(df.iloc[i, 7])
|
||
except (TypeError, ValueError, IndexError):
|
||
fee = 0.0
|
||
i += 1
|
||
projects: List[str] = []
|
||
dates: List[Tuple[int, int, int]] = []
|
||
while i < len(df):
|
||
r = df.iloc[i]
|
||
n0 = r[0]
|
||
if pd.notna(n0):
|
||
s = str(n0).strip()
|
||
if "Page" in s or "打印" in s:
|
||
i += 1
|
||
break
|
||
if normalize_plate(s) and s not in ("车牌",):
|
||
break
|
||
if pd.notna(r[3]):
|
||
proj = str(r[3]).strip()
|
||
if proj and proj not in ("项目名称", "NaN"):
|
||
projects.append(proj)
|
||
dt = parse_date_feb(r[1])
|
||
if dt:
|
||
dates.append(dt)
|
||
i += 1
|
||
if not dates:
|
||
ymd = (2026, 2, 1)
|
||
else:
|
||
ymd = min(dates)
|
||
y, mo, d = ymd
|
||
remark = ";".join(projects) if projects else "维修"
|
||
out.append(Row(y, mo, d, plate.upper(), fee, remark))
|
||
return consolidate_by_plate(out)
|
||
|
||
|
||
def consolidate_by_plate(rows: List[Row]) -> List[Row]:
|
||
"""同一文件内同一车牌合并为一条:金额相加、备注去重拼接、日期取最早。"""
|
||
m: Dict[str, Row] = {}
|
||
order: List[str] = []
|
||
for r in rows:
|
||
if r.plate not in m:
|
||
m[r.plate] = Row(r.y, r.m, r.d, r.plate, r.fee, r.remark)
|
||
order.append(r.plate)
|
||
else:
|
||
cur = m[r.plate]
|
||
d1 = datetime(cur.y, cur.m, cur.d)
|
||
d2 = datetime(r.y, r.m, r.d)
|
||
y, mo, d = (cur.y, cur.m, cur.d) if d1 <= d2 else (r.y, r.m, r.d)
|
||
fee = cur.fee + r.fee
|
||
rks = []
|
||
for part in (cur.remark, r.remark):
|
||
for x in part.split(";"):
|
||
x = x.strip()
|
||
if x and x not in rks:
|
||
rks.append(x)
|
||
remark = ";".join(rks)
|
||
m[r.plate] = Row(y, mo, d, r.plate, fee, remark)
|
||
return [m[p] for p in order]
|
||
|
||
|
||
def read_ledger() -> Tuple[int, List[Row]]:
|
||
wb = load_workbook(LEDGER)
|
||
ws = wb.active
|
||
header_row = None
|
||
for i, row in enumerate(ws.iter_rows(min_row=1, max_row=40, values_only=True), start=1):
|
||
if row and any(c == "年份" for c in row if c is not None):
|
||
header_row = i
|
||
break
|
||
if header_row is None:
|
||
raise SystemExit("未找到表头")
|
||
col_map = {}
|
||
for cell in ws[header_row]:
|
||
if cell.value:
|
||
col_map[str(cell.value).strip()] = cell.column
|
||
rows: List[Row] = []
|
||
for r in range(header_row + 1, ws.max_row + 1):
|
||
plate = ws.cell(row=r, column=col_map["车牌号"]).value
|
||
if plate is None or str(plate).strip() == "":
|
||
continue
|
||
try:
|
||
yy = int(ws.cell(row=r, column=col_map["年份"]).value)
|
||
mm = int(ws.cell(row=r, column=col_map["月份"]).value)
|
||
dd = int(ws.cell(row=r, column=col_map["日期"]).value)
|
||
fee = float(ws.cell(row=r, column=col_map["修理费"]).value or 0)
|
||
except (TypeError, ValueError):
|
||
continue
|
||
remark = ws.cell(row=r, column=col_map["备注"]).value
|
||
remark = "" if remark is None else str(remark)
|
||
rows.append(Row(yy, mm, dd, str(plate).strip().upper(), fee, remark))
|
||
return header_row, rows
|
||
|
||
|
||
def write_ledger(header_row: int, rows: List[Row]) -> None:
|
||
wb = load_workbook(LEDGER)
|
||
ws = wb.active
|
||
col_map = {}
|
||
for cell in ws[header_row]:
|
||
if cell.value:
|
||
col_map[str(cell.value).strip()] = cell.column
|
||
if ws.max_row > header_row:
|
||
ws.delete_rows(header_row + 1, ws.max_row - header_row)
|
||
r = header_row + 1
|
||
for row in rows:
|
||
ws.cell(row=r, column=col_map["年份"], value=row.y)
|
||
ws.cell(row=r, column=col_map["月份"], value=row.m)
|
||
ws.cell(row=r, column=col_map["日期"], value=row.d)
|
||
ws.cell(row=r, column=col_map["车牌号"], value=row.plate)
|
||
ws.cell(row=r, column=col_map["修理费"], value=row.fee)
|
||
for k in ("保养费", "年审费", "轮胎费", "其他"):
|
||
if k in col_map:
|
||
ws.cell(row=r, column=col_map[k], value=None)
|
||
ws.cell(row=r, column=col_map["小计"], value=row.fee)
|
||
if "费用是否为公司承担" in col_map:
|
||
ws.cell(row=r, column=col_map["费用是否为公司承担"], value=None)
|
||
ws.cell(row=r, column=col_map["备注"], value=row.remark or None)
|
||
r += 1
|
||
wb.save(LEDGER)
|
||
|
||
|
||
def main() -> None:
|
||
imported: List[Row] = []
|
||
if JAN.is_file():
|
||
j = parse_january_xlsx(JAN)
|
||
print("1月明细:", len(j), "条(按车合并后)")
|
||
imported.extend(j)
|
||
else:
|
||
print("missing", JAN)
|
||
if FEB.is_file():
|
||
f = parse_february_xls(FEB)
|
||
print("2月结算单:", len(f), "条(按车合并后)")
|
||
imported.extend(f)
|
||
else:
|
||
print("missing", FEB)
|
||
|
||
header_row, existing = read_ledger()
|
||
merged = existing + imported
|
||
before = len(merged)
|
||
seen = set()
|
||
deduped: List[Row] = []
|
||
for row in merged:
|
||
k = row.key()
|
||
if k in seen:
|
||
continue
|
||
seen.add(k)
|
||
deduped.append(row)
|
||
deduped.sort(key=lambda x: (x.y, x.m, x.d, x.plate, x.remark))
|
||
write_ledger(header_row, deduped)
|
||
print("原台账", len(existing), "合并前", before, "去重后", len(deduped), "剔除重复", before - len(deduped))
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|