Optimized the root .gitignore to exclude virtual environments, node modules, and temp folders to ensure clean and lightweight version tracking. Co-authored-by: Cursor <cursoragent@cursor.com>
587 lines
20 KiB
Python
587 lines
20 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""One-off: merge repair Excel sources into 羚牛公司车辆运维成本台账.xlsx and dedupe."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import re
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Iterable, Iterator, List, Optional, Tuple
|
||
|
||
import pandas as pd
|
||
from openpyxl import load_workbook
|
||
|
||
# Destination workbook: the ledger that main() reads, merges into and rewrites.
LEDGER = Path("/Users/sylvawong/Desktop/羚牛公司车辆运维成本台账.xlsx")

# Source folders that are walked recursively for repair spreadsheets.
# Folders that do not exist are skipped by iter_xlsx_files().
ROOTS = [
    Path(folder)
    for folder in (
        "/Users/sylvawong/Desktop/26年维修费/沈帅/2026年2月常州维修明细",
        "/Users/sylvawong/Desktop/26年维修费/沈帅/2026年2月金华维修费",
        "/Users/sylvawong/Desktop/26年维修费/沈帅/2026年3-4月金华(中顺维修费)",
        "/Users/sylvawong/Desktop/26年维修费/沈帅/2026年3月常州维修费",
        "/Users/sylvawong/Desktop/26年维修费/沈帅/2026年3月开封维修费",
        "/Users/sylvawong/Desktop/26年维修费/沈帅/2026年4月河南开封维修费",
        "/Users/sylvawong/Desktop/26年维修费/沈帅/上海昱巷2026年1月维修费",
        "/Users/sylvawong/Desktop/26年维修费/沈帅/上海昱巷2026年2月维修费",
        "/Users/sylvawong/Desktop/26年维修费/沈帅/上海昱巷2026年3月维修费",
    )
]
|
||
|
||
|
||
@dataclass(frozen=True)
class Row:
    """One ledger entry: a repair charge for one vehicle on one date."""

    y: int  # written to the 年份 (year) column
    m: int  # written to the 月份 (month) column
    d: int  # written to the 日期 (day) column
    plate: str  # licence plate, written to the 车牌号 column
    fee: float  # written to the 修理费 (repair fee) and 小计 (subtotal) columns
    remark: str  # written to the 备注 (remark) column
    source: str = ""  # originating file/sheet label; not part of the dedupe key

    def key(self) -> Tuple:
        """Return the tuple that identifies this entry for de-duplication.

        The fee is rounded to two decimals and every whitespace character is
        removed from the remark, so rows that differ only in spacing or in
        float noise compare equal. ``source`` is deliberately left out.
        """
        compact_remark = re.sub(r"\s+", "", self.remark or "")
        rounded_fee = round(float(self.fee), 2)
        return (self.y, self.m, self.d, self.plate, rounded_fee, compact_remark)
|
||
|
||
|
||
def iter_xlsx_files(roots: Iterable[Path]) -> Iterator[Path]:
    """Yield every Excel workbook found beneath the given root folders.

    Roots that are not existing directories are ignored. Files whose name
    starts with ``~$`` are skipped, and only names ending in ``.xlsx`` or
    ``.xls`` (case-insensitive) are yielded.
    """
    excel_suffixes = (".xlsx", ".xls")
    for base in roots:
        if base.is_dir():
            for folder, _subdirs, names in os.walk(base):
                for name in names:
                    is_lock_file = name.startswith("~$")
                    if not is_lock_file and name.lower().endswith(excel_suffixes):
                        yield Path(folder) / name
|
||
|
||
|
||
def parse_cn_date_yy(s: object) -> Optional[Tuple[int, int, int]]:
    """Parse a Chinese date written with a two-digit year, e.g. ``26年3月5日``.

    All whitespace is removed before matching and anything after ``日`` is
    ignored. Returns ``(year, month, day)`` with 2000 added to the year, or
    ``None`` when the value is missing or does not start with that pattern.
    Month and day are not range-checked.
    """
    value_missing = s is None or (isinstance(s, float) and pd.isna(s))
    if value_missing:
        return None
    compact = re.sub(r"\s+", "", str(s).strip())
    found = re.match(r"^(\d{2})年(\d{1,2})月(\d{1,2})日", compact)
    if found is None:
        return None
    year, month, day = (int(part) for part in found.groups())
    # The pattern captures exactly two digits, so this always shifts into 20xx.
    if year < 100:
        year += 2000
    return year, month, day
|
||
|
||
|
||
def parse_cn_date_yyyy(s: object) -> Optional[Tuple[int, int, int]]:
    """Parse a Chinese date written with a four-digit year, e.g. ``2026年3月5日``.

    Whitespace around the ``年`` / ``月`` / ``日`` markers is tolerated and any
    text after ``日`` is ignored. Returns ``(year, month, day)`` or ``None``
    when the value is missing or does not start with that pattern. Month and
    day are not range-checked.
    """
    if s is None:
        return None
    if isinstance(s, float) and pd.isna(s):
        return None
    hit = re.match(r"^(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日", str(s).strip())
    if hit is None:
        return None
    year, month, day = hit.groups()
    return int(year), int(month), int(day)
|
||
|
||
|
||
def parse_any_date_cell(s: object) -> Optional[Tuple[int, int, int]]:
    """Best-effort conversion of a spreadsheet cell to ``(year, month, day)``.

    Accepted inputs, in order of precedence:

    * ``datetime`` instances (including ``pandas.Timestamp``);
    * anything ``pandas.to_datetime`` can parse, such as ISO date strings;
    * Chinese dates with a four-digit year, then with a two-digit year.

    Returns ``None`` for missing values and for anything that cannot be
    interpreted as a date.
    """
    import numbers

    if s is None:
        return None
    # Detect every pandas missing marker, not only float NaN. ``pd.NaT`` is a
    # ``datetime`` subclass whose ``year`` / ``month`` / ``day`` are NaN, so it
    # must be rejected before the ``isinstance(s, datetime)`` branch below.
    try:
        missing = bool(pd.isna(s))
    except (TypeError, ValueError):
        # pd.isna returned an array (non-scalar input); not a missing scalar.
        missing = False
    if missing:
        return None
    if isinstance(s, datetime):
        return s.year, s.month, s.day
    # pandas.to_datetime interprets a bare number as nanoseconds since the Unix
    # epoch, which would silently turn any numeric cell into 1970-01-01.
    # A plain number is therefore treated as "not a date".
    if isinstance(s, numbers.Real):
        return None
    t = pd.to_datetime(s, errors="coerce")
    if pd.notna(t):
        tt = t.to_pydatetime()
        return tt.year, tt.month, tt.day
    x = parse_cn_date_yyyy(s)
    if x:
        return x
    return parse_cn_date_yy(s)
|
||
|
||
|
||
def month_from_parent_dir(path: Path) -> Optional[Tuple[int, int]]:
    """Extract ``(year, month)`` from the name of the folder containing *path*.

    The folder name is searched for ``<4 digits>年<1-2 digits>月``, as in
    '2026年4月河南开封维修费'. Returns ``None`` when the pattern is absent.
    """
    folder_name = path.parent.name
    found = re.search(r"(\d{4})年(\d{1,2})月", folder_name)
    if found is None:
        return None
    year_text, month_text = found.groups()
    return int(year_text), int(month_text)
|
||
|
||
|
||
def parse_standard_feedback_df(df: pd.DataFrame) -> Optional[Tuple[int, int, int, str, float, str]]:
    """Parse one "车辆维修信息反馈单" (vehicle repair feedback form) sheet.

    *df* is the sheet read with ``header=None``. The function reads:

    * the date from row 1, column 0 (two-digit year first, then four-digit);
    * the plate from the last "车牌号:" cell within the first 30 rows of
      column 0;
    * the item names from column 1 of the numbered rows that follow the
      "序号" / "维修项目" header row;
    * the total from the first "费用总计:" / "费用共计:" cell after that header.

    Returns ``(year, month, day, plate, fee, remark)``, or ``None`` when the
    date, the plate or the total cannot be found. The remark is the item names
    joined by ``;``; when there are none it falls back to the text following
    "车辆维修信息:".
    """
    # Forms are typed with either an ASCII colon or a full-width colon
    # (U+FF1A); accept both everywhere a label is matched.
    colon = r"[:\uFF1A]"
    plate_re = re.compile(r"车牌号" + colon + r"\s*([^\s]+)")
    info_re = re.compile(r"车辆维修信息" + colon)
    total_re = re.compile(r"(?:费用总计|费用共计)" + colon + r"\s*([\d.]+)")

    n_rows, n_cols = df.shape
    if n_rows < 3 or n_cols < 1:
        return None
    has_second_col = n_cols > 1

    date_cell = df.iloc[1, 0]
    ymd = parse_cn_date_yy(date_cell) or parse_cn_date_yyyy(date_cell)
    if ymd is None:
        return None
    y, mo, d = ymd

    plate = None
    info_fallback = ""
    for i in range(min(30, n_rows)):
        v = df.iloc[i, 0]
        if pd.isna(v):
            continue
        s = str(v)
        m = plate_re.search(s)
        if m:
            plate = m.group(1).strip()
        if info_re.search(s):
            info_fallback = info_re.split(s, maxsplit=1)[-1].strip()

    # Locate the header row of the itemised table.
    header_idx = None
    for i in range(n_rows):
        c0 = df.iloc[i, 0]
        if pd.isna(c0):
            continue
        c1 = df.iloc[i, 1] if has_second_col else None
        if str(c0).strip() == "序号" and pd.notna(c1) and "维修项目" in str(c1):
            header_idx = i
            break

    projects: List[str] = []
    total_fee: Optional[float] = None
    if header_idx is not None:
        for j in range(header_idx + 1, n_rows):
            c0 = df.iloc[j, 0]
            s0 = "" if pd.isna(c0) else str(c0)
            if "费用总计" in s0 or "费用共计" in s0:
                m = total_re.search(s0)
                if m:
                    try:
                        total_fee = float(m.group(1))
                    except ValueError:
                        # e.g. "1.2.3": leave the total unset so the sheet is rejected.
                        total_fee = None
                # The total line closes the table.
                break
            # Only rows whose first cell is a finite number are item rows.
            try:
                int(float(c0))
            except (TypeError, ValueError, OverflowError):
                continue
            proj = df.iloc[j, 1] if has_second_col else None
            if pd.notna(proj):
                p = str(proj).strip()
                if p:
                    projects.append(p)

    remark = ";".join(projects) if projects else info_fallback
    if plate is None or total_fee is None:
        return None
    return y, mo, d, plate, float(total_fee), remark
|
||
|
||
|
||
def parse_yuyu_feedback_df(df: pd.DataFrame) -> Optional[Tuple[int, int, int, str, float, str]]:
    """Parse one 上海昱巷 "故障车辆维修信息反馈单" (faulty-vehicle repair feedback) sheet.

    *df* is the sheet read with ``header=None``. The function reads:

    * the date from row 1, column 0 (four-digit Chinese date first, then any
      format ``parse_any_date_cell`` accepts);
    * the plate from the last "车牌号:" cell within the first 25 rows of
      column 0;
    * the item names from column 1 of the numbered rows that follow the
      "序号" / "维修项目" header row;
    * the total from the first "费用共计:" cell (column 0 or 1) after that
      header.

    Returns ``(year, month, day, plate, fee, remark)``, or ``None`` when the
    date, the plate or the total cannot be found. The remark is the item names
    joined by ``;``; when there are none it falls back to the text following
    "故障现象:".
    """
    # Forms are typed with either an ASCII colon or a full-width colon
    # (U+FF1A); accept both everywhere a label is matched.
    colon = r"[:\uFF1A]"
    plate_re = re.compile(r"车牌号" + colon + r"\s*([^\s]+)")
    symptom_re = re.compile(r"故障现象" + colon)
    total_re = re.compile(r"费用共计" + colon + r"\s*([\d.]+)")

    n_rows, n_cols = df.shape
    if n_rows < 3 or n_cols < 1:
        return None
    has_second_col = n_cols > 1

    date_cell = df.iloc[1, 0]
    ymd = parse_cn_date_yyyy(date_cell)
    if ymd is None:
        ymd = parse_any_date_cell(date_cell)
    if ymd is None or ymd[1] == 0 or ymd[2] == 0:
        return None
    y, mo, d = ymd

    plate = None
    symptom = ""
    for i in range(min(25, n_rows)):
        v = df.iloc[i, 0]
        if pd.isna(v):
            continue
        s = str(v)
        m = plate_re.search(s)
        if m:
            p = m.group(1).strip()
            if p:
                plate = p
        if symptom_re.search(s):
            symptom = symptom_re.split(s, maxsplit=1)[-1].strip()

    # Locate the header row of the itemised table.
    header_idx = None
    for i in range(n_rows):
        c0 = df.iloc[i, 0]
        if pd.isna(c0):
            continue
        c1 = df.iloc[i, 1] if has_second_col else None
        if str(c0).strip() == "序号" and pd.notna(c1) and "维修项目" in str(c1):
            header_idx = i
            break

    projects: List[str] = []
    total_fee: Optional[float] = None
    if header_idx is not None:
        for j in range(header_idx + 1, n_rows):
            c0 = df.iloc[j, 0]
            c1 = df.iloc[j, 1] if has_second_col else None
            s0 = "" if pd.isna(c0) else str(c0)
            s1 = "" if pd.isna(c1) else str(c1)
            if "费用共计" in s0 or "费用共计" in s1:
                # Prefer the second column when both cells carry the label.
                cell = s1 if "费用共计" in s1 else s0
                m = total_re.search(cell)
                if m:
                    try:
                        total_fee = float(m.group(1))
                    except ValueError:
                        # e.g. "1.2.3": leave the total unset so the sheet is rejected.
                        total_fee = None
                # The total line closes the table.
                break
            # Only rows whose first cell is a finite number are item rows.
            try:
                int(float(c0))
            except (TypeError, ValueError, OverflowError):
                continue
            if pd.notna(c1):
                p = str(c1).strip()
                if p:
                    projects.append(p)

    remark = ";".join(projects) if projects else symptom
    if plate is None or total_fee is None:
        return None
    return y, mo, d, plate, float(total_fee), remark
|
||
|
||
|
||
def parse_mingrui_chaidui(path: Path, df: pd.DataFrame) -> List[Row]:
    """Parse a 铭瑞-style itemised sheet whose header sits on the second row.

    *df* is the first sheet read with ``header=None`` and is only used to
    confirm that row 1 starts with "序号"; the sheet is then re-read from
    *path* with that row as the header. The columns "车牌号", "时间",
    "维修项目" and "价格" are required (surrounding whitespace in the header
    cells is ignored).

    Rows are skipped when the plate is blank, when the item text contains
    "合计" (a summary row), or when the time is not a two-digit-year Chinese
    date. A missing or non-numeric price is recorded as 0.0.

    Returns one ``Row`` per accepted line, with ``path.name`` as its source.
    """
    out: List[Row] = []
    if df.shape[0] < 3 or df.shape[1] < 1:
        return out
    if str(df.iloc[1, 0]).strip() != "序号":
        return out

    sub = pd.read_excel(path, sheet_name=0, header=1)
    cols = {str(c).strip(): c for c in sub.columns}
    need = ("车牌号", "时间", "维修项目", "价格")
    if not all(k in cols for k in need):
        return out
    # Look values up through the stripped-name map so that a header such as
    # "车牌号 " passes the check above AND is actually readable below.
    plate_c, time_c, proj_c, price_c = (cols[k] for k in need)

    for _, r in sub.iterrows():
        plate = r.get(plate_c)
        if pd.isna(plate) or str(plate).strip() == "":
            continue
        proj = r.get(proj_c)
        if pd.notna(proj) and "合计" in str(proj):
            continue
        ymd = parse_cn_date_yy(r.get(time_c))
        if ymd is None:
            continue
        y, mo, d = ymd
        fee = r.get(price_c)
        try:
            fee_f = float(fee) if pd.notna(fee) else 0.0
        except (TypeError, ValueError):
            fee_f = 0.0
        remark = "" if pd.isna(proj) else str(proj).strip()
        out.append(Row(y, mo, d, str(plate).strip(), fee_f, remark, path.name))
    return out
|
||
|
||
|
||
def parse_jinhua_chaidui(path: Path, df: pd.DataFrame) -> List[Row]:
    """Parse a 金华 itemised repair sheet.

    *df* is the first sheet read with ``header=None``; the header row is the
    first of its top five rows whose first cell is "车牌号". The sheet is then
    re-read from *path* with that row as the header. Required columns
    (header whitespace ignored): "车牌号", "时间", one of "项目" / "维修项目",
    and a column whose name starts with "价格".

    Rows are skipped when the plate is blank, when the time is not a
    two-digit-year Chinese date, or when the item text contains "合计"
    (a summary row). A missing or non-numeric price is recorded as 0.0.

    Returns one ``Row`` per accepted line, with ``path.name`` as its source.
    """
    out: List[Row] = []
    if df.shape[0] < 3 or df.shape[1] < 1:
        return out

    hdr = None
    for i in range(min(5, df.shape[0])):
        if str(df.iloc[i, 0]).strip() == "车牌号":
            hdr = i
            break
    if hdr is None:
        return out

    sub = pd.read_excel(path, sheet_name=0, header=hdr)

    def first_col(matches):
        """Return the first column label whose stripped name satisfies *matches*."""
        return next((c for c in sub.columns if matches(str(c).strip())), None)

    # Every column is matched on its stripped name, consistently with the
    # header detection above, so padded header cells do not drop all rows.
    plate_col = first_col(lambda name: name == "车牌号")
    time_col = first_col(lambda name: name == "时间")
    proj_col = first_col(lambda name: name in ("项目", "维修项目"))
    price_col = first_col(lambda name: name.startswith("价格"))
    if plate_col is None or time_col is None or proj_col is None or price_col is None:
        return out

    for _, r in sub.iterrows():
        plate = r.get(plate_col)
        if pd.isna(plate) or str(plate).strip() == "":
            continue
        ymd = parse_cn_date_yy(r.get(time_col))
        if ymd is None:
            continue
        y, mo, d = ymd
        proj = r.get(proj_col)
        if pd.notna(proj) and "合计" in str(proj):
            continue
        raw_fee = r.get(price_col)
        try:
            fee_f = float(raw_fee) if pd.notna(raw_fee) else 0.0
        except (TypeError, ValueError):
            fee_f = 0.0
        remark = "" if pd.isna(proj) else str(proj).strip()
        out.append(Row(y, mo, d, str(plate).strip(), fee_f, remark, path.name))
    return out
|
||
|
||
|
||
def parse_kaifeng_chaidui(path: Path, df: pd.DataFrame) -> List[Row]:
    """Parse a 开封 itemised sheet, which carries no per-row date.

    The year and month come from the name of the folder containing *path*;
    the day is always recorded as 1. *df* is the first sheet read with
    ``header=None``; the header row is the first of its top six rows whose
    first cell is "序号", and the sheet is re-read from *path* with that row
    as the header.

    The plate is taken from the "车牌" column, or from the second column when
    no column has that exact name. The fee is taken from the column whose
    stripped name is "价格". Rows are skipped when the first cell is not
    numeric, when the plate is missing, blank or contains "合计", or when the
    fee is not numeric. A missing fee is recorded as 0.0.

    Returns one ``Row`` per accepted line with the fixed remark "开封拆堆" and
    ``path.name`` as its source.
    """
    out: List[Row] = []
    mm = month_from_parent_dir(path)
    if mm is None:
        return out
    y0, m0 = mm

    hdr = None
    if df.shape[1] > 0:
        for i in range(min(6, df.shape[0])):
            c0 = df.iloc[i, 0]
            if pd.notna(c0) and str(c0).strip() == "序号":
                hdr = i
                break
    if hdr is None:
        return out

    sub = pd.read_excel(path, sheet_name=0, header=hdr)

    # Column lookups do not depend on the row, so resolve them once up front.
    fee_col = next((c for c in sub.columns if str(c).strip() == "价格"), None)
    if fee_col is None:
        return out
    if "车牌" in sub.columns:
        plate_col = "车牌"
    elif len(sub.columns) > 1:
        plate_col = sub.columns[1]
    else:
        # No plate column and no second column to fall back on.
        return out

    for _, r in sub.iterrows():
        idx = r.iloc[0]
        if pd.isna(idx):
            continue
        try:
            float(idx)
        except (TypeError, ValueError):
            continue
        plate = r.get(plate_col)
        if plate is None or (isinstance(plate, float) and pd.isna(plate)):
            continue
        plate_s = str(plate).strip()
        # Blank plates are skipped, as in the other itemised-sheet parsers.
        if not plate_s or "合计" in plate_s:
            continue
        raw_fee = r.get(fee_col)
        try:
            fee_f = float(raw_fee) if pd.notna(raw_fee) else 0.0
        except (TypeError, ValueError):
            continue
        out.append(Row(y0, m0, 1, plate_s, fee_f, "开封拆堆", path.name))
    return out
|
||
|
||
|
||
def find_col(df: pd.DataFrame, names: Tuple[str, ...]) -> Optional[str]:
    """Return the first column of *df* whose stripped name is one of *names*.

    The original (unstripped) column label is returned so it can be used to
    index the frame; ``None`` is returned when nothing matches.
    """
    matching = (label for label in df.columns if str(label).strip() in names)
    return next(matching, None)  # type: ignore[return-value]
|
||
|
||
|
||
def parse_system_table_df(df: pd.DataFrame, source: str) -> List[Row]:
    """Convert a system-export table into ledger rows.

    *df* is a sheet read with its first row as the header. Columns are matched
    on their stripped names:

    * plate: "车牌号" or "车牌号码" (required);
    * date: "故障上报时间" (required, parsed with ``parse_any_date_cell``);
    * remark: "解决方案" (optional);
    * fee: the parts column ("配件价格", "配件", "配件费用" or "配件费") plus the
      labour column ("工时", "人工" or "工时费"); each is optional and a missing
      or non-numeric cell counts as 0.

    Rows are skipped when the plate is blank, when the date cannot be parsed,
    or when the remark or parts cell contains "合计" (a summary row).

    Returns one ``Row`` per accepted line, tagged with *source*.
    """
    out: List[Row] = []
    if df.shape[0] < 1:
        return out
    plate_c = find_col(df, ("车牌号", "车牌号码"))
    if plate_c is None:
        return out
    date_c = find_col(df, ("故障上报时间",))
    if date_c is None:
        # Without a date column every row would be rejected below.
        return out
    sol_c = find_col(df, ("解决方案",))
    part_c = find_col(df, ("配件价格", "配件", "配件费用", "配件费"))
    labor_c = find_col(df, ("工时", "人工", "工时费"))

    def to_amount(value: object) -> float:
        """Return *value* as a float, or 0.0 when it is missing or not numeric."""
        if value is None or pd.isna(value):
            return 0.0
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    for _, r in df.iterrows():
        plate = r.get(plate_c)
        if plate is None or (isinstance(plate, float) and pd.isna(plate)):
            continue
        plate_s = str(plate).strip()
        if plate_s == "" or plate_s.lower() == "nan":
            continue

        sol = r.get(sol_c) if sol_c else None
        if sol is not None and "合计" in str(sol):
            continue

        # Reject summary rows BEFORE touching the amounts: converting a parts
        # cell such as "合计" to float would otherwise raise and abort the run.
        part_v = r.get(part_c) if part_c else None
        if part_v is not None and pd.notna(part_v) and "合计" in str(part_v):
            continue

        ymd = parse_any_date_cell(r.get(date_c))
        if ymd is None:
            continue

        fee = to_amount(part_v)
        if labor_c:
            fee += to_amount(r.get(labor_c))

        remark = "" if sol is None or pd.isna(sol) else str(sol).strip()
        y, mo, d = ymd
        out.append(Row(y, mo, d, plate_s, float(fee), remark, source))
    return out
|
||
|
||
|
||
def parse_system_workbook(path: Path) -> List[Row]:
    """Parse every system-export sheet of the workbook at *path*.

    Each sheet is read with its first row as the header. A sheet is handed to
    ``parse_system_table_df`` only when its first column is named "车牌号" or
    "车辆编号"; other sheets, empty sheets and sheets that cannot be read are
    skipped. Rows are tagged with ``"<file name>:<sheet name>"``.
    """
    out: List[Row] = []
    # Open the workbook once and close it deterministically instead of
    # re-opening the file for each sheet and leaking the handle.
    with pd.ExcelFile(path) as xl:
        for sn in xl.sheet_names:
            try:
                df = pd.read_excel(xl, sheet_name=sn, header=0)
            except Exception:
                # Deliberate best effort: one unreadable sheet must not stop
                # the remaining sheets from being imported.
                continue
            if df.shape[0] == 0 or df.shape[1] == 0:
                continue
            first = df.columns[0]
            c0 = str(first).strip() if first is not None else ""
            if c0 not in ("车牌号", "车辆编号"):
                continue
            out.extend(parse_system_table_df(df, f"{path.name}:{sn}"))
    return out
|
||
|
||
|
||
def classify_and_parse(path: Path) -> List[Row]:
    """Detect the layout of the workbook at *path* and parse it into rows.

    The first cell of the first sheet decides which parser runs, in this order:

    1. 开封 itemised sheet (title contains "开封拆堆", or both "拆堆明细" and
       "开封");
    2. 铭瑞-style sheet (title contains "铭瑞", or row 1 starts with "序号" and
       has "维修项目" in its fifth column);
    3. 金华 sheet (title contains "金华维修明细");
    4. 上海昱巷 feedback form (title contains "故障车辆维修信息反馈单"), one
       record per sheet;
    5. standard feedback form (title contains "车辆维修信息反馈单"), one record
       per sheet;
    6. otherwise the workbook is treated as a system export.

    When parser 2 or 3 yields nothing, detection continues with the later
    rules. A workbook whose first sheet cannot be read, or is empty, yields an
    empty list.
    """
    rows: List[Row] = []
    try:
        df0 = pd.read_excel(path, sheet_name=0, header=None)
    except Exception:
        # Best effort: an unreadable file is skipped rather than aborting the run.
        return rows
    n_rows, n_cols = df0.shape
    if n_rows == 0 or n_cols == 0:
        return rows
    c00 = str(df0.iloc[0, 0]) if pd.notna(df0.iloc[0, 0]) else ""

    # 开封 itemised sheet (no date column).
    if "开封拆堆" in c00 or ("拆堆明细" in c00 and "开封" in c00):
        rows.extend(parse_kaifeng_chaidui(path, df0))
        return rows

    # 铭瑞-style itemised sheet. The fifth column is only inspected when it
    # exists; indexing it unconditionally raises IndexError on narrow sheets.
    looks_like_mingrui = (
        n_rows > 2
        and n_cols > 4
        and str(df0.iloc[1, 0]).strip() == "序号"
        and "维修项目" in str(df0.iloc[1, 4])
    )
    if "铭瑞" in c00 or looks_like_mingrui:
        mr = parse_mingrui_chaidui(path, df0)
        if mr:
            return mr

    # 金华 itemised sheet.
    if "金华维修明细" in c00:
        jh = parse_jinhua_chaidui(path, df0)
        if jh:
            return jh

    # Feedback forms, one record per sheet. The 上海昱巷 title contains the
    # standard title as a substring, so it has to be tested first.
    if "车辆维修信息反馈单" in c00:
        if "故障车辆维修信息反馈单" in c00:
            parse_sheet = parse_yuyu_feedback_df
        else:
            parse_sheet = parse_standard_feedback_df
        # Open the workbook once and make sure the handle is closed.
        with pd.ExcelFile(path) as xl:
            for sn in xl.sheet_names:
                df = pd.read_excel(xl, sheet_name=sn, header=None)
                rec = parse_sheet(df)
                if rec:
                    y, mo, d, plate, fee, remark = rec
                    rows.append(Row(y, mo, d, plate, fee, remark, f"{path.name}:{sn}"))
        return rows

    # System export ("车牌号" / "车辆编号" in the first cell), and also the
    # fallback for files whose first row is a merged title cell: both cases
    # return whatever the system-export parser finds.
    return parse_system_workbook(path)
|
||
|
||
|
||
def read_existing_ledger(path: Path) -> Tuple[int, List[Row]]:
    """Load the rows already present in the ledger workbook.

    The header row is the first of the top 40 rows of the active sheet that
    contains a "年份" cell. Columns are located by their stripped header text;
    "年份", "月份", "日期", "车牌号", "修理费" and "备注" are required.

    Returns ``(header_row, rows)`` where ``header_row`` is the 1-based row
    number of the header and every row is tagged ``"ledger:existing"``.

    NOTE: lines with a blank plate, or whose year / month / day / fee cannot
    be converted to numbers, are NOT returned. Because ``main`` rewrites the
    ledger from the returned rows, such lines disappear from the file.

    Raises:
        SystemExit: when the header row or a required column is missing.
    """
    required = ("年份", "月份", "日期", "车牌号", "修理费", "备注")

    wb = load_workbook(path)
    ws = wb.active

    header_row = None
    for i, row in enumerate(ws.iter_rows(min_row=1, max_row=40, values_only=True), start=1):
        # Compare stripped text so detection agrees with the column map below.
        if any(c is not None and str(c).strip() == "年份" for c in row):
            header_row = i
            break
    if header_row is None:
        raise SystemExit("ledger: 年份 header not found")

    col_map: dict = {}
    for cell in ws[header_row]:
        if cell.value:
            col_map[str(cell.value).strip()] = cell.column

    # Fail with a readable message instead of a bare KeyError further down.
    missing = [name for name in required if name not in col_map]
    if missing:
        raise SystemExit("ledger: missing column(s): " + ", ".join(missing))

    def value_at(row_no: int, title: str) -> object:
        """Return the value of the cell under header *title* on row *row_no*."""
        return ws.cell(row=row_no, column=col_map[title]).value

    out: List[Row] = []
    for r in range(header_row + 1, ws.max_row + 1):
        plate = value_at(r, "车牌号")
        if plate is None or str(plate).strip() == "":
            continue
        fee = value_at(r, "修理费")
        remark = value_at(r, "备注")
        try:
            yy = int(value_at(r, "年份"))
            mm = int(value_at(r, "月份"))
            dd = int(value_at(r, "日期"))
            ff = float(fee) if fee is not None else 0.0
        except (TypeError, ValueError):
            continue
        rmk = "" if remark is None else str(remark)
        out.append(Row(yy, mm, dd, str(plate).strip(), ff, rmk, "ledger:existing"))
    return header_row, out
|
||
|
||
|
||
def write_ledger(path: Path, header_row: int, rows: List[Row]) -> None:
    """Replace everything below the ledger header with *rows* and save in place.

    Columns are located by the stripped text of the cells on *header_row*.
    For each row the year, month, day, plate and repair fee are written; the
    "小计" (subtotal) column receives the same fee; the "保养费", "年审费",
    "轮胎费", "其他" and "费用是否为公司承担" columns, when present, are written
    as empty; the remark goes to "备注" (empty remarks are written as ``None``).

    NOTE: all existing data rows are deleted first, so only the values listed
    above survive the rewrite.

    Raises:
        KeyError: when a required column is missing and there is at least one
            row to write.
    """
    wb = load_workbook(path)
    ws = wb.active

    col_map: dict = {
        str(cell.value).strip(): cell.column
        for cell in ws[header_row]
        if cell.value
    }

    stale_rows = ws.max_row - header_row
    if stale_rows > 0:
        ws.delete_rows(header_row + 1, stale_rows)

    for line_no, entry in enumerate(rows, start=header_row + 1):
        core_values = (
            ("年份", entry.y),
            ("月份", entry.m),
            ("日期", entry.d),
            ("车牌号", entry.plate),
            ("修理费", entry.fee),
        )
        for title, value in core_values:
            ws.cell(row=line_no, column=col_map[title], value=value)
        for title in ("保养费", "年审费", "轮胎费", "其他"):
            if title in col_map:
                ws.cell(row=line_no, column=col_map[title], value=None)
        ws.cell(row=line_no, column=col_map["小计"], value=entry.fee)
        if "费用是否为公司承担" in col_map:
            ws.cell(row=line_no, column=col_map["费用是否为公司承担"], value=None)
        ws.cell(row=line_no, column=col_map["备注"], value=entry.remark or None)

    wb.save(path)
|
||
|
||
|
||
def main() -> None:
    """Import all source workbooks, merge them into the ledger and de-duplicate.

    Steps: collect the workbooks under ``ROOTS``, parse each one, read the
    rows already in ``LEDGER``, keep the first row for every ``Row.key()``
    (existing ledger rows come first, so they win over imported ones), sort
    by date, plate and remark, rewrite the ledger, and print a summary.
    """
    files = sorted({found.resolve() for found in iter_xlsx_files(ROOTS)})
    imported: List[Row] = [
        parsed for workbook in files for parsed in classify_and_parse(workbook)
    ]

    header_row, existing = read_existing_ledger(LEDGER)
    merged = existing + imported
    before = len(merged)

    # A dict keeps insertion order, and setdefault keeps the first row per key.
    first_by_key: dict = {}
    for candidate in merged:
        first_by_key.setdefault(candidate.key(), candidate)
    deduped: List[Row] = sorted(
        first_by_key.values(),
        key=lambda x: (x.y, x.m, x.d, x.plate, x.remark),
    )

    write_ledger(LEDGER, header_row, deduped)

    summary = (
        ("files scanned:", len(files)),
        ("rows imported from files:", len(imported)),
        ("existing ledger rows:", len(existing)),
        ("merged before dedupe:", before),
        ("after dedupe:", len(deduped)),
        ("removed duplicates:", before - len(deduped)),
    )
    for label, count in summary:
        print(label, count)
|
||
|
||
|
||
# Run the merge only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|