上传文件至 /

This commit is contained in:
2025-09-27 02:21:57 +08:00
parent 9aba27aa00
commit cf163ff274

476
miuitask_batch.py Normal file
View File

@@ -0,0 +1,476 @@
#!/usr/bin/env python3
"""
MIUI自动任务分批运行版
"""
import os
import glob
import subprocess
import stat
import platform
import sys
import time
import gc
import shutil
import signal
import atexit
class MIUITaskQinglong:
"""MIUI自动任务分批运行版"""
# ==================== 用户配置区域 ====================
# 可直接修改以下参数,无需设置环境变量
ENABLE_BATCH_MODE = "true" # 是否启用分批处理,"false"=一次处理所有账号
DEFAULT_BATCH_SIZE = 5 # 每批处理账号数量建议5-10
DEFAULT_BATCH_DELAY = 5 # 批次间延迟秒数建议5-10
DEFAULT_MAX_RETRIES = 1 # 失败重试次数建议1-3
DEFAULT_CONFIG_PATH = "data/config.yaml" # 配置文件路径
# ====================================================
# 系统常量
TEMP_DIR = "temp_ql"
DATA_DIR = "data"
# 执行参数限制
MIN_BATCH_SIZE, MAX_BATCH_SIZE = 1, 10
MIN_DELAY, MAX_DELAY = 5, 300
MIN_RETRIES, MAX_RETRIES = 0, 5
# 常用字符串
BATCH_PREFIX = "=== 批次"
CLEANUP_PATTERN = "data_backup_*"
BINARY_PATTERNS = [
"miuitask*{arch}*.bin",
"miuitask*.bin",
"miuitask*linux*{arch}*",
"miuitask*linux*"
]
LOG_KEYWORDS = [
'SUCCESS', 'ERROR', '成功', '失败', '领到', '完成', '任务',
'账号:', '成长值+', '获得', '金币', '系统信息', '项目信息',
'脚本日志', '开始处理账号任务', '正在启动', '构建时间', '当前版本',
'操作系统:', 'Go版本:', '闲鱼ID', '反馈带日志',
'cookie失效', '登录失败', '网络错误', '配置错误', '任务已完成',
'执行任务', '处理账号', 'Cookie invalid', 'Login failed'
]
def __init__(self):
self.enable_batch = os.getenv('ENABLE_BATCH_MODE', self.ENABLE_BATCH_MODE).lower() in ('true', '1', 'yes')
self.batch_size = max(self.MIN_BATCH_SIZE, min(self.MAX_BATCH_SIZE,
int(os.getenv('BATCH_SIZE', str(self.DEFAULT_BATCH_SIZE)))))
self.batch_delay = max(self.MIN_DELAY, min(self.MAX_DELAY,
int(os.getenv('BATCH_DELAY', str(self.DEFAULT_BATCH_DELAY)))))
self.max_retries = max(self.MIN_RETRIES, min(self.MAX_RETRIES,
int(os.getenv('MAX_RETRIES', str(self.DEFAULT_MAX_RETRIES)))))
self.config_file = os.getenv('MIUITASK_CONFIG_PATH', self.DEFAULT_CONFIG_PATH)
atexit.register(self._cleanup_all)
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGINT, self._signal_handler)
def log(self, message: str):
print(message)
def _validate_config(self, config):
if not isinstance(config, dict) or not config.get('accounts'):
return False, "配置文件格式错误或缺少accounts"
accounts = config['accounts']
if not isinstance(accounts, list) or len(accounts) == 0:
return False, "accounts必须是非空数组"
for i, account in enumerate(accounts):
if not isinstance(account, dict) or not account.get('uid'):
return False, f"账号{i+1}格式错误或缺少uid"
has_cookies = account.get('cookies') and isinstance(account['cookies'], dict)
has_password = account.get('password') and account['password'].strip()
if not has_cookies and not has_password:
return False, f"账号{i+1}缺少有效的cookies或password"
return True, "配置验证通过"
def _cleanup_all(self):
# 清理临时目录
if os.path.exists(self.TEMP_DIR):
try:
shutil.rmtree(self.TEMP_DIR)
except Exception:
pass
# 清理所有备份目录
for item in glob.glob("data_backup_*"):
try:
if os.path.isdir(item):
shutil.rmtree(item)
except Exception:
pass
def _signal_handler(self, signum, _):
self._cleanup_all()
sys.exit(128 + signum)
def get_system_arch(self):
machine = platform.machine().lower()
if machine in ("x86_64", "amd64"):
return "amd64"
elif machine in ("aarch64", "arm64"):
return "arm64"
else:
self.log(f"不支持的架构: {machine}")
sys.exit(1)
def parse_yaml_robust(self, file_path: str):
self.log(f"解析配置文件: {file_path}")
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
config = {'accounts': []}
lines = content.split('\n')
in_accounts = False
current_account = None
current_cookies = None
current_mipay_cookies = None
for line in lines:
stripped = line.strip()
if not stripped or stripped.startswith('#'):
continue
if stripped == 'accounts:':
in_accounts = True
continue
if in_accounts and not line.startswith(' ') and not line.startswith('\t') and ':' in stripped:
if stripped.split(':')[0].strip() in ['onepush', 'preference']:
break
if in_accounts:
indent = len(line) - len(line.lstrip())
if stripped.startswith('- ') or (stripped.startswith('-') and ':' in stripped):
if current_account:
config['accounts'].append(current_account)
current_account = {}
current_cookies = None
current_mipay_cookies = None
if ':' in stripped:
key_value = stripped.split(':', 1)[1].strip().strip('"\'')
if key_value:
current_account['uid'] = key_value
elif current_account is not None and ':' in stripped and indent > 0:
key, value = stripped.split(':', 1)
key = key.strip()
value = value.strip()
if key == 'cookies':
current_cookies = {}
current_account['cookies'] = current_cookies
elif key == 'mipay_cookies':
current_mipay_cookies = {}
current_account['mipay_cookies'] = current_mipay_cookies
elif current_cookies is not None and indent > 6:
current_cookies[key] = value.strip('"\'')
elif current_mipay_cookies is not None and indent > 6:
current_mipay_cookies[key] = value.strip('"\'')
else:
clean_value = value.strip('"\'')
if clean_value.lower() in ('true', 'false'):
current_account[key] = clean_value.lower() == 'true'
else:
current_account[key] = clean_value
if current_account:
config['accounts'].append(current_account)
valid_accounts = []
for i, account in enumerate(config['accounts']):
if account.get('uid'):
if 'mipay' not in account:
account['mipay'] = True
if 'community' not in account:
account['community'] = False
valid_accounts.append(account)
self.log(f"账号 {i+1}: uid={account.get('uid')}")
config['accounts'] = valid_accounts
self.log(f"找到 {len(valid_accounts)} 个账号")
return config
except Exception as e:
self.log(f"解析配置文件失败: {e}")
return {'accounts': []}
def create_batch_config(self, accounts, batch_num: int):
os.makedirs(self.TEMP_DIR, exist_ok=True)
lines = []
lines.append("preference:")
lines.append(' twocaptcha_api_key: ""')
lines.append(' twocaptcha_user_agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"')
lines.append(' twocaptcha_server: ""')
lines.append("")
lines.append("accounts:")
for account in accounts:
lines.append(f' - uid: "{account.get("uid", "")}"')
lines.append(f' password: "{account.get("password", "")}"')
if account.get('cookies'):
lines.append(" cookies:")
for k, v in account['cookies'].items():
lines.append(f' {k}: {v}')
if account.get('mipay_cookies'):
lines.append(" mipay_cookies:")
for k, v in account['mipay_cookies'].items():
lines.append(f' {k}: {v}')
lines.append(f' login_user_agent: "{account.get("login_user_agent", "Mozilla/5.0 (Linux; Android 13)")}"')
lines.append(f' user_agent: "{account.get("user_agent", "Mozilla/5.0 (Linux; Android 13)")}"')
lines.append(f' device: "{account.get("device", "")}"')
lines.append(f' device_model: "{account.get("device_model", "")}"')
lines.append(f' community: {str(account.get("community", False)).lower()}')
lines.append(f' mipay: {str(account.get("mipay", True)).lower()}')
lines.append("")
lines.append("onepush:")
lines.append(' notifier: ""')
lines.append(" params: {}")
config_content = '\n'.join(lines)
config_path = os.path.join(self.TEMP_DIR, f"config_batch_{batch_num}.yaml")
try:
with open(config_path, 'w', encoding='utf-8') as f:
f.write(config_content)
return config_path
except Exception as e:
self.log(f"创建配置文件失败: {e}")
return None
def find_binary(self):
arch = self.get_system_arch()
patterns = [pattern.format(arch=arch) for pattern in self.BINARY_PATTERNS]
candidates = []
for pattern in patterns:
candidates.extend(glob.glob(pattern))
if not candidates:
self.log("未找到可执行文件")
return None
best_file = None
for f in candidates:
if 'linux' in f.lower() and arch in f.lower():
best_file = f
break
if not best_file:
for f in candidates:
if 'linux' in f.lower():
best_file = f
break
if not best_file:
best_file = candidates[0]
self.log(f"选择可执行文件: {best_file}")
try:
current_mode = os.stat(best_file).st_mode
os.chmod(best_file, current_mode | stat.S_IEXEC)
except Exception as e:
self.log(f"设置执行权限失败: {e}")
return best_file
def run_batch(self, binary_path: str, config_path: str, batch_num: int):
self.log(f"{self.BATCH_PREFIX} {batch_num} ===")
try:
env = os.environ.copy()
env['MIUITASK_CONFIG_PATH'] = config_path
# 使用Popen实现实时输出无超时限制
process = subprocess.Popen(
[f"./{binary_path}"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # 合并stderr到stdout
text=True,
env=env,
bufsize=1,
universal_newlines=True
)
# 实时读取并输出每一行
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
line = output.strip()
if line:
self.log(line)
# 获取最终退出码
return_code = process.poll()
self.log(f"批次 {batch_num} 完成,退出码: {return_code}")
success = return_code == 0
if not success:
self.log(f"批次 {batch_num} 失败")
return success
except Exception as e:
self.log(f"批次 {batch_num} 异常: {e}")
return False
finally:
try:
if os.path.exists(config_path):
os.remove(config_path)
except Exception:
pass
def run_all_batches(self):
self.log("=== MIUI自动任务分批运行版启动 ===")
mode = "分批处理" if self.enable_batch else "全量处理"
self.log(f"模式: {mode}, 批次={self.batch_size}, 延迟={self.batch_delay}s, 重试={self.max_retries}")
binary_path = self.find_binary()
if not binary_path:
return False
if not os.path.exists(self.config_file):
self.log(f"配置文件不存在: {self.config_file}")
self.log("运行程序创建配置文件...")
try:
result = subprocess.run(
[f"./{binary_path}"],
capture_output=True,
text=True,
env=os.environ.copy()
)
if result.stdout:
for line in result.stdout.split('\n'):
line = line.strip()
if line:
self.log(line)
if os.path.exists(self.config_file):
self.log("配置文件创建成功")
else:
self.log("配置文件创建失败")
return False
except Exception as e:
self.log(f"创建配置文件失败: {e}")
return False
config = self.parse_yaml_robust(self.config_file)
is_valid, message = self._validate_config(config)
if not is_valid:
self.log(f"配置错误: {message}")
return False
accounts = config.get('accounts', [])
if not accounts:
self.log("配置文件中没有账号,请先配置账号")
return False
if self.enable_batch:
batches = []
for i in range(0, len(accounts), self.batch_size):
batches.append(accounts[i:i + self.batch_size])
self.log(f"分批处理: {len(accounts)} 个账号 → {len(batches)} 批次")
else:
batches = [accounts]
self.log(f"全量处理: {len(accounts)} 个账号")
total_batches = len(batches)
successful_batches = 0
failed_batches = []
try:
for batch_num, accounts in enumerate(batches, 1):
self.log(f"{self.BATCH_PREFIX} {batch_num}/{total_batches} ===")
uids = [acc.get('uid', 'unknown') for acc in accounts]
self.log(f"账号: {uids}")
config_path = self.create_batch_config(accounts, batch_num)
if not config_path:
failed_batches.append(batch_num)
continue
success = False
for attempt in range(self.max_retries + 1):
if attempt > 0:
self.log(f"重试 {attempt + 1}")
time.sleep(10)
# 重试时重新创建配置文件
config_path = self.create_batch_config(accounts, batch_num)
if not config_path:
break
success = self.run_batch(binary_path, config_path, batch_num)
if success:
successful_batches += 1
break
if not success:
failed_batches.append(batch_num)
if batch_num < total_batches and self.enable_batch:
gc.collect()
time.sleep(self.batch_delay)
finally:
self._cleanup_all()
self.log("=== 执行结果 ===")
self.log(f"总批次: {total_batches}, 成功: {successful_batches}, 失败: {len(failed_batches)}")
if failed_batches:
self.log(f"失败批次: {failed_batches}")
success_rate = (successful_batches / total_batches) * 100 if total_batches > 0 else 0
if success_rate == 100:
self.log(f"🎉 成功率: {success_rate:.1f}%")
else:
self.log(f"成功率: {success_rate:.1f}%")
return len(failed_batches) == 0
def main():
try:
runner = MIUITaskQinglong()
success = runner.run_all_batches()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n用户中断")
sys.exit(130)
except Exception as e:
print(f"异常: {e}")
sys.exit(1)
if __name__ == "__main__":
main()