云算力平台windows桌面应用
This commit is contained in:
349
lib/core/client_core.dart
Normal file
349
lib/core/client_core.dart
Normal file
@@ -0,0 +1,349 @@
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:io';
|
||||
import 'package:logging/logging.dart';
|
||||
import 'mining_task_info.dart';
|
||||
import '../models/client_status.dart' as ui;
|
||||
|
||||
/// 客户端核心 - 实现与服务器通信、心跳、挖矿管理等核心功能
|
||||
class ClientCore {
|
||||
static final ClientCore _instance = ClientCore._internal();
|
||||
factory ClientCore() => _instance;
|
||||
ClientCore._internal();
|
||||
|
||||
final Logger _logger = Logger('ClientCore');
|
||||
|
||||
Socket? _socket;
|
||||
String? _serverUrl;
|
||||
String? _auth;
|
||||
String? _machineCode;
|
||||
bool _isConnected = false;
|
||||
DateTime? _lastPingTime;
|
||||
Timer? _heartbeatTimer;
|
||||
StreamController<String>? _logController;
|
||||
|
||||
// 需要GPU和挖矿软件信息用于认证
|
||||
Map<String, dynamic>? _gpusInfo;
|
||||
List<String>? _miningSofts;
|
||||
|
||||
// 状态回调
|
||||
Function(ui.ClientStatus)? onStatusChanged;
|
||||
Function(MiningTaskInfo?)? onMiningTaskChanged;
|
||||
|
||||
Stream<String> get logStream => _logController?.stream ?? const Stream.empty();
|
||||
bool get isConnected => _isConnected;
|
||||
|
||||
/// 更新系统信息(用于后台异步加载后更新)
|
||||
void setSystemInfo(Map<String, dynamic> gpusInfo, List<String> miningSofts) {
|
||||
_gpusInfo = gpusInfo;
|
||||
_miningSofts = miningSofts;
|
||||
// 如果已连接,重新发送认证消息更新服务器
|
||||
if (_isConnected) {
|
||||
_sendMachineCode();
|
||||
}
|
||||
}
|
||||
|
||||
/// 初始化客户端
|
||||
Future<bool> initialize({
|
||||
required String serverUrl,
|
||||
required String auth,
|
||||
required String machineCode,
|
||||
Map<String, dynamic>? gpusInfo,
|
||||
List<String>? miningSofts,
|
||||
}) async {
|
||||
_serverUrl = serverUrl;
|
||||
_auth = auth;
|
||||
_machineCode = machineCode;
|
||||
_gpusInfo = gpusInfo;
|
||||
_miningSofts = miningSofts;
|
||||
_logController = StreamController<String>.broadcast();
|
||||
|
||||
try {
|
||||
await _connect();
|
||||
// 注意:不在这里发送身份认证,等待机器码获取完成后再发送
|
||||
_startHeartbeat();
|
||||
return true;
|
||||
} catch (e) {
|
||||
_logger.severe('初始化失败: $e');
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/// 连接到服务器
|
||||
Future<void> _connect() async {
|
||||
if (_serverUrl == null) {
|
||||
throw Exception('服务器地址未设置');
|
||||
}
|
||||
|
||||
try {
|
||||
final parts = _serverUrl!.split(':');
|
||||
if (parts.length != 2) {
|
||||
throw Exception('服务器地址格式错误');
|
||||
}
|
||||
|
||||
final host = parts[0];
|
||||
final port = int.parse(parts[1]);
|
||||
|
||||
_socket = await Socket.connect(host, port, timeout: const Duration(seconds: 10));
|
||||
_isConnected = true;
|
||||
_log('连接到服务器成功: $_serverUrl');
|
||||
|
||||
// 注意:不在这里发送身份认证,等待机器码获取完成后再发送
|
||||
// 身份认证消息将在机器码获取完成后通过 sendMachineCode() 发送
|
||||
|
||||
// 开始接收消息
|
||||
_socket!.listen(
|
||||
_onDataReceived,
|
||||
onError: _onError,
|
||||
onDone: _onDone,
|
||||
cancelOnError: false,
|
||||
);
|
||||
|
||||
onStatusChanged?.call(ui.ClientStatus.online);
|
||||
} catch (e) {
|
||||
_isConnected = false;
|
||||
_log('连接失败: $e');
|
||||
onStatusChanged?.call(ui.ClientStatus.offline);
|
||||
rethrow;
|
||||
}
|
||||
}
|
||||
|
||||
/// 更新机器码并发送身份认证(等待硬盘身份码获取完成后调用)
|
||||
void updateMachineCode(String machineCode, Map<String, dynamic> gpusInfo, List<String> miningSofts) {
|
||||
_machineCode = machineCode;
|
||||
_gpusInfo = gpusInfo;
|
||||
_miningSofts = miningSofts;
|
||||
}
|
||||
|
||||
/// 发送身份认证消息(公开方法,供外部调用)
|
||||
void sendMachineCode() {
|
||||
_sendMachineCode();
|
||||
}
|
||||
|
||||
/// 发送机器码认证消息(内部方法)
|
||||
void _sendMachineCode() {
|
||||
if (_auth == null || _machineCode == null) {
|
||||
_log('身份信息未设置');
|
||||
return;
|
||||
}
|
||||
|
||||
// 使用 身份信息::硬盘身份码 格式
|
||||
final msg = {
|
||||
'id': '$_auth::$_machineCode',
|
||||
'method': 'auth.machineCode',
|
||||
'params': {
|
||||
'gpus': _gpusInfo ?? {},
|
||||
'miningsofts': _miningSofts ?? [],
|
||||
},
|
||||
};
|
||||
|
||||
_sendMessage(msg);
|
||||
_log('发送身份认证消息: $_auth::$_machineCode');
|
||||
}
|
||||
|
||||
/// 发送消息到服务器
|
||||
void _sendMessage(Map<String, dynamic> message) {
|
||||
if (!_isConnected || _socket == null) {
|
||||
_log('连接未建立,无法发送消息');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
final jsonStr = jsonEncode(message);
|
||||
_socket!.add(utf8.encode('$jsonStr\n'));
|
||||
} catch (e) {
|
||||
_log('发送消息失败: $e');
|
||||
}
|
||||
}
|
||||
|
||||
/// 接收数据
|
||||
void _onDataReceived(List<int> data) {
|
||||
try {
|
||||
final message = utf8.decode(data);
|
||||
final lines = message.split('\n').where((line) => line.trim().isNotEmpty);
|
||||
|
||||
for (final line in lines) {
|
||||
_handleMessage(line);
|
||||
}
|
||||
} catch (e) {
|
||||
_log('处理接收数据失败: $e');
|
||||
}
|
||||
}
|
||||
|
||||
/// 处理接收到的消息
|
||||
void _handleMessage(String messageJson) {
|
||||
try {
|
||||
final msg = jsonDecode(messageJson) as Map<String, dynamic>;
|
||||
final method = msg['method'] as String?;
|
||||
|
||||
_log('收到消息: $method');
|
||||
|
||||
if (method == 'ping') {
|
||||
_handlePing(msg);
|
||||
} else if (method == 'mining.req') {
|
||||
_handleMiningRequest(msg);
|
||||
} else if (method == 'mining.end') {
|
||||
_handleMiningEnd(msg);
|
||||
}
|
||||
} catch (e) {
|
||||
_log('处理消息失败: $e, 原始数据: $messageJson');
|
||||
}
|
||||
}
|
||||
|
||||
/// 处理 ping 消息
|
||||
void _handlePing(Map<String, dynamic> msg) {
|
||||
_lastPingTime = DateTime.now();
|
||||
|
||||
// 回复 pong
|
||||
final pongMsg = {
|
||||
'id': msg['id'],
|
||||
'method': 'pong',
|
||||
'params': null,
|
||||
};
|
||||
_sendMessage(pongMsg);
|
||||
}
|
||||
|
||||
/// 处理挖矿请求
|
||||
void _handleMiningRequest(Map<String, dynamic> msg) {
|
||||
try {
|
||||
final params = msg['params'] as Map<String, dynamic>?;
|
||||
if (params == null) {
|
||||
_sendMiningResponse(msg['id'] as String, false, '参数为空');
|
||||
return;
|
||||
}
|
||||
|
||||
// 注意:miner 需要从配置中获取,这里先使用默认值
|
||||
params['miner'] = params['miner'] ?? 'lolminer';
|
||||
|
||||
final task = MiningTaskInfo.fromJson(params);
|
||||
onMiningTaskChanged?.call(task);
|
||||
|
||||
// 启动挖矿软件由 ClientProvider 处理
|
||||
// 这里只负责响应成功
|
||||
final respData = {
|
||||
'coin': task.coin,
|
||||
'algo': task.algo,
|
||||
'pool': task.pool,
|
||||
'pool_url': task.poolUrl,
|
||||
'worker_id': task.workerId,
|
||||
'wallet_address': task.walletAddress,
|
||||
'watch_url': '',
|
||||
};
|
||||
|
||||
_sendMiningResponse(msg['id'] as String, true, respData);
|
||||
} catch (e) {
|
||||
_log('处理挖矿请求失败: $e');
|
||||
_sendMiningResponse(msg['id'] as String, false, e.toString());
|
||||
}
|
||||
}
|
||||
|
||||
/// 发送挖矿响应
|
||||
void _sendMiningResponse(String id, bool success, dynamic data) {
|
||||
final resp = {
|
||||
'id': id,
|
||||
'method': 'mining.resp',
|
||||
'params': success ? data : data.toString(),
|
||||
};
|
||||
_sendMessage(resp);
|
||||
}
|
||||
|
||||
/// 处理挖矿结束消息
|
||||
void _handleMiningEnd(Map<String, dynamic> msg) {
|
||||
_log('收到挖矿结束消息');
|
||||
// 通知 ClientProvider 停止挖矿(通过回调实现)
|
||||
onMiningTaskChanged?.call(null);
|
||||
}
|
||||
|
||||
/// 错误处理
|
||||
void _onError(dynamic error) {
|
||||
// 检查是否正在停止,避免在停止过程中执行重连
|
||||
if (_socket == null) {
|
||||
return; // 已经停止,不执行后续操作
|
||||
}
|
||||
|
||||
_log('连接错误: $error');
|
||||
_isConnected = false;
|
||||
onStatusChanged?.call(ui.ClientStatus.offline);
|
||||
_reconnect();
|
||||
}
|
||||
|
||||
/// 连接关闭
|
||||
void _onDone() {
|
||||
// 检查是否正在停止,避免在停止过程中执行重连
|
||||
if (_socket == null) {
|
||||
return; // 已经停止,不执行后续操作
|
||||
}
|
||||
|
||||
_log('连接已关闭');
|
||||
_isConnected = false;
|
||||
onStatusChanged?.call(ui.ClientStatus.offline);
|
||||
_reconnect();
|
||||
}
|
||||
|
||||
/// 重连
|
||||
void _reconnect() {
|
||||
Future.delayed(const Duration(seconds: 5), () {
|
||||
// 检查是否正在停止或已停止
|
||||
if (_socket == null || _logController == null) {
|
||||
return; // 已经停止,不执行重连
|
||||
}
|
||||
|
||||
if (!_isConnected) {
|
||||
_log('尝试重新连接...');
|
||||
_connect().catchError((e) {
|
||||
_log('重连失败: $e');
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// 启动心跳检查
|
||||
void _startHeartbeat() {
|
||||
_heartbeatTimer?.cancel();
|
||||
_heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (timer) {
|
||||
if (_lastPingTime != null) {
|
||||
final duration = DateTime.now().difference(_lastPingTime!);
|
||||
if (duration.inMinutes > 60) {
|
||||
_log('超过60分钟未收到心跳,连接可能已断开');
|
||||
_isConnected = false;
|
||||
onStatusChanged?.call(ui.ClientStatus.offline);
|
||||
_reconnect();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void _log(String message) {
|
||||
final logMsg = '[${DateTime.now().toString()}] $message';
|
||||
_logger.info(logMsg);
|
||||
// 检查 controller 是否已关闭,避免向已关闭的 controller 添加事件
|
||||
try {
|
||||
if (_logController != null && !_logController!.isClosed) {
|
||||
_logController!.add(logMsg);
|
||||
}
|
||||
} catch (e) {
|
||||
// 忽略已关闭的 controller 错误
|
||||
}
|
||||
}
|
||||
|
||||
/// 停止客户端
|
||||
void stop() {
|
||||
_heartbeatTimer?.cancel();
|
||||
_heartbeatTimer = null;
|
||||
|
||||
// 先取消 socket 监听,避免 onDone 回调在关闭 controller 后执行
|
||||
_socket?.destroy();
|
||||
_socket = null;
|
||||
|
||||
// 延迟关闭 logController,确保所有回调都已完成
|
||||
Future.microtask(() {
|
||||
if (_logController != null && !_logController!.isClosed) {
|
||||
_logController!.close();
|
||||
}
|
||||
_logController = null;
|
||||
});
|
||||
|
||||
_isConnected = false;
|
||||
onStatusChanged?.call(ui.ClientStatus.offline);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user