Skip to content
Snippets Groups Projects
Commit 834135bd authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #119 from fangyincheng/develop

Add:support multi-implementation of service
parents db174f84 391c9c24
No related branches found
No related tags found
No related merge requests found
Showing
with 68 additions and 81 deletions
......@@ -143,8 +143,7 @@ func Test_FailoverInvoke2(t *testing.T) {
urlParams.Set(constant.RETRIES_KEY, "2")
urlParams.Set("methods.test."+constant.RETRIES_KEY, "3")
ivc := &invocation.RPCInvocation{}
ivc.SetMethod("test")
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))
result := normalInvoke(t, 3, urlParams, ivc)
assert.NoError(t, result.Error())
count = 0
......
......@@ -43,8 +43,7 @@ func TestLeastActiveByWeight(t *testing.T) {
invokers = append(invokers, protocol.NewBaseInvoker(url))
}
inv := new(invocation.RPCInvocation)
inv.SetMethod("test")
inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))
protocol.BeginCount(invokers[2].GetUrl(), inv.MethodName())
loop = 10000
......
......@@ -67,8 +67,7 @@ func Test_RandomlbSelectWeight(t *testing.T) {
urlParams.Set("methods.test."+constant.WEIGHT_KEY, "10000000000000")
urll, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"), common.WithParams(urlParams))
invokers = append(invokers, protocol.NewBaseInvoker(urll))
ivc := &invocation.RPCInvocation{}
ivc.SetMethod("test")
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))
var selectedInvoker []protocol.Invoker
var selected float64
......@@ -99,8 +98,7 @@ func Test_RandomlbSelectWarmup(t *testing.T) {
urlParams.Set(constant.REMOTE_TIMESTAMP_KEY, strconv.FormatInt(time.Now().Add(time.Minute*(-9)).Unix(), 10))
urll, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.100:20000/com.ikurento.user.UserProvider"), common.WithParams(urlParams))
invokers = append(invokers, protocol.NewBaseInvoker(urll))
ivc := &invocation.RPCInvocation{}
ivc.SetMethod("test")
ivc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("test"))
var selectedInvoker []protocol.Invoker
var selected float64
......
......@@ -29,6 +29,7 @@ const (
SERVICE_KEY = "service"
METHODS_KEY = "methods"
TIMEOUT_KEY = "timeout"
BEAN_NAME_KEY = "bean.name"
)
const (
......@@ -44,7 +45,6 @@ const (
WEIGHT_KEY = "weight"
WARMUP_KEY = "warmup"
RETRIES_KEY = "retries"
BEAN_NAME = "bean.name"
)
const (
......
......@@ -116,7 +116,9 @@ func (p *Proxy) Implement(v common.RPCService) {
}
}
inv = invocation_impl.NewRPCInvocationForConsumer(methodName, nil, inArr, reply.Interface(), p.callBack, common.URL{}, nil)
inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName),
invocation_impl.WithArguments(inArr), invocation_impl.WithReply(reply.Interface()),
invocation_impl.WithCallBack(p.callBack))
for k, value := range p.attachments {
inv.SetAttachments(k, value)
......
......@@ -43,21 +43,15 @@ type TestService struct {
Echo func(interface{}, *interface{}) error
}
func (s *TestService) Service() string {
func (s *TestService) Reference() string {
return "com.test.Path"
}
func (s *TestService) Version() string {
return ""
}
type TestServiceInt int
func (s *TestServiceInt) Service() string {
func (s *TestServiceInt) Reference() string {
return "com.test.TestServiceInt"
}
func (s *TestServiceInt) Version() string {
return ""
}
func TestProxy_Implement(t *testing.T) {
......
......@@ -36,8 +36,7 @@ import (
// rpc service interface
type RPCService interface {
Service() string // Path InterfaceName
Version() string
Reference() string // rpc service id or reference id
}
// for lowercase func
......@@ -149,7 +148,7 @@ func (sm *serviceMap) Register(protocol string, rcvr RPCService) (string, error)
return "", perrors.New(s)
}
sname = rcvr.Service()
sname = rcvr.Reference()
if server := sm.GetService(protocol, sname); server != nil {
return "", perrors.New("service already defined: " + sname)
}
......@@ -172,8 +171,8 @@ func (sm *serviceMap) Register(protocol string, rcvr RPCService) (string, error)
return strings.TrimSuffix(methods, ","), nil
}
func (sm *serviceMap) UnRegister(protocol, serviceName string) error {
if protocol == "" || serviceName == "" {
func (sm *serviceMap) UnRegister(protocol, serviceId string) error {
if protocol == "" || serviceId == "" {
return perrors.New("protocol or serviceName is nil")
}
sm.mutex.RLock()
......@@ -182,16 +181,16 @@ func (sm *serviceMap) UnRegister(protocol, serviceName string) error {
sm.mutex.RUnlock()
return perrors.New("no services for " + protocol)
}
_, ok = svcs[serviceName]
_, ok = svcs[serviceId]
if !ok {
sm.mutex.RUnlock()
return perrors.New("no service for " + serviceName)
return perrors.New("no service for " + serviceId)
}
sm.mutex.RUnlock()
sm.mutex.Lock()
defer sm.mutex.Unlock()
delete(svcs, serviceName)
delete(svcs, serviceId)
delete(sm.serviceMap, protocol)
return nil
......
......@@ -39,12 +39,9 @@ func (s *TestService) MethodTwo(arg1, arg2, arg3 interface{}) (interface{}, erro
func (s *TestService) MethodThree() error {
return nil
}
func (s *TestService) Service() string {
func (s *TestService) Reference() string {
return "com.test.Path"
}
func (s *TestService) Version() string {
return ""
}
func (s *TestService) MethodMapper() map[string]string {
return map[string]string{
"MethodTwo": "methodTwo",
......@@ -65,22 +62,16 @@ func (s *testService) Method3(ctx context.Context, args []interface{}, rsp *stru
func (s *testService) Method4(ctx context.Context, args []interface{}, rsp *struct{}) *testService {
return nil
}
func (s *testService) Service() string {
func (s *testService) Reference() string {
return "com.test.Path"
}
func (s *testService) Version() string {
return ""
}
type TestService1 struct {
}
func (s *TestService1) Service() string {
func (s *TestService1) Reference() string {
return "com.test.Path1"
}
func (s *TestService1) Version() string {
return ""
}
func TestServiceMap_Register(t *testing.T) {
// lowercase
......@@ -180,7 +171,7 @@ func TestSuiteMethod(t *testing.T) {
// wrong number of in return
s1 := &testService{}
method, ok = reflect.TypeOf(s1).MethodByName("Version")
method, ok = reflect.TypeOf(s1).MethodByName("Reference")
assert.True(t, ok)
methodType = suiteMethod(method)
assert.Nil(t, methodType)
......
......@@ -35,8 +35,8 @@ func Test_refresh(t *testing.T) {
c := &BaseConfig{}
mockMap := map[string]string{}
mockMap["dubbo.registries.shanghai_reg1.protocol"] = "mock100"
mockMap["dubbo.reference.MockService.MockService.retries"] = "10"
mockMap["dubbo.MockService.MockService.GetUser.retries"] = "10"
mockMap["dubbo.reference.com.MockService.MockService.retries"] = "10"
mockMap["dubbo.com.MockService.MockService.GetUser.retries"] = "10"
mockMap["dubbo.consumer.check"] = "false"
mockMap["dubbo.application.name"] = "dubbo"
......@@ -88,7 +88,7 @@ func Test_refresh(t *testing.T) {
},
References: map[string]*ReferenceConfig{
"MockService": {
InterfaceName: "MockService",
InterfaceName: "com.MockService",
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
......@@ -98,13 +98,14 @@ func Test_refresh(t *testing.T) {
Methods: []*MethodConfig{
{
InterfaceId: "MockService",
InterfaceName: "MockService",
InterfaceName: "com.MockService",
Name: "GetUser",
Retries: 2,
Loadbalance: "random",
},
{InterfaceId: "MockService",
InterfaceName: "MockService",
{
InterfaceId: "MockService",
InterfaceName: "com.MockService",
Name: "GetUser1",
Retries: 2,
Loadbalance: "random",
......@@ -128,8 +129,8 @@ func Test_refreshProvider(t *testing.T) {
c := &BaseConfig{}
mockMap := map[string]string{}
mockMap["dubbo.registries.shanghai_reg1.protocol"] = "mock100"
mockMap["dubbo.service.MockService.MockService.retries"] = "10"
mockMap["dubbo.MockService.MockService.GetUser.retries"] = "10"
mockMap["dubbo.service.com.MockService.MockService.retries"] = "10"
mockMap["dubbo.com.MockService.MockService.GetUser.retries"] = "10"
mockMap["dubbo.consumer.check"] = "false"
mockMap["dubbo.application.name"] = "dubbo"
mockMap["dubbo.protocols.jsonrpc1.name"] = "jsonrpc"
......@@ -183,7 +184,7 @@ func Test_refreshProvider(t *testing.T) {
},
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "MockService",
InterfaceName: "com.MockService",
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
......@@ -193,13 +194,13 @@ func Test_refreshProvider(t *testing.T) {
Methods: []*MethodConfig{
{
InterfaceId: "MockService",
InterfaceName: "MockService",
InterfaceName: "com.MockService",
Name: "GetUser",
Retries: 2,
Loadbalance: "random",
},
{InterfaceId: "MockService",
InterfaceName: "MockService",
InterfaceName: "com.MockService",
Name: "GetUser1",
Retries: 2,
Loadbalance: "random",
......
......@@ -71,16 +71,17 @@ func Load() {
logger.Errorf("[consumer config center refresh] %#v", err)
}
refMap = make(map[string]*ReferenceConfig)
for _, ref := range consumerConfig.References {
rpcService := GetConsumerService(ref.InterfaceName)
for key, ref := range consumerConfig.References {
rpcService := GetConsumerService(key)
if rpcService == nil {
logger.Warnf("%s is not exsist!", ref.InterfaceName)
logger.Warnf("%s is not exsist!", key)
continue
}
ref.id = key
ref.Refer()
ref.Implement(rpcService)
refMap[ref.InterfaceName] = ref
refMap[key] = ref
}
//wait for invoker is available, if wait over default 3s, then panic
......@@ -122,17 +123,18 @@ func Load() {
logger.Errorf("[provider config center refresh] %#v", err)
}
srvMap = make(map[string]*ServiceConfig)
for _, svs := range providerConfig.Services {
rpcService := GetProviderService(svs.InterfaceName)
for key, svs := range providerConfig.Services {
rpcService := GetProviderService(key)
if rpcService == nil {
logger.Warnf("%s is not exsist!", svs.InterfaceName)
logger.Warnf("%s is not exsist!", key)
continue
}
svs.id = key
svs.Implement(rpcService)
if err := svs.Export(); err != nil {
panic(fmt.Sprintf("service %s export failed! ", svs.InterfaceName))
panic(fmt.Sprintf("service %s export failed! ", key))
}
srvMap[svs.InterfaceName] = svs
srvMap[key] = svs
}
}
}
......@@ -144,5 +146,5 @@ func GetRPCService(name string) common.RPCService {
// create rpc service for consumer
func RPCService(service common.RPCService) {
providerConfig.Services[service.Service()].Implement(service)
providerConfig.Services[service.Reference()].Implement(service)
}
......@@ -71,12 +71,12 @@ func TestLoad(t *testing.T) {
Load()
assert.Equal(t, ms, GetRPCService(ms.Service()))
assert.Equal(t, ms, GetRPCService(ms.Reference()))
ms2 := &struct {
MockService
}{}
RPCService(ms2)
assert.NotEqual(t, ms2, GetRPCService(ms2.Service()))
assert.NotEqual(t, ms2, GetRPCService(ms2.Reference()))
conServices = map[string]common.RPCService{}
proServices = map[string]common.RPCService{}
......@@ -84,6 +84,7 @@ func TestLoad(t *testing.T) {
consumerConfig = nil
providerConfig = nil
}
func TestWithNoRegLoad(t *testing.T) {
doInit()
doinit()
......@@ -99,12 +100,12 @@ func TestWithNoRegLoad(t *testing.T) {
Load()
assert.Equal(t, ms, GetRPCService(ms.Service()))
assert.Equal(t, ms, GetRPCService(ms.Reference()))
ms2 := &struct {
MockService
}{}
RPCService(ms2)
assert.NotEqual(t, ms2, GetRPCService(ms2.Service()))
assert.NotEqual(t, ms2, GetRPCService(ms2.Reference()))
conServices = map[string]common.RPCService{}
proServices = map[string]common.RPCService{}
......@@ -112,6 +113,7 @@ func TestWithNoRegLoad(t *testing.T) {
consumerConfig = nil
providerConfig = nil
}
func TestConfigLoaderWithConfigCenter(t *testing.T) {
extension.SetConfigCenterFactory("mock", func() config_center.DynamicConfigurationFactory {
return &config_center.MockDynamicConfigurationFactory{}
......
......@@ -23,14 +23,10 @@ import (
type MockService struct{}
func (*MockService) Service() string {
func (*MockService) Reference() string {
return "MockService"
}
func (*MockService) Version() string {
return "1.0"
}
func (*MockService) GetUser(ctx context.Context, itf []interface{}, str *struct{}) error {
return nil
}
......
......@@ -38,6 +38,7 @@ import (
type ReferenceConfig struct {
context context.Context
pxy *proxy.Proxy
id string
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
Url string `yaml:"url" json:"url,omitempty" property:"url"`
......@@ -76,7 +77,7 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro
}
func (refconfig *ReferenceConfig) Refer() {
url := common.NewURLWithOptions(common.WithPath(refconfig.InterfaceName), common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap()))
url := common.NewURLWithOptions(common.WithPath(refconfig.id), common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap()))
//1. user specified URL, could be peer-to-peer address, or register center's address.
if refconfig.Url != "" {
......
......@@ -84,7 +84,7 @@ func doInit() {
"serviceid": "soa.mock",
},
Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2",
InterfaceName: "MockService",
InterfaceName: "com.MockService",
Protocol: "mock",
Cluster: "failover",
Loadbalance: "random",
......@@ -147,6 +147,7 @@ func Test_ReferP2P(t *testing.T) {
}
consumerConfig = nil
}
func Test_ReferMultiP2P(t *testing.T) {
doInit()
extension.SetProtocol("dubbo", GetProtocol)
......
......@@ -28,12 +28,12 @@ var (
// SetConService is called by init() of implement of RPCService
func SetConsumerService(service common.RPCService) {
conServices[service.Service()] = service
conServices[service.Reference()] = service
}
// SetProService is called by init() of implement of RPCService
func SetProviderService(service common.RPCService) {
proServices[service.Service()] = service
proServices[service.Reference()] = service
}
func GetConsumerService(name string) common.RPCService {
......
......@@ -43,6 +43,7 @@ import (
type ServiceConfig struct {
context context.Context
id string
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
Protocol string `required:"true" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` //multi protocol support, split by ','
InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"`
......@@ -100,15 +101,12 @@ func (srvconfig *ServiceConfig) Export() error {
logger.Errorf(err.Error())
return err
}
//contextPath := proto.ContextPath
//if contextPath == "" {
// contextPath = providerConfig.Path
//}
url := common.NewURLWithOptions(common.WithPath(srvconfig.InterfaceName),
url := common.NewURLWithOptions(common.WithPath(srvconfig.id),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(proto.Port),
common.WithParams(urlMap),
common.WithParamsValue(constant.BEAN_NAME_KEY, srvconfig.id),
common.WithMethods(strings.Split(methods, ",")))
if len(regUrls) > 0 {
......
......@@ -74,7 +74,7 @@ func doinit() {
},
Services: map[string]*ServiceConfig{
"MockService": {
InterfaceName: "MockService",
InterfaceName: "com.MockService",
Protocol: "mock",
Registry: "shanghai_reg1,shanghai_reg2,hangzhou_reg1,hangzhou_reg2",
Cluster: "failover",
......
......@@ -36,8 +36,10 @@ references:
registry: "hangzhouzk,shanghaizk"
filter: ""
protocol : "dubbo"
version: "1.0"
group: "as"
interface : "com.ikurento.user.UserProvider"
url: "dubbo://127.0.0.1:20000"
url: "dubbo://127.0.0.1:20000/UserProvider"
cluster: "failover"
methods :
- name: "GetUser"
......
......@@ -9,7 +9,7 @@ references:
filter: ""
protocol : "dubbo"
interface : "com.ikurento.user.UserProvider"
url: "dubbo://127.0.0.1:20000"
url: "dubbo://127.0.0.1:20000/UserProvider"
cluster: "failover"
methods :
- name: "GetUser"
......
......@@ -33,6 +33,8 @@ services:
# equivalent to interface of dubbo.xml
interface : "com.ikurento.user.UserProvider"
loadbalance: "random"
version: "1.0"
group: "as"
warmup: "100"
cluster: "failover"
methods:
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment