#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Import Zhao Xiaofeng's (赵小峰) repair details into the vehicle O&M cost ledger.

The January workbook and the February settlement sheet are parsed into one
record per vehicle (amounts summed into the repair-fee column, project names
merged into the remark), appended to the rows already present in the ledger,
de-duplicated, sorted, and written back to the ledger workbook.
"""

from __future__ import annotations

import math
import numbers
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

import pandas as pd

# openpyxl's ``load_workbook`` is imported inside read_ledger()/write_ledger(),
# the only places that use it, so the parsing and merging helpers in this
# module can be imported (and tested) without openpyxl being installed.

LEDGER = Path("/Users/sylvawong/Desktop/羚牛公司车辆运维成本台账.xlsx")
JAN = Path("/Users/sylvawong/Desktop/26年维修费/赵小峰/2026年1月维修明细(1).xlsx")
FEB = Path("/Users/sylvawong/Desktop/26年维修费/赵小峰/羚牛26.2月(1).xls")

# Licence-plate shapes recognised anywhere inside a cell's text.
PLATE_RE = re.compile(
    r"(沪[A-Z][A-Z0-9]{5,6}|"
    r"浙[A-Z][A-Z0-9]{4,6}挂?|"
    r"粤[A-Z][A-Z0-9]{5,7}|"
    r"粤[A-Z]{2,3}\d{4,5})",
    re.I,
)

# Separator used between project names inside a ledger remark.
_REMARK_SEP = ";"


def _is_blank(value: object) -> bool:
    """Return True for ``None`` and for a float NaN (pandas' empty cell)."""
    return value is None or (isinstance(value, float) and pd.isna(value))


def _cell_text(value: object) -> str:
    """Return the stripped text of a cell, or ``""`` when the cell is empty."""
    if pd.isna(value):
        return ""
    return str(value).strip()


def _as_float(value: object) -> Optional[float]:
    """Convert a cell to ``float``; return ``None`` if empty, NaN or non-numeric."""
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    # float(nan) succeeds, so NaN has to be filtered out explicitly.
    return None if math.isnan(number) else number


def _has_sequence_number(value: object) -> bool:
    """Return True when the cell can be read as an integer sequence number."""
    try:
        int(float(value))
    except (TypeError, ValueError, OverflowError):
        # NaN -> ValueError, +/-inf -> OverflowError, non-numeric -> the rest.
        return False
    return True


def normalize_plate(raw: object) -> Optional[str]:
    """Extract the first licence plate found in *raw*, upper-cased.

    Spaces are removed before matching. Returns ``None`` when *raw* is empty
    or contains nothing that matches ``PLATE_RE``.
    """
    if _is_blank(raw):
        return None
    compact = str(raw).strip().replace(" ", "")
    match = PLATE_RE.search(compact)
    if match is None:
        return None
    return match.group(1).upper()


def _ymd_from_datetime_like(value: object) -> Optional[Tuple[int, int, int]]:
    """Return ``(year, month, day)`` for a datetime or anything pandas can parse."""
    if isinstance(value, datetime):
        return value.year, value.month, value.day
    stamp = pd.to_datetime(value, errors="coerce")
    if pd.isna(stamp):
        return None
    return stamp.year, stamp.month, stamp.day


def parse_date_jan(s: object) -> Optional[Tuple[int, int, int]]:
    """Parse a date cell of the January workbook into ``(year, month, day)``.

    Tries ``datetime`` / ``pandas.to_datetime`` first and then falls back to a
    loose ``YYYY<sep>M<sep>D`` pattern (any single non-digit separator).
    The fallback does not validate the month/day range. Returns ``None`` when
    nothing can be parsed.
    """
    if _is_blank(s):
        return None
    ymd = _ymd_from_datetime_like(s)
    if ymd is not None:
        return ymd
    text = str(s).strip().replace("。", ".")
    match = re.match(r"^(\d{4})\D(\d{1,2})\D(\d{1,2})", text)
    if match:
        return int(match.group(1)), int(match.group(2)), int(match.group(3))
    return None


def parse_date_feb(v: object) -> Optional[Tuple[int, int, int]]:
    """Parse a date cell of the February sheet into ``(year, month, day)``.

    Accepts a ``datetime`` or anything ``pandas.to_datetime`` understands;
    returns ``None`` for empty or unparseable cells.
    """
    if _is_blank(v):
        return None
    return _ymd_from_datetime_like(v)


def add_line_amounts(rr: pd.Series, c_amt_a: int, c_amt_b: int) -> float:
    """Sum the numeric values found at positions *c_amt_a* and *c_amt_b* of a row.

    Positions beyond the end of the row, empty cells, text and booleans are
    ignored. NumPy numeric scalars are accepted as well as Python numbers.
    """
    total = 0.0
    for col in (c_amt_a, c_amt_b):
        if col >= len(rr):
            continue
        value = rr[col]
        # bool is a subclass of int but is never an amount.
        if isinstance(value, bool) or not isinstance(value, numbers.Real):
            continue
        if pd.notna(value):
            total += float(value)
    return total


@dataclass
class Row:
    """One ledger record: date parts, plate, repair fee and remark."""

    y: int  # year
    m: int  # month
    d: int  # day of month
    plate: str
    fee: float  # amount written to the repair-fee column
    remark: str

    def key(self) -> Tuple:
        """Return the de-duplication key.

        Date and plate are used as-is, the fee is rounded to 2 decimals and
        all whitespace is removed from the remark.
        """
        fee = round(float(self.fee), 2)
        remark = re.sub(r"\s+", "", self.remark or "")
        return (self.y, self.m, self.d, self.plate, fee, remark)


def parse_january_xlsx(path: Path) -> List[Row]:
    """Parse the January repair-settlement summary workbook.

    Rows are grouped by sequence number: a row with a numeric first column
    opens a group and the following rows without one continue it. The fee is
    the first usable value of the total column within the group, falling back
    to the sum of the two amount columns; the remark joins the project column
    of every row in the group. Returns ``[]`` when the sheet does not look
    like the expected table (fewer than 14 columns, no rows, or a first cell
    that does not contain the expected title).
    """
    df = pd.read_excel(path, sheet_name=0, header=None)
    if df.shape[1] < 14 or df.empty:
        return []
    if "维修结算汇总明细表" not in _cell_text(df.iloc[0, 0]):
        return []

    # Tax-inclusive amounts sit in columns 7 and 11 (layout of this workbook).
    c_amt_a, c_amt_b = 7, 11
    c_total = 13
    c_project = 3

    out: List[Row] = []
    n_rows = len(df)
    i = 3  # data rows start here; the rows above are skipped
    while i < n_rows:
        head = df.iloc[i]
        if not _has_sequence_number(head[0]):
            i += 1
            continue
        if _cell_text(head[c_project]) == "合计":
            break  # grand-total row: nothing after it is read

        ymd = parse_date_jan(head[1])
        plate = normalize_plate(head[2])
        if not ymd or not plate:
            i += 1
            continue

        parts: List[str] = []
        project = _cell_text(head[c_project])
        if project:
            parts.append(project)
        fee_from_total = _as_float(head[c_total])
        sum_lines = add_line_amounts(head, c_amt_a, c_amt_b)

        # Absorb the continuation rows that belong to this group.
        i += 1
        while i < n_rows:
            cont = df.iloc[i]
            project = _cell_text(cont[c_project])
            if project == "合计":
                i += 1  # step over the subtotal row
                break
            if _has_sequence_number(cont[0]):
                break  # next group starts here
            other_plate = normalize_plate(cont[2])
            if other_plate is not None and other_plate != plate:
                break  # a different vehicle without a sequence number
            if project:
                parts.append(project)
            if fee_from_total is None:
                fee_from_total = _as_float(cont[c_total])
            sum_lines += add_line_amounts(cont, c_amt_a, c_amt_b)
            i += 1

        fee_total = fee_from_total if fee_from_total is not None else sum_lines
        remark = _REMARK_SEP.join(parts) if parts else "维修"
        y, mo, d = ymd
        out.append(Row(y, mo, d, plate, float(fee_total or 0), remark))
    return consolidate_by_plate(out)


def parse_february_xls(path: Path) -> List[Row]:
    """Parse the February settlement sheet.

    A row whose first column holds a plate opens a vehicle section; its fee
    is read from column 7 (0.0 when empty or not numeric). The rows below it
    contribute project names (column 3) and dates (column 1) until the next
    plate or a page/print footer. The earliest date found is used, or
    2026-02-01 when the section has no date.
    """
    df = pd.read_excel(path, sheet_name=0, header=None)
    n_rows, n_cols = df.shape
    if n_cols < 4:
        return []  # the date/project columns read below do not exist

    out: List[Row] = []
    i = 5  # vehicle sections start here; the rows above are skipped
    while i < n_rows:
        first = df.iloc[i, 0]
        if pd.isna(first):
            i += 1
            continue
        s0 = str(first).strip()

        # Explicit grouping of the original ``A or B or C and D`` condition.
        is_company_total = (
            "公司" in s0 and n_cols > 5 and "合计" in str(df.iloc[i, 5])
        )
        if s0 in ("车牌", "Page 2 of 2") or "车主" in s0 or is_company_total:
            i += 1
            continue

        plate = normalize_plate(s0) or (s0 if re.match(r"^[浙沪粤]", s0) else None)
        if not plate or len(s0) > 15:
            i += 1
            continue

        # float(nan) does not raise, so an empty fee cell must be mapped to 0.0
        # explicitly or NaN would leak into sums and de-duplication keys.
        fee = _as_float(df.iloc[i, 7]) if n_cols > 7 else None
        if fee is None:
            fee = 0.0

        i += 1
        projects: List[str] = []
        dates: List[Tuple[int, int, int]] = []
        while i < n_rows:
            r = df.iloc[i]
            if pd.notna(r[0]):
                s = str(r[0]).strip()
                if "Page" in s or "打印" in s:
                    i += 1  # step over the footer row
                    break
                # NOTE(review): only regex-matched plates end a section here,
                # while the outer loop also accepts any short text starting
                # with 浙/沪/粤 - confirm whether both should use one rule.
                if normalize_plate(s):
                    break
            if pd.notna(r[3]):
                proj = str(r[3]).strip()
                if proj and proj not in ("项目名称", "NaN"):
                    projects.append(proj)
            dt = parse_date_feb(r[1])
            if dt:
                dates.append(dt)
            i += 1

        y, mo, d = min(dates) if dates else (2026, 2, 1)
        remark = _REMARK_SEP.join(projects) if projects else "维修"
        out.append(Row(y, mo, d, plate.upper(), fee, remark))
    return consolidate_by_plate(out)


def consolidate_by_plate(rows: List[Row]) -> List[Row]:
    """Merge rows that share a plate into one row per plate.

    Fees are added, remark parts are concatenated without repeats (split on
    the full-width semicolon) and the earliest date wins. The result keeps
    the order in which each plate first appeared. A plate that occurs only
    once is copied unchanged.
    """
    merged: Dict[str, Row] = {}
    for row in rows:
        current = merged.get(row.plate)
        if current is None:
            merged[row.plate] = Row(
                row.y, row.m, row.d, row.plate, row.fee, row.remark
            )
            continue

        # Tuple comparison orders dates exactly like datetime does, but never
        # raises on an out-of-range month/day coming from loose parsing.
        y, mo, d = min((current.y, current.m, current.d), (row.y, row.m, row.d))

        remarks: List[str] = []
        for text in (current.remark, row.remark):
            for piece in text.split(_REMARK_SEP):
                piece = piece.strip()
                if piece and piece not in remarks:
                    remarks.append(piece)

        # Re-assigning an existing key keeps its original position in the dict.
        merged[row.plate] = Row(
            y, mo, d, row.plate, current.fee + row.fee, _REMARK_SEP.join(remarks)
        )
    return list(merged.values())


def _ledger_columns(ws, header_row: int, required: Iterable[str]) -> Dict[str, int]:
    """Map stripped header text to column index for *header_row* of *ws*.

    Raises ``SystemExit`` naming the missing headers when any of *required*
    is absent, instead of failing later with a bare ``KeyError``.
    """
    col_map: Dict[str, int] = {}
    for cell in ws[header_row]:
        if cell.value:
            col_map[str(cell.value).strip()] = cell.column
    missing = [name for name in required if name not in col_map]
    if missing:
        raise SystemExit("台账缺少列: " + "、".join(missing))
    return col_map


def read_ledger() -> Tuple[int, List[Row]]:
    """Read the existing ledger rows from ``LEDGER``.

    The header row is the first of the top 40 rows that contains a cell
    reading ``年份``. Rows without a plate, or whose year/month/day/fee cannot
    be converted to numbers, are skipped.

    Returns:
        ``(header_row, rows)`` where *header_row* is the 1-based row number.

    Raises:
        SystemExit: if no header row is found or a required column is missing.
    """
    from openpyxl import load_workbook

    wb = load_workbook(LEDGER)
    ws = wb.active

    header_row = None
    top_rows = ws.iter_rows(min_row=1, max_row=40, values_only=True)
    for idx, values in enumerate(top_rows, start=1):
        if values and any(
            c is not None and str(c).strip() == "年份" for c in values
        ):
            header_row = idx
            break
    if header_row is None:
        raise SystemExit("未找到表头")

    col_map = _ledger_columns(
        ws, header_row, ("年份", "月份", "日期", "车牌号", "修理费", "备注")
    )

    rows: List[Row] = []
    for r in range(header_row + 1, ws.max_row + 1):
        plate = ws.cell(row=r, column=col_map["车牌号"]).value
        if plate is None or str(plate).strip() == "":
            continue
        try:
            yy = int(ws.cell(row=r, column=col_map["年份"]).value)
            mm = int(ws.cell(row=r, column=col_map["月份"]).value)
            dd = int(ws.cell(row=r, column=col_map["日期"]).value)
            fee = float(ws.cell(row=r, column=col_map["修理费"]).value or 0)
        except (TypeError, ValueError):
            continue
        remark = ws.cell(row=r, column=col_map["备注"]).value
        remark = "" if remark is None else str(remark)
        rows.append(Row(yy, mm, dd, str(plate).strip().upper(), fee, remark))
    return header_row, rows


def write_ledger(header_row: int, rows: List[Row]) -> None:
    """Replace everything below *header_row* in ``LEDGER`` with *rows* and save.

    For each row the date parts, plate, repair fee, subtotal (same value as
    the repair fee) and remark are written; the other fee columns and the
    "borne by company" column are cleared when they exist.

    NOTE(review): every row under the header is deleted first, so ledger rows
    that read_ledger() skipped, and any values previously stored in the
    maintenance / inspection / tyre / other columns, are not preserved.
    Confirm this is intended before running against a ledger that is also
    edited by hand.

    Raises:
        SystemExit: if a column this function must write is missing.
    """
    from openpyxl import load_workbook

    wb = load_workbook(LEDGER)
    ws = wb.active
    # Validate the header before deleting anything.
    col_map = _ledger_columns(
        ws, header_row, ("年份", "月份", "日期", "车牌号", "修理费", "小计", "备注")
    )

    if ws.max_row > header_row:
        ws.delete_rows(header_row + 1, ws.max_row - header_row)

    for r, row in enumerate(rows, start=header_row + 1):
        ws.cell(row=r, column=col_map["年份"], value=row.y)
        ws.cell(row=r, column=col_map["月份"], value=row.m)
        ws.cell(row=r, column=col_map["日期"], value=row.d)
        ws.cell(row=r, column=col_map["车牌号"], value=row.plate)
        ws.cell(row=r, column=col_map["修理费"], value=row.fee)
        for k in ("保养费", "年审费", "轮胎费", "其他"):
            if k in col_map:
                ws.cell(row=r, column=col_map[k], value=None)
        ws.cell(row=r, column=col_map["小计"], value=row.fee)
        if "费用是否为公司承担" in col_map:
            ws.cell(row=r, column=col_map["费用是否为公司承担"], value=None)
        ws.cell(row=r, column=col_map["备注"], value=row.remark or None)
    wb.save(LEDGER)


def main() -> None:
    """Import both monthly files, merge with the ledger, de-duplicate and save."""
    imported: List[Row] = []
    if JAN.is_file():
        j = parse_january_xlsx(JAN)
        print("1月明细:", len(j), "条(按车合并后)")
        imported.extend(j)
    else:
        print("missing", JAN)
    if FEB.is_file():
        f = parse_february_xls(FEB)
        print("2月结算单:", len(f), "条(按车合并后)")
        imported.extend(f)
    else:
        print("missing", FEB)

    header_row, existing = read_ledger()
    merged = existing + imported
    before = len(merged)

    # Keep the first occurrence of every key; existing ledger rows come first.
    seen = set()
    deduped: List[Row] = []
    for row in merged:
        k = row.key()
        if k in seen:
            continue
        seen.add(k)
        deduped.append(row)

    deduped.sort(key=lambda x: (x.y, x.m, x.d, x.plate, x.remark))
    write_ledger(header_row, deduped)
    print(
        "原台账",
        len(existing),
        "合并前",
        before,
        "去重后",
        len(deduped),
        "剔除重复",
        before - len(deduped),
    )


if __name__ == "__main__":
    main()