diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go new file mode 100644 index 0000000000000000000000000000000000000000..8dc742e5c4beddc07fafed37bd8bf4f87a6e3c07 --- /dev/null +++ b/config_center/nacos/client.go @@ -0,0 +1,247 @@ +package nacos + +import ( + "strconv" + "strings" + "sync" + "time" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/config_client" + nacosconst "github.com/nacos-group/nacos-sdk-go/common/constant" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" +) + +const ( + ConnDelay = 3 + MaxFailTimes = 15 +) + +type NacosClient struct { + name string + NacosAddrs []string + sync.Mutex // for Client + Client *config_client.IConfigClient + exit chan struct{} + Timeout time.Duration +} + +type Option func(*Options) + +type Options struct { + nacosName string + client *NacosClient +} + +func WithNacosName(name string) Option { + return func(opt *Options) { + opt.nacosName = name + } +} + +func ValidateNacosClient(container nacosClientFacade, opts ...Option) error { + var ( + err error + ) + opions := &Options{} + for _, opt := range opts { + opt(opions) + } + + err = nil + + lock := container.NacosClientLock() + url := container.GetUrl() + + lock.Lock() + defer lock.Unlock() + + if container.NacosClient() == nil { + //in dubbo ,every registry only connect one node ,so this is []string{r.Address} + timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT)) + if err != nil { + logger.Errorf("timeout config %v is invalid ,err is %v", + url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error()) + return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) + } + nacosAddresses := strings.Split(url.Location, ",") + newClient, err := newNacosClient(opions.nacosName, nacosAddresses, timeout) + if err != nil { + logger.Warnf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}", + opions.nacosName, url.Location, timeout.String(), err) + return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location) + } + container.SetNacosClient(newClient) + } + + if container.NacosClient().Client == nil { + svrConfList := []nacosconst.ServerConfig{} + for _, nacosAddr := range container.NacosClient().NacosAddrs { + split := strings.Split(nacosAddr,":") + port, err := strconv.ParseUint(split[1], 10, 64) + if err != nil { + continue + } + svrconf := nacosconst.ServerConfig{ + IpAddr: split[0], + Port: port, + } + svrConfList = append(svrConfList, svrconf) + } + + client , err := clients.CreateConfigClient(map[string]interface{}{ + "serverConfigs": svrConfList, + "clientConfig": nacosconst.ClientConfig{ + TimeoutMs: uint64(container.NacosClient().Timeout.Nanoseconds() / 1e6), + ListenInterval: 10000, + NotLoadCacheAtStart: true, + LogDir: "logs/nacos/log", //TODO unified log directory + }, + }) + container.NacosClient().Client = &client + if err != nil { + //TODO + } + } + + return perrors.WithMessagef(err, "newZookeeperClient(address:%+v)", url.PrimitiveURL) +} + +func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) { + var ( + err error + n *NacosClient + ) + + n = &NacosClient{ + name: name, + NacosAddrs: nacosAddrs, + Timeout: timeout, + exit: make(chan struct{}), + } + + svrConfList := []nacosconst.ServerConfig{} + for _, nacosAddr := range n.NacosAddrs { + split := strings.Split(nacosAddr,":") + port, err := strconv.ParseUint(split[1], 10, 64) + if err != nil { + continue + } + svrconf := nacosconst.ServerConfig{ + IpAddr: split[0], + Port: port, + } + svrConfList = append(svrConfList, svrconf) + } + client , err := clients.CreateConfigClient(map[string]interface{}{ + "serverConfigs": svrConfList, + "clientConfig": nacosconst.ClientConfig{ + TimeoutMs: uint64(timeout.Nanoseconds() / 1e6), + ListenInterval: 20000, + NotLoadCacheAtStart: true, + LogDir: "logs/nacos/log", //TODO unified log directory + }, + }) + n.Client = &client + if err != nil { + return nil, perrors.WithMessagef(err, "nacos clients.CreateConfigClient(nacosAddrs:%+v)", nacosAddrs) + } + + return n, nil +} + +func newMockNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) { + var ( + err error + n *NacosClient + ) + + n = &NacosClient{ + name: name, + NacosAddrs: nacosAddrs, + Timeout: timeout, + exit: make(chan struct{}), + } + + svrConfList := []nacosconst.ServerConfig{} + for _, nacosAddr := range n.NacosAddrs { + split := strings.Split(nacosAddr,":") + port, err := strconv.ParseUint(split[1], 10, 64) + if err != nil { + continue + } + svrconf := nacosconst.ServerConfig{ + IpAddr: split[0], + Port: port, + } + svrConfList = append(svrConfList, svrconf) + } + + client , err := clients.CreateConfigClient(map[string]interface{}{ + "serverConfigs": svrConfList, + "clientConfig": nacosconst.ClientConfig{ + TimeoutMs: uint64(timeout.Nanoseconds() / 1e6), + ListenInterval: 10000, + NotLoadCacheAtStart: true, + LogDir: "logs/nacos/log", ///TODO unified log directory + }, + }) + if err != nil { + return nil, perrors.WithMessagef(err, "nacos clients.CreateConfigClient(nacosAddrs:%+v)", nacosAddrs) + } + n.Client = &client + return n, nil +} + +func (n *NacosClient) Done() <-chan struct{} { + return n.exit +} + +func (n *NacosClient) stop() bool { + select { + case <-n.exit: + return true + default: + close(n.exit) + } + + return false +} + +func (n *NacosClient) NacosClientValid() bool { + select { + case <-n.exit: + return false + default: + } + + valid := true + n.Lock() + if n.Client == nil { + valid = false + } + n.Unlock() + + return valid +} + +func (n *NacosClient) Close() { + if n == nil { + return + } + + n.stop() + n.Lock() + if n.Client != nil { + n.Client = nil + } + n.Unlock() + logger.Warnf("nacosClient{name:%s, zk addr:%s} exit now.", n.name, n.NacosAddrs) +} \ No newline at end of file diff --git a/config_center/nacos/facade.go b/config_center/nacos/facade.go new file mode 100644 index 0000000000000000000000000000000000000000..b0487b08f175b0053ad9eeacb381de92e3e8930a --- /dev/null +++ b/config_center/nacos/facade.go @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "sync" + "time" +) +import ( + "github.com/dubbogo/getty" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/logger" +) + +type nacosClientFacade interface { + NacosClient() *NacosClient + SetNacosClient(*NacosClient) + NacosClientLock() *sync.Mutex + WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container + GetDone() chan struct{} //for nacos client control + RestartCallBack() bool + common.Node +} + +func timeSecondDuration(sec int) time.Duration { + return time.Duration(sec) * time.Second +} + +//TODO nacos HandleClientRestart +func HandleClientRestart(r nacosClientFacade) { + var ( + err error + + failTimes int + ) + + defer r.WaitGroup().Done() +LOOP: + for { + select { + case <-r.GetDone(): + logger.Warnf("(NacosProviderRegistry)reconnectNacosRegistry goroutine exit now...") + break LOOP + // re-register all services + case <-r.NacosClient().Done(): + r.NacosClientLock().Lock() + r.NacosClient().Close() + nacosName := r.NacosClient().name + nacosAddress := r.NacosClient().NacosAddrs + r.SetNacosClient(nil) + r.NacosClientLock().Unlock() + + // Connect nacos until success. + failTimes = 0 + for { + select { + case <-r.GetDone(): + logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...") + break LOOP + case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection nacos. + } + err = ValidateNacosClient(r, WithNacosName(nacosName)) + logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}", + nacosAddress, perrors.WithStack(err)) + if err == nil { + if r.RestartCallBack() { + break + } + } + failTimes++ + if MaxFailTimes <= failTimes { + failTimes = MaxFailTimes + } + } + } + } +} + + diff --git a/config_center/nacos/factory.go b/config_center/nacos/factory.go new file mode 100644 index 0000000000000000000000000000000000000000..62c8835fa534ed0f144138c87abf0ba8a449a2c1 --- /dev/null +++ b/config_center/nacos/factory.go @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +func init() { + extension.SetConfigCenterFactory("nacos", func() config_center.DynamicConfigurationFactory { return &nacosDynamicConfigurationFactory{} }) +} + +type nacosDynamicConfigurationFactory struct { +} + +func (f *nacosDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) { + dynamicConfiguration, err := newNacosDynamicConfiguration(url) + if err != nil { + return nil, err + } + dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{}) + return dynamicConfiguration, err + +} diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go new file mode 100644 index 0000000000000000000000000000000000000000..f3b79bc256cbda10fb5df751a56ea2e76c405e7c --- /dev/null +++ b/config_center/nacos/impl.go @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "sync" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/vo" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +const NacosClientName = "nacos config_center" + +type nacosDynamicConfiguration struct { + url *common.URL + rootPath string + wg sync.WaitGroup + cltLock sync.Mutex + done chan struct{} + client *NacosClient + keyListeners sync.Map + parser parser.ConfigurationParser +} + +func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, error) { + c := &nacosDynamicConfiguration{ + rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config", + url: url, + keyListeners:sync.Map{}, + } + err := ValidateNacosClient(c, WithNacosName(NacosClientName)) + if err != nil { + logger.Errorf("nacos client start error ,error message is %v", err) + return nil, err + } + c.wg.Add(1) + go HandleClientRestart(c) + return c, err + +} + +func (n *nacosDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { + n.addListener(key, listener) +} + +func (n *nacosDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) { + n.removeListener(key, listener) +} + +// 鍦╪acos group鏄痙ubbo DataId鏄痥ey configfile 鎴� appconfigfile +func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) { + + tmpOpts := &config_center.Options{} + for _, opt := range opts { + opt(tmpOpts) + } + content, err := (*n.client.Client).GetConfig(vo.ConfigParam{ + DataId: key, + Group: tmpOpts.Group, + }) + if err != nil { + return "", perrors.WithStack(err) + } else { + return string(content), nil + } + +} + +func (n *nacosDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) { + return n.GetProperties(key, opts...) +} + +func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) { + return n.GetProperties(key, opts...) +} + +func (n *nacosDynamicConfiguration) Parser() parser.ConfigurationParser { + return n.parser +} + +func (n *nacosDynamicConfiguration) SetParser(p parser.ConfigurationParser) { + n.parser = p +} + +func (n *nacosDynamicConfiguration) NacosClient() *NacosClient { + return n.client +} + +func (n *nacosDynamicConfiguration) SetNacosClient(client *NacosClient) { + n.client = client +} + +func (n *nacosDynamicConfiguration) NacosClientLock() *sync.Mutex { + return &n.cltLock +} + +func (n *nacosDynamicConfiguration) WaitGroup() *sync.WaitGroup { + return &n.wg +} + +func (n *nacosDynamicConfiguration) GetDone() chan struct{} { + return n.done +} + +func (n *nacosDynamicConfiguration) GetUrl() common.URL { + return *n.url +} + +func (n *nacosDynamicConfiguration) Destroy() { + close(n.done) + n.wg.Wait() + n.closeConfigs() +} + +func (n *nacosDynamicConfiguration) IsAvailable() bool { + select { + case <-n.done: + return false + default: + return true + } +} + +func (r *nacosDynamicConfiguration) closeConfigs() { + r.cltLock.Lock() + defer r.cltLock.Unlock() + logger.Infof("begin to close provider zk client") + // Close the old client first to close the tmp node + r.client.Close() + r.client = nil +} + +func (r *nacosDynamicConfiguration) RestartCallBack() bool { + return true +} diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1eaacdca76ee8181805d9267c998e3b842d333b6 --- /dev/null +++ b/config_center/nacos/impl_test.go @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nacos + +import ( + "context" + "fmt" + "sync" + "testing" + "time" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/vo" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/config_center/parser" +) + +func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) { + regurl, _ := common.NewURL(context.TODO(), "registry://127.0.0.1:8848") + nacosConfiguration, err := newNacosDynamicConfiguration(®url) + if err != nil { + fmt.Println("error:newNacosDynamicConfiguration", err.Error()) + assert.NoError(t, err) + return nil, err + } + nacosConfiguration.SetParser(&parser.DefaultConfigurationParser{}) + data := ` + dubbo.service.com.ikurento.user.UserProvider.cluster=failback + dubbo.service.com.ikurento.user.UserProvider.protocol=myDubbo1 + dubbo.protocols.myDubbo.port=20000 + dubbo.protocols.myDubbo.name=dubbo +` + sucess, err := (*nacosConfiguration.client.Client).PublishConfig(vo.ConfigParam{ + DataId: "dubbo.properties", + Group: "dubbo", + Content: data, + }) + assert.NoError(t, err) + if !sucess { + fmt.Println("error: publishconfig error", data) + } + return nacosConfiguration, err +} + +func Test_GetConfig(t *testing.T) { + nacos, err := initNacosData(t) + assert.NoError(t, err) + configs, err := nacos.GetProperties("dubbo.properties", config_center.WithGroup("dubbo")) + m, err := nacos.Parser().Parse(configs) + assert.NoError(t, err) + fmt.Println(m) +} + +func Test_AddListener(t *testing.T) { + nacos, err := initNacosData(t) + assert.NoError(t, err) + listener := &mockDataListener{} + time.Sleep(time.Second * 2) + nacos.AddListener("dubbo.properties", listener) + listener.wg.Add(2) + fmt.Println("begin to listen") + data := ` + dubbo.service.com.ikurento.user.UserProvider.cluster=failback + dubbo.service.com.ikurento.user.UserProvider.protocol=myDubbo + dubbo.protocols.myDubbo.port=20000 + dubbo.protocols.myDubbo.name=dubbo +` + sucess, err := (*nacos.client.Client).PublishConfig(vo.ConfigParam{ + DataId: "dubbo.properties", + Group: "dubbo", + Content: data, + }) + assert.NoError(t, err) + if !sucess { + fmt.Println("error: publishconfig error", data) + } + listener.wg.Wait() + fmt.Println("end", listener.event) + +} + + +func Test_RemoveListener(t *testing.T) { + //TODO not supported in current go_nacos_sdk version +} +type mockDataListener struct { + wg sync.WaitGroup + event string +} + +func (l *mockDataListener) Process(configType *config_center.ConfigChangeEvent) { + fmt.Println("process!!!!!!!!!!") + l.wg.Done() + l.event = configType.Key +} \ No newline at end of file diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go new file mode 100644 index 0000000000000000000000000000000000000000..f284f2d7526b540cd27666caaa4c8867f140707a --- /dev/null +++ b/config_center/nacos/listener.go @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nacos + +import ( + "context" +) + +import ( + "github.com/nacos-group/nacos-sdk-go/vo" +) + +import ( + "github.com/apache/dubbo-go/config_center" + "github.com/apache/dubbo-go/remoting" +) + +func callback(ctx context.Context, listener config_center.ConfigurationListener, namespace, group, dataId, data string) { + listener.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate}) +} + +func (l *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) { + _, loaded := l.keyListeners.Load(key) + if !loaded { + _, cancel := context.WithCancel(context.Background()) + (*l.client.Client).ListenConfig(vo.ConfigParam{ //TODO 杩欎釜listen鎺ュ彛搴旇瑕佹湁涓猚ontext鐨� + //(*l.client.Client).ListenConfigWithContext(ctx, vo.ConfigParam{ + DataId: key, + Group: "dubbo", + OnChange: func(namespace, group, dataId, data string) { + //go callback(ctx, listener, namespace, group, dataId, data) + go callback(context.TODO(), listener, namespace, group, dataId, data) + }, + }) + newListener := make(map[config_center.ConfigurationListener]context.CancelFunc) + newListener[listener] = cancel + l.keyListeners.Store(key, newListener) + } else { + // TODO check goroutine + } +} + +func (l *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) { + // TODO not supported in current go_nacos_sdk version + //listeners, loaded := l.keyListeners.Load(key) + //if loaded { + // listenerMap := listeners.(map[config_center.ConfigurationListener]context.CancelFunc) + // listenerMap[listener]() + // delete(listeners.(map[config_center.ConfigurationListener]context.CancelFunc), listener) + //} +}