Skip to content
Snippets Groups Projects
Commit 42ad3716 authored by vito.he's avatar vito.he Committed by GitHub
Browse files

Merge pull request #317 from fangyincheng/feature/grpc

Mod: modidfy Refer params and add licence
parents 40784602 5eefe728
No related branches found
No related tags found
No related merge requests found
Showing
with 88 additions and 28 deletions
......@@ -90,7 +90,11 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro
}
func (refconfig *ReferenceConfig) Refer(impl interface{}) {
url := common.NewURLWithOptions(common.WithPath(refconfig.id), common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap()))
url := common.NewURLWithOptions(common.WithPath(refconfig.id),
common.WithProtocol(refconfig.Protocol),
common.WithParams(refconfig.getUrlMap()),
common.WithParamsValue(constant.BEAN_NAME_KEY, refconfig.id),
)
//1. user specified URL, could be peer-to-peer address, or register center's address.
if refconfig.Url != "" {
......@@ -123,12 +127,12 @@ func (refconfig *ReferenceConfig) Refer(impl interface{}) {
}
}
if len(refconfig.urls) == 1 {
refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0], impl)
refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0])
} else {
invokers := []protocol.Invoker{}
var regUrl *common.URL
for _, u := range refconfig.urls {
invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u, impl))
invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u))
if u.Protocol == constant.REGISTRY_PROTOCOL {
regUrl = u
}
......
......@@ -324,7 +324,7 @@ func newRegistryProtocol() protocol.Protocol {
type mockRegistryProtocol struct{}
func (*mockRegistryProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (*mockRegistryProtocol) Refer(url common.URL) protocol.Invoker {
return protocol.NewBaseInvoker(url)
}
......
......@@ -68,7 +68,7 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
return exporter
}
func (dp *DubboProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout
......
......@@ -85,7 +85,7 @@ func TestDubboProtocol_Refer(t *testing.T) {
"side=provider&timeout=3000&timestamp=1556509797245")
assert.NoError(t, err)
clientConf = &ClientConfig{}
invoker := proto.Refer(url, nil)
invoker := proto.Refer(url)
// make sure url
eq := invoker.GetUrl().URLEqual(url)
......
......@@ -27,6 +27,8 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/config"
)
type Client struct {
......@@ -34,12 +36,14 @@ type Client struct {
invoker reflect.Value
}
func NewClient(impl interface{}, url common.URL) *Client {
func NewClient(url common.URL) *Client {
conn, err := grpc.Dial(url.Location, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
panic(err)
}
key := url.GetParam(constant.BEAN_NAME_KEY, "")
impl := config.GetConsumerService(key)
invoker := getInvoker(impl, conn)
return &Client{
......
......@@ -47,9 +47,8 @@ func TestNewClient(t *testing.T) {
go internal.InitGrpcServer()
defer internal.ShutdownGrpcServer()
var impl *internal.GrpcGreeterImpl
url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!")
assert.Nil(t, err)
cli := NewClient(impl, url)
cli := NewClient(url)
assert.NotNil(t, cli)
}
......@@ -37,11 +37,10 @@ func TestInvoke(t *testing.T) {
go internal.InitGrpcServer()
defer internal.ShutdownGrpcServer()
var impl *internal.GrpcGreeterImpl
url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!")
assert.Nil(t, err)
cli := NewClient(impl, url)
cli := NewClient(url)
invoker := NewGrpcInvoker(url, cli)
......
......@@ -78,8 +78,8 @@ func (gp *GrpcProtocol) openServer(url common.URL) {
}
}
func (gp *GrpcProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
invoker := NewGrpcInvoker(url, NewClient(impl, url))
func (gp *GrpcProtocol) Refer(url common.URL) protocol.Invoker {
invoker := NewGrpcInvoker(url, NewClient(url))
gp.SetInvokers(invoker)
logger.Infof("Refer service: %s", url.String())
return invoker
......
......@@ -70,8 +70,7 @@ func TestGrpcProtocol_Refer(t *testing.T) {
proto := GetProtocol()
url, err := common.NewURL(context.Background(), "grpc://127.0.0.1:30000/GrpcGreeterImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=GrpcGreeterImpl&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!")
assert.NoError(t, err)
var impl *internal.GrpcGreeterImpl
invoker := proto.Refer(url, impl)
invoker := proto.Refer(url)
// make sure url
eq := invoker.GetUrl().URLEqual(url)
......
......@@ -25,6 +25,14 @@ import (
"google.golang.org/grpc"
)
import (
"github.com/apache/dubbo-go/config"
)
func init() {
config.SetConsumerService(&GrpcGreeterImpl{})
}
// used for dubbo-grpc biz client
type GrpcGreeterImpl struct {
SayHello func(ctx context.Context, in *HelloRequest, out *HelloReply) error
......
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: helloworld.proto
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
grpc-gen:
protoc -I ./ helloworld.proto --go_out=plugins=grpc:.
dubbo-gen:
......
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: helloworld.proto
......
......@@ -67,7 +67,7 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter {
return exporter
}
func (jp *JsonrpcProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker {
//default requestTimeout
var requestTimeout = config.GetConsumerConfig().RequestTimeout
......
......@@ -80,7 +80,7 @@ func TestJsonrpcProtocol_Refer(t *testing.T) {
RequestTimeout: 5 * time.Second,
}
config.SetConsumerConfig(con)
invoker := proto.Refer(url, nil)
invoker := proto.Refer(url)
// make sure url
eq := invoker.GetUrl().URLEqual(url)
......
......@@ -29,7 +29,7 @@ import (
// Extension - protocol
type Protocol interface {
Export(invoker Invoker) Exporter
Refer(url common.URL, impl interface{}) Invoker
Refer(url common.URL) Invoker
Destroy()
}
......@@ -74,7 +74,7 @@ func (bp *BaseProtocol) Export(invoker Invoker) Exporter {
return NewBaseExporter("base", invoker, bp.exporterMap)
}
func (bp *BaseProtocol) Refer(url common.URL, impl interface{}) Invoker {
func (bp *BaseProtocol) Refer(url common.URL) Invoker {
return NewBaseInvoker(url)
}
......
......@@ -36,7 +36,7 @@ func (pfw *mockProtocolFilter) Export(invoker protocol.Invoker) protocol.Exporte
return protocol.NewBaseExporter("key", invoker, &sync.Map{})
}
func (pfw *mockProtocolFilter) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (pfw *mockProtocolFilter) Refer(url common.URL) protocol.Invoker {
return protocol.NewBaseInvoker(url)
}
......
......@@ -50,11 +50,11 @@ func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Expo
return pfw.protocol.Export(invoker)
}
func (pfw *ProtocolFilterWrapper) Refer(url common.URL, impl interface{}) protocol.Invoker {
func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker {
if pfw.protocol == nil {
pfw.protocol = extension.GetProtocol(url.Protocol)
}
return buildInvokerChain(pfw.protocol.Refer(url, impl), constant.REFERENCE_FILTER_KEY)
return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY)
}
func (pfw *ProtocolFilterWrapper) Destroy() {
......
......@@ -54,7 +54,7 @@ func TestProtocolFilterWrapper_Refer(t *testing.T) {
u := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.REFERENCE_FILTER_KEY, "echo"))
invoker := filtProto.Refer(*u, nil)
invoker := filtProto.Refer(*u)
_, ok := invoker.(*FilterInvoker)
assert.True(t, ok)
}
......
......@@ -58,11 +58,10 @@ type registryDirectory struct {
configurators []config_center.Configurator
consumerConfigurationListener *consumerConfigurationListener
referenceConfigurationListener *referenceConfigurationListener
impl interface{}
Options
}
func NewRegistryDirectory(url *common.URL, impl interface{}, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) {
options := Options{
//default 300s
serviceTTL: time.Duration(300e9),
......@@ -80,7 +79,6 @@ func NewRegistryDirectory(url *common.URL, impl interface{}, registry registry.R
serviceType: url.SubURL.Service(),
registry: registry,
Options: options,
impl: impl,
}
dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
return dir, nil
......@@ -200,13 +198,13 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) {
dir.overrideUrl(newUrl)
if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok {
logger.Infof("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl, dir.impl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
}
} else {
logger.Infof("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl())
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl, dir.impl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl)
if newInvoker != nil {
dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker)
cacheInvoker.(protocol.Invoker).Destroy()
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment