Files
windows-application/lib/core/client_core.dart

400 lines
12 KiB
Dart
Raw Normal View History

2026-01-22 15:14:27 +08:00
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:logging/logging.dart';
import 'mining_task_info.dart';
import '../models/client_status.dart' as ui;
/// 客户端核心 - 实现与服务器通信、心跳、挖矿管理等核心功能
class ClientCore {
static final ClientCore _instance = ClientCore._internal();
factory ClientCore() => _instance;
ClientCore._internal();
final Logger _logger = Logger('ClientCore');
Socket? _socket;
String? _serverUrl;
String? _auth;
String? _machineCode;
bool _isConnected = false;
DateTime? _lastPingTime;
Timer? _heartbeatTimer;
2026-01-23 16:11:20 +08:00
Timer? _reconnectTimer;
2026-01-22 15:14:27 +08:00
StreamController<String>? _logController;
2026-01-23 16:11:20 +08:00
// 重连相关
int _reconnectAttempts = 0;
static const int _maxReconnectAttempts = 5;
2026-01-22 15:14:27 +08:00
// 需要GPU和挖矿软件信息用于认证
Map<String, dynamic>? _gpusInfo;
List<String>? _miningSofts;
// 状态回调
Function(ui.ClientStatus)? onStatusChanged;
Function(MiningTaskInfo?)? onMiningTaskChanged;
Stream<String> get logStream => _logController?.stream ?? const Stream.empty();
bool get isConnected => _isConnected;
/// 更新系统信息(用于后台异步加载后更新)
void setSystemInfo(Map<String, dynamic> gpusInfo, List<String> miningSofts) {
_gpusInfo = gpusInfo;
_miningSofts = miningSofts;
// 如果已连接,重新发送认证消息更新服务器
if (_isConnected) {
_sendMachineCode();
}
}
/// 初始化客户端
Future<bool> initialize({
required String serverUrl,
required String auth,
required String machineCode,
Map<String, dynamic>? gpusInfo,
List<String>? miningSofts,
}) async {
_serverUrl = serverUrl;
_auth = auth;
_machineCode = machineCode;
_gpusInfo = gpusInfo;
_miningSofts = miningSofts;
_logController = StreamController<String>.broadcast();
try {
await _connect();
// 注意:不在这里发送身份认证,等待机器码获取完成后再发送
_startHeartbeat();
return true;
} catch (e) {
_logger.severe('初始化失败: $e');
return false;
}
}
/// 连接到服务器
Future<void> _connect() async {
if (_serverUrl == null) {
throw Exception('服务器地址未设置');
}
try {
final parts = _serverUrl!.split(':');
if (parts.length != 2) {
throw Exception('服务器地址格式错误');
}
final host = parts[0];
final port = int.parse(parts[1]);
_socket = await Socket.connect(host, port, timeout: const Duration(seconds: 10));
_isConnected = true;
2026-01-23 16:11:20 +08:00
// 连接成功,重置重连计数器
_reconnectAttempts = 0;
2026-01-22 15:14:27 +08:00
_log('连接到服务器成功: $_serverUrl');
// 开始接收消息
_socket!.listen(
_onDataReceived,
onError: _onError,
onDone: _onDone,
cancelOnError: false,
);
onStatusChanged?.call(ui.ClientStatus.online);
2026-01-23 16:11:20 +08:00
// 如果已经有机器码和认证信息,立即发送认证消息(重连场景)
if (_auth != null && _machineCode != null && _machineCode!.isNotEmpty && _machineCode != '正在获取...' && _machineCode != '获取失败') {
_log('重连成功,自动发送身份认证消息');
_sendMachineCode();
}
2026-01-22 15:14:27 +08:00
} catch (e) {
_isConnected = false;
_log('连接失败: $e');
onStatusChanged?.call(ui.ClientStatus.offline);
rethrow;
}
}
/// 更新机器码并发送身份认证(等待硬盘身份码获取完成后调用)
void updateMachineCode(String machineCode, Map<String, dynamic> gpusInfo, List<String> miningSofts) {
_machineCode = machineCode;
_gpusInfo = gpusInfo;
_miningSofts = miningSofts;
}
/// 发送身份认证消息(公开方法,供外部调用)
void sendMachineCode() {
_sendMachineCode();
}
/// 发送机器码认证消息(内部方法)
void _sendMachineCode() {
if (_auth == null || _machineCode == null) {
_log('身份信息未设置');
return;
}
// 使用 身份信息::硬盘身份码 格式
final msg = {
'id': '$_auth::$_machineCode',
'method': 'auth.machineCode',
'params': {
'gpus': _gpusInfo ?? {},
'miningsofts': _miningSofts ?? [],
},
};
_sendMessage(msg);
_log('发送身份认证消息: $_auth::$_machineCode');
}
/// 发送消息到服务器
void _sendMessage(Map<String, dynamic> message) {
if (!_isConnected || _socket == null) {
_log('连接未建立,无法发送消息');
return;
}
try {
final jsonStr = jsonEncode(message);
_socket!.add(utf8.encode('$jsonStr\n'));
} catch (e) {
_log('发送消息失败: $e');
}
}
/// 接收数据
void _onDataReceived(List<int> data) {
try {
final message = utf8.decode(data);
final lines = message.split('\n').where((line) => line.trim().isNotEmpty);
for (final line in lines) {
_handleMessage(line);
}
} catch (e) {
_log('处理接收数据失败: $e');
}
}
/// 处理接收到的消息
void _handleMessage(String messageJson) {
try {
final msg = jsonDecode(messageJson) as Map<String, dynamic>;
final method = msg['method'] as String?;
_log('收到消息: $method');
if (method == 'ping') {
_handlePing(msg);
} else if (method == 'mining.req') {
_handleMiningRequest(msg);
} else if (method == 'mining.end') {
_handleMiningEnd(msg);
}
} catch (e) {
_log('处理消息失败: $e, 原始数据: $messageJson');
}
}
/// 处理 ping 消息
void _handlePing(Map<String, dynamic> msg) {
_lastPingTime = DateTime.now();
// 回复 pong
final pongMsg = {
'id': msg['id'],
'method': 'pong',
'params': null,
};
_sendMessage(pongMsg);
}
/// 处理挖矿请求
void _handleMiningRequest(Map<String, dynamic> msg) {
try {
final params = msg['params'] as Map<String, dynamic>?;
if (params == null) {
_sendMiningResponse(msg['id'] as String, false, '参数为空');
return;
}
// 注意miner 需要从配置中获取,这里先使用默认值
params['miner'] = params['miner'] ?? 'lolminer';
final task = MiningTaskInfo.fromJson(params);
onMiningTaskChanged?.call(task);
// 启动挖矿软件由 ClientProvider 处理
// 这里只负责响应成功
final respData = {
'coin': task.coin,
'algo': task.algo,
'pool': task.pool,
'pool_url': task.poolUrl,
'worker_id': task.workerId,
'wallet_address': task.walletAddress,
'watch_url': '',
};
_sendMiningResponse(msg['id'] as String, true, respData);
} catch (e) {
_log('处理挖矿请求失败: $e');
_sendMiningResponse(msg['id'] as String, false, e.toString());
}
}
/// 发送挖矿响应
void _sendMiningResponse(String id, bool success, dynamic data) {
final resp = {
'id': id,
'method': 'mining.resp',
'params': success ? data : data.toString(),
};
_sendMessage(resp);
}
/// 处理挖矿结束消息
void _handleMiningEnd(Map<String, dynamic> msg) {
_log('收到挖矿结束消息');
// 通知 ClientProvider 停止挖矿(通过回调实现)
onMiningTaskChanged?.call(null);
}
/// 错误处理
void _onError(dynamic error) {
// 检查是否正在停止,避免在停止过程中执行重连
if (_socket == null) {
return; // 已经停止,不执行后续操作
}
_log('连接错误: $error');
_isConnected = false;
onStatusChanged?.call(ui.ClientStatus.offline);
_reconnect();
}
/// 连接关闭
void _onDone() {
// 检查是否正在停止,避免在停止过程中执行重连
if (_socket == null) {
return; // 已经停止,不执行后续操作
}
_log('连接已关闭');
_isConnected = false;
onStatusChanged?.call(ui.ClientStatus.offline);
_reconnect();
}
2026-01-23 16:11:20 +08:00
/// 重连(指数退避策略)
2026-01-22 15:14:27 +08:00
void _reconnect() {
2026-01-23 16:11:20 +08:00
// 取消之前的重连定时器
_reconnectTimer?.cancel();
// 检查是否正在停止或已停止
if (_socket == null || _logController == null) {
return; // 已经停止,不执行重连
}
// 检查是否已达到最大重试次数
if (_reconnectAttempts >= _maxReconnectAttempts) {
_log('已达到最大重连次数($_maxReconnectAttempts次),停止重连');
return;
}
// 计算延迟时间10秒 * 2^(重试次数)
// 第1次10秒第2次20秒第3次40秒第4次80秒第5次160秒
final delaySeconds = 10 * (1 << _reconnectAttempts);
_reconnectAttempts++;
_log('将在 ${delaySeconds}秒 后进行第 $_reconnectAttempts 次重连尝试(最多$_maxReconnectAttempts次');
_reconnectTimer = Timer(Duration(seconds: delaySeconds), () {
// 再次检查是否正在停止或已停止
2026-01-22 15:14:27 +08:00
if (_socket == null || _logController == null) {
return; // 已经停止,不执行重连
}
2026-01-23 16:11:20 +08:00
// 检查是否已经连接成功(可能在其他地方已经连接)
if (_isConnected) {
_reconnectAttempts = 0; // 重置计数器
return;
2026-01-22 15:14:27 +08:00
}
2026-01-23 16:11:20 +08:00
_log('尝试第 $_reconnectAttempts 次重新连接...');
_connect().then((_) {
// 连接成功,计数器已在 _connect() 中重置
_log('重连成功');
}).catchError((e) {
_log('$_reconnectAttempts 次重连失败: $e');
// 如果未达到最大重试次数,继续重连
if (_reconnectAttempts < _maxReconnectAttempts) {
_reconnect();
} else {
_log('已达到最大重连次数,停止重连');
}
});
2026-01-22 15:14:27 +08:00
});
}
/// 启动心跳检查
void _startHeartbeat() {
_heartbeatTimer?.cancel();
_heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (timer) {
if (_lastPingTime != null) {
final duration = DateTime.now().difference(_lastPingTime!);
if (duration.inMinutes > 60) {
_log('超过60分钟未收到心跳连接可能已断开');
_isConnected = false;
onStatusChanged?.call(ui.ClientStatus.offline);
2026-01-23 16:11:20 +08:00
// 心跳超时视为新的断开事件,重置重连计数器
_reconnectAttempts = 0;
2026-01-22 15:14:27 +08:00
_reconnect();
}
}
});
}
void _log(String message) {
final logMsg = '[${DateTime.now().toString()}] $message';
_logger.info(logMsg);
// 检查 controller 是否已关闭,避免向已关闭的 controller 添加事件
try {
if (_logController != null && !_logController!.isClosed) {
_logController!.add(logMsg);
}
} catch (e) {
// 忽略已关闭的 controller 错误
}
}
/// 停止客户端
void stop() {
_heartbeatTimer?.cancel();
_heartbeatTimer = null;
2026-01-23 16:11:20 +08:00
_reconnectTimer?.cancel();
_reconnectTimer = null;
_reconnectAttempts = 0; // 重置重连计数器
2026-01-22 15:14:27 +08:00
// 先取消 socket 监听,避免 onDone 回调在关闭 controller 后执行
_socket?.destroy();
_socket = null;
// 延迟关闭 logController确保所有回调都已完成
Future.microtask(() {
if (_logController != null && !_logController!.isClosed) {
_logController!.close();
}
_logController = null;
});
_isConnected = false;
onStatusChanged?.call(ui.ClientStatus.offline);
}
}