Optimized the root .gitignore to exclude virtual environments, node modules, and temp folders to ensure clean and lightweight version tracking. Co-authored-by: Cursor <cursoragent@cursor.com>
336 lines
11 KiB
Python
336 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""从行驶证影像 OCR 提取检验日期,与车辆表合并导出 Excel。
|
||
|
||
有照片的车辆:先对影像做 OCR;若未识别到「检验有效期」,再用表内「行驶证检验有效期」补全第三列。
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import calendar
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import tempfile
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from datetime import date
|
||
from typing import Dict, List, Optional, Tuple
|
||
|
||
import fitz # pymupdf
|
||
import pandas as pd
|
||
from PIL import Image, ImageEnhance
|
||
|
||
# Configuration (edit as needed).
# Input workbook; main() reads its "车辆信息" sheet.
EXCEL_IN = "/Users/sylvawong/Downloads/车辆信息-1776270214730.xlsx"
# Directory listed by build_plate_files() to find licence photos / PDFs.
PHOTO_DIR = "/Users/sylvawong/Desktop/证件信息梳理/行驶证"
# Output workbook written by main().
EXCEL_OUT = "/Users/sylvawong/Desktop/CURSOR/ONE-OS/车辆行驶证_OCR核对.xlsx"
# Column of the input sheet used as a fallback when OCR yields no date.
EXCEL_INSPECTION_COL = "行驶证检验有效期"

# Path of the tesseract executable; main() exits if this file is missing.
TESSERACT = "/opt/homebrew/bin/tesseract"
# Value passed to tesseract's "-l" option.
OCR_LANG = "chi_sim+eng"
# Images whose longer side exceeds this many pixels are downscaled before OCR.
MAX_IMAGE_SIDE = 2200
# Size of the thread pool main() uses for per-vehicle OCR.
MAX_WORKERS = 3
# At most this many leading pages of a PDF are rendered and OCRed.
MAX_PDF_PAGES = 3
# Zoom factor of the fitz.Matrix used when rendering PDF pages.
PDF_ZOOM = 2.5
|
||
|
||
|
||
def extract_plate_from_filename(basename_no_ext: str) -> Optional[str]:
    """Derive a licence-plate string from a photo file's base name.

    Processing steps:
      1. drop a trailing copy counter such as ``" (1)"``;
      2. keep only the text before the first ``行驶证`` marker;
      3. trim trailing ``-``, ``_``, ``·`` and spaces;
      4. take the first ``-``-separated segment as the plate candidate.

    Args:
        basename_no_ext: File name without directory and extension.

    Returns:
        The plate (one CJK character followed by 1-14 uppercase letters,
        digits, CJK characters or ``·``), or ``None`` when the candidate
        does not have that shape or is shorter than six characters.
    """
    name = re.sub(r"\s*\(\d+\)\s*$", "", basename_no_ext)
    # str.split returns the whole string when the marker is absent, so no
    # separate membership test is needed.
    name = name.split("行驶证")[0].rstrip("-_· ")
    if not name:
        return None

    # The plate is always the first "-" segment. The previous code also
    # tested whether the second segment looked like a 17-character VIN, but
    # both outcomes picked the first segment, so that dead check is removed.
    candidate = name.split("-")[0]

    m = re.match(r"^([\u4e00-\u9fa5][A-Z0-9\u4e00-\u9fa5·]{1,14})$", candidate)
    if not m:
        return None

    plate = m.group(1).rstrip("-_·")
    if len(plate) < 6:
        return None
    return plate
|
||
|
||
|
||
def build_plate_files() -> Dict[str, List[str]]:
    """Group the files found directly in PHOTO_DIR by licence plate.

    Returns:
        A mapping ``plate -> [full paths]``. Only regular files whose base
        name yields a plate via ``extract_plate_from_filename`` are
        included; all other directory entries are ignored.
    """
    by_plate: Dict[str, List[str]] = {}
    for entry in os.listdir(PHOTO_DIR):
        full_path = os.path.join(PHOTO_DIR, entry)
        if os.path.isfile(full_path):
            stem = os.path.splitext(entry)[0]
            plate = extract_plate_from_filename(stem)
            if plate:
                if plate not in by_plate:
                    by_plate[plate] = []
                by_plate[plate].append(full_path)
    return by_plate
|
||
|
||
|
||
def file_try_order(paths: List[str]) -> List[str]:
    """Return ``paths`` sorted into the order in which OCR should try them.

    Ranking is based on the lower-cased file name: names containing "年审"
    come first, then names containing "行驶", then everything else. Within
    each rank a ``.pdf`` file sorts slightly later (it has to be rendered
    first), and remaining ties are broken alphabetically by file name.
    """

    def sort_key(path: str) -> Tuple[float, str]:
        name = os.path.basename(path).lower()
        if "年审" in name:
            rank = 0.0
        elif "行驶" in name:
            # Also covers "行驶证", which contains "行驶".
            rank = 1.0
        else:
            rank = 1.5
        pdf_penalty = 0.25 if name.endswith(".pdf") else 0.0
        return (rank + pdf_penalty, name)

    ordered = list(paths)
    ordered.sort(key=sort_key)
    return ordered
|
||
|
||
|
||
def load_image_for_ocr(path: str) -> Image.Image:
    """Load an image file as RGB, downscaled to fit within MAX_IMAGE_SIDE.

    Args:
        path: Path of an image file readable by Pillow.

    Returns:
        An RGB image whose longer side is at most MAX_IMAGE_SIDE pixels.
        Smaller images are returned at their original size.

    Raises:
        Whatever Pillow raises for a missing or unreadable file.
    """
    # Image.open keeps the file open; the context manager releases the
    # handle as soon as convert() has produced an independent copy, so
    # handles do not pile up while several worker threads process photos.
    with Image.open(path) as src:
        img = src.convert("RGB")

    w, h = img.size
    longest = max(w, h)
    if longest > MAX_IMAGE_SIDE:
        scale = MAX_IMAGE_SIDE / longest
        # Clamp to 1 px: for very elongated images the short side could
        # otherwise truncate to 0, which resize() rejects.
        new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
        img = img.resize(new_size, Image.LANCZOS)
    return img
|
||
|
||
|
||
def ocr_image(img: Image.Image) -> str:
    """Run the tesseract executable on an image and return its text output.

    The image is converted to greyscale, its contrast is raised by a factor
    of 1.35, and it is written to a temporary PNG that is passed to
    TESSERACT with languages OCR_LANG and page segmentation mode 6. The
    temporary file is always removed afterwards.

    Args:
        img: The image to recognise.

    Returns:
        The text printed by tesseract, or ``""`` when tesseract exits with
        a non-zero status or does not finish within 180 seconds.
    """
    gray = ImageEnhance.Contrast(img.convert("L")).enhance(1.35)

    fd, tmp = tempfile.mkstemp(suffix=".png")
    # Only the path is needed; Pillow reopens the file itself.
    os.close(fd)
    try:
        gray.save(tmp, format="PNG")
        cmd = [
            TESSERACT,
            tmp,
            "stdout",
            "-l",
            OCR_LANG,
            "--psm",
            "6",
            "-c",
            "preserve_interword_spaces=1",
        ]
        try:
            r = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                # Tesseract writes UTF-8; decoding with the locale's
                # encoding could fail or garble Chinese text.
                encoding="utf-8",
                errors="replace",
                timeout=180,
            )
        except subprocess.TimeoutExpired:
            # Treat a hung run like any other failed run: no text.
            return ""
        if r.returncode != 0:
            return ""
        return r.stdout or ""
    finally:
        try:
            os.unlink(tmp)
        except OSError:
            # Best-effort cleanup; a leftover temp file is not fatal.
            pass
|
||
|
||
|
||
def ocr_file(path: str) -> str:
    """OCR an image file, returning ``""`` if loading or OCR fails.

    Any exception raised while loading the image or running OCR is
    swallowed so that one bad file does not stop processing.
    """
    text = ""
    try:
        text = ocr_image(load_image_for_ocr(path))
    except Exception:
        # Deliberate best effort: keep the empty default.
        pass
    return text
|
||
|
||
|
||
def month_end(y: int, m: int) -> date:
    """Return the last calendar day of month ``m`` in year ``y``."""
    _first_weekday, days_in_month = calendar.monthrange(y, m)
    return date(y, m, days_in_month)
|
||
|
||
|
||
# Horizontal whitespace sitting between two CJK characters, e.g. the gaps
# in "检 验 有 效 期 至" that OCR may insert inside a keyword.
_CJK_GAP_RE = re.compile(r"(?<=[\u4e00-\u9fa5])[ \t\u3000]+(?=[\u4e00-\u9fa5])")

# Tried in this order. Patterns with three groups capture year/month/day,
# patterns with two groups capture year/month only.
# Examples: 检验有效期至 2027年06月04日 / 2026-03-31
_INSPECTION_DATE_PATTERNS = tuple(
    re.compile(p)
    for p in (
        r"检验有效期至\s*[::]?\s*(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日",
        r"检验有效期至\s*[::]?\s*(\d{4})年(\d{1,2})月(\d{1,2})日",
        r"检验有效期至\s*[::]?\s*(\d{4})\s*[-/.]\s*(\d{1,2})\s*[-/.]\s*(\d{1,2})",
        r"检验有效期至\s*[::]?\s*(\d{4})[-/.](\d{1,2})[-/.](\d{1,2})",
        r"检验有效期\s*[::]?\s*(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日",
        r"有效期至\s*[::]?\s*(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日",
        r"检验有效期至\s*[::]?\s*(\d{4})\s*年\s*(\d{1,2})\s*月(?!\s*\d{1,2}\s*日)",
        r"检验有效期至\s*[::]?\s*(\d{4})年(\d{1,2})月(?!\d{1,2}日)",
        r"检验有效期至\s*[::]?\s*(\d{4})\s*年\s*(\d{1,2})\s*月\s*",
        r"(\d{4})\s*年\s*(\d{1,2})\s*月\s*(\d{1,2})\s*日\s*[((]?\s*检验",
    )
)


def parse_inspection_date(text: str) -> Optional[date]:
    """Extract the inspection-valid-until date from OCR text.

    Before matching, the letters ``O``/``o`` are replaced by ``0`` and
    spaces between adjacent CJK characters are removed, so a keyword that
    OCR split into "检 验 有 效 期 至" is still recognised.

    Patterns are tried in priority order, and within a pattern every
    occurrence is examined, so an occurrence with an implausible year or
    month, or an impossible date, no longer hides a valid later one.

    Args:
        text: Raw OCR output; may be empty.

    Returns:
        The first acceptable date (year 2000-2100, month 1-12). When only
        year and month are captured, or the captured day is outside 1-31,
        the last day of that month is returned. ``None`` if nothing matches.

    NOTE(review): the first acceptable match wins, not the latest date. A
    page carrying several stamps may therefore yield an older date;
    confirm whether the maximum should be used instead.
    """
    if not text:
        return None

    normalized = text.replace("O", "0").replace("o", "0")
    normalized = _CJK_GAP_RE.sub("", normalized)

    for pattern in _INSPECTION_DATE_PATTERNS:
        has_day_group = pattern.groups >= 3
        for m in pattern.finditer(normalized):
            year, month = int(m.group(1)), int(m.group(2))
            if not (1 <= month <= 12 and 2000 <= year <= 2100):
                continue
            if has_day_group:
                day = int(m.group(3))
                if 1 <= day <= 31:
                    try:
                        return date(year, month, day)
                    except ValueError:
                        # e.g. 30 February: look at the next occurrence.
                        continue
            # Month-only stamp, or a day value OCR clearly misread.
            return month_end(year, month)
    return None
|
||
|
||
|
||
def ocr_inspection_from_path(path: str) -> Optional[date]:
    """OCR one photo or PDF and return the inspection date found in it.

    Image files go through ``ocr_file``. For ``.pdf`` files the first
    MAX_PDF_PAGES pages are rendered at PDF_ZOOM, downscaled to at most
    MAX_IMAGE_SIDE pixels on the longer side and OCRed one by one until a
    date is found.

    Args:
        path: Path of the image or PDF file.

    Returns:
        The parsed date, or ``None`` when no date is recognised or the
        file cannot be read.
    """
    ext = os.path.splitext(path)[1].lower()
    if ext != ".pdf":
        return parse_inspection_date(ocr_file(path))

    try:
        doc = fitz.open(path)
    except Exception:
        # Same best-effort contract as ocr_file for images: an unreadable
        # PDF must not prevent the caller from trying the next file.
        return None

    try:
        mat = fitz.Matrix(PDF_ZOOM, PDF_ZOOM)  # identical for every page
        for i in range(min(len(doc), MAX_PDF_PAGES)):
            try:
                pix = doc.load_page(i).get_pixmap(matrix=mat, alpha=False)
                mode = "RGB" if pix.n < 4 else "RGBA"
                img = Image.frombytes(mode, (pix.width, pix.height), pix.samples)
                if mode == "RGBA":
                    img = img.convert("RGB")
                w, h = img.size
                longest = max(w, h)
                if longest > MAX_IMAGE_SIDE:
                    scale = MAX_IMAGE_SIDE / longest
                    # Clamp so a very elongated page cannot round to 0 px.
                    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
                    img = img.resize(new_size, Image.LANCZOS)
                text = ocr_image(img)
            except Exception:
                # A page that fails to render or OCR is skipped; later
                # pages may still contain the date.
                continue
            found = parse_inspection_date(text)
            if found:
                return found
    finally:
        doc.close()
    return None
|
||
|
||
|
||
def parse_excel_inspection_date(val) -> Optional[date]:
    """Convert a cell value from the inspection column into a ``date``.

    Args:
        val: Raw cell value (string, timestamp, NaN, ``None``, ...).

    Returns:
        The calendar date, or ``None`` when the cell is empty or pandas
        cannot interpret it as a date.
    """
    is_float_nan = isinstance(val, float) and pd.isna(val)
    if val is None or is_float_nan:
        return None

    # errors="coerce" turns unparseable input into NaT instead of raising.
    parsed = pd.to_datetime(val, errors="coerce")
    return None if pd.isna(parsed) else parsed.date()
|
||
|
||
|
||
def next_inspection_for_plate(paths: List[str]) -> Optional[date]:
    """Return the first inspection date OCR finds among a vehicle's files.

    The files are ranked by ``file_try_order`` and at most the first four
    are OCRed, stopping as soon as one yields a date.

    Returns:
        The date found, or ``None`` if none of the tried files yields one.
    """
    candidates = file_try_order(paths)[:4]
    # map() is lazy, so each file is only OCRed if all earlier ones failed.
    found_dates = (d for d in map(ocr_inspection_from_path, candidates) if d)
    return next(found_dates, None)
|
||
|
||
|
||
def main() -> None:
    """Run the OCR check for every vehicle and write the result workbook.

    Steps:
      1. make sure the tesseract executable exists;
      2. index the photo directory by plate and read the "车辆信息" sheet;
      3. OCR the files of every listed plate that has photos, in a thread
         pool of MAX_WORKERS workers;
      4. for vehicles with photos, fall back to the sheet's
         EXCEL_INSPECTION_COL value when OCR found no date;
      5. write one row per vehicle to EXCEL_OUT and print a summary.

    Raises:
        SystemExit: if TESSERACT does not point to an existing file.
    """
    if not os.path.isfile(TESSERACT):
        raise SystemExit(f"未找到 tesseract: {TESSERACT}")

    today = date.today()
    plate_files = build_plate_files()
    df = pd.read_excel(EXCEL_IN, sheet_name="车辆信息", header=0, engine="openpyxl")
    plates = ["" if pd.isna(x) else str(x).strip() for x in df["车牌号"]]

    if EXCEL_INSPECTION_COL in df.columns:
        excel_inspection = df[EXCEL_INSPECTION_COL]
    else:
        # No fallback column in the sheet: behave as if every cell is empty.
        excel_inspection = pd.Series([pd.NA] * len(df))

    # One OCR task per sheet row whose plate has at least one photo.
    tasks: List[Tuple[int, str, List[str]]] = [
        (row, plate, plate_files[plate])
        for row, plate in enumerate(plates)
        if plate and plate_files.get(plate)
    ]

    results: Dict[int, Optional[date]] = {}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        pending = {}
        for row, _plate, files in tasks:
            pending[pool.submit(next_inspection_for_plate, files)] = row
        for done in as_completed(pending):
            try:
                outcome = done.result()
            except Exception:
                # A failed task counts as "no date recognised".
                outcome = None
            results[pending[done]] = outcome

    out_rows = []
    excel_only = 0
    for row, plate in enumerate(plates):
        has_photo = bool(plate) and bool(plate_files.get(plate, []))
        date_str = ""
        expired = ""
        if has_photo:
            ocr_d = results.get(row)
            excel_d = parse_excel_inspection_date(excel_inspection.iloc[row])
            if ocr_d is None and excel_d is not None:
                excel_only += 1
            final_d = ocr_d or excel_d
            if final_d:
                date_str = final_d.strftime("%Y-%m-%d")
                expired = "是" if final_d < today else "否"
        out_rows.append(
            {
                "车牌号": plate,
                "行驶证照片是否存在": "是" if has_photo else "否",
                "行驶证下次检验日期": date_str,
                "行驶证是否过期": expired,
            }
        )

    out_df = pd.DataFrame(out_rows)
    os.makedirs(os.path.dirname(EXCEL_OUT), exist_ok=True)
    with pd.ExcelWriter(EXCEL_OUT, engine="openpyxl") as w:
        out_df.to_excel(w, index=False, sheet_name="行驶证OCR")

    has_date = out_df["行驶证下次检验日期"] != ""
    with_photo = out_df["行驶证照片是否存在"] == "是"
    ocr_ok = len([row for row, _plate, _files in tasks if results.get(row)])
    print(
        f"完成: {EXCEL_OUT}\n"
        f"今日日期: {today.isoformat()}\n"
        f"有照片: {int(with_photo.sum())}\n"
        f"OCR 识别到检验日期: {ocr_ok}\n"
        f"OCR 未识别、第三列由表内「{EXCEL_INSPECTION_COL}」补全: {excel_only}\n"
        f"有检验日期(第三列非空): {int(has_date.sum())}"
    )
|
||
|
||
|
||
# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()
|