diff --git a/common/src/main/java/io/seata/common/rpc/RpcStatus.java b/common/src/main/java/io/seata/common/rpc/RpcStatus.java new file mode 100644 index 0000000000000000000000000000000000000000..350a364473455bb96bebd7da5f979249f71cb9d3 --- /dev/null +++ b/common/src/main/java/io/seata/common/rpc/RpcStatus.java @@ -0,0 +1,93 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.common.rpc; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + +/** + * The state statistics. + * + * @author ph3636 + */ +public class RpcStatus { + + private static final ConcurrentMap<String, RpcStatus> SERVICE_STATUS_MAP = new ConcurrentHashMap<>(); + private final AtomicLong active = new AtomicLong(); + private final LongAdder total = new LongAdder(); + + private RpcStatus() { + } + + /** + * get the RpcStatus of this service + * + * @param service the service + * @return RpcStatus + */ + public static RpcStatus getStatus(String service) { + return SERVICE_STATUS_MAP.computeIfAbsent(service, key -> new RpcStatus()); + } + + /** + * remove the RpcStatus of this service + * + * @param service the service + */ + public static void removeStatus(String service) { + SERVICE_STATUS_MAP.remove(service); + } + + /** + * begin count + * + * @param service the service + */ + public static void beginCount(String service) { + getStatus(service).active.incrementAndGet(); + } + + /** + * end count + * + * @param service the service + */ + public static void endCount(String service) { + RpcStatus rpcStatus = getStatus(service); + rpcStatus.active.decrementAndGet(); + rpcStatus.total.increment(); + } + + /** + * get active. + * + * @return active + */ + public long getActive() { + return active.get(); + } + + /** + * get total. + * + * @return total + */ + public long getTotal() { + return total.longValue(); + } +} diff --git a/common/src/test/java/io/seata/common/rpc/RpcStatusTest.java b/common/src/test/java/io/seata/common/rpc/RpcStatusTest.java new file mode 100644 index 0000000000000000000000000000000000000000..97615dab3b6bab3ce04cac42dc0a5fa8aa7901cf --- /dev/null +++ b/common/src/test/java/io/seata/common/rpc/RpcStatusTest.java @@ -0,0 +1,42 @@ +package io.seata.common.rpc; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * The state statistics test. + * + * @author ph3636 + */ +public class RpcStatusTest { + + public static final String SERVICE = "127.0.0.1:80"; + + @Test + public void getStatus() { + RpcStatus rpcStatus1 = RpcStatus.getStatus(SERVICE); + Assertions.assertNotNull(rpcStatus1); + RpcStatus rpcStatus2 = RpcStatus.getStatus(SERVICE); + Assertions.assertEquals(rpcStatus1, rpcStatus2); + } + + @Test + public void removeStatus() { + RpcStatus old = RpcStatus.getStatus(SERVICE); + RpcStatus.removeStatus(SERVICE); + Assertions.assertNotEquals(RpcStatus.getStatus(SERVICE), old); + } + + @Test + public void beginCount() { + RpcStatus.beginCount(SERVICE); + Assertions.assertEquals(RpcStatus.getStatus(SERVICE).getActive(), 1); + } + + @Test + public void endCount() { + RpcStatus.endCount(SERVICE); + Assertions.assertEquals(RpcStatus.getStatus(SERVICE).getActive(), 0); + Assertions.assertEquals(RpcStatus.getStatus(SERVICE).getTotal(), 1); + } +} diff --git a/core/src/main/java/io/seata/core/rpc/hook/RpcHook.java b/core/src/main/java/io/seata/core/rpc/hook/RpcHook.java new file mode 100644 index 0000000000000000000000000000000000000000..d5303a6938863c64071ae5e30fdc84c85507ffd9 --- /dev/null +++ b/core/src/main/java/io/seata/core/rpc/hook/RpcHook.java @@ -0,0 +1,28 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.hook; + +import io.seata.core.protocol.RpcMessage; + +/** + * @author ph3636 + */ +public interface RpcHook { + + void doBeforeRequest(String remoteAddr, RpcMessage request); + + void doAfterResponse(String remoteAddr, RpcMessage request, Object response); +} diff --git a/core/src/main/java/io/seata/core/rpc/hook/StatusRpcHook.java b/core/src/main/java/io/seata/core/rpc/hook/StatusRpcHook.java new file mode 100644 index 0000000000000000000000000000000000000000..15025c7ace8be787747a25e175e509bfb2a91781 --- /dev/null +++ b/core/src/main/java/io/seata/core/rpc/hook/StatusRpcHook.java @@ -0,0 +1,35 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.hook; + +import io.seata.common.rpc.RpcStatus; +import io.seata.core.protocol.RpcMessage; + +/** + * @author ph3636 + */ +public class StatusRpcHook implements RpcHook { + + @Override + public void doBeforeRequest(String remoteAddr, RpcMessage request) { + RpcStatus.beginCount(remoteAddr); + } + + @Override + public void doAfterResponse(String remoteAddr, RpcMessage request, Object response) { + RpcStatus.endCount(remoteAddr); + } +} diff --git a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java index 2452edab72ba05ac18f687604aecfee2895a20ac..913ab1abf62aed46ae70e95a80c2dee0f5904551 100644 --- a/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java +++ b/core/src/main/java/io/seata/core/rpc/netty/AbstractNettyRemoting.java @@ -20,6 +20,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.seata.common.exception.FrameworkErrorCode; import io.seata.common.exception.FrameworkException; +import io.seata.common.loader.EnhancedServiceLoader; import io.seata.common.thread.NamedThreadFactory; import io.seata.common.thread.PositiveAtomicCounter; import io.seata.core.protocol.MessageFuture; @@ -28,6 +29,7 @@ import io.seata.core.protocol.MessageTypeAware; import io.seata.core.protocol.ProtocolConstants; import io.seata.core.protocol.RpcMessage; import io.seata.core.rpc.Disposable; +import io.seata.core.rpc.hook.RpcHook; import io.seata.core.rpc.processor.Pair; import io.seata.core.rpc.processor.RemotingProcessor; import org.slf4j.Logger; @@ -37,6 +39,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.SocketAddress; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; @@ -99,6 +102,8 @@ public abstract class AbstractNettyRemoting implements Disposable { */ protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32); + protected final List<RpcHook> rpcHooks = EnhancedServiceLoader.loadAll(RpcHook.class); + public void init() { timerExecutor.scheduleAtFixedRate(new Runnable() { @Override @@ -174,6 +179,9 @@ public abstract class AbstractNettyRemoting implements Disposable { channelWritableCheck(channel, rpcMessage.getBody()); + String remoteAddr = ChannelUtil.getAddressFromChannel(channel); + doBeforeRpcHooks(remoteAddr, rpcMessage); + channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { MessageFuture messageFuture1 = futures.remove(rpcMessage.getId()); @@ -185,7 +193,9 @@ public abstract class AbstractNettyRemoting implements Disposable { }); try { - return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); + Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); + doAfterRpcHooks(remoteAddr, rpcMessage, result); + return result; } catch (Exception exx) { LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody()); @@ -209,6 +219,9 @@ public abstract class AbstractNettyRemoting implements Disposable { LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen()); } + + doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage); + channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { destroyChannel(future.channel()); @@ -349,4 +362,15 @@ public abstract class AbstractNettyRemoting implements Disposable { */ public abstract void destroyChannel(String serverAddress, Channel channel); + protected void doBeforeRpcHooks(String remoteAddr, RpcMessage request) { + for (RpcHook rpcHook: rpcHooks) { + rpcHook.doBeforeRequest(remoteAddr, request); + } + } + + protected void doAfterRpcHooks(String remoteAddr, RpcMessage request, Object response) { + for (RpcHook rpcHook: rpcHooks) { + rpcHook.doAfterResponse(remoteAddr, request, response); + } + } } diff --git a/core/src/main/resources/META-INF/services/io.seata.core.rpc.hook.RpcHook b/core/src/main/resources/META-INF/services/io.seata.core.rpc.hook.RpcHook new file mode 100644 index 0000000000000000000000000000000000000000..9d86e299d5da287e72eaeccbdc48ef79c66e209a --- /dev/null +++ b/core/src/main/resources/META-INF/services/io.seata.core.rpc.hook.RpcHook @@ -0,0 +1 @@ +io.seata.core.rpc.hook.StatusRpcHook \ No newline at end of file diff --git a/discovery/seata-discovery-core/src/main/java/io/seata/discovery/loadbalance/LeastActiveLoadBalance.java b/discovery/seata-discovery-core/src/main/java/io/seata/discovery/loadbalance/LeastActiveLoadBalance.java new file mode 100644 index 0000000000000000000000000000000000000000..1180f9ab81817d12ac071c970cb3491bbcd2a06d --- /dev/null +++ b/discovery/seata-discovery-core/src/main/java/io/seata/discovery/loadbalance/LeastActiveLoadBalance.java @@ -0,0 +1,53 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.discovery.loadbalance; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import io.seata.common.loader.LoadLevel; +import io.seata.common.rpc.RpcStatus; + +/** + * The type Least Active load balance. + * + * @author ph3636 + */ +@LoadLevel(name = "LeastActiveLoadBalance") +public class LeastActiveLoadBalance extends AbstractLoadBalance { + + @Override + protected <T> T doSelect(List<T> invokers, String xid) { + int length = invokers.size(); + long leastActive = -1; + int leastCount = 0; + int[] leastIndexes = new int[length]; + for (int i = 0; i < length; i++) { + long active = RpcStatus.getStatus(invokers.get(i).toString()).getActive(); + if (leastActive == -1 || active < leastActive) { + leastActive = active; + leastCount = 1; + leastIndexes[0] = i; + } else if (active == leastActive) { + leastIndexes[leastCount++] = i; + } + } + if (leastCount == 1) { + return invokers.get(leastIndexes[0]); + } + return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); + } +} diff --git a/discovery/seata-discovery-core/src/main/resources/META-INF/services/io.seata.discovery.loadbalance.LoadBalance b/discovery/seata-discovery-core/src/main/resources/META-INF/services/io.seata.discovery.loadbalance.LoadBalance index 81fb267af93a60b42e15481c3e967a5593bf255d..26a29ae95daf6f2500e1f5482ed6924ccf613935 100644 --- a/discovery/seata-discovery-core/src/main/resources/META-INF/services/io.seata.discovery.loadbalance.LoadBalance +++ b/discovery/seata-discovery-core/src/main/resources/META-INF/services/io.seata.discovery.loadbalance.LoadBalance @@ -48,4 +48,5 @@ io.seata.discovery.loadbalance.RoundRobinLoadBalance io.seata.discovery.loadbalance.RandomLoadBalance -io.seata.discovery.loadbalance.ConsistentHashLoadBalance \ No newline at end of file +io.seata.discovery.loadbalance.ConsistentHashLoadBalance +io.seata.discovery.loadbalance.LeastActiveLoadBalance \ No newline at end of file diff --git a/discovery/seata-discovery-core/src/test/java/io/seata/discovery/loadbalance/LoadBalanceTest.java b/discovery/seata-discovery-core/src/test/java/io/seata/discovery/loadbalance/LoadBalanceTest.java index ebfb084b0ff63ff60778003881f58fa4a2e0ec83..44248fb73cc6daf6c650f16fb0859b2a552c383c 100644 --- a/discovery/seata-discovery-core/src/test/java/io/seata/discovery/loadbalance/LoadBalanceTest.java +++ b/discovery/seata-discovery-core/src/test/java/io/seata/discovery/loadbalance/LoadBalanceTest.java @@ -15,6 +15,7 @@ */ package io.seata.discovery.loadbalance; +import io.seata.common.rpc.RpcStatus; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -32,6 +33,8 @@ import java.util.stream.Stream; */ public class LoadBalanceTest { + private static final String XID = "XID"; + /** * Test random load balance select. * @@ -83,6 +86,38 @@ public class LoadBalanceTest { Assertions.assertEquals(1, selected, "selected must be equal to 1"); } + /** + * Test least active load balance select. + * + * @param addresses the addresses + */ + @ParameterizedTest + @MethodSource("addressProvider") + public void testLeastActiveLoadBalance_select(List<InetSocketAddress> addresses) throws Exception { + int runs = 10000; + int size = addresses.size(); + for (int i = 0; i < size - 1; i++) { + RpcStatus.beginCount(addresses.get(i).toString()); + } + InetSocketAddress socketAddress = addresses.get(size - 1); + LoadBalance loadBalance = new LeastActiveLoadBalance(); + for (int i = 0; i < runs; i++) { + InetSocketAddress selectAddress = loadBalance.select(addresses, XID); + Assertions.assertEquals(selectAddress, socketAddress); + } + RpcStatus.beginCount(socketAddress.toString()); + RpcStatus.beginCount(socketAddress.toString()); + Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs, addresses, loadBalance); + for (InetSocketAddress address : counter.keySet()) { + Long count = counter.get(address).get(); + if (address == socketAddress) { + Assertions.assertEquals(count, 0); + } else { + Assertions.assertTrue(count > 0); + } + } + } + /** * Gets selected counter. * @@ -100,7 +135,7 @@ public class LoadBalanceTest { } try { for (int i = 0; i < runs; i++) { - InetSocketAddress selectAddress = loadBalance.select(addresses, "XID"); + InetSocketAddress selectAddress = loadBalance.select(addresses, XID); counter.get(selectAddress).incrementAndGet(); } } catch (Exception e) {