import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'package:logging/logging.dart'; import 'mining_task_info.dart'; import '../models/client_status.dart' as ui; /// 客户端核心 - 实现与服务器通信、心跳、挖矿管理等核心功能 class ClientCore { static final ClientCore _instance = ClientCore._internal(); factory ClientCore() => _instance; ClientCore._internal(); final Logger _logger = Logger('ClientCore'); Socket? _socket; String? _serverUrl; String? _auth; String? _machineCode; bool _isConnected = false; DateTime? _lastPingTime; Timer? _heartbeatTimer; Timer? _reconnectTimer; StreamController? _logController; // 重连相关 int _reconnectAttempts = 0; static const int _maxReconnectAttempts = 5; // 需要GPU和挖矿软件信息用于认证 Map? _gpusInfo; List? _miningSofts; // 状态回调 Function(ui.ClientStatus)? onStatusChanged; Function(MiningTaskInfo?)? onMiningTaskChanged; Stream get logStream => _logController?.stream ?? const Stream.empty(); bool get isConnected => _isConnected; /// 更新系统信息(用于后台异步加载后更新) void setSystemInfo(Map gpusInfo, List miningSofts) { _gpusInfo = gpusInfo; _miningSofts = miningSofts; // 如果已连接,重新发送认证消息更新服务器 if (_isConnected) { _sendMachineCode(); } } /// 初始化客户端 Future initialize({ required String serverUrl, required String auth, required String machineCode, Map? gpusInfo, List? miningSofts, }) async { _serverUrl = serverUrl; _auth = auth; _machineCode = machineCode; _gpusInfo = gpusInfo; _miningSofts = miningSofts; _logController = StreamController.broadcast(); try { await _connect(); // 注意:不在这里发送身份认证,等待机器码获取完成后再发送 _startHeartbeat(); return true; } catch (e) { _logger.severe('初始化失败: $e'); return false; } } /// 连接到服务器 Future _connect() async { if (_serverUrl == null) { throw Exception('服务器地址未设置'); } try { final parts = _serverUrl!.split(':'); if (parts.length != 2) { throw Exception('服务器地址格式错误'); } final host = parts[0]; final port = int.parse(parts[1]); _socket = await Socket.connect(host, port, timeout: const Duration(seconds: 10)); _isConnected = true; // 连接成功,重置重连计数器 _reconnectAttempts = 0; _log('连接到服务器成功: $_serverUrl'); // 开始接收消息 _socket!.listen( _onDataReceived, onError: _onError, onDone: _onDone, cancelOnError: false, ); onStatusChanged?.call(ui.ClientStatus.online); // 如果已经有机器码和认证信息,立即发送认证消息(重连场景) if (_auth != null && _machineCode != null && _machineCode!.isNotEmpty && _machineCode != '正在获取...' && _machineCode != '获取失败') { _log('重连成功,自动发送身份认证消息'); _sendMachineCode(); } } catch (e) { _isConnected = false; _log('连接失败: $e'); onStatusChanged?.call(ui.ClientStatus.offline); rethrow; } } /// 更新机器码并发送身份认证(等待硬盘身份码获取完成后调用) void updateMachineCode(String machineCode, Map gpusInfo, List miningSofts) { _machineCode = machineCode; _gpusInfo = gpusInfo; _miningSofts = miningSofts; } /// 发送身份认证消息(公开方法,供外部调用) void sendMachineCode() { _sendMachineCode(); } /// 发送机器码认证消息(内部方法) void _sendMachineCode() { if (_auth == null || _machineCode == null) { _log('身份信息未设置'); return; } // 使用 身份信息::硬盘身份码 格式 final msg = { 'id': '$_auth::$_machineCode', 'method': 'auth.machineCode', 'params': { 'gpus': _gpusInfo ?? {}, 'miningsofts': _miningSofts ?? [], }, }; _sendMessage(msg); _log('发送身份认证消息: $_auth::$_machineCode'); } /// 发送消息到服务器 void _sendMessage(Map message) { if (!_isConnected || _socket == null) { _log('连接未建立,无法发送消息'); return; } try { final jsonStr = jsonEncode(message); _socket!.add(utf8.encode('$jsonStr\n')); } catch (e) { _log('发送消息失败: $e'); } } /// 接收数据 void _onDataReceived(List data) { try { final message = utf8.decode(data); final lines = message.split('\n').where((line) => line.trim().isNotEmpty); for (final line in lines) { _handleMessage(line); } } catch (e) { _log('处理接收数据失败: $e'); } } /// 处理接收到的消息 void _handleMessage(String messageJson) { try { final msg = jsonDecode(messageJson) as Map; final method = msg['method'] as String?; _log('收到消息: $method'); if (method == 'ping') { _handlePing(msg); } else if (method == 'mining.req') { _handleMiningRequest(msg); } else if (method == 'mining.end') { _handleMiningEnd(msg); } } catch (e) { _log('处理消息失败: $e, 原始数据: $messageJson'); } } /// 处理 ping 消息 void _handlePing(Map msg) { _lastPingTime = DateTime.now(); // 回复 pong final pongMsg = { 'id': msg['id'], 'method': 'pong', 'params': null, }; _sendMessage(pongMsg); } /// 处理挖矿请求 void _handleMiningRequest(Map msg) { try { final params = msg['params'] as Map?; if (params == null) { _sendMiningResponse(msg['id'] as String, false, '参数为空'); return; } // 注意:miner 需要从配置中获取,这里先使用默认值 params['miner'] = params['miner'] ?? 'lolminer'; final task = MiningTaskInfo.fromJson(params); onMiningTaskChanged?.call(task); // 启动挖矿软件由 ClientProvider 处理 // 这里只负责响应成功 final respData = { 'coin': task.coin, 'algo': task.algo, 'pool': task.pool, 'pool_url': task.poolUrl, 'worker_id': task.workerId, 'wallet_address': task.walletAddress, 'watch_url': '', }; _sendMiningResponse(msg['id'] as String, true, respData); } catch (e) { _log('处理挖矿请求失败: $e'); _sendMiningResponse(msg['id'] as String, false, e.toString()); } } /// 发送挖矿响应 void _sendMiningResponse(String id, bool success, dynamic data) { final resp = { 'id': id, 'method': 'mining.resp', 'params': success ? data : data.toString(), }; _sendMessage(resp); } /// 处理挖矿结束消息 void _handleMiningEnd(Map msg) { _log('收到挖矿结束消息'); // 通知 ClientProvider 停止挖矿(通过回调实现) onMiningTaskChanged?.call(null); } /// 错误处理 void _onError(dynamic error) { // 检查是否正在停止,避免在停止过程中执行重连 if (_socket == null) { return; // 已经停止,不执行后续操作 } _log('连接错误: $error'); _isConnected = false; onStatusChanged?.call(ui.ClientStatus.offline); _reconnect(); } /// 连接关闭 void _onDone() { // 检查是否正在停止,避免在停止过程中执行重连 if (_socket == null) { return; // 已经停止,不执行后续操作 } _log('连接已关闭'); _isConnected = false; onStatusChanged?.call(ui.ClientStatus.offline); _reconnect(); } /// 重连(指数退避策略) void _reconnect() { // 取消之前的重连定时器 _reconnectTimer?.cancel(); // 检查是否正在停止或已停止 if (_socket == null || _logController == null) { return; // 已经停止,不执行重连 } // 检查是否已达到最大重试次数 if (_reconnectAttempts >= _maxReconnectAttempts) { _log('已达到最大重连次数($_maxReconnectAttempts次),停止重连'); return; } // 计算延迟时间:10秒 * 2^(重试次数) // 第1次:10秒,第2次:20秒,第3次:40秒,第4次:80秒,第5次:160秒 final delaySeconds = 10 * (1 << _reconnectAttempts); _reconnectAttempts++; _log('将在 ${delaySeconds}秒 后进行第 $_reconnectAttempts 次重连尝试(最多$_maxReconnectAttempts次)'); _reconnectTimer = Timer(Duration(seconds: delaySeconds), () { // 再次检查是否正在停止或已停止 if (_socket == null || _logController == null) { return; // 已经停止,不执行重连 } // 检查是否已经连接成功(可能在其他地方已经连接) if (_isConnected) { _reconnectAttempts = 0; // 重置计数器 return; } _log('尝试第 $_reconnectAttempts 次重新连接...'); _connect().then((_) { // 连接成功,计数器已在 _connect() 中重置 _log('重连成功'); }).catchError((e) { _log('第 $_reconnectAttempts 次重连失败: $e'); // 如果未达到最大重试次数,继续重连 if (_reconnectAttempts < _maxReconnectAttempts) { _reconnect(); } else { _log('已达到最大重连次数,停止重连'); } }); }); } /// 启动心跳检查 void _startHeartbeat() { _heartbeatTimer?.cancel(); _heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (timer) { if (_lastPingTime != null) { final duration = DateTime.now().difference(_lastPingTime!); if (duration.inMinutes > 60) { _log('超过60分钟未收到心跳,连接可能已断开'); _isConnected = false; onStatusChanged?.call(ui.ClientStatus.offline); // 心跳超时视为新的断开事件,重置重连计数器 _reconnectAttempts = 0; _reconnect(); } } }); } void _log(String message) { final logMsg = '[${DateTime.now().toString()}] $message'; _logger.info(logMsg); // 检查 controller 是否已关闭,避免向已关闭的 controller 添加事件 try { if (_logController != null && !_logController!.isClosed) { _logController!.add(logMsg); } } catch (e) { // 忽略已关闭的 controller 错误 } } /// 停止客户端 void stop() { _heartbeatTimer?.cancel(); _heartbeatTimer = null; _reconnectTimer?.cancel(); _reconnectTimer = null; _reconnectAttempts = 0; // 重置重连计数器 // 先取消 socket 监听,避免 onDone 回调在关闭 controller 后执行 _socket?.destroy(); _socket = null; // 延迟关闭 logController,确保所有回调都已完成 Future.microtask(() { if (_logController != null && !_logController!.isClosed) { _logController!.close(); } _logController = null; }); _isConnected = false; onStatusChanged?.call(ui.ClientStatus.offline); } }