1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| """代谢物数据处理工具"""
import json, os, logging from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm from typing import List, Dict, Optional
class DataManager:
    """Persist a result cache and a resumable progress record as JSON files.

    Two files are managed: a cache mapping keys to dict payloads, and a
    progress record shaped like ``{"processed": [item_id, ...]}``. Both are
    loaded once on construction and rewritten in full after every mutation.
    """

    def __init__(self, cache_file='cache.json', progress_file='progress.json'):
        """Load existing state from disk, falling back to empty state.

        Args:
            cache_file: Path of the JSON cache file.
            progress_file: Path of the JSON progress file.
        """
        self.cache_file = cache_file
        self.progress_file = progress_file
        self.cache = self._load_json(cache_file, {})
        self.progress = self._load_json(progress_file, {'processed': []})
        # A progress file that parses but lacks a 'processed' list would make
        # is_processed/save_progress fail with KeyError; normalise it here.
        if not isinstance(self.progress.get('processed'), list):
            self.progress['processed'] = []

    def _load_json(self, file: str, default: Dict) -> Dict:
        """Read a JSON object from ``file``.

        Args:
            file: Path of the JSON file to read.
            default: Value returned when the file is missing, unreadable,
                malformed, or does not contain a JSON object.

        Returns:
            The parsed dict, or ``default``.
        """
        try:
            with open(file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            # First run: nothing has been saved yet, which is not an error.
            return default
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError. Loading stays best-effort, but the failure
            # is reported instead of being silently discarded.
            logging.warning(f"读取文件失败，使用默认值: {file} ({e})")
            return default
        if not isinstance(loaded, dict):
            # Every caller treats the result as a dict (.get / item access).
            logging.warning(f"文件内容不是 JSON 对象，使用默认值: {file}")
            return default
        return loaded

    def save_progress(self, item_id: str):
        """Record ``item_id`` as processed and persist the progress file.

        Does nothing when the id has already been recorded.
        """
        if item_id not in self.progress['processed']:
            self.progress['processed'].append(item_id)
            self._save_json(self.progress_file, self.progress)

    def _save_json(self, file: str, data: Dict):
        """Write ``data`` to ``file`` as JSON; log instead of raising on failure.

        The data is serialised to a sibling ``<file>.tmp`` first and then
        swapped in with ``os.replace``, so a failure part-way through
        serialisation cannot truncate or corrupt the previously saved file.
        """
        tmp_file = f"{file}.tmp"
        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f)
            os.replace(tmp_file, file)
        except Exception as e:
            # Saving is deliberately best-effort: report and carry on.
            logging.error(f"保存文件失败: {e}")
            try:
                os.remove(tmp_file)
            except OSError:
                # The temp file may never have been created; nothing to clean.
                pass

    def is_processed(self, item_id: str) -> bool:
        """Return True when ``item_id`` is in the progress record."""
        return item_id in self.progress['processed']

    def get_cache(self, key: str) -> Optional[Dict]:
        """Return the cached payload for ``key``, or None when absent."""
        return self.cache.get(key)

    def set_cache(self, key: str, data: Dict):
        """Store ``data`` under ``key`` and persist the whole cache file."""
        self.cache[key] = data
        self._save_json(self.cache_file, self.cache)
def setup_logging():
    """Configure the root logger to write INFO+ records to a file and the console.

    Records go to the UTF-8 log file ``处理日志.log`` in the working directory
    and to the default stream of ``logging.StreamHandler``.
    """
    log_format = '%(asctime)s - %(levelname)s - %(message)s'
    file_handler = logging.FileHandler("处理日志.log", encoding='utf-8')
    console_handler = logging.StreamHandler()
    logging.basicConfig(
        level=logging.INFO,
        format=log_format,
        handlers=[file_handler, console_handler],
    )
def process_data(items: List[str], resume: bool = False) -> List[Dict]:
    """Process each item and record it in the persisted progress file.

    Args:
        items: Identifiers of the items to process.
        resume: When True, skip items already recorded as processed.

    Returns:
        One ``{"id": ..., "status": "processed"}`` dict per item handled in
        this call; skipped items are not included.
    """
    manager = DataManager()
    handled = []
    for entry in items:
        # Short-circuit keeps the progress lookup from running unless resuming.
        already_done = resume and manager.is_processed(entry)
        if not already_done:
            handled.append({"id": entry, "status": "processed"})
            manager.save_progress(entry)
    return handled
def main():
    """Script entry point: set up logging, then process a fixed sample batch.

    Any exception raised during processing is logged rather than propagated.
    """
    setup_logging()
    try:
        pending = ["item1", "item2", "item3"]
        outcome = process_data(pending, resume=True)
        if outcome:
            summary = f"成功处理 {len(outcome)} 条数据"
            logging.info(summary)
    except Exception as e:
        failure = f"程序执行出错: {e}"
        logging.error(failure)
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__": main()
|