Skip to content
Snippets Groups Projects
Commit 8a4c4cf1 authored by cvictory's avatar cvictory
Browse files

Merge branch 'develop' of github.com:apache/dubbo-go into notify_all

parents 8bf86eab dd600c99
No related branches found
No related tags found
No related merge requests found
Showing
with 223 additions and 64 deletions
......@@ -253,6 +253,14 @@ If you are using [apache/dubbo-go](github.com/apache/dubbo-go) and think that it
</td>
</tr>
<tr></tr>
<tr>
<td align="center" valign="middle">
<a href="http://www.mgtv.com" target="_blank">
<img width="222px" src="https://ugc.hitv.com/platform_oss/F6077F1AA82542CDBDD88FD518E6E727.png">
</a>
</td>
</tr>
<tr></tr>
</tbody>
</table>
</div>
......@@ -251,6 +251,14 @@ make test
</td>
</tr>
<tr></tr>
<tr>
<td align="center" valign="middle">
<a href="http://www.mgtv.com" target="_blank">
<img width="222px" src="https://ugc.hitv.com/platform_oss/F6077F1AA82542CDBDD88FD518E6E727.png">
</a>
</td>
</tr>
<tr></tr>
</tbody>
</table>
</div>
\ No newline at end of file
......@@ -65,6 +65,8 @@ type RouterChain struct {
notify chan struct{}
// Address cache
cache atomic.Value
// init
init sync.Once
}
// Route Loop routers in RouterChain and call Route method to determine the target invokers list.
......@@ -111,6 +113,13 @@ func (c *RouterChain) SetInvokers(invokers []protocol.Invoker) {
c.invokers = invokers
c.mutex.Unlock()
// it should trigger init router for first call
c.init.Do(func() {
go func() {
c.notify <- struct{}{}
}()
})
c.count++
now := time.Now()
if c.count >= countThreshold && now.Sub(c.last) >= timeThreshold {
......
......@@ -96,7 +96,6 @@ func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol
return int64(sleepWindow)
}
// GetRequestSuccessiveFailureThreshold return the requestSuccessiveFailureThreshold bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetRequestSuccessiveFailureThreshold() int32 {
return c.requestSuccessiveFailureThreshold
......
......@@ -97,7 +97,7 @@ func (pi *ProxyInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
args := invocation.Arguments()
// get service
svc := common.ServiceMap.GetService(proto, path)
svc := common.ServiceMap.GetServiceByServiceKey(proto, url.ServiceKey())
if svc == nil {
logger.Errorf("cannot find service [%s] in %s", path, proto)
result.SetError(perrors.Errorf("cannot find service [%s] in %s", path, proto))
......
......@@ -157,11 +157,17 @@ type serviceMap struct {
}
// GetService gets a service defination by protocol and name
func (sm *serviceMap) GetService(protocol, name string) *Service {
func (sm *serviceMap) GetService(protocol, interfaceName, group, version string) *Service {
serviceKey := ServiceKey(interfaceName, group, version)
return sm.GetServiceByServiceKey(protocol, serviceKey)
}
// GetService gets a service defination by protocol and service key
func (sm *serviceMap) GetServiceByServiceKey(protocol, serviceKey string) *Service {
sm.mutex.RLock()
defer sm.mutex.RUnlock()
if s, ok := sm.serviceMap[protocol]; ok {
if srv, ok := s[name]; ok {
if srv, ok := s[serviceKey]; ok {
return srv
}
return nil
......@@ -180,7 +186,7 @@ func (sm *serviceMap) GetInterface(interfaceName string) []*Service {
}
// Register registers a service by @interfaceName and @protocol
func (sm *serviceMap) Register(interfaceName, protocol string, rcvr RPCService) (string, error) {
func (sm *serviceMap) Register(interfaceName, protocol, group, version string, rcvr RPCService) (string, error) {
if sm.serviceMap[protocol] == nil {
sm.serviceMap[protocol] = make(map[string]*Service)
}
......@@ -203,8 +209,8 @@ func (sm *serviceMap) Register(interfaceName, protocol string, rcvr RPCService)
return "", perrors.New(s)
}
sname = rcvr.Reference()
if server := sm.GetService(protocol, sname); server != nil {
sname = ServiceKey(interfaceName, group, version)
if server := sm.GetService(protocol, interfaceName, group, version); server != nil {
return "", perrors.New("service already defined: " + sname)
}
s.name = sname
......@@ -228,9 +234,9 @@ func (sm *serviceMap) Register(interfaceName, protocol string, rcvr RPCService)
}
// UnRegister cancels a service by @interfaceName, @protocol and @serviceId
func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceId string) error {
if protocol == "" || serviceId == "" {
return perrors.New("protocol or serviceName is nil")
func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceKey string) error {
if protocol == "" || serviceKey == "" {
return perrors.New("protocol or serviceKey is nil")
}
var (
......@@ -248,9 +254,9 @@ func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceId string) erro
if !ok {
return perrors.New("no services for " + protocol)
}
s, ok := svcs[serviceId]
s, ok := svcs[serviceKey]
if !ok {
return perrors.New("no service for " + serviceId)
return perrors.New("no service for " + serviceKey)
}
svrs, ok = sm.interfaceMap[interfaceName]
if !ok {
......@@ -276,7 +282,7 @@ func (sm *serviceMap) UnRegister(interfaceName, protocol, serviceId string) erro
sm.interfaceMap[interfaceName] = append(sm.interfaceMap[interfaceName], svrs[i])
}
}
delete(svcs, serviceId)
delete(svcs, serviceKey)
if len(sm.serviceMap[protocol]) == 0 {
delete(sm.serviceMap, protocol)
}
......
......@@ -85,23 +85,23 @@ func TestServiceMapRegister(t *testing.T) {
// lowercase
s0 := &testService{}
// methods, err := ServiceMap.Register("testporotocol", s0)
_, err := ServiceMap.Register(testInterfaceName, "testporotocol", s0)
_, err := ServiceMap.Register(testInterfaceName, "testporotocol", "", "v0", s0)
assert.EqualError(t, err, "type testService is not exported")
// succ
s := &TestService{}
methods, err := ServiceMap.Register(testInterfaceName, "testporotocol", s)
methods, err := ServiceMap.Register(testInterfaceName, "testporotocol", "", "v1", s)
assert.NoError(t, err)
assert.Equal(t, "MethodOne,MethodThree,methodTwo", methods)
// repeat
_, err = ServiceMap.Register(testInterfaceName, "testporotocol", s)
assert.EqualError(t, err, "service already defined: com.test.Path")
_, err = ServiceMap.Register(testInterfaceName, "testporotocol", "", "v1", s)
assert.EqualError(t, err, "service already defined: testService:v1")
// no method
s1 := &TestService1{}
_, err = ServiceMap.Register(testInterfaceName, "testporotocol", s1)
assert.EqualError(t, err, "type com.test.Path1 has no exported methods of suitable type")
_, err = ServiceMap.Register(testInterfaceName, "testporotocol", "", "v2", s1)
assert.EqualError(t, err, "type testService:v2 has no exported methods of suitable type")
ServiceMap = &serviceMap{
serviceMap: make(map[string]map[string]*Service),
......@@ -111,22 +111,22 @@ func TestServiceMapRegister(t *testing.T) {
func TestServiceMapUnRegister(t *testing.T) {
s := &TestService{}
_, err := ServiceMap.Register("TestService", testProtocol, s)
_, err := ServiceMap.Register("TestService", testProtocol, "", "v1", s)
assert.NoError(t, err)
assert.NotNil(t, ServiceMap.GetService(testProtocol, referenceTestPath))
assert.NotNil(t, ServiceMap.GetService(testProtocol, "TestService", "", "v1"))
assert.Equal(t, 1, len(ServiceMap.GetInterface("TestService")))
err = ServiceMap.UnRegister("", "", referenceTestPath)
assert.EqualError(t, err, "protocol or serviceName is nil")
err = ServiceMap.UnRegister("", "", ServiceKey("TestService", "", "v1"))
assert.EqualError(t, err, "protocol or serviceKey is nil")
err = ServiceMap.UnRegister("", "protocol", referenceTestPath)
err = ServiceMap.UnRegister("", "protocol", ServiceKey("TestService", "", "v1"))
assert.EqualError(t, err, "no services for protocol")
err = ServiceMap.UnRegister("", testProtocol, referenceTestPathDistinct)
assert.EqualError(t, err, "no service for com.test.Path1")
err = ServiceMap.UnRegister("", testProtocol, ServiceKey("TestService", "", "v0"))
assert.EqualError(t, err, "no service for TestService:v0")
// succ
err = ServiceMap.UnRegister("TestService", testProtocol, referenceTestPath)
err = ServiceMap.UnRegister("TestService", testProtocol, ServiceKey("TestService", "", "v1"))
assert.NoError(t, err)
}
......
......@@ -109,7 +109,7 @@ type URL struct {
noCopy noCopy
baseUrl
Path string // like /com.ikurento.dubbo.UserProvider3
Path string // like /com.ikurento.dubbo.UserProvider
Username string
Password string
Methods []string
......
......@@ -104,7 +104,7 @@ func TestLoad(t *testing.T) {
conServices = map[string]common.RPCService{}
proServices = map[string]common.RPCService{}
err := common.ServiceMap.UnRegister("com.MockService", "mock", "MockService")
err := common.ServiceMap.UnRegister("com.MockService", "mock", common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
assert.Nil(t, err)
consumerConfig = nil
providerConfig = nil
......@@ -143,7 +143,7 @@ func TestLoadWithSingleReg(t *testing.T) {
conServices = map[string]common.RPCService{}
proServices = map[string]common.RPCService{}
common.ServiceMap.UnRegister("com.MockService", "mock", "MockService")
common.ServiceMap.UnRegister("com.MockService", "mock", common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
consumerConfig = nil
providerConfig = nil
}
......@@ -182,7 +182,7 @@ func TestWithNoRegLoad(t *testing.T) {
conServices = map[string]common.RPCService{}
proServices = map[string]common.RPCService{}
common.ServiceMap.UnRegister("com.MockService", "mock", "MockService")
common.ServiceMap.UnRegister("com.MockService", "mock", common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
consumerConfig = nil
providerConfig = nil
}
......
......@@ -96,7 +96,7 @@ func (c *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
// Refer ...
func (c *ReferenceConfig) Refer(_ interface{}) {
cfgURL := common.NewURLWithOptions(
common.WithPath(c.id),
common.WithPath(c.InterfaceName),
common.WithProtocol(c.Protocol),
common.WithParams(c.getUrlMap()),
common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
......@@ -117,7 +117,7 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
c.urls = append(c.urls, serviceUrl)
} else {
if serviceUrl.Path == "" {
serviceUrl.Path = "/" + c.id
serviceUrl.Path = "/" + c.InterfaceName
}
// merge url need to do
newUrl := common.MergeUrl(serviceUrl, cfgURL)
......
......@@ -171,7 +171,7 @@ func (c *ServiceConfig) Export() error {
proxyFactory := extension.GetProxyFactory(providerConfig.ProxyFactory)
for _, proto := range protocolConfigs {
// registry the service reflect
methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.rpcService)
methods, err := common.ServiceMap.Register(c.InterfaceName, proto.Name, c.Group, c.Version, c.rpcService)
if err != nil {
formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v.", c.InterfaceName, proto.Name, err.Error())
logger.Errorf(formatErr.Error())
......@@ -184,7 +184,7 @@ func (c *ServiceConfig) Export() error {
nextPort = nextPort.Next()
}
ivkURL := common.NewURLWithOptions(
common.WithPath(c.id),
common.WithPath(c.InterfaceName),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(port),
......
......@@ -105,8 +105,12 @@ func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) {
func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation protocol.Invocation) map[string]string {
dataMap := make(map[string]string, 16)
attachments := invocation.Attachments()
if v, ok := attachments[constant.INTERFACE_KEY]; ok && v != nil {
dataMap[constant.INTERFACE_KEY] = v.(string)
itf := attachments[constant.INTERFACE_KEY]
if itf == nil || len(itf.(string)) == 0 {
itf = attachments[constant.PATH_KEY]
}
if itf != nil {
dataMap[constant.INTERFACE_KEY] = itf.(string)
}
if v, ok := attachments[constant.METHOD_KEY]; ok && v != nil {
dataMap[constant.METHOD_KEY] = v.(string)
......
......@@ -20,7 +20,6 @@ package filter_impl
import (
"context"
"reflect"
"strings"
)
import (
......@@ -75,7 +74,7 @@ func (ef *GenericServiceFilter) Invoke(ctx context.Context, invoker protocol.Inv
url := invoker.GetUrl()
methodName = invocation.Arguments()[0].(string)
// get service
svc := common.ServiceMap.GetService(url.Protocol, strings.TrimPrefix(url.Path, "/"))
svc := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey())
// get method
method := svc.Method()[methodName]
if method == nil {
......
......@@ -96,7 +96,7 @@ func TestGenericServiceFilterInvoke(t *testing.T) {
hessian.Object("222")},
}
s := &TestService{}
_, _ = common.ServiceMap.Register("TestService", "testprotocol", s)
_, _ = common.ServiceMap.Register("com.test.Path", "testprotocol", "", "", s)
rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil)
filter := GetGenericServiceFilter()
url, _ := common.NewURL("testprotocol://127.0.0.1:20000/com.test.Path")
......
......@@ -11,11 +11,11 @@ require (
github.com/coreos/etcd v3.3.25+incompatible
github.com/creasty/defaults v1.5.1
github.com/dubbogo/go-zookeeper v1.0.2
github.com/dubbogo/gost v1.9.2
github.com/dubbogo/gost v1.9.5
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/emicklei/go-restful/v3 v3.0.0
github.com/emicklei/go-restful/v3 v3.4.0
github.com/frankban/quicktest v1.4.1 // indirect
github.com/fsnotify/fsnotify v1.4.7
github.com/fsnotify/fsnotify v1.4.9
github.com/go-co-op/gocron v0.1.1
github.com/go-resty/resty/v2 v2.3.0
github.com/golang/mock v1.4.4
......@@ -27,22 +27,22 @@ require (
github.com/hashicorp/vault/api v1.0.5-0.20191108163347-bdd38fca2cff // indirect
github.com/hashicorp/vault/sdk v0.1.14-0.20191112033314-390e96e22eb2
github.com/jinzhu/copier v0.0.0-20190625015134-976e0346caa8
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.3.3
github.com/magiconair/properties v1.8.4
github.com/mitchellh/mapstructure v1.4.0
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go v1.0.1
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.2.6+incompatible // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_golang v1.8.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/zouyx/agollo/v3 v3.4.5
go.uber.org/atomic v1.6.0
go.uber.org/atomic v1.7.0
go.uber.org/zap v1.16.0
google.golang.org/grpc v1.26.0
gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.16.9
k8s.io/apimachinery v0.16.9
k8s.io/client-go v0.16.9
......
This diff is collapsed.
......@@ -28,7 +28,6 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
func TestBuildServiceDefinition(t *testing.T) {
......@@ -44,9 +43,9 @@ func TestBuildServiceDefinition(t *testing.T) {
"owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000&timestamp=1556509797245&group=%v&version=%v&bean.name=%v",
protocol, serviceName, group, version, beanName))
assert.NoError(t, err)
_, err = common.ServiceMap.Register(serviceName, protocol, &UserProvider{})
_, err = common.ServiceMap.Register(serviceName, protocol, group, version, &UserProvider{})
assert.NoError(t, err)
service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
service := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey())
sd := BuildServiceDefinition(*service, url)
assert.Equal(t, "{canonicalName:com.ikurento.user.UserProvider, codeSource:, methods:[{name:GetUser,parameterTypes:[{type:slice}],returnType:ptr,params:[] }], types:[]}", sd.String())
}
......@@ -116,8 +116,8 @@ func getMockDefinition(id *identifier.MetadataIdentifier, t *testing.T) *definit
"owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000&timestamp=1556509797245&group=%v&version=%v&bean.name=%v",
protocol, id.ServiceInterface, id.Group, id.Version, beanName))
assert.NoError(t, err)
_, err = common.ServiceMap.Register(id.ServiceInterface, protocol, &definition.UserProvider{})
_, err = common.ServiceMap.Register(id.ServiceInterface, protocol, id.Group, id.Version, &definition.UserProvider{})
assert.NoError(t, err)
service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
service := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey())
return definition.BuildServiceDefinition(*service, url)
}
......@@ -69,7 +69,7 @@ func TestConfigurableExporter(t *testing.T) {
assert.Equal(t, false, exported.IsExported())
assert.NoError(t, exported.Export(registryURL))
assert.Equal(t, true, exported.IsExported())
assert.Regexp(t, "dubbo://:20003/MetadataService*", exported.GetExportedURLs()[0].String())
assert.Regexp(t, "dubbo://:20003/org.apache.dubbo.metadata.MetadataService*", exported.GetExportedURLs()[0].String())
exported.Unexport()
assert.Equal(t, false, exported.IsExported())
})
......
......@@ -124,7 +124,7 @@ func (mts *MetadataService) getAllService(services *sync.Map) []*common.URL {
urls := value.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := urls.ByPosition(i).(*common.URL)
if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.METADATA_SERVICE_NAME {
if url.Service() != constant.METADATA_SERVICE_NAME {
res = append(res, url)
}
}
......@@ -178,7 +178,7 @@ func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error {
interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
if len(interfaceName) > 0 && !isGeneric {
tmpService := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
tmpService := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey())
sd := definition.BuildServiceDefinition(*tmpService, url)
data, err := sd.ToBytes()
if err != nil {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment