1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
import json
import os
import time
from abc import ABC, abstractmethod
from typing import List, Dict, Any

import requests
class BaseExtractor(ABC):
    """Abstract base class defining the data-extractor interface.

    The constructor stores ``config`` and builds one HTTP session that is
    kept on ``self.session`` for the lifetime of the instance. Call
    :meth:`close` (or use the instance as a context manager) to release the
    session's resources when the extractor is no longer needed.

    Subclasses must implement :meth:`search` and :meth:`extract`.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration and create the HTTP session.

        Args:
            config: Extractor settings. The optional ``'headers'`` entry is
                applied to the session as default request headers.
        """
        self.config = config
        self.session = self._create_session()

    def _create_session(self) -> "requests.Session":
        """Create the HTTP session with the configured default headers."""
        session = requests.Session()
        # `or {}` also covers an explicit ``'headers': None`` in the config,
        # which would otherwise make ``update`` raise.
        session.headers.update(self.config.get('headers') or {})
        return session

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self):
        """Return the extractor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the session on leaving the ``with`` block; never suppresses errors."""
        self.close()

    @abstractmethod
    def search(self, query: str) -> List[str]:
        """Return the item identifiers matching ``query`` (subclass hook)."""
        pass

    @abstractmethod
    def extract(self, item_id: str) -> Dict[str, Any]:
        """Return the extracted data for ``item_id`` (subclass hook)."""
        pass
class CacheManager:
    """Small JSON-file-backed key/value cache.

    The whole cache is held in ``self.data`` and rewritten to
    ``cache_file`` on every :meth:`set`.
    """

    def __init__(self, cache_file: str):
        """Remember the cache path and load any existing content.

        Args:
            cache_file: Path of the JSON file used for persistence.
        """
        self.cache_file = cache_file
        self.data = self._load()

    def _load(self) -> Dict:
        """Read the cache file and return its content as a dict.

        A missing file, an unreadable/corrupt file, or a file whose top-level
        JSON value is not an object all yield an empty cache, so a damaged
        cache never prevents start-up.
        """
        try:
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return {}
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # The cache is disposable: report the problem and start empty.
            print(f"缓存加载失败: {self.cache_file}, 错误: {exc}")
            return {}
        # get()/set() rely on dict semantics; ignore any other JSON value.
        return loaded if isinstance(loaded, dict) else {}

    def get(self, key: str, default: Any = None) -> Any:
        """Return the cached value for ``key``, or ``default`` if absent."""
        return self.data.get(key, default)

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key`` and persist the cache to disk.

        Raises:
            TypeError: If the cache content is not JSON-serializable.
        """
        self.data[key] = value
        self._save()

    def _save(self):
        """Write the cache to disk without ever truncating the existing file."""
        # Serialize first so a non-serializable value fails before any file
        # is touched.
        payload = json.dumps(self.data)
        tmp_path = f"{self.cache_file}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(payload)
        # Atomic rename: readers see either the old or the new cache, never
        # a half-written one.
        os.replace(tmp_path, self.cache_file)
class DataProcessor:
    """Stateless helpers for processing items in bulk and saving results."""

    @staticmethod
    def process_batch(items: List[str], processor_func) -> List[Dict]:
        """Apply ``processor_func`` to every item and collect truthy results.

        Items whose processing raises are reported on stdout and skipped;
        falsy results are dropped.
        """
        collected = []
        for entry in items:
            try:
                outcome = processor_func(entry)
                keep = bool(outcome)
            except Exception as exc:
                print(f"处理失败: {entry}, 错误: {exc}")
                continue
            if keep:
                collected.append(outcome)
        return collected

    @staticmethod
    def save_results(data: List[Dict], filename: str):
        """Write ``data`` to ``filename`` as indented UTF-8 JSON."""
        with open(filename, 'w', encoding='utf-8') as sink:
            json.dump(obj=data, fp=sink, ensure_ascii=False, indent=2)
class WebExtractor(BaseExtractor):
    """Concrete extractor with a persistent cache and simple throttling.

    Recognized config keys (all optional):
        ``'cache_file'``: path of the cache file (default ``'cache.json'``).
        ``'search_delay'``: seconds to wait before an uncached search
            (default 1).
        ``'extract_delay'``: seconds to wait before an uncached extraction
            (default 0.5).

    NOTE(review): ``main()`` passes a ``'delay'`` key that nothing reads;
    confirm whether it was meant to drive one or both of the delays above.
    """

    DEFAULT_SEARCH_DELAY = 1
    DEFAULT_EXTRACT_DELAY = 0.5

    def __init__(self, config: Dict[str, Any]):
        """Initialise the base extractor, the cache and the throttle delays."""
        super().__init__(config)
        self.cache = CacheManager(config.get('cache_file', 'cache.json'))
        self.search_delay = config.get('search_delay', self.DEFAULT_SEARCH_DELAY)
        self.extract_delay = config.get('extract_delay', self.DEFAULT_EXTRACT_DELAY)

    @staticmethod
    def _throttle(seconds: float) -> None:
        """Sleep for ``seconds``; non-positive values disable the pause."""
        if seconds > 0:
            time.sleep(seconds)

    def search(self, query: str) -> List[str]:
        """Return item ids for ``query``, served from the cache when possible.

        Empty results are treated as a miss and are not cached, mirroring
        :meth:`extract`, so a search that found nothing is retried next time.
        """
        cache_key = f"search_{query}"
        cached = self.cache.get(cache_key)
        if cached:
            # Hand out a copy so callers cannot mutate the cached list.
            return list(cached)

        self._throttle(self.search_delay)
        results = self._perform_search(query)
        if results:
            self.cache.set(cache_key, list(results))
        return results

    def extract(self, item_id: str) -> Dict[str, Any]:
        """Return the data for ``item_id``, served from the cache when possible.

        Falsy extraction results are returned as-is but not cached.
        """
        cached = self.cache.get(item_id)
        if cached:
            # Shallow copy: callers may add/replace top-level keys without
            # altering the cached entry (nested values are still shared).
            return dict(cached)

        self._throttle(self.extract_delay)
        data = self._perform_extraction(item_id)
        if data:
            # Cache a copy so later mutation of the returned dict does not
            # leak into the cache.
            self.cache.set(item_id, dict(data))
        return data

    def _perform_search(self, query: str) -> List[str]:
        """Run the actual search; placeholder to be adapted to the target site."""
        return [f"item_{i}" for i in range(3)]

    def _perform_extraction(self, item_id: str) -> Dict[str, Any]:
        """Run the actual extraction; placeholder to be adapted to the target site."""
        return {
            'id': item_id,
            'title': f'Title for {item_id}',
            'data': f'Data for {item_id}',
        }
class Application:
    """Top-level application: searches, extracts and saves the results."""

    def __init__(self, config: Dict[str, Any]):
        """Create the extractor and processor from ``config``."""
        self.config = config
        self.extractor = WebExtractor(config)
        self.processor = DataProcessor()

    def run(self, queries: List[str], output_file: str = 'results.json'):
        """Process every query and save all extracted records.

        For each query the extractor is searched, every returned item is
        extracted, and each non-empty record is tagged with the query that
        produced it before everything is saved.

        Args:
            queries: Search queries to process, in order.
            output_file: Destination passed to the processor's
                ``save_results`` (default ``'results.json'``).
        """
        all_results = []
        for query in queries:
            for item_id in self.extractor.search(query):
                data = self.extractor.extract(item_id)
                if not data:
                    continue
                # Build a new dict instead of mutating `data`: the extractor
                # may return the same (cached) object for several queries,
                # and tagging it in place would overwrite earlier rows and
                # leak 'query' into the cache.
                all_results.append({**data, 'query': query})

        self.processor.save_results(all_results, output_file)
        print(f"完成处理,共获取 {len(all_results)} 条数据")
def main():
    """Entry point: run the application on a fixed set of sample queries."""
    settings = dict(
        headers={'User-Agent': 'Generic-Bot/1.0'},
        cache_file='app_cache.json',
        delay=1.0,
    )
    sample_queries = ['query1', 'query2', 'query3']
    Application(settings).run(sample_queries)


# Run only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()
|