
import json
import os
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List

import requests
class BaseExtractor(ABC):
    """Abstract base class defining the data-extractor interface.

    Stores the configuration mapping on ``self.config`` and an HTTP session,
    pre-loaded with the configured headers, on ``self.session``.
    """

    def __init__(self, config: Dict[str, Any]):
        """Keep *config* and build the HTTP session from it."""
        self.config = config
        # The session is built after config is stored because
        # _create_session() reads self.config.
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Create an HTTP session carrying the headers from the config.

        Returns:
            A new ``requests.Session`` whose default headers have been updated
            with ``config['headers']`` (nothing is added when the key is
            absent).
        """
        extra_headers = self.config.get('headers', {})
        http_session = requests.Session()
        http_session.headers.update(extra_headers)
        return http_session

    @abstractmethod
    def search(self, query: str) -> List[str]:
        """Search for *query*; subclasses must implement this."""

    @abstractmethod
    def extract(self, item_id: str) -> Dict[str, Any]:
        """Extract the data for *item_id*; subclasses must implement this."""
class CacheManager:
    """JSON-file-backed key/value cache.

    The whole cache is held in memory in ``self.data`` and rewritten to
    ``self.cache_file`` on every ``set`` call.
    """

    def __init__(self, cache_file: str):
        """Remember the cache path and load any existing content from it."""
        self.cache_file = cache_file
        self.data = self._load()

    def _load(self) -> Dict:
        """Read the cache file and return its content as a dict.

        Returns:
            The decoded mapping, or an empty dict when the file is missing,
            cannot be decoded as UTF-8 JSON, or does not hold a JSON object.
        """
        try:
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return {}
        except (json.JSONDecodeError, UnicodeDecodeError):
            # A truncated or corrupt cache must not stop the program from
            # starting; a cache can always be rebuilt from scratch.
            return {}
        # get()/set() require a mapping; any other JSON value is unusable.
        return loaded if isinstance(loaded, dict) else {}

    def get(self, key: str) -> Any:
        """Return the value stored under *key*, or ``None`` when absent."""
        return self.data.get(key)

    def set(self, key: str, value: Any):
        """Store *value* under *key* and persist the cache to disk."""
        self.data[key] = value
        self._save()

    def _save(self):
        """Write the in-memory cache to ``self.cache_file``.

        The data is written to a sibling temporary file first and then moved
        over the real file, so a failure part-way through (for example a value
        that is not JSON-serializable) leaves the previous cache file intact.

        Raises:
            TypeError, ValueError, OSError: propagated from ``json.dump`` or
                the file operations after the temporary file is removed.
        """
        tmp_path = f"{self.cache_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.data, f)
            os.replace(tmp_path, self.cache_file)
        except Exception:
            # Do not leave a half-written temporary file behind.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
class DataProcessor:
    """Stateless helpers for batch processing and for saving results."""

    @staticmethod
    def process_batch(items: List[str], processor_func) -> List[Dict]:
        """Apply *processor_func* to every item and collect truthy results.

        An item whose processing raises is reported on stdout and skipped, so
        one bad item does not abort the batch.

        Args:
            items: The items to process, in order.
            processor_func: Callable invoked with a single item.

        Returns:
            The truthy return values of *processor_func*, in input order.
        """
        collected = []
        for entry in items:
            try:
                outcome = processor_func(entry)
                # Falsy outcomes (None, empty dict, ...) are dropped.
                if outcome:
                    collected.append(outcome)
            except Exception as exc:
                print(f"处理失败: {entry}, 错误: {exc}")
        return collected

    @staticmethod
    def save_results(data: List[Dict], filename: str):
        """Write *data* to *filename* as indented UTF-8 JSON.

        Non-ASCII characters are written as-is rather than escaped.
        """
        with open(filename, 'w', encoding='utf-8') as out:
            json.dump(data, out, ensure_ascii=False, indent=2)
class WebExtractor(BaseExtractor):
    """Concrete extractor that caches search and extraction results.

    Results are kept in a ``CacheManager`` whose file path comes from
    ``config['cache_file']`` (default ``'cache.json'``).
    """

    def __init__(self, config: Dict[str, Any]):
        """Initialise the base extractor and open the result cache."""
        super().__init__(config)
        self.cache = CacheManager(config.get('cache_file', 'cache.json'))

    def search(self, query: str) -> List[str]:
        """Return the item ids matching *query*, using the cache when possible.

        On a cache miss the method sleeps for one second, performs the search
        and stores the result under the key ``search_<query>``.
        """
        cache_key = f"search_{query}"
        cached = self.cache.get(cache_key)
        # Test against None rather than truthiness: an empty result list is a
        # valid cached answer and must not trigger a new search every call.
        if cached is not None:
            return cached

        # NOTE(review): the pause is fixed at 1s; the 'delay' value that
        # main() puts in the config is not consulted here.
        time.sleep(1)
        results = self._perform_search(query)
        self.cache.set(cache_key, results)
        return results

    def extract(self, item_id: str) -> Dict[str, Any]:
        """Return the data for *item_id*, using the cache when possible.

        On a cache miss the method sleeps for half a second, performs the
        extraction and caches the result only when it is truthy.
        """
        cached = self.cache.get(item_id)
        # Only truthy data is ever stored below, so a truthiness test is
        # enough to detect a hit.
        if cached:
            return cached

        time.sleep(0.5)
        data = self._perform_extraction(item_id)
        if data:
            self.cache.set(item_id, data)
        return data

    def _perform_search(self, query: str) -> List[str]:
        """Run the search; placeholder to be adapted to the target site.

        Currently ignores *query* and returns three fixed item ids.
        """
        return [f"item_{i}" for i in range(3)]

    def _perform_extraction(self, item_id: str) -> Dict[str, Any]:
        """Run the extraction; placeholder to be adapted to the target site.

        Currently returns a synthetic record derived from *item_id*.
        """
        return {
            'id': item_id,
            'title': f'Title for {item_id}',
            'data': f'Data for {item_id}',
        }
class Application:
    """Main application: searches, extracts and saves the combined results."""

    def __init__(self, config: Dict[str, Any]):
        """Store *config* and build the extractor and processor from it."""
        self.config = config
        self.extractor = WebExtractor(config)
        self.processor = DataProcessor()

    def run(self, queries: List[str], output_file: str = 'results.json'):
        """Process every query and save all extracted records.

        For each query, every item id returned by the extractor's ``search``
        is passed to ``extract``; each truthy record is tagged with the query
        that produced it and collected. The collected records are then saved
        through the processor and a summary line is printed.

        Args:
            queries: The search queries to run, in order.
            output_file: Path the results are saved to.
        """
        all_results = []
        for query in queries:
            for item_id in self.extractor.search(query):
                data = self.extractor.extract(item_id)
                if not data:
                    continue
                # Tag a copy, not the extractor's own dict: the extractor may
                # return the same (cached) object for several queries, and
                # mutating it would overwrite the 'query' of earlier results
                # and leak the key into the cache.
                all_results.append({**data, 'query': query})

        self.processor.save_results(all_results, output_file)
        print(f"完成处理,共获取 {len(all_results)} 条数据")
def main():
    """Build the demo configuration and run the application on test queries."""
    settings = dict(
        headers={'User-Agent': 'Generic-Bot/1.0'},
        cache_file='app_cache.json',
        delay=1.0,
    )
    Application(settings).run(['query1', 'query2', 'query3'])
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__': main()
|