diff --git a/config/seata-config-core/src/main/java/io/seata/config/FileConfiguration.java b/config/seata-config-core/src/main/java/io/seata/config/FileConfiguration.java index 92a044c53e6db0cd3720ad8b7a4af63831447dde..6c89ec9609c00cc59a2cde6e50b68050821bc6b3 100644 --- a/config/seata-config-core/src/main/java/io/seata/config/FileConfiguration.java +++ b/config/seata-config-core/src/main/java/io/seata/config/FileConfiguration.java @@ -20,6 +20,7 @@ import java.io.UnsupportedEncodingException; import java.net.URL; import java.net.URLDecoder; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -73,6 +74,8 @@ public class FileConfiguration extends AbstractConfiguration { private final String name; + private final FileListener fileListener = new FileListener(); + private final boolean allowDynamicRefresh; /** @@ -230,8 +233,7 @@ public class FileConfiguration extends AbstractConfiguration { listenedConfigMap.put(dataId, ConfigurationFactory.getInstance().getConfig(dataId)); // Start config change listener for the dataId. - FileListener fileListener = new FileListener(dataId, listener); - fileListener.onProcessEvent(new ConfigurationChangeEvent()); + fileListener.addListener(dataId, listener); } @Override @@ -333,8 +335,8 @@ public class FileConfiguration extends AbstractConfiguration { */ class FileListener implements ConfigurationChangeListener { - private final String dataId; - private final ConfigurationChangeListener listener; + private final Map<String, Set<ConfigurationChangeListener>> dataIdMap = new HashMap<>(); + private final ExecutorService executor = new ThreadPoolExecutor(CORE_LISTENER_THREAD, MAX_LISTENER_THREAD, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("fileListener", MAX_LISTENER_THREAD)); @@ -342,30 +344,39 @@ public class FileConfiguration extends AbstractConfiguration { /** * Instantiates a new FileListener. * - * @param dataId the data id - * @param listener the listener */ - public FileListener(String dataId, ConfigurationChangeListener listener) { - this.dataId = dataId; - this.listener = listener; + FileListener() {} + + public synchronized void addListener(String dataId, ConfigurationChangeListener listener) { + // only the first time add listener will trigger on process event + if (dataIdMap.isEmpty()) { + fileListener.onProcessEvent(new ConfigurationChangeEvent()); + } + + dataIdMap .computeIfAbsent(dataId, value -> new HashSet<>()).add(listener); } @Override public void onChangeEvent(ConfigurationChangeEvent event) { while (true) { - try { - String currentConfig = - ConfigurationFactory.getInstance().getLatestConfig(dataId, null, DEFAULT_CONFIG_TIMEOUT); - if (StringUtils.isNotBlank(currentConfig)) { - String oldConfig = listenedConfigMap.get(dataId); - if (ObjectUtils.notEqual(currentConfig, oldConfig)) { - listenedConfigMap.put(dataId, currentConfig); - event.setDataId(dataId).setNewValue(currentConfig).setOldValue(oldConfig); - listener.onChangeEvent(event); + for (String dataId : dataIdMap.keySet()) { + try { + String currentConfig = + ConfigurationFactory.getInstance().getLatestConfig(dataId, null, DEFAULT_CONFIG_TIMEOUT); + if (StringUtils.isNotBlank(currentConfig)) { + String oldConfig = listenedConfigMap.get(dataId); + if (ObjectUtils.notEqual(currentConfig, oldConfig)) { + listenedConfigMap.put(dataId, currentConfig); + event.setDataId(dataId).setNewValue(currentConfig).setOldValue(oldConfig); + + for (ConfigurationChangeListener listener : dataIdMap.get(dataId)) { + listener.onChangeEvent(event); + } + } } + } catch (Exception exx) { + LOGGER.error("fileListener execute error, dataId :{}", dataId, exx); } - } catch (Exception exx) { - LOGGER.error("fileListener execute error:{}", exx.getMessage()); } try { Thread.sleep(LISTENER_CONFIG_INTERVAL);