diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000000000000000000000000000000000000..c90f6cbe8d5702ca1e236d54340c3e681f99eddb --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,79 @@ +name: CI + +on: + push: + branches: ["master", "develop"] + pull_request: + branches: "*" + +jobs: + + build: + name: ubuntu-latest ${{ matrix.config.go_version }} + runs-on: ubuntu-latest + strategy: + matrix: + config: + - go_version: 1.13 + steps: + + + - name: Set up Go 1.x + uses: actions/setup-go@v2 + with: + go-version: ${{ matrix.config.go_version }} + id: go + + - name: Check out code into the Go module directory + uses: actions/checkout@v2 + + - name: Cache dependencies + uses: actions/cache@v2 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go- + + - name: Get dependencies + run: | + go get -v -t -d ./... + if [ -f Gopkg.toml ]; then + curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh + dep ensure + fi + + - name: License Check + run: | + go fmt ./... && [[ -z `git status -s` ]] + sh before_validate_license.sh + chmod u+x /tmp/tools/license/license-header-checker + /tmp/tools/license/license-header-checker -v -a -r -i vendor /tmp/tools/license/license.txt . go && [[ -z `git status -s` ]] + + - name: Test + run: | + chmod u+x before_ut.sh && ./before_ut.sh + go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic + chmod +x integrate_test.sh && ./integrate_test.sh + + - name: Coverage + run: bash <(curl -s https://codecov.io/bash) + + - name: DingTalk Message Notify + # You may pin to the exact commit or the version. + # uses: zcong1993/actions-ding@2a68a4d06ed966d2e5c28178e7187c107ec57862 + uses: zcong1993/actions-ding@v3.0.1 + if: ${{ github.repository == 'apache/dubbo-go' }} + with: + # DingDing bot token + dingToken: 6374f1bf8d4f23cde81d4a4b8c1f0bc98cc92b5151ca938ab938d3d7f4230fc4 + secret: SECa98677289194bb0e5caec3051301d06515750ff1bd2f932a4704298afb2e0ae6 + # Post Body to send + body: | + { + "msgtype": "markdown", + "markdown": { + "title": "Github Actions", + "text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - trigger: ${{ github.actor }} \n - ref: ${{ github.ref }} \n - status: ${{ job.status }} \n - environment: ${{ runner.os }}" + } + } diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go index c693b86ecdab7b32936185fe4bd614bd0f83fbeb..c522bdf0f98244f5f8bdfba6085cb45dd89c1004 100644 --- a/cluster/router/healthcheck/default_health_check.go +++ b/cluster/router/healthcheck/default_health_check.go @@ -31,7 +31,7 @@ import ( ) func init() { - extension.SethealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker) + extension.SetHealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker) } // DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of @@ -85,7 +85,7 @@ func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol } else if diff > constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { diff = constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF } - sleepWindow := (1 << diff) * c.GetCircuitTrippedTimeoutFactor() + sleepWindow := (1 << uint(diff)) * c.GetCircuitTrippedTimeoutFactor() if sleepWindow > constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS { sleepWindow = constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS } diff --git a/common/extension/health_checker.go b/common/extension/health_checker.go index 8def727614dad8393eeef9ced5e30a056fa65461..cec4c2defc291c617a0549c3296e07851b2ec128 100644 --- a/common/extension/health_checker.go +++ b/common/extension/health_checker.go @@ -26,8 +26,8 @@ var ( healthCheckers = make(map[string]func(url *common.URL) router.HealthChecker) ) -// SethealthChecker sets the HealthChecker with @name -func SethealthChecker(name string, fcn func(_ *common.URL) router.HealthChecker) { +// SetHealthChecker sets the HealthChecker with @name +func SetHealthChecker(name string, fcn func(_ *common.URL) router.HealthChecker) { healthCheckers[name] = fcn } diff --git a/common/extension/health_checker_test.go b/common/extension/health_checker_test.go index 4e83a6f6e1ed8a57b6e6374377d08eabfb56c604..af6b114a612a465d4397be7a599ddfc9ff7edab9 100644 --- a/common/extension/health_checker_test.go +++ b/common/extension/health_checker_test.go @@ -32,7 +32,7 @@ import ( ) func TestGetHealthChecker(t *testing.T) { - SethealthChecker("mock", newMockhealthCheck) + SetHealthChecker("mock", newMockHealthCheck) checker := GetHealthChecker("mock", common.NewURLWithOptions()) assert.NotNil(t, checker) } @@ -44,6 +44,6 @@ func (m mockHealthChecker) IsHealthy(invoker protocol.Invoker) bool { return true } -func newMockhealthCheck(_ *common.URL) router.HealthChecker { +func newMockHealthCheck(_ *common.URL) router.HealthChecker { return &mockHealthChecker{} } diff --git a/common/proxy/proxy_factory/default.go b/common/proxy/proxy_factory/default.go index 1b8ca222011292040c57c3e86df0438943a5b464..752f3ea38193ca46a5c9dbcb8ec4b05811d19159 100644 --- a/common/proxy/proxy_factory/default.go +++ b/common/proxy/proxy_factory/default.go @@ -89,6 +89,8 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati result.SetAttachments(invocation.Attachments()) url := pi.GetUrl() + //get providerUrl. The origin url may be is registry URL. + url = *getProviderURL(&url) methodName := invocation.MethodName() proto := url.Protocol @@ -159,3 +161,10 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati } return result } + +func getProviderURL(url *common.URL) *common.URL { + if url.SubURL == nil { + return url + } + return url.SubURL +} diff --git a/integrate_test.sh b/integrate_test.sh index c9c2f23b5b07f0baf96260d8092e7464d4d15659..deccda756a211821978e35b92a1f0865858ff59a 100644 --- a/integrate_test.sh +++ b/integrate_test.sh @@ -63,4 +63,4 @@ docker build . -t ci-consumer --build-arg PR_ORIGIN_REPO=${TRAVIS_PULL_REQUEST_ cd ${ROOT_DIR} # run provider # check consumer status -docker run -it --network host ci-consumer +docker run -i --network host ci-consumer diff --git a/registry/directory/directory.go b/registry/directory/directory.go index 8940d2ab5e6d71116149a8bb2bf617776a73ed93..5d4a890c6e2f59107a1dff3fc09ebf2c37777d56 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -104,72 +104,120 @@ func (dir *RegistryDirectory) subscribe(url *common.URL) { } // Notify monitor changes from registry,and update the cacheServices -func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) { - go dir.update(event) +func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) { + go dir.refreshInvokers(events...) } -// update the cacheServices and subscribe service from registry -func (dir *RegistryDirectory) update(res *registry.ServiceEvent) { - if res == nil { - return +// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single +// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is +// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore +// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary +// since in batch mode, the register center handles the different type of events by itself, then notify the directory +// a batch of 'Update' events, instead of omit the different type of event one by one. +func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) { + var oldInvokers []protocol.Invoker + + // in batch mode, it is safe to remove since we have the complete list of events. + if len(events) > 1 { + dir.cacheInvokersMap.Range(func(k, v interface{}) bool { + if !dir.eventMatched(k.(string), events) { + if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil { + oldInvokers = append(oldInvokers, invoker) + } + } + return true + }) } - logger.Debugf("registry update, result{%s}", res) - logger.Debugf("update service name: %s!", res.Service) - dir.refreshInvokers(res) -} - -func (dir *RegistryDirectory) refreshInvokers(res *registry.ServiceEvent) { - var ( - url *common.URL - oldInvoker protocol.Invoker = nil - ) - // judge is override or others - if res != nil { - url = &res.Service - // 1.for override url in 2.6.x - if url.Protocol == constant.OVERRIDE_PROTOCOL || - url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY { - dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(url)) - url = nil - } else if url.Protocol == constant.ROUTER_PROTOCOL || // 2.for router - url.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY { - url = nil + for _, event := range events { + logger.Debugf("registry update, result{%s}", event) + if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil { + oldInvokers = append(oldInvokers, oldInvoker) } - switch res.Action { - case remoting.EventTypeAdd, remoting.EventTypeUpdate: - logger.Infof("selector add service url{%s}", res.Service) + } - var urls []*common.URL - for _, v := range config.GetRouterURLSet().Values() { - urls = append(urls, v.(*common.URL)) - } + if len(events) > 0 { + dir.setNewInvokers() + } - if len(urls) > 0 { - dir.SetRouters(urls) - } - oldInvoker = dir.cacheInvoker(url) - case remoting.EventTypeDel: - oldInvoker = dir.uncacheInvoker(url) - logger.Infof("selector delete service url{%s}", res.Service) - default: - return + // After dir.cacheInvokers is updated,destroy the oldInvoker + // Ensure that no request will enter the oldInvoker + for _, invoker := range oldInvokers { + invoker.Destroy() + } +} + +// eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove. +func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool { + for _, event := range events { + if dir.invokerCacheKey(&event.Service) == key { + return true } } + return false +} +// invokerCacheKey generates the key in the cache for a given URL. +func (dir *RegistryDirectory) invokerCacheKey(url *common.URL) string { + referenceUrl := dir.GetDirectoryUrl().SubURL + newUrl := common.MergeUrl(url, referenceUrl) + return newUrl.Key() +} + +// setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain. +func (dir *RegistryDirectory) setNewInvokers() { newInvokers := dir.toGroupInvokers() dir.listenerLock.Lock() + defer dir.listenerLock.Unlock() dir.cacheInvokers = newInvokers - if res != nil { - dir.RouterChain().SetInvokers(newInvokers) + dir.RouterChain().SetInvokers(newInvokers) +} + +// cacheInvokerByEvent caches invokers from the service event +func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) { + // judge is override or others + if event != nil { + u := dir.convertUrl(event) + switch event.Action { + case remoting.EventTypeAdd, remoting.EventTypeUpdate: + logger.Infof("selector add service url{%s}", event.Service) + // FIXME: routers are built in every address notification? + dir.configRouters() + return dir.cacheInvoker(u), nil + case remoting.EventTypeDel: + logger.Infof("selector delete service url{%s}", event.Service) + return dir.uncacheInvoker(u), nil + default: + return nil, fmt.Errorf("illegal event type: %v", event.Action) + } } - dir.listenerLock.Unlock() - // After dir.cacheInvokers is updated,destroy the oldInvoker - // Ensure that no request will enter the oldInvoker - if oldInvoker != nil { - oldInvoker.Destroy() + return nil, nil +} + +// configRouters configures dynamic routers into the router chain, but, the current impl is incorrect, see FIXME above. +func (dir *RegistryDirectory) configRouters() { + var urls []*common.URL + for _, v := range config.GetRouterURLSet().Values() { + urls = append(urls, v.(*common.URL)) } + if len(urls) > 0 { + dir.SetRouters(urls) + } +} + +// convertUrl processes override:// and router:// +func (dir *RegistryDirectory) convertUrl(res *registry.ServiceEvent) *common.URL { + ret := &res.Service + if ret.Protocol == constant.OVERRIDE_PROTOCOL || // 1.for override url in 2.6.x + ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.CONFIGURATORS_CATEGORY { + dir.configurators = append(dir.configurators, extension.GetDefaultConfigurator(ret)) + ret = nil + } else if ret.Protocol == constant.ROUTER_PROTOCOL || // 2.for router + ret.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY) == constant.ROUTER_CATEGORY { + ret = nil + } + return ret } func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { @@ -215,11 +263,15 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker { return groupInvokersList } -// uncacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil +// uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { - logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key()) - if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok { - dir.cacheInvokersMap.Delete(url.Key()) + return dir.uncacheInvokerWithKey(url.Key()) +} + +func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker { + logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", key) + if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); ok { + dir.cacheInvokersMap.Delete(key) return cacheInvoker.(protocol.Invoker) } return nil @@ -250,6 +302,12 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) } } else { + // if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy + // the old invoker. + if common.IsEquals(*newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) { + return nil + } + logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl) if newInvoker != nil { @@ -348,7 +406,8 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL) // Process handle events and update Invokers func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) { l.BaseConfigurationListener.Process(event) - l.directory.refreshInvokers(nil) + // FIXME: this doesn't trigger dir.overrideUrl() + l.directory.refreshInvokers() } type consumerConfigurationListener struct { @@ -374,5 +433,6 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti // Process handles events from Configuration Center and update Invokers func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) { l.BaseConfigurationListener.Process(event) - l.directory.refreshInvokers(nil) + // FIXME: this doesn't trigger dir.overrideUrl() + l.directory.refreshInvokers() } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index 2fec8eaad25e716fc5ed5ee33775d8898cb212e2..9cbc4945605c2ac42242d7c0727e8cd487703c9c 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -112,8 +112,9 @@ func (r *etcdV3Registry) InitListeners() { } // DoRegister actually do the register job in the registry center of etcd +// for lease func (r *etcdV3Registry) DoRegister(root string, node string) error { - return r.client.Create(path.Join(root, node), "") + return r.client.RegisterTemp(path.Join(root, node), "") } // nolint diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 5b2a5d66f67991cc6e7ead610b3259bb5962870d..69a31ef2f2897013cf67ed6ae320c7550c5fe912 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -32,7 +32,6 @@ import ( "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" - "github.com/apache/dubbo-go/common/proxy/proxy_factory" "github.com/apache/dubbo-go/config" "github.com/apache/dubbo-go/config_center" _ "github.com/apache/dubbo-go/config_center/configurator" @@ -242,7 +241,12 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv } // Notify will be triggered when a service change notification is received. -func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) { +func (nl *overrideSubscribeListener) Notify(events ...*registry.ServiceEvent) { + if len(events) == 0 { + return + } + + event := events[0] if isMatched(&(event.Service), nl.url) && event.Action == remoting.EventTypeAdd { nl.configurator = extension.GetDefaultConfigurator(&(event.Service)) nl.doOverrideIfNecessary() @@ -403,8 +407,6 @@ func newWrappedInvoker(invoker protocol.Invoker, url *common.URL) *wrappedInvoke // Invoke remote service base on URL of wrappedInvoker func (ivk *wrappedInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { - // get right url - ivk.invoker.(*proxy_factory.ProxyInvoker).BaseInvoker = *protocol.NewBaseInvoker(ivk.GetUrl()) return ivk.invoker.Invoke(ctx, invocation) } diff --git a/registry/registry.go b/registry/registry.go index 855b487d4766d011eaa9d97d06b672df075e60b9..2225d2c1fc6d465bfbf27cd9622d7296e4596d8f 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -68,8 +68,11 @@ type Registry interface { // nolint type NotifyListener interface { - // Notify supports notifications on the service interface and the dimension of the data type. - Notify(*ServiceEvent) + // Notify supports notifications on the service interface and the dimension of the data type. When a list of + // events are passed in, it's considered as a complete list, on the other side, if one single event is + // passed in, then it's a incremental event. Pls. note when a list (instead of single event) comes, + // the impl of NotifyListener may abandon the accumulated result from previous notifications. + Notify(...*ServiceEvent) } // Listener Deprecated! diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go index 4e7436e445f652d8fe1db2991c87303653beb9ec..ebd454242d49ee82c81fe1a1fae1a19980c238a4 100644 --- a/remoting/etcdv3/client.go +++ b/remoting/etcdv3/client.go @@ -408,7 +408,8 @@ func (c *Client) keepAliveKV(k string, v string) error { return ErrNilETCDV3Client } - lease, err := c.rawClient.Grant(c.ctx, int64(time.Second.Seconds())) + // make lease time longer, since 1 second is too short + lease, err := c.rawClient.Grant(c.ctx, int64(30*time.Second.Seconds())) if err != nil { return perrors.WithMessage(err, "grant lease") }