1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
"""
================================
Author: IT小章
Website: itxiaozhang.com
Date: 2024-11-27
Copyright © 2024 IT小章
================================
"""
import requests import re import csv import time import logging import os from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm from lxml import html
# Configure the root logger once at import time: INFO and above go both to a
# UTF-8 encoded log file and to the console stream.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.FileHandler("代谢物提取.log", encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
class MetaboliteExtractor:
    """Metabolite data extractor.

    Reads ``positive.txt`` and ``negative.txt`` from the current working
    directory when they exist, hands their contents to ``_extract_data`` and
    writes the collected rows to a CSV file.
    """

    def __init__(self):
        """Store the base URL and the HTTP headers used by the extractor."""
        self.base_url = "https://hmdb.ca"
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }

    def process_files(self):
        """Process the input files, save the results and wait for Enter.

        Every exception is logged rather than propagated, so the final
        "press Enter" prompt is always reached.
        """
        try:
            logging.info("程序开始运行...")
            results = []

            for filename, mode in (('positive.txt', 'positive'),
                                   ('negative.txt', 'negative')):
                if os.path.exists(filename):
                    logging.info(f"处理{filename}...")
                    results.extend(self._process_file(filename, mode))

            if results:
                self._save_results(results)
                logging.info(f"处理完成,共获取 {len(results)} 条结果")
            else:
                logging.error("未找到任何结果")
        except Exception as e:
            logging.error(f"处理过程出错: {str(e)}")
        finally:
            logging.info("程序运行结束")
            print("\n" + "="*50)
            try:
                input("按回车键退出程序...")
            except EOFError:
                # stdin is closed or non-interactive (pipe, scheduler): there
                # is nobody to press Enter, so exit quietly instead of raising
                # out of the ``finally`` block.
                pass

    def _process_file(self, filename, mode):
        """Read one input file and extract data from its contents.

        Args:
            filename: Path of the UTF-8 text file to read.
            mode: Label passed through to ``_extract_data`` ('positive' or
                'negative' in the visible callers).

        Returns:
            The rows produced by ``_extract_data``; an empty list when the
            file is missing, empty, or the extractor produced nothing.
        """
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                data = f.read().strip()

            if not data:
                logging.warning(f"{filename} 为空")
                return []

            logging.info(f"正在处理 {filename}")
            rows = self._extract_data(data, mode)
            # Callers pass the result to ``list.extend``; never hand them None.
            return [] if rows is None else rows
        except FileNotFoundError:
            logging.error(f"未找到文件: {filename}")
            return []

    def _extract_data(self, data, mode):
        """Extract data from the file contents (implementation not included).

        This placeholder returns an empty list so the rest of the pipeline
        keeps working until the real implementation is supplied.
        """
        return []

    def _save_results(self, results):
        """Save the result rows to ``代谢物数据.csv`` in the working directory.

        Args:
            results: Sequence of dict rows. Columns are the union of all row
                keys in first-seen order; keys missing from a row are written
                as empty cells.

        Failures are logged, not raised.
        """
        try:
            filename = '代谢物数据.csv'
            with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
                if results:
                    # Taking the header from the first row only would make
                    # DictWriter raise on any later row carrying an extra key.
                    fieldnames = list(
                        dict.fromkeys(key for row in results for key in row)
                    )
                    writer = csv.DictWriter(f, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(results)
            logging.info(f"数据已保存到 {filename}")
        except Exception as e:
            logging.error(f"保存数据失败: {str(e)}")
def main():
    """Script entry point: build a MetaboliteExtractor and process the input files."""
    MetaboliteExtractor().process_files()
# Run the extraction only when this file is executed as a script, so that
# importing the module does not start processing.
if __name__ == "__main__":
    main()
|