原文地址:https://itxiaozhang.com/python-hmdb-mass-spectrometry-compound-extraction/
如果您需要远程电脑维修或者编程开发,请加我微信咨询。
需求分析 本工具旨在帮助用户根据质谱数据(质量数),自动从HMDB数据库批量检索代谢物信息,包括HMDB ID、常用名称、化学式及结构图片等,并导出为Excel可直接打开的CSV文件,极大提升代谢物注释效率。
程序介绍 准备数据:将所有待检索的质量数写入negative.txt文件,每行一个。 运行脚本: 双击或命令行运行hmdb_metabolite_extractor.py脚本。 程序自动读取negative.txt,并按负离子模式批量检索HMDB。 检索结果自动保存为代谢物数据.csv,日志信息保存在代谢物提取.log。 主要功能模块: 质量数批量检索:自动根据质量数和离子模式检索HMDB ID 代谢物详细信息抓取:根据HMDB ID获取常用名称、化学式、结构图片等信息 并发加速与重试机制:采用多线程并发抓取,自动重试机制 结果导出:自动整理为CSV文件,支持Excel直接打开 部分代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 """代谢物数据处理工具""" import json, os, logging from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm from typing import List, Dict, Optional class DataManager: def __init__(self, cache_file='cache.json', progress_file='progress.json'): self.cache_file = cache_file self.progress_file = progress_file self.cache = self._load_json(cache_file, {}) self.progress = self._load_json(progress_file, {'processed': []}) def _load_json(self, file: str, default: Dict) -> Dict: if os.path.exists(file): try: with open(file, 'r', encoding='utf-8') as f: return json.load(f) except: pass return default def save_progress(self, item_id: str): if item_id not in self.progress['processed']: self.progress['processed'].append(item_id) self._save_json(self.progress_file, self.progress) def _save_json(self, file: str, data: Dict): try: with open(file, 'w', encoding='utf-8') as f: json.dump(data, f) except Exception as e: logging.error(f"保存文件失败: {e}") def is_processed(self, item_id: str) -> bool: return item_id in self.progress['processed'] def get_cache(self, key: str) -> Optional[Dict]: return self.cache.get(key) def set_cache(self, key: str, data: Dict): self.cache[key] = data self._save_json(self.cache_file, self.cache) def setup_logging(): logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("处理日志.log", encoding='utf-8'), logging.StreamHandler() ] ) def process_data(items: List[str], resume: bool = False) -> List[Dict]: """处理数据的主要函数 Args: items: 待处理的数据项列表 resume: 是否继续上次的进度 Returns: 处理结果列表 """ data_manager = DataManager() results = [] for item in items: # 检查是否已处理 if resume and data_manager.is_processed(item): continue # 获取数据 - data = {"id": item, "status": "processed"} results.append(data) # 保存进度 data_manager.save_progress(item) return results def main(): setup_logging() try: # 读取输入数据 - items = ["item1", "item2", "item3"] # 处理数据 results = process_data(items, resume=True) # 保存结果 - if results: logging.info(f"成功处理 {len(results)} 条数据") except Exception as e: logging.error(f"程序执行出错: {e}") if __name__ == "__main__": main() 视频版本 哔哩哔哩 YouTube