1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
|
""" 批量创建文件夹工具(单层,统一保存到 ./result/)。 - 从 name.txt 读取名称(一行一个),多编码兼容,清洗非法字符。 - 在当前目录下的 result 文件夹中生成对应同名文件夹。 """
from __future__ import annotations import argparse import csv import os import re import sys import time import unicodedata from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from pathlib import Path from threading import Lock from typing import List, Optional, Tuple
# Defaults used by the command-line parser and by sanitize_name().
DEFAULT_INPUT = "name.txt"
DEFAULT_MAX_LEN = 200
DEFAULT_EXIST = "skip"
DEFAULT_RENAME_PATTERN = "{name} ({n})"

# Encodings tried, in this order, when no encoding is forced; the first one
# that decodes the whole input is used.
ENCODING_CANDIDATES = [
    "utf-8-sig",
    "utf-8",
    "utf-16",
    "utf-16-le",
    "utf-16-be",
    "gb18030",
    "cp936",
    "big5",
    "shift_jis",
    "cp1252",
    "latin-1",
]

# Upper-case base names that is_windows_reserved() treats as reserved:
# CON, PRN, AUX, NUL plus COM1-COM9 and LPT1-LPT9.
WINDOWS_RESERVED_BASENAMES = {"CON", "PRN", "AUX", "NUL"} | {
    f"{prefix}{digit}" for prefix in ("COM", "LPT") for digit in range(1, 10)
}

# Characters replaced by "_" in folder names, and the "nothing but dots" shape.
ILLEGAL_CHARS_PATTERN = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
DOTS_ONLY_PATTERN = re.compile(r"^\.+$")
@dataclass
class Item:
    """One usable input line together with its sanitized folder name."""

    # Line number of the entry in the input text (parse_lines counts from 1).
    line_no: int
    # The line exactly as it was read from the input.
    original: str
    # The value returned by sanitize_name() for ``original``.
    cleaned: str
@dataclass
class Result:
    """Outcome of processing a single Item."""

    # The input entry this outcome belongs to.
    item: Item
    # What happened: "created", "skipped", "renamed", "dryrun" or "error".
    action: str
    # Folder name that was used, or None when no name could be produced.
    final_name: Optional[str]
    # String form of the folder path, or None when there is no path.
    final_path: Optional[str]
    # "OK" or "ERROR".
    status: str
    # Short reason for an "ERROR" status; None otherwise.
    error: Optional[str]
def guess_read_text(path: Path, forced_encoding: Optional[str] = None) -> Tuple[str, str]:
    """Read *path* as text and report which encoding was used.

    Args:
        path: File to read.
        forced_encoding: When given (and non-empty), the file is read with
            exactly this encoding and no guessing takes place.

    Returns:
        A ``(text, encoding_name)`` tuple.

    Raises:
        RuntimeError: If none of ``ENCODING_CANDIDATES`` can decode the file.
        OSError / UnicodeError / LookupError: Propagated unchanged from
            ``Path.read_text`` when *forced_encoding* is used.
    """
    if forced_encoding:
        return path.read_text(encoding=forced_encoding), forced_encoding

    data = path.read_bytes()

    # Nearly every even-length byte string "decodes" as BOM-less UTF-16, so a
    # GBK/Big5/Shift-JIS file would be turned into garbage before its real
    # codec is ever tried.  The UTF-16 family is therefore only considered
    # when the data starts with a UTF-16 BOM or contains a NUL byte (which
    # UTF-16 text with any ASCII character or line break has, and which the
    # legacy multi-byte encodings normally do not).
    utf16_plausible = data.startswith((b"\xff\xfe", b"\xfe\xff")) or b"\x00" in data

    last_err: Optional[Exception] = None
    for enc in ENCODING_CANDIDATES:
        if enc.startswith("utf-16") and not utf16_plausible:
            continue
        try:
            return data.decode(enc), enc
        except (UnicodeError, LookupError) as e:
            # UnicodeError: wrong guess; LookupError: codec not available.
            last_err = e
    raise RuntimeError(f"读取失败,未知编码;最后错误:{last_err}")
def is_windows_reserved(name: str) -> bool:
    """Return True if *name* maps to a reserved Windows device name.

    Only the part before the first dot is considered, so ``"NUL.txt"`` and
    ``"nul.tar.gz"`` are both reserved.  Trailing spaces in that part are
    ignored as well (``"NUL .txt"``), matching how CPython's ``ntpath``
    classifies reserved names.  The comparison is case-insensitive.
    """
    base = name.partition(".")[0].rstrip(" ")
    return base.upper() in WINDOWS_RESERVED_BASENAMES
def sanitize_name(raw: str, max_len: int = DEFAULT_MAX_LEN) -> str:
    """Turn one raw input line into a folder name that is safe to create.

    Steps, in order:
      1. NFKC-normalize and strip surrounding whitespace.
      2. Replace every character matched by ``ILLEGAL_CHARS_PATTERN``
         (including ``/`` and ``\\``) with ``_``.
      3. Truncate to *max_len* characters, then drop trailing spaces and dots.
      4. If the result is a reserved Windows device name, insert ``_`` right
         after the part before the first dot (``CON`` -> ``CON_``,
         ``CON.txt`` -> ``CON_.txt``) so that part is no longer reserved.

    Args:
        raw: The untreated name.
        max_len: Maximum length of the returned name; values below zero are
            treated as zero.

    Returns:
        The cleaned name, or ``""`` when nothing usable remains (empty or
        dots-only input, or a reserved name that cannot be escaped within
        *max_len*).
    """
    limit = max(max_len, 0)

    s = unicodedata.normalize("NFKC", raw).strip()
    s = ILLEGAL_CHARS_PATTERN.sub("_", s)

    # Truncate *before* stripping trailing dots/spaces: cutting a long name
    # can expose a dot or space at the new end.  Dots-only names end up empty
    # here, so no separate dots-only check is needed.
    s = s[:limit].rstrip(" .")
    if not s:
        return ""

    if is_windows_reserved(s):
        # The device-name check only looks at the part before the first dot,
        # so the underscore has to go there; appending it after an extension
        # ("CON.txt_") would leave the name reserved.
        base, dot, rest = s.partition(".")
        s = f"{base}_{dot}{rest}"[:limit].rstrip(" .")
        if is_windows_reserved(s):
            # max_len is too small to hold the escaped name.
            return ""
    return s
def parse_lines(text: str) -> List[Tuple[int, str]]:
    """Split *text* into the lines that name a folder.

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped.  A single leading byte-order mark (U+FEFF) is removed first: it
    survives decoding when the chosen encoding does not consume it, and would
    otherwise become part of the first name or hide a first-line comment.

    Returns:
        ``(line_number, line)`` pairs.  Line numbers start at 1 and count
        skipped lines too; the line text keeps its surrounding whitespace.
    """
    if text.startswith("\ufeff"):
        text = text[1:]
    return [
        (line_no, line)
        for line_no, line in enumerate(text.splitlines(), start=1)
        if line.strip() and not line.lstrip().startswith("#")
    ]
class App:
    """Create one folder per input name inside ``<cwd>/result``.

    Attributes:
        args: Parsed command-line options.
        base: Directory in which the folders are created.
        counter_ok / counter_skip / counter_err / counter_renamed: Tallies
            shown in the final summary.
        results: Every ``Result`` produced by the last ``run()``.
    """

    def __init__(self, args: argparse.Namespace):
        self.args = args
        self.base = Path.cwd() / "result"
        # A dry run promises not to create directories, and that includes
        # the output directory itself.
        if not getattr(args, "dry_run", False):
            self.base.mkdir(exist_ok=True)
        self.counter_ok = 0
        self.counter_skip = 0
        self.counter_err = 0
        self.counter_renamed = 0
        self.results: List[Result] = []

    def log_line(self, result: Result):
        """Print one status line for *result* unless ``--quiet`` is set."""
        if self.args.quiet:
            return
        name = result.final_name or result.item.cleaned or result.item.original
        if result.status != "OK":
            reason = (result.error or "unknown").strip()
            print(f"[ERROR] {name} ({reason})")
        elif result.action == "created":
            print(f"[OK] {name}")
        elif result.action == "renamed":
            print(f"[OK] {name} (renamed)")
        elif result.action == "dryrun":
            print(f"[DRY] {name}")
        elif result.action == "skipped":
            print(f"[SKIP] {name} (exists)")

    def make_one(self, item: Item) -> Result:
        """Create the folder for *item* and describe what happened.

        Never raises for per-folder problems: every failure is returned as a
        ``Result`` with status ``"ERROR"`` so one bad entry cannot abort the
        whole batch.
        """
        name = item.cleaned
        target = self.base / name

        if self.args.dry_run:
            return Result(item, "dryrun", name, str(target), "OK", None)

        try:
            target.mkdir(parents=False, exist_ok=False)
        except FileExistsError:
            return self._handle_existing(item, target)
        except Exception as e:
            return Result(item, "error", name, str(target), "ERROR", str(e))
        return Result(item, "created", name, str(target), "OK", None)

    def _handle_existing(self, item: Item, target: Path) -> Result:
        """Apply the ``--exist`` strategy to a folder that already exists."""
        name = item.cleaned
        strategy = self.args.exist
        if strategy == "skip":
            return Result(item, "skipped", name, str(target), "OK", None)
        if strategy == "fail":
            return Result(item, "error", name, str(target), "ERROR", "already_exists")
        if strategy == "rename":
            return self._make_renamed(item)
        return Result(item, "error", name, str(target), "ERROR", "unknown_exist_strategy")

    def _make_renamed(self, item: Item) -> Result:
        """Create the first free folder name produced by the rename pattern.

        Candidates are generated for n = 2, 3, ...  The search stops with a
        ``rename_failed`` error as soon as a candidate repeats, which happens
        when the pattern has no ``{n}`` or when truncation to the maximum
        name length removes the counter; without that check the loop would
        never end.
        """
        already_taken = set()
        n = 2
        while True:
            try:
                candidate = self.args.rename_pattern.format(name=item.cleaned, n=n)
                candidate = sanitize_name(candidate, self.args.max_name_length)
                if not candidate or candidate in already_taken:
                    return Result(item, "error", None, None, "ERROR", "rename_failed")
                candidate_path = self.base / candidate
                candidate_path.mkdir(parents=False, exist_ok=False)
            except FileExistsError:
                already_taken.add(candidate)
                n += 1
                continue
            except Exception as e:
                # Malformed pattern, permission problem, ...: report it the
                # same way make_one() reports any other failure.
                return Result(item, "error", None, None, "ERROR", str(e))
            return Result(item, "renamed", candidate, str(candidate_path), "OK", None)

    def _tally(self, result: Result) -> None:
        """Add *result* to the summary counters.

        Dry-run entries count as successes so the summary of a preview shows
        how many folders would be handled instead of zero.
        """
        if result.status != "OK":
            self.counter_err += 1
        elif result.action in ("created", "dryrun"):
            self.counter_ok += 1
        elif result.action == "skipped":
            self.counter_skip += 1
        elif result.action == "renamed":
            self.counter_renamed += 1

    def _exit_code(self) -> int:
        """Map the counters to the process exit code (0, 2 or 3)."""
        if self.counter_err == 0:
            return 0
        if self.counter_ok + self.counter_renamed > 0 or self.counter_skip > 0:
            return 2
        return 3

    def run(self) -> int:
        """Read the input file, create the folders and print a summary.

        Returns:
            0 when there were no errors, 2 when errors occurred alongside at
            least one success or skip, 3 when only errors occurred or the
            input could not be read.
        """
        start = time.time()
        inp = Path(self.args.input)
        if not inp.exists():
            print(f"[FATAL] 输入文件不存在: {inp}", file=sys.stderr)
            return 3
        try:
            text, _ = guess_read_text(inp, self.args.encoding)
        except Exception as e:
            print(f"[FATAL] 无法读取输入:{e}", file=sys.stderr)
            return 3

        # Entries still to be created are kept apart from self.results so
        # that list only ever holds Result objects.
        pending: List[Item] = []
        for line_no, raw in parse_lines(text):
            cleaned = sanitize_name(raw, self.args.max_name_length)
            item = Item(line_no, raw, cleaned)
            if cleaned:
                pending.append(item)
                continue
            rejected = Result(item, "error", None, None, "ERROR", "invalid_or_empty")
            self.log_line(rejected)
            self.results.append(rejected)
            self.counter_err += 1

        with ThreadPoolExecutor(max_workers=self.args.max_workers) as ex:
            futures = [ex.submit(self.make_one, it) for it in pending]
            # Logging and counting happen here, in the calling thread only.
            for fut in as_completed(futures):
                r = fut.result()
                self.results.append(r)
                self.log_line(r)
                self._tally(r)

        elapsed = time.time() - start
        total_ok = self.counter_ok + self.counter_renamed
        print(f"总结: 成功 {total_ok}, 跳过 {self.counter_skip}, 错误 {self.counter_err}, 用时 {elapsed:.2f}s")
        return self._exit_code()
def _positive_int(text: str) -> int:
    """argparse ``type=`` callback that accepts only integers >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid integer: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1, got {value}")
    return value


def build_argparser() -> argparse.ArgumentParser:
    """Build the command-line parser for the tool.

    ``--max-workers`` and ``--max-name-length`` must be positive integers:
    a worker count below 1 would make ``ThreadPoolExecutor`` raise
    ``ValueError`` later on, and a non-positive name length leaves no usable
    names.  Invalid values are rejected by argparse with a normal usage
    error instead.
    """
    p = argparse.ArgumentParser(description="从 name.txt 批量创建文件夹,保存到 ./result/")
    p.add_argument("--input", default=DEFAULT_INPUT, help="输入文件(默认 name.txt)")
    p.add_argument("--encoding", default=None, help="指定输入编码")
    p.add_argument(
        "--exist",
        choices=["skip", "rename", "fail"],
        default=DEFAULT_EXIST,
        help="已存在处理策略(默认 skip)",
    )
    p.add_argument(
        "--rename-pattern",
        default=DEFAULT_RENAME_PATTERN,
        help="重命名模板,含 {name} 和 {n}",
    )
    p.add_argument(
        "--max-name-length",
        type=_positive_int,
        default=DEFAULT_MAX_LEN,
        help="文件夹名最大长度(默认200)",
    )
    # At most 8 threads by default; fewer only when cpu_count() * 4 is smaller.
    default_workers = min(8, (os.cpu_count() or 2) * 4)
    p.add_argument(
        "--max-workers",
        type=_positive_int,
        default=default_workers,
        help=f"并发线程数(默认 {default_workers})",
    )
    p.add_argument("--dry-run", action="store_true", help="仅预演,不创建目录")
    p.add_argument("--quiet", action="store_true", help="仅输出汇总")
    return p
def main(argv: Optional[List[str]] = None) -> int:
    """Parse *argv*, run the application and return its exit code.

    When *argv* is None, argparse reads the arguments from ``sys.argv``.
    """
    parser = build_argparser()
    options = parser.parse_args(argv)
    return App(options).run()
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|