From 3b8ffbb5d7195084d216d9e94bb0acba2ebe60be Mon Sep 17 00:00:00 2001 From: flycash <mingflycash@gmail.com> Date: Wed, 10 Jun 2020 22:53:23 +0800 Subject: [PATCH] Invoke dubbo client success --- config/config_loader.go | 2 +- .../metadata_service_proxy_factory.go | 56 +++++++- .../metadata_service_proxy_factory_test.go | 15 ++- metadata/service/inmemory/service_proxy.go | 122 ++++++++++++++++++ protocol/dubbo/client.go | 2 +- .../servicediscovery/proxy/service_proxy.go | 18 --- .../service_discovery_registry.go | 8 +- 7 files changed, 197 insertions(+), 26 deletions(-) rename registry/servicediscovery/proxy/metadata_service_proxy_factory.go => metadata/service/inmemory/metadata_service_proxy_factory_test.go (68%) create mode 100644 metadata/service/inmemory/service_proxy.go delete mode 100644 registry/servicediscovery/proxy/service_proxy.go diff --git a/config/config_loader.go b/config/config_loader.go index 7d279ff08..049687924 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -148,7 +148,7 @@ func loadConsumerConfig() { if count > maxWait { errMsg := fmt.Sprintf("Failed to check the status of the service %v . No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version) logger.Error(errMsg) - // panic(errMsg) + panic(errMsg) } time.Sleep(time.Second * 1) break diff --git a/metadata/service/inmemory/metadata_service_proxy_factory.go b/metadata/service/inmemory/metadata_service_proxy_factory.go index 813ed7513..de30d1b08 100644 --- a/metadata/service/inmemory/metadata_service_proxy_factory.go +++ b/metadata/service/inmemory/metadata_service_proxy_factory.go @@ -18,7 +18,12 @@ package inmemory import ( + "encoding/json" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/metadata/service" "github.com/apache/dubbo-go/registry" ) @@ -31,5 +36,54 @@ func init() { } func createProxy(ins registry.ServiceInstance) service.MetadataService { - return nil + urls := buildStandardMetadataServiceURL(ins) + if len(urls) == 0 { + logger.Errorf("metadata service urls not found, %v", ins) + return nil + } + p := extension.GetProtocol(urls[0].Protocol) + invoker := p.Refer(*urls[0]) + return &MetadataServiceProxy{invkr: invoker} +} + +// buildStandardMetadataServiceURL will use standard format to build the metadata service url. +// Now we don't need to support spring-cloud format metadata service url. +// +func buildStandardMetadataServiceURL(ins registry.ServiceInstance) []*common.URL { + ps := getMetadataServiceUrlParams(ins) + res := make([]*common.URL, 0, len(ps)) + sn := ins.GetServiceName() + host := ins.GetHost() + for protocol, params := range ps { + + convertedParams := make(map[string][]string, len(params)) + for k, v := range params { + convertedParams[k] = []string{v} + } + + u := common.NewURLWithOptions(common.WithIp(host), + common.WithPath(constant.METADATA_SERVICE_NAME), + common.WithProtocol(protocol), + common.WithPort(params[constant.PORT_KEY]), + common.WithParams(convertedParams), + common.WithParamsValue(constant.GROUP_KEY, sn)) + res = append(res, u) + } + return res +} + +// getMetadataServiceUrlParams this will convert the metadata service url parameters to map structure +// it looks like: +// {"dubbo":{"timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"20880"}} +func getMetadataServiceUrlParams(ins registry.ServiceInstance) map[string]map[string]string { + ps := ins.GetMetadata() + res := make(map[string]map[string]string, 2) + if str, ok := ps[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME]; ok && len(str) > 0 { + + err := json.Unmarshal([]byte(str), &res) + if err != nil { + logger.Errorf("could not parse the metadata service url parameters to map", err) + } + } + return res } diff --git a/registry/servicediscovery/proxy/metadata_service_proxy_factory.go b/metadata/service/inmemory/metadata_service_proxy_factory_test.go similarity index 68% rename from registry/servicediscovery/proxy/metadata_service_proxy_factory.go rename to metadata/service/inmemory/metadata_service_proxy_factory_test.go index 6851f4bb7..652169ab7 100644 --- a/registry/servicediscovery/proxy/metadata_service_proxy_factory.go +++ b/metadata/service/inmemory/metadata_service_proxy_factory_test.go @@ -15,9 +15,18 @@ * limitations under the License. */ -package proxy +package inmemory import ( - "github.com/apache/dubbo-go/metadata/service" - "github.com/apache/dubbo-go/registry" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" ) + +func TestMetadataService_GetMetadataServiceUrlParams(t *testing.T) { + str := `{"dubbo":{"timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"20880"}}` + tmp := make(map[string]map[string]string) + err := json.Unmarshal([]byte(str), &tmp) + assert.Nil(t, err) +} diff --git a/metadata/service/inmemory/service_proxy.go b/metadata/service/inmemory/service_proxy.go new file mode 100644 index 000000000..bbe4b1101 --- /dev/null +++ b/metadata/service/inmemory/service_proxy.go @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package inmemory + +import ( + "context" + "reflect" + + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +type MetadataServiceProxy struct { + invkr protocol.Invoker +} + +func (m *MetadataServiceProxy) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) { + + siV := reflect.ValueOf(serviceInterface) + gV := reflect.ValueOf(group) + vV := reflect.ValueOf(version) + pV := reflect.ValueOf(protocol) + + inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("getExportedURLs"), + invocation.WithArguments([]interface{}{siV.Interface(), gV.Interface(), vV.Interface(), pV.Interface()}), + invocation.WithReply(reflect.ValueOf(&[]interface{}{}).Interface()), + invocation.WithAttachments(map[string]string{constant.ASYNC_KEY: "false"}), + invocation.WithParameterValues([]reflect.Value{siV, gV, vV, pV})) + + res := m.invkr.Invoke(context.Background(), inv) + if res.Error() != nil { + logger.Errorf("could not get the metadata service from remote provider: %v", res.Error()) + } + + urlStrs := res.Result().(*[]interface{}) + + ret := make([]common.URL, 0, len(*urlStrs)) + + for _, s := range *urlStrs { + u, err := common.NewURL(s.(string)) + if err != nil { + logger.Errorf("could not convert the string to URL: %s", s) + continue + } + ret = append(ret, u) + } + return ret, nil +} + +func (m *MetadataServiceProxy) Reference() string { + panic("implement me") +} + +func (m *MetadataServiceProxy) ServiceName() (string, error) { + panic("implement me") +} + +func (m *MetadataServiceProxy) ExportURL(url common.URL) (bool, error) { + panic("implement me") +} + +func (m *MetadataServiceProxy) UnexportURL(url common.URL) error { + panic("implement me") +} + +func (m *MetadataServiceProxy) SubscribeURL(url common.URL) (bool, error) { + panic("implement me") +} + +func (m *MetadataServiceProxy) UnsubscribeURL(url common.URL) error { + panic("implement me") +} + +func (m *MetadataServiceProxy) PublishServiceDefinition(url common.URL) error { + panic("implement me") +} + +func (m *MetadataServiceProxy) GetSubscribedURLs() ([]common.URL, error) { + panic("implement me") +} + +func (m *MetadataServiceProxy) GetServiceDefinition(interfaceName string, group string, version string) (string, error) { + panic("implement me") +} + +func (m *MetadataServiceProxy) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) { + panic("implement me") +} + +func (m *MetadataServiceProxy) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) { + panic("implement me") +} + +func (m *MetadataServiceProxy) Version() (string, error) { + panic("implement me") +} + +type MetadataServiceStub struct { + GetExportedURLs func(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) +} + +func (m *MetadataServiceStub) Reference() string { + return constant.METADATA_SERVICE_NAME +} diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index e6ffa64d8..08b09590e 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -280,7 +280,7 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac } select { - case <-getty.GetTimeWheel().After(c.opts.RequestTimeout): + case <-getty.GetTimeWheel().After(3 * time.Second): c.removePendingResponse(SequenceType(rsp.seq)) return perrors.WithStack(errClientReadTimeout) case <-rsp.done: diff --git a/registry/servicediscovery/proxy/service_proxy.go b/registry/servicediscovery/proxy/service_proxy.go deleted file mode 100644 index 6555874dd..000000000 --- a/registry/servicediscovery/proxy/service_proxy.go +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package proxy diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 133827d5e..526603bf1 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -501,6 +501,10 @@ func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstanc serviceName := serviceInstance.GetServiceName() revision := getExportedServicesRevision(serviceInstance) revisionExportedURLsMap := s.serviceRevisionExportedURLsCache[serviceName] + if revisionExportedURLsMap == nil { + revisionExportedURLsMap = make(map[string][]common.URL) + s.serviceRevisionExportedURLsCache[serviceName] = revisionExportedURLsMap + } revisionExportedURLs := revisionExportedURLsMap[revision] firstGet := false if revisionExportedURLs == nil || len(revisionExportedURLs) == 0 { @@ -558,8 +562,8 @@ func (s *serviceDiscoveryRegistry) cloneExportedURLs(url common.URL, serviceInsa for _, u := range templateExportURLs { port := strconv.Itoa(getProtocolPort(serviceInstance, u.Protocol)) if u.Location != host || u.Port != port { - u.Port = port // reset port - u.Location = host // reset host + u.Port = port // reset port + u.Location = host + ":" + port // reset host } cloneUrl := u.CloneExceptParams(removeParamSet) -- GitLab