Skip to content
Snippets Groups Projects
Commit 8dffc298 authored by AlexStocks's avatar AlexStocks
Browse files

Merge branch 'develop' of https://github.com/apache/dubbo-go into develop

parents 4d50436f cfef02af
No related branches found
No related tags found
No related merge requests found
......@@ -69,12 +69,6 @@ jobs:
- name: Hello world
run: echo Hello world ${{ secrets.DING_TOKEN }} ${{ secrets.DING_SIGN }}
- name: Install go ci lint
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0
- name: Run Linter
run: golangci-lint run --timeout=10m -v --disable-all --enable=govet --enable=staticcheck --enable=ineffassign --enable=misspell
# Because the contexts of push and PR are different, there are two Notify.
# Notifications are triggered only in the dubbogo/gost repository.
- name: DingTalk Message Notify only Push
......
......@@ -195,9 +195,6 @@ func (c *RouterChain) copyInvokerIfNecessary(cache *InvokerCache) []protocol.Inv
func (c *RouterChain) buildCache() {
origin := c.loadCache()
invokers := c.copyInvokerIfNecessary(origin)
if invokers == nil || len(invokers) == 0 {
return
}
var (
mutex sync.Mutex
......
......@@ -19,8 +19,10 @@ package nacos
import (
"encoding/json"
"net/http"
"strconv"
"testing"
"time"
)
import (
......@@ -36,6 +38,9 @@ import (
)
func TestNacosMetadataReport_CRUD(t *testing.T) {
if !checkNacosServerAlive() {
return
}
rpt := newTestReport()
assert.NotNil(t, rpt)
......@@ -114,3 +119,11 @@ func newTestReport() report.MetadataReport {
res := extension.GetMetadataReportFactory("nacos").CreateMetadataReport(regurl)
return res
}
func checkNacosServerAlive() bool {
c := http.Client{Timeout: time.Second}
if _, err := c.Get("http://console.nacos.io/nacos/"); err != nil {
return false
}
return true
}
......@@ -66,6 +66,7 @@ type RegistryDirectory struct {
referenceConfigurationListener *referenceConfigurationListener
serviceKey string
forbidden atomic.Bool
registerLock sync.Mutex // this lock if for register
}
// NewRegistryDirectory will create a new RegistryDirectory
......@@ -73,6 +74,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
logger.Debugf("new RegistryDirectory for service :%s.", url.Key())
dir := &RegistryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
......@@ -97,69 +99,128 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.
// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
logger.Debugf("subscribe service :%s for RegistryDirectory.", url.Key())
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
dir.registry.Subscribe(url, dir)
}
// Notify monitor changes from registry,and update the cacheServices
func (dir *RegistryDirectory) Notify(events ...*registry.ServiceEvent) {
go dir.refreshInvokers(events...)
func (dir *RegistryDirectory) Notify(event *registry.ServiceEvent) {
if event == nil {
return
}
go dir.refreshInvokers(event)
}
// NotifyAll notify the events that are complete Service Event List.
// After notify the address, the callback func will be invoked.
func (dir *RegistryDirectory) NotifyAll(events []*registry.ServiceEvent, callback func()) {
go dir.refreshAllInvokers(events, callback)
}
// refreshInvokers refreshes service's events.
func (dir *RegistryDirectory) refreshInvokers(event *registry.ServiceEvent) {
if event != nil {
logger.Debugf("refresh invokers with %+v", event)
} else {
logger.Debug("refresh invokers with nil")
}
var oldInvoker protocol.Invoker
if event != nil {
oldInvoker, _ = dir.cacheInvokerByEvent(event)
}
dir.setNewInvokers()
if oldInvoker != nil {
oldInvoker.Destroy()
}
}
// refreshInvokers refreshes service's events. It supports two modes: incremental mode and batch mode. If a single
// service event is passed in, then it is incremental mode, and if an array of service events are passed in, it is
// batch mode, in this mode, we assume the registry center have the complete list of the service events, therefore
// in this case, we can safely assume any cached invoker not in the incoming list can be removed. It is necessary
// since in batch mode, the register center handles the different type of events by itself, then notify the directory
// a batch of 'Update' events, instead of omit the different type of event one by one.
func (dir *RegistryDirectory) refreshInvokers(events ...*registry.ServiceEvent) {
var oldInvokers []protocol.Invoker
// refreshAllInvokers the argument is the complete list of the service events, we can safely assume any cached invoker
// not in the incoming list can be removed. The Action of serviceEvent should be EventTypeUpdate.
func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent, callback func()) {
var (
oldInvokers []protocol.Invoker
addEvents []*registry.ServiceEvent
)
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL
// in batch mode, it is safe to remove since we have the complete list of events.
if len(events) > 1 {
// loop the events to check the Action should be EventTypeUpdate.
for _, event := range events {
if event.Action != remoting.EventTypeUpdate {
panic("Your implements of register center is wrong, " +
"please check the Action of ServiceEvent should be EventTypeUpdate")
return
}
// Originally it will Merge URL many times, now we just execute once.
// MergeUrl is executed once and put the result into Event. After this, the key will get from Event.Key().
newUrl := dir.convertUrl(event)
newUrl = common.MergeUrl(newUrl, referenceUrl)
dir.overrideUrl(newUrl)
event.Update(newUrl)
}
// After notify all addresses, do some callback.
defer callback()
func() {
// this lock is work at batch update of InvokeCache
dir.registerLock.Lock()
defer dir.registerLock.Unlock()
// get need clear invokers from original invoker list
dir.cacheInvokersMap.Range(func(k, v interface{}) bool {
if !dir.eventMatched(k.(string), events) {
// delete unused invoker from cache
if invoker := dir.uncacheInvokerWithKey(k.(string)); invoker != nil {
oldInvokers = append(oldInvokers, invoker)
}
}
return true
})
}
for _, event := range events {
logger.Debugf("registry update, result{%s}", event)
if oldInvoker, _ := dir.cacheInvokerByEvent(event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
// get need add invokers from events
for _, event := range events {
// Get the key from Event.Key()
if _, ok := dir.cacheInvokersMap.Load(event.Key()); !ok {
addEvents = append(addEvents, event)
}
}
}
if len(events) > 0 {
dir.setNewInvokers()
}
// After dir.cacheInvokers is updated,destroy the oldInvoker
// Ensure that no request will enter the oldInvoker
// loop the updateEvents
for _, event := range addEvents {
logger.Debugf("registry update, result{%s}", event)
logger.Infof("selector add service url{%s}", event.Service)
// FIXME: routers are built in every address notification?
dir.configRouters()
if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
}
}()
dir.setNewInvokers()
// destroy unused invokers
for _, invoker := range oldInvokers {
invoker.Destroy()
go invoker.Destroy()
}
}
// eventMatched checks if a cached invoker appears in the incoming invoker list, if no, then it is safe to remove.
func (dir *RegistryDirectory) eventMatched(key string, events []*registry.ServiceEvent) bool {
for _, event := range events {
if dir.invokerCacheKey(event.Service) == key {
if dir.invokerCacheKey(event) == key {
return true
}
}
return false
}
// invokerCacheKey generates the key in the cache for a given URL.
func (dir *RegistryDirectory) invokerCacheKey(url *common.URL) string {
// invokerCacheKey generates the key in the cache for a given ServiceEvent.
func (dir *RegistryDirectory) invokerCacheKey(event *registry.ServiceEvent) string {
// If the url is merged, then return Event.Key() directly.
if event.Updated() {
return event.Key()
}
referenceUrl := dir.GetDirectoryUrl().SubURL
newUrl := common.MergeUrl(url, referenceUrl)
newUrl := common.MergeUrl(event.Service, referenceUrl)
event.Update(newUrl)
return newUrl.Key()
}
......@@ -294,30 +355,38 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
newUrl := common.MergeUrl(url, referenceUrl)
dir.overrideUrl(newUrl)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
// the old invoker.
if common.IsEquals(newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
return nil
}
logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
return cacheInvoker.(protocol.Invoker)
}
if v, ok := dir.doCacheInvoker(newUrl); ok {
return v
}
}
return nil
}
func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invoker, bool) {
key := newUrl.Key()
if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
}
} else {
// if cached invoker has the same URL with the new URL, then no need to re-refer, and no need to destroy
// the old invoker.
if common.IsEquals(newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) {
return nil, true
}
logger.Debugf("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(key, newInvoker)
return cacheInvoker.(protocol.Invoker), true
}
}
return nil, false
}
// List selected protocol invokers from the directory
func (dir *RegistryDirectory) List(invocation protocol.Invocation) []protocol.Invoker {
invokers := dir.cacheInvokers
......@@ -406,7 +475,7 @@ func newReferenceConfigurationListener(dir *RegistryDirectory, url *common.URL)
func (l *referenceConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}
type consumerConfigurationListener struct {
......@@ -433,5 +502,5 @@ func (l *consumerConfigurationListener) addNotifyListener(listener registry.Noti
func (l *consumerConfigurationListener) Process(event *config_center.ConfigChangeEvent) {
l.BaseConfigurationListener.Process(event)
// FIXME: this doesn't trigger dir.overrideUrl()
l.directory.refreshInvokers()
l.directory.refreshInvokers(nil)
}
......@@ -192,6 +192,34 @@ func Test_toGroupInvokers(t *testing.T) {
assert.Len(t, groupInvokers, 2)
}
func Test_RefreshUrl(t *testing.T) {
registryDirectory, mockRegistry := normalRegistryDir()
providerUrl, _ := common.NewURL("dubbo://0.0.0.0:20011/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
providerUrl2, _ := common.NewURL("dubbo://0.0.0.0:20012/org.apache.dubbo-go.mockService",
common.WithParamsValue(constant.CLUSTER_KEY, "mock1"),
common.WithParamsValue(constant.GROUP_KEY, "group"),
common.WithParamsValue(constant.VERSION_KEY, "1.0.0"))
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 3)
mockRegistry.MockEvent(&registry.ServiceEvent{Action: remoting.EventTypeAdd, Service: providerUrl})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 4)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 1)
mockRegistry.MockEvents([]*registry.ServiceEvent{&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl},
&registry.ServiceEvent{Action: remoting.EventTypeUpdate, Service: providerUrl2}})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 2)
// clear all address
mockRegistry.MockEvents([]*registry.ServiceEvent{})
time.Sleep(1e9)
assert.Len(t, registryDirectory.cacheInvokers, 0)
}
func normalRegistryDir(noMockEvent ...bool) (*RegistryDirectory, *registry.MockRegistry) {
extension.SetProtocol(protocolwrapper.FILTER, protocolwrapper.NewMockProtocolFilter)
......
......@@ -41,11 +41,36 @@ func init() {
type ServiceEvent struct {
Action remoting.EventType
Service *common.URL
// store the key for Service.Key()
key string
// If the url is updated, such as Merged.
updated bool
}
// String return the description of event
func (e ServiceEvent) String() string {
return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}}", e.Action, e.Service)
func (e *ServiceEvent) String() string {
return fmt.Sprintf("ServiceEvent{Action{%s}, Path{%s}, Key{%s}}", e.Action, e.Service, e.key)
}
// Update() update the url with the merged URL. Work with Updated() can reduce the process of some merging URL.
func (e *ServiceEvent) Update(url *common.URL) {
e.Service = url
e.updated = true
}
// Updated() check if the url is updated.
// If the serviceEvent is updated, then it don't need merge url again.
func (e *ServiceEvent) Updated() bool {
return e.updated
}
// Key() generate the key for service.Key(). It is cached once.
func (e *ServiceEvent) Key() string {
if len(e.key) > 0 {
return e.key
}
e.key = e.Service.Key()
return e.key
}
// ServiceInstancesChangedEvent represents service instances make some changing
......
......@@ -18,6 +18,7 @@
package registry
import (
"fmt"
"time"
)
......@@ -32,14 +33,16 @@ import (
// MockRegistry is used as mock registry
type MockRegistry struct {
listener *listener
destroyed *atomic.Bool
listener *listener
destroyed *atomic.Bool
allAddress chan []*ServiceEvent
}
// NewMockRegistry creates a mock registry
func NewMockRegistry(url *common.URL) (Registry, error) {
registry := &MockRegistry{
destroyed: atomic.NewBool(false),
destroyed: atomic.NewBool(false),
allAddress: make(chan []*ServiceEvent),
}
listener := &listener{count: 0, registry: registry, listenChan: make(chan *ServiceEvent)}
registry.listener = listener
......@@ -80,22 +83,12 @@ func (r *MockRegistry) subscribe(*common.URL) (Listener, error) {
func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error {
go func() {
for {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
time.Sleep(time.Duration(3) * time.Second)
return
}
listener, err := r.subscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return
}
time.Sleep(time.Duration(3) * time.Second)
t, listener := r.checkLoopSubscribe(url)
if t == 0 {
continue
} else if t == -1 {
return
}
for {
serviceEvent, err := listener.Next()
if err != nil {
......@@ -109,6 +102,26 @@ func (r *MockRegistry) Subscribe(url *common.URL, notifyListener NotifyListener)
}
}
}()
go func() {
for {
t, _ := r.checkLoopSubscribe(url)
if t == 0 {
continue
} else if t == -1 {
return
}
for {
select {
case e := <-r.allAddress:
notifyListener.NotifyAll(e, func() {
fmt.Print("notify all ok")
})
break
}
}
}
}()
return nil
}
......@@ -138,3 +151,27 @@ func (*listener) Close() {
func (r *MockRegistry) MockEvent(event *ServiceEvent) {
r.listener.listenChan <- event
}
// nolint
func (r *MockRegistry) MockEvents(events []*ServiceEvent) {
r.allAddress <- events
}
func (r *MockRegistry) checkLoopSubscribe(url *common.URL) (int, Listener) {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
time.Sleep(time.Duration(3) * time.Second)
return -1, nil
}
listener, err := r.subscribe(url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
return -1, nil
}
time.Sleep(time.Duration(3) * time.Second)
return 0, nil
}
return 1, listener
}
......@@ -19,9 +19,11 @@ package nacos
import (
"encoding/json"
"net/http"
"net/url"
"strconv"
"testing"
"time"
)
import (
......@@ -35,6 +37,9 @@ import (
)
func TestNacosRegistry_Register(t *testing.T) {
if !checkNacosServerAlive() {
return
}
regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
......@@ -64,6 +69,9 @@ func TestNacosRegistry_Register(t *testing.T) {
}
func TestNacosRegistry_Subscribe(t *testing.T) {
if !checkNacosServerAlive() {
return
}
regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
......@@ -102,6 +110,9 @@ func TestNacosRegistry_Subscribe(t *testing.T) {
}
func TestNacosRegistry_Subscribe_del(t *testing.T) {
if !checkNacosServerAlive() {
return
}
regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParamsValue(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)))
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
......@@ -188,3 +199,11 @@ func TestNacosListener_Close(t *testing.T) {
_, err = listener.Next()
assert.NotNil(t, err)
}
func checkNacosServerAlive() bool {
c := http.Client{Timeout: time.Second}
if _, err := c.Get("http://console.nacos.io/nacos/"); err != nil {
return false
}
return true
}
......@@ -141,7 +141,8 @@ func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.Serv
GroupName: n.group,
})
if err != nil {
logger.Errorf("Could not query the instances for service: " + serviceName + ", group: " + n.group)
logger.Errorf("Could not query the instances for service: %+v, group: %+v . It happened err %+v",
serviceName, n.group, err)
return make([]registry.ServiceInstance, 0, 0)
}
res := make([]registry.ServiceInstance, 0, len(instances))
......
......@@ -81,6 +81,9 @@ func TestNacosServiceDiscovery_Destroy(t *testing.T) {
}
func TestNacosServiceDiscovery_CRUD(t *testing.T) {
if !checkNacosServerAlive() {
return
}
prepareData()
extension.SetEventDispatcher("mock", func() observer.EventDispatcher {
return &dispatcher.MockEventDispatcher{}
......
......@@ -240,18 +240,23 @@ func newOverrideSubscribeListener(overriderUrl *common.URL, invoker protocol.Inv
}
// Notify will be triggered when a service change notification is received.
func (nl *overrideSubscribeListener) Notify(events ...*registry.ServiceEvent) {
if len(events) == 0 {
return
}
event := events[0]
func (nl *overrideSubscribeListener) Notify(event *registry.ServiceEvent) {
if isMatched(event.Service, nl.url) && event.Action == remoting.EventTypeAdd {
nl.configurator = extension.GetDefaultConfigurator(event.Service)
nl.doOverrideIfNecessary()
}
}
func (nl *overrideSubscribeListener) NotifyAll(events []*registry.ServiceEvent, callback func()) {
defer callback()
if len(events) == 0 {
return
}
for _, e := range events {
nl.Notify(e)
}
}
func (nl *overrideSubscribeListener) doOverrideIfNecessary() {
providerUrl := getProviderUrl(nl.originInvoker)
key := getCacheKey(providerUrl)
......
......@@ -72,7 +72,12 @@ type NotifyListener interface {
// events are passed in, it's considered as a complete list, on the other side, if one single event is
// passed in, then it's a incremental event. Pls. note when a list (instead of single event) comes,
// the impl of NotifyListener may abandon the accumulated result from previous notifications.
Notify(...*ServiceEvent)
Notify(*ServiceEvent)
// NotifyAll the events are complete Service Event List.
// The argument of events []*ServiceEvent is equal to urls []*URL, The Action of serviceEvent should be EventTypeUpdate.
// If your registry center can only get all urls but can't get individual event, you should use this one.
// After notify the address, the callback func will be invoked.
NotifyAll([]*ServiceEvent, func())
}
// Listener Deprecated!
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment