diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index d13646dba86eea04adb3726d33ee9d20457276b6..0b5e2860495604207dc8e4a384225e2fca47df1a 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -72,10 +72,11 @@ func (p *Proxy) Implement(v common.RPCService) { makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value { return func(in []reflect.Value) []reflect.Value { var ( - err error - inv *invocation_impl.RPCInvocation - inArr []interface{} - reply reflect.Value + err error + inv *invocation_impl.RPCInvocation + inIArr []interface{} + inVArr []reflect.Value + reply reflect.Value ) if methodName == "Echo" { methodName = "$echo" @@ -104,21 +105,25 @@ func (p *Proxy) Implement(v common.RPCService) { } if end-start <= 0 { - inArr = []interface{}{} + inIArr = []interface{}{} + inVArr = []reflect.Value{} } else if v, ok := in[start].Interface().([]interface{}); ok && end-start == 1 { - inArr = v + inIArr = v + inVArr = []reflect.Value{in[start]} } else { - inArr = make([]interface{}, end-start) + inIArr = make([]interface{}, end-start) + inVArr = make([]reflect.Value, end-start) index := 0 for i := start; i < end; i++ { - inArr[index] = in[i].Interface() + inIArr[index] = in[i].Interface() + inVArr[index] = in[i] index++ } } inv = invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName), - invocation_impl.WithArguments(inArr), invocation_impl.WithReply(reply.Interface()), - invocation_impl.WithCallBack(p.callBack)) + invocation_impl.WithArguments(inIArr), invocation_impl.WithReply(reply.Interface()), + invocation_impl.WithCallBack(p.callBack), invocation_impl.WithParameterValues(inVArr)) for k, value := range p.attachments { inv.SetAttachments(k, value) diff --git a/config/config_loader.go b/config/config_loader.go index 414bb479025c5d6111a6373fa2626f21ffa73ef0..43237be94bf10168557a99923735f2359cf76e73 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -91,7 +91,7 @@ func Load() { continue } ref.id = key - ref.Refer() + ref.Refer(rpcService) ref.Implement(rpcService) } //wait for invoker is available, if wait over default 3s, then panic diff --git a/config/reference_config.go b/config/reference_config.go index 4e0c56c0bc25e3b71b8edf015580cbe5ac5f0d9c..780133327df3b52c601ceed3700bcc4e363d009f 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -88,7 +88,7 @@ func (refconfig *ReferenceConfig) UnmarshalYAML(unmarshal func(interface{}) erro return nil } -func (refconfig *ReferenceConfig) Refer() { +func (refconfig *ReferenceConfig) Refer(impl interface{}) { url := common.NewURLWithOptions(common.WithPath(refconfig.id), common.WithProtocol(refconfig.Protocol), common.WithParams(refconfig.getUrlMap())) //1. user specified URL, could be peer-to-peer address, or register center's address. @@ -122,12 +122,12 @@ func (refconfig *ReferenceConfig) Refer() { } } if len(refconfig.urls) == 1 { - refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0]) + refconfig.invoker = extension.GetProtocol(refconfig.urls[0].Protocol).Refer(*refconfig.urls[0], impl) } else { invokers := []protocol.Invoker{} var regUrl *common.URL for _, u := range refconfig.urls { - invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u)) + invokers = append(invokers, extension.GetProtocol(u.Protocol).Refer(*u, impl)) if u.Protocol == constant.REGISTRY_PROTOCOL { regUrl = u } @@ -207,7 +207,7 @@ func (refconfig *ReferenceConfig) GenericLoad(id string) { genericService := NewGenericService(refconfig.id) SetConsumerService(genericService) refconfig.id = id - refconfig.Refer() + refconfig.Refer(genericService) refconfig.Implement(genericService) return } diff --git a/config/reference_config_test.go b/config/reference_config_test.go index e689c471ed12b58a40d4416efaa16abfe107e09b..450c8c3487dc53b0f5682ca4ca0eb27c493088b2 100644 --- a/config/reference_config_test.go +++ b/config/reference_config_test.go @@ -184,7 +184,7 @@ func Test_ReferMultireg(t *testing.T) { extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) } @@ -197,7 +197,7 @@ func Test_Refer(t *testing.T) { extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) assert.Equal(t, "soa.mock", reference.Params["serviceid"]) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) @@ -227,7 +227,7 @@ func Test_ReferP2P(t *testing.T) { m.Url = "dubbo://127.0.0.1:20000" for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) } @@ -241,7 +241,7 @@ func Test_ReferMultiP2P(t *testing.T) { m.Url = "dubbo://127.0.0.1:20000;dubbo://127.0.0.2:20000" for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) } @@ -256,7 +256,7 @@ func Test_ReferMultiP2PWithReg(t *testing.T) { m.Url = "dubbo://127.0.0.1:20000;registry://127.0.0.2:20000" for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) assert.NotNil(t, reference.invoker) assert.NotNil(t, reference.pxy) } @@ -268,7 +268,7 @@ func Test_Implement(t *testing.T) { extension.SetProtocol("registry", GetProtocol) extension.SetCluster("registryAware", cluster_impl.NewRegistryAwareCluster) for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) reference.Implement(&MockService{}) assert.NotNil(t, reference.GetRPCService()) @@ -284,7 +284,7 @@ func Test_Forking(t *testing.T) { m.Url = "dubbo://127.0.0.1:20000;registry://127.0.0.2:20000" for _, reference := range consumerConfig.References { - reference.Refer() + reference.Refer(nil) forks := int(reference.invoker.GetUrl().GetParamInt(constant.FORKS_KEY, constant.DEFAULT_FORKS)) assert.Equal(t, 5, forks) assert.NotNil(t, reference.pxy) @@ -324,7 +324,7 @@ func newRegistryProtocol() protocol.Protocol { type mockRegistryProtocol struct{} -func (*mockRegistryProtocol) Refer(url common.URL) protocol.Invoker { +func (*mockRegistryProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker { return protocol.NewBaseInvoker(url) } diff --git a/go.sum b/go.sum index 9855250a90f72eca314bf54cd9bea03a619b6a5e..804fdd40987c13fbd00973dfec3e7e425a2a3870 100644 --- a/go.sum +++ b/go.sum @@ -449,6 +449,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto= github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ= github.com/tent/http-link-go v0.0.0-20130702225549-ac974c61c2f9/go.mod h1:RHkNRtSLfOK7qBTHaeSX1D6BNpI3qw7NTxsmNr4RvN8= +github.com/tevid/gohamcrest v1.1.1 h1:ou+xSqlIw1xfGTg1uq1nif/htZ2S3EzRqLm2BP+tYU0= github.com/tevid/gohamcrest v1.1.1/go.mod h1:3UvtWlqm8j5JbwYZh80D/PVBt0mJ1eJiYgZMibh0H/k= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 59d1ea05160696754b46dfead5713684aa7a94f7..2e4f73878c406e699179dcc6225e6d23aebdcf48 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -66,7 +66,7 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { return exporter } -func (dp *DubboProtocol) Refer(url common.URL) protocol.Invoker { +func (dp *DubboProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker { invoker := NewDubboInvoker(url, NewClient(Options{ ConnectTimeout: config.GetConsumerConfig().ConnectTimeout, RequestTimeout: config.GetConsumerConfig().RequestTimeout, diff --git a/protocol/dubbo/dubbo_protocol_test.go b/protocol/dubbo/dubbo_protocol_test.go index a6b0bc1df3cf2eb46e07c9dab149d04f62f78012..65ee17a2adc36eb33930b3911cd69810c4642875 100644 --- a/protocol/dubbo/dubbo_protocol_test.go +++ b/protocol/dubbo/dubbo_protocol_test.go @@ -85,7 +85,7 @@ func TestDubboProtocol_Refer(t *testing.T) { "side=provider&timeout=3000×tamp=1556509797245") assert.NoError(t, err) clientConf = &ClientConfig{} - invoker := proto.Refer(url) + invoker := proto.Refer(url, nil) // make sure url eq := invoker.GetUrl().URLEqual(url) diff --git a/protocol/grpc/client.go b/protocol/grpc/client.go new file mode 100644 index 0000000000000000000000000000000000000000..3e3920fce2b6dc442327491aaf3efff3281b2220 --- /dev/null +++ b/protocol/grpc/client.go @@ -0,0 +1,54 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "reflect" +) + +import ( + "google.golang.org/grpc" +) + +import ( + "github.com/apache/dubbo-go/common" +) + +type Client struct { + *grpc.ClientConn + invoker reflect.Value +} + +func NewClient(impl interface{}, url common.URL) *Client { + + conn, err := grpc.Dial(url.Location, grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + panic(err) + } + + in := []reflect.Value{} + in = append(in, reflect.ValueOf(conn)) + method := reflect.ValueOf(impl).MethodByName("GetDubboStub") + res := method.Call(in) + invoker := res[0].Interface() + + return &Client{ + ClientConn: conn, + invoker: reflect.ValueOf(invoker), + } +} diff --git a/protocol/grpc/grpc_exporter.go b/protocol/grpc/grpc_exporter.go new file mode 100644 index 0000000000000000000000000000000000000000..8446d319f1caf6463b29b27d270dfb9a94d437f2 --- /dev/null +++ b/protocol/grpc/grpc_exporter.go @@ -0,0 +1,48 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "sync" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +type GrpcExporter struct { + *protocol.BaseExporter +} + +func NewGrpcExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *GrpcExporter { + return &GrpcExporter{ + BaseExporter: protocol.NewBaseExporter(key, invoker, exporterMap), + } +} + +func (gg *GrpcExporter) Unexport() { + serviceId := gg.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "") + gg.BaseExporter.Unexport() + err := common.ServiceMap.UnRegister(GRPC, serviceId) + if err != nil { + logger.Errorf("[GrpcExporter.Unexport] error: %v", err) + } +} diff --git a/protocol/grpc/grpc_invoker.go b/protocol/grpc/grpc_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..b74612b896addb1ff08c3abe44198c147996a126 --- /dev/null +++ b/protocol/grpc/grpc_invoker.go @@ -0,0 +1,97 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "context" + "reflect" + "sync" +) + +import ( + "github.com/pkg/errors" + "google.golang.org/grpc/connectivity" +) + +import ( + hessian2 "github.com/apache/dubbo-go-hessian2" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +var ErrNoReply = errors.New("request need @response") + +type GrpcInvoker struct { + protocol.BaseInvoker + quitOnce sync.Once + client *Client +} + +func NewGrpcInvoker(url common.URL, client *Client) *GrpcInvoker { + return &GrpcInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + client: client, + } +} + +func (gi *GrpcInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + var ( + result protocol.RPCResult + ) + + if invocation.Reply() == nil { + result.Err = ErrNoReply + } + + in := []reflect.Value{} + in = append(in, reflect.ValueOf(context.Background())) + in = append(in, invocation.ParameterValues()...) + + methodName := invocation.MethodName() + method := gi.client.invoker.MethodByName(methodName) + res := method.Call(in) + + result.Rest = res[0] + // check err + if !res[1].IsNil() { + result.Err = res[1].Interface().(error) + } else { + _ = hessian2.ReflectResponse(res[0], invocation.Reply()) + } + + return &result +} + +func (gi *GrpcInvoker) IsAvailable() bool { + return gi.BaseInvoker.IsAvailable() && gi.client.GetState() != connectivity.Shutdown +} + +func (gi *GrpcInvoker) IsDestroyed() bool { + return gi.BaseInvoker.IsDestroyed() && gi.client.GetState() == connectivity.Shutdown +} + +func (gi *GrpcInvoker) Destroy() { + gi.quitOnce.Do(func() { + gi.BaseInvoker.Destroy() + + if gi.client != nil { + _ = gi.client.Close() + } + }) +} diff --git a/protocol/grpc/grpc_protocol.go b/protocol/grpc/grpc_protocol.go new file mode 100644 index 0000000000000000000000000000000000000000..16c61df0dbb8daf5082088ec0b3f543b4f5e77e8 --- /dev/null +++ b/protocol/grpc/grpc_protocol.go @@ -0,0 +1,89 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "sync" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +const GRPC = "grpc" + +func init() { + extension.SetProtocol(GRPC, GetProtocol) +} + +var grpcProtocol *GrpcProtocol + +type GrpcProtocol struct { + protocol.BaseProtocol + serverMap map[string]*Server + serverLock sync.Mutex +} + +func NewGRPCProtocol() *GrpcProtocol { + return &GrpcProtocol{ + BaseProtocol: protocol.NewBaseProtocol(), + } +} + +// 缺少一个 type 信息, 无法进行binding +func (gp *GrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { + url := invoker.GetUrl() + serviceKey := url.ServiceKey() + exporter := NewGrpcExporter(serviceKey, invoker, gp.ExporterMap()) + gp.SetExporterMap(serviceKey, exporter) + logger.Infof("Export service: %s", url.String()) + gp.openServer(url) + return exporter +} + +func (gp *GrpcProtocol) openServer(url common.URL) { + return +} + +func (gp *GrpcProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker { + invoker := NewGrpcInvoker(url, NewClient(impl, url)) + gp.SetInvokers(invoker) + logger.Infof("Refer service: %s", url.String()) + return invoker +} + +func (gp *GrpcProtocol) Destroy() { + logger.Infof("GrpcProtocol destroy.") + + gp.BaseProtocol.Destroy() + + for key, server := range gp.serverMap { + delete(gp.serverMap, key) + server.Stop() + } +} + +func GetProtocol() protocol.Protocol { + if grpcProtocol == nil { + grpcProtocol = NewGRPCProtocol() + } + return grpcProtocol +} diff --git a/protocol/grpc/server.go b/protocol/grpc/server.go new file mode 100644 index 0000000000000000000000000000000000000000..83a9e69f192082c24a0ec4f5bfdbf2e99367cd2d --- /dev/null +++ b/protocol/grpc/server.go @@ -0,0 +1,67 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpc + +import ( + "net" +) + +import ( + "google.golang.org/grpc" +) + +import ( + "github.com/apache/dubbo-go/common" +) + +type Server struct { + grpcServer *grpc.Server +} + +func NewServer() *Server { + + return nil +} + +// TODO: unimplemented +func (s *Server) Start(url common.URL) { + var ( + addr string + err error + ) + addr = url.Location + lis, err := net.Listen("tcp", addr) + if err != nil { + panic(err) + } + server := grpc.NewServer() + + s.grpcServer = server + // grpc-go 必须提前注册 + // ServiceDesc 这个信息需要有 + // 需要找一个方法。 + //server.RegisterService() + // 想个办法注册下 + if err = server.Serve(lis); err != nil { + panic(err) + } +} + +func (s *Server) Stop() { + s.grpcServer.Stop() +} diff --git a/protocol/invocation.go b/protocol/invocation.go index 055e7a4cd18707772d6ba75303053f15dc55dbe3..b0ccab39e89c600dc8694cba989a905d9de5e48c 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -24,6 +24,7 @@ import ( type Invocation interface { MethodName() string ParameterTypes() []reflect.Type + ParameterValues() []reflect.Value Arguments() []interface{} Reply() interface{} Attachments() map[string]string diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index bddd83b5db60cc3ccaa1ab0c43aaeec28e77855d..503ff0323644537e14a9e71c288b5a51f0bf7557 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -33,6 +33,7 @@ import ( type RPCInvocation struct { methodName string parameterTypes []reflect.Type + parameterValues []reflect.Value arguments []interface{} reply interface{} callBack interface{} @@ -65,6 +66,10 @@ func (r *RPCInvocation) ParameterTypes() []reflect.Type { return r.parameterTypes } +func (r *RPCInvocation) ParameterValues() []reflect.Value { + return r.parameterValues +} + func (r *RPCInvocation) Arguments() []interface{} { return r.arguments } @@ -137,6 +142,12 @@ func WithParameterTypes(parameterTypes []reflect.Type) option { } } +func WithParameterValues(parameterValues []reflect.Value) option { + return func(invo *RPCInvocation) { + invo.parameterValues = parameterValues + } +} + func WithArguments(arguments []interface{}) option { return func(invo *RPCInvocation) { invo.arguments = arguments diff --git a/protocol/jsonrpc/jsonrpc_protocol.go b/protocol/jsonrpc/jsonrpc_protocol.go index c18345d413edb2d263f1acaef1741514b665f042..a512da77ad4635be57b90228f26407dc674a28eb 100644 --- a/protocol/jsonrpc/jsonrpc_protocol.go +++ b/protocol/jsonrpc/jsonrpc_protocol.go @@ -65,7 +65,7 @@ func (jp *JsonrpcProtocol) Export(invoker protocol.Invoker) protocol.Exporter { return exporter } -func (jp *JsonrpcProtocol) Refer(url common.URL) protocol.Invoker { +func (jp *JsonrpcProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker { invoker := NewJsonrpcInvoker(url, NewHTTPClient(&HTTPOptions{ HandshakeTimeout: config.GetConsumerConfig().ConnectTimeout, HTTPTimeout: config.GetConsumerConfig().RequestTimeout, diff --git a/protocol/jsonrpc/jsonrpc_protocol_test.go b/protocol/jsonrpc/jsonrpc_protocol_test.go index 253ab830dd85e5424811b7fd4e7e7e848adad415..5b6a6043cb6aab206105c4093b18fca060c31bad 100644 --- a/protocol/jsonrpc/jsonrpc_protocol_test.go +++ b/protocol/jsonrpc/jsonrpc_protocol_test.go @@ -80,7 +80,7 @@ func TestJsonrpcProtocol_Refer(t *testing.T) { RequestTimeout: 5 * time.Second, } config.SetConsumerConfig(con) - invoker := proto.Refer(url) + invoker := proto.Refer(url, nil) // make sure url eq := invoker.GetUrl().URLEqual(url) diff --git a/protocol/protocol.go b/protocol/protocol.go index 814a85163a99aa3b161b5eafbfed5f13ac4e3eb4..93f57b77a704924b1523288c2406012e49c463d1 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -29,7 +29,7 @@ import ( // Extension - protocol type Protocol interface { Export(invoker Invoker) Exporter - Refer(url common.URL) Invoker + Refer(url common.URL, impl interface{}) Invoker Destroy() } @@ -74,7 +74,7 @@ func (bp *BaseProtocol) Export(invoker Invoker) Exporter { return NewBaseExporter("base", invoker, bp.exporterMap) } -func (bp *BaseProtocol) Refer(url common.URL) Invoker { +func (bp *BaseProtocol) Refer(url common.URL, impl interface{}) Invoker { return NewBaseInvoker(url) } diff --git a/protocol/protocolwrapper/mock_protocol_filter.go b/protocol/protocolwrapper/mock_protocol_filter.go index 2efc34da4469cf369d4bbeb871ccfbdb73123f6a..4f1e5638e31aa050c26729faffe48cb0ffe8b1b8 100644 --- a/protocol/protocolwrapper/mock_protocol_filter.go +++ b/protocol/protocolwrapper/mock_protocol_filter.go @@ -36,7 +36,7 @@ func (pfw *mockProtocolFilter) Export(invoker protocol.Invoker) protocol.Exporte return protocol.NewBaseExporter("key", invoker, &sync.Map{}) } -func (pfw *mockProtocolFilter) Refer(url common.URL) protocol.Invoker { +func (pfw *mockProtocolFilter) Refer(url common.URL, impl interface{}) protocol.Invoker { return protocol.NewBaseInvoker(url) } diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 7c58fabea3cccf5a39e1622fedd4a3a297e05983..e9cc5420639ed89a0966dad59bf29c2881403fad 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -50,11 +50,11 @@ func (pfw *ProtocolFilterWrapper) Export(invoker protocol.Invoker) protocol.Expo return pfw.protocol.Export(invoker) } -func (pfw *ProtocolFilterWrapper) Refer(url common.URL) protocol.Invoker { +func (pfw *ProtocolFilterWrapper) Refer(url common.URL, impl interface{}) protocol.Invoker { if pfw.protocol == nil { pfw.protocol = extension.GetProtocol(url.Protocol) } - return buildInvokerChain(pfw.protocol.Refer(url), constant.REFERENCE_FILTER_KEY) + return buildInvokerChain(pfw.protocol.Refer(url, impl), constant.REFERENCE_FILTER_KEY) } func (pfw *ProtocolFilterWrapper) Destroy() { diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go index dc376313549c24da1cc6cb64a42e8445ef4fe346..103f0fb53e4aa8e150bb5b9b7ad4b127b6320d07 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper_test.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go @@ -54,7 +54,7 @@ func TestProtocolFilterWrapper_Refer(t *testing.T) { u := common.NewURLWithOptions( common.WithParams(url.Values{}), common.WithParamsValue(constant.REFERENCE_FILTER_KEY, "echo")) - invoker := filtProto.Refer(*u) + invoker := filtProto.Refer(*u, nil) _, ok := invoker.(*FilterInvoker) assert.True(t, ok) } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index e88c611f6f20bc182c3630e328caab848affc08b..ad4a1cbd5c14308fda7f7016091b0dcd3cf0e52d 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -58,10 +58,11 @@ type registryDirectory struct { configurators []config_center.Configurator consumerConfigurationListener *consumerConfigurationListener referenceConfigurationListener *referenceConfigurationListener + impl interface{} Options } -func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...Option) (*registryDirectory, error) { +func NewRegistryDirectory(url *common.URL, impl interface{}, registry registry.Registry, opts ...Option) (*registryDirectory, error) { options := Options{ //default 300s serviceTTL: time.Duration(300e9), @@ -79,6 +80,7 @@ func NewRegistryDirectory(url *common.URL, registry registry.Registry, opts ...O serviceType: url.SubURL.Service(), registry: registry, Options: options, + impl: impl, } dir.consumerConfigurationListener = newConsumerConfigurationListener(dir) return dir, nil @@ -198,13 +200,13 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) { dir.overrideUrl(newUrl) if cacheInvoker, ok := dir.cacheInvokersMap.Load(newUrl.Key()); !ok { logger.Infof("service will be added in cache invokers: invokers url is %s!", newUrl) - newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl) + newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl, dir.impl) if newInvoker != nil { dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) } } else { logger.Infof("service will be updated in cache invokers: new invoker url is %s, old invoker url is %s", newUrl, cacheInvoker.(protocol.Invoker).GetUrl()) - newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl) + newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl, dir.impl) if newInvoker != nil { dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) cacheInvoker.(protocol.Invoker).Destroy() diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index b3c1d35aaa66b3437ff89807fba2df0a383921cb..9af4ef96a3e38438df136a6ccdd7794c411d295f 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -64,7 +64,7 @@ func TestSubscribe(t *testing.T) { func TestSubscribe_InvalidUrl(t *testing.T) { url, _ := common.NewURL(context.TODO(), "mock://127.0.0.1:1111") mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) - _, err := NewRegistryDirectory(&url, mockRegistry) + _, err := NewRegistryDirectory(&url, nil, mockRegistry) assert.Error(t, err) } @@ -77,7 +77,7 @@ func TestSubscribe_Group(t *testing.T) { suburl.SetParam(constant.CLUSTER_KEY, "mock") regurl.SubURL = &suburl mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) - registryDirectory, _ := NewRegistryDirectory(®url, mockRegistry) + registryDirectory, _ := NewRegistryDirectory(®url, nil, mockRegistry) go registryDirectory.Subscribe(common.NewURLWithOptions(common.WithPath("testservice"))) @@ -183,7 +183,7 @@ func normalRegistryDir(noMockEvent ...bool) (*registryDirectory, *registry.MockR ) url.SubURL = &suburl mockRegistry, _ := registry.NewMockRegistry(&common.URL{}) - registryDirectory, _ := NewRegistryDirectory(&url, mockRegistry) + registryDirectory, _ := NewRegistryDirectory(&url, nil, mockRegistry) go registryDirectory.Subscribe(&suburl) if len(noMockEvent) == 0 { diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go index 534a4b945965f332e49ff343557fa20355921454..6bb5305d6c265abef8f000fafe39880220161837 100644 --- a/registry/protocol/protocol.go +++ b/registry/protocol/protocol.go @@ -89,7 +89,7 @@ func (proto *registryProtocol) initConfigurationListeners() { proto.serviceConfigurationListeners = &sync.Map{} proto.providerConfigurationListener = newProviderConfigurationListener(proto.overrideListeners) } -func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { +func (proto *registryProtocol) Refer(url common.URL, impl interface{}) protocol.Invoker { var registryUrl = url var serviceUrl = registryUrl.SubURL @@ -108,7 +108,7 @@ func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker { } //new registry directory for store service url from registry - directory, err := directory2.NewRegistryDirectory(®istryUrl, reg) + directory, err := directory2.NewRegistryDirectory(®istryUrl, impl, reg) if err != nil { logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!", serviceUrl.String(), err.Error()) diff --git a/registry/protocol/protocol_test.go b/registry/protocol/protocol_test.go index 0c19da59df6e4fd2f663f9e8d541165fe26c3ffa..2c407cf2dc2ed346febd983f6ebb81c16c60de6d 100644 --- a/registry/protocol/protocol_test.go +++ b/registry/protocol/protocol_test.go @@ -60,7 +60,7 @@ func referNormal(t *testing.T, regProtocol *registryProtocol) { url.SubURL = &suburl - invoker := regProtocol.Refer(url) + invoker := regProtocol.Refer(url, nil) assert.IsType(t, &protocol.BaseInvoker{}, invoker) assert.Equal(t, invoker.GetUrl().String(), url.String()) } @@ -85,7 +85,7 @@ func TestMultiRegRefer(t *testing.T) { url2.SubURL = &suburl2 - regProtocol.Refer(url2) + regProtocol.Refer(url2, nil) var count int regProtocol.registries.Range(func(key, value interface{}) bool { count++ @@ -107,7 +107,7 @@ func TestOneRegRefer(t *testing.T) { url2.SubURL = &suburl2 - regProtocol.Refer(url2) + regProtocol.Refer(url2, nil) var count int regProtocol.registries.Range(func(key, value interface{}) bool { count++ diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index a7fc568f567d720448d0be63c592fae5f8df9bbf..19d65291e94835c60ba412414090999b34bc4d48 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -290,6 +290,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) { z.Lock() a := z.eventRegistry[zkPath] a = append(a, event) + z.eventRegistry[zkPath] = a logger.Debugf("zkClient{%s} register event{path:%s, ptr:%p}", z.name, zkPath, event) z.Unlock() diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 9521ea749027582c015ac998a6f6f68d350cc3bc..96aa45216ad842343836ddff12a921d3bd6a7ca9 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -18,7 +18,9 @@ package zookeeper import ( + "fmt" "path" + "strings" "sync" "time" ) @@ -239,6 +241,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi //listen sub path recursive go func(zkPath string, listener remoting.DataListener) { + fmt.Printf("zkpath: %v \n", zkPath) l.listenDirEvent(zkPath, listener) logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) }(dubboPath, listener) @@ -273,6 +276,7 @@ func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.Da children []string ) + zkPath = strings.ReplaceAll(zkPath, "$", "%24") l.pathMapLock.Lock() _, ok := l.pathMap[zkPath] l.pathMapLock.Unlock()