Skip to content
Snippets Groups Projects
Unverified Commit b5b8c0ee authored by fangyincheng's avatar fangyincheng Committed by GitHub
Browse files

Merge pull request #755 from georgehao/refact-seri

Refact seri
parents 4ccc7051 48d746fc
No related branches found
No related tags found
No related merge requests found
Showing
with 2304 additions and 27 deletions
......@@ -2,26 +2,33 @@ name: CI
on:
push:
branches: ["master", "develop"]
branches: [master, develop]
pull_request:
branches: "*"
jobs:
build:
name: ubuntu-latest ${{ matrix.config.go_version }}
runs-on: ubuntu-latest
name: ${{ matrix.os }} - Go ${{ matrix.go_version }}
runs-on: ${{ matrix.os }}
strategy:
# If you want to matrix build , you can append the following list.
matrix:
config:
- go_version: 1.13
steps:
go_version:
- 1.13
os:
- ubuntu-latest
env:
DING_TOKEN: "6374f1bf8d4f23cde81d4a4b8c1f0bc98cc92b5151ca938ab938d3d7f4230fc4"
DING_SIGN: "SECa98677289194bb0e5caec3051301d06515750ff1bd2f932a4704298afb2e0ae6"
steps:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.config.go_version }}
go-version: ${{ matrix.go_version }}
id: go
- name: Check out code into the Go module directory
......@@ -30,17 +37,21 @@ jobs:
- name: Cache dependencies
uses: actions/cache@v2
with:
# Cache
path: ~/go/pkg/mod
# Cache key
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# An ordered list of keys to use for restoring the cache if no cache hit occurred for key
restore-keys: |
${{ runner.os }}-go-
- name: Get dependencies
run: |
go get -v -t -d ./...
if [ -f Gopkg.toml ]; then
curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
dep ensure
else
go get -v -t -d ./...
fi
- name: License Check
......@@ -59,21 +70,43 @@ jobs:
- name: Coverage
run: bash <(curl -s https://codecov.io/bash)
- name: DingTalk Message Notify
# You may pin to the exact commit or the version.
# uses: zcong1993/actions-ding@2a68a4d06ed966d2e5c28178e7187c107ec57862
# Because the contexts of push and PR are different, there are two Notify.
# Notifications are triggered only in the apache/dubbo-go repository.
- name: DingTalk Message Notify only Push
uses: zcong1993/actions-ding@v3.0.1
if: ${{ github.repository == 'apache/dubbo-go' }}
# Whether job is successful or not, always () is always true.
if: |
always() &&
github.event_name == 'push' &&
github.repository == 'apache/dubbo-go'
with:
# DingDing bot token
dingToken: 6374f1bf8d4f23cde81d4a4b8c1f0bc98cc92b5151ca938ab938d3d7f4230fc4
secret: SECa98677289194bb0e5caec3051301d06515750ff1bd2f932a4704298afb2e0ae6
dingToken: ${{ env.DING_TOKEN }}
secret: ${{ env.DING_SIGN }}
# Post Body to send
body: |
{
"msgtype": "markdown",
"markdown": {
"title": "Github Actions",
"text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - trigger: ${{ github.actor }} \n - ref: ${{ github.ref }} \n - status: ${{ job.status }} \n - environment: ${{ runner.os }}"
"text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - trigger: ${{ github.actor }} \n - event: ${{ github.event_name }} \n - ref: ${{ github.ref }} \n - status: [${{ job.status }}](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}) \n - environment: ${{ runner.os }} \n - SHA: [${{ github.sha }}](${{ github.event.compare }})"
}
}
- name: DingTalk Message Notify only PR
uses: zcong1993/actions-ding@v3.0.1
if: |
always() &&
github.event_name == 'pull_request' &&
github.repository == 'apache/dubbo-go'
with:
dingToken: ${{ env.DING_TOKEN }}
secret: ${{ env.DING_SIGN }}
body: |
{
"msgtype": "markdown",
"markdown": {
"title": "Github Actions",
"text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - pr_title: ${{ github.event.pull_request.title }} \n - trigger: ${{ github.actor }} \n - event: ${{ github.event_name }} \n - ref: [${{ github.ref }}](${{ github.event.pull_request._links.html.href }}) \n - status: [${{ job.status }}](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}) \n - environment: ${{ runner.os }} \n > SHA: [${{ github.sha }}](${{ github.event.pull_request._links.html.href }})"
}
}
......@@ -171,6 +171,10 @@ const (
NACOS_USERNAME = "username"
)
const (
FILE_KEY = "file"
)
const (
ZOOKEEPER_KEY = "zookeeper"
)
......@@ -179,6 +183,18 @@ const (
ETCDV3_KEY = "etcdv3"
)
const (
CONSUL_KEY = "consul"
CHECK_PASS_INTERVAL = "consul-check-pass-interval"
// default time-to-live in millisecond
DEFAULT_CHECK_PASS_INTERVAL = 16000
QUERY_TAG = "consul_query_tag"
ACL_TOKEN = "acl-token"
// default deregister critical server after
DEFAULT_DEREGISTER_TIME = "20s"
DEREGISTER_AFTER = "consul-deregister-critical-service-after"
)
const (
TRACING_REMOTE_SPAN_CTX = "tracing.remote.span.ctx"
)
......
......@@ -399,6 +399,17 @@ func (c *URL) AddParam(key string, value string) {
c.params.Add(key, value)
}
// AddParamAvoidNil will add key-value pair
// Not thread-safe
// think twice before using it.
func (c *URL) AddParamAvoidNil(key string, value string) {
if c.params == nil {
c.params = url.Values{}
}
c.params.Add(key, value)
}
// SetParam will put the key-value pair into url
// it's not thread safe.
// think twice before you want to use this method
......
......@@ -45,6 +45,7 @@ const (
)
type apolloConfiguration struct {
cc.BaseDynamicConfiguration
url *common.URL
listeners sync.Map
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config_center
// BaseDynamicConfiguration will default implementation DynamicConfiguration some method
type BaseDynamicConfiguration struct {
}
// RemoveConfig
func (bdc *BaseDynamicConfiguration) RemoveConfig(string, string) error {
return nil
}
......@@ -58,6 +58,9 @@ type DynamicConfiguration interface {
// PublishConfig will publish the config with the (key, group, value) pair
PublishConfig(string, string, string) error
// RemoveConfig will remove the config white the (key, group) pair
RemoveConfig(string, string) error
// GetConfigKeysByGroup will return all keys with the group
GetConfigKeysByGroup(group string) (*gxset.HashSet, error)
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
func init() {
extension.SetConfigCenterFactory(constant.FILE_KEY, func() config_center.DynamicConfigurationFactory {
return &fileDynamicConfigurationFactory{}
})
}
type fileDynamicConfigurationFactory struct {
}
// GetDynamicConfiguration Get Configuration with URL
func (f *fileDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration,
error) {
dynamicConfiguration, err := newFileSystemDynamicConfiguration(url)
if err != nil {
return nil, perrors.WithStack(err)
}
dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{})
return dynamicConfiguration, err
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"bytes"
"errors"
"io/ioutil"
"os"
"os/exec"
"os/user"
"path"
"path/filepath"
"runtime"
"strings"
)
import (
gxset "github.com/dubbogo/gost/container/set"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
const (
PARAM_NAME_PREFIX = "dubbo.config-center."
CONFIG_CENTER_DIR_PARAM_NAME = PARAM_NAME_PREFIX + "dir"
CONFIG_CENTER_ENCODING_PARAM_NAME = PARAM_NAME_PREFIX + "encoding"
DEFAULT_CONFIG_CENTER_ENCODING = "UTF-8"
)
// FileSystemDynamicConfiguration
type FileSystemDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
encoding string
cacheListener *CacheListener
parser parser.ConfigurationParser
}
func newFileSystemDynamicConfiguration(url *common.URL) (*FileSystemDynamicConfiguration, error) {
encode := url.GetParam(CONFIG_CENTER_ENCODING_PARAM_NAME, DEFAULT_CONFIG_CENTER_ENCODING)
root := url.GetParam(CONFIG_CENTER_DIR_PARAM_NAME, "")
var c *FileSystemDynamicConfiguration
if _, err := os.Stat(root); err != nil {
// not exist, use default, /XXX/xx/.dubbo/config-center
if rp, err := Home(); err != nil {
return nil, perrors.WithStack(err)
} else {
root = path.Join(rp, ".dubbo", "config-center")
}
}
if _, err := os.Stat(root); err != nil {
// it must be dir, if not exist, will create
if err = createDir(root); err != nil {
return nil, perrors.WithStack(err)
}
}
c = &FileSystemDynamicConfiguration{
url: url,
rootPath: root,
encoding: encode,
}
c.cacheListener = NewCacheListener(c.rootPath)
return c, nil
}
// RootPath get root path
func (fsdc *FileSystemDynamicConfiguration) RootPath() string {
return fsdc.rootPath
}
// Parser Get Parser
func (fsdc *FileSystemDynamicConfiguration) Parser() parser.ConfigurationParser {
return fsdc.parser
}
// SetParser Set Parser
func (fsdc *FileSystemDynamicConfiguration) SetParser(p parser.ConfigurationParser) {
fsdc.parser = p
}
// AddListener Add listener
func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener,
opts ...config_center.Option) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
fsdc.cacheListener.AddListener(path, listener)
}
// RemoveListener Remove listener
func (fsdc *FileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener,
opts ...config_center.Option) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
fsdc.cacheListener.RemoveListener(path, listener)
}
// GetProperties get properties file
func (fsdc *FileSystemDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
file, err := ioutil.ReadFile(path)
if err != nil {
return "", perrors.WithStack(err)
}
return string(file), nil
}
// GetRule get Router rule properties file
func (fsdc *FileSystemDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) {
return fsdc.GetProperties(key, opts...)
}
// GetInternalProperty get value by key in Default properties file(dubbo.properties)
func (fsdc *FileSystemDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string,
error) {
return fsdc.GetProperties(key, opts...)
}
// PublishConfig will publish the config with the (key, group, value) pair
func (fsdc *FileSystemDynamicConfiguration) PublishConfig(key string, group string, value string) error {
path := fsdc.GetPath(key, group)
return fsdc.write2File(path, value)
}
// GetConfigKeysByGroup will return all keys with the group
func (fsdc *FileSystemDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
path := fsdc.GetPath("", group)
r := gxset.NewSet()
fileInfo, _ := ioutil.ReadDir(path)
for _, file := range fileInfo {
// list file
if file.IsDir() {
continue
}
r.Add(file.Name())
}
return r, nil
}
// RemoveConfig will remove the config whit hte (key, group)
func (fsdc *FileSystemDynamicConfiguration) RemoveConfig(key string, group string) error {
path := fsdc.GetPath(key, group)
_, err := fsdc.deleteDelay(path)
return err
}
// Close close file watcher
func (fsdc *FileSystemDynamicConfiguration) Close() error {
return fsdc.cacheListener.Close()
}
// GetPath get path
func (fsdc *FileSystemDynamicConfiguration) GetPath(key string, group string) string {
if len(key) == 0 {
return path.Join(fsdc.rootPath, group)
}
if len(group) == 0 {
group = config_center.DEFAULT_GROUP
}
return path.Join(fsdc.rootPath, group, key)
}
func (fsdc *FileSystemDynamicConfiguration) deleteDelay(path string) (bool, error) {
if path == "" {
return false, nil
}
if err := os.RemoveAll(path); err != nil {
return false, err
}
return true, nil
}
func (fsdc *FileSystemDynamicConfiguration) write2File(fp string, value string) error {
if err := forceMkdirParent(fp); err != nil {
return perrors.WithStack(err)
}
return ioutil.WriteFile(fp, []byte(value), os.ModePerm)
}
func forceMkdirParent(fp string) error {
pd := getParentDirectory(fp)
return createDir(pd)
}
func createDir(path string) error {
// create dir, chmod is drwxrwxrwx(0777)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return err
}
return nil
}
func getParentDirectory(fp string) string {
return substr(fp, 0, strings.LastIndex(fp, string(filepath.Separator)))
}
func substr(s string, pos, length int) string {
runes := []rune(s)
l := pos + length
if l > len(runes) {
l = len(runes)
}
return string(runes[pos:l])
}
// Home returns the home directory for the executing user.
//
// This uses an OS-specific method for discovering the home directory.
// An error is returned if a home directory cannot be detected.
func Home() (string, error) {
user, err := user.Current()
if nil == err {
return user.HomeDir, nil
}
// cross compile support
if "windows" == runtime.GOOS {
return homeWindows()
}
// Unix-like system, so just assume Unix
return homeUnix()
}
func homeUnix() (string, error) {
// First prefer the HOME environmental variable
if home := os.Getenv("HOME"); home != "" {
return home, nil
}
// If that fails, try the shell
var stdout bytes.Buffer
cmd := exec.Command("sh", "-c", "eval echo ~$USER")
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
return "", err
}
result := strings.TrimSpace(stdout.String())
if result == "" {
return "", errors.New("blank output when reading home directory")
}
return result, nil
}
func homeWindows() (string, error) {
drive := os.Getenv("HOMEDRIVE")
path := os.Getenv("HOMEPATH")
home := drive + path
if drive == "" || path == "" {
home = os.Getenv("USERPROFILE")
}
if home == "" {
return "", errors.New("HOMEDRIVE, HOMEPATH, and USERPROFILE are blank")
}
return home, nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"fmt"
"os"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
)
const (
key = "com.dubbo.go"
)
func initFileData(t *testing.T) (*FileSystemDynamicConfiguration, error) {
urlString := "registry://127.0.0.1:2181"
regurl, err := common.NewURL(urlString)
assert.NoError(t, err)
dc, err := extension.GetConfigCenterFactory("file").GetDynamicConfiguration(&regurl)
assert.NoError(t, err)
return dc.(*FileSystemDynamicConfiguration), err
}
func TestPublishAndGetConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
err = file.PublishConfig(key, "", "A")
assert.NoError(t, err)
prop, err := file.GetProperties(key)
assert.NoError(t, err)
assert.Equal(t, "A", prop)
defer destroy(file.rootPath, file)
}
func TestAddListener(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
listener := &mockDataListener{}
file.AddListener(key, listener, config_center.WithGroup(group))
value = "Test Value 2"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
// remove need wait a moment
time.Sleep(time.Second)
defer destroy(file.rootPath, file)
}
func TestRemoveListener(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
listener := &mockDataListener{}
file.AddListener(key, listener, config_center.WithGroup(group))
value = "Test Value 2"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
// make sure callback before RemoveListener
time.Sleep(time.Second)
file.RemoveListener(key, listener, config_center.WithGroup(group))
value = "Test Value 3"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
// remove need wait a moment
time.Sleep(time.Second)
defer destroy(file.rootPath, file)
}
func TestGetConfigKeysByGroup(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
gs, err := file.GetConfigKeysByGroup(group)
assert.NoError(t, err)
assert.Equal(t, 1, gs.Size())
assert.Equal(t, key, gs.Values()[0])
// remove need wait a moment
time.Sleep(time.Second)
defer destroy(file.rootPath, file)
}
func TestGetConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
prop, err := file.GetProperties(key, config_center.WithGroup(group))
assert.NoError(t, err)
assert.Equal(t, value, prop)
defer destroy(file.rootPath, file)
}
func TestPublishConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
prop, err := file.GetInternalProperty(key, config_center.WithGroup(group))
assert.NoError(t, err)
assert.Equal(t, value, prop)
defer destroy(file.rootPath, file)
}
func destroy(path string, fdc *FileSystemDynamicConfiguration) {
fdc.Close()
os.RemoveAll(path)
}
type mockDataListener struct{}
func (l *mockDataListener) Process(configType *config_center.ConfigChangeEvent) {
fmt.Printf("process!!!!! %v", configType)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"io/ioutil"
"sync"
)
import (
"github.com/fsnotify/fsnotify"
)
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)
// CacheListener is file watcher
type CacheListener struct {
watch *fsnotify.Watcher
keyListeners sync.Map
rootPath string
}
// NewCacheListener creates a new CacheListener
func NewCacheListener(rootPath string) *CacheListener {
cl := &CacheListener{rootPath: rootPath}
// start watcher
watch, err := fsnotify.NewWatcher()
if err != nil {
logger.Errorf("file : listen config fail, error:%v ", err)
}
go func() {
for {
select {
case event := <-watch.Events:
key := event.Name
logger.Debugf("watcher %s, event %v", cl.rootPath, event)
if event.Op&fsnotify.Write == fsnotify.Write {
if l, ok := cl.keyListeners.Load(key); ok {
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
remoting.EventTypeUpdate)
}
}
if event.Op&fsnotify.Create == fsnotify.Create {
if l, ok := cl.keyListeners.Load(key); ok {
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
remoting.EventTypeAdd)
}
}
if event.Op&fsnotify.Remove == fsnotify.Remove {
if l, ok := cl.keyListeners.Load(key); ok {
removeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeDel)
}
}
case err := <-watch.Errors:
// err may be nil, ignore
if err != nil {
logger.Warnf("file : listen watch fail:%+v", err)
}
}
}
}()
cl.watch = watch
extension.AddCustomShutdownCallback(func() {
cl.watch.Close()
})
return cl
}
func removeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) {
if len(lmap) == 0 {
logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event)
return
}
for l := range lmap {
callback(l, key, "", event)
}
}
func dataChangeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) {
if len(lmap) == 0 {
logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event)
return
}
c := getFileContent(key)
for l := range lmap {
callback(l, key, c, event)
}
}
func callback(listener config_center.ConfigurationListener, path, data string, event remoting.EventType) {
listener.Process(&config_center.ConfigChangeEvent{Key: path, Value: data, ConfigType: event})
}
// Close will remove key listener and close watcher
func (cl *CacheListener) Close() error {
cl.keyListeners.Range(func(key, value interface{}) bool {
cl.keyListeners.Delete(key)
return true
})
return cl.watch.Close()
}
// AddListener will add a listener if loaded
// if you watcher a file or directory not exist, will error with no such file or directory
func (cl *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) {
// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
// make a map[your type]struct{} like set in java
listeners, loaded := cl.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{
listener: {}})
if loaded {
listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{}
cl.keyListeners.Store(key, listeners)
return
}
if err := cl.watch.Add(key); err != nil {
logger.Errorf("watcher add path:%s err:%v", key, err)
}
}
// RemoveListener will delete a listener if loaded
func (cl *CacheListener) RemoveListener(key string, listener config_center.ConfigurationListener) {
listeners, loaded := cl.keyListeners.Load(key)
if !loaded {
return
}
delete(listeners.(map[config_center.ConfigurationListener]struct{}), listener)
if err := cl.watch.Remove(key); err != nil {
logger.Errorf("watcher remove path:%s err:%v", key, err)
}
}
func getFileContent(path string) string {
c, err := ioutil.ReadFile(path)
if err != nil {
logger.Errorf("read file path:%s err:%v", path, err)
return ""
}
return string(c)
}
......@@ -98,6 +98,7 @@ func (c *MockDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.Ha
// MockDynamicConfiguration uses to parse content and defines listener
type MockDynamicConfiguration struct {
BaseDynamicConfiguration
parser parser.ConfigurationParser
content string
listener map[string]ConfigurationListener
......
......@@ -47,6 +47,7 @@ const (
// nacosDynamicConfiguration is the implementation of DynamicConfiguration based on nacos
type nacosDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
wg sync.WaitGroup
......
......@@ -44,6 +44,7 @@ const (
)
type zookeeperDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
wg sync.WaitGroup
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package filter_impl
import (
"context"
"fmt"
"strings"
)
import (
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
)
func init() {
extension.SetFilter(SentinelProviderFilterName, GetSentinelProviderFilter)
extension.SetFilter(SentinelConsumerFilterName, GetSentinelConsumerFilter)
if err := sentinel.InitDefault(); err != nil {
logger.Errorf("[Sentinel Filter] fail to initialize Sentinel")
}
if err := logging.ResetGlobalLogger(DubboLoggerWrapper{Logger: logger.GetLogger()}); err != nil {
logger.Errorf("[Sentinel Filter] fail to ingest dubbo logger into sentinel")
}
}
type DubboLoggerWrapper struct {
logger.Logger
}
func (d DubboLoggerWrapper) Fatal(v ...interface{}) {
d.Logger.Error(v...)
}
func (d DubboLoggerWrapper) Fatalf(format string, v ...interface{}) {
d.Logger.Errorf(format, v...)
}
func (d DubboLoggerWrapper) Panic(v ...interface{}) {
d.Logger.Error(v...)
}
func (d DubboLoggerWrapper) Panicf(format string, v ...interface{}) {
d.Logger.Errorf(format, v...)
}
func GetSentinelConsumerFilter() filter.Filter {
return &SentinelConsumerFilter{}
}
func GetSentinelProviderFilter() filter.Filter {
return &SentinelProviderFilter{}
}
func sentinelExit(ctx context.Context, result protocol.Result) {
if methodEntry := ctx.Value(MethodEntryKey); methodEntry != nil {
e := methodEntry.(*base.SentinelEntry)
sentinel.TraceError(e, result.Error())
e.Exit()
}
if interfaceEntry := ctx.Value(InterfaceEntryKey); interfaceEntry != nil {
e := interfaceEntry.(*base.SentinelEntry)
sentinel.TraceError(e, result.Error())
e.Exit()
}
}
type SentinelProviderFilter struct{}
func (d *SentinelProviderFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
interfaceResourceName, methodResourceName := getResourceName(invoker, invocation, getProviderPrefix())
var (
interfaceEntry *base.SentinelEntry
methodEntry *base.SentinelEntry
b *base.BlockError
)
interfaceEntry, b = sentinel.Entry(interfaceResourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Inbound))
if b != nil {
// interface blocked
return sentinelDubboProviderFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)
methodEntry, b = sentinel.Entry(methodResourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Inbound),
sentinel.WithArgs(invocation.Arguments()...))
if b != nil {
// method blocked
return sentinelDubboProviderFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)
return invoker.Invoke(ctx, invocation)
}
func (d *SentinelProviderFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
sentinelExit(ctx, result)
return result
}
type SentinelConsumerFilter struct{}
func (d *SentinelConsumerFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
interfaceResourceName, methodResourceName := getResourceName(invoker, invocation, getConsumerPrefix())
var (
interfaceEntry *base.SentinelEntry
methodEntry *base.SentinelEntry
b *base.BlockError
)
interfaceEntry, b = sentinel.Entry(interfaceResourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Outbound))
if b != nil {
// interface blocked
return sentinelDubboConsumerFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)
methodEntry, b = sentinel.Entry(methodResourceName, sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Outbound), sentinel.WithArgs(invocation.Arguments()...))
if b != nil {
// method blocked
return sentinelDubboConsumerFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)
return invoker.Invoke(ctx, invocation)
}
func (d *SentinelConsumerFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
sentinelExit(ctx, result)
return result
}
var (
sentinelDubboConsumerFallback = getDefaultDubboFallback()
sentinelDubboProviderFallback = getDefaultDubboFallback()
)
type DubboFallback func(context.Context, protocol.Invoker, protocol.Invocation, *base.BlockError) protocol.Result
func SetDubboConsumerFallback(f DubboFallback) {
sentinelDubboConsumerFallback = f
}
func SetDubboProviderFallback(f DubboFallback) {
sentinelDubboProviderFallback = f
}
func getDefaultDubboFallback() DubboFallback {
return func(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, blockError *base.BlockError) protocol.Result {
result := &protocol.RPCResult{}
result.SetResult(nil)
result.SetError(blockError)
return result
}
}
const (
SentinelProviderFilterName = "sentinel-provider"
SentinelConsumerFilterName = "sentinel-consumer"
DefaultProviderPrefix = "dubbo:provider:"
DefaultConsumerPrefix = "dubbo:consumer:"
MethodEntryKey = "$$sentinelMethodEntry"
InterfaceEntryKey = "$$sentinelInterfaceEntry"
)
func getResourceName(invoker protocol.Invoker, invocation protocol.Invocation, prefix string) (interfaceResourceName, methodResourceName string) {
var sb strings.Builder
sb.WriteString(prefix)
if getInterfaceGroupAndVersionEnabled() {
interfaceResourceName = getColonSeparatedKey(invoker.GetUrl())
} else {
interfaceResourceName = invoker.GetUrl().Service()
}
sb.WriteString(interfaceResourceName)
sb.WriteString(":")
sb.WriteString(invocation.MethodName())
sb.WriteString("(")
isFirst := true
for _, v := range invocation.ParameterTypes() {
if !isFirst {
sb.WriteString(",")
}
sb.WriteString(v.Name())
isFirst = false
}
sb.WriteString(")")
methodResourceName = sb.String()
return
}
func getConsumerPrefix() string {
return DefaultConsumerPrefix
}
func getProviderPrefix() string {
return DefaultProviderPrefix
}
func getInterfaceGroupAndVersionEnabled() bool {
return true
}
func getColonSeparatedKey(url common.URL) string {
return fmt.Sprintf("%s:%s:%s",
url.Service(),
url.GetParam(constant.GROUP_KEY, ""),
url.GetParam(constant.VERSION_KEY, ""))
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package filter_impl
import (
"context"
"sync"
"sync/atomic"
"testing"
)
import (
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestSentinelFilter_QPS(t *testing.T) {
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"version=1.0.0&group=myGroup&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
interfaceResourceName, _ := getResourceName(mockInvoker,
invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{})), "prefix_")
mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{}))
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: interfaceResourceName,
MetricType: flow.QPS,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
Count: 100,
RelationStrategy: flow.CurrentResource,
},
})
assert.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(10)
f := GetSentinelProviderFilter()
pass := int64(0)
block := int64(0)
for i := 0; i < 10; i++ {
go func() {
for j := 0; j < 30; j++ {
result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
if result.Error() == nil {
atomic.AddInt64(&pass, 1)
} else {
atomic.AddInt64(&block, 1)
}
}
wg.Done()
}()
}
wg.Wait()
assert.True(t, atomic.LoadInt64(&pass) == 100)
assert.True(t, atomic.LoadInt64(&block) == 200)
}
func TestConsumerFilter_Invoke(t *testing.T) {
f := GetSentinelConsumerFilter()
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{}))
result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
assert.NoError(t, result.Error())
}
func TestProviderFilter_Invoke(t *testing.T) {
f := GetSentinelProviderFilter()
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{}))
result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
assert.NoError(t, result.Error())
}
func TestGetResourceName(t *testing.T) {
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"version=1.0.0&group=myGroup&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
interfaceResourceName, methodResourceName := getResourceName(mockInvoker,
invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{})), "prefix_")
assert.Equal(t, "com.ikurento.user.UserProvider:myGroup:1.0.0", interfaceResourceName)
assert.Equal(t, "prefix_com.ikurento.user.UserProvider:myGroup:1.0.0:hello()", methodResourceName)
}
module github.com/apache/dubbo-go
require (
github.com/Microsoft/go-winio v0.4.13 // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/RoaringBitmap/roaring v0.4.23
github.com/Workiva/go-datastructures v1.0.50
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/alibaba/sentinel-golang v0.6.1
github.com/apache/dubbo-getty v1.3.10
github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/etcd v3.3.25+incompatible
github.com/creasty/defaults v1.3.0
github.com/docker/go-connections v0.4.0 // indirect
github.com/dubbogo/go-zookeeper v1.0.1
github.com/dubbogo/gost v1.9.1
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.0.0
github.com/frankban/quicktest v1.4.1 // indirect
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 // indirect
github.com/fsnotify/fsnotify v1.4.7
github.com/go-co-op/gocron v0.1.1
github.com/go-resty/resty/v2 v2.1.0
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/google/go-cmp v0.3.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
github.com/golang/protobuf v1.4.0
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/consul v1.8.0
github.com/hashicorp/consul/api v1.5.0
github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/vault/api v1.0.5-0.20191108163347-bdd38fca2cff // indirect
github.com/hashicorp/vault/sdk v0.1.14-0.20191112033314-390e96e22eb2
github.com/jinzhu/copier v0.0.0-20190625015134-976e0346caa8
github.com/magiconair/properties v1.8.1
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/mitchellh/hashstructure v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.2.3
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go v1.0.0
......@@ -46,14 +36,12 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.1.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/shirou/gopsutil v2.19.9+incompatible // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.5.1
github.com/zouyx/agollo/v3 v3.4.4
go.etcd.io/bbolt v1.3.4 // indirect
go.uber.org/atomic v1.6.0
go.uber.org/zap v1.15.0
google.golang.org/grpc v1.23.0
google.golang.org/grpc v1.26.0
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.16.9
k8s.io/apimachinery v0.16.9
......@@ -65,3 +53,5 @@ require (
go 1.13
replace launchpad.net/gocheck => github.com/go-check/check v0.0.0-20140225173054-eb6ee6f84d0a
replace github.com/envoyproxy/go-control-plane => github.com/envoyproxy/go-control-plane v0.8.0
This diff is collapsed.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
"encoding/base64"
"fmt"
"strconv"
"sync"
"time"
)
import (
"github.com/dubbogo/gost/container/set"
"github.com/dubbogo/gost/page"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
)
const (
enable = "enable"
watch_type = "type"
watch_type_service = "service"
watch_service = "service"
watch_passingonly = "passingonly"
watch_passingonly_true = true
)
var (
errConsulClientClosed = perrors.New("consul client is closed")
)
// init will put the service discovery into extension
func init() {
extension.SetServiceDiscovery(constant.CONSUL_KEY, newConsulServiceDiscovery)
}
// consulServiceDiscovery is the implementation of service discovery based on consul.
type consulServiceDiscovery struct {
// descriptor is a short string about the basic information of this instance
descriptor string
clientLock sync.RWMutex
// Consul client.
consulClient *consul.Client
checkPassInterval int64
tag string
address string
deregisterCriticalServiceAfter string
ttl sync.Map
*consul.Config
}
// newConsulServiceDiscovery will create new service discovery instance
// use double-check pattern to reduce race condition
func newConsulServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(name)
if !ok || len(sdc.RemoteRef) == 0 {
return nil, perrors.New("could not init the instance because the config is invalid")
}
remoteConfig, ok := config.GetBaseConfig().GetRemoteConfig(sdc.RemoteRef)
if !ok {
return nil, perrors.New("could not find the remote config for name: " + sdc.RemoteRef)
}
descriptor := fmt.Sprintf("consul-service-discovery[%s]", remoteConfig.Address)
config := &consul.Config{Address: remoteConfig.Address, Token: remoteConfig.Params[constant.ACL_TOKEN]}
client, err := consul.NewClient(config)
if err != nil {
return nil, perrors.WithMessage(err, "create consul client failed.")
}
return &consulServiceDiscovery{
address: remoteConfig.Address,
descriptor: descriptor,
checkPassInterval: getCheckPassInterval(remoteConfig.Params),
Config: config,
tag: remoteConfig.Params[constant.QUERY_TAG],
consulClient: client,
deregisterCriticalServiceAfter: getDeregisterAfter(remoteConfig.Params),
clientLock: sync.RWMutex{},
}, nil
}
func (csd *consulServiceDiscovery) String() string {
return csd.descriptor
}
// nolint
func (csd *consulServiceDiscovery) getConsulClient() *consul.Client {
csd.clientLock.RLock()
defer csd.clientLock.RUnlock()
return csd.consulClient
}
// nolint
func (csd *consulServiceDiscovery) setConsulClient(consulClient *consul.Client) {
csd.clientLock.Lock()
defer csd.clientLock.Unlock()
csd.consulClient = consulClient
}
func (csd *consulServiceDiscovery) Destroy() error {
csd.setConsulClient(nil)
csd.ttl.Range(func(key, t interface{}) bool {
close(t.(chan struct{}))
csd.ttl.Delete(key)
return true
})
return nil
}
func (csd *consulServiceDiscovery) Register(instance registry.ServiceInstance) error {
var (
err error
consulClient *consul.Client
)
ins, _ := csd.buildRegisterInstance(instance)
if consulClient = csd.getConsulClient(); consulClient == nil {
return errConsulClientClosed
}
err = consulClient.Agent().ServiceRegister(ins)
if err != nil {
logger.Errorf("consul register the instance %s fail:%v", instance.GetServiceName(), err)
return perrors.WithMessage(err, "consul could not register the instance. "+instance.GetServiceName())
}
return csd.registerTtl(instance)
}
func (csd *consulServiceDiscovery) registerTtl(instance registry.ServiceInstance) error {
var (
err error
consulClient *consul.Client
)
checkID := buildID(instance)
stopChan := make(chan struct{})
csd.ttl.LoadOrStore(buildID(instance), stopChan)
period := time.Duration(csd.checkPassInterval/8) * time.Millisecond
timer := time.NewTicker(period)
go func() {
defer timer.Stop()
for {
select {
case <-timer.C:
if consulClient = csd.getConsulClient(); consulClient == nil {
logger.Debugf("consul client is closed!")
return
}
err = consulClient.Agent().PassTTL(fmt.Sprintf("service:%s", checkID), "")
if err != nil {
logger.Warnf("pass ttl heartbeat fail:%v", err)
break
}
logger.Debugf("passed ttl heartbeat for %s", checkID)
break
case <-stopChan:
logger.Info("ttl %s for service %s is stopped", checkID, instance.GetServiceName())
return
}
}
}()
return nil
}
func (csd *consulServiceDiscovery) Update(instance registry.ServiceInstance) error {
var (
err error
consulClient *consul.Client
)
ins, _ := csd.buildRegisterInstance(instance)
consulClient = csd.getConsulClient()
if consulClient == nil {
return errConsulClientClosed
}
err = consulClient.Agent().ServiceDeregister(buildID(instance))
if err != nil {
logger.Warnf("unregister instance %s fail:%v", instance.GetServiceName(), err)
}
return consulClient.Agent().ServiceRegister(ins)
}
func (csd *consulServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
var (
err error
consulClient *consul.Client
)
if consulClient = csd.getConsulClient(); consulClient == nil {
return errConsulClientClosed
}
err = consulClient.Agent().ServiceDeregister(buildID(instance))
if err != nil {
logger.Errorf("unregister service instance %s,error: %v", instance.GetId(), err)
return err
}
stopChanel, ok := csd.ttl.Load(buildID(instance))
if !ok {
logger.Warnf("ttl for service instance %s didn't exist", instance.GetId())
return nil
}
close(stopChanel.(chan struct{}))
csd.ttl.Delete(buildID(instance))
return nil
}
func (csd *consulServiceDiscovery) GetDefaultPageSize() int {
return registry.DefaultPageSize
}
func (csd *consulServiceDiscovery) GetServices() *gxset.HashSet {
var (
err error
consulClient *consul.Client
services map[string][]string
)
var res = gxset.NewSet()
if consulClient = csd.getConsulClient(); consulClient == nil {
logger.Warnf("consul client is closed!")
return res
}
services, _, err = consulClient.Catalog().Services(nil)
if err != nil {
logger.Errorf("get services,error: %v", err)
return res
}
for service, _ := range services {
res.Add(service)
}
return res
}
// encodeConsulMetadata because consul validate key strictly.
func encodeConsulMetadata(metadata map[string]string) map[string]string {
consulMetadata := make(map[string]string, len(metadata))
encoder := base64.RawStdEncoding
for k, v := range metadata {
consulMetadata[encoder.EncodeToString([]byte(k))] = v
}
return consulMetadata
}
// nolint
func decodeConsulMetadata(metadata map[string]string) map[string]string {
dubboMetadata := make(map[string]string, len(metadata))
encoder := base64.RawStdEncoding
for k, v := range metadata {
kBytes, err := encoder.DecodeString(k)
if err != nil {
logger.Warnf("can not decoded consul metadata key %s", k)
continue
}
dubboMetadata[string(kBytes)] = v
}
return dubboMetadata
}
func (csd *consulServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
var (
err error
consulClient *consul.Client
instances []*consul.ServiceEntry
)
if consulClient = csd.getConsulClient(); consulClient == nil {
logger.Warn("consul client is closed!")
return nil
}
instances, _, err = consulClient.Health().Service(serviceName, csd.tag, true, &consul.QueryOptions{
WaitTime: time.Duration(csd.checkPassInterval),
})
if err != nil {
logger.Errorf("get instances for service %s,error: %v", serviceName, err)
return nil
}
res := make([]registry.ServiceInstance, 0, len(instances))
for _, ins := range instances {
metadata := ins.Service.Meta
// enable status
enableStr := metadata[enable]
delete(metadata, enable)
enable, _ := strconv.ParseBool(enableStr)
metadata = decodeConsulMetadata(metadata)
// health status
status := ins.Checks.AggregatedStatus()
healthy := false
if status == consul.HealthPassing {
healthy = true
}
res = append(res, &registry.DefaultServiceInstance{
Id: ins.Service.ID,
ServiceName: ins.Service.Service,
Host: ins.Service.Address,
Port: ins.Service.Port,
Enable: enable,
Healthy: healthy,
Metadata: metadata,
})
}
return res
}
func (csd *consulServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
all := csd.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
for i := offset; i < len(all) && i < offset+pageSize; i++ {
res = append(res, all[i])
}
return gxpage.New(offset, pageSize, res, len(all))
}
func (csd *consulServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
all := csd.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
// could not use res = all[a:b] here because the res should be []interface{}, not []ServiceInstance
var (
i = offset
count = 0
)
for i < len(all) && count < pageSize {
ins := all[i]
if ins.IsHealthy() == healthy {
res = append(res, all[i])
count++
}
i++
}
return gxpage.New(offset, pageSize, res, len(all))
}
func (csd *consulServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
res := make(map[string]gxpage.Pager, len(serviceNames))
for _, name := range serviceNames {
res[name] = csd.GetInstancesByPage(name, offset, requestedSize)
}
return res
}
func (csd *consulServiceDiscovery) AddListener(listener *registry.ServiceInstancesChangedListener) error {
params := make(map[string]interface{}, 8)
params[watch_type] = watch_type_service
params[watch_service] = listener.ServiceName
params[watch_passingonly] = watch_passingonly_true
plan, err := watch.Parse(params)
if err != nil {
logger.Errorf("add listener for service %s,error:%v", listener.ServiceName, err)
return err
}
plan.Handler = func(idx uint64, raw interface{}) {
services, ok := raw.([]*consul.ServiceEntry)
if !ok {
err = perrors.New("handler get non ServiceEntry type parameter")
return
}
instances := make([]registry.ServiceInstance, 0, len(services))
for _, ins := range services {
metadata := ins.Service.Meta
// enable status
enableStr := metadata[enable]
delete(metadata, enable)
enable, _ := strconv.ParseBool(enableStr)
// health status
status := ins.Checks.AggregatedStatus()
healthy := false
if status == consul.HealthPassing {
healthy = true
}
instances = append(instances, &registry.DefaultServiceInstance{
Id: ins.Service.ID,
ServiceName: ins.Service.Service,
Host: ins.Service.Address,
Port: ins.Service.Port,
Enable: enable,
Healthy: healthy,
Metadata: metadata,
})
}
e := csd.DispatchEventForInstances(listener.ServiceName, instances)
if e != nil {
logger.Errorf("Dispatching event got exception, service name: %s, err: %v", listener.ServiceName, err)
}
}
go func() {
err = plan.RunWithConfig(csd.Config.Address, csd.Config)
if err != nil {
logger.Error("consul plan run failure!error:%v", err)
}
}()
return nil
}
func (csd *consulServiceDiscovery) DispatchEventByServiceName(serviceName string) error {
return csd.DispatchEventForInstances(serviceName, csd.GetInstances(serviceName))
}
func (csd *consulServiceDiscovery) DispatchEventForInstances(serviceName string, instances []registry.ServiceInstance) error {
return csd.DispatchEvent(registry.NewServiceInstancesChangedEvent(serviceName, instances))
}
func (csd *consulServiceDiscovery) DispatchEvent(event *registry.ServiceInstancesChangedEvent) error {
extension.GetGlobalDispatcher().Dispatch(event)
return nil
}
func (csd *consulServiceDiscovery) buildRegisterInstance(instance registry.ServiceInstance) (*consul.AgentServiceRegistration, error) {
metadata := instance.GetMetadata()
metadata = encodeConsulMetadata(metadata)
metadata[enable] = strconv.FormatBool(instance.IsEnable())
// check
check := csd.buildCheck(instance)
return &consul.AgentServiceRegistration{
ID: buildID(instance),
Name: instance.GetServiceName(),
Port: instance.GetPort(),
Address: instance.GetHost(),
Meta: metadata,
Check: &check,
}, nil
}
func (csd *consulServiceDiscovery) buildCheck(instance registry.ServiceInstance) consul.AgentServiceCheck {
deregister, ok := instance.GetMetadata()[constant.DEREGISTER_AFTER]
if !ok || len(deregister) == 0 {
deregister = constant.DEFAULT_DEREGISTER_TIME
}
return consul.AgentServiceCheck{
TTL: strconv.FormatInt(csd.checkPassInterval/1000, 10) + "s",
DeregisterCriticalServiceAfter: csd.deregisterCriticalServiceAfter,
}
}
// nolint
func getCheckPassInterval(params map[string]string) int64 {
checkPassIntervalStr, ok := params[constant.CHECK_PASS_INTERVAL]
if !ok {
return constant.DEFAULT_CHECK_PASS_INTERVAL
}
checkPassInterval, err := strconv.ParseInt(checkPassIntervalStr, 10, 64)
if err != nil {
logger.Warnf("consul service discovery remote config error:%s", checkPassIntervalStr)
return constant.DEFAULT_CHECK_PASS_INTERVAL
}
return checkPassInterval
}
// nolint
func getDeregisterAfter(metadata map[string]string) string {
deregister, ok := metadata[constant.DEREGISTER_AFTER]
if !ok || len(deregister) == 0 {
deregister = constant.DEFAULT_DEREGISTER_TIME
}
return deregister
}
// nolint
func buildID(instance registry.ServiceInstance) string {
id := fmt.Sprintf("id:%s,serviceName:%s,host:%s,port:%d", instance.GetId(), instance.GetServiceName(), instance.GetHost(), instance.GetPort())
return id
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package consul
import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/observer"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/remoting/consul"
)
var (
testName = "test"
consulCheckPassInterval = 17000
consulDeregisterCriticalServiceAfter = "20s"
consulWatchTimeout = 60000
registryURL = common.URL{}
)
func TestConsulServiceDiscovery_newConsulServiceDiscovery(t *testing.T) {
name := "consul1"
_, err := newConsulServiceDiscovery(name)
assert.NotNil(t, err)
sdc := &config.ServiceDiscoveryConfig{
Protocol: "consul",
RemoteRef: "mock",
}
config.GetBaseConfig().ServiceDiscoveries[name] = sdc
_, err = newConsulServiceDiscovery(name)
assert.NotNil(t, err)
config.GetBaseConfig().Remotes["mock"] = &config.RemoteConfig{
Address: "localhost:8081",
}
res, err := newConsulServiceDiscovery(name)
assert.Nil(t, err)
assert.NotNil(t, res)
}
func TestConsulServiceDiscovery_Destroy(t *testing.T) {
prepareData()
serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName)
prepareService()
assert.Nil(t, err)
assert.NotNil(t, serviceDiscovery)
err = serviceDiscovery.Destroy()
assert.Nil(t, err)
assert.Nil(t, serviceDiscovery.(*consulServiceDiscovery).consulClient)
}
func TestConsulServiceDiscovery_CRUD(t *testing.T) {
// start consul agent
consulAgent := consul.NewConsulAgent(t, registryPort)
defer consulAgent.Shutdown()
prepareData()
var eventDispatcher = MockEventDispatcher{Notify: make(chan struct{}, 1)}
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return &eventDispatcher
})
extension.SetAndInitGlobalDispatcher("mock")
rand.Seed(time.Now().Unix())
instance, _ := prepareService()
// clean data
serviceDiscovery, err := extension.GetServiceDiscovery(constant.CONSUL_KEY, testName)
assert.Nil(t, err)
err = serviceDiscovery.Unregister(instance)
assert.Nil(t, err)
err = serviceDiscovery.Register(instance)
assert.Nil(t, err)
//sometimes nacos may be failed to push update of instance,
//so it need 10s to pull, we sleep 10 second to make sure instance has been update
time.Sleep(3 * time.Second)
page := serviceDiscovery.GetHealthyInstancesByPage(instance.GetServiceName(), 0, 10, true)
assert.NotNil(t, page)
assert.Equal(t, 0, page.GetOffset())
assert.Equal(t, 10, page.GetPageSize())
assert.Equal(t, 1, page.GetDataSize())
instanceResult := page.GetData()[0].(*registry.DefaultServiceInstance)
assert.NotNil(t, instanceResult)
assert.Equal(t, buildID(instance), instanceResult.GetId())
assert.Equal(t, instance.GetHost(), instanceResult.GetHost())
assert.Equal(t, instance.GetPort(), instanceResult.GetPort())
assert.Equal(t, instance.GetServiceName(), instanceResult.GetServiceName())
metadata := instanceResult.GetMetadata()
assert.Equal(t, 0, len(metadata))
instance.GetMetadata()["aaa"] = "bbb"
err = serviceDiscovery.Update(instance)
assert.Nil(t, err)
time.Sleep(3 * time.Second)
pageMap := serviceDiscovery.GetRequestInstances([]string{instance.GetServiceName()}, 0, 1)
assert.Equal(t, 1, len(pageMap))
page = pageMap[instance.GetServiceName()]
assert.NotNil(t, page)
assert.Equal(t, 1, len(page.GetData()))
instanceResult = page.GetData()[0].(*registry.DefaultServiceInstance)
v, _ := instanceResult.Metadata["aaa"]
assert.Equal(t, "bbb", v)
// test dispatcher event
//err = serviceDiscovery.DispatchEventByServiceName(instanceResult.GetServiceName())
//assert.Nil(t, err)
// test AddListener
err = serviceDiscovery.AddListener(&registry.ServiceInstancesChangedListener{ServiceName: instance.GetServiceName()})
assert.Nil(t, err)
err = serviceDiscovery.Unregister(instance)
assert.Nil(t, err)
timer := time.NewTimer(time.Second * 10)
select {
case <-eventDispatcher.Notify:
assert.NotNil(t, eventDispatcher.Event)
break
case <-timer.C:
assert.Fail(t, "")
break
}
}
func prepareData() {
config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
Protocol: "consul",
RemoteRef: testName,
}
config.GetBaseConfig().Remotes[testName] = &config.RemoteConfig{
Address: fmt.Sprintf("%s:%d", registryHost, registryPort),
}
}
func prepareService() (registry.ServiceInstance, common.URL) {
id := "id"
registryUrl, _ := common.NewURL(protocol + "://" + providerHost + ":" + strconv.Itoa(providerPort) + "/" + service + "?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&consul-check-pass-interval=" + strconv.Itoa(consulCheckPassInterval) + "&consul-deregister-critical-service-after=" + consulDeregisterCriticalServiceAfter + "&" +
"consul-watch-timeout=" + strconv.Itoa(consulWatchTimeout))
return &registry.DefaultServiceInstance{
Id: id,
ServiceName: service,
Host: registryHost,
Port: registryPort,
Enable: true,
Healthy: true,
Metadata: nil,
}, registryUrl
}
type MockEventDispatcher struct {
Notify chan struct{}
Event observer.Event
}
// AddEventListener do nothing
func (m *MockEventDispatcher) AddEventListener(listener observer.EventListener) {
}
// AddEventListeners do nothing
func (m *MockEventDispatcher) AddEventListeners(listenersSlice []observer.EventListener) {
}
// RemoveEventListener do nothing
func (m *MockEventDispatcher) RemoveEventListener(listener observer.EventListener) {
}
// RemoveEventListeners do nothing
func (m *MockEventDispatcher) RemoveEventListeners(listenersSlice []observer.EventListener) {
}
// GetAllEventListeners return empty list
func (m *MockEventDispatcher) GetAllEventListeners() []observer.EventListener {
return make([]observer.EventListener, 0)
}
// RemoveAllEventListeners do nothing
func (m *MockEventDispatcher) RemoveAllEventListeners() {
}
// Dispatch do nothing
func (m *MockEventDispatcher) Dispatch(event observer.Event) {
m.Event = event
m.Notify <- struct{}{}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import "github.com/apache/dubbo-go/config_center"
// RegistryConfigurationListener represent the processor of flie watcher
type RegistryConfigurationListener struct {
}
// Process submit the ConfigChangeEvent to the event chan to notify all observer
func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment