自动抓取 HMDB 外源性代谢物食物来源工具

原文地址:https://itxiaozhang.com/auto-hmdb-exogenous-metabolite-food-source 如果您需要远程电脑维修或者编程开发,请加我微信咨询。 1. 需求分析 代谢物的来源信息对于科研和营养分析非常重要。现有数据库(HMDB)中,外源性代谢物来源信息分散在网页上,手动整理效率低、容易出错。因此,需要一个工具能批量抓取代谢物的外源性来源信息,生成结构化、可分析的 CSV 数据。 2. 工具功能概述 抓取范围:只关注 HMDB 中 Disposition → Exogenous,即代谢物外源性来源。 输出数据:CSV 文件,每条记录包含 HMDB ID、食物名称、FooDB 编号;找不到或出错标 None。 自动化与鲁棒性: 多线程并发抓取,提高效率 随机 User-Agent、防封锁 自动重试和错误处理 每处理若干条自动保存,保证数据安全 用途:支持食物-代谢物关联分析、营养研究及数据库构建。 3. 程序结构 核心程序通过 Python 实现,主要功能包括: 加载 HMDB ID 列表 请求对应网页 提取代谢物外源性食物来源信息 生成 CSV 文件,每条记录包括 HMDB ID、食物名称及对应编号 错误处理和日志记录,确保每个 ID 都有处理结果 示意代码如下: 1 2 3 4 5 6 7 8 9 def get_metabolite_data(hmdb_id): """ 根据 HMDB ID 获取外源性食物来源信息 输出字典包含: - 'HMDB ID' - 'Source(Exogenous)' (食物名称和编号,找不到时为 'None') """ data = {'HMDB ID': hmdb_id, 'Source(Exogenous)': '...'} return data 视频版本 哔哩哔哩 YouTube

2025年12月27日 · 1 分钟 · IT小章

HMDB代谢物批量抓取与结构化导出

原文地址:https://itxiaozhang.com/hmdb-metabolite-batch-extraction-csv-export/ 如果您需要远程电脑维修或者编程开发,请加我微信咨询。 需求介绍 自动化获取多个 HMDB ID 的完整字段,减少手工搜索与遗漏。 稳定可靠:处理限速与网络波动,不中断、可续跑。 输出可用:统一字段、原始顺序、便于下游分析与复现。 程序如何运行 准备 在 id.txt 放入待处理 ID(如 HMDB0000123),一行一个。 安装依赖: 1 pip install requests lxml psutil pandas 执行 1 python HMDB_Metabolite_Extractor.py 结果 数据:代谢物数据_最终.csv(按原始 ID 顺序)。 失败:失败.txt(便于回补)。 进度:progress.json(断点续跑)。 整体框架 I/O 层 读取 id.txt;写入 代谢物数据_最终.csv;失败与进度持久化。 抓取层 requests + 全局 RateLimiter 控制 QPS;统一超时与重试策略。 解析层 lxml + 细粒度 XPath,覆盖 25 个关键字段(名称、化学式、分子量、分类层级、性质、通路、浓度、疾病参考、外部 ID 等)。 并发层 ThreadPoolExecutor;批量写入、降内存;进度与估时。 排序与收尾 按原始 ID 重新排序;清理临时文件;统计汇总。 核心代码框架 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 import time, csv, os, re, json, threading from collections import Counter from concurrent.futures import ThreadPoolExecutor, as_completed import requests from lxml import html from datetime import datetime import psutil HEADERS = {"User-Agent": "Mozilla/5.0 ..."} MAX_WORKERS = 5 REQUESTS_PER_SECOND = 5 BATCH_WRITE_SIZE = 500 PROGRESS_REPORT_INTERVAL = 100 REQUEST_TIMEOUT = (5, 15) MAX_RETRIES = 2 class RateLimiter: def __init__(self, rate): self.rate = rate self.tokens = rate self.last_update = time.time() self.lock = threading.Lock() def acquire(self): with self.lock: now = time.time() self.tokens = min(self.rate, self.tokens + (now - self.last_update) * self.rate) self.last_update = now if self.tokens >= 1: self.tokens -= 1 return True return (1 - self.tokens) / self.rate rate_limiter = RateLimiter(REQUESTS_PER_SECOND) def check_ids(ids): invalid, dup = [], Counter(ids) duplicates = {i:c for i,c in dup.items() if c>1} for idx, i in enumerate(ids, 1): if not i.startswith("HMDB"): invalid.append((idx, i)) return invalid, duplicates def clean_text(s): s = re.sub(r'[\x00-\x1F\u200B-\u200D\uFEFF]', '', s or '') s = re.sub(r'\s+', ' ', s) return s.strip() def fetch(hmdb_id): url = f"https://hmdb.ca/metabolites/{hmdb_id}" wait = rate_limiter.acquire() if wait is not True: time.sleep(wait) rate_limiter.acquire() resp = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT) resp.raise_for_status() return html.fromstring(resp.content) def parse(tree, hmdb_id): data = {"HMDB_ID": hmdb_id} # 关键字段解析 cf = tree.xpath('//th[text()="Chemical Formula"]/following-sibling::td[1]') data["Chemical_Formula"] = ''.join(cf[0].xpath('.//text()')) if cf else "" # 其余字段:Average/Mono Weight、IUPAC、CAS、SMILES、分类层级、性质、通路、浓度、疾病参考、外部 ID... # ... return data def should_retry(exc, status=None): import requests as R if isinstance(exc, R.exceptions.Timeout): return True if isinstance(exc, R.exceptions.ConnectionError): return True if isinstance(exc, R.exceptions.HTTPError): if status in [429, 502, 503, 504]: return True if status in [404, 400, 403]: return False return True return True def get_with_retry(hmdb_id, retries=MAX_RETRIES): last = None for attempt in range(retries): try: tree = fetch(hmdb_id) return parse(tree, hmdb_id) except requests.exceptions.HTTPError as e: last, status = e, (e.response.status_code if e.response else None) if not should_retry(e, status): break if attempt < retries - 1: wait = 2 ** attempt ra = e.response.headers.get('Retry-After') if e.response else None if ra: try: wait = max(wait, int(ra)) except: pass time.sleep(wait) except Exception as e: last = e if not should_retry(e): break if attempt < retries - 1: time.sleep(2 ** attempt) print(f"[{hmdb_id}] 重试失败: {last}") return None def mem_mb(): try: return psutil.Process().memory_info().rss / 1024 / 1024 except: return 0 def write_csv(rows, file, mode='a'): cols = [ 'HMDB_ID','Common_Name','Description','Synonyms','Chemical_Formula', 'Average_Molecular_Weight','Monoisotopic_Molecular_Weight','IUPAC_Name', 'Traditional_Name','CAS_Registry_Number','SMILES','Kingdom','Super_Class', 'Class','Sub_Class','Direct_Parent','Experimental_Molecular_Properties', 'Predicted_Molecular_Properties','Pathways','Normal_Concentrations', 'Abnormal_Concentrations','Disease_References','Associated_OMIM_IDs', 'KEGG_Compound_ID','PubChem_Compound' ] exists = os.path.isfile(file) with open(file, mode, newline='', encoding='utf-8-sig') as f: w = csv.DictWriter(f, fieldnames=cols, quoting=csv.QUOTE_ALL) if not exists or mode == 'w': w.writeheader() w.writerows(rows) def save_failed(failed, file='失败.txt'): with open(file, 'w', encoding='utf-8') as f: for i, err in failed: f.write(f"{i}\t{err}\n") def save_progress(done, file='progress.json'): data = {'processed_ids': list(done), 'timestamp': datetime.now().isoformat(), 'count': len(done)} with open(file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) def load_progress(file='progress.json'): if os.path.exists(file): try: with open(file, 'r', encoding='utf-8') as f: return set(json.load(f).get('processed_ids', [])) except: pass return set() def process_ids(hmdb_ids): done = load_progress() remain = [i for i in hmdb_ids if i not in done] if not remain: print("所有ID都已处理完成!") return len(hmdb_ids), 0, [] results, failed = {}, [] total, curr, ok, bad = len(hmdb_ids), len(done), len(done), 0 start = time.time() with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex: fut = {ex.submit(get_with_retry, i): i for i in remain} for f in as_completed(fut): i = fut[f]; curr += 1 try: d = f.result() if d: results[i] = d; done.add(i); ok += 1 else: failed.append((i, "数据获取失败")); bad += 1 except Exception as e: failed.append((i, str(e))); bad += 1 if curr % PROGRESS_REPORT_INTERVAL == 0 or curr == total: left = total - curr pct = curr / total * 100 print(f"进度: {curr}/{total} ({pct:.1f}%) | 剩余: {left} | 成功: {ok} | 失败: {bad} | 内存: {mem_mb():.1f}MB") if len(results) >= BATCH_WRITE_SIZE: ordered = [] for _id in hmdb_ids: if _id in results: ordered.append(results[_id]); del results[_id] if ordered: write_csv(ordered, '代谢物数据_最终.csv', 'a') save_progress(done) if results: ordered = [] for _id in hmdb_ids: if _id in results: ordered.append(results[_id]) if ordered: write_csv(ordered, '代谢物数据_最终.csv', 'a') if failed: save_failed(failed) save_progress(done) return ok, bad, failed def sort_by_original_order(original_ids, csv_file): try: import pandas as pd df = pd.read_csv(csv_file, encoding='utf-8-sig') if df.empty or 'HMDB_ID' not in df.columns: return order = {i: idx for idx, i in enumerate(original_ids)} df['sort_key'] = df['HMDB_ID'].map(order) df.sort_values('sort_key').drop('sort_key', axis=1).to_csv(csv_file, index=False, encoding='utf-8-sig') except ImportError: print("提示:未安装 pandas,跳过最终排序。") except Exception as e: print(f"排序异常: {e}") def main(): print("="*80); print("HMDB代谢物数据提取工具"); print("="*80) if not os.path.exists('id.txt'): print("错误:缺少 id.txt"); input("回车退出..."); return with open('id.txt', 'r', encoding='utf-8') as f: ids = f.read().splitlines() invalid, duplicates = check_ids(ids) valid = [i for i in ids if i.startswith("HMDB")] unique = list(dict.fromkeys(valid)) if not os.path.exists('progress.json'): write_csv([], '代谢物数据_最终.csv', 'w') ok, bad, failed = process_ids(unique) print("\n按原始ID顺序整理 CSV...") sort_by_original_order(unique, '代谢物数据_最终.csv') print("="*80); print(f"总计: {len(unique)} | 成功: {ok} | 失败: {bad}"); print("输出: 代谢物数据_最终.csv"); print("="*80) if __name__ == "__main__": main() 视频版本 哔哩哔哩 YouTube

2025年11月3日 · 5 分钟 · IT小章

Bilibili粉丝数据获取工具

原文地址:https://itxiaozhang.com/bilibili-followers-data-fetcher-python-tutorial-guide/ 如果您需要远程电脑维修或者编程开发,请加我微信咨询。 Bilibili粉丝数据获取工具使用指南 一、工具介绍 本工具是一个基于Python开发的Bilibili粉丝数据获取程序,能够批量获取用户的粉丝信息并生成美观的HTML展示页面。 主要功能: 批量获取Bilibili粉丝数据(最多1000个粉丝) 自动过滤默认头像用户,提升数据质量 生成响应式HTML页面,支持多设备查看 异步处理,提高数据获取效率 适用场景: UP主了解粉丝构成 制作粉丝感谢页面 个人数据统计备份 二、Cookie获取方法 Chrome浏览器获取步骤: 登录Bilibili账号 按F12打开开发者工具 切换到"Network"(网络)标签页 刷新页面或随意点击一个链接 在请求列表中找到任意一个请求 在右侧"Headers"中找到"Cookie"字段 复制完整的Cookie内容 安全注意事项: Cookie包含您的登录凭证,请妥善保管 不要将Cookie分享给他人 建议定期更新Cookie以保证安全性 三、环境准备 系统要求: Python 3.7 或更高版本 稳定的网络连接 依赖安装: 1 pip install aiohttp 四、使用步骤 1. 配置Cookie 打开 bilibili_followers_fetcher.py 文件,找到以下代码行: 1 'Cookie': 'SESSDATA=你的SESSDATA内容' 将其中的SESSDATA内容替换为您从浏览器获取的真实Cookie中的SESSDATA部分。 2. 运行程序 在命令行中执行: 1 python bilibili_followers_fetcher.py 3. 查看结果 程序运行完成后,会在当前目录生成 followers.html 文件,用浏览器打开即可查看粉丝列表。 五、常见问题解决 Cookie相关问题: Cookie过期:重新从浏览器获取最新Cookie 权限不足:确认账号已正常登录Bilibili 格式错误:检查Cookie是否完整复制 网络相关问题: 请求超时:检查网络连接,稍后重试 API错误:可能是请求频率过高,等待一段时间后重试 数据不完整:网络不稳定导致,可重新运行程序 数据相关问题: 粉丝数量限制:程序最多获取1000个粉丝(20页×50个) 部分粉丝缺失:Bilibili API限制,属于正常现象 文件编码问题:确保使用UTF-8编码打开HTML文件 六、注意事项 合规使用: 本工具仅供个人学习和研究使用 请遵守Bilibili平台的使用条款 不得将获取的数据用于商业用途 使用建议: 控制使用频率,避免对服务器造成压力 尊重用户隐私,不要泄露他人信息 建议在网络空闲时段使用 七、完整源码 以下是完整的Python源码: ...

2025年11月1日 · 7 分钟 · IT小章

HMDB代谢物信息批量获取工具:支持PPM误差计算的Python代谢组学数据提取解决方案

原文地址:https://itxiaozhang.com/hmdb-metabolite-batch-extractor-python-tool-with-ppm-error-calculation/ 如果您需要远程电脑维修或者编程开发,请加我微信咨询。 本文对应60号代码。 这个程序是做什么的? HMDB代谢物信息批量获取工具是一个专业的生物信息学工具,专门用于从HMDB(Human Metabolome Database,人类代谢组数据库)批量获取代谢物的详细信息。该工具通过输入代谢物的质量数,自动搜索并提取相关的代谢物数据,为代谢组学研究提供高效的数据获取解决方案。 核心用途 代谢组学研究:为质谱分析结果提供代谢物注释 化合物鉴定:根据精确质量数快速匹配可能的代谢物 数据库查询:批量获取代谢物的生物学信息 科研辅助:为生物医学研究提供代谢物数据支持 主要功能 1. 双离子模式支持 正离子模式:支持M+H、M+Li、M+NH4、M+Na等加合离子 负离子模式:支持M-H加合离子 智能模式选择:用户可根据实验条件选择合适的离子化模式 2. 灵活的误差计算方式 固定Da误差:传统的绝对误差计算方式 PPM相对误差:科学的相对误差计算,公式为 (实验值-理论值)/理论值×10^6 用户可配置:通过da.txt和ppm.txt文件自定义误差参数 3. 智能缓存机制 本地缓存:避免重复请求相同的代谢物数据 断点续传:支持程序中断后继续处理 批量保存:优化I/O操作,提高处理效率 4. 高效并发处理 多线程处理:同时处理多个HMDB ID查询 智能限流:控制并发数量,避免服务器过载 进度显示:实时显示处理进度和统计信息 5. robust错误处理 智能重试:针对网络错误和服务器503错误的特殊处理 指数退避:逐步增加重试间隔,提高成功率 详细日志:完整记录处理过程和错误信息 6. 丰富的数据提取 程序能够提取以下代谢物信息: 基本信息:HMDB ID、通用名称、化学式 分类信息:超类、类别、子类 生物学性质:内源性、组织分布 数据库交叉引用:KEGG、ChEBI、METLIN ID 结构信息:分子结构图链接 程序特点 1. 用户友好的交互界面 1 2 3 4 5 6 7 8 9 📁 检查输入文件: ✓ positive.txt (5 个质量数) ✓ negative.txt (2 个质量数) ✓ da.txt (3 个Da选项) ✓ ppm.txt (4 个PPM选项) ⚙️ 请选择误差计算方式: 1. 固定Da误差模式 (使用da.txt配置) 2. PPM相对误差模式 (使用ppm.txt配置) 2. 高度可配置性 配置文件驱动:所有参数都可通过配置文件调整 多精度选择:支持不同精度要求的应用场景 向后兼容:保持与旧版本的兼容性 3. 企业级稳定性 连接池管理:优化网络连接使用 内存优化:智能缓存管理,避免内存溢出 异常恢复:程序崩溃后可从断点继续 4. 性能优化 批量处理:一次性处理大量质量数 并发控制:平衡处理速度和服务器负载 缓存命中率统计:实时监控缓存效果 部分代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 import requests import json import time from typing import List, Dict, Any from abc import ABC, abstractmethod class BaseExtractor(ABC): """抽象基类 - 数据提取器接口""" def __init__(self, config: Dict[str, Any]): self.config = config self.session = self._create_session() def _create_session(self) -> requests.Session: """创建HTTP会话""" session = requests.Session() session.headers.update(self.config.get('headers', {})) return session @abstractmethod def search(self, query: str) -> List[str]: """搜索方法 - 子类必须实现""" pass @abstractmethod def extract(self, item_id: str) -> Dict[str, Any]: """提取方法 - 子类必须实现""" pass class CacheManager: """缓存管理器""" def __init__(self, cache_file: str): self.cache_file = cache_file self.data = self._load() def _load(self) -> Dict: try: with open(self.cache_file, 'r') as f: return json.load(f) except FileNotFoundError: return {} def get(self, key: str) -> Any: return self.data.get(key) def set(self, key: str, value: Any): self.data[key] = value self._save() def _save(self): with open(self.cache_file, 'w') as f: json.dump(self.data, f) class DataProcessor: """数据处理器""" @staticmethod def process_batch(items: List[str], processor_func) -> List[Dict]: """批量处理数据""" results = [] for item in items: try: result = processor_func(item) if result: results.append(result) except Exception as e: print(f"处理失败: {item}, 错误: {e}") return results @staticmethod def save_results(data: List[Dict], filename: str): """保存结果""" with open(filename, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) class WebExtractor(BaseExtractor): """具体实现类""" def __init__(self, config: Dict[str, Any]): super().__init__(config) self.cache = CacheManager(config.get('cache_file', 'cache.json')) def search(self, query: str) -> List[str]: """搜索实现""" # 检查缓存 cache_key = f"search_{query}" cached = self.cache.get(cache_key) if cached: return cached # 模拟网络请求 time.sleep(1) # 请求延迟 # 这里是具体的搜索逻辑 # 实际项目中会有具体的API调用和数据解析 results = self._perform_search(query) # 缓存结果 self.cache.set(cache_key, results) return results def extract(self, item_id: str) -> Dict[str, Any]: """提取实现""" # 检查缓存 cached = self.cache.get(item_id) if cached: return cached # 模拟网络请求 time.sleep(0.5) # 这里是具体的提取逻辑 data = self._perform_extraction(item_id) # 缓存结果 if data: self.cache.set(item_id, data) return data def _perform_search(self, query: str) -> List[str]: """执行搜索 - 具体实现会根据目标网站调整""" # 这里会有具体的HTTP请求和HTML解析逻辑 return [f"item_{i}" for i in range(3)] # 示例返回 def _perform_extraction(self, item_id: str) -> Dict[str, Any]: """执行提取 - 具体实现会根据目标网站调整""" # 这里会有具体的数据提取逻辑 return { 'id': item_id, 'title': f'Title for {item_id}', 'data': f'Data for {item_id}' } class Application: """应用程序主类""" def __init__(self, config: Dict[str, Any]): self.config = config self.extractor = WebExtractor(config) self.processor = DataProcessor() def run(self, queries: List[str]): """运行应用程序""" all_results = [] for query in queries: # 搜索相关项目 items = self.extractor.search(query) # 提取详细数据 for item_id in items: data = self.extractor.extract(item_id) if data: data['query'] = query all_results.append(data) # 保存结果 self.processor.save_results(all_results, 'results.json') print(f"完成处理,共获取 {len(all_results)} 条数据") def main(): """主函数""" # 配置参数 config = { 'headers': {'User-Agent': 'Generic-Bot/1.0'}, 'cache_file': 'app_cache.json', 'delay': 1.0 } # 创建应用实例 app = Application(config) # 示例查询列表 test_queries = ['query1', 'query2', 'query3'] # 运行程序 app.run(test_queries) if __name__ == '__main__': main() 技术亮点 科学的误差计算:支持PPM相对误差,适应不同质量范围的化合物 智能网络处理:针对HMDB服务器特点优化的请求策略 高效缓存机制:多层缓存设计,显著提高处理效率 robust错误处理:全面的异常处理和恢复机制 用户友好设计:直观的配置文件和交互界面 视频版本 哔哩哔哩 YouTube

2025年9月15日 · 4 分钟 · IT小章

批量新建文件夹的三种简便方法

原文地址:https://itxiaozhang.com/batch-create-folders/ 如果您需要远程电脑维修或者编程开发,请加我微信咨询。 📌 常见场景 员工资料:公司新入职 200 名员工,需要批量生成个人资料文件夹。 学生作业:某班级 60 名学生上交作业,老师按花名册批量生成作业文件夹。 项目成员:一个跨部门项目组有 50 名成员,项目经理为其批量创建任务资料文件夹。 💡 为什么要批量创建文件夹 手动逐个新建既耗时又容易出错。 无论是为 20 个部门建文件夹,还是整理 100 张照片,重复操作都会浪费时间。 批量方式可节省约 90% 时间,让你专注更重要的工作。 方法一:ChatGPT 帮忙 将以下代码粘贴给 ChatGPT,让其执行: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 # 请作为Python解释器运行以下代码 import zipfile import os # 文件夹和文件名列表,可以增加或者替换更多 names = """唐僧 孙悟空 猪八戒 沙和尚 白龙马 玉皇大帝 王母娘娘 太上老君 太白金星 托塔李天王 哪吒三太子 二郎神 四大天王 巨灵神 二十八宿 雷公 电母 风伯 雨师 赤脚大仙 福禄寿三星 镇元子 如来佛祖 观音菩萨 文殊菩萨 普贤菩萨 地藏王菩萨 弥勒佛 燃灯古佛 十八罗汉 阿傩 伽叶 寅将军 熊山君 特处士 白骨精 黄袍怪 金角大王 银角大王 九尾狐狸 狮猁怪 红孩儿 鼍龙 虎力大仙 鹿力大仙 羊力大仙 灵感大王 独角兕大王 蝎子精 六耳猕猴 铁扇公主 牛魔王 玉面狐狸 万圣龙王 九头虫 黄眉老祖 蟒蛇精 赛太岁 蜘蛛精 百眼魔君 青狮 白象 大鹏金翅雕 白鹿精 白面狐狸 地涌夫人 南山大王 黄狮精 九灵元圣 辟寒大王 辟暑大王 辟尘大王 玉兔精 混世魔王 黑熊精 凌虚子 白衣秀士 虎先锋 黄风怪 精细鬼 伶俐虫 巴山虎 倚海龙 狐阿七大王 宝象国国王 百花羞公主 乌鸡国国王 车迟国国王 女儿国国王 朱紫国国王 比丘国国王 灭法国国王 寇员外 陈光蕊 殷温娇 刘伯钦 高太公 高翠兰 唐太宗 李世民 房玄龄 杜如晦 徐茂公""".splitlines() base_dir = "/mnt/data/named_folders" zip_filename = "/mnt/data/named_folders.zip" # 创建文件夹 os.makedirs(base_dir, exist_ok=True) for name in names: os.makedirs(os.path.join(base_dir, name), exist_ok=True) # 压缩文件夹 with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(base_dir): for file in files: file_path = os.path.join(root, file) zipf.write(file_path, os.path.relpath(file_path, base_dir)) for dir in dirs: dir_path = os.path.join(root, dir) zipf.write(dir_path, os.path.relpath(dir_path, base_dir)) zip_filename 方法二:纯前端在线工具 特点 无需安装:网页直接用,文件不上传服务器。 操作简单:上传名单 → 一键生成压缩包。 智能处理:自动去除空行与非法字符。 使用步骤 准备文本文件,每行一个名称。 打开 批量文件夹生成器 上传文件并预览。 点击 生成 ZIP → 下载 → 解压即可。 核心代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 "use client" import type React from "react" import { useState, useCallback } from "react" import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card" import { Button } from "@/components/ui/button" import { Progress } from "@/components/ui/progress" import { Alert, AlertDescription } from "@/components/ui/alert" import { Badge } from "@/components/ui/badge" import { Upload, Download, FileText, Folder, AlertCircle, CheckCircle2, X } from "lucide-react" import { useToast } from "@/hooks/use-toast" import JSZip from "jszip" interface ProcessedFile { name: string content: string[] status: "pending" | "processing" | "completed" | "error" error?: string } export default function FolderGenerator() { const [files, setFiles] = useState<ProcessedFile[]>([]) const [isProcessing, setIsProcessing] = useState(false) const [progress, setProgress] = useState(0) const { toast } = useToast() const handleFileUpload = useCallback( (event: React.ChangeEvent<HTMLInputElement>) => { const uploadedFiles = Array.from(event.target.files || []) if (uploadedFiles.length === 0) return const newFiles: ProcessedFile[] = uploadedFiles.map((file) => ({ name: file.name, content: [], status: "pending", })) setFiles((prev) => [...prev, ...newFiles]) uploadedFiles.forEach((file, index) => { const reader = new FileReader() reader.onload = (e) => { try { const content = e.target?.result as string const lines = content .split(/\r?\n/) .map((line) => line.trim()) .filter((line) => line.length > 0) .filter((line) => /^[\u4e00-\u9fa5a-zA-Z0-9\s\-_()()]+$/.test(line)) setFiles((prev) => prev.map((f, i) => i === prev.length - uploadedFiles.length + index ? { ...f, content: lines, status: "completed" as const } : f, ), ) toast({ title: "解析成功", description: `${file.name} 共 ${lines.length} 个有效名称`, }) } catch (error) { setFiles((prev) => prev.map((f, i) => i === prev.length - uploadedFiles.length + index ? { ...f, status: "error" as const, error: "解析失败" } : f, ), ) toast({ title: "解析失败", description: `${file.name} 解析错误`, variant: "destructive", }) } } reader.readAsText(file, "utf-8") }) }, [toast], ) const removeFile = useCallback((index: number) => { setFiles((prev) => prev.filter((_, i) => i !== index)) }, []) const generateZip = useCallback(async () => { const validFiles = files.filter((f) => f.status === "completed" && f.content.length > 0) if (validFiles.length === 0) { toast({ title: "无有效文件", description: "请上传有效文本文件", variant: "destructive", }) return } setIsProcessing(true) setProgress(0) try { const zip = new JSZip() let totalItems = 0 let processedItems = 0 validFiles.forEach((file) => { totalItems += file.content.length }) for (const file of validFiles) { const baseName = file.name.replace(/\.[^/.]+$/, "") const fileFolder = zip.folder(baseName) if (fileFolder) { for (const name of file.content) { const folderName = name.replace(/[<>:"/\\|?*]/g, "_") const nameFolder = fileFolder.folder(folderName) if (nameFolder) { nameFolder.file("README.txt", `文件夹: ${name}\n创建时间: ${new Date().toLocaleString("zh-CN")}`) } processedItems++ setProgress((processedItems / totalItems) * 100) await new Promise((resolve) => setTimeout(resolve, 10)) } } } const content = await zip.generateAsync({ type: "blob" }) const url = URL.createObjectURL(content) const a = document.createElement("a") a.href = url a.download = `folders_${new Date().toISOString().slice(0, 10)}.zip` document.body.appendChild(a) a.click() document.body.removeChild(a) URL.revokeObjectURL(url) toast({ title: "生成成功", description: `已生成 ${totalItems} 个文件夹`, }) } catch (error) { toast({ title: "生成失败", description: "ZIP文件生成错误", variant: "destructive", }) } finally { setIsProcessing(false) setProgress(0) } }, [files, toast]) const totalFolders = files.reduce((sum, file) => sum + (file.content?.length || 0), 0) return ( <div className="min-h-screen bg-white dark:bg-gray-900"> <div className="container mx-auto px-6 py-12 max-w-3xl"> <div className="text-center mb-12"> <h1 className="text-4xl font-bold text-gray-900 dark:text-white mb-3 tracking-tight">文件夹生成器</h1> <p className="text-gray-600 dark:text-gray-400 text-lg">批量生成文件夹结构</p> </div> <Card className="mb-8 border-2"> <CardContent className="pt-8"> <div className="border-2 border-dashed border-gray-300 dark:border-gray-600 rounded-lg p-8 text-center hover:border-gray-400 transition-colors"> <input type="file" multiple accept=".txt,.csv,.text" onChange={handleFileUpload} className="hidden" id="file-upload" /> <label htmlFor="file-upload" className="cursor-pointer"> <Upload className="h-12 w-12 text-gray-400 mx-auto mb-4" /> <p className="font-semibold text-gray-700 dark:text-gray-300 mb-2 text-lg">选择文件</p> <p className="text-gray-500 dark:text-gray-400">支持 .txt, .csv 格式</p> </label> </div> </CardContent> </Card> {files.length > 0 && ( <Card className="mb-8 border-2"> <CardHeader className="pb-4"> <div className="flex items-center justify-between"> <CardTitle className="text-xl font-bold">文件列表 ({files.length})</CardTitle> <Badge variant="secondary" className="text-sm px-3 py-1"> {totalFolders} 个文件夹 </Badge> </div> </CardHeader> <CardContent className="space-y-4"> {files.map((file, index) => ( <div key={index} className="space-y-3"> <div className="flex items-center justify-between p-4 bg-gray-50 dark:bg-gray-800 rounded-lg border"> <div className="flex items-center gap-3"> {file.status === "completed" && <CheckCircle2 className="h-5 w-5 text-green-600" />} {file.status === "error" && <AlertCircle className="h-5 w-5 text-red-600" />} {file.status === "pending" && <FileText className="h-5 w-5 text-gray-400" />} <div> <p className="font-semibold">{file.name}</p> {file.status === "completed" && ( <p className="text-sm text-gray-600 dark:text-gray-400">{file.content.length} 个名称</p> )} {file.status === "error" && <p className="text-sm text-red-600">{file.error}</p>} </div> </div> <Button variant="ghost" size="sm" onClick={() => removeFile(index)}> <X className="h-4 w-4" /> </Button> </div> {file.status === "completed" && file.content.length > 0 && ( <div className="ml-6 p-4 bg-gray-100 dark:bg-gray-700 rounded-lg border"> <div className="font-semibold mb-3 text-gray-700 dark:text-gray-300">预览内容</div> <div className="h-32 overflow-y-auto space-y-2"> {file.content.slice(0, 20).map((name, i) => ( <div key={i} className="flex items-center gap-2 text-gray-600 dark:text-gray-400"> <Folder className="h-4 w-4" /> <span>{name}</span> </div> ))} {file.content.length > 20 && ( <div className="text-gray-500 italic mt-2">... 还有 {file.content.length - 20} 个</div> )} </div> </div> )} </div> ))} </CardContent> </Card> )} {files.some((f) => f.status === "completed") && ( <Card className="mb-8 border-2"> <CardContent className="pt-8"> {isProcessing && ( <div className="space-y-3 mb-6"> <div className="flex justify-between font-semibold"> <span>生成中...</span> <span>{Math.round(progress)}%</span> </div> <Progress value={progress} className="w-full h-3" /> </div> )} <Button onClick={generateZip} disabled={isProcessing} className="w-full h-12 text-lg font-semibold"> <Download className="h-5 w-5 mr-2" /> {isProcessing ? "生成中..." : "下载ZIP文件"} </Button> </CardContent> </Card> )} <Alert className="border-2"> <AlertCircle className="h-5 w-5" /> <AlertDescription className="text-base"> 支持 .txt/.csv 文件,每行一个名称。本地处理,保护隐私。 </AlertDescription> </Alert> <div className="text-center mt-12 pt-8 border-t border-gray-200 dark:border-gray-700"> <p className="text-sm text-gray-500 dark:text-gray-400"> by{" "} <a href="https://itxiaozhang.com" target="_blank" rel="noopener noreferrer" className="text-gray-600 dark:text-gray-300 hover:text-gray-800 dark:hover:text-gray-100 transition-colors" > IT小章 </a> </p> </div> </div> </div> ) } 方法三:本地 Python 脚本 优势 本地运行:无需联网,速度快。 功能丰富:支持去重、重命名、日志。 高度可定制:可随需修改。 多编码兼容:guess_read_text 支持多种常见编码。 文件名清洗:非法字符、Windows 保留名、长度限制等都处理到位。 并发执行:ThreadPoolExecutor 加速文件夹创建。 参数灵活:支持 --exist 策略、重命名模板、最大长度、dry-run。 使用方法 新建 name.txt,每行一个文件夹名称。 保存脚本为 create_dirs.py。 运行命令: 1 python create_dirs.py 全部代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 批量创建文件夹工具(单层,统一保存到 ./result/)。 - 从 name.txt 读取名称(一行一个),多编码兼容,清洗非法字符。 - 在当前目录下的 result 文件夹中生成对应同名文件夹。 """ from __future__ import annotations import argparse import csv import os import re import sys import time import unicodedata from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from pathlib import Path from threading import Lock from typing import List, Optional, Tuple # ---------- 常量 ---------- DEFAULT_INPUT = "name.txt" DEFAULT_MAX_LEN = 200 DEFAULT_EXIST = "skip" # skip | rename | fail DEFAULT_RENAME_PATTERN = "{name} ({n})" ENCODING_CANDIDATES = [ "utf-8-sig", "utf-8", "utf-16", "utf-16-le", "utf-16-be", "gb18030", "cp936", "big5", "shift_jis", "cp1252", "latin-1", ] WINDOWS_RESERVED_BASENAMES = { "CON", "PRN", "AUX", "NUL", *{f"COM{i}" for i in range(1, 10)}, *{f"LPT{i}" for i in range(1, 10)}, } ILLEGAL_CHARS_PATTERN = re.compile(r'[<>:"/\\|?*\x00-\x1F]') DOTS_ONLY_PATTERN = re.compile(r"^\.+$") # ---------- 数据结构 ---------- @dataclass class Item: line_no: int original: str cleaned: str @dataclass class Result: item: Item action: str # created | skipped | renamed | error | dryrun final_name: Optional[str] final_path: Optional[str] status: str # OK | ERROR error: Optional[str] # ---------- 工具函数 ---------- def guess_read_text(path: Path, forced_encoding: Optional[str] = None) -> Tuple[str, str]: if forced_encoding: return path.read_text(encoding=forced_encoding), forced_encoding data = path.read_bytes() last_err = None for enc in ENCODING_CANDIDATES: try: return data.decode(enc), enc except Exception as e: last_err = e continue raise RuntimeError(f"读取失败,未知编码;最后错误:{last_err}") def is_windows_reserved(name: str) -> bool: base = name.split(".", 1)[0] return base.upper() in WINDOWS_RESERVED_BASENAMES def sanitize_name(raw: str, max_len: int = DEFAULT_MAX_LEN) -> str: s = unicodedata.normalize("NFKC", raw).strip() if "/" in s or "\\" in s: s = s.replace("/", "_").replace("\\", "_") if ILLEGAL_CHARS_PATTERN.search(s): s = ILLEGAL_CHARS_PATTERN.sub("_", s) s = s.rstrip(" .") if DOTS_ONLY_PATTERN.match(s): return "" if is_windows_reserved(s): s = s + "_" if len(s) > max_len: s = s[:max_len] return s def parse_lines(text: str) -> List[Tuple[int, str]]: res = [] for idx, line in enumerate(text.splitlines(), start=1): raw = line.rstrip("\r\n") if not raw.strip(): continue if raw.lstrip().startswith("#"): continue res.append((idx, raw)) return res # ---------- 主执行逻辑 ---------- class App: def __init__(self, args: argparse.Namespace): self.args = args self.base = Path.cwd() / "result" self.base.mkdir(exist_ok=True) # 自动创建 result 文件夹 self.counter_ok = 0 self.counter_skip = 0 self.counter_err = 0 self.counter_renamed = 0 self.results: List[Result] = [] def log_line(self, result: Result): if self.args.quiet: return name = result.final_name or result.item.cleaned or result.item.original if result.status == "OK": if result.action == "created": print(f"[OK] {name}") elif result.action == "renamed": print(f"[OK] {name} (renamed)") elif result.action == "dryrun": print(f"[DRY] {name}") elif result.action == "skipped": print(f"[SKIP] {name} (exists)") else: reason = (result.error or "unknown").strip() print(f"[ERROR] {name} ({reason})") def make_one(self, item: Item) -> Result: base = self.base name = item.cleaned target = base / name if self.args.dry_run: return Result(item, "dryrun", name, str(target), "OK", None) try: target.mkdir(parents=False, exist_ok=False) return Result(item, "created", name, str(target), "OK", None) except FileExistsError: if self.args.exist == "skip": return Result(item, "skipped", name, str(target), "OK", None) elif self.args.exist == "fail": return Result(item, "error", name, str(target), "ERROR", "already_exists") elif self.args.exist == "rename": base_name = name n = 2 while True: try_name = self.args.rename_pattern.format(name=base_name, n=n) try_name = sanitize_name(try_name, self.args.max_name_length) if not try_name: return Result(item, "error", None, None, "ERROR", "rename_failed") try_path = base / try_name try: try_path.mkdir(parents=False, exist_ok=False) return Result(item, "renamed", try_name, str(try_path), "OK", None) except FileExistsError: n += 1 continue else: return Result(item, "error", name, str(target), "ERROR", "unknown_exist_strategy") except Exception as e: return Result(item, "error", name, str(target), "ERROR", str(e)) def run(self) -> int: start = time.time() inp = Path(self.args.input) if not inp.exists(): print(f"[FATAL] 输入文件不存在: {inp}", file=sys.stderr) return 3 try: text, used_enc = guess_read_text(inp, self.args.encoding) except Exception as e: print(f"[FATAL] 无法读取输入:{e}", file=sys.stderr) return 3 for line_no, raw in parse_lines(text): cleaned = sanitize_name(raw, self.args.max_name_length) if not cleaned: r = Result(Item(line_no, raw, cleaned), "error", None, None, "ERROR", "invalid_or_empty") self.log_line(r) self.results.append(r) self.counter_err += 1 continue self.results.append(Item(line_no, raw, cleaned)) # 并发执行 tasks = [] with ThreadPoolExecutor(max_workers=self.args.max_workers) as ex: future_map = {ex.submit(self.make_one, it): it for it in self.results if isinstance(it, Item)} for fut in as_completed(future_map): r = fut.result() self.log_line(r) if r.status == "OK": if r.action == "created": self.counter_ok += 1 elif r.action == "skipped": self.counter_skip += 1 elif r.action == "renamed": self.counter_renamed += 1 else: self.counter_err += 1 elapsed = time.time() - start total_ok = self.counter_ok + self.counter_renamed print(f"总结: 成功 {total_ok}, 跳过 {self.counter_skip}, 错误 {self.counter_err}, 用时 {elapsed:.2f}s") if self.counter_err > 0 and (total_ok > 0 or self.counter_skip > 0): return 2 elif self.counter_err > 0: return 3 else: return 0 # ---------- 参数 ---------- def build_argparser() -> argparse.ArgumentParser: p = argparse.ArgumentParser(description="从 name.txt 批量创建文件夹,保存到 ./result/") p.add_argument("--input", default=DEFAULT_INPUT, help="输入文件(默认 name.txt)") p.add_argument("--encoding", default=None, help="指定输入编码") p.add_argument("--exist", choices=["skip", "rename", "fail"], default=DEFAULT_EXIST, help="已存在处理策略(默认 skip)") p.add_argument("--rename-pattern", default=DEFAULT_RENAME_PATTERN, help="重命名模板,含 {name} 和 {n}") p.add_argument("--max-name-length", type=int, default=DEFAULT_MAX_LEN, help="文件夹名最大长度(默认200)") default_workers = min(8, (os.cpu_count() or 2) * 4) p.add_argument("--max-workers", type=int, default=default_workers, help=f"并发线程数(默认 {default_workers})") p.add_argument("--dry-run", action="store_true", help="仅预演,不创建目录") p.add_argument("--quiet", action="store_true", help="仅输出汇总") return p def main(argv: Optional[List[str]] = None) -> int: args = build_argparser().parse_args(argv) app = App(args) code = app.run() return code if __name__ == "__main__": sys.exit(main()) 视频版本 哔哩哔哩 YouTube

2025年8月20日 · 12 分钟 · IT小章

HMDB数据库爬虫案例:一个高效、自动化的Python爬虫脚本

原文地址:https://itxiaozhang.com/hmdb-database-crawler-case-study-efficient-automated-python-script/ 如果您需要远程电脑维修或者编程开发,请加我微信咨询。 这个程序是做什么的? 这是一个专门为科研人员设计的网络爬虫工具。它的主要任务是从一个名为HMDB(人类代谢物组数据库)的网站上,自动、批量地抓取代谢物的相关信息。简单来说,你给它一批代谢物的编号(HMDB ID),它就能帮你把这些编号对应的详细分类和来源信息从网站上找回来,并整理好。 主要功能 读取数据源:程序会自动读取data文件夹下所有的Excel文件,并从每个文件的第一列中提取出所有格式为HMDBXXXXXXX的代谢物编号。 网络数据抓取:对于每一个提取到的HMDB编号,程序会访问HMDB网站上对应的网页,并抓取以下三个关键信息: Class (分类) Sub Class (亚类) Source (来源):判断该物质是内源性(Endogenous)还是外源性(Exogenous)。 结果保存:抓取到的信息会以表格形式,保存到result文件夹下一个同名的Excel文件中。每一行对应一个HMDB编号和它抓取到的信息。 缓存机制:程序非常智能,它会把查询过的结果保存在一个名为hmid_cache.json的缓存文件中。下次运行时,如果遇到相同的编号,它会直接从缓存中读取数据,而不是重新访问网站,这极大地提高了效率并减少了不必要的网络请求。 断点续传:得益于缓存机制,如果程序在中途因为网络问题或其他原因中断,下次重新运行时它会自动跳过已经成功处理的编号,从上次中断的地方继续,非常省心。 程序特点 自动化与批量处理:你只需要把包含HMDB编号的Excel文件放进data文件夹,运行一次程序,它就能自动处理所有文件,无需人工干预。 高效稳定: 多线程处理:程序会同时开启多个线程(默认为5个,可以自己设置)来抓取数据,就像多个人同时在工作,速度比单线程快很多。 智能重试:如果遇到网络波动或网站临时访问不了,程序会自动尝试重新连接,确保数据抓取的成功率。 用户友好: 进度条显示:在程序运行时,你会看到一个清晰的进度条,告诉你当前处理到哪里了,还剩多少任务。 详细日志:程序会把运行过程中的所有重要信息(如哪个文件处理成功,哪个编号查询失败)记录在hmdb_scraper.log日志文件中,方便你随时查看或排查问题。 高可配置性:你可以通过命令行参数来调整程序的行为,例如: 调整同时工作的线程数量(--workers)。 设置是否使用缓存(--no-cache)。 调整请求之间的时间间隔(--delay)。 部分代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 """ ================================ 作者:IT小章 博客:itxiaozhang.com 时间:2025年7月13日 Copyright © 2024 IT小章 ================================ """ import requests from lxml import html import re import os # 日志配置 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("hmdb_scraper.log"), logging.StreamHandler() ] ) logger = logging.getLogger("hmdb_scraper") # 会话创建(细节省略) def create_session(*args, **kwargs): session = requests.Session() # ...屏蔽实现... return session # 随机User-Agent(细节省略) def get_random_user_agent(): # ...屏蔽实现... return "User-Agent-Placeholder" # 清理URL def clean_url(url): # ...屏蔽实现... return url # 获取代谢物数据(核心内容已屏蔽) def get_metabolite_data(url, session=None, max_retries=3): if session is None: session = create_session() url = clean_url(url) # ...屏蔽核心爬虫逻辑... return { "Compound ID": "HMDBXXXXXXX", "Class": "屏蔽", "Sub Class": "屏蔽", "Source": "屏蔽" } # 单个HMID处理函数 def process_hmid(hmid, hmid_cache, session, retry_failed=True): if hmid in hmid_cache: if retry_failed and "获取失败" in hmid_cache[hmid].values(): logger.info(f"重试失败项: {hmid}") else: return hmid_cache[hmid] url = f"https://hmdb.ca/metabolites/{hmid}" data = get_metabolite_data(url, session) hmid_cache[hmid] = data or { "Compound ID": hmid, "Class": "获取失败", "Sub Class": "获取失败", "Source": "获取失败" } return hmid_cache[hmid] def load_cache(cache_file): # ...屏蔽缓存读取实现... return {} def save_cache(cache_data, cache_file): # ...屏蔽缓存保存实现... pass # 处理Excel文件(调用逻辑保留) def process_excel_files(max_workers=5, use_cache=True, delay=0.5, retry_failed=True, chunk_size=100): # ...屏蔽部分实现,仅保留接口调用结构与日志... logger.info("模拟处理Excel文件...") time.sleep(1) logger.info("模拟完成。") def main(): parser = argparse.ArgumentParser(description='HMDB数据爬取工具(公开版本)') parser.add_argument('--workers', type=int, default=5) parser.add_argument('--no-cache', action='store_true') parser.add_argument('--delay', type=float, default=0.5) parser.add_argument('--no-retry-failed', action='store_true') parser.add_argument('--chunk-size', type=int, default=100) args = parser.parse_args() logger.info("启动爬虫(敏感实现已屏蔽)") process_excel_files( max_workers=args.workers, use_cache=not args.no_cache, delay=args.delay, retry_failed=not args.no_retry_failed, chunk_size=args.chunk_size ) if __name__ == "__main__": main() 视频版本 哔哩哔哩 YouTube

2025年7月13日 · 2 分钟 · IT小章

使用 Python 批量替换图片局部区域

原文地址:https://itxiaozhang.com/batch-replace-fixed-region-images-python/ 如果您需要远程电脑维修或者编程开发,请加我微信咨询。 一、需求分析 在实际工作中,我们常常会遇到这样的需求: 有一批尺寸一致的长图(如公众号长截图、拼接图等),需要在同一固定区域内替换为统一的内容,比如覆盖新的标题栏、状态栏、Logo 等。 目标要求如下: 只处理尺寸为 960×8049 的 PNG 图像; 替换区域为垂直方向上 第 115 至第 229 像素(高 114 像素); 使用一张大小为 960×114 的小图进行替换; 支持批量自动处理,提升效率; 提供错误处理和处理结果统计。 二、解决思路 我们使用 Python 语言和 Pillow 图像处理库来实现这一功能。整体流程如下: 遍历图像文件夹 仅处理 .png 文件,并验证尺寸是否为 960×8049,其他自动跳过。 图像处理逻辑 使用 Pillow 打开图片并转换为 RGBA 模式; 将 960×114 的替换图粘贴到 (0, 115) 位置,覆盖目标区域; 保存处理后的新图到输出文件夹。 并发加速处理 借助 Python 内置的 ThreadPoolExecutor 并发模块,支持多线程批量处理。 日志记录与统计 实时输出每张图的处理结果,最终打印成功、跳过和失败数量。 三、关键代码片段 1 2 3 4 5 6 7 8 def process_image(path, top_img, output_dir): with Image.open(path).convert('RGBA') as img: if img.size != (960, 8049): return 'skipped' # 在 y=115 处替换 960×114 区域 img.paste(top_img, (0, 115)) img.save(os.path.join(output_dir, os.path.basename(path))) return 'done' 完整脚本中还包含:日志配置、多线程处理逻辑、输出统计等内容。运行脚本即可批量处理所有符合条件的图片。 ...

2025年6月18日 · 1 分钟 · IT小章

批量图片去水印:使用 IOPaint 实现固定掩膜批量去除图片水印

原文地址:https://itxiaozhang.com/iopaint-fixed-mask-batch-inpainting-cpu/ 如果您需要远程电脑维修或者编程开发,请加我微信咨询。 一、需求分析 在批量处理图片的实际工作中,常会遇到统一位置的水印(如顶部编号、底部标识等)。手动擦除效率低,借助 AI 图像修复工具如 IOPaint(原 Lama Cleaner)可实现自动化处理。然而其默认使用 Web 界面,不能直接批处理,因此本文目标是: 批量处理整目录图片 使用一个固定掩膜去除相同水印位置 全自动运行,无需交互操作 使用 CPU,兼容所有普通电脑 输出保持原分辨率与文件名 生成完整处理日志 二、解决思路 1. 工具选择 使用开源图像修复工具 IOPaint,支持命令行调用与 AI 修复模型(默认 lama),适合图像 inpainting 任务,兼容 CPU 和 GPU。 2. 执行流程 安装 Python 与 IOPaint CLI 使用 Lama Cleaner 在线版手动生成统一的掩膜图 用 Python 脚本批量执行 CLI 命令 支持多进程并发,加速 CPU 处理 三、掩膜文件准备与优化 1. 掩膜制作 打开 Lama Cleaner Hugging Face 页面,加载任意一张图,进行掩膜绘制: 勾选「Download Mask」 勾选「Manual Inpainting Mode」 用画笔框选需要去除的水印区域 点击「Download Mask」按钮下载掩膜图 掩膜图与原图尺寸一致,白色区域为需修复部分。 2. 掩膜要求 保存为 fixed_mask.jpg,与原图尺寸相同(如 960x8049) 白色区域表示需要修复,黑色保留不变 掩膜应统一应用于所有图像 四、脚本开发与并行优化(仅使用 CPU) 为提升批量处理效率,我们开发了一个支持多进程并发的 Python 脚本。虽然运行在 CPU 上,但并行仍能显著提速。 ...

2025年5月19日 · 1 分钟 · IT小章

HMDB代谢物信息一键批量获取工具:快速获取代谢组学数据

原文地址:https://itxiaozhang.com/hmdb-metabolite-batch-extraction-tool/ 如果您需要远程电脑维修或者编程开发,请加我微信咨询。 功能特性 支持批量查询HMDB代谢物信息 自动提取关键信息,包括: 代谢物描述 超类(Super Class) 类别(Class) 子类(Sub Class) 来源(Disposition) 内源性(Endogenous) 组织定位(Tissue Locations) 相关ID(KEGG、ChEBI、METLIN) 支持正负离子模式搜索 自动导出CSV格式结果 内置日志记录功能 部分代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 """ ================================ 作者:IT小章 网站:itxiaozhang.com 时间:2024年11月27日 Copyright © 2024 IT小章 ================================ """ import requests import re import csv import time import logging import os from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm from lxml import html # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("代谢物提取.log", encoding='utf-8'), logging.StreamHandler() ] ) class MetaboliteExtractor: """代谢物数据提取器""" def __init__(self): self.base_url = "https://hmdb.ca" self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } def process_files(self): """处理输入文件""" try: logging.info("程序开始运行...") results = [] # 处理positive模式 if os.path.exists('positive.txt'): logging.info("处理positive.txt...") results.extend(self._process_file('positive.txt', 'positive')) # 处理negative模式 if os.path.exists('negative.txt'): logging.info("处理negative.txt...") results.extend(self._process_file('negative.txt', 'negative')) if results: self._save_results(results) logging.info(f"处理完成,共获取 {len(results)} 条结果") else: logging.error("未找到任何结果") except Exception as e: logging.error(f"处理过程出错: {str(e)}") finally: logging.info("程序运行结束") print("\n" + "="*50) input("按回车键退出程序...") def _process_file(self, filename, mode): """处理单个文件(具体实现已隐藏)""" try: with open(filename, 'r', encoding='utf-8') as f: data = f.read().strip() if not data: logging.warning(f"{filename} 为空") return [] logging.info(f"正在处理 {filename}") return self._extract_data(data, mode) except FileNotFoundError: logging.error(f"未找到文件: {filename}") return [] def _extract_data(self, data, mode): """提取数据(具体实现已隐藏)""" # 核心实现已隐藏 pass def _save_results(self, results): """保存结果到CSV""" try: filename = '代谢物数据.csv' with open(filename, 'w', newline='', encoding='utf-8-sig') as f: if results: writer = csv.DictWriter(f, fieldnames=results[0].keys()) writer.writeheader() writer.writerows(results) logging.info(f"数据已保存到 {filename}") except Exception as e: logging.error(f"保存数据失败: {str(e)}") def main(): extractor = MetaboliteExtractor() extractor.process_files() if __name__ == "__main__": main() 使用方法 准备输入文件: ...

2025年5月15日 · 2 分钟 · IT小章

利用Python从HMDB数据库批量提取质谱数据中的化合物信息

原文地址:https://itxiaozhang.com/python-hmdb-mass-spectrometry-compound-extraction/ 如果您需要远程电脑维修或者编程开发,请加我微信咨询。 需求分析 本工具旨在帮助用户根据质谱数据(质量数),自动从HMDB数据库批量检索代谢物信息,包括HMDB ID、常用名称、化学式及结构图片等,并导出为Excel可直接打开的CSV文件,极大提升代谢物注释效率。 程序介绍 准备数据:将所有待检索的质量数写入negative.txt文件,每行一个。 运行脚本: 双击或命令行运行hmdb_metabolite_extractor.py脚本。 程序自动读取negative.txt,并按负离子模式批量检索HMDB。 检索结果自动保存为代谢物数据.csv,日志信息保存在代谢物提取.log。 主要功能模块: 质量数批量检索:自动根据质量数和离子模式检索HMDB ID 代谢物详细信息抓取:根据HMDB ID获取常用名称、化学式、结构图片等信息 并发加速与重试机制:采用多线程并发抓取,自动重试机制 结果导出:自动整理为CSV文件,支持Excel直接打开 部分代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 """代谢物数据处理工具""" import json, os, logging from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm from typing import List, Dict, Optional class DataManager: def __init__(self, cache_file='cache.json', progress_file='progress.json'): self.cache_file = cache_file self.progress_file = progress_file self.cache = self._load_json(cache_file, {}) self.progress = self._load_json(progress_file, {'processed': []}) def _load_json(self, file: str, default: Dict) -> Dict: if os.path.exists(file): try: with open(file, 'r', encoding='utf-8') as f: return json.load(f) except: pass return default def save_progress(self, item_id: str): if item_id not in self.progress['processed']: self.progress['processed'].append(item_id) self._save_json(self.progress_file, self.progress) def _save_json(self, file: str, data: Dict): try: with open(file, 'w', encoding='utf-8') as f: json.dump(data, f) except Exception as e: logging.error(f"保存文件失败: {e}") def is_processed(self, item_id: str) -> bool: return item_id in self.progress['processed'] def get_cache(self, key: str) -> Optional[Dict]: return self.cache.get(key) def set_cache(self, key: str, data: Dict): self.cache[key] = data self._save_json(self.cache_file, self.cache) def setup_logging(): logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("处理日志.log", encoding='utf-8'), logging.StreamHandler() ] ) def process_data(items: List[str], resume: bool = False) -> List[Dict]: """处理数据的主要函数 Args: items: 待处理的数据项列表 resume: 是否继续上次的进度 Returns: 处理结果列表 """ data_manager = DataManager() results = [] for item in items: # 检查是否已处理 if resume and data_manager.is_processed(item): continue # 获取数据 - data = {"id": item, "status": "processed"} results.append(data) # 保存进度 data_manager.save_progress(item) return results def main(): setup_logging() try: # 读取输入数据 - items = ["item1", "item2", "item3"] # 处理数据 results = process_data(items, resume=True) # 保存结果 - if results: logging.info(f"成功处理 {len(results)} 条数据") except Exception as e: logging.error(f"程序执行出错: {e}") if __name__ == "__main__": main() 视频版本 哔哩哔哩 YouTube

2025年5月8日 · 2 分钟 · IT小章