Files
windows-application/lib/core/client_core.dart
2026-01-23 16:11:20 +08:00

400 lines
12 KiB
Dart
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:logging/logging.dart';
import 'mining_task_info.dart';
import '../models/client_status.dart' as ui;
/// 客户端核心 - 实现与服务器通信、心跳、挖矿管理等核心功能
class ClientCore {
static final ClientCore _instance = ClientCore._internal();
factory ClientCore() => _instance;
ClientCore._internal();
final Logger _logger = Logger('ClientCore');
Socket? _socket;
String? _serverUrl;
String? _auth;
String? _machineCode;
bool _isConnected = false;
DateTime? _lastPingTime;
Timer? _heartbeatTimer;
Timer? _reconnectTimer;
StreamController<String>? _logController;
// 重连相关
int _reconnectAttempts = 0;
static const int _maxReconnectAttempts = 5;
// 需要GPU和挖矿软件信息用于认证
Map<String, dynamic>? _gpusInfo;
List<String>? _miningSofts;
// 状态回调
Function(ui.ClientStatus)? onStatusChanged;
Function(MiningTaskInfo?)? onMiningTaskChanged;
Stream<String> get logStream => _logController?.stream ?? const Stream.empty();
bool get isConnected => _isConnected;
/// 更新系统信息(用于后台异步加载后更新)
void setSystemInfo(Map<String, dynamic> gpusInfo, List<String> miningSofts) {
_gpusInfo = gpusInfo;
_miningSofts = miningSofts;
// 如果已连接,重新发送认证消息更新服务器
if (_isConnected) {
_sendMachineCode();
}
}
/// 初始化客户端
Future<bool> initialize({
required String serverUrl,
required String auth,
required String machineCode,
Map<String, dynamic>? gpusInfo,
List<String>? miningSofts,
}) async {
_serverUrl = serverUrl;
_auth = auth;
_machineCode = machineCode;
_gpusInfo = gpusInfo;
_miningSofts = miningSofts;
_logController = StreamController<String>.broadcast();
try {
await _connect();
// 注意:不在这里发送身份认证,等待机器码获取完成后再发送
_startHeartbeat();
return true;
} catch (e) {
_logger.severe('初始化失败: $e');
return false;
}
}
/// 连接到服务器
Future<void> _connect() async {
if (_serverUrl == null) {
throw Exception('服务器地址未设置');
}
try {
final parts = _serverUrl!.split(':');
if (parts.length != 2) {
throw Exception('服务器地址格式错误');
}
final host = parts[0];
final port = int.parse(parts[1]);
_socket = await Socket.connect(host, port, timeout: const Duration(seconds: 10));
_isConnected = true;
// 连接成功,重置重连计数器
_reconnectAttempts = 0;
_log('连接到服务器成功: $_serverUrl');
// 开始接收消息
_socket!.listen(
_onDataReceived,
onError: _onError,
onDone: _onDone,
cancelOnError: false,
);
onStatusChanged?.call(ui.ClientStatus.online);
// 如果已经有机器码和认证信息,立即发送认证消息(重连场景)
if (_auth != null && _machineCode != null && _machineCode!.isNotEmpty && _machineCode != '正在获取...' && _machineCode != '获取失败') {
_log('重连成功,自动发送身份认证消息');
_sendMachineCode();
}
} catch (e) {
_isConnected = false;
_log('连接失败: $e');
onStatusChanged?.call(ui.ClientStatus.offline);
rethrow;
}
}
/// 更新机器码并发送身份认证(等待硬盘身份码获取完成后调用)
void updateMachineCode(String machineCode, Map<String, dynamic> gpusInfo, List<String> miningSofts) {
_machineCode = machineCode;
_gpusInfo = gpusInfo;
_miningSofts = miningSofts;
}
/// 发送身份认证消息(公开方法,供外部调用)
void sendMachineCode() {
_sendMachineCode();
}
/// 发送机器码认证消息(内部方法)
void _sendMachineCode() {
if (_auth == null || _machineCode == null) {
_log('身份信息未设置');
return;
}
// 使用 身份信息::硬盘身份码 格式
final msg = {
'id': '$_auth::$_machineCode',
'method': 'auth.machineCode',
'params': {
'gpus': _gpusInfo ?? {},
'miningsofts': _miningSofts ?? [],
},
};
_sendMessage(msg);
_log('发送身份认证消息: $_auth::$_machineCode');
}
/// 发送消息到服务器
void _sendMessage(Map<String, dynamic> message) {
if (!_isConnected || _socket == null) {
_log('连接未建立,无法发送消息');
return;
}
try {
final jsonStr = jsonEncode(message);
_socket!.add(utf8.encode('$jsonStr\n'));
} catch (e) {
_log('发送消息失败: $e');
}
}
/// 接收数据
void _onDataReceived(List<int> data) {
try {
final message = utf8.decode(data);
final lines = message.split('\n').where((line) => line.trim().isNotEmpty);
for (final line in lines) {
_handleMessage(line);
}
} catch (e) {
_log('处理接收数据失败: $e');
}
}
/// 处理接收到的消息
void _handleMessage(String messageJson) {
try {
final msg = jsonDecode(messageJson) as Map<String, dynamic>;
final method = msg['method'] as String?;
_log('收到消息: $method');
if (method == 'ping') {
_handlePing(msg);
} else if (method == 'mining.req') {
_handleMiningRequest(msg);
} else if (method == 'mining.end') {
_handleMiningEnd(msg);
}
} catch (e) {
_log('处理消息失败: $e, 原始数据: $messageJson');
}
}
/// 处理 ping 消息
void _handlePing(Map<String, dynamic> msg) {
_lastPingTime = DateTime.now();
// 回复 pong
final pongMsg = {
'id': msg['id'],
'method': 'pong',
'params': null,
};
_sendMessage(pongMsg);
}
/// 处理挖矿请求
void _handleMiningRequest(Map<String, dynamic> msg) {
try {
final params = msg['params'] as Map<String, dynamic>?;
if (params == null) {
_sendMiningResponse(msg['id'] as String, false, '参数为空');
return;
}
// 注意miner 需要从配置中获取,这里先使用默认值
params['miner'] = params['miner'] ?? 'lolminer';
final task = MiningTaskInfo.fromJson(params);
onMiningTaskChanged?.call(task);
// 启动挖矿软件由 ClientProvider 处理
// 这里只负责响应成功
final respData = {
'coin': task.coin,
'algo': task.algo,
'pool': task.pool,
'pool_url': task.poolUrl,
'worker_id': task.workerId,
'wallet_address': task.walletAddress,
'watch_url': '',
};
_sendMiningResponse(msg['id'] as String, true, respData);
} catch (e) {
_log('处理挖矿请求失败: $e');
_sendMiningResponse(msg['id'] as String, false, e.toString());
}
}
/// 发送挖矿响应
void _sendMiningResponse(String id, bool success, dynamic data) {
final resp = {
'id': id,
'method': 'mining.resp',
'params': success ? data : data.toString(),
};
_sendMessage(resp);
}
/// 处理挖矿结束消息
void _handleMiningEnd(Map<String, dynamic> msg) {
_log('收到挖矿结束消息');
// 通知 ClientProvider 停止挖矿(通过回调实现)
onMiningTaskChanged?.call(null);
}
/// 错误处理
void _onError(dynamic error) {
// 检查是否正在停止,避免在停止过程中执行重连
if (_socket == null) {
return; // 已经停止,不执行后续操作
}
_log('连接错误: $error');
_isConnected = false;
onStatusChanged?.call(ui.ClientStatus.offline);
_reconnect();
}
/// 连接关闭
void _onDone() {
// 检查是否正在停止,避免在停止过程中执行重连
if (_socket == null) {
return; // 已经停止,不执行后续操作
}
_log('连接已关闭');
_isConnected = false;
onStatusChanged?.call(ui.ClientStatus.offline);
_reconnect();
}
/// 重连(指数退避策略)
void _reconnect() {
// 取消之前的重连定时器
_reconnectTimer?.cancel();
// 检查是否正在停止或已停止
if (_socket == null || _logController == null) {
return; // 已经停止,不执行重连
}
// 检查是否已达到最大重试次数
if (_reconnectAttempts >= _maxReconnectAttempts) {
_log('已达到最大重连次数($_maxReconnectAttempts次),停止重连');
return;
}
// 计算延迟时间10秒 * 2^(重试次数)
// 第1次10秒第2次20秒第3次40秒第4次80秒第5次160秒
final delaySeconds = 10 * (1 << _reconnectAttempts);
_reconnectAttempts++;
_log('将在 ${delaySeconds}秒 后进行第 $_reconnectAttempts 次重连尝试(最多$_maxReconnectAttempts次');
_reconnectTimer = Timer(Duration(seconds: delaySeconds), () {
// 再次检查是否正在停止或已停止
if (_socket == null || _logController == null) {
return; // 已经停止,不执行重连
}
// 检查是否已经连接成功(可能在其他地方已经连接)
if (_isConnected) {
_reconnectAttempts = 0; // 重置计数器
return;
}
_log('尝试第 $_reconnectAttempts 次重新连接...');
_connect().then((_) {
// 连接成功,计数器已在 _connect() 中重置
_log('重连成功');
}).catchError((e) {
_log('$_reconnectAttempts 次重连失败: $e');
// 如果未达到最大重试次数,继续重连
if (_reconnectAttempts < _maxReconnectAttempts) {
_reconnect();
} else {
_log('已达到最大重连次数,停止重连');
}
});
});
}
/// 启动心跳检查
void _startHeartbeat() {
_heartbeatTimer?.cancel();
_heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (timer) {
if (_lastPingTime != null) {
final duration = DateTime.now().difference(_lastPingTime!);
if (duration.inMinutes > 60) {
_log('超过60分钟未收到心跳连接可能已断开');
_isConnected = false;
onStatusChanged?.call(ui.ClientStatus.offline);
// 心跳超时视为新的断开事件,重置重连计数器
_reconnectAttempts = 0;
_reconnect();
}
}
});
}
void _log(String message) {
final logMsg = '[${DateTime.now().toString()}] $message';
_logger.info(logMsg);
// 检查 controller 是否已关闭,避免向已关闭的 controller 添加事件
try {
if (_logController != null && !_logController!.isClosed) {
_logController!.add(logMsg);
}
} catch (e) {
// 忽略已关闭的 controller 错误
}
}
/// 停止客户端
void stop() {
_heartbeatTimer?.cancel();
_heartbeatTimer = null;
_reconnectTimer?.cancel();
_reconnectTimer = null;
_reconnectAttempts = 0; // 重置重连计数器
// 先取消 socket 监听,避免 onDone 回调在关闭 controller 后执行
_socket?.destroy();
_socket = null;
// 延迟关闭 logController确保所有回调都已完成
Future.microtask(() {
if (_logController != null && !_logController!.isClosed) {
_logController!.close();
}
_logController = null;
});
_isConnected = false;
onStatusChanged?.call(ui.ClientStatus.offline);
}
}