Skip to content
Snippets Groups Projects
Unverified Commit d0d3ac19 authored by Ming Deng's avatar Ming Deng Committed by GitHub
Browse files

Merge pull request #357 from zouyx/feature/nacos

Ftr : nacos config
parents 33f82cd1 f0a4d8fe
No related branches found
No related tags found
No related merge requests found
package nacos
import (
"strconv"
"strings"
"sync"
"time"
)
import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
nacosconst "github.com/nacos-group/nacos-sdk-go/common/constant"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
)
const logDir = "logs/nacos/log"
// NacosClient Nacos client
type NacosClient struct {
name string
NacosAddrs []string
sync.Mutex // for Client
client *config_client.IConfigClient
exit chan struct{}
Timeout time.Duration
once sync.Once
onceClose func()
}
// Client Get Client
func (n *NacosClient) Client() *config_client.IConfigClient {
return n.client
}
// SetClient Set client
func (n *NacosClient) SetClient(client *config_client.IConfigClient) {
n.Lock()
n.client = client
n.Unlock()
}
type option func(*options)
type options struct {
nacosName string
client *NacosClient
}
// WithNacosName Set nacos name
func WithNacosName(name string) option {
return func(opt *options) {
opt.nacosName = name
}
}
// ValidateNacosClient Validate nacos client , if null then create it
func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
if container == nil {
return perrors.Errorf("container can not be null")
}
os := &options{}
for _, opt := range opts {
opt(os)
}
url := container.GetUrl()
if container.NacosClient() == nil {
//in dubbo ,every registry only connect one node ,so this is []string{r.Address}
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
logger.Errorf("timeout config %v is invalid ,err is %v",
url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT), err.Error())
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
nacosAddresses := strings.Split(url.Location, ",")
newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout)
if err != nil {
logger.Warnf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}",
os.nacosName, url.Location, timeout.String(), err)
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
container.SetNacosClient(newClient)
}
if container.NacosClient().Client() == nil {
svrConfList := []nacosconst.ServerConfig{}
for _, nacosAddr := range container.NacosClient().NacosAddrs {
split := strings.Split(nacosAddr, ":")
port, err := strconv.ParseUint(split[1], 10, 64)
if err != nil {
logger.Warnf("nacos addr port parse error ,error message is %v", err)
continue
}
svrconf := nacosconst.ServerConfig{
IpAddr: split[0],
Port: port,
}
svrConfList = append(svrConfList, svrconf)
}
client, err := clients.CreateConfigClient(map[string]interface{}{
"serverConfigs": svrConfList,
"clientConfig": nacosconst.ClientConfig{
TimeoutMs: uint64(int32(container.NacosClient().Timeout / time.Millisecond)),
ListenInterval: 10000,
NotLoadCacheAtStart: true,
LogDir: logDir,
},
})
container.NacosClient().SetClient(&client)
if err != nil {
logger.Errorf("nacos create config client error:%v", err)
}
}
return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL)
}
func newNacosClient(name string, nacosAddrs []string, timeout time.Duration) (*NacosClient, error) {
var (
err error
n *NacosClient
)
n = &NacosClient{
name: name,
NacosAddrs: nacosAddrs,
Timeout: timeout,
exit: make(chan struct{}),
onceClose: func() {
close(n.exit)
},
}
svrConfList := []nacosconst.ServerConfig{}
for _, nacosAddr := range n.NacosAddrs {
split := strings.Split(nacosAddr, ":")
port, err := strconv.ParseUint(split[1], 10, 64)
if err != nil {
logger.Warnf("convert port , source:%s , error:%v ", split[1], err)
continue
}
svrconf := nacosconst.ServerConfig{
IpAddr: split[0],
Port: port,
}
svrConfList = append(svrConfList, svrconf)
}
client, err := clients.CreateConfigClient(map[string]interface{}{
"serverConfigs": svrConfList,
"clientConfig": nacosconst.ClientConfig{
TimeoutMs: uint64(timeout / time.Millisecond),
ListenInterval: 20000,
NotLoadCacheAtStart: true,
LogDir: logDir,
},
})
n.SetClient(&client)
if err != nil {
return nil, perrors.WithMessagef(err, "nacos clients.CreateConfigClient(nacosAddrs:%+v)", nacosAddrs)
}
return n, nil
}
// Done Get nacos client exit signal
func (n *NacosClient) Done() <-chan struct{} {
return n.exit
}
func (n *NacosClient) stop() bool {
select {
case <-n.exit:
return true
default:
n.once.Do(n.onceClose)
}
return false
}
// NacosClientValid Get nacos client valid status
func (n *NacosClient) NacosClientValid() bool {
select {
case <-n.exit:
return false
default:
}
valid := true
n.Lock()
if n.Client() == nil {
valid = false
}
n.Unlock()
return valid
}
// Close Close nacos client , then set null
func (n *NacosClient) Close() {
if n == nil {
return
}
n.stop()
n.SetClient(nil)
logger.Warnf("nacosClient{name:%s, nacos addr:%s} exit now.", n.name, n.NacosAddrs)
}
package nacos
import (
"strings"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
)
func Test_newNacosClient(t *testing.T) {
server := mockCommonNacosServer()
nacosURL := strings.ReplaceAll(server.URL, "http", "registry")
registryUrl, _ := common.NewURL(nacosURL)
c := &nacosDynamicConfiguration{
url: &registryUrl,
done: make(chan struct{}),
}
err := ValidateNacosClient(c, WithNacosName(nacosClientName))
assert.NoError(t, err)
c.wg.Add(1)
go HandleClientRestart(c)
c.client.Close()
<-c.client.Done()
c.Destroy()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"sync"
"time"
)
import (
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)
const (
connDelay = 3
maxFailTimes = 15
)
type nacosClientFacade interface {
NacosClient() *NacosClient
SetNacosClient(*NacosClient)
// WaitGroup for wait group control, zk client listener & zk client container
WaitGroup() *sync.WaitGroup
// GetDone For nacos client control RestartCallBack() bool
GetDone() chan struct{}
common.Node
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
// HandleClientRestart Restart client handler
func HandleClientRestart(r nacosClientFacade) {
var (
err error
failTimes int
)
defer r.WaitGroup().Done()
LOOP:
for {
select {
case <-r.GetDone():
logger.Warnf("(NacosProviderRegistry)reconnectNacosRegistry goroutine exit now...")
break LOOP
// re-register all services
case <-r.NacosClient().Done():
r.NacosClient().Close()
nacosName := r.NacosClient().name
nacosAddress := r.NacosClient().NacosAddrs
r.SetNacosClient(nil)
// Connect nacos until success.
failTimes = 0
for {
select {
case <-r.GetDone():
logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * connDelay)): // Prevent crazy reconnection nacos.
}
err = ValidateNacosClient(r, WithNacosName(nacosName))
logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}",
nacosAddress, perrors.WithStack(err))
if err == nil {
break
}
failTimes++
if maxFailTimes <= failTimes {
failTimes = maxFailTimes
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
func init() {
extension.SetConfigCenterFactory("nacos", func() config_center.DynamicConfigurationFactory { return &nacosDynamicConfigurationFactory{} })
}
type nacosDynamicConfigurationFactory struct {
}
// GetDynamicConfiguration Get Configuration with URL
func (f *nacosDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration, error) {
dynamicConfiguration, err := newNacosDynamicConfiguration(url)
if err != nil {
return nil, err
}
dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{})
return dynamicConfiguration, err
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"sync"
)
import (
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
const nacosClientName = "nacos config_center"
type nacosDynamicConfiguration struct {
url *common.URL
rootPath string
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
client *NacosClient
keyListeners sync.Map
parser parser.ConfigurationParser
}
func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration, error) {
c := &nacosDynamicConfiguration{
rootPath: "/" + url.GetParam(constant.CONFIG_NAMESPACE_KEY, config_center.DEFAULT_GROUP) + "/config",
url: url,
done: make(chan struct{}),
}
err := ValidateNacosClient(c, WithNacosName(nacosClientName))
if err != nil {
logger.Errorf("nacos client start error ,error message is %v", err)
return nil, err
}
c.wg.Add(1)
go HandleClientRestart(c)
return c, err
}
// AddListener Add listener
func (n *nacosDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
n.addListener(key, listener)
}
// RemoveListener Remove listener
func (n *nacosDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
n.removeListener(key, listener)
}
//nacos distinguishes configuration files based on group and dataId. defalut group = "dubbo" and dataId = key
func (n *nacosDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) {
return n.GetRule(key, opts...)
}
// GetInternalProperty Get properties value by key
func (n *nacosDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string, error) {
return n.GetProperties(key, opts...)
}
// GetRule Get router rule
func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
content, err := (*n.client.Client()).GetConfig(vo.ConfigParam{
DataId: key,
Group: tmpOpts.Group,
})
if err != nil {
return "", perrors.WithStack(err)
} else {
return string(content), nil
}
}
// Parser Get Parser
func (n *nacosDynamicConfiguration) Parser() parser.ConfigurationParser {
return n.parser
}
// SetParser Set Parser
func (n *nacosDynamicConfiguration) SetParser(p parser.ConfigurationParser) {
n.parser = p
}
// NacosClient Get Nacos Client
func (n *nacosDynamicConfiguration) NacosClient() *NacosClient {
return n.client
}
// SetNacosClient Set Nacos Client
func (n *nacosDynamicConfiguration) SetNacosClient(client *NacosClient) {
n.cltLock.Lock()
n.client = client
n.cltLock.Unlock()
}
// WaitGroup for wait group control, zk client listener & zk client container
func (n *nacosDynamicConfiguration) WaitGroup() *sync.WaitGroup {
return &n.wg
}
// GetDone For nacos client control RestartCallBack() bool
func (n *nacosDynamicConfiguration) GetDone() chan struct{} {
return n.done
}
// GetUrl Get Url
func (n *nacosDynamicConfiguration) GetUrl() common.URL {
return *n.url
}
// Destroy Destroy configuration instance
func (n *nacosDynamicConfiguration) Destroy() {
close(n.done)
n.wg.Wait()
n.closeConfigs()
}
// IsAvailable Get available status
func (n *nacosDynamicConfiguration) IsAvailable() bool {
select {
case <-n.done:
return false
default:
return true
}
}
func (r *nacosDynamicConfiguration) closeConfigs() {
r.cltLock.Lock()
client := r.client
r.client = nil
r.cltLock.Unlock()
// Close the old client first to close the tmp node
client.Close()
logger.Infof("begin to close provider nacos client")
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
// run mock config server
func runMockConfigServer(configHandler func(http.ResponseWriter, *http.Request),
configListenHandler func(http.ResponseWriter, *http.Request)) *httptest.Server {
uriHandlerMap := make(map[string]func(http.ResponseWriter, *http.Request), 0)
uriHandlerMap["/nacos/v1/cs/configs"] = configHandler
uriHandlerMap["/nacos/v1/cs/configs/listener"] = configListenHandler
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
uri := r.RequestURI
for path, handler := range uriHandlerMap {
if uri == path {
handler(w, r)
break
}
}
}))
return ts
}
func mockCommonNacosServer() *httptest.Server {
return runMockConfigServer(func(writer http.ResponseWriter, request *http.Request) {
data := `
dubbo.service.com.ikurento.user.UserProvider.cluster=failback
dubbo.service.com.ikurento.user.UserProvider.protocol=myDubbo1
dubbo.protocols.myDubbo.port=20000
dubbo.protocols.myDubbo.name=dubbo
`
fmt.Fprintf(writer, "%s", data)
}, func(writer http.ResponseWriter, request *http.Request) {
data := `dubbo.properties%02dubbo%02dubbo.service.com.ikurento.user.UserProvider.cluster=failback`
fmt.Fprintf(writer, "%s", data)
})
}
func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) {
server := mockCommonNacosServer()
nacosURL := strings.ReplaceAll(server.URL, "http", "registry")
regurl, _ := common.NewURL(nacosURL)
nacosConfiguration, err := newNacosDynamicConfiguration(&regurl)
assert.NoError(t, err)
nacosConfiguration.SetParser(&parser.DefaultConfigurationParser{})
return nacosConfiguration, err
}
func Test_GetConfig(t *testing.T) {
nacos, err := initNacosData(t)
assert.NoError(t, err)
configs, err := nacos.GetProperties("dubbo.properties", config_center.WithGroup("dubbo"))
_, err = nacos.Parser().Parse(configs)
assert.NoError(t, err)
}
func Test_AddListener(t *testing.T) {
nacos, err := initNacosData(t)
assert.NoError(t, err)
listener := &mockDataListener{}
time.Sleep(time.Second * 2)
nacos.AddListener("dubbo.properties", listener)
listener.wg.Add(1)
listener.wg.Wait()
}
func Test_RemoveListener(t *testing.T) {
//TODO not supported in current go_nacos_sdk version
}
type mockDataListener struct {
wg sync.WaitGroup
event string
}
func (l *mockDataListener) Process(configType *config_center.ConfigChangeEvent) {
l.wg.Done()
l.event = configType.Key
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"context"
)
import (
"github.com/nacos-group/nacos-sdk-go/vo"
)
import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)
func callback(listener config_center.ConfigurationListener, namespace, group, dataId, data string) {
listener.Process(&config_center.ConfigChangeEvent{Key: dataId, Value: data, ConfigType: remoting.EventTypeUpdate})
}
func (l *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) {
_, loaded := l.keyListeners.Load(key)
if !loaded {
_, cancel := context.WithCancel(context.Background())
err := (*l.client.Client()).ListenConfig(vo.ConfigParam{
DataId: key,
Group: "dubbo",
OnChange: func(namespace, group, dataId, data string) {
go callback(listener, namespace, group, dataId, data)
},
})
logger.Errorf("nacos : listen config fail, error:%v ", err)
newListener := make(map[config_center.ConfigurationListener]context.CancelFunc)
newListener[listener] = cancel
l.keyListeners.Store(key, newListener)
} else {
// TODO check goroutine alive, but this version of go_nacos_sdk is not support.
logger.Infof("profile:%s. this profile is already listening", key)
}
}
func (l *nacosDynamicConfiguration) removeListener(key string, listener config_center.ConfigurationListener) {
// TODO: not supported in current go_nacos_sdk version
logger.Warn("not supported in current go_nacos_sdk version")
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment