Skip to content
Snippets Groups Projects
Commit 61fbc46f authored by xg.gao's avatar xg.gao
Browse files

Merge branch 'develop' into read

parents 54f254e7 ea9d6eb5
No related branches found
No related tags found
No related merge requests found
Showing
with 1243 additions and 25 deletions
name: CI
on:
push:
branches: [master, develop]
pull_request:
branches: "*"
jobs:
build:
name: ${{ matrix.os }} - Go ${{ matrix.go_version }}
runs-on: ${{ matrix.os }}
strategy:
# If you want to matrix build , you can append the following list.
matrix:
go_version:
- 1.13
os:
- ubuntu-latest
env:
DING_TOKEN: "6374f1bf8d4f23cde81d4a4b8c1f0bc98cc92b5151ca938ab938d3d7f4230fc4"
DING_SIGN: "SECa98677289194bb0e5caec3051301d06515750ff1bd2f932a4704298afb2e0ae6"
steps:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go_version }}
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: Cache dependencies
uses: actions/cache@v2
with:
# Cache
path: ~/go/pkg/mod
# Cache key
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
# An ordered list of keys to use for restoring the cache if no cache hit occurred for key
restore-keys: |
${{ runner.os }}-go-
- name: Get dependencies
run: |
if [ -f Gopkg.toml ]; then
curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
dep ensure
else
go get -v -t -d ./...
fi
- name: License Check
run: |
go fmt ./... && [[ -z `git status -s` ]]
sh before_validate_license.sh
chmod u+x /tmp/tools/license/license-header-checker
/tmp/tools/license/license-header-checker -v -a -r -i vendor /tmp/tools/license/license.txt . go && [[ -z `git status -s` ]]
- name: Test
run: |
chmod u+x before_ut.sh && ./before_ut.sh
go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic
chmod +x integrate_test.sh && ./integrate_test.sh
- name: Coverage
run: bash <(curl -s https://codecov.io/bash)
# Because the contexts of push and PR are different, there are two Notify.
# Notifications are triggered only in the apache/dubbo-go repository.
- name: DingTalk Message Notify only Push
uses: zcong1993/actions-ding@v3.0.1
# Whether job is successful or not, always () is always true.
if: |
always() &&
github.event_name == 'push' &&
github.repository == 'apache/dubbo-go'
with:
# DingDing bot token
dingToken: ${{ env.DING_TOKEN }}
secret: ${{ env.DING_SIGN }}
# Post Body to send
body: |
{
"msgtype": "markdown",
"markdown": {
"title": "Github Actions",
"text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - trigger: ${{ github.actor }} \n - event: ${{ github.event_name }} \n - ref: ${{ github.ref }} \n - status: [${{ job.status }}](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}) \n - environment: ${{ runner.os }} \n - SHA: [${{ github.sha }}](${{ github.event.compare }})"
}
}
- name: DingTalk Message Notify only PR
uses: zcong1993/actions-ding@v3.0.1
if: |
always() &&
github.event_name == 'pull_request' &&
github.repository == 'apache/dubbo-go'
with:
dingToken: ${{ env.DING_TOKEN }}
secret: ${{ env.DING_SIGN }}
body: |
{
"msgtype": "markdown",
"markdown": {
"title": "Github Actions",
"text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - pr_title: ${{ github.event.pull_request.title }} \n - trigger: ${{ github.actor }} \n - event: ${{ github.event_name }} \n - ref: [${{ github.ref }}](${{ github.event.pull_request._links.html.href }}) \n - status: [${{ job.status }}](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}) \n - environment: ${{ runner.os }} \n > SHA: [${{ github.sha }}](${{ github.event.pull_request._links.html.href }})"
}
}
......@@ -31,7 +31,7 @@ import (
)
func init() {
extension.SethealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
extension.SetHealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
}
// DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of
......@@ -85,7 +85,7 @@ func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol
} else if diff > constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF {
diff = constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF
}
sleepWindow := (1 << diff) * c.GetCircuitTrippedTimeoutFactor()
sleepWindow := (1 << uint(diff)) * c.GetCircuitTrippedTimeoutFactor()
if sleepWindow > constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS {
sleepWindow = constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS
}
......
......@@ -169,6 +169,10 @@ const (
NACOS_USERNAME = "username"
)
const (
FILE_KEY = "file"
)
const (
ZOOKEEPER_KEY = "zookeeper"
)
......@@ -177,6 +181,18 @@ const (
ETCDV3_KEY = "etcdv3"
)
const (
CONSUL_KEY = "consul"
CHECK_PASS_INTERVAL = "consul-check-pass-interval"
// default time-to-live in millisecond
DEFAULT_CHECK_PASS_INTERVAL = 16000
QUERY_TAG = "consul_query_tag"
ACL_TOKEN = "acl-token"
// default deregister critical server after
DEFAULT_DEREGISTER_TIME = "20s"
DEREGISTER_AFTER = "consul-deregister-critical-service-after"
)
const (
TRACING_REMOTE_SPAN_CTX = "tracing.remote.span.ctx"
)
......
......@@ -26,8 +26,8 @@ var (
healthCheckers = make(map[string]func(url *common.URL) router.HealthChecker)
)
// SethealthChecker sets the HealthChecker with @name
func SethealthChecker(name string, fcn func(_ *common.URL) router.HealthChecker) {
// SetHealthChecker sets the HealthChecker with @name
func SetHealthChecker(name string, fcn func(_ *common.URL) router.HealthChecker) {
healthCheckers[name] = fcn
}
......
......@@ -32,7 +32,7 @@ import (
)
func TestGetHealthChecker(t *testing.T) {
SethealthChecker("mock", newMockhealthCheck)
SetHealthChecker("mock", newMockHealthCheck)
checker := GetHealthChecker("mock", common.NewURLWithOptions())
assert.NotNil(t, checker)
}
......@@ -44,6 +44,6 @@ func (m mockHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
return true
}
func newMockhealthCheck(_ *common.URL) router.HealthChecker {
func newMockHealthCheck(_ *common.URL) router.HealthChecker {
return &mockHealthChecker{}
}
......@@ -89,6 +89,8 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
result.SetAttachments(invocation.Attachments())
url := pi.GetUrl()
//get providerUrl. The origin url may be is registry URL.
url = *getProviderURL(&url)
methodName := invocation.MethodName()
proto := url.Protocol
......@@ -159,3 +161,10 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
}
return result
}
func getProviderURL(url *common.URL) *common.URL {
if url.SubURL == nil {
return url
}
return url.SubURL
}
......@@ -396,6 +396,17 @@ func (c *URL) AddParam(key string, value string) {
c.params.Add(key, value)
}
// AddParamAvoidNil will add key-value pair
// Not thread-safe
// think twice before using it.
func (c *URL) AddParamAvoidNil(key string, value string) {
if c.params == nil {
c.params = url.Values{}
}
c.params.Add(key, value)
}
// SetParam will put the key-value pair into url
// it's not thread safe.
// think twice before you want to use this method
......
......@@ -45,6 +45,7 @@ const (
)
type apolloConfiguration struct {
cc.BaseDynamicConfiguration
url *common.URL
listeners sync.Map
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config_center
// BaseDynamicConfiguration will default implementation DynamicConfiguration some method
type BaseDynamicConfiguration struct {
}
// RemoveConfig
func (bdc *BaseDynamicConfiguration) RemoveConfig(string, string) error {
return nil
}
......@@ -58,6 +58,9 @@ type DynamicConfiguration interface {
// PublishConfig will publish the config with the (key, group, value) pair
PublishConfig(string, string, string) error
// RemoveConfig will remove the config white the (key, group) pair
RemoveConfig(string, string) error
// GetConfigKeysByGroup will return all keys with the group
GetConfigKeysByGroup(group string) (*gxset.HashSet, error)
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
func init() {
extension.SetConfigCenterFactory(constant.FILE_KEY, func() config_center.DynamicConfigurationFactory {
return &fileDynamicConfigurationFactory{}
})
}
type fileDynamicConfigurationFactory struct {
}
// GetDynamicConfiguration Get Configuration with URL
func (f *fileDynamicConfigurationFactory) GetDynamicConfiguration(url *common.URL) (config_center.DynamicConfiguration,
error) {
dynamicConfiguration, err := newFileSystemDynamicConfiguration(url)
if err != nil {
return nil, perrors.WithStack(err)
}
dynamicConfiguration.SetParser(&parser.DefaultConfigurationParser{})
return dynamicConfiguration, err
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"bytes"
"errors"
"io/ioutil"
"os"
"os/exec"
"os/user"
"path"
"path/filepath"
"runtime"
"strings"
)
import (
gxset "github.com/dubbogo/gost/container/set"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
)
const (
PARAM_NAME_PREFIX = "dubbo.config-center."
CONFIG_CENTER_DIR_PARAM_NAME = PARAM_NAME_PREFIX + "dir"
CONFIG_CENTER_ENCODING_PARAM_NAME = PARAM_NAME_PREFIX + "encoding"
DEFAULT_CONFIG_CENTER_ENCODING = "UTF-8"
)
// FileSystemDynamicConfiguration
type FileSystemDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
encoding string
cacheListener *CacheListener
parser parser.ConfigurationParser
}
func newFileSystemDynamicConfiguration(url *common.URL) (*FileSystemDynamicConfiguration, error) {
encode := url.GetParam(CONFIG_CENTER_ENCODING_PARAM_NAME, DEFAULT_CONFIG_CENTER_ENCODING)
root := url.GetParam(CONFIG_CENTER_DIR_PARAM_NAME, "")
var c *FileSystemDynamicConfiguration
if _, err := os.Stat(root); err != nil {
// not exist, use default, /XXX/xx/.dubbo/config-center
if rp, err := Home(); err != nil {
return nil, perrors.WithStack(err)
} else {
root = path.Join(rp, ".dubbo", "config-center")
}
}
if _, err := os.Stat(root); err != nil {
// it must be dir, if not exist, will create
if err = createDir(root); err != nil {
return nil, perrors.WithStack(err)
}
}
c = &FileSystemDynamicConfiguration{
url: url,
rootPath: root,
encoding: encode,
}
c.cacheListener = NewCacheListener(c.rootPath)
return c, nil
}
// RootPath get root path
func (fsdc *FileSystemDynamicConfiguration) RootPath() string {
return fsdc.rootPath
}
// Parser Get Parser
func (fsdc *FileSystemDynamicConfiguration) Parser() parser.ConfigurationParser {
return fsdc.parser
}
// SetParser Set Parser
func (fsdc *FileSystemDynamicConfiguration) SetParser(p parser.ConfigurationParser) {
fsdc.parser = p
}
// AddListener Add listener
func (fsdc *FileSystemDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener,
opts ...config_center.Option) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
fsdc.cacheListener.AddListener(path, listener)
}
// RemoveListener Remove listener
func (fsdc *FileSystemDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener,
opts ...config_center.Option) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
fsdc.cacheListener.RemoveListener(path, listener)
}
// GetProperties get properties file
func (fsdc *FileSystemDynamicConfiguration) GetProperties(key string, opts ...config_center.Option) (string, error) {
tmpOpts := &config_center.Options{}
for _, opt := range opts {
opt(tmpOpts)
}
path := fsdc.GetPath(key, tmpOpts.Group)
file, err := ioutil.ReadFile(path)
if err != nil {
return "", perrors.WithStack(err)
}
return string(file), nil
}
// GetRule get Router rule properties file
func (fsdc *FileSystemDynamicConfiguration) GetRule(key string, opts ...config_center.Option) (string, error) {
return fsdc.GetProperties(key, opts...)
}
// GetInternalProperty get value by key in Default properties file(dubbo.properties)
func (fsdc *FileSystemDynamicConfiguration) GetInternalProperty(key string, opts ...config_center.Option) (string,
error) {
return fsdc.GetProperties(key, opts...)
}
// PublishConfig will publish the config with the (key, group, value) pair
func (fsdc *FileSystemDynamicConfiguration) PublishConfig(key string, group string, value string) error {
path := fsdc.GetPath(key, group)
return fsdc.write2File(path, value)
}
// GetConfigKeysByGroup will return all keys with the group
func (fsdc *FileSystemDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
path := fsdc.GetPath("", group)
r := gxset.NewSet()
fileInfo, _ := ioutil.ReadDir(path)
for _, file := range fileInfo {
// list file
if file.IsDir() {
continue
}
r.Add(file.Name())
}
return r, nil
}
// RemoveConfig will remove the config whit hte (key, group)
func (fsdc *FileSystemDynamicConfiguration) RemoveConfig(key string, group string) error {
path := fsdc.GetPath(key, group)
_, err := fsdc.deleteDelay(path)
return err
}
// Close close file watcher
func (fsdc *FileSystemDynamicConfiguration) Close() error {
return fsdc.cacheListener.Close()
}
// GetPath get path
func (fsdc *FileSystemDynamicConfiguration) GetPath(key string, group string) string {
if len(key) == 0 {
return path.Join(fsdc.rootPath, group)
}
if len(group) == 0 {
group = config_center.DEFAULT_GROUP
}
return path.Join(fsdc.rootPath, group, key)
}
func (fsdc *FileSystemDynamicConfiguration) deleteDelay(path string) (bool, error) {
if path == "" {
return false, nil
}
if err := os.RemoveAll(path); err != nil {
return false, err
}
return true, nil
}
func (fsdc *FileSystemDynamicConfiguration) write2File(fp string, value string) error {
if err := forceMkdirParent(fp); err != nil {
return perrors.WithStack(err)
}
return ioutil.WriteFile(fp, []byte(value), os.ModePerm)
}
func forceMkdirParent(fp string) error {
pd := getParentDirectory(fp)
return createDir(pd)
}
func createDir(path string) error {
// create dir, chmod is drwxrwxrwx(0777)
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return err
}
return nil
}
func getParentDirectory(fp string) string {
return substr(fp, 0, strings.LastIndex(fp, string(filepath.Separator)))
}
func substr(s string, pos, length int) string {
runes := []rune(s)
l := pos + length
if l > len(runes) {
l = len(runes)
}
return string(runes[pos:l])
}
// Home returns the home directory for the executing user.
//
// This uses an OS-specific method for discovering the home directory.
// An error is returned if a home directory cannot be detected.
func Home() (string, error) {
user, err := user.Current()
if nil == err {
return user.HomeDir, nil
}
// cross compile support
if "windows" == runtime.GOOS {
return homeWindows()
}
// Unix-like system, so just assume Unix
return homeUnix()
}
func homeUnix() (string, error) {
// First prefer the HOME environmental variable
if home := os.Getenv("HOME"); home != "" {
return home, nil
}
// If that fails, try the shell
var stdout bytes.Buffer
cmd := exec.Command("sh", "-c", "eval echo ~$USER")
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
return "", err
}
result := strings.TrimSpace(stdout.String())
if result == "" {
return "", errors.New("blank output when reading home directory")
}
return result, nil
}
func homeWindows() (string, error) {
drive := os.Getenv("HOMEDRIVE")
path := os.Getenv("HOMEPATH")
home := drive + path
if drive == "" || path == "" {
home = os.Getenv("USERPROFILE")
}
if home == "" {
return "", errors.New("HOMEDRIVE, HOMEPATH, and USERPROFILE are blank")
}
return home, nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"fmt"
"os"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/config_center"
)
const (
key = "com.dubbo.go"
)
func initFileData(t *testing.T) (*FileSystemDynamicConfiguration, error) {
urlString := "registry://127.0.0.1:2181"
regurl, err := common.NewURL(urlString)
assert.NoError(t, err)
dc, err := extension.GetConfigCenterFactory("file").GetDynamicConfiguration(&regurl)
assert.NoError(t, err)
return dc.(*FileSystemDynamicConfiguration), err
}
func TestPublishAndGetConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
err = file.PublishConfig(key, "", "A")
assert.NoError(t, err)
prop, err := file.GetProperties(key)
assert.NoError(t, err)
assert.Equal(t, "A", prop)
defer destroy(file.rootPath, file)
}
func TestAddListener(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
listener := &mockDataListener{}
file.AddListener(key, listener, config_center.WithGroup(group))
value = "Test Value 2"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
// remove need wait a moment
time.Sleep(time.Second)
defer destroy(file.rootPath, file)
}
func TestRemoveListener(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
listener := &mockDataListener{}
file.AddListener(key, listener, config_center.WithGroup(group))
value = "Test Value 2"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
// make sure callback before RemoveListener
time.Sleep(time.Second)
file.RemoveListener(key, listener, config_center.WithGroup(group))
value = "Test Value 3"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
// remove need wait a moment
time.Sleep(time.Second)
defer destroy(file.rootPath, file)
}
func TestGetConfigKeysByGroup(t *testing.T) {
file, err := initFileData(t)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
gs, err := file.GetConfigKeysByGroup(group)
assert.NoError(t, err)
assert.Equal(t, 1, gs.Size())
assert.Equal(t, key, gs.Values()[0])
// remove need wait a moment
time.Sleep(time.Second)
defer destroy(file.rootPath, file)
}
func TestGetConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
prop, err := file.GetProperties(key, config_center.WithGroup(group))
assert.NoError(t, err)
assert.Equal(t, value, prop)
defer destroy(file.rootPath, file)
}
func TestPublishConfig(t *testing.T) {
file, err := initFileData(t)
assert.NoError(t, err)
group := "dubbogo"
value := "Test Value"
err = file.PublishConfig(key, group, value)
assert.NoError(t, err)
prop, err := file.GetInternalProperty(key, config_center.WithGroup(group))
assert.NoError(t, err)
assert.Equal(t, value, prop)
defer destroy(file.rootPath, file)
}
func destroy(path string, fdc *FileSystemDynamicConfiguration) {
fdc.Close()
os.RemoveAll(path)
}
type mockDataListener struct{}
func (l *mockDataListener) Process(configType *config_center.ConfigChangeEvent) {
fmt.Printf("process!!!!! %v", configType)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package file
import (
"io/ioutil"
"sync"
)
import (
"github.com/fsnotify/fsnotify"
)
import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)
// CacheListener is file watcher
type CacheListener struct {
watch *fsnotify.Watcher
keyListeners sync.Map
rootPath string
}
// NewCacheListener creates a new CacheListener
func NewCacheListener(rootPath string) *CacheListener {
cl := &CacheListener{rootPath: rootPath}
// start watcher
watch, err := fsnotify.NewWatcher()
if err != nil {
logger.Errorf("file : listen config fail, error:%v ", err)
}
go func() {
for {
select {
case event := <-watch.Events:
key := event.Name
logger.Debugf("watcher %s, event %v", cl.rootPath, event)
if event.Op&fsnotify.Write == fsnotify.Write {
if l, ok := cl.keyListeners.Load(key); ok {
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
remoting.EventTypeUpdate)
}
}
if event.Op&fsnotify.Create == fsnotify.Create {
if l, ok := cl.keyListeners.Load(key); ok {
dataChangeCallback(l.(map[config_center.ConfigurationListener]struct{}), key,
remoting.EventTypeAdd)
}
}
if event.Op&fsnotify.Remove == fsnotify.Remove {
if l, ok := cl.keyListeners.Load(key); ok {
removeCallback(l.(map[config_center.ConfigurationListener]struct{}), key, remoting.EventTypeDel)
}
}
case err := <-watch.Errors:
// err may be nil, ignore
if err != nil {
logger.Warnf("file : listen watch fail:%+v", err)
}
}
}
}()
cl.watch = watch
extension.AddCustomShutdownCallback(func() {
cl.watch.Close()
})
return cl
}
func removeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) {
if len(lmap) == 0 {
logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event)
return
}
for l := range lmap {
callback(l, key, "", event)
}
}
func dataChangeCallback(lmap map[config_center.ConfigurationListener]struct{}, key string, event remoting.EventType) {
if len(lmap) == 0 {
logger.Warnf("file watch callback but configuration listener is empty, key:%s, event:%v", key, event)
return
}
c := getFileContent(key)
for l := range lmap {
callback(l, key, c, event)
}
}
func callback(listener config_center.ConfigurationListener, path, data string, event remoting.EventType) {
listener.Process(&config_center.ConfigChangeEvent{Key: path, Value: data, ConfigType: event})
}
// Close will remove key listener and close watcher
func (cl *CacheListener) Close() error {
cl.keyListeners.Range(func(key, value interface{}) bool {
cl.keyListeners.Delete(key)
return true
})
return cl.watch.Close()
}
// AddListener will add a listener if loaded
// if you watcher a file or directory not exist, will error with no such file or directory
func (cl *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) {
// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
// make a map[your type]struct{} like set in java
listeners, loaded := cl.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{
listener: {}})
if loaded {
listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{}
cl.keyListeners.Store(key, listeners)
return
}
if err := cl.watch.Add(key); err != nil {
logger.Errorf("watcher add path:%s err:%v", key, err)
}
}
// RemoveListener will delete a listener if loaded
func (cl *CacheListener) RemoveListener(key string, listener config_center.ConfigurationListener) {
listeners, loaded := cl.keyListeners.Load(key)
if !loaded {
return
}
delete(listeners.(map[config_center.ConfigurationListener]struct{}), listener)
if err := cl.watch.Remove(key); err != nil {
logger.Errorf("watcher remove path:%s err:%v", key, err)
}
}
func getFileContent(path string) string {
c, err := ioutil.ReadFile(path)
if err != nil {
logger.Errorf("read file path:%s err:%v", path, err)
return ""
}
return string(c)
}
......@@ -98,6 +98,7 @@ func (c *MockDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.Ha
// MockDynamicConfiguration uses to parse content and defines listener
type MockDynamicConfiguration struct {
BaseDynamicConfiguration
parser parser.ConfigurationParser
content string
listener map[string]ConfigurationListener
......
......@@ -47,6 +47,7 @@ const (
// nacosDynamicConfiguration is the implementation of DynamicConfiguration based on nacos
type nacosDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
wg sync.WaitGroup
......
......@@ -44,6 +44,7 @@ const (
)
type zookeeperDynamicConfiguration struct {
config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
wg sync.WaitGroup
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package filter_impl
import (
"context"
"fmt"
"strings"
)
import (
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
)
func init() {
extension.SetFilter(SentinelProviderFilterName, GetSentinelProviderFilter)
extension.SetFilter(SentinelConsumerFilterName, GetSentinelConsumerFilter)
if err := sentinel.InitDefault(); err != nil {
logger.Errorf("[Sentinel Filter] fail to initialize Sentinel")
}
if err := logging.ResetGlobalLogger(DubboLoggerWrapper{Logger: logger.GetLogger()}); err != nil {
logger.Errorf("[Sentinel Filter] fail to ingest dubbo logger into sentinel")
}
}
type DubboLoggerWrapper struct {
logger.Logger
}
func (d DubboLoggerWrapper) Fatal(v ...interface{}) {
d.Logger.Error(v...)
}
func (d DubboLoggerWrapper) Fatalf(format string, v ...interface{}) {
d.Logger.Errorf(format, v...)
}
func (d DubboLoggerWrapper) Panic(v ...interface{}) {
d.Logger.Error(v...)
}
func (d DubboLoggerWrapper) Panicf(format string, v ...interface{}) {
d.Logger.Errorf(format, v...)
}
func GetSentinelConsumerFilter() filter.Filter {
return &SentinelConsumerFilter{}
}
func GetSentinelProviderFilter() filter.Filter {
return &SentinelProviderFilter{}
}
func sentinelExit(ctx context.Context, result protocol.Result) {
if methodEntry := ctx.Value(MethodEntryKey); methodEntry != nil {
e := methodEntry.(*base.SentinelEntry)
sentinel.TraceError(e, result.Error())
e.Exit()
}
if interfaceEntry := ctx.Value(InterfaceEntryKey); interfaceEntry != nil {
e := interfaceEntry.(*base.SentinelEntry)
sentinel.TraceError(e, result.Error())
e.Exit()
}
}
type SentinelProviderFilter struct{}
func (d *SentinelProviderFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
interfaceResourceName, methodResourceName := getResourceName(invoker, invocation, getProviderPrefix())
var (
interfaceEntry *base.SentinelEntry
methodEntry *base.SentinelEntry
b *base.BlockError
)
interfaceEntry, b = sentinel.Entry(interfaceResourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Inbound))
if b != nil {
// interface blocked
return sentinelDubboProviderFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)
methodEntry, b = sentinel.Entry(methodResourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Inbound),
sentinel.WithArgs(invocation.Arguments()...))
if b != nil {
// method blocked
return sentinelDubboProviderFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)
return invoker.Invoke(ctx, invocation)
}
func (d *SentinelProviderFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
sentinelExit(ctx, result)
return result
}
type SentinelConsumerFilter struct{}
func (d *SentinelConsumerFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
interfaceResourceName, methodResourceName := getResourceName(invoker, invocation, getConsumerPrefix())
var (
interfaceEntry *base.SentinelEntry
methodEntry *base.SentinelEntry
b *base.BlockError
)
interfaceEntry, b = sentinel.Entry(interfaceResourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Outbound))
if b != nil {
// interface blocked
return sentinelDubboConsumerFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, InterfaceEntryKey, interfaceEntry)
methodEntry, b = sentinel.Entry(methodResourceName, sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Outbound), sentinel.WithArgs(invocation.Arguments()...))
if b != nil {
// method blocked
return sentinelDubboConsumerFallback(ctx, invoker, invocation, b)
}
ctx = context.WithValue(ctx, MethodEntryKey, methodEntry)
return invoker.Invoke(ctx, invocation)
}
func (d *SentinelConsumerFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
sentinelExit(ctx, result)
return result
}
var (
sentinelDubboConsumerFallback = getDefaultDubboFallback()
sentinelDubboProviderFallback = getDefaultDubboFallback()
)
type DubboFallback func(context.Context, protocol.Invoker, protocol.Invocation, *base.BlockError) protocol.Result
func SetDubboConsumerFallback(f DubboFallback) {
sentinelDubboConsumerFallback = f
}
func SetDubboProviderFallback(f DubboFallback) {
sentinelDubboProviderFallback = f
}
func getDefaultDubboFallback() DubboFallback {
return func(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, blockError *base.BlockError) protocol.Result {
result := &protocol.RPCResult{}
result.SetResult(nil)
result.SetError(blockError)
return result
}
}
const (
SentinelProviderFilterName = "sentinel-provider"
SentinelConsumerFilterName = "sentinel-consumer"
DefaultProviderPrefix = "dubbo:provider:"
DefaultConsumerPrefix = "dubbo:consumer:"
MethodEntryKey = "$$sentinelMethodEntry"
InterfaceEntryKey = "$$sentinelInterfaceEntry"
)
func getResourceName(invoker protocol.Invoker, invocation protocol.Invocation, prefix string) (interfaceResourceName, methodResourceName string) {
var sb strings.Builder
sb.WriteString(prefix)
if getInterfaceGroupAndVersionEnabled() {
interfaceResourceName = getColonSeparatedKey(invoker.GetUrl())
} else {
interfaceResourceName = invoker.GetUrl().Service()
}
sb.WriteString(interfaceResourceName)
sb.WriteString(":")
sb.WriteString(invocation.MethodName())
sb.WriteString("(")
isFirst := true
for _, v := range invocation.ParameterTypes() {
if !isFirst {
sb.WriteString(",")
}
sb.WriteString(v.Name())
isFirst = false
}
sb.WriteString(")")
methodResourceName = sb.String()
return
}
func getConsumerPrefix() string {
return DefaultConsumerPrefix
}
func getProviderPrefix() string {
return DefaultProviderPrefix
}
func getInterfaceGroupAndVersionEnabled() bool {
return true
}
func getColonSeparatedKey(url common.URL) string {
return fmt.Sprintf("%s:%s:%s",
url.Service(),
url.GetParam(constant.GROUP_KEY, ""),
url.GetParam(constant.VERSION_KEY, ""))
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package filter_impl
import (
"context"
"sync"
"sync/atomic"
"testing"
)
import (
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestSentinelFilter_QPS(t *testing.T) {
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"version=1.0.0&group=myGroup&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
interfaceResourceName, _ := getResourceName(mockInvoker,
invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{})), "prefix_")
mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{}))
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: interfaceResourceName,
MetricType: flow.QPS,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
Count: 100,
RelationStrategy: flow.CurrentResource,
},
})
assert.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(10)
f := GetSentinelProviderFilter()
pass := int64(0)
block := int64(0)
for i := 0; i < 10; i++ {
go func() {
for j := 0; j < 30; j++ {
result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
if result.Error() == nil {
atomic.AddInt64(&pass, 1)
} else {
atomic.AddInt64(&block, 1)
}
}
wg.Done()
}()
}
wg.Wait()
assert.True(t, atomic.LoadInt64(&pass) == 100)
assert.True(t, atomic.LoadInt64(&block) == 200)
}
func TestConsumerFilter_Invoke(t *testing.T) {
f := GetSentinelConsumerFilter()
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{}))
result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
assert.NoError(t, result.Error())
}
func TestProviderFilter_Invoke(t *testing.T) {
f := GetSentinelProviderFilter()
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
mockInvocation := invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{}))
result := f.Invoke(context.TODO(), mockInvoker, mockInvocation)
assert.NoError(t, result.Error())
}
func TestGetResourceName(t *testing.T) {
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"version=1.0.0&group=myGroup&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
"module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&" +
"side=provider&timeout=3000&timestamp=1556509797245&bean.name=UserProvider")
assert.NoError(t, err)
mockInvoker := protocol.NewBaseInvoker(url)
interfaceResourceName, methodResourceName := getResourceName(mockInvoker,
invocation.NewRPCInvocation("hello", []interface{}{"OK"}, make(map[string]interface{})), "prefix_")
assert.Equal(t, "com.ikurento.user.UserProvider:myGroup:1.0.0", interfaceResourceName)
assert.Equal(t, "prefix_com.ikurento.user.UserProvider:myGroup:1.0.0:hello()", methodResourceName)
}
module github.com/apache/dubbo-go
require (
github.com/Microsoft/go-winio v0.4.13 // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/RoaringBitmap/roaring v0.4.23
github.com/Workiva/go-datastructures v1.0.50
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/alibaba/sentinel-golang v0.6.1
github.com/apache/dubbo-getty v1.3.10
github.com/apache/dubbo-go-hessian2 v1.6.0-rc1.0.20200906044240-6c1fb5c3bd44
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/apache/dubbo-go-hessian2 v1.7.0
github.com/coreos/etcd v3.3.25+incompatible
github.com/creasty/defaults v1.3.0
github.com/docker/go-connections v0.4.0 // indirect
github.com/dubbogo/go-zookeeper v1.0.1
github.com/dubbogo/gost v1.9.1
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.0.0
github.com/frankban/quicktest v1.4.1 // indirect
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 // indirect
github.com/fsnotify/fsnotify v1.4.7
github.com/go-co-op/gocron v0.1.1
github.com/go-resty/resty/v2 v2.1.0
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/google/go-cmp v0.3.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
github.com/golang/protobuf v1.4.0
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/consul v1.8.0
github.com/hashicorp/consul/api v1.5.0
github.com/hashicorp/go-raftchunking v0.6.3-0.20191002164813-7e9e8525653a // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/vault/api v1.0.5-0.20191108163347-bdd38fca2cff // indirect
github.com/hashicorp/vault/sdk v0.1.14-0.20191112033314-390e96e22eb2
github.com/jinzhu/copier v0.0.0-20190625015134-976e0346caa8
github.com/magiconair/properties v1.8.1
github.com/mitchellh/hashstructure v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.2.3
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go v1.0.0
......@@ -45,22 +36,22 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.1.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/shirou/gopsutil v2.19.9+incompatible // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.5.1
github.com/zouyx/agollo/v3 v3.4.4
go.etcd.io/bbolt v1.3.4 // indirect
go.uber.org/atomic v1.6.0
go.uber.org/zap v1.15.0
google.golang.org/grpc v1.23.0
google.golang.org/grpc v1.26.0
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.16.9
k8s.io/apimachinery v0.16.9
k8s.io/client-go v0.16.9
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a // indirect
)
replace (
github.com/envoyproxy/go-control-plane => github.com/envoyproxy/go-control-plane v0.8.0
launchpad.net/gocheck => github.com/go-check/check v0.0.0-20140225173054-eb6ee6f84d0a
)
go 1.13
replace launchpad.net/gocheck => github.com/go-check/check v0.0.0-20140225173054-eb6ee6f84d0a
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment