diff --git a/core/src/main/java/io/seata/core/rpc/ShutdownHook.java b/core/src/main/java/io/seata/core/rpc/ShutdownHook.java index 543bcd68f7ce597949eb1f788b171500793b87ce..b38fe18b48878b0216bf0eb095aaf5f9add019ee 100644 --- a/core/src/main/java/io/seata/core/rpc/ShutdownHook.java +++ b/core/src/main/java/io/seata/core/rpc/ShutdownHook.java @@ -15,12 +15,9 @@ */ package io.seata.core.rpc; -import java.util.Set; -import java.util.TreeSet; +import java.util.PriorityQueue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import io.seata.common.util.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +32,7 @@ public class ShutdownHook extends Thread { private static final ShutdownHook SHUTDOWN_HOOK = new ShutdownHook("ShutdownHook"); - private Set<Disposable> disposables = new TreeSet<>(); + private final PriorityQueue<DisposablePriorityWrapper> disposables = new PriorityQueue<>(); private final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -70,16 +67,22 @@ public class ShutdownHook extends Thread { } public void destroyAll() { - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("destoryAll starting"); + if (!destroyed.compareAndSet(false, true)) { + return; } - if (!destroyed.compareAndSet(false, true) && CollectionUtils.isEmpty(disposables)) { + + if (disposables.isEmpty()) { return; } - for (Disposable disposable : disposables) { + + LOGGER.debug("destoryAll starting"); + + while (!disposables.isEmpty()) { + Disposable disposable = disposables.poll(); disposable.destroy(); } + + LOGGER.debug("destoryAll finish"); } /** @@ -91,63 +94,18 @@ public class ShutdownHook extends Thread { private static class DisposablePriorityWrapper implements Comparable<DisposablePriorityWrapper>, Disposable { - private static AtomicLong seq = new AtomicLong(); - - private Disposable disposable; - - private int priority; + private final Disposable disposable; - private long seqId; + private final int priority; public DisposablePriorityWrapper(Disposable disposable, int priority) { this.disposable = disposable; this.priority = priority; - this.seqId = seq.incrementAndGet(); } @Override - public int compareTo(DisposablePriorityWrapper disposablePriorityWrapper) { - int cmp = priority - disposablePriorityWrapper.priority; - if (cmp == 0) { - if (seqId > disposablePriorityWrapper.seqId) { - cmp = 1; - } else if (seqId < disposablePriorityWrapper.seqId) { - cmp = -1; - } else { - cmp = 0; - } - } - return cmp; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + priority; - result = prime * result + (int) (seqId ^ (seqId >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - DisposablePriorityWrapper other = (DisposablePriorityWrapper) obj; - if (priority != other.priority) { - return false; - } - if (seqId != other.seqId) { - return false; - } - return true; + public int compareTo(DisposablePriorityWrapper challenger) { + return priority - challenger.priority; } @Override @@ -155,6 +113,5 @@ public class ShutdownHook extends Thread { disposable.destroy(); } } - } diff --git a/core/src/test/java/io/seata/core/context/GlobalLockConfigHolderTest.java b/core/src/test/java/io/seata/core/context/GlobalLockConfigHolderTest.java index 292cc159ce555d7aa8aaf68e3fdf180cc09706bb..440f73938afe19277ee638b218a7db6447523cc5 100644 --- a/core/src/test/java/io/seata/core/context/GlobalLockConfigHolderTest.java +++ b/core/src/test/java/io/seata/core/context/GlobalLockConfigHolderTest.java @@ -22,7 +22,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.*; -class GlobalLockConfigHolderTest { +public class GlobalLockConfigHolderTest { @BeforeEach void setUp() { diff --git a/core/src/test/java/io/seata/core/rpc/ShutdownHookTest.java b/core/src/test/java/io/seata/core/rpc/ShutdownHookTest.java new file mode 100644 index 0000000000000000000000000000000000000000..c5244cca7d204e837680ed60413b8bb458681392 --- /dev/null +++ b/core/src/test/java/io/seata/core/rpc/ShutdownHookTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.spy; + +public class ShutdownHookTest { + + private int previousPriority = -1; + + private final ShutdownHook hook = ShutdownHook.getInstance(); + + private final Random random = new Random(); + + @BeforeAll + static void beforeAll() { + ShutdownHook.removeRuntimeShutdownHook(); + } + + @Test + void testAddAndExecute() throws InterruptedException { + // note: all of them had been added in the addDisposable method + List<Disposable> disposableList = getRandomDisposableList(); + + hook.start(); + hook.join(); + + disposableList.forEach(disposable -> verify(disposable, times(1)).destroy()); + } + + private List<Disposable> getRandomDisposableList() { + return IntStream.rangeClosed(0, 10) + .boxed() + .flatMap(this::generateDisposableStream) + .collect(Collectors.toList()); + } + + private Stream<Disposable> generateDisposableStream(int priority) { + int size = random.nextInt(10); + return IntStream.range(0, size).mapToObj(i -> addDisposable(priority)); + } + + private Disposable addDisposable(int priority) { + Disposable disposable = new TestDisposable(priority); + Disposable wrapper = spy(disposable); + hook.addDisposable(wrapper, priority); + return wrapper; + } + + private class TestDisposable implements Disposable { + + private final int priority; + + @Override + public void destroy() { + assertTrue(previousPriority <= priority, "lower priority should be executed first"); + previousPriority = priority; + } + + public TestDisposable(int priority) { + this.priority = priority; + } + } +} \ No newline at end of file diff --git a/rm-datasource/src/test/java/io/seata/rm/GlobalLockTemplateTest.java b/rm-datasource/src/test/java/io/seata/rm/GlobalLockTemplateTest.java index e944c918d3094ce0235242ee972281777ca15a70..c9c4717697f5057843e94c4af8be278bb443f5ef 100644 --- a/rm-datasource/src/test/java/io/seata/rm/GlobalLockTemplateTest.java +++ b/rm-datasource/src/test/java/io/seata/rm/GlobalLockTemplateTest.java @@ -27,7 +27,7 @@ import static org.junit.jupiter.api.Assertions.*; /** * @author selfishlover */ -class GlobalLockTemplateTest { +public class GlobalLockTemplateTest { private GlobalLockTemplate template = new GlobalLockTemplate(); diff --git a/rm-datasource/src/test/java/io/seata/rm/datasource/exec/LockRetryControllerTest.java b/rm-datasource/src/test/java/io/seata/rm/datasource/exec/LockRetryControllerTest.java index 5ea15414afda90941fa675b7db2a86eb33cc5a96..f707c7aeb1a9be9b43295838a3fc3e8207507b8b 100644 --- a/rm-datasource/src/test/java/io/seata/rm/datasource/exec/LockRetryControllerTest.java +++ b/rm-datasource/src/test/java/io/seata/rm/datasource/exec/LockRetryControllerTest.java @@ -28,7 +28,7 @@ import static org.junit.jupiter.api.Assertions.*; /** * @author selfishlover */ -class LockRetryControllerTest { +public class LockRetryControllerTest { private GlobalLockConfig config;