diff --git a/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java b/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java index 833a68d11a3755b8394033e4184b907fc7e0b74d..8cd487195da8ab3bf66d3ea6c89998b05645f777 100644 --- a/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java +++ b/core/src/main/java/io/seata/core/constants/ConfigurationKeys.java @@ -25,7 +25,7 @@ public interface ConfigurationKeys { /** * The constant SEATA_PREFIX. */ - public static final String SEATA_PREFIX = "seata."; + String SEATA_PREFIX = "seata."; /** * The constant SERVICE_PREFIX. diff --git a/server/src/main/java/io/seata/server/session/SessionHolder.java b/server/src/main/java/io/seata/server/session/SessionHolder.java index fd2af233ad17098ea3360895f7e2fbfb77eda5de..18f12bf97e161d035f0e81caeb42ba61dd258b8d 100644 --- a/server/src/main/java/io/seata/server/session/SessionHolder.java +++ b/server/src/main/java/io/seata/server/session/SessionHolder.java @@ -17,11 +17,12 @@ package io.seata.server.session; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; +import java.util.List; import io.seata.common.exception.ShouldNeverHappenException; import io.seata.common.exception.StoreException; import io.seata.common.loader.EnhancedServiceLoader; +import io.seata.common.util.CollectionUtils; import io.seata.common.util.StringUtils; import io.seata.config.Configuration; import io.seata.config.ConfigurationFactory; @@ -78,7 +79,7 @@ public class SessionHolder { * @param mode the store mode: file, db * @throws IOException the io exception */ - public static void init(String mode) throws IOException { + public static void init(String mode) { if (StringUtils.isBlank(mode)) { mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE); } @@ -117,68 +118,54 @@ public class SessionHolder { // unknown store throw new IllegalArgumentException("unknown store mode:" + mode); } - reload(); + reload(storeMode); } + //region reload + /** * Reload. */ - protected static void reload() { + protected static void reload(StoreMode storeMode) { if (ROOT_SESSION_MANAGER instanceof Reloadable) { - ((Reloadable)ROOT_SESSION_MANAGER).reload(); + ((Reloadable) ROOT_SESSION_MANAGER).reload(); + } - Collection<GlobalSession> reloadedSessions = ROOT_SESSION_MANAGER.allSessions(); - if (reloadedSessions != null && !reloadedSessions.isEmpty()) { - reloadedSessions.forEach(globalSession -> { - GlobalStatus globalStatus = globalSession.getStatus(); - switch (globalStatus) { - case UnKnown: - case Committed: - case CommitFailed: - case Rollbacked: - case RollbackFailed: - case TimeoutRollbacked: - case TimeoutRollbackFailed: - case Finished: - throw new ShouldNeverHappenException("Reloaded Session should NOT be " + globalStatus); - case AsyncCommitting: - try { - globalSession.addSessionLifecycleListener(getAsyncCommittingSessionManager()); - getAsyncCommittingSessionManager().addGlobalSession(globalSession); - } catch (TransactionException e) { - throw new ShouldNeverHappenException(e); - } - break; - default: { - ArrayList<BranchSession> branchSessions = globalSession.getSortedBranches(); - branchSessions.forEach(branchSession -> { - try { - branchSession.lock(); - } catch (TransactionException e) { - throw new ShouldNeverHappenException(e); - } - }); + // There is a remove operation in the following code, it will affect the file mode, so new ArrayList + List<GlobalSession> allSessions = new ArrayList<>(ROOT_SESSION_MANAGER.allSessions()); + if (CollectionUtils.isNotEmpty(allSessions)) { + allSessions.forEach(globalSession -> { + GlobalStatus globalStatus = globalSession.getStatus(); + switch (globalStatus) { + case UnKnown: + case Committed: + case CommitFailed: + case Rollbacked: + case RollbackFailed: + case TimeoutRollbacked: + case TimeoutRollbackFailed: + case Finished: + removeInErrorState(globalSession); + break; + case AsyncCommitting: + if (storeMode == StoreMode.FILE) { + queueToAsyncCommitting(globalSession); + } + break; + default: { + if (storeMode == StoreMode.FILE) { + lockBranchSessions(globalSession.getSortedBranches()); switch (globalStatus) { case Committing: case CommitRetrying: - try { - globalSession.addSessionLifecycleListener(getRetryCommittingSessionManager()); - getRetryCommittingSessionManager().addGlobalSession(globalSession); - } catch (TransactionException e) { - throw new ShouldNeverHappenException(e); - } + queueToRetryCommit(globalSession); break; case Rollbacking: case RollbackRetrying: case TimeoutRollbacking: case TimeoutRollbackRetrying: - try { - globalSession.addSessionLifecycleListener(getRetryRollbackingSessionManager()); - getRetryRollbackingSessionManager().addGlobalSession(globalSession); - } catch (TransactionException e) { - throw new ShouldNeverHappenException(e); - } + queueToRetryRollback(globalSession); break; case Begin: globalSession.setActive(true); @@ -186,14 +173,67 @@ public class SessionHolder { default: throw new ShouldNeverHappenException("NOT properly handled " + globalStatus); } - break; } + break; } - }); + } + }); + } + } + + private static void removeInErrorState(GlobalSession globalSession) { + try { + LOGGER.warn("The global session should NOT be {}, remove it. xid = {}", globalSession.getStatus(), globalSession.getXid()); + ROOT_SESSION_MANAGER.removeGlobalSession(globalSession); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Remove global session succeed, xid = {}, status = {}", globalSession.getXid(), globalSession.getStatus()); + } + } catch (Exception e) { + LOGGER.error("Remove global session failed, xid = {}, status = {}", globalSession.getXid(), globalSession.getStatus(), e); + } + } + + private static void queueToAsyncCommitting(GlobalSession globalSession) { + try { + globalSession.addSessionLifecycleListener(getAsyncCommittingSessionManager()); + getAsyncCommittingSessionManager().addGlobalSession(globalSession); + } catch (TransactionException e) { + throw new ShouldNeverHappenException(e); + } + } + + private static void lockBranchSessions(ArrayList<BranchSession> branchSessions) { + branchSessions.forEach(branchSession -> { + try { + branchSession.lock(); + } catch (TransactionException e) { + throw new ShouldNeverHappenException(e); } + }); + } + + private static void queueToRetryCommit(GlobalSession globalSession) { + try { + globalSession.addSessionLifecycleListener(getRetryCommittingSessionManager()); + getRetryCommittingSessionManager().addGlobalSession(globalSession); + } catch (TransactionException e) { + throw new ShouldNeverHappenException(e); } } + private static void queueToRetryRollback(GlobalSession globalSession) { + try { + globalSession.addSessionLifecycleListener(getRetryRollbackingSessionManager()); + getRetryRollbackingSessionManager().addGlobalSession(globalSession); + } catch (TransactionException e) { + throw new ShouldNeverHappenException(e); + } + } + + //endregion + + //region get session manager + /** * Gets root session manager. * @@ -242,6 +282,8 @@ public class SessionHolder { return RETRY_ROLLBACKING_SESSION_MANAGER; } + //endregion + /** * Find global session. * diff --git a/server/src/main/java/io/seata/server/storage/db/session/DataBaseSessionManager.java b/server/src/main/java/io/seata/server/storage/db/session/DataBaseSessionManager.java index c14980e1c9cd3fd29b1d842ed28173b0c9bd756e..5f3faf5c740da2ea95e5fdf48285dc7dabb2746f 100644 --- a/server/src/main/java/io/seata/server/storage/db/session/DataBaseSessionManager.java +++ b/server/src/main/java/io/seata/server/storage/db/session/DataBaseSessionManager.java @@ -28,11 +28,8 @@ import io.seata.core.model.GlobalStatus; import io.seata.server.session.AbstractSessionManager; import io.seata.server.session.BranchSession; import io.seata.server.session.GlobalSession; -import io.seata.server.session.Reloadable; import io.seata.server.session.SessionCondition; import io.seata.server.session.SessionHolder; -import io.seata.server.session.SessionLifecycleListener; -import io.seata.server.session.SessionManager; import io.seata.server.storage.db.store.DataBaseTransactionStoreManager; import io.seata.server.store.TransactionStoreManager.LogOperation; import io.seata.common.loader.Scope; @@ -46,7 +43,7 @@ import org.slf4j.LoggerFactory; */ @LoadLevel(name = "db", scope = Scope.PROTOTYPE) public class DataBaseSessionManager extends AbstractSessionManager - implements SessionManager, SessionLifecycleListener, Initialize, Reloadable { + implements Initialize { /** * The constant LOGGER. @@ -196,8 +193,4 @@ public class DataBaseSessionManager extends AbstractSessionManager throws TransactionException { return lockCallable.call(); } - - @Override - public void reload() { - } } diff --git a/server/src/main/java/io/seata/server/storage/redis/session/RedisSessionManager.java b/server/src/main/java/io/seata/server/storage/redis/session/RedisSessionManager.java index 6b944dce51d01e495b50891c57f959f4fbf62432..63bc5d9c8191698926d778f9dc7ee23bae0bdaa6 100644 --- a/server/src/main/java/io/seata/server/storage/redis/session/RedisSessionManager.java +++ b/server/src/main/java/io/seata/server/storage/redis/session/RedisSessionManager.java @@ -28,11 +28,8 @@ import io.seata.core.model.GlobalStatus; import io.seata.server.session.AbstractSessionManager; import io.seata.server.session.BranchSession; import io.seata.server.session.GlobalSession; -import io.seata.server.session.Reloadable; import io.seata.server.session.SessionCondition; import io.seata.server.session.SessionHolder; -import io.seata.server.session.SessionLifecycleListener; -import io.seata.server.session.SessionManager; import io.seata.server.storage.redis.store.RedisTransactionStoreManager; import io.seata.server.store.TransactionStoreManager.LogOperation; import org.slf4j.Logger; @@ -43,7 +40,7 @@ import org.slf4j.LoggerFactory; */ @LoadLevel(name = "redis", scope = Scope.PROTOTYPE) public class RedisSessionManager extends AbstractSessionManager - implements SessionManager, SessionLifecycleListener, Initialize, Reloadable { + implements Initialize { /** * The constant LOGGER. */ @@ -193,8 +190,4 @@ public class RedisSessionManager extends AbstractSessionManager throws TransactionException { return lockCallable.call(); } - - @Override - public void reload() {} - } diff --git a/test/src/test/java/io/seata/saga/engine/db/AbstractServerTest.java b/test/src/test/java/io/seata/saga/engine/db/AbstractServerTest.java index 31c7624662dd27453f0722d4b6b06566bcf75f71..5dcc9f0d653308f45f95cde4c9be915d0ff3d994 100644 --- a/test/src/test/java/io/seata/saga/engine/db/AbstractServerTest.java +++ b/test/src/test/java/io/seata/saga/engine/db/AbstractServerTest.java @@ -48,44 +48,40 @@ public abstract class AbstractServerTest { (new Thread(new Runnable() { @Override public void run() { - try { - File file = new File("sessionStore/root.data"); - if(file.exists()){ - file.delete(); - } - - ParameterParser parameterParser = new ParameterParser(new String[]{}); + File file = new File("sessionStore/root.data"); + if(file.exists()){ + file.delete(); + } - //initialize the metrics - MetricsManager.get().init(); + ParameterParser parameterParser = new ParameterParser(new String[]{}); - System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode()); + //initialize the metrics + MetricsManager.get().init(); - nettyServer = new NettyRemotingServer(workingThreads); - //server port - nettyServer.setListenPort(parameterParser.getPort()); - UUIDGenerator.init(parameterParser.getServerNode()); - //log store mode : file、db - SessionHolder.init(parameterParser.getStoreMode()); + System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode()); - DefaultCoordinator coordinator = new DefaultCoordinator(nettyServer); - coordinator.init(); - nettyServer.setHandler(coordinator); - // register ShutdownHook - ShutdownHook.getInstance().addDisposable(coordinator); + nettyServer = new NettyRemotingServer(workingThreads); + //server port + nettyServer.setListenPort(parameterParser.getPort()); + UUIDGenerator.init(parameterParser.getServerNode()); + //log store mode : file、db + SessionHolder.init(parameterParser.getStoreMode()); - //127.0.0.1 and 0.0.0.0 are not valid here. - if (NetUtil.isValidIp(parameterParser.getHost(), false)) { - XID.setIpAddress(parameterParser.getHost()); - } else { - XID.setIpAddress(NetUtil.getLocalIp()); - } - XID.setPort(nettyServer.getListenPort()); + DefaultCoordinator coordinator = new DefaultCoordinator(nettyServer); + coordinator.init(); + nettyServer.setHandler(coordinator); + // register ShutdownHook + ShutdownHook.getInstance().addDisposable(coordinator); - nettyServer.init(); - } catch (IOException e) { - throw new RuntimeException(e); + //127.0.0.1 and 0.0.0.0 are not valid here. + if (NetUtil.isValidIp(parameterParser.getHost(), false)) { + XID.setIpAddress(parameterParser.getHost()); + } else { + XID.setIpAddress(NetUtil.getLocalIp()); } + XID.setPort(nettyServer.getListenPort()); + + nettyServer.init(); } })).start(); Thread.sleep(5000);