diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
new file mode 100644
index 0000000000000000000000000000000000000000..c90f6cbe8d5702ca1e236d54340c3e681f99eddb
--- /dev/null
+++ b/.github/workflows/go.yml
@@ -0,0 +1,79 @@
+name: CI
+
+on:
+ push:
+ branches: ["master", "develop"]
+ pull_request:
+ branches: "*"
+
+jobs:
+
+ build:
+ name: ubuntu-latest ${{ matrix.config.go_version }}
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ config:
+ - go_version: 1.13
+ steps:
+
+
+ - name: Set up Go 1.x
+ uses: actions/setup-go@v2
+ with:
+ go-version: ${{ matrix.config.go_version }}
+ id: go
+
+ - name: Check out code into the Go module directory
+ uses: actions/checkout@v2
+
+ - name: Cache dependencies
+ uses: actions/cache@v2
+ with:
+ path: ~/go/pkg/mod
+ key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
+ restore-keys: |
+ ${{ runner.os }}-go-
+
+ - name: Get dependencies
+ run: |
+ go get -v -t -d ./...
+ if [ -f Gopkg.toml ]; then
+ curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
+ dep ensure
+ fi
+
+ - name: License Check
+ run: |
+ go fmt ./... && [[ -z `git status -s` ]]
+ sh before_validate_license.sh
+ chmod u+x /tmp/tools/license/license-header-checker
+ /tmp/tools/license/license-header-checker -v -a -r -i vendor /tmp/tools/license/license.txt . go && [[ -z `git status -s` ]]
+
+ - name: Test
+ run: |
+ chmod u+x before_ut.sh && ./before_ut.sh
+ go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic
+ chmod +x integrate_test.sh && ./integrate_test.sh
+
+ - name: Coverage
+ run: bash <(curl -s https://codecov.io/bash)
+
+ - name: DingTalk Message Notify
+ # You may pin to the exact commit or the version.
+ # uses: zcong1993/actions-ding@2a68a4d06ed966d2e5c28178e7187c107ec57862
+ uses: zcong1993/actions-ding@v3.0.1
+ if: ${{ github.repository == 'apache/dubbo-go' }}
+ with:
+ # DingDing bot token
+ dingToken: 6374f1bf8d4f23cde81d4a4b8c1f0bc98cc92b5151ca938ab938d3d7f4230fc4
+ secret: SECa98677289194bb0e5caec3051301d06515750ff1bd2f932a4704298afb2e0ae6
+ # Post Body to send
+ body: |
+ {
+ "msgtype": "markdown",
+ "markdown": {
+ "title": "Github Actions",
+ "text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - trigger: ${{ github.actor }} \n - ref: ${{ github.ref }} \n - status: ${{ job.status }} \n - environment: ${{ runner.os }}"
+ }
+ }
diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go
index c693b86ecdab7b32936185fe4bd614bd0f83fbeb..c522bdf0f98244f5f8bdfba6085cb45dd89c1004 100644
--- a/cluster/router/healthcheck/default_health_check.go
+++ b/cluster/router/healthcheck/default_health_check.go
@@ -31,7 +31,7 @@ import (
)
func init() {
- extension.SethealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
+ extension.SetHealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
}
// DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of
@@ -85,7 +85,7 @@ func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol
} else if diff > constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF {
diff = constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF
}
- sleepWindow := (1 << diff) * c.GetCircuitTrippedTimeoutFactor()
+ sleepWindow := (1 << uint(diff)) * c.GetCircuitTrippedTimeoutFactor()
if sleepWindow > constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS {
sleepWindow = constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS
}
diff --git a/common/extension/health_checker.go b/common/extension/health_checker.go
index 8def727614dad8393eeef9ced5e30a056fa65461..cec4c2defc291c617a0549c3296e07851b2ec128 100644
--- a/common/extension/health_checker.go
+++ b/common/extension/health_checker.go
@@ -26,8 +26,8 @@ var (
healthCheckers = make(map[string]func(url *common.URL) router.HealthChecker)
)
-// SethealthChecker sets the HealthChecker with @name
-func SethealthChecker(name string, fcn func(_ *common.URL) router.HealthChecker) {
+// SetHealthChecker sets the HealthChecker with @name
+func SetHealthChecker(name string, fcn func(_ *common.URL) router.HealthChecker) {
healthCheckers[name] = fcn
}
diff --git a/common/extension/health_checker_test.go b/common/extension/health_checker_test.go
index 4e83a6f6e1ed8a57b6e6374377d08eabfb56c604..af6b114a612a465d4397be7a599ddfc9ff7edab9 100644
--- a/common/extension/health_checker_test.go
+++ b/common/extension/health_checker_test.go
@@ -32,7 +32,7 @@ import (
)
func TestGetHealthChecker(t *testing.T) {
- SethealthChecker("mock", newMockhealthCheck)
+ SetHealthChecker("mock", newMockHealthCheck)
checker := GetHealthChecker("mock", common.NewURLWithOptions())
assert.NotNil(t, checker)
}
@@ -44,6 +44,6 @@ func (m mockHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
return true
}
-func newMockhealthCheck(_ *common.URL) router.HealthChecker {
+func newMockHealthCheck(_ *common.URL) router.HealthChecker {
return &mockHealthChecker{}
}
diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go
index 1b8ca222011292040c57c3e86df0438943a5b464..752f3ea38193ca46a5c9dbcb8ec4b05811d19159 100644
--- a/common/proxy/proxy_factory/default.go
+++ b/common/proxy/proxy_factory/default.go
@@ -89,6 +89,8 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
result.SetAttachments(invocation.Attachments())
url := pi.GetUrl()
+ //get providerUrl. The origin url may be is registry URL.
+ url = *getProviderURL(&url)
methodName := invocation.MethodName()
proto := url.Protocol
@@ -159,3 +161,10 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
}
return result
}
+
+func getProviderURL(url *common.URL) *common.URL {
+ if url.SubURL == nil {
+ return url
+ }
+ return url.SubURL
+}
diff --git a/integrate_test.sh b/integrate_test.sh
index c9c2f23b5b07f0baf96260d8092e7464d4d15659..deccda756a211821978e35b92a1f0865858ff59a 100644
--- a/integrate_test.sh
+++ b/integrate_test.sh
@@ -63,4 +63,4 @@ docker build . -t ci-consumer --build-arg PR_ORIGIN_REPO=${TRAVIS_PULL_REQUEST_
cd ${ROOT_DIR}
# run provider
# check consumer status
-docker run -it --network host ci-consumer
+docker run -i --network host ci-consumer
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 8940d2ab5e6d71116149a8bb2bf617776a73ed93..5d4a890c6e2f59107a1dff3fc09ebf2c37777d56 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -104,72 +104,120 @@ func (dir *RegistryDirectory) subscribe(url *common.URL) {
}
// Notify monitor changes from registry,and update the cacheServices
-func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
- go dir.update(event)
+func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {
+ go dir.refreshInvokers(events...)
}
-// update the cacheServices and subscribe service from registry
-func (dir *RegistryDirectory) update(res *registry.ServiceEvent) {
- if res == nil {
- return
+// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
+// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
+// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
+// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
+// since in batch mode, the register center handles the different type of events by itself, then notify the directory
+// a batch of 'Update' events, instead of omit the different type of event one by one.
+func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
+ var oldInvokers []protocol.Invoker
+
+ // in batch mode, it is safe to remove since we have the complete list of events.
+ if len(events) > 1 {
+ dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
+ if !dir.eventMatched(k.(string), events) {
+ if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
+ oldInvokers = append(oldInvokers, invoker)
+ }
+ }
+ return true
+ })
}
- logger.Debugf("registry update, result{%s}", res)
- logger.Debugf("update service name: %s!", res.Service)
- dir.refreshInvokers(res)
-}
-
-func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) {
- var (
- url *common.URL
- oldInvoker protocol.Invoker = nil
- )
- // judge is override or others
- if res != nil {
- url = &res.Service
- // 1.for override url in 2.6.x
- if url.Protocol == constant.OVERRIDE_PROTOCOL ||
- url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
- dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url))
- url = nil
- } else if url.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
- url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
- url = nil
+ for _, event := range events {
+ logger.Debugf("registry update, result{%s}", event)
+ if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
+ oldInvokers = append(oldInvokers, oldInvoker)
}
- switch res.Action {
- case remoting.EventTypeAdd, remoting.EventTypeUpdate:
- logger.Infof("selector add service url{%s}", res.Service)
+ }
- var urls []*common.URL
- for _, v := range config.GetRouterURLSet().Values() {
- urls = append(urls, v.(*common.URL))
- }
+ if len(events) > 0 {
+ dir.setNewInvokers()
+ }
- if len(urls) > 0 {
- dir.SetRouters(urls)
- }
- oldInvoker = dir.cacheInvoker(url)
- case remoting.EventTypeDel:
- oldInvoker = dir.uncacheInvoker(url)
- logger.Infof("selector delete service url{%s}", res.Service)
- default:
- return
+ // After dir.cacheInvokers is updated,destroy the oldInvoker
+ // Ensure that no request will enter the oldInvoker
+ for _, invoker := range oldInvokers {
+ invoker.Destroy()
+ }
+}
+
+// eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove.
+func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool {
+ for _, event := range events {
+ if dir.invokerCacheKey(&event.Service) == key {
+ return true
}
}
+ return false
+}
+// invokerCacheKey generates the key in the cache for a given URL.
+func (dir *RegistryDirectory) invokerCacheKey(url *common.URL) string {
+ referenceUrl := dir.GetDirectoryUrl().SubURL
+ newUrl := common.MergeUrl(url, referenceUrl)
+ return newUrl.Key()
+}
+
+// setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain.
+func (dir *RegistryDirectory) setNewInvokers() {
newInvokers := dir.toGroupInvokers()
dir.listenerLock.Lock()
+ defer dir.listenerLock.Unlock()
dir.cacheInvokers = newInvokers
- if res != nil {
- dir.RouterChain().SetInvokers(newInvokers)
+ dir.RouterChain().SetInvokers(newInvokers)
+}
+
+// cacheInvokerByEvent caches invokers from the service event
+func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) {
+ // judge is override or others
+ if event != nil {
+ u := dir.convertUrl(event)
+ switch event.Action {
+ case remoting.EventTypeAdd, remoting.EventTypeUpdate:
+ logger.Infof("selector add service url{%s}", event.Service)
+ // FIXME: routers are built in every address notification?
+ dir.configRouters()
+ return dir.cacheInvoker(u), nil
+ case remoting.EventTypeDel:
+ logger.Infof("selector delete service url{%s}", event.Service)
+ return dir.uncacheInvoker(u), nil
+ default:
+ return nil, fmt.Errorf("illegal event type: %v", event.Action)
+ }
}
- dir.listenerLock.Unlock()
- // After dir.cacheInvokers is updated,destroy the oldInvoker
- // Ensure that no request will enter the oldInvoker
- if oldInvoker != nil {
- oldInvoker.Destroy()
+ return nil, nil
+}
+
+// configRouters configures dynamic routers into the router chain, but, the current impl is incorrect, see FIXME above.
+func (dir *RegistryDirectory) configRouters() {
+ var urls []*common.URL
+ for _, v := range config.GetRouterURLSet().Values() {
+ urls = append(urls, v.(*common.URL))
}
+ if len(urls) > 0 {
+ dir.SetRouters(urls)
+ }
+}
+
+// convertUrl processes override:// and router://
+func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL {
+ ret := &res.Service
+ if ret.Protocol == constant.OVERRIDE_PROTOCOL || // 1.for override url in 2.6.x
+ ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY {
+ dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(ret))
+ ret = nil
+ } else if ret.Protocol == constant.ROUTER_PROTOCOL || // 2.for router
+ ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY {
+ ret = nil
+ }
+ return ret
}
func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
@@ -215,11 +263,15 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
return groupInvokersList
}
-// uncacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
+// uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil
func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
- logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key())
- if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok {
- dir.cacheInvokersMap.Delete(url.Key())
+ return dir.uncacheInvokerWithKey(url.Key())
+}
+
+func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker {
+ logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", key)
+ if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); ok {
+ dir.cacheInvokersMap.Delete(key)
return cacheInvoker.(protocol.Invoker)
}
return nil
@@ -250,6 +302,12 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
}
} else {
+ // if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
+ // the old invoker.
+ if common.IsEquals(*newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
+ return nil
+ }
+
logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
@@ -348,7 +406,8 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL)
// Process handle events and update Invokers
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
- l.directory.refreshInvokers(nil)
+ // FIXME: this doesn't trigger dir.overrideUrl()
+ l.directory.refreshInvokers()
}
type consumerConfigurationListener struct {
@@ -374,5 +433,6 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti
// Process handles events from Configuration Center and update Invokers
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
- l.directory.refreshInvokers(nil)
+ // FIXME: this doesn't trigger dir.overrideUrl()
+ l.directory.refreshInvokers()
}
diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go
index 2fec8eaad25e716fc5ed5ee33775d8898cb212e2..9cbc4945605c2ac42242d7c0727e8cd487703c9c 100644
--- a/registry/etcdv3/registry.go
+++ b/registry/etcdv3/registry.go
@@ -112,8 +112,9 @@ func (r *etcdV3Registry) InitListeners() {
}
// DoRegister actually do the register job in the registry center of etcd
+// for lease
func (r *etcdV3Registry) DoRegister(root string, node string) error {
- return r.client.Create(path.Join(root, node), "")
+ return r.client.RegisterTemp(path.Join(root, node), "")
}
// nolint
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index 5b2a5d66f67991cc6e7ead610b3259bb5962870d..69a31ef2f2897013cf67ed6ae320c7550c5fe912 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -32,7 +32,6 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
- "github.com/apache/dubbo-go/common/proxy/proxy_factory"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/config_center"
_ "github.com/apache/dubbo-go/config_center/configurator"
@@ -242,7 +241,12 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv
}
// Notify will be triggered when a service change notification is received.
-func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {
+func (nl *overrideSubscribeListener) Notify(events ...*registry.ServiceEvent) {
+ if len(events) == 0 {
+ return
+ }
+
+ event := events[0]
if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd {
nl.configurator = extension.GetDefaultConfigurator(&(event.Service))
nl.doOverrideIfNecessary()
@@ -403,8 +407,6 @@ func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoke
// Invoke remote service base on URL of wrappedInvoker
func (ivk *wrappedInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
- // get right url
- ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl())
return ivk.invoker.Invoke(ctx, invocation)
}
diff --git a/registry/registry.go b/registry/registry.go
index 855b487d4766d011eaa9d97d06b672df075e60b9..2225d2c1fc6d465bfbf27cd9622d7296e4596d8f 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -68,8 +68,11 @@ type Registry interface {
// nolint
type NotifyListener interface {
- // Notify supports notifications on the service interface and the dimension of the data type.
- Notify(*ServiceEvent)
+ // Notify supports notifications on the service interface and the dimension of the data type. When a list of
+ // events are passed in, it's considered as a complete list, on the other side, if one single event is
+ // passed in, then it's a incremental event. Pls. note when a list (instead of single event) comes,
+ // the impl of NotifyListener may abandon the accumulated result from previous notifications.
+ Notify(...*ServiceEvent)
}
// Listener Deprecated!
diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go
index 4e7436e445f652d8fe1db2991c87303653beb9ec..ebd454242d49ee82c81fe1a1fae1a19980c238a4 100644
--- a/remoting/etcdv3/client.go
+++ b/remoting/etcdv3/client.go
@@ -408,7 +408,8 @@ func (c *Client) keepAliveKV(k string, v string) error {
return ErrNilETCDV3Client
}
- lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds()))
+ // make lease time longer, since 1 second is too short
+ lease, err := c.rawClient.Grant(c.ctx, int64(30*time.Second.Seconds()))
if err != nil {
return perrors.WithMessage(err, "grant lease")
}