#!/usr/bin/env python3
"""Batch runner for the MIUI auto-task executable.

Reads the accounts from the main YAML config, splits them into batches,
writes one temporary config per batch and runs the ``miuitask`` binary once
per batch (with optional retries), streaming the binary's output to stdout.
"""
import atexit
import gc
import glob
import json
import os
import platform
import shutil
import signal
import stat
import subprocess
import sys
import time
from typing import Any, Dict, List, Optional, Tuple


class MIUITaskQinglong:
    """Run the MIUI auto-task binary over the configured accounts in batches."""

    # ==================== User configuration ====================
    ENABLE_BATCH_MODE = "true"  # "false" = process every account in one run
    DEFAULT_BATCH_SIZE = 5  # accounts per batch
    DEFAULT_BATCH_DELAY = 5  # seconds to wait between batches
    DEFAULT_MAX_RETRIES = 1  # retries after a failed batch
    DEFAULT_CONFIG_PATH = "data/config.yaml"  # main config file
    # ============================================================

    # Working directories
    TEMP_DIR = "temp_ql"
    DATA_DIR = "data"

    # Allowed ranges for the tunable parameters (values are clamped)
    MIN_BATCH_SIZE, MAX_BATCH_SIZE = 1, 10
    MIN_DELAY, MAX_DELAY = 5, 300
    MIN_RETRIES, MAX_RETRIES = 0, 5

    # Seconds to wait before re-running a failed batch
    RETRY_DELAY = 10

    # Shared strings / patterns
    BATCH_PREFIX = "=== 批次"
    CLEANUP_PATTERN = "data_backup_*"
    BINARY_PATTERNS = [
        "miuitask*{arch}*.bin",
        "miuitask*.bin",
        "miuitask*linux*{arch}*",
        "miuitask*linux*",
    ]
    LOG_KEYWORDS = [
        'SUCCESS', 'ERROR', '成功', '失败', '领到', '完成', '任务', '账号:',
        '成长值+', '获得', '金币', '系统信息', '项目信息', '脚本日志',
        '开始处理账号任务', '正在启动', '构建时间', '当前版本', '操作系统:',
        'Go版本:', '闲鱼ID', '反馈带日志', 'cookie失效', '登录失败', '网络错误',
        '配置错误', '任务已完成', '执行任务', '处理账号', 'Cookie invalid',
        'Login failed',
    ]

    # Top-level keys that terminate the ``accounts:`` section while parsing
    SECTION_END_KEYS = ('onepush', 'preference')
    # Account keys whose value is a nested mapping
    NESTED_KEYS = ('cookies', 'mipay_cookies')
    # Account keys whose ``true``/``false`` text is converted to ``bool``
    BOOLEAN_KEYS = ('community', 'mipay')

    def __init__(self):
        """Read settings from the environment and install cleanup hooks."""
        raw_mode = os.getenv('ENABLE_BATCH_MODE', self.ENABLE_BATCH_MODE)
        self.enable_batch = raw_mode.strip().lower() in ('true', '1', 'yes')
        self.batch_size = self._env_int(
            'BATCH_SIZE', self.DEFAULT_BATCH_SIZE,
            self.MIN_BATCH_SIZE, self.MAX_BATCH_SIZE)
        self.batch_delay = self._env_int(
            'BATCH_DELAY', self.DEFAULT_BATCH_DELAY,
            self.MIN_DELAY, self.MAX_DELAY)
        self.max_retries = self._env_int(
            'MAX_RETRIES', self.DEFAULT_MAX_RETRIES,
            self.MIN_RETRIES, self.MAX_RETRIES)
        self.config_file = os.getenv('MIUITASK_CONFIG_PATH', self.DEFAULT_CONFIG_PATH)

        atexit.register(self._cleanup_all)
        for signum in (signal.SIGTERM, signal.SIGINT):
            try:
                signal.signal(signum, self._signal_handler)
            except ValueError:
                # signal.signal() only works in the main thread; the atexit
                # hook still performs the cleanup in that case.
                pass

    def log(self, message: str) -> None:
        """Print *message* to stdout."""
        print(message)

    def _env_int(self, name: str, default: int, low: int, high: int) -> int:
        """Return env var *name* as an int clamped to ``[low, high]``.

        A missing, empty or non-numeric value falls back to *default*
        instead of raising ``ValueError``.
        """
        raw = os.getenv(name)
        value = default
        if raw is not None and raw.strip():
            try:
                value = int(raw.strip())
            except ValueError:
                self.log(f"{name}={raw!r} is not an integer, using {default}")
        return max(low, min(high, value))

    @staticmethod
    def _unquote(value: str) -> str:
        """Remove the surrounding quotes of a YAML scalar and undo escapes."""
        if len(value) >= 2 and value[0] == value[-1] == '"':
            try:
                decoded = json.loads(value)
            except ValueError:
                decoded = None
            # Escapes JSON does not know are kept verbatim.
            return decoded if isinstance(decoded, str) else value[1:-1]
        if len(value) >= 2 and value[0] == value[-1] == "'":
            return value[1:-1].replace("''", "'")
        return value.strip('"\'')

    @staticmethod
    def _yaml_str(value: Any) -> str:
        """Render *value* as an escaped, double-quoted YAML scalar."""
        text = "" if value is None else str(value)
        return json.dumps(text, ensure_ascii=False)

    def _validate_config(self, config) -> Tuple[bool, str]:
        """Check that *config* holds at least one usable account.

        Returns:
            ``(ok, message)``; every account needs a ``uid`` and either a
            non-empty ``cookies`` mapping or a non-blank ``password``.
        """
        if not isinstance(config, dict) or not config.get('accounts'):
            return False, "配置文件格式错误或缺少accounts"

        accounts = config['accounts']
        if not isinstance(accounts, list) or len(accounts) == 0:
            return False, "accounts必须是非空数组"

        for number, account in enumerate(accounts, 1):
            if not isinstance(account, dict) or not account.get('uid'):
                return False, f"账号{number}格式错误或缺少uid"
            cookies = account.get('cookies')
            has_cookies = isinstance(cookies, dict) and bool(cookies)
            password = account.get('password')
            # str() keeps a non-string password from raising on .strip().
            has_password = bool(password) and bool(str(password).strip())
            if not has_cookies and not has_password:
                return False, f"账号{number}缺少有效的cookies或password"

        return True, "配置验证通过"

    def _cleanup_all(self) -> None:
        """Best-effort removal of the temp directory and backup directories."""
        if os.path.exists(self.TEMP_DIR):
            shutil.rmtree(self.TEMP_DIR, ignore_errors=True)
        for item in glob.glob(self.CLEANUP_PATTERN):
            if os.path.isdir(item):
                shutil.rmtree(item, ignore_errors=True)

    def _signal_handler(self, signum, _):
        """Clean up, then exit with the conventional ``128 + signum`` code."""
        self._cleanup_all()
        sys.exit(128 + signum)

    def get_system_arch(self) -> str:
        """Return ``"amd64"`` or ``"arm64"``; exit(1) on any other machine."""
        machine = platform.machine().lower()
        aliases = {
            "x86_64": "amd64",
            "amd64": "amd64",
            "aarch64": "arm64",
            "arm64": "arm64",
        }
        arch = aliases.get(machine)
        if arch is None:
            self.log(f"不支持的架构: {machine}")
            sys.exit(1)
        return arch

    def _parse_account_lines(self, lines: List[str]) -> List[Dict[str, Any]]:
        """Extract the raw account mappings from the config's text lines."""
        accounts: List[Dict[str, Any]] = []
        in_accounts = False
        current: Optional[Dict[str, Any]] = None
        nested: Optional[Dict[str, str]] = None  # open cookies mapping, if any
        nested_indent = -1  # indent of the key that opened ``nested``

        for line in lines:
            stripped = line.strip()
            if not stripped or stripped.startswith('#'):
                continue
            if stripped == 'accounts:':
                in_accounts = True
                continue
            if not in_accounts:
                continue
            if not line.startswith((' ', '\t')) and ':' in stripped:
                if stripped.split(':')[0].strip() in self.SECTION_END_KEYS:
                    break

            indent = len(line) - len(line.lstrip())
            is_item = stripped.startswith('- ') or (
                stripped.startswith('-') and ':' in stripped)
            if is_item:
                if current:
                    accounts.append(current)
                current, nested = {}, None
                entry = stripped[1:].lstrip()
                # Sibling keys of the item line up with the key after "- ".
                indent += len(stripped) - len(entry)
                if ':' not in entry:
                    continue
            elif current is not None and ':' in stripped and indent > 0:
                entry = stripped
            else:
                continue

            key, value = entry.split(':', 1)
            key, value = key.strip(), value.strip()

            # Anything indented deeper than the opening key belongs to the
            # open nested mapping; the first shallower line closes it, so
            # ``mipay_cookies`` entries never leak into ``cookies``.
            if nested is not None and indent > nested_indent:
                nested[key] = self._unquote(value)
                continue
            nested = None

            if key in self.NESTED_KEYS:
                nested, nested_indent = {}, indent
                current[key] = nested
                continue

            clean = self._unquote(value)
            if is_item and not clean:
                continue  # "- uid:" without a value defines nothing
            if key in self.BOOLEAN_KEYS and clean.lower() in ('true', 'false'):
                current[key] = clean.lower() == 'true'
            else:
                current[key] = clean

        if current:
            accounts.append(current)
        return accounts

    def parse_yaml_robust(self, file_path: str) -> Dict[str, Any]:
        """Parse the ``accounts`` section of the YAML config at *file_path*.

        This is a small line-based reader, not a full YAML parser. Accounts
        without a ``uid`` are dropped; ``mipay`` defaults to ``True`` and
        ``community`` to ``False``.

        Returns:
            ``{'accounts': [...]}``; the list is empty when the file cannot
            be read or parsed (the error is logged, never raised).
        """
        self.log(f"解析配置文件: {file_path}")
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                lines = f.read().split('\n')

            valid_accounts = []
            for i, account in enumerate(self._parse_account_lines(lines)):
                if not account.get('uid'):
                    continue
                account.setdefault('mipay', True)
                account.setdefault('community', False)
                valid_accounts.append(account)
                self.log(f"账号 {i+1}: uid={account.get('uid')}")

            self.log(f"找到 {len(valid_accounts)} 个账号")
            return {'accounts': valid_accounts}
        except Exception as e:  # contract: never raise, report and return empty
            self.log(f"解析配置文件失败: {e}")
            return {'accounts': []}

    def create_batch_config(self, accounts, batch_num: int) -> Optional[str]:
        """Write a temporary YAML config containing only *accounts*.

        String values are written as escaped double-quoted scalars so that
        quotes, backslashes or YAML-significant characters in credentials
        and cookies cannot corrupt the file.

        Returns:
            Path of the written file, or ``None`` if it could not be written.
        """
        default_ua = "Mozilla/5.0 (Linux; Android 13)"
        q = self._yaml_str
        lines = [
            "preference:",
            '  twocaptcha_api_key: ""',
            '  twocaptcha_user_agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"',
            '  twocaptcha_server: ""',
            "",
            "accounts:",
        ]
        for account in accounts:
            lines.append(f'  - uid: {q(account.get("uid", ""))}')
            lines.append(f'    password: {q(account.get("password", ""))}')
            for section in self.NESTED_KEYS:
                if account.get(section):
                    lines.append(f"    {section}:")
                    for k, v in account[section].items():
                        lines.append(f'      {k}: {q(v)}')
            lines.append(f'    login_user_agent: {q(account.get("login_user_agent", default_ua))}')
            lines.append(f'    user_agent: {q(account.get("user_agent", default_ua))}')
            lines.append(f'    device: {q(account.get("device", ""))}')
            lines.append(f'    device_model: {q(account.get("device_model", ""))}')
            lines.append(f'    community: {str(account.get("community", False)).lower()}')
            lines.append(f'    mipay: {str(account.get("mipay", True)).lower()}')
            lines.append("")
        lines.extend(["onepush:", '  notifier: ""', "  params: {}"])

        config_path = os.path.join(self.TEMP_DIR, f"config_batch_{batch_num}.yaml")
        try:
            os.makedirs(self.TEMP_DIR, exist_ok=True)
            with open(config_path, 'w', encoding='utf-8') as f:
                f.write('\n'.join(lines))
            return config_path
        except Exception as e:
            self.log(f"创建配置文件失败: {e}")
            return None

    def find_binary(self) -> Optional[str]:
        """Locate the task executable in the working directory.

        Prefers a file whose name contains both ``linux`` and the current
        architecture, then any ``linux`` file, then the first match. The
        chosen file is made executable (best effort).

        Returns:
            The relative path of the binary, or ``None`` if nothing matches.
        """
        arch = self.get_system_arch()
        candidates: List[str] = []
        for pattern in self.BINARY_PATTERNS:
            for path in glob.glob(pattern.format(arch=arch)):
                # Skip duplicates (patterns overlap) and directories.
                if path not in candidates and os.path.isfile(path):
                    candidates.append(path)

        if not candidates:
            self.log("未找到可执行文件")
            return None

        linux_files = [f for f in candidates if 'linux' in f.lower()]
        exact = [f for f in linux_files if arch in f.lower()]
        best_file = (exact or linux_files or candidates)[0]
        self.log(f"选择可执行文件: {best_file}")

        try:
            current_mode = os.stat(best_file).st_mode
            os.chmod(best_file, current_mode | stat.S_IEXEC)
        except Exception as e:
            self.log(f"设置执行权限失败: {e}")

        return best_file

    @staticmethod
    def _command(binary_path: str) -> List[str]:
        """Build the argv used to launch *binary_path*."""
        if os.path.isabs(binary_path):
            return [binary_path]
        return [f"./{binary_path}"]

    def _stream_process(self, command: List[str], env: Dict[str, str]) -> int:
        """Run *command*, log each non-blank output line, return its exit code."""
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into stdout
            text=True,
            encoding='utf-8',
            errors='replace',  # undecodable bytes must not abort the batch
            env=env,
            bufsize=1,
        )
        try:
            for raw in process.stdout:
                line = raw.strip()
                if line:
                    self.log(line)
            # stdout reached EOF; block until the process has really exited.
            return process.wait()
        except BaseException:
            # Do not leave the child running if we are interrupted.
            process.kill()
            process.wait()
            raise
        finally:
            process.stdout.close()

    def run_batch(self, binary_path: str, config_path: str, batch_num: int) -> bool:
        """Run the binary against *config_path*, streaming its output.

        The config file is deleted afterwards, whatever the outcome.

        Returns:
            ``True`` when the process exits with code 0.
        """
        self.log(f"{self.BATCH_PREFIX} {batch_num} ===")
        try:
            env = os.environ.copy()
            env['MIUITASK_CONFIG_PATH'] = config_path
            return_code = self._stream_process(self._command(binary_path), env)
            self.log(f"批次 {batch_num} 完成,退出码: {return_code}")
            success = return_code == 0
            if not success:
                self.log(f"批次 {batch_num} 失败")
            return success
        except Exception as e:
            self.log(f"批次 {batch_num} 异常: {e}")
            return False
        finally:
            try:
                if os.path.exists(config_path):
                    os.remove(config_path)
            except OSError:
                pass

    def _ensure_config_exists(self, binary_path: str) -> bool:
        """Make sure the main config exists, running the binary once if not."""
        if os.path.exists(self.config_file):
            return True

        self.log(f"配置文件不存在: {self.config_file}")
        self.log("运行程序创建配置文件...")
        try:
            result = subprocess.run(
                self._command(binary_path),
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                encoding='utf-8',
                errors='replace',
                env=os.environ.copy(),
            )
        except Exception as e:
            self.log(f"创建配置文件失败: {e}")
            return False

        for line in (result.stdout or '').split('\n'):
            line = line.strip()
            if line:
                self.log(line)

        if os.path.exists(self.config_file):
            self.log("配置文件创建成功")
            return True
        self.log("配置文件创建失败")
        return False

    def _split_batches(self, accounts: list) -> List[list]:
        """Split *accounts* into batches (one batch when batching is off)."""
        if not self.enable_batch:
            return [accounts]
        size = self.batch_size
        return [accounts[i:i + size] for i in range(0, len(accounts), size)]

    def _run_with_retries(self, binary_path: str, accounts: list, batch_num: int) -> bool:
        """Run one batch, retrying up to ``max_retries`` times on failure."""
        for attempt in range(self.max_retries + 1):
            if attempt > 0:
                self.log(f"重试 {attempt}/{self.max_retries}")
                time.sleep(self.RETRY_DELAY)
            # run_batch() deletes its config, so it is rebuilt per attempt.
            config_path = self.create_batch_config(accounts, batch_num)
            if not config_path:
                return False
            if self.run_batch(binary_path, config_path, batch_num):
                return True
        return False

    def run_all_batches(self) -> bool:
        """Run every batch and report the result.

        Returns:
            ``True`` only if all batches succeeded.
        """
        self.log("=== MIUI自动任务分批运行版启动 ===")
        mode = "分批处理" if self.enable_batch else "全量处理"
        self.log(f"模式: {mode}, 批次={self.batch_size}, 延迟={self.batch_delay}s, 重试={self.max_retries}")

        binary_path = self.find_binary()
        if not binary_path:
            return False
        if not self._ensure_config_exists(binary_path):
            return False

        config = self.parse_yaml_robust(self.config_file)
        is_valid, message = self._validate_config(config)
        if not is_valid:
            self.log(f"配置错误: {message}")
            return False

        accounts = config.get('accounts', [])
        if not accounts:
            self.log("配置文件中没有账号,请先配置账号")
            return False

        batches = self._split_batches(accounts)
        if self.enable_batch:
            self.log(f"分批处理: {len(accounts)} 个账号 → {len(batches)} 批次")
        else:
            self.log(f"全量处理: {len(accounts)} 个账号")

        total_batches = len(batches)
        successful_batches = 0
        failed_batches = []

        try:
            for batch_num, batch_accounts in enumerate(batches, 1):
                self.log(f"{self.BATCH_PREFIX} {batch_num}/{total_batches} ===")
                uids = [acc.get('uid', 'unknown') for acc in batch_accounts]
                self.log(f"账号: {uids}")

                if self._run_with_retries(binary_path, batch_accounts, batch_num):
                    successful_batches += 1
                else:
                    failed_batches.append(batch_num)

                if batch_num < total_batches and self.enable_batch:
                    gc.collect()
                    time.sleep(self.batch_delay)
        finally:
            self._cleanup_all()

        self.log("=== 执行结果 ===")
        self.log(f"总批次: {total_batches}, 成功: {successful_batches}, 失败: {len(failed_batches)}")
        if failed_batches:
            self.log(f"失败批次: {failed_batches}")

        success_rate = (successful_batches / total_batches) * 100 if total_batches > 0 else 0
        if success_rate == 100:
            self.log(f"🎉 成功率: {success_rate:.1f}%")
        else:
            self.log(f"成功率: {success_rate:.1f}%")

        return len(failed_batches) == 0


def main():
    """Script entry point: exit 0 when every batch succeeded, 1 otherwise."""
    try:
        runner = MIUITaskQinglong()
        success = runner.run_all_batches()
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        print("\n用户中断")
        sys.exit(130)
    except Exception as e:
        print(f"异常: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()