Skip to content
Snippets Groups Projects
Commit 1113bf16 authored by flycash's avatar flycash
Browse files

Merge 2.7.5

parents 4b8b4d7a 698c2b29
No related branches found
No related tags found
No related merge requests found
Showing with 72 additions and 29 deletions
......@@ -18,10 +18,13 @@
package extension
import (
"github.com/apache/dubbo-go/registry/servicediscovery/instance"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go/registry/servicediscovery/instance"
)
var (
serviceInstanceSelectorMappings = make(map[string]func() instance.ServiceInstanceSelector)
)
......
......@@ -35,11 +35,8 @@ github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vaj
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0=
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk=
github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/apache/dubbo-go-hessian2 v1.5.0 h1:fzulDG5G7nX0ccgKdiN9XipJ7tZ4WXKgmk4stdlDS6s=
github.com/apache/dubbo-go-hessian2 v1.5.0/go.mod h1:VwEnsOMidkM1usya2uPfGpSLO9XUF//WQcWn3y+jFz8=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA=
......@@ -53,7 +50,6 @@ github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f h1:/8NcnxL6
github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.15.24 h1:xLAdTA/ore6xdPAljzZRed7IGqQgC+nY+ERS5vaj4Ro=
github.com/aws/aws-sdk-go v1.15.24/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
......
......@@ -25,25 +25,30 @@ import (
"github.com/apache/dubbo-go/common/observer"
)
//The Service Discovery Changed Event Listener
type ServiceInstancesChangedListener struct {
ServiceName string
observer.ConditionalEventListener
ChangedNotify ChangedNotify
}
//On ServiceInstancesChangedEvent the service instances change event
func (sicl *ServiceInstancesChangedListener) OnEvent(e ServiceInstancesChangedEvent) error {
sicl.ChangedNotify.Notify(e)
return nil
}
//get listener priority
func (sicl *ServiceInstancesChangedListener) GetPriority() int {
return -1
}
//get event type
func (sicl *ServiceInstancesChangedListener) GetEventType() reflect.Type {
return reflect.TypeOf(&ServiceInstancesChangedEvent{})
}
//If service name matches,return true or false
func (sicl *ServiceInstancesChangedListener) Accept(e ServiceInstancesChangedEvent) bool {
return e.ServiceName == sicl.ServiceName
}
......
......@@ -17,19 +17,23 @@
package random
import (
"math/rand"
"time"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/registry/servicediscovery/instance"
"math/rand"
"time"
)
func init() {
extension.SetServiceInstanceSelector("random", NewRandomServiceInstanceSelector)
}
//the ServiceInstanceSelector implementation based on Random algorithm
type RandomServiceInstanceSelector struct {
}
......
......@@ -17,11 +17,17 @@
package random
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/registry"
"github.com/stretchr/testify/assert"
"testing"
)
func TestRandomServiceInstanceSelector_Select(t *testing.T) {
......
......@@ -23,5 +23,6 @@ import (
)
type ServiceInstanceSelector interface {
//Select an instance of ServiceInstance by the specified ServiceInstance service instances
Select(url common.URL, serviceInstances []registry.ServiceInstance) registry.ServiceInstance
}
......@@ -20,7 +20,17 @@ package servicediscovery
import (
"bytes"
"encoding/json"
"strconv"
"strings"
"sync"
)
import (
cm "github.com/Workiva/go-datastructures/common"
gxset "github.com/dubbogo/gost/container/set"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
......@@ -32,10 +42,6 @@ import (
"github.com/apache/dubbo-go/registry/servicediscovery/proxy"
"github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
"github.com/apache/dubbo-go/remoting"
gxset "github.com/dubbogo/gost/container/set"
"strconv"
"strings"
"sync"
)
const (
......@@ -53,14 +59,17 @@ func init() {
// 1. when we registry the service, we should create the mapping from service name to application name
// 2. when we sub
type serviceDiscoveryRegistry struct {
lock sync.RWMutex
url *common.URL
serviceDiscovery registry.ServiceDiscovery
subscribedServices *gxset.HashSet
serviceNameMapping mapping.ServiceNameMapping
metaDataService service.MetadataService
registeredListeners *gxset.HashSet
subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
lock sync.RWMutex
url *common.URL
serviceDiscovery registry.ServiceDiscovery
subscribedServices *gxset.HashSet
serviceNameMapping mapping.ServiceNameMapping
metaDataService service.MetadataService
//cache the registered listen
registeredListeners *gxset.HashSet
//all synthesize
subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
//cache exported urls, serviceName->revision->[]URL
serviceRevisionExportedURLsCache map[string]map[string][]common.URL
}
......@@ -104,18 +113,22 @@ func parseServices(literalServices string) *gxset.HashSet {
return set
}
//GetServiceDiscovery for get serviceDiscovery of the registry
func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
return s.serviceDiscovery
}
//GetUrl for get url of the registry
func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
return *s.url
}
//IsAvailable for make sure is't available
func (s *serviceDiscoveryRegistry) IsAvailable() bool {
return true
}
//Destroy for destroy graceful down
func (s *serviceDiscoveryRegistry) Destroy() {
err := s.serviceDiscovery.Destroy()
if err != nil {
......@@ -149,6 +162,7 @@ func shouldRegister(url common.URL) bool {
return false
}
//Subscribe for listen the change of services that from the exported url
func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) {
if !shouldSubscribe(*url) {
return
......@@ -347,17 +361,17 @@ func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registr
}
func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
s.lock.Lock()
// 1. expunge stale
s.expungeStaleRevisionExportedURLs(serviceInstances)
// 2. Initialize
s.initRevisionExportedURLs(serviceInstances)
s.lock.Unlock()
}
func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
serviceName := serviceInstances[0].GetServiceName()
s.lock.Lock()
revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName]
s.lock.Unlock()
if !exist {
return
}
......@@ -368,7 +382,7 @@ func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInsta
currentRevision := gxset.NewSet()
for _, s := range serviceInstances {
rv := getExportedServicesRevision(s)
if len(rv) != 0 {
if len(rv) > 0 {
currentRevision.Add(rv)
}
}
......@@ -424,7 +438,9 @@ func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstanc
}
serviceName := serviceInstance.GetServiceName()
revision := getExportedServicesRevision(serviceInstance)
s.lock.Lock()
revisionExportedURLsMap := s.serviceRevisionExportedURLsCache[serviceName]
s.lock.Unlock()
revisionExportedURLs := revisionExportedURLsMap[revision]
firstGet := false
if revisionExportedURLs == nil || len(revisionExportedURLs) == 0 {
......
......@@ -17,19 +17,23 @@
package rest
import (
"net/url"
"strings"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/registry"
"github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
"net/url"
"strings"
)
func init() {
synthesizer.AddSynthesizer(NewRestSubscribedURLsSynthesizer())
}
//SubscribedURLsSynthesizer implementation for rest protocol
type RestSubscribedURLsSynthesizer struct {
}
......
......@@ -17,13 +17,19 @@
package rest
import (
"net/url"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/registry"
"github.com/stretchr/testify/assert"
"net/url"
"testing"
)
func TestRestSubscribedURLsSynthesizer_Synthesize(t *testing.T) {
......
......@@ -23,6 +23,8 @@ import (
)
type SubscribedURLsSynthesizer interface {
//Supports the synthesis of the subscribed url or not
Support(subscribedURL *common.URL) bool
//synthesize the subscribed url
Synthesize(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment