Skip to content
Snippets Groups Projects
Unverified Commit f8359f05 authored by jimin's avatar jimin Committed by GitHub
Browse files

[release]: release for 1.4.1

parents f270a2d6 9569bed4
No related branches found
No related tags found
No related merge requests found
Showing
with 304 additions and 72 deletions
......@@ -21,7 +21,7 @@
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.4.0</version>
<version>1.4.1</version>
<name>Seata All-in-one ${project.version}</name>
<url>http://seata.io</url>
......@@ -289,6 +289,11 @@
<artifactId>seata-compressor-lz4</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-compressor-deflater</artifactId>
<version>${project.version}</version>
</dependency>
<!-- saga -->
<dependency>
......@@ -681,11 +686,13 @@
<include>io.seata:seata-saga-rm</include>
<include>io.seata:seata-saga-tm</include>
<include>io.seata:seata-saga-engine-store</include>
<!--compressor-->
<include>io.seata:seata-compressor-gzip</include>
<include>io.seata:seata-compressor-7z</include>
<include>io.seata:seata-compressor-bzip2</include>
<include>io.seata:seata-compressor-zip</include>
<include>io.seata:seata-compressor-lz4</include>
<include>io.seata:seata-compressor-deflater</include>
</includes>
</artifactSet>
<transformers>
......@@ -771,6 +778,7 @@
<serverId>oss_seata</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>false</autoReleaseAfterClose>
<skipStagingRepositoryClose>true</skipStagingRepositoryClose>
</configuration>
</plugin>
<plugin>
......
......@@ -20,7 +20,7 @@
<groupId>io.seata</groupId>
<artifactId>seata-bom</artifactId>
<version>1.4.0</version>
<version>1.4.1</version>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
......@@ -567,6 +567,7 @@
<serverId>oss_seata</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>false</autoReleaseAfterClose>
<skipStagingRepositoryClose>true</skipStagingRepositoryClose>
</configuration>
</plugin>
<plugin>
......
......@@ -206,6 +206,11 @@ public enum FrameworkErrorCode {
*/
StateMachineExecutionTimeout("0421", "State machine execution timeout", "State machine execution timeout"),
/**
* State machine execution no choice matched
*/
StateMachineNoChoiceMatched("0422", "State machine no choice matched", "State machine no choice matched"),
/**
* Undefined error
*/
......
......@@ -31,7 +31,7 @@ import io.seata.common.util.CollectionUtils;
*/
public class NamedThreadFactory implements ThreadFactory {
private final static Map<String, AtomicInteger> PREFIX_COUNTER = new ConcurrentHashMap<>();
private final ThreadGroup group;
private final AtomicInteger counter = new AtomicInteger(0);
private final String prefix;
private final int totalSize;
......@@ -47,6 +47,8 @@ public class NamedThreadFactory implements ThreadFactory {
public NamedThreadFactory(String prefix, int totalSize, boolean makeDaemons) {
int prefixCounter = CollectionUtils.computeIfAbsent(PREFIX_COUNTER, prefix, key -> new AtomicInteger(0))
.incrementAndGet();
SecurityManager securityManager = System.getSecurityManager();
group = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.prefix = prefix + "_" + prefixCounter;
this.makeDaemons = makeDaemons;
this.totalSize = totalSize;
......@@ -78,7 +80,7 @@ public class NamedThreadFactory implements ThreadFactory {
if (totalSize > 1) {
name += "_" + totalSize;
}
Thread thread = new FastThreadLocalThread(r, name);
Thread thread = new FastThreadLocalThread(group, r, name);
thread.setDaemon(makeDaemons);
if (thread.getPriority() != Thread.NORM_PRIORITY) {
......
......@@ -65,4 +65,12 @@ public class NamedThreadFactoryTest {
.isEqualTo(thread.getName());
}
}
@Test
public void testNamedThreadFactoryWithSecurityManager() {
NamedThreadFactory factory = new NamedThreadFactory("testThreadGroup", true);
Thread thread = factory.newThread(() -> {});
assertThat(thread.getThreadGroup()).isNotNull();
}
}
......@@ -34,6 +34,7 @@
<module>seata-compressor-7z</module>
<module>seata-compressor-bzip2</module>
<module>seata-compressor-lz4</module>
<module>seata-compressor-deflater</module>
</modules>
......
......@@ -47,6 +47,11 @@
<artifactId>seata-compressor-zip</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-compressor-deflater</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-compressor</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-compressor-deflater</artifactId>
<packaging>jar</packaging>
<name>seata-compressor-deflater ${project.version}</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.compressor.deflater;
import io.seata.common.loader.LoadLevel;
import io.seata.core.compressor.Compressor;
/**
* @author dongzl
*/
@LoadLevel(name = "DEFLATER")
public class DeflaterCompressor implements Compressor {
@Override
public byte[] compress(byte[] bytes) {
return DeflaterUtil.compress(bytes);
}
@Override
public byte[] decompress(byte[] bytes) {
return DeflaterUtil.decompress(bytes);
}
}
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.compressor.deflater;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
/**
* @author dongzl
*/
public class DeflaterUtil {
private DeflaterUtil() {
}
private static final int BUFFER_SIZE = 8192;
public static byte[] compress(byte[] bytes) {
if (bytes == null) {
throw new NullPointerException("bytes is null");
}
int lenght = 0;
Deflater deflater = new Deflater();
deflater.setInput(bytes);
deflater.finish();
byte[] outputBytes = new byte[BUFFER_SIZE];
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
while (!deflater.finished()) {
lenght = deflater.deflate(outputBytes);
bos.write(outputBytes, 0, lenght);
}
deflater.end();
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("Deflater compress error", e);
}
}
public static byte[] decompress(byte[] bytes) {
if (bytes == null) {
throw new NullPointerException("bytes is null");
}
int length = 0;
Inflater inflater = new Inflater();
inflater.setInput(bytes);
byte[] outputBytes = new byte[BUFFER_SIZE];
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();) {
while (!inflater.finished()) {
length = inflater.inflate(outputBytes);
if (length == 0) {
break;
}
bos.write(outputBytes, 0, length);
}
inflater.end();
return bos.toByteArray();
} catch (Exception e) {
throw new RuntimeException("Deflater decompress error", e);
}
}
}
io.seata.compressor.deflater.DeflaterCompressor
\ No newline at end of file
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.compressor.deflater;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
* @author dongzl
*/
public class DeflaterCompressorTest {
@Test
public void testCompressAndDecompress() {
DeflaterCompressor compressor = new DeflaterCompressor();
byte[] bytes = "seata".getBytes();
bytes = compressor.compress(bytes);
bytes = compressor.decompress(bytes);
Assertions.assertEquals(new String(bytes), "seata");
}
}
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.compressor.deflater;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
* @author dongzl
*/
public class DeflaterUtilTest {
@Test
public void test_compress() {
Assertions.assertThrows(NullPointerException.class, () -> {
DeflaterUtil.compress(null);
});
}
@Test
public void test_decompress() {
Assertions.assertThrows(NullPointerException.class, () -> {
DeflaterUtil.decompress(null);
});
}
@Test
public void test_compressEqualDecompress() {
byte[] compress = DeflaterUtil.compress("seata".getBytes());
byte[] decompress = DeflaterUtil.decompress(compress);
Assertions.assertEquals("seata", new String(decompress));
}
}
......@@ -48,7 +48,12 @@ public enum CompressorType {
/**
* The lz4.
*/
LZ4((byte) 5);
LZ4((byte) 5),
/**
* The deflater.
*/
DEFLATER((byte) 6);
private final byte code;
......
......@@ -341,14 +341,14 @@ public interface ConfigurationKeys {
*/
String TRANSACTION_UNDO_LOG_TABLE = CLIENT_UNDO_PREFIX + "logTable";
/**
* The constant CLIENT_LOG_PREFIX
* The constant LOG_PREFIX
*/
String CLIENT_LOG_PREFIX = CLIENT_PREFIX + "log.";
String LOG_PREFIX = "log.";
/**
* The constant TRANSACTION_UNDO_LOG_EXCEPTION_RATE
*/
String TRANSACTION_LOG_EXCEPTION_RATE = CLIENT_LOG_PREFIX + "exceptionRate";
String TRANSACTION_LOG_EXCEPTION_RATE = LOG_PREFIX + "exceptionRate";
/**
* The constant MAX_COMMIT_RETRY_TIMEOUT.
......
......@@ -89,7 +89,7 @@ public class RootContext {
*/
public static void bind(@Nonnull String xid) {
if (StringUtils.isBlank(xid)) {
throw new IllegalArgumentException("xid must be not blank");
xid = null;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind {}", xid);
......
......@@ -32,7 +32,7 @@ public class Version {
/**
* The constant CURRENT.
*/
private static final String CURRENT = "1.4.0";
private static final String CURRENT = "1.4.1";
private static final String VERSION_0_7_1 = "0.7.1";
private static final int MAX_VERSION_DOT = 3;
......
......@@ -15,12 +15,9 @@
*/
package io.seata.core.rpc;
import java.util.Set;
import java.util.TreeSet;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import io.seata.common.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -35,7 +32,7 @@ public class ShutdownHook extends Thread {
private static final ShutdownHook SHUTDOWN_HOOK = new ShutdownHook("ShutdownHook");
private Set<Disposable> disposables = new TreeSet<>();
private final PriorityQueue<DisposablePriorityWrapper> disposables = new PriorityQueue<>();
private final AtomicBoolean destroyed = new AtomicBoolean(false);
......@@ -70,16 +67,22 @@ public class ShutdownHook extends Thread {
}
public void destroyAll() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("destoryAll starting");
if (!destroyed.compareAndSet(false, true)) {
return;
}
if (!destroyed.compareAndSet(false, true) && CollectionUtils.isEmpty(disposables)) {
if (disposables.isEmpty()) {
return;
}
for (Disposable disposable : disposables) {
LOGGER.debug("destoryAll starting");
while (!disposables.isEmpty()) {
Disposable disposable = disposables.poll();
disposable.destroy();
}
LOGGER.debug("destoryAll finish");
}
/**
......@@ -91,63 +94,18 @@ public class ShutdownHook extends Thread {
private static class DisposablePriorityWrapper implements Comparable<DisposablePriorityWrapper>, Disposable {
private static AtomicLong seq = new AtomicLong();
private Disposable disposable;
private final Disposable disposable;
private int priority;
private long seqId;
private final int priority;
public DisposablePriorityWrapper(Disposable disposable, int priority) {
this.disposable = disposable;
this.priority = priority;
this.seqId = seq.incrementAndGet();
}
@Override
public int compareTo(DisposablePriorityWrapper disposablePriorityWrapper) {
int cmp = priority - disposablePriorityWrapper.priority;
if (cmp == 0) {
if (seqId > disposablePriorityWrapper.seqId) {
cmp = 1;
} else if (seqId < disposablePriorityWrapper.seqId) {
cmp = -1;
} else {
cmp = 0;
}
}
return cmp;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + priority;
result = prime * result + (int) (seqId ^ (seqId >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
DisposablePriorityWrapper other = (DisposablePriorityWrapper) obj;
if (priority != other.priority) {
return false;
}
if (seqId != other.seqId) {
return false;
}
return true;
public int compareTo(DisposablePriorityWrapper challenger) {
return priority - challenger.priority;
}
@Override
......@@ -155,6 +113,5 @@ public class ShutdownHook extends Thread {
disposable.destroy();
}
}
}
......@@ -155,7 +155,11 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
// put message into basketMap
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
basket.offer(rpcMessage);
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
return null;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
......
......@@ -54,7 +54,7 @@ import static io.seata.common.Constants.DBKEYS_SPLIT_CHAR;
* @author zhaojun
* @author zhangchenghui.dev@gmail.com
*/
@Sharable
public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RmNettyRemotingClient.class);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment