1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
批量创建文件夹工具(单层,统一保存到 ./result/)。
- 从 name.txt 读取名称(一行一个),多编码兼容,清洗非法字符。
- 在当前目录下的 result 文件夹中生成对应同名文件夹。
"""
from __future__ import annotations
import argparse
import csv
import os
import re
import sys
import time
import unicodedata
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from threading import Lock
from typing import List, Optional, Tuple
# ---------- 常量 ----------
# Defaults for the command-line options.
DEFAULT_INPUT = "name.txt"
DEFAULT_MAX_LEN = 200
DEFAULT_EXIST = "skip"  # one of: skip | rename | fail
DEFAULT_RENAME_PATTERN = "{name} ({n})"

# Codecs tried, in this order, when no encoding is forced.
ENCODING_CANDIDATES = [
    "utf-8-sig",
    "utf-8",
    "utf-16",
    "utf-16-le",
    "utf-16-be",
    "gb18030",
    "cp936",
    "big5",
    "shift_jis",
    "cp1252",
    "latin-1",
]

# Device names that Windows refuses as file or directory names.
WINDOWS_RESERVED_BASENAMES = {"CON", "PRN", "AUX", "NUL"} | {
    f"{prefix}{digit}" for prefix in ("COM", "LPT") for digit in range(1, 10)
}

# Characters that are replaced in names, and the "dots only" name shape.
ILLEGAL_CHARS_PATTERN = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
DOTS_ONLY_PATTERN = re.compile(r"^\.+$")
# ---------- 数据结构 ----------
@dataclass
class Item:
    """One name taken from the input file."""

    # 1-based line number of the entry in the input text.
    line_no: int
    # The line exactly as it was read.
    original: str
    # The sanitized folder name derived from `original`.
    cleaned: str
@dataclass
class Result:
    """Outcome of processing a single input entry."""

    # The entry this outcome belongs to.
    item: Item
    # One of: created | skipped | renamed | error | dryrun
    action: str
    # Name of the directory that was (or would be) used, if any.
    final_name: Optional[str]
    # String form of the directory path, if any.
    final_path: Optional[str]
    # Either "OK" or "ERROR".
    status: str
    # Short failure reason when status is "ERROR", otherwise None.
    error: Optional[str]
# ---------- 工具函数 ----------
def guess_read_text(path: Path, forced_encoding: Optional[str] = None) -> Tuple[str, str]:
    """Read *path* as text and report which encoding was used.

    Args:
        path: File to read.
        forced_encoding: When given, the file is decoded with exactly this
            codec and no guessing takes place.

    Returns:
        A ``(text, encoding_name)`` tuple.

    Raises:
        RuntimeError: If none of the candidate encodings can decode the file.
        UnicodeDecodeError / LookupError: If *forced_encoding* is given and
            fails (propagated from ``Path.read_text``).
    """
    import codecs  # local import: only this function needs the BOM constants

    if forced_encoding:
        return path.read_text(encoding=forced_encoding), forced_encoding

    data = path.read_bytes()

    # A byte-order mark is the only reliable signal, so honour it first.
    # UTF-32 must be tested before UTF-16 because the UTF-32-LE BOM starts
    # with the UTF-16-LE BOM bytes.
    bom_table = (
        (codecs.BOM_UTF8, "utf-8-sig"),
        (codecs.BOM_UTF32_LE, "utf-32"),
        (codecs.BOM_UTF32_BE, "utf-32"),
        (codecs.BOM_UTF16_LE, "utf-16"),
        (codecs.BOM_UTF16_BE, "utf-16"),
    )
    for bom, enc in bom_table:
        if data.startswith(bom):
            try:
                return data.decode(enc), enc
            except UnicodeDecodeError:
                continue  # misleading BOM; keep looking

    # Without a BOM, almost any even-length byte string "decodes" as UTF-16,
    # which would turn GBK/Big5/Shift-JIS files into garbage before the
    # matching codec is ever tried. Heuristic: real BOM-less UTF-16 text
    # practically always contains NUL bytes (line breaks, ASCII), legacy
    # multi-byte text does not, so only attempt UTF-16 when a NUL is present.
    has_nul = b"\x00" in data
    last_err = None
    for enc in ENCODING_CANDIDATES:
        if not has_nul and enc.lower().replace("_", "-").startswith("utf-16"):
            continue
        try:
            return data.decode(enc), enc
        except (UnicodeDecodeError, LookupError) as e:
            last_err = e
    raise RuntimeError(f"读取失败,未知编码;最后错误:{last_err}")
def is_windows_reserved(name: str) -> bool:
    """Return True if the text before the first dot of *name* is a reserved
    Windows device name, compared case-insensitively."""
    stem, _, _ = name.partition(".")
    return stem.upper() in WINDOWS_RESERVED_BASENAMES
def sanitize_name(raw: str, max_len: int = DEFAULT_MAX_LEN) -> str:
    """Turn one raw input line into a directory name that is safe to create.

    Steps: NFKC-normalize and trim, replace illegal characters with ``_``,
    cut to *max_len*, drop trailing dots/spaces, and defuse reserved Windows
    device names.

    Args:
        raw: The text as read from the input file.
        max_len: Maximum length of the result in characters; values below
            zero are treated as zero.

    Returns:
        The cleaned name, never longer than *max_len*, or ``""`` when nothing
        usable remains (empty input, dots/spaces only, ``max_len`` of 0).
    """
    limit = max(max_len, 0)

    # Normalize first: NFKC can turn full-width characters into illegal
    # ASCII ones (e.g. a full-width colon), which must then be replaced.
    s = unicodedata.normalize("NFKC", raw).strip()
    s = ILLEGAL_CHARS_PATTERN.sub("_", s)

    # Truncate BEFORE stripping trailing dots/spaces so that the cut itself
    # cannot leave a name ending in "." or " ".
    s = s[:limit].rstrip(" .")
    if not s:
        return ""

    # Check reserved names AFTER truncation (a cut can produce "CON"), and
    # put the underscore on the stem: "CON.txt_" still has the stem "CON".
    if is_windows_reserved(s):
        stem, dot, rest = s.partition(".")
        if len(s) >= limit:
            # No room for an extra character: overwrite the stem's last one.
            stem = stem[:-1]
        s = f"{stem}_{dot}{rest}"
    return s
def parse_lines(text: str) -> List[Tuple[int, str]]:
    """Split *text* into the entries that should become folders.

    Blank lines and lines whose first non-blank character is ``#`` are
    skipped. Line numbers are 1-based and count skipped lines too.

    Args:
        text: Full decoded content of the input file.

    Returns:
        A list of ``(line_number, line)`` tuples; each line keeps its own
        leading/trailing whitespace but has no line terminator.
    """
    # A decoder that does not consume the byte-order mark leaves U+FEFF at
    # the start of the text. It is invisible, is not whitespace for
    # str.strip(), and would otherwise end up in the first folder name or
    # hide a leading "#" comment.
    if text.startswith("\ufeff"):
        text = text[1:]

    entries: List[Tuple[int, str]] = []
    for line_no, line in enumerate(text.splitlines(), start=1):
        content = line.strip()
        if not content or content.startswith("#"):
            continue
        entries.append((line_no, line))
    return entries
# ---------- 主执行逻辑 ----------
class App:
    """Create one directory under ``./result`` for every name in the input file.

    The instance keeps the parsed command-line arguments, the target base
    directory, per-outcome counters and the list of all ``Result`` objects
    produced by ``run``.
    """

    # Upper bound on numbered candidates tried by the "rename" strategy, so a
    # pathological pattern or directory can never spin forever.
    _MAX_RENAME_ATTEMPTS = 10_000

    def __init__(self, args: argparse.Namespace):
        """Store *args*, resolve the base directory and reset the counters.

        The ``result`` directory is created here unless this is a dry run,
        which must leave the filesystem untouched.
        """
        self.args = args
        self.base = Path.cwd() / "result"
        if not getattr(args, "dry_run", False):
            self.base.mkdir(exist_ok=True)
        self.counter_ok = 0
        self.counter_skip = 0
        self.counter_err = 0
        self.counter_renamed = 0
        self.results: List[Result] = []

    def log_line(self, result: Result):
        """Print a one-line report for *result* unless ``--quiet`` is set."""
        if self.args.quiet:
            return
        name = result.final_name or result.item.cleaned or result.item.original
        if result.status == "OK":
            if result.action == "created":
                print(f"[OK] {name}")
            elif result.action == "renamed":
                print(f"[OK] {name} (renamed)")
            elif result.action == "dryrun":
                print(f"[DRY] {name}")
            elif result.action == "skipped":
                print(f"[SKIP] {name} (exists)")
        else:
            reason = (result.error or "unknown").strip()
            print(f"[ERROR] {name} ({reason})")

    def make_one(self, item: Item) -> Result:
        """Create the directory for *item* and describe what happened.

        This runs on worker threads and never raises: every failure is
        returned as a ``Result`` with status ``"ERROR"``.
        """
        name = item.cleaned
        target = self.base / name
        if self.args.dry_run:
            return Result(item, "dryrun", name, str(target), "OK", None)
        try:
            try:
                target.mkdir(parents=False, exist_ok=False)
            except FileExistsError:
                # Handled inside the outer try so that errors raised by the
                # collision strategy are also converted into a Result.
                return self._handle_existing(item, name, target)
            return Result(item, "created", name, str(target), "OK", None)
        except Exception as e:  # worker boundary: report, do not crash the pool
            return Result(item, "error", name, str(target), "ERROR", str(e))

    def _handle_existing(self, item: Item, name: str, target: Path) -> Result:
        """Apply the ``--exist`` strategy when *target* already exists."""
        strategy = self.args.exist
        if strategy == "skip":
            return Result(item, "skipped", name, str(target), "OK", None)
        if strategy == "fail":
            return Result(item, "error", name, str(target), "ERROR", "already_exists")
        if strategy == "rename":
            return self._create_renamed(item, name)
        return Result(item, "error", name, str(target), "ERROR", "unknown_exist_strategy")

    def _create_renamed(self, item: Item, name: str) -> Result:
        """Create the first free numbered variant of *name* (n = 2, 3, ...)."""
        pattern = self.args.rename_pattern
        max_len = self.args.max_name_length
        failed = Result(item, "error", None, None, "ERROR", "rename_failed")
        previous = None
        for n in range(2, self._MAX_RENAME_ATTEMPTS + 2):
            try:
                # Shorten the stem so the numbering survives the length limit
                # instead of being truncated away by sanitize_name.
                room = max_len - len(pattern.format(name="", n=n))
                stem = name[:room] if room > 0 else name
                candidate = sanitize_name(pattern.format(name=stem, n=n), max_len)
            except (KeyError, IndexError, ValueError) as e:
                return Result(item, "error", None, None, "ERROR", f"bad_rename_pattern: {e!r}")
            # An unchanged candidate means the pattern ignores {n} (or it was
            # cut off); retrying would loop forever on the same existing name.
            if not candidate or candidate == previous:
                return failed
            previous = candidate
            path = self.base / candidate
            try:
                path.mkdir(parents=False, exist_ok=False)
            except FileExistsError:
                continue
            return Result(item, "renamed", candidate, str(path), "OK", None)
        return failed

    def _record(self, result: Result) -> None:
        """Log *result*, store it and update the matching counter."""
        self.log_line(result)
        self.results.append(result)
        if result.status != "OK":
            self.counter_err += 1
        elif result.action == "created":
            self.counter_ok += 1
        elif result.action == "skipped":
            self.counter_skip += 1
        elif result.action == "renamed":
            self.counter_renamed += 1

    def run(self) -> int:
        """Read the input file, create all directories and print a summary.

        Returns:
            0 if there were no errors, 2 if errors occurred alongside at
            least one success or skip, 3 if only errors occurred or the
            input could not be read.
        """
        start = time.time()
        inp = Path(self.args.input)
        if not inp.exists():
            print(f"[FATAL] 输入文件不存在: {inp}", file=sys.stderr)
            return 3
        try:
            text, _used_enc = guess_read_text(inp, self.args.encoding)
        except Exception as e:
            print(f"[FATAL] 无法读取输入:{e}", file=sys.stderr)
            return 3

        # Names that sanitize to nothing are reported right away; the rest
        # are queued for the worker pool.
        pending: List[Item] = []
        for line_no, raw in parse_lines(text):
            cleaned = sanitize_name(raw, self.args.max_name_length)
            item = Item(line_no, raw, cleaned)
            if not cleaned:
                self._record(Result(item, "error", None, None, "ERROR", "invalid_or_empty"))
                continue
            pending.append(item)

        # Run concurrently; results are recorded on this thread as they finish.
        workers = max(1, self.args.max_workers)  # the executor rejects values <= 0
        with ThreadPoolExecutor(max_workers=workers) as ex:
            futures = [ex.submit(self.make_one, it) for it in pending]
            for fut in as_completed(futures):
                self._record(fut.result())

        elapsed = time.time() - start
        total_ok = self.counter_ok + self.counter_renamed
        print(f"总结: 成功 {total_ok}, 跳过 {self.counter_skip}, 错误 {self.counter_err}, 用时 {elapsed:.2f}s")
        if self.counter_err > 0 and (total_ok > 0 or self.counter_skip > 0):
            return 2
        elif self.counter_err > 0:
            return 3
        else:
            return 0
# ---------- 参数 ----------
def build_argparser() -> argparse.ArgumentParser:
    """Build the command-line parser for the batch folder creator."""
    # Default pool size: four threads per CPU (two CPUs assumed when the
    # count is unknown), capped at eight.
    cpu_total = os.cpu_count() or 2
    default_workers = min(8, cpu_total * 4)

    parser = argparse.ArgumentParser(
        description="从 name.txt 批量创建文件夹,保存到 ./result/",
    )
    parser.add_argument(
        "--input",
        default=DEFAULT_INPUT,
        help="输入文件(默认 name.txt)",
    )
    parser.add_argument(
        "--encoding",
        default=None,
        help="指定输入编码",
    )
    parser.add_argument(
        "--exist",
        choices=["skip", "rename", "fail"],
        default=DEFAULT_EXIST,
        help="已存在处理策略(默认 skip)",
    )
    parser.add_argument(
        "--rename-pattern",
        default=DEFAULT_RENAME_PATTERN,
        help="重命名模板,含 {name} 和 {n}",
    )
    parser.add_argument(
        "--max-name-length",
        type=int,
        default=DEFAULT_MAX_LEN,
        help="文件夹名最大长度(默认200)",
    )
    parser.add_argument(
        "--max-workers",
        type=int,
        default=default_workers,
        help=f"并发线程数(默认 {default_workers})",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="仅预演,不创建目录",
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="仅输出汇总",
    )
    return parser
def main(argv: Optional[List[str]] = None) -> int:
    """Parse *argv* (``sys.argv[1:]`` when None), run the app and return its exit code."""
    parsed = build_argparser().parse_args(argv)
    return App(parsed).run()
if __name__ == "__main__":
    # Hand the run's exit code back to the calling shell.
    exit_code = main()
    sys.exit(exit_code)
|