Optimized the root .gitignore to exclude virtual environments, node modules, and temp folders to ensure clean and lightweight version tracking. Co-authored-by: Cursor <cursoragent@cursor.com>
463 lines
15 KiB
Python
463 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""Import 赵伟军 batch Excel files into 羚牛公司车辆运维成本台账.xlsx and dedupe."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from typing import Iterable, List, Optional, Tuple
|
||
|
||
import pandas as pd
|
||
from openpyxl import load_workbook
|
||
|
||
# Destination workbook: the ledger that read_ledger_rows() loads and
# write_ledger() saves back to the same path.
LEDGER = Path("/Users/sylvawong/Desktop/羚牛公司车辆运维成本台账.xlsx")

# Source workbooks to import. classify_file() chooses a parser for each one
# from keywords in the file name; files that do not exist are reported and
# skipped by main().
SOURCES: List[Path] = [
    Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/26-1氢能源点检明细及照片(1).xlsx"),
    Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/26-2氢能源点检明细及照片(1).xlsx"),
    Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-2服务站提供的明细表-羚牛(1).xlsx"),
    Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-2故障车辆维修信息反馈单-羚牛.xlsx"),
    Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3车辆润滑保养结算申请单羚牛(1).xlsx"),
    Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3打黄油明细(1).xlsx"),
    Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3服务站提供的明细表-羚牛(1).xlsx"),
    Path("/Users/sylvawong/Desktop/26年维修费/赵伟军/2026-3故障车辆维修信息反馈单-羚牛.xlsx"),
]

# Licence-plate pattern used by normalize_plate(). Only plates that start
# with 沪, 浙 or 粤 are recognised: the region character, one letter, then
# 5-6 alphanumerics (5-7 for 粤). Matching is case-insensitive (re.I).
PLATE_RE = re.compile(
    r"(沪[A-Z][A-Z0-9]{5,6}|"
    r"浙[A-Z][A-Z0-9]{5,6}|"
    r"粤[A-Z][A-Z0-9]{5,7})",
    re.I,
)
|
||
|
||
|
||
def normalize_plate(raw: object) -> Optional[str]:
    """Extract an upper-cased licence plate from a raw cell value.

    Args:
        raw: Any cell value; it is converted with ``str()`` before matching.

    Returns:
        The first substring matching ``PLATE_RE``, upper-cased, or ``None``
        when ``raw`` is ``None``, a float NaN, or contains no plate.
    """
    if raw is None or (isinstance(raw, float) and pd.isna(raw)):
        return None
    # Remove spaces and stray quote marks *before* matching so a quote or
    # space typed inside the plate cannot split it. The straight apostrophe
    # was previously swapped for a typographic one instead of being removed,
    # which left the plate split; both forms are now dropped, like the
    # backtick.
    cleaned = str(raw).strip().translate(str.maketrans("", "", " '’`"))
    m = PLATE_RE.search(cleaned)
    if not m:
        return None
    # A match can never contain a space, so upper-casing is all that is left.
    return m.group(1).upper()
|
||
|
||
|
||
def excel_date_to_ymd(v: object) -> Optional[Tuple[int, int, int]]:
    """Convert a cell value to a ``(year, month, day)`` tuple.

    Accepted inputs are ``datetime`` objects (including ``pd.Timestamp``),
    numbers or numeric strings interpreted as Excel serial days counted from
    1899-12-30, and any other value ``pd.to_datetime`` can parse.

    Args:
        v: The raw cell value.

    Returns:
        ``(year, month, day)``, or ``None`` when the value is missing
        (``None``, NaN, ``pd.NaT``), unparseable, or a serial number that is
        not finite / outside the range ``datetime`` can represent.
    """
    # pd.NaT is a datetime subclass whose .year/.month/.day are NaN, so it
    # must be rejected before the isinstance(datetime) check below.
    if v is None or v is pd.NaT:
        return None
    if isinstance(v, float) and pd.isna(v):
        return None
    # pd.Timestamp subclasses datetime, so this single branch covers both.
    if isinstance(v, datetime):
        return v.year, v.month, v.day
    try:
        n = float(v)
    except (TypeError, ValueError):
        # Not numeric: let pandas try to parse it as a date string/object.
        t = pd.to_datetime(v, errors="coerce")
        if pd.isna(t):
            return None
        tt = t.to_pydatetime()
        return tt.year, tt.month, tt.day
    try:
        # int() raises ValueError for NaN (e.g. the string "nan") and
        # OverflowError for infinity; timedelta/datetime raise OverflowError
        # for serials outside the representable range.
        dt = datetime(1899, 12, 30) + timedelta(days=int(n))
    except (ValueError, OverflowError):
        return None
    return dt.year, dt.month, dt.day
|
||
|
||
|
||
@dataclass
class Row:
    """One ledger entry: a dated fee for one vehicle plus a free-text remark."""

    y: int  # year
    m: int  # month
    d: int  # day of month
    plate: str
    fee: float
    remark: str

    def key(self) -> Tuple:
        """Return the tuple used to detect duplicate entries.

        The fee is rounded to two decimals and every whitespace character is
        removed from the remark (a missing remark counts as empty), so rows
        that differ only in spacing or sub-cent noise compare equal.
        """
        compact_remark = re.sub(r"\s+", "", self.remark or "")
        rounded_fee = round(float(self.fee), 2)
        return (self.y, self.m, self.d, self.plate, rounded_fee, compact_remark)
|
||
|
||
|
||
def parse_dianjian_sheet(path: Path, df: pd.DataFrame, tag: str) -> List[Row]:
    """Turn an inspection-settlement summary sheet into ledger rows.

    Rows are scanned from index 3 onward. A row is kept only when column 0
    holds a number (the sequence no.), column 2 a recognisable plate and
    column 1 a parseable date. Column 4 supplies the remark (falling back to
    "氢能源点检" when blank) and column 13 the fee (0.0 when blank or not
    numeric). Sheets with fewer than 14 columns yield no rows.

    ``path`` and ``tag`` are accepted but not used by this function.
    """
    rows: List[Row] = []
    if df.shape[1] < 14:
        return rows

    for idx in range(3, len(df)):
        rec = df.iloc[idx]

        # Only numbered rows are data rows; headings and subtotals are skipped.
        try:
            int(float(rec[0]))
        except (TypeError, ValueError):
            continue

        plate = normalize_plate(rec[2])
        if not plate:
            continue

        ymd = excel_date_to_ymd(rec[1])
        if not ymd:
            continue

        note = str(rec[4] if pd.notna(rec[4]) else "").strip()
        if not note:
            note = "氢能源点检"

        raw_fee = rec[13]
        try:
            fee = float(raw_fee) if pd.notna(raw_fee) else 0.0
        except (TypeError, ValueError):
            fee = 0.0

        year, month, day = ymd
        rows.append(Row(year, month, day, plate, fee, note))

    return rows
|
||
|
||
|
||
def parse_dianjian_workbook(path: Path) -> List[Row]:
    """Parse the "点检明细" sheet of an inspection workbook.

    Args:
        path: Workbook to read; it must contain a sheet named "点检明细".

    Returns:
        The rows produced by ``parse_dianjian_sheet``, or an empty list when
        the sheet is empty or its top-left cell does not contain the title
        "点检结算汇总明细表".
    """
    df = pd.read_excel(path, sheet_name="点检明细", header=None)
    # An empty sheet has no cell (0, 0); bail out instead of raising IndexError.
    if df.empty:
        return []
    c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else ""
    if "点检结算汇总明细表" not in c00:
        return []
    return parse_dianjian_sheet(path, df, path.name)
|
||
|
||
|
||
def _amount_cols_for_huizong(df: pd.DataFrame) -> Tuple[int, int]:
    """Return the indices of the two amount columns for a summary sheet.

    The header is taken from row index 1. When any header cell mentions
    "车辆所属公司" the amounts are in columns 8 and 12; otherwise (including
    when the sheet has fewer than two rows) they are in columns 7 and 11.
    """
    if len(df) <= 1:
        return 7, 11
    header_cells = df.iloc[1].tolist()
    has_company_col = any(
        "车辆所属公司" in str(cell) for cell in header_cells if pd.notna(cell)
    )
    if has_company_col:
        return 8, 12
    return 7, 11
|
||
|
||
|
||
def parse_weixiu_huizong_sheet(df: pd.DataFrame, tag: str) -> List[Row]:
    """Parse a repair-settlement summary sheet into one row per repair group.

    The sheet must have at least 13 columns and the title
    "维修结算汇总明细表" in its top-left cell; otherwise nothing is returned.
    Rows are read from index 3 onward:

    * A row with a numeric sequence number (column 0), a parseable date
      (column 1) and a plate (column 2) opens a group. The following rows
      without a sequence number are folded into it until another numbered
      row, or a row carrying a different plate, is reached. The group's fee
      is the sum of both amount columns over all its rows and its remark is
      the collected description texts joined with ";" ("维修" when none).
    * An unnumbered row outside a group that carries its own plate and date
      becomes a row of its own (remark from column 4, "维修" when missing).

    Args:
        df: Sheet contents read with ``header=None``.
        tag: Accepted for call-site symmetry; not used here.

    Returns:
        The parsed rows in sheet order.
    """
    import numbers  # local import: only this function needs the numeric ABCs

    out: List[Row] = []
    if df.shape[1] < 13:
        return out
    c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else ""
    if "维修结算汇总明细表" not in c00:
        return out

    c8, c12 = _amount_cols_for_huizong(df)
    # The opening row of a group takes its description from column 4 in the
    # wide layout (amounts at 8/12) and from column 3 in the narrow one.
    anchor_remark_col = 4 if c8 == 8 else 3

    def has_seq(cell: object) -> bool:
        """True when the cell holds something usable as a sequence number."""
        try:
            int(float(cell))
        except (TypeError, ValueError, OverflowError):
            # OverflowError: an infinite value must not abort the import.
            return False
        return True

    def add_amounts(rr) -> float:
        """Sum the two amount cells of a row, ignoring blanks and text."""
        subtotal = 0.0
        for c in (c8, c12):
            if c >= len(rr):
                continue
            v = rr[c]
            # numbers.Real also accepts NumPy integer scalars, which are not
            # ``int`` subclasses and would otherwise be dropped when pandas
            # infers an integer dtype for the column.
            if isinstance(v, numbers.Real) and pd.notna(v):
                subtotal += float(v)
        return subtotal

    def text_at(rr, col: int) -> str:
        """Stripped text of ``rr[col]``; "" when the cell is missing/blank."""
        if len(rr) > col and pd.notna(rr[col]):
            return str(rr[col]).strip()
        return ""

    # The column-count guard above guarantees every row has columns 0..12,
    # so the plate/date/remark cells can be indexed directly.
    n_rows = len(df)
    i = 3
    while i < n_rows:
        r = df.iloc[i]
        i += 1

        if not has_seq(r[0]):
            # Unnumbered row with its own plate: book it as a single-vehicle
            # entry (e.g. batch lubrication lists).
            p = normalize_plate(r[2])
            if p:
                ymd = excel_date_to_ymd(r[1])
                if ymd:
                    remark = str(r[4]).strip() if pd.notna(r[4]) else "维修"
                    y, mo, d = ymd
                    out.append(Row(y, mo, d, p, add_amounts(r), remark))
            continue

        ymd = excel_date_to_ymd(r[1])
        anchor = normalize_plate(r[2])
        if not ymd or not anchor:
            continue

        parts: List[str] = []
        first_text = text_at(r, anchor_remark_col)
        if first_text:
            parts.append(first_text)
        total = add_amounts(r)

        # Fold the continuation rows that belong to this group.
        while i < n_rows:
            r2 = df.iloc[i]
            if has_seq(r2[0]):
                break
            p2 = normalize_plate(r2[2])
            if p2 is not None and p2 != anchor:
                break
            # NOTE(review): continuation rows always read column 4, even in
            # the narrow layout where the opening row reads column 3. Left
            # unchanged because the remark is part of the dedupe key —
            # confirm against a real narrow sheet before altering.
            more_text = text_at(r2, 4)
            if more_text:
                parts.append(more_text)
            total += add_amounts(r2)
            i += 1

        remark = ";".join(parts) if parts else "维修"
        y, mo, d = ymd
        out.append(Row(y, mo, d, anchor, total, remark))
    return out
|
||
|
||
|
||
def parse_weixiu_workbook(path: Path) -> List[Row]:
    """Parse every sheet of a service-station workbook as a repair summary.

    Each sheet is read with ``header=None`` and handed to
    ``parse_weixiu_huizong_sheet``; sheets that are not repair summaries
    simply contribute no rows. A sheet that cannot be read is reported on
    stdout and skipped so that one bad sheet does not abort the import.

    Args:
        path: Workbook to read.

    Returns:
        The rows of all sheets, in sheet order.
    """
    out: List[Row] = []
    # Open the workbook once and close it deterministically; previously the
    # ExcelFile handle was never closed and the file was reopened per sheet.
    with pd.ExcelFile(path) as xl:
        for sn in xl.sheet_names:
            try:
                df = xl.parse(sn, header=None)
            except Exception as exc:  # best effort: keep importing other sheets
                print(f"skip sheet {path.name}:{sn}: {exc}")
                continue
            out.extend(parse_weixiu_huizong_sheet(df, f"{path.name}:{sn}"))
    return out
|
||
|
||
|
||
def parse_dahuangyou_only(path: Path) -> List[Row]:
    """Parse the "Sheet1" sheet of a greasing-detail workbook.

    The sheet is read without a header row and parsed as a repair summary,
    tagged with the workbook's file name.
    """
    sheet = pd.read_excel(path, sheet_name="Sheet1", header=None)
    tag = path.name
    return parse_weixiu_huizong_sheet(sheet, tag)
|
||
|
||
|
||
def parse_zhaowei_fault_df(df: pd.DataFrame) -> Optional[Row]:
    """Parse one fault-repair feedback form into a single ledger row.

    The sheet must have at least 10 rows, the title
    "故障车辆维修信息反馈单" in its top-left cell and a parseable date in
    cell (1, 0). From there:

    * the plate and the fault description are taken from "车牌号:" /
      "故障现象:" labels in column 0 of the first 25 rows (the last
      occurrence wins);
    * the fee is the first number following "费用共计:" found in the first
      six columns, scanning row by row;
    * the remark is the list of repair items (column 1 of the numbered rows
      below the "序号 / 维修项目" header) joined with ";", falling back to
      the fault description and finally to "故障维修".

    Labels are accepted with either an ASCII or a full-width colon.

    Args:
        df: Sheet contents read with ``header=None``.

    Returns:
        The parsed row, or ``None`` when the sheet is not a feedback form or
        lacks a date, a recognisable plate or a total fee.
    """
    if df.shape[0] < 10:
        return None
    title = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else ""
    if "故障车辆维修信息反馈单" not in title:
        return None
    ymd = excel_date_to_ymd(df.iloc[1, 0])
    if not ymd:
        return None

    # --- plate and fault description (column 0, first 25 rows) ------------
    plate: Optional[str] = None
    symptom = ""
    for i in range(min(25, len(df))):
        v = df.iloc[i, 0]
        if pd.isna(v):
            continue
        s = str(v)
        # The old test listed the ASCII colon twice, so labels written with
        # the full-width colon were never recognised.
        m = re.search(r"车牌号[:：]\s*(\S+)", s)
        if m:
            plate = normalize_plate(m.group(1))
        m = re.search(r"故障现象[:：]", s)
        if m:
            symptom = s[m.end():].strip()

    # --- total fee (first six columns, first usable match wins) -----------
    total: Optional[float] = None
    for i in range(len(df)):
        for j in range(min(6, df.shape[1])):
            cell = df.iloc[i, j]
            if pd.isna(cell):
                continue
            m = re.search(r"费用共计[:：]\s*([\d.]+)", str(cell))
            if not m:
                continue
            try:
                total = float(m.group(1))
            except ValueError:
                # e.g. "1.2.3" or a lone ".": not a number, keep looking.
                continue
            break
        if total is not None:
            break

    # --- repair items listed under the "序号 / 维修项目" header ------------
    projects: List[str] = []
    header_at = None
    has_second_col = df.shape[1] > 1
    for i in range(len(df)):
        c0 = df.iloc[i, 0]
        c1 = df.iloc[i, 1] if has_second_col else None
        if str(c0).strip() == "序号" and pd.notna(c1) and "维修项目" in str(c1):
            header_at = i
            break
    if header_at is not None:
        for j in range(header_at + 1, len(df)):
            try:
                int(float(df.iloc[j, 0]))
            except (TypeError, ValueError):
                continue  # only numbered rows are repair items
            proj = df.iloc[j, 1] if has_second_col else None
            if pd.notna(proj) and str(proj).strip():
                projects.append(str(proj).strip())

    if plate is None or total is None:
        return None
    remark = ";".join(projects) if projects else symptom
    if not remark:
        remark = "故障维修"
    y, mo, d = ymd
    return Row(y, mo, d, plate, float(total), remark)
|
||
|
||
|
||
def parse_fault_workbook(path: Path) -> List[Row]:
    """Parse every sheet of a fault-feedback workbook, one form per sheet.

    Each sheet is read with ``header=None`` and passed to
    ``parse_zhaowei_fault_df``; sheets that are not feedback forms are
    ignored. A sheet that cannot be read is reported on stdout and skipped.

    Args:
        path: Workbook to read.

    Returns:
        One row per successfully parsed sheet, in sheet order.
    """
    out: List[Row] = []
    # Open the workbook once and close it deterministically; previously the
    # ExcelFile handle was never closed and the file was reopened per sheet.
    with pd.ExcelFile(path) as xl:
        for sn in xl.sheet_names:
            try:
                df = xl.parse(sn, header=None)
            except Exception as exc:  # best effort: keep importing other sheets
                print(f"skip sheet {path.name}:{sn}: {exc}")
                continue
            rec = parse_zhaowei_fault_df(df)
            if rec is not None:
                out.append(rec)
    return out
|
||
|
||
|
||
def parse_lubrication_front_axle(path: Path) -> List[Row]:
    """Parse the "前 轮 保养" sheet of a lubrication settlement workbook.

    Every distinct plate found in column 2 of the sheet produces one row
    dated from cell (1, 0), with a flat fee of 70.0 and the remark
    "润滑保养(打黄油)". When a cell contains a "车牌号:" label, only the
    text after the label (up to whitespace or "(") is searched for a plate.

    Args:
        path: Workbook to read.

    Returns:
        One row per distinct plate in first-seen order, or an empty list
        when the sheet cannot be read, is too small, lacks the title
        "车辆维修保养结算申请单" or has no parseable date.
    """
    out: List[Row] = []
    try:
        df = pd.read_excel(path, sheet_name="前 轮 保养", header=None)
    except Exception:
        # Best effort: a missing or unreadable sheet yields no rows.
        return out
    # Cells (0, 0) and (1, 0) are read below; a shorter sheet would raise
    # IndexError, so treat it as "nothing to import".
    if df.shape[0] < 2:
        return out
    c00 = str(df.iloc[0, 0]) if pd.notna(df.iloc[0, 0]) else ""
    if "车辆维修保养结算申请单" not in c00:
        return out
    ymd = excel_date_to_ymd(df.iloc[1, 0])
    if not ymd:
        return out
    # Plates live in column 2; without it there is nothing to collect.
    if df.shape[1] < 3:
        return out

    fee = 70.0
    remark = "润滑保养(打黄油)"
    uniq: List[str] = []
    seen = set()
    for i in range(len(df)):
        v = df.iloc[i, 2]
        if pd.isna(v):
            continue
        s = str(v)
        if "车牌号:" in s:
            m = re.search(r"车牌号[::]\s*([^\s(]+)", s)
            if m:
                s = m.group(1)
        p = normalize_plate(s)
        if p and p not in seen:
            seen.add(p)
            uniq.append(p)

    y, mo, d = ymd
    out.extend(Row(y, mo, d, p, fee, remark) for p in uniq)
    return out
|
||
|
||
|
||
def classify_file(path: Path) -> List[Row]:
    """Pick a parser for ``path`` from keywords in its file name and run it.

    The checks are tried in order; the first keyword that matches decides
    the parser. An inspection workbook ("氢能源点检") is only parsed as such
    when it actually contains a "点检明细" sheet; otherwise the remaining
    keywords are still tried.

    Args:
        path: Workbook to import.

    Returns:
        The parsed rows, or an empty list when no keyword matches.
    """
    name = path.name
    if "氢能源点检" in name:
        # Close the workbook handle after peeking at the sheet names; it was
        # previously left open.
        with pd.ExcelFile(path) as xl:
            has_detail_sheet = "点检明细" in xl.sheet_names
        if has_detail_sheet:
            return parse_dianjian_workbook(path)
    if "打黄油明细" in name:
        return parse_dahuangyou_only(path)
    if "润滑保养结算申请单" in name:
        return parse_lubrication_front_axle(path)
    if "服务站提供的明细表" in name:
        return parse_weixiu_workbook(path)
    if "故障车辆维修信息反馈单" in name:
        return parse_fault_workbook(path)
    return []
|
||
|
||
|
||
def _drop_corrupt_lubrication_rows(rows: List[Row]) -> List[Row]:
    """Filter out abnormally merged lubrication rows.

    A row is discarded when its remark mentions "润滑保养" more than twice
    *and* its fee exceeds 500; every other row is kept, in the original
    order. A missing remark counts as empty.
    """

    def is_corrupt(row) -> bool:
        text = row.remark or ""
        return text.count("润滑保养") > 2 and row.fee > 500

    return [row for row in rows if not is_corrupt(row)]
|
||
|
||
|
||
def read_ledger_rows() -> Tuple[int, List[Row]]:
    """Load the existing entries from the active sheet of ``LEDGER``.

    The header row is the first of the top 40 rows containing a cell equal
    to "年份". Data rows below it are skipped when the plate cell is blank
    or when year, month, day or fee cannot be converted to a number; rows
    flagged by ``_drop_corrupt_lubrication_rows`` are removed as well.

    NOTE(review): skipped and removed rows are not returned, so they are
    gone once ``main`` rewrites the ledger from the returned list — confirm
    this is intended.

    Returns:
        ``(header_row, rows)`` where ``header_row`` is the 1-based index of
        the header row and ``rows`` the retained ledger entries.

    Raises:
        SystemExit: when no header row is found or a required column is
            missing from it.
    """
    wb = load_workbook(LEDGER)
    ws = wb.active

    header_row = None
    for i, row in enumerate(ws.iter_rows(min_row=1, max_row=40, values_only=True), start=1):
        if row and any(c == "年份" for c in row if c is not None):
            header_row = i
            break
    if header_row is None:
        raise SystemExit("ledger: 未找到表头")

    # Map each header caption to its 1-based column index.
    col_map = {
        str(cell.value).strip(): cell.column
        for cell in ws[header_row]
        if cell.value
    }
    # Fail with a readable message instead of a bare KeyError further down.
    required = ("年份", "月份", "日期", "车牌号", "修理费", "备注")
    missing = [name for name in required if name not in col_map]
    if missing:
        raise SystemExit("ledger: 缺少列 " + "、".join(missing))

    def value_at(r: int, name: str):
        return ws.cell(row=r, column=col_map[name]).value

    existing: List[Row] = []
    for r in range(header_row + 1, ws.max_row + 1):
        plate = value_at(r, "车牌号")
        if plate is None or str(plate).strip() == "":
            continue
        try:
            yy = int(value_at(r, "年份"))
            mm = int(value_at(r, "月份"))
            dd = int(value_at(r, "日期"))
            fee = float(value_at(r, "修理费") or 0)
        except (TypeError, ValueError):
            continue
        remark = value_at(r, "备注")
        remark = "" if remark is None else str(remark)
        existing.append(Row(yy, mm, dd, str(plate).strip().upper(), fee, remark))

    return header_row, _drop_corrupt_lubrication_rows(existing)
|
||
|
||
|
||
def write_ledger(header_row: int, rows: List[Row]) -> None:
    """Replace everything below the header of ``LEDGER`` with ``rows``.

    All existing rows under ``header_row`` are deleted, then one sheet row
    is written per entry: year, month, day, plate and repair fee, with the
    subtotal set to the same fee. The other cost columns and the
    "费用是否为公司承担" column are written as empty when they exist in the
    header. The workbook is saved back to ``LEDGER``.

    Args:
        header_row: 1-based index of the header row.
        rows: Entries to write, in output order.
    """
    wb = load_workbook(LEDGER)
    ws = wb.active

    # Header caption -> 1-based column index.
    col_map = {}
    for cell in ws[header_row]:
        if cell.value:
            col_map[str(cell.value).strip()] = cell.column

    stale_rows = ws.max_row - header_row
    if stale_rows > 0:
        ws.delete_rows(header_row + 1, stale_rows)

    def put(r: int, caption: str, value: object) -> None:
        ws.cell(row=r, column=col_map[caption], value=value)

    optional_blank = ("保养费", "年审费", "轮胎费", "其他")
    for r, entry in enumerate(rows, start=header_row + 1):
        put(r, "年份", entry.y)
        put(r, "月份", entry.m)
        put(r, "日期", entry.d)
        put(r, "车牌号", entry.plate)
        put(r, "修理费", entry.fee)
        for caption in optional_blank:
            if caption in col_map:
                put(r, caption, None)
        put(r, "小计", entry.fee)
        if "费用是否为公司承担" in col_map:
            put(r, "费用是否为公司承担", None)
        put(r, "备注", entry.remark or None)

    wb.save(LEDGER)
|
||
|
||
|
||
def main() -> None:
    """Import all source workbooks, merge with the ledger, dedupe and save.

    Each existing source file is parsed and its row count printed; missing
    files are reported and skipped. The parsed rows are appended to the
    current ledger rows, duplicates (same ``Row.key()``) are dropped keeping
    the first occurrence, the result is sorted by date, plate and remark,
    written back to the ledger, and a summary is printed.
    """
    imported: List[Row] = []
    for src in SOURCES:
        if not src.is_file():
            print("missing:", src)
            continue
        parsed = classify_file(src)
        print(f"{src.name}: +{len(parsed)}")
        imported.extend(parsed)

    header_row, existing = read_ledger_rows()
    merged = existing + imported
    before = len(merged)

    # dicts keep insertion order, so setdefault retains the first row seen
    # for every key (ledger rows come before imported ones).
    first_by_key = {}
    for row in merged:
        first_by_key.setdefault(row.key(), row)
    deduped: List[Row] = sorted(
        first_by_key.values(),
        key=lambda x: (x.y, x.m, x.d, x.plate, x.remark),
    )

    write_ledger(header_row, deduped)

    print("---")
    print("existing ledger:", len(existing))
    print("imported:", len(imported))
    print("merged before dedupe:", before)
    print("after dedupe:", len(deduped))
    print("removed duplicates:", before - len(deduped))


if __name__ == "__main__":
    main()
|