diff --git a/game_web/robot_mgr/src/main/java/com/group/robot/RobotManager.java b/game_web/robot_mgr/src/main/java/com/group/robot/RobotManager.java index 5e3485e..795690c 100644 --- a/game_web/robot_mgr/src/main/java/com/group/robot/RobotManager.java +++ b/game_web/robot_mgr/src/main/java/com/group/robot/RobotManager.java @@ -55,6 +55,9 @@ public class RobotManager { //跟踪已经恢复leftover_robot的机器人 private final Set leftoverRobotRecovered = ConcurrentHashMap.newKeySet(); + + //房间级别的锁,用于细粒度的同步控制 + private final Map roomLocks = new ConcurrentHashMap<>(); public RobotManager() { @@ -164,9 +167,6 @@ public class RobotManager { gRobotMap.put("pid", String.valueOf(gameId)); jedis2.hmset("{grobot}:" + robot.getRobotId(), gRobotMap); - //将登录后的机器人添加到已连接列表中 等待连接服务器 - connectedRobots.put(robot.getRobotId(), robot); - //更新数据库状态为已使用 String sql = String.format("UPDATE `account` SET start = %d WHERE id = %d", 1, robot.getRobotId()); DataBase.use().executeUpdate(sql); @@ -175,6 +175,8 @@ public class RobotManager { //连接对应游戏服务器 RobotManagerInterface handler = getGameHandler(gameId); handler.connectRobot(robot); + //将登录后的机器人添加到已连接列表中 等待连接服务器 + connectedRobots.put(robot.getRobotId(), robot); } } catch (Exception e) { log.error("机器人登录时发生错误", e); @@ -253,7 +255,7 @@ public class RobotManager { /** * 断开多余的机器人连接 */ - public void disconnectExcessRobots(int wanfaId, int requiredCount) { + public void disconnectExcessRobots(int wanfaId) { List allWanfaRobots = new ArrayList<>(); for (RobotInfo robot : connectedRobots.values()) { if (robot.getWanfaId() == wanfaId) { @@ -261,38 +263,23 @@ public class RobotManager { } } - //该玩法已连接的机器人 - int currentlyConnected = 0; + //断开该玩法下未使用的机器人 + int disconnectedCount = 0; for (RobotInfo robot : allWanfaRobots) { - if (robot.isConnected()) { - currentlyConnected++; - } - } - - //需要断开的机器人 - int toDisconnect = Math.max(0, currentlyConnected - requiredCount); - - if (toDisconnect > 0) { - int disconnectedCount = 0; - for (int i = 0; i < allWanfaRobots.size() && disconnectedCount < toDisconnect; i++) { - RobotInfo robot = allWanfaRobots.get(i); - //不在使用断开连接 + synchronized(robot) { if (!robot.isUsing() && robot.isConnected()) { - synchronized(robot) { - if (!robot.isUsing() && robot.isConnected()) { - RobotManagerInterface handler = getGameHandler(robot.getWanfaId()); - if (handler != null) { - handler.disconnectRobot(robot); - log.debug("断开机器人 {} TCP连接,玩法ID: {}", robot.getRobotId(), wanfaId); - } - disconnectedCount++; - } + RobotManagerInterface handler = getGameHandler(robot.getWanfaId()); + if (handler != null) { + handler.disconnectRobot(robot); + log.debug("断开未使用机器人 {} TCP连接,玩法ID: {}", robot.getRobotId(), wanfaId); } + disconnectedCount++; } } - if (disconnectedCount > 0) { - log.debug("玩法 {} 成功断开 {} 个机器人的TCP连接,目标连接数: {}", wanfaId, disconnectedCount, requiredCount); - } + } + + if (disconnectedCount > 0) { + log.debug("玩法 {} 成功断开 {} 个未使用机器人的TCP连接", wanfaId, disconnectedCount); } } @@ -307,38 +294,48 @@ public class RobotManager { } } - //该玩法已连接的机器人 - int currentConnected = 0; + //空闲机器人 + int availableCount = 0; for (RobotInfo robot : allWanfaRobots) { - if (robot.isConnected()) { - currentConnected++; + if (robot.isConnected() && !robot.isUsing()) { + availableCount++; } } - //当前连接数小于所需数量 则连接更多机器人 - if (currentConnected < requiredCount) { - int reconnectedCount = 0; + //可用机器人数量少于需求 连接更多机器人 + if (availableCount < requiredCount) { + int needToConnect = requiredCount - availableCount; + int connectedCount = 0; + for (RobotInfo robot : allWanfaRobots) { - if (!robot.isConnected() && !robot.isUsing() && reconnectedCount < (requiredCount - currentConnected)) { + if (connectedCount >= needToConnect) { + break; + } + + if (!robot.isConnected() && !robot.isUsing()) { RobotManagerInterface handler = getGameHandler(robot.getWanfaId()); if (handler != null) { if (shouldConnectRobot(robot)) { synchronized(robot) { if (!robot.isConnected() && !robot.isUsing()) { handler.connectRobot(robot); - log.debug("重新连接玩法 {} 的机器人: {}", wanfaId, robot.getRobotId()); + log.debug("连接玩法 {} 的空闲机器人: {}", wanfaId, robot.getRobotId()); + connectedCount++; } } } else { - log.debug("玩法 {} 的leftover_robot为0,跳过连接机器人: {}", wanfaId, robot.getRobotId()); + log.debug("玩法 {} 当前不允许连接机器人: {}", wanfaId, robot.getRobotId()); } - reconnectedCount++; } } } + + if (connectedCount > 0) { + log.debug("玩法 {} 新连接了 {} 个机器人以满足需求", wanfaId, connectedCount); + } } } - + /** * 获取玩法房间轮询器 */ @@ -386,7 +383,7 @@ public class RobotManager { //获取指定玩法的所有可用机器人 List availableRobots = new ArrayList<>(); for (RobotInfo robot : connectedRobots.values()) { - if (robot.getWanfaId() == wanfaId && robot.isOnline() && robot.isConnected() && !robot.isUsing()) { + if (robot.getWanfaId() == wanfaId && robot.isOnline() && !robot.isUsing()) { availableRobots.add(robot); } } @@ -399,17 +396,20 @@ public class RobotManager { RobotInfo selectedRobot = availableRobots.get(currentIndex); - selectedRobot.setLastActiveTime(System.currentTimeMillis()); - - selectedRobot.setUsing(true); - int nextIndex = (currentIndex + 1) % availableRobots.size(); - wanfaRobotIndex.put(wanfaId, nextIndex); - return selectedRobot; + synchronized (selectedRobot) { + if (!selectedRobot.isUsing()) { + selectedRobot.setLastActiveTime(System.currentTimeMillis()); + int nextIndex = (currentIndex + 1) % availableRobots.size(); + wanfaRobotIndex.put(wanfaId, nextIndex); + return selectedRobot; + } + } } + //如果没有连接的可用机器人,检查是否有未连接的机器人并尝试连接 List allWanfaRobots = new ArrayList<>(); for (RobotInfo robot : connectedRobots.values()) { - if (robot.getWanfaId() == wanfaId && robot.isOnline() && robot.isConnected()) { + if (robot.getWanfaId() == wanfaId && robot.isOnline()) { allWanfaRobots.add(robot); } } @@ -422,12 +422,26 @@ public class RobotManager { RobotInfo selectedRobot = allWanfaRobots.get(currentIndex); - selectedRobot.setLastActiveTime(System.currentTimeMillis()); - - selectedRobot.setUsing(true); - int nextIndex = (currentIndex + 1) % allWanfaRobots.size(); - wanfaRobotIndex.put(wanfaId, nextIndex); - return selectedRobot; + //机器人未连接 尝试重新连接 + if (!selectedRobot.isConnected()) { + selectedRobot.setUsing(false); + RobotManagerInterface handler = getGameHandler(selectedRobot.getWanfaId()); + if (handler != null) { + handler.connectRobot(selectedRobot); + } + } + + if (!selectedRobot.isUsing()) { + //使用同步块确保原子操作 + synchronized (selectedRobot) { + if (!selectedRobot.isUsing()) { + selectedRobot.setLastActiveTime(System.currentTimeMillis()); + int nextIndex = (currentIndex + 1) % allWanfaRobots.size(); + wanfaRobotIndex.put(wanfaId, nextIndex); + return selectedRobot; + } + } + } } return null; @@ -538,4 +552,11 @@ public class RobotManager { return false; } } + + /** + * 获取房间级别的锁 + */ + public Object getRoomLock(String roomId) { + return roomLocks.computeIfAbsent(roomId, k -> new Object()); + } } \ No newline at end of file diff --git a/game_web/robot_mgr/src/main/java/com/group/robot/RobotManagerInterface.java b/game_web/robot_mgr/src/main/java/com/group/robot/RobotManagerInterface.java index bf48da1..d1d8b15 100644 --- a/game_web/robot_mgr/src/main/java/com/group/robot/RobotManagerInterface.java +++ b/game_web/robot_mgr/src/main/java/com/group/robot/RobotManagerInterface.java @@ -1,7 +1,6 @@ package com.group.robot; import com.group.robot.info.RobotInfo; -import com.group.robot.info.RoomInfo; /** * 机器人处理器接口 - 只负责连接管理和协议转发 diff --git a/game_web/robot_mgr/src/main/java/com/group/robot/handler/MaJiangRobotHandler.java b/game_web/robot_mgr/src/main/java/com/group/robot/handler/MaJiangRobotHandler.java index fba72e4..9033608 100644 --- a/game_web/robot_mgr/src/main/java/com/group/robot/handler/MaJiangRobotHandler.java +++ b/game_web/robot_mgr/src/main/java/com/group/robot/handler/MaJiangRobotHandler.java @@ -24,9 +24,6 @@ public class MaJiangRobotHandler implements RobotManagerInterface { try { //连接处理器进行连接 robotConnectionHandler.connectRobot(robot); - - //设置机器人连接状态 - robot.setConnected(true); }catch (Exception e) { log.error("麻将机器人 {} 连接游戏服务器时发生异常", robot.getRobotId(), e); robot.setConnected(false); diff --git a/game_web/robot_mgr/src/main/java/com/group/robot/handler/RobotConnectionHandler.java b/game_web/robot_mgr/src/main/java/com/group/robot/handler/RobotConnectionHandler.java index 4aeab99..49e44a5 100644 --- a/game_web/robot_mgr/src/main/java/com/group/robot/handler/RobotConnectionHandler.java +++ b/game_web/robot_mgr/src/main/java/com/group/robot/handler/RobotConnectionHandler.java @@ -2,11 +2,10 @@ package com.group.robot.handler; import java.util.Map; import java.util.UUID; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -34,59 +33,16 @@ public class RobotConnectionHandler { //机器人ID到客户端连接的映射 private final Map robotClients = new ConcurrentHashMap<>(); - - //机器人ID到账户信息的映射 - private final Map robotAccounts = new ConcurrentHashMap<>(); - - //机器人ID到连接ID的映射,用于标识每个机器人连接的唯一性 + + //机器人ID到连接ID的映射 用于标识每个机器人连接的唯一性 private final Map robotConnectionIds = new ConcurrentHashMap<>(); + + //机器人连接锁,防止重复连接 + private final Map robotConnectionLocks = new ConcurrentHashMap<>(); //机器人管理器引用 private final RobotManager robotManager; - //心跳和重连 - private final Map> heartbeatTasks = new ConcurrentHashMap<>(); - private final Map robotServerAddresses = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); - - /** - * 机器人账户信息 - */ - public static class RobotAccountInfo { - private int robotId; - private String account; - private String password; - private String session; - private String token; - private int groupId; - private String roomId; - private int gameId; - - public RobotAccountInfo(int robotId, String account, String password, int groupId) { - this.robotId = robotId; - this.account = account; - this.password = password; - this.groupId = groupId; - } - - public int getRobotId() { return robotId; } - public void setRobotId(int robotId) { this.robotId = robotId; } - public String getAccount() { return account; } - public void setAccount(String account) { this.account = account; } - public String getPassword() { return password; } - public void setPassword(String password) { this.password = password; } - public String getSession() { return session; } - public void setSession(String session) { this.session = session; } - public String getToken() { return token; } - public void setToken(String token) { this.token = token; } - public int getGroupId() { return groupId; } - public void setGroupId(int groupId) { this.groupId = groupId; } - public String getRoomId() { return roomId; } - public void setRoomId(String roomId) { this.roomId = roomId; } - public int getGameId() { return gameId; } - public void setGameId(int gameId) { this.gameId = gameId; } - } - private RobotConnectionHandler() { this.robotManager = RobotManager.getRobotManager(); } @@ -111,53 +67,68 @@ public class RobotConnectionHandler { return; } - //异步执行连接逻辑 - CompletableFuture.runAsync(() -> { - try { - //连接前再次检查leftover_robot数量 防止并发 - if (!robotManager.shouldConnectRobot(robot)) { - return; - } - - log.info("开始连接机器人 {} 到游戏服务器,玩法ID: {}", robot.getRobotId(), robot.getWanfaId()); - //TCP客户端连接 - String serverAddress = getGameServerAddress(robot.getWanfaId()); - TaurusClient client = new TaurusClient(serverAddress, "game", TaurusClient.ConnectionProtocol.Tcp); - client.connect(); - - //生成唯一的connecId - String connecId = generateConnectionId(robot.getRobotId()); - - //先清理可能存在的旧连接 - TaurusClient oldClient = robotClients.remove(robot.getRobotId()); - if (oldClient != null && oldClient.isConnected()) { - oldClient.killConnection(); - } - - //保存客户端连接 - robotClients.put(robot.getRobotId(), client); - robotServerAddresses.put(robot.getRobotId(), serverAddress); - //保存connecId - robotConnectionIds.put(robot.getRobotId(), connecId); - - //设置事件监听器 - setupInitializationEventListeners(client, robot); - - //连接成功后发送初始化协议 - sendInitializationProtocol(client, robot); - - //设置机器人连接状态 + //机器人锁 防止重复连接 + Object lock = robotConnectionLocks.computeIfAbsent(robot.getRobotId(), k -> new Object()); + synchronized (lock) { + //再次检查是否已经存在有效连接 + TaurusClient existingClient = robotClients.get(robot.getRobotId()); + if (existingClient != null && existingClient.isConnected()) { + log.debug("机器人 {} 已存在有效连接,跳过重复连接", robot.getRobotId()); robot.setConnected(true); - log.info("机器人 {} 成功连接到游戏服务器, ConnectionId: {}", robot.getRobotId(), connecId); - - //启动心跳 - startHeartTask(robot.getRobotId(), client, robot.getWanfaId()); - } catch (Exception e) { - log.error("连接机器人到游戏服务器时发生异常: " + robot.getRobotId(), e); - robot.setConnected(false); - disconnectRobot(robot.getRobotId()); + return; } - }); + + //异步执行连接逻辑 + CompletableFuture.runAsync(() -> { + try { + //连接前再次检查leftover_robot数量 防止并发 + if (!robotManager.shouldConnectRobot(robot)) { + return; + } + + log.info("开始连接机器人 {} 到游戏服务器,玩法ID: {}", robot.getRobotId(), robot.getWanfaId()); + //TCP客户端连接 + String serverAddress = getGameServerAddress(robot.getWanfaId()); + TaurusClient client = new TaurusClient(serverAddress, "game", TaurusClient.ConnectionProtocol.Tcp); + client.connect(); + + //连接成功后发送初始化协议 + sendInitializationProtocol(client, robot); + + //设置事件监听器 + setupInitializationEventListeners(client, robot); + + //生成唯一的connecId + String connecId = generateConnectionId(robot.getRobotId()); + + //先清理可能存在的旧连接 + TaurusClient oldClient = robotClients.remove(robot.getRobotId()); + if (oldClient != null && oldClient.isConnected()) { + oldClient.killConnection(); + } + + //保存客户端连接 + robotClients.put(robot.getRobotId(), client); + //保存connecId + robotConnectionIds.put(robot.getRobotId(), connecId); + + //设置机器人连接状态 + robot.setConnected(true); + System.out.println("connectRobot机器人尝试连接游戏服务器, client状态 :" + client.isConnected() + "机器人状态 :" + robot.isConnected()); + + } catch (Exception e) { + System.out.println("连接机器人到游戏服务器时发生异常: " + robot.getRobotId()); + robotClients.remove(robot.getRobotId()); + robotConnectionIds.remove(robot.getRobotId()); + robot.setConnected(false); + disconnectRobot(robot.getRobotId()); + } finally { + if (!robotClients.containsKey(robot.getRobotId())) { + robotConnectionLocks.remove(robot.getRobotId()); + } + } + }); + } } /** @@ -210,77 +181,6 @@ public class RobotConnectionHandler { }); } - /** - * 启动心跳任务 - */ - private void startHeartTask(int robotId, TaurusClient client, int wanfaId) { - stopHeartTask(robotId); - - ScheduledFuture heartbeatTask = scheduler.scheduleWithFixedDelay(() -> { - try { - if (client != null && client.isConnected()) { - //发送心跳包 - ITObject heartbeatParam = new TObject(); - heartbeatParam.putString("type", "heartbeat"); - heartbeatParam.putInt("robot_id", robotId); - heartbeatParam.putInt("wanfa_id", wanfaId); - heartbeatParam.putString("connecId", robotConnectionIds.get(robotId)); - heartbeatParam.putLong("timestamp", System.currentTimeMillis()); - - client.send("ping", heartbeatParam, response -> { - if (response != null) { - log.debug("机器人 {} 心跳响应成功", robotId); - } else { - log.warn("机器人 {} 心跳响应失败", robotId); - } - }); - } else { - log.warn("机器人 {} 连接已断开,无法发送心跳", robotId); - //连接断开 尝试重连 - RobotInfo robot = robotManager.getConnectedRobot(robotId); - if (robot != null) { - scheduleReconnection(robot); - } - } - } catch (Exception e) { - log.error("发送心跳时发生异常,机器人: " + robotId, e); - } - }, 30, 30, TimeUnit.SECONDS);//30秒发送一次心跳 - - heartbeatTasks.put(robotId, heartbeatTask); - log.info("已启动机器人 {} 的心跳任务", robotId); - } - - /** - * 停止心跳任务 - */ - private void stopHeartTask(int robotId) { - ScheduledFuture task = heartbeatTasks.remove(robotId); - if (task != null && !task.isCancelled()) { - robotConnectionIds.remove(robotId); - task.cancel(false); - log.info("已停止机器人 {} 的心跳任务", robotId); - } - } - - /** - * 计划重连任务 - */ - private void scheduleReconnection(RobotInfo robot) { - scheduler.schedule(() -> { - try { - log.info("开始重连机器人: {}", robot.getRobotId()); - //清理可能存在的旧连接 - robot.setConnected(false); - disconnectRobot(robot.getRobotId()); - //重新连接 - connectRobot(robot); - } catch (Exception e) { - log.error("重连机器人失败: {}", robot.getRobotId(), e); - } - }, 2, TimeUnit.SECONDS); - } - /** * 根据游戏ID获取服务器地址 */ @@ -295,7 +195,7 @@ public class RobotConnectionHandler { return "8.138.242.190:6841"; default: log.warn("未知的玩法ID: {}, 使用默认服务器地址", wanfaId); - return "192.168.0.32:6311"; //默认地址 + return "127.0.0.1:6311"; //默认地址 } } @@ -305,21 +205,25 @@ public class RobotConnectionHandler { public void disconnectRobot(int robotId) { try { log.info("清理机器人 {} 的连接资源", robotId); - stopHeartTask(robotId); - //移除机器人TCP客户端连接 - TaurusClient client = robotClients.remove(robotId); - if (client != null) { - client.killConnection(); - log.debug("已清理机器人 {} 的客户端连接", robotId); + Object lock = robotConnectionLocks.computeIfAbsent(robotId, k -> new Object()); + synchronized (lock) { + //移除机器人TCP客户端连接 + TaurusClient client = robotClients.remove(robotId); + if (client != null) { + client.killConnection(); + log.debug("已清理机器人 {} 的客户端连接", robotId); + } + //移除连接ID + robotConnectionIds.remove(robotId); + + //清理锁对象 + robotConnectionLocks.remove(robotId); } - //移除机器人信息 - RobotAccountInfo robotAccountInfo = robotAccounts.remove(robotId); - if (robotAccountInfo != null && robotManager != null) { - //调用RobotManager的统一断开服务 - robotManager.safeDisconnectRobot(robotId, - robotAccountInfo.getGroupId(), - robotAccountInfo.getGameId(), - robotAccountInfo.getRoomId()); + + //从RobotManager中获取机器人信息并断开连接 + RobotInfo robot = robotManager.getConnectedRobot(robotId); + if (robot != null) { + robotManager.safeDisconnectRobot(robotId, robot.getGroupId(), robot.getWanfaId(), robot.getRoomId()); } } catch (Exception e) { log.error("断开机器人连接时发生异常: " + robotId, e); @@ -342,7 +246,8 @@ public class RobotConnectionHandler { //发送离开房间协议 ITObject param = new TObject(); param.putString("robotId", String.valueOf(robot.getRobotId())); - param.putString("connecId", robotConnectionIds.get(robot.getRobotId())); // 添加connecId + param.putString("connecId", robotConnectionIds.get(robot.getRobotId())); + System.out.println("readyTimeRobotExit 2005 client: "+ robotConnectionIds.get(robot.getRobotId())); client.send("2005", param, response -> { log.debug("机器人 {} 发送离开房间请求", robot.getRobotId()); }); @@ -407,20 +312,24 @@ public class RobotConnectionHandler { /** * 发送加入房间消息给指定机器人 */ - public void sendJoinRoomMessage(RobotInfo robot) { + public void sendJoinRoomMessageAsync(RobotInfo robot, Consumer callback) { TaurusClient client = robotClients.get(robot.getRobotId()); - if (client == null) { - log.warn("机器人 {} 没有有效的TCP连接,无法发送加入房间消息", robot.getRobotId()); + callback.accept(false); return; } ITObject readyParam = new TObject(); + readyParam.putInt("groupId", robot.getGroupId()); + readyParam.putString("roomId", robot.getRoomId()); + readyParam.putString("session", robot.getSession() + "," + robot.getToken()); readyParam.putString("robotId", String.valueOf(robot.getRobotId())); readyParam.putString("connecId", robotConnectionIds.get(robot.getRobotId())); + System.out.println("sendJoinRoomMessageAsync 2002 client: "+ robotConnectionIds.get(robot.getRobotId())); client.send("2002", readyParam, response -> { - log.debug("机器人 {} 发送加入房间请求响应: {}", robot.getRobotId(), response); + boolean success = response != null && response.returnCode == 0; + callback.accept(success); }); } @@ -486,13 +395,95 @@ public class RobotConnectionHandler { readyParam.putString("session", robot.getSession() + "," + robot.getToken()); readyParam.putString("robotId", String.valueOf(robot.getRobotId())); readyParam.putString("connecId", robotConnectionIds.get(robot.getRobotId())); - + System.out.println("sendReadyMessage 2003 client: "+ robotConnectionIds.get(robot.getRobotId())); client.send("2003", readyParam, response -> { boolean success = response != null && response.returnCode == 0; callback.accept(success); }); } + /** + * 异步发送加入房间消息给指定机器人,返回CompletableFuture + */ + public CompletableFuture sendJoinRoomMessageAsyncCompletable(RobotInfo robot) { + CompletableFuture future = new CompletableFuture<>(); + + TaurusClient client = robotClients.get(robot.getRobotId()); + if (client == null) { + future.complete(false); + return future; + } + + ITObject readyParam = new TObject(); + readyParam.putInt("groupId", robot.getGroupId()); + readyParam.putString("roomId", robot.getRoomId()); + + readyParam.putString("session", robot.getSession() + "," + robot.getToken()); + readyParam.putString("robotId", String.valueOf(robot.getRobotId())); + readyParam.putString("connecId", robotConnectionIds.get(robot.getRobotId())); + System.out.println("sendJoinRoomMessageAsync 2002 client: " + robotConnectionIds.get(robot.getRobotId())); + + // 设置超时机制 + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + if (!future.isDone()) { + future.completeExceptionally(new RuntimeException("Join room timeout")); + } + } + }, 10000); // 10秒超时 + + client.send("2002", readyParam, response -> { + boolean success = response != null && response.returnCode == 0; + if (!future.isDone()) { + future.complete(success); + } + timer.cancel(); + }); + + return future; + } + + /** + * 异步发送准备消息给指定机器人,返回CompletableFuture + */ + public CompletableFuture sendReadyMessageAsyncCompletable(RobotInfo robot) { + CompletableFuture future = new CompletableFuture<>(); + + TaurusClient client = robotClients.get(robot.getRobotId()); + if (client == null) { + future.complete(false); + return future; + } + + ITObject readyParam = new TObject(); + readyParam.putString("session", robot.getSession() + "," + robot.getToken()); + readyParam.putString("robotId", String.valueOf(robot.getRobotId())); + readyParam.putString("connecId", robotConnectionIds.get(robot.getRobotId())); + + // 设置超时机制 + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + if (!future.isDone()) { + future.completeExceptionally(new RuntimeException("Ready message timeout")); + } + } + }, 10000); // 10秒超时 + + client.send("2003", readyParam, response -> { + boolean success = response != null && response.returnCode == 0; + if (!future.isDone()) { + future.complete(success); + } + timer.cancel(); + }); + + return future; + } + /** * 回调接口 */ @@ -511,7 +502,8 @@ public class RobotConnectionHandler { */ public void sendCreateRoom(RobotInfo robot, int groupId, int wanfaId) { TaurusClient client = robotClients.get(robot.getRobotId()); - if (client == null ) { + //todo !client.isConnected() + if (client == null || !client.isConnected()) { connectRobot(robot); return; } diff --git a/game_web/robot_mgr/src/main/java/com/group/robot/matcher/RoomWanfaMatcher.java b/game_web/robot_mgr/src/main/java/com/group/robot/matcher/RoomWanfaMatcher.java index 97b270e..c8cb035 100644 --- a/game_web/robot_mgr/src/main/java/com/group/robot/matcher/RoomWanfaMatcher.java +++ b/game_web/robot_mgr/src/main/java/com/group/robot/matcher/RoomWanfaMatcher.java @@ -4,10 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import com.group.robot.RobotManager; import com.group.robot.handler.RobotConnectionHandler; @@ -79,7 +76,7 @@ public class RoomWanfaMatcher { log.info("开始为群组 {} 玩法ID {} 启动房间轮询", groupId, wanfaId); //10秒轮询一次 - scheduler.scheduleWithFixedDelay(this::pollRooms, 0, 10, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(this::pollRooms, 0, 20, TimeUnit.SECONDS); } } @@ -163,18 +160,23 @@ public class RoomWanfaMatcher { //房间没人 加入房间 if (playersStr.equals("[]")) { - //房间是否正在被加入 - if (!processingRooms.contains(roomId)) { - processingRooms.add(roomId); - robotJoinRoom(robot, true); + Object roomSpecificLock = robotManager.getRoomLock(roomId); + synchronized (roomSpecificLock) { + if (!processingRooms.contains(roomId)) { + processingRooms.add(roomId); + robotJoinRoomInternal(robot, true); + } } } else { //房间有人 不是机器人则加入房间 Integer robotInRoom = playerIsRobotRedis(playersStr); if (robotInRoom == null) { - if (!processingRooms.contains(roomId)) { - processingRooms.add(roomId); - robotJoinRoom(robot, false); + Object roomSpecificLock = robotManager.getRoomLock(roomId); + synchronized (roomSpecificLock) { + if (!processingRooms.contains(roomId)) { + processingRooms.add(roomId); + robotJoinRoomInternal(robot, false); + } } } } @@ -188,70 +190,92 @@ public class RoomWanfaMatcher { } /** - * 机器人加入房间 + * 内部机器人加入房间方法,不包含同步逻辑 + * @param isRobot 是否是机器人创建房间 + * @param robot 机器人实例 + * */ + private void robotJoinRoomInternal(RobotInfo robot, boolean isRobot) { + String roomId = robot != null ? robot.getRoomId() : null; + RobotInfo robotInfo = robot; + int groupId = robot.getGroupId(); + + try { + if (robotInfo.getRobotId() == -1) { + robotInfo = robotManager.getLoggedInRobotForWanfa(wanfaId); + System.out.println("robotJoinRoom机器人加入房间 机器人状态 :" + robot.isConnecting()); + if (robotInfo == null) { + return; + } + robotInfo.setGroupId(groupId); + robotInfo.setRoomId(roomId); + + //验证机器人连接状态 + if (robotInfo.isUsing()) { + robotManager.releaseRobot(robotInfo.getRobotId()); + return; + } + } + + // 使用CompletableFuture替代嵌套回调,提高可读性和可控性 + RobotInfo finalRobotInfo = robotInfo; + + // 创建一个完整的加入房间流程 + CompletableFuture joinFuture = robotConnectionHandler.sendJoinRoomMessageAsyncCompletable(finalRobotInfo); + + joinFuture.thenCompose(joinSuccess -> { + if (joinSuccess) { + System.out.println("robotJoinRoomInternal机器人加入了房间 机器人状态 :" + robot.isConnecting()); + // 准备阶段 + return robotConnectionHandler.sendReadyMessageAsyncCompletable(finalRobotInfo); + } else { + System.out.println("robotJoinRoomInternal机器人加入了房间 失败 :" + robot.isConnecting()); + // 加入失败,释放机器人 + robotManager.releaseRobot(finalRobotInfo.getRobotId()); + return CompletableFuture.completedFuture(false); + } + }).thenAccept(readySuccess -> { + if (readySuccess) { + System.out.println("robotJoinRoomInternal机器人准备 房间 机器人状态 :" + robot.isConnecting()); + roomToRobotMap.put(roomId, finalRobotInfo.getRobotId()); + updateRobotStatusRedis(finalRobotInfo); + if (isRobot) { + //6秒没有玩家加入 则退出房间 + robotConnectionHandler.readyTimeRobotExit(finalRobotInfo, robotManager.getRoomLock(roomId)); + } + } else { + System.out.println("robotJoinRoomInternal机器人准备 房间 失败 :" + robot.isConnecting()); + // 准备失败时释放机器人 + robotManager.releaseRobot(finalRobotInfo.getRobotId()); + } + }).exceptionally(throwable -> { + log.error("机器人加入房间流程异常", throwable); + robotManager.releaseRobot(finalRobotInfo.getRobotId()); + return null; + }); + + } catch (Exception e) { + if (robotInfo != null) { + //释放机器人 + robotManager.releaseRobot(robotInfo.getRobotId()); + } + } finally { + // 从处理队列中移除房间 - 这个操作是必要的,无论是在哪种情况下 + processingRooms.remove(roomId); + } + } + + /** + * 机器人加入房间 - 保持原有的公共接口 * @param isRobot 是否是机器人创建房间 * @param robot 机器人实例 * */ private void robotJoinRoom(RobotInfo robot, boolean isRobot) { - synchronized (joinRoomLock) { - RobotInfo robotInfo = robot; - String roomId = robot != null ? robot.getRoomId() : null; - int groupId = robot.getGroupId(); - try { - if (robotInfo.getRobotId() == -1) { - robotInfo = robotManager.getLoggedInRobotForWanfa(wanfaId); - robotInfo.setGroupId(groupId); - robotInfo.setRoomId(roomId); - } - - //加入房间 - try { - GroupRoomBusiness.joinRoom(robotInfo.getGroupId(), robotInfo.getRoomId(), robotInfo.getSession(), null); - Thread.sleep(5000); - robotConnectionHandler.sendJoinRoomMessage(robotInfo); - Thread.sleep(1000); - }catch (Exception e) { - log.error("加入房间失败", e); - processingRooms.remove(roomId); - return; - } - - //准备 - RobotInfo finalRobotInfo = robotInfo; - robotConnectionHandler.sendReadyMessageAsync(robotInfo, readySuccess -> { - if (readySuccess) { - roomToRobotMap.put(roomId, finalRobotInfo.getRobotId()); - updateRobotStatusRedis(finalRobotInfo); - if (isRobot) { - //6秒没有玩家加入 则退出房间 - robotConnectionHandler.readyTimeRobotExit(finalRobotInfo, joinRoomLock); - } - } else { - System.out.println("机器人准备失败"); - } - if (roomId != null) { - processingRooms.remove(roomId); - } - }); - /*boolean readySuccess = robotConnectionHandler.sendReadyMessageSync(robotInfo); - if (readySuccess) { - //记录机器人和房间的关联 - roomToRobotMap.put(roomId, robotInfo.getRobotId()); - - updateRobotStatusRedis(robotInfo); - - if (isRobot) { - //6秒没有玩家加入 则退出房间 - robotConnectionHandler.readyTimeRobotExit(robotInfo, joinRoomLock); - } - } else { - //robotConnectionHandler.disconnectRobot(robotInfo.getRobotId()); - }*/ - } catch (Exception e) { - if (robotInfo != null) { - //robotConnectionHandler.disconnectRobot(robotInfo.getRobotId()); - } - } + String roomId = robot != null ? robot.getRoomId() : null; + // 使用房间级别的细粒度锁,而不是全局锁 + Object roomSpecificLock = robotManager.getRoomLock(roomId); + + synchronized (roomSpecificLock) { + robotJoinRoomInternal(robot, isRobot); } } @@ -321,13 +345,14 @@ public class RoomWanfaMatcher { private void manageRobotConnections(int currentLeftoverRobot) { if (lastLeftoverRobot != currentLeftoverRobot) { if (currentLeftoverRobot <= 0 && lastLeftoverRobot > 0) { - //leftover_robot变为0 断开该玩法与robot_mj_cs的连接 - robotManager.disconnectExcessRobots(wanfaId, 0); + //leftover_robot变为0 断开该玩法下未使用的机器人连接 + robotManager.disconnectExcessRobots(wanfaId); + lastLeftoverRobot = 0; } else if (currentLeftoverRobot > 0 && lastLeftoverRobot <= 0) { - //leftover_robot变为正数 重新连接机器人到robot_mj_cs + //leftover_robot变为正数 检查是否有可用的空闲机器人 没有则连接新的 robotManager.connectRequiredRobots(wanfaId, currentLeftoverRobot); + lastLeftoverRobot = currentLeftoverRobot; } - lastLeftoverRobot = currentLeftoverRobot; } } diff --git a/robots/majiang/robot_mj_cs/config/game-config.xml b/robots/majiang/robot_mj_cs/config/game-config.xml index c5f7584..bf8fc8f 100644 --- a/robots/majiang/robot_mj_cs/config/game-config.xml +++ b/robots/majiang/robot_mj_cs/config/game-config.xml @@ -1,8 +1,8 @@ - 127.0.0.1 - 127.0.0.1 + 192.168.0.32 + 192.168.0.32 8701 7701 10 diff --git a/robots/majiang/robot_mj_cs/config/taurus-core.xml b/robots/majiang/robot_mj_cs/config/taurus-core.xml index cfbaf88..04a186f 100644 --- a/robots/majiang/robot_mj_cs/config/taurus-core.xml +++ b/robots/majiang/robot_mj_cs/config/taurus-core.xml @@ -38,15 +38,15 @@ - - - - - - - - - + + + + + + + + +